fix(session): negative duration #2
This commit is contained in:
@@ -191,14 +191,23 @@ export class SessionBuffer extends BaseBuffer {
|
||||
|
||||
if (events.length === 0) return;
|
||||
|
||||
const sessions = events.map((e) => getSafeJson<IClickhouseSession>(e));
|
||||
const sessions = events
|
||||
.map((e) => getSafeJson<IClickhouseSession>(e))
|
||||
.map((session) => {
|
||||
return {
|
||||
...session,
|
||||
duration: Math.max(0, session?.duration || 0),
|
||||
};
|
||||
});
|
||||
|
||||
// Insert to ClickHouse
|
||||
await ch.insert({
|
||||
table: TABLE_NAMES.sessions,
|
||||
values: sessions,
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
for (const chunk of this.chunks(sessions, 1000)) {
|
||||
// Insert to ClickHouse
|
||||
await ch.insert({
|
||||
table: TABLE_NAMES.sessions,
|
||||
values: chunk,
|
||||
format: 'JSONEachRow',
|
||||
});
|
||||
}
|
||||
|
||||
// Only remove events after successful insert
|
||||
await this.redis.ltrim(this.redisKey, events.length, -1);
|
||||
|
||||
Reference in New Issue
Block a user