fix: remove properties from sessions and final migration test
This commit is contained in:
@@ -117,7 +117,7 @@ export async function bootWorkers() {
|
||||
const worker = new GroupWorker<EventsQueuePayloadIncomingEvent['payload']>({
|
||||
queue,
|
||||
concurrency,
|
||||
logger: queueLogger,
|
||||
logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined,
|
||||
blockingTimeoutSec: Number.parseFloat(
|
||||
process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1',
|
||||
),
|
||||
|
||||
@@ -96,7 +96,6 @@ export async function createSessionEnd(
|
||||
...payload,
|
||||
properties: {
|
||||
...payload.properties,
|
||||
...(session?.properties ?? {}),
|
||||
__bounce: session.is_bounce,
|
||||
},
|
||||
name: 'session_end',
|
||||
|
||||
@@ -319,7 +319,6 @@ describe('incomingEvent', () => {
|
||||
utm_content: '',
|
||||
utm_medium: '',
|
||||
revenue: 0,
|
||||
properties: {},
|
||||
project_id: projectId,
|
||||
device_id: 'last-device-123',
|
||||
profile_id: 'profile-123',
|
||||
|
||||
@@ -115,7 +115,6 @@ export async function up() {
|
||||
'`referrer_type` LowCardinality(String)',
|
||||
'`sign` Int8',
|
||||
'`version` UInt64',
|
||||
'`properties` Map(String, String) CODEC(ZSTD(3))',
|
||||
],
|
||||
// New ORDER BY: project_id, toDate(created_at), created_at, id
|
||||
// Removed profile_id, reordered to match query patterns (date first, then id)
|
||||
@@ -175,6 +174,45 @@ export async function up() {
|
||||
...moveDataBetweenTables({
|
||||
from: 'sessions',
|
||||
to: 'sessions_new_20251123',
|
||||
columns: [
|
||||
'id',
|
||||
'project_id',
|
||||
'profile_id',
|
||||
'device_id',
|
||||
'created_at',
|
||||
'ended_at',
|
||||
'is_bounce',
|
||||
'entry_origin',
|
||||
'entry_path',
|
||||
'exit_origin',
|
||||
'exit_path',
|
||||
'screen_view_count',
|
||||
'revenue',
|
||||
'event_count',
|
||||
'duration',
|
||||
'country',
|
||||
'region',
|
||||
'city',
|
||||
'longitude',
|
||||
'latitude',
|
||||
'device',
|
||||
'brand',
|
||||
'model',
|
||||
'browser',
|
||||
'browser_version',
|
||||
'os',
|
||||
'os_version',
|
||||
'utm_medium',
|
||||
'utm_source',
|
||||
'utm_campaign',
|
||||
'utm_content',
|
||||
'utm_term',
|
||||
'referrer',
|
||||
'referrer_name',
|
||||
'referrer_type',
|
||||
'sign',
|
||||
'version',
|
||||
],
|
||||
batch: {
|
||||
startDate: firstSessionDate,
|
||||
column: 'toDate(created_at)',
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { type Redis, getRedisCache } from '@openpanel/redis';
|
||||
|
||||
import { toDots } from '@openpanel/common';
|
||||
import { getSafeJson } from '@openpanel/json';
|
||||
import { assocPath, clone } from 'ramda';
|
||||
import { TABLE_NAMES, ch } from '../clickhouse/client';
|
||||
@@ -91,10 +90,6 @@ export class SessionBuffer extends BaseBuffer {
|
||||
session: newSession,
|
||||
});
|
||||
}
|
||||
newSession.properties = toDots({
|
||||
...(event.properties || {}),
|
||||
...(newSession.properties || {}),
|
||||
});
|
||||
|
||||
const addedRevenue = event.name === 'revenue' ? (event.revenue ?? 0) : 0;
|
||||
newSession.revenue = (newSession.revenue ?? 0) + addedRevenue;
|
||||
@@ -168,7 +163,6 @@ export class SessionBuffer extends BaseBuffer {
|
||||
: '',
|
||||
sign: 1,
|
||||
version: 1,
|
||||
properties: toDots(event.properties || {}),
|
||||
},
|
||||
];
|
||||
}
|
||||
|
||||
@@ -217,6 +217,7 @@ export function moveDataBetweenTables({
|
||||
from,
|
||||
to,
|
||||
batch,
|
||||
columns,
|
||||
}: {
|
||||
from: string;
|
||||
to: string;
|
||||
@@ -227,11 +228,15 @@ export function moveDataBetweenTables({
|
||||
endDate?: Date;
|
||||
startDate?: Date;
|
||||
};
|
||||
columns?: string[];
|
||||
}): string[] {
|
||||
const sqls: string[] = [];
|
||||
|
||||
// Build the SELECT clause
|
||||
const selectClause = columns && columns.length > 0 ? columns.join(', ') : '*';
|
||||
|
||||
if (!batch) {
|
||||
return [`INSERT INTO ${to} SELECT * FROM ${from}`];
|
||||
return [`INSERT INTO ${to} SELECT ${selectClause} FROM ${from}`];
|
||||
}
|
||||
|
||||
// Start from today and go back 3 years
|
||||
@@ -328,7 +333,7 @@ export function moveDataBetweenTables({
|
||||
}
|
||||
|
||||
const sql = `INSERT INTO ${to}
|
||||
SELECT * FROM ${from}
|
||||
SELECT ${selectClause} FROM ${from}
|
||||
WHERE ${batch.column} > '${batch.transform ? batch.transform(previousDate) : formatClickhouseDate(previousDate, true)}'
|
||||
AND ${batch.column} <= '${batch.transform ? batch.transform(upperBoundDate) : formatClickhouseDate(upperBoundDate, true)}'`;
|
||||
sqls.push(sql);
|
||||
|
||||
@@ -131,7 +131,6 @@ export function transformSessionToEvent(
|
||||
duration: 0,
|
||||
revenue: session.revenue,
|
||||
properties: {
|
||||
...session.properties,
|
||||
is_bounce: session.is_bounce,
|
||||
__query: {
|
||||
utm_medium: session.utm_medium,
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
import { cacheable } from '@openpanel/redis';
|
||||
import type { IChartEventFilter } from '@openpanel/validation';
|
||||
import { uniq } from 'ramda';
|
||||
import sqlstring from 'sqlstring';
|
||||
import {
|
||||
TABLE_NAMES,
|
||||
@@ -53,7 +52,6 @@ export type IClickhouseSession = {
|
||||
revenue: number;
|
||||
sign: 1 | 0;
|
||||
version: number;
|
||||
properties: Record<string, string>;
|
||||
};
|
||||
|
||||
export interface IServiceSession {
|
||||
@@ -92,7 +90,6 @@ export interface IServiceSession {
|
||||
utmContent: string;
|
||||
utmTerm: string;
|
||||
revenue: number;
|
||||
properties: Record<string, string>;
|
||||
profile?: IServiceProfile;
|
||||
}
|
||||
|
||||
@@ -144,7 +141,6 @@ export function transformSession(session: IClickhouseSession): IServiceSession {
|
||||
utmContent: session.utm_content,
|
||||
utmTerm: session.utm_term,
|
||||
revenue: session.revenue,
|
||||
properties: session.properties,
|
||||
profile: undefined,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -146,7 +146,7 @@ export const eventsGroupQueues = Array.from({
|
||||
}).map(
|
||||
(_, index, list) =>
|
||||
new GroupQueue<EventsQueuePayloadIncomingEvent['payload']>({
|
||||
logger: queueLogger,
|
||||
logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined,
|
||||
namespace: getQueueName(
|
||||
list.length === 1 ? 'group_events' : `group_events_${index}`,
|
||||
),
|
||||
|
||||
Reference in New Issue
Block a user