fix: change order keys for clickhouse tables

* wip

* rename

* fix: minor things before merging new order keys

* fix: add maintenance mode

* fix: update order by for session and events

* fix: remove properties from sessions and final migration test

* fix: set end date on migrations

* fix: comments
This commit is contained in:
Carl-Gerhard Lindesvärd
2025-12-16 12:48:51 +01:00
committed by GitHub
parent 3b61b28290
commit d7c6e88adc
18 changed files with 463 additions and 46 deletions

View File

@@ -1,6 +1,5 @@
import { type Redis, getRedisCache } from '@openpanel/redis';
import { toDots } from '@openpanel/common';
import { getSafeJson } from '@openpanel/json';
import { assocPath, clone } from 'ramda';
import { TABLE_NAMES, ch } from '../clickhouse/client';
@@ -91,10 +90,6 @@ export class SessionBuffer extends BaseBuffer {
session: newSession,
});
}
newSession.properties = toDots({
...(event.properties || {}),
...(newSession.properties || {}),
});
const addedRevenue = event.name === 'revenue' ? (event.revenue ?? 0) : 0;
newSession.revenue = (newSession.revenue ?? 0) + addedRevenue;
@@ -168,7 +163,6 @@ export class SessionBuffer extends BaseBuffer {
: '',
sign: 1,
version: 1,
properties: toDots(event.properties || {}),
},
];
}

View File

@@ -217,6 +217,7 @@ export function moveDataBetweenTables({
from,
to,
batch,
columns,
}: {
from: string;
to: string;
@@ -227,11 +228,15 @@ export function moveDataBetweenTables({
endDate?: Date;
startDate?: Date;
};
columns?: string[];
}): string[] {
const sqls: string[] = [];
// Build the SELECT clause
const selectClause = columns && columns.length > 0 ? columns.join(', ') : '*';
if (!batch) {
return [`INSERT INTO ${to} SELECT * FROM ${from}`];
return [`INSERT INTO ${to} SELECT ${selectClause} FROM ${from}`];
}
// Start from today and go back 3 years
@@ -247,32 +252,109 @@ export function moveDataBetweenTables({
let currentDate = endDate;
const interval = batch.interval || 'day';
while (currentDate > startDate) {
/**
 * Returns a new Date set to local midnight on the Monday of the week that
 * contains `date`. The input is left untouched.
 */
const getWeekStart = (date: Date): Date => {
  const monday = new Date(date);
  const weekday = monday.getDay();
  // getDay() reports Sunday as 0; treat it as the last day of the week,
  // i.e. six days after that week's Monday.
  const daysSinceMonday = weekday === 0 ? 6 : weekday - 1;
  monday.setDate(monday.getDate() - daysSinceMonday);
  // Drop the time-of-day so week starts compare equal regardless of the hour.
  monday.setHours(0, 0, 0, 0);
  return monday;
};
/**
 * Tells whether `current` has not yet moved past `start`, compared at the
 * granularity given by `intervalType`:
 * - 'month': true while current's (year, month) is on or after start's
 * - 'week':  true while current's Monday-based week is on or after start's
 * - anything else: true only while `current` is strictly later than `start`
 */
const shouldContinue = (
  current: Date,
  start: Date,
  intervalType: string,
): boolean => {
  switch (intervalType) {
    case 'month': {
      // Years decide first; months only matter within the same year.
      const yearDelta = current.getFullYear() - start.getFullYear();
      if (yearDelta !== 0) {
        return yearDelta > 0;
      }
      return current.getMonth() >= start.getMonth();
    }
    case 'week':
      // Compare the normalized week starts rather than the raw timestamps.
      return getWeekStart(current) >= getWeekStart(start);
    default:
      return current > start;
  }
};
while (shouldContinue(currentDate, startDate, interval)) {
const previousDate = new Date(currentDate);
switch (interval) {
case 'month':
previousDate.setMonth(previousDate.getMonth() - 1);
break;
case 'week':
previousDate.setDate(previousDate.getDate() - 7);
// Ensure we don't go below startDate
if (previousDate < startDate) {
previousDate.setTime(startDate.getTime());
// If we've gone below startDate's month, adjust to start of startDate's month
// This ensures we generate SQL for the month containing startDate
if (
previousDate.getFullYear() < startDate.getFullYear() ||
(previousDate.getFullYear() === startDate.getFullYear() &&
previousDate.getMonth() < startDate.getMonth())
) {
previousDate.setFullYear(startDate.getFullYear());
previousDate.setMonth(startDate.getMonth());
previousDate.setDate(1);
}
break;
case 'week': {
previousDate.setDate(previousDate.getDate() - 7);
// If we've gone below startDate's week, adjust to start of startDate's week
const startWeekStart = getWeekStart(startDate);
const prevWeekStart = getWeekStart(previousDate);
if (prevWeekStart < startWeekStart) {
previousDate.setTime(startWeekStart.getTime());
}
break;
}
// day
default:
previousDate.setDate(previousDate.getDate() - 1);
break;
}
// For monthly/weekly intervals with transform, upperBoundDate should be currentDate
// because currentDate already represents the start of the period we're processing
// The WHERE clause uses > previousDate AND <= currentDate to get exactly one period
let upperBoundDate = currentDate;
// Don't exceed the endDate
if (upperBoundDate > endDate) {
upperBoundDate = endDate;
}
const sql = `INSERT INTO ${to}
SELECT * FROM ${from}
SELECT ${selectClause} FROM ${from}
WHERE ${batch.column} > '${batch.transform ? batch.transform(previousDate) : formatClickhouseDate(previousDate, true)}'
AND ${batch.column} <= '${batch.transform ? batch.transform(currentDate) : formatClickhouseDate(currentDate, true)}'`;
AND ${batch.column} <= '${batch.transform ? batch.transform(upperBoundDate) : formatClickhouseDate(upperBoundDate, true)}'`;
sqls.push(sql);
// For monthly/weekly intervals, stop if we've reached the start period
if (interval === 'month') {
const prevYear = previousDate.getFullYear();
const prevMonth = previousDate.getMonth();
const startYear = startDate.getFullYear();
const startMonth = startDate.getMonth();
if (prevYear === startYear && prevMonth === startMonth) {
break;
}
} else if (interval === 'week') {
const prevWeekStart = getWeekStart(previousDate);
const startWeekStart = getWeekStart(startDate);
if (prevWeekStart.getTime() === startWeekStart.getTime()) {
break;
}
}
currentDate = previousDate;
}

View File

@@ -131,7 +131,6 @@ export function transformSessionToEvent(
duration: 0,
revenue: session.revenue,
properties: {
...session.properties,
is_bounce: session.is_bounce,
__query: {
utm_medium: session.utm_medium,
@@ -631,8 +630,7 @@ export async function getEventList(options: GetEventListOptions) {
}
}
sb.orderBy.created_at =
'toDate(created_at) DESC, created_at DESC, profile_id DESC, name DESC';
sb.orderBy.created_at = 'created_at DESC';
if (custom) {
custom(sb);

View File

@@ -1,6 +1,5 @@
import { cacheable } from '@openpanel/redis';
import type { IChartEventFilter } from '@openpanel/validation';
import { uniq } from 'ramda';
import sqlstring from 'sqlstring';
import {
TABLE_NAMES,
@@ -53,7 +52,6 @@ export type IClickhouseSession = {
revenue: number;
sign: 1 | 0;
version: number;
properties: Record<string, string>;
};
export interface IServiceSession {
@@ -92,7 +90,6 @@ export interface IServiceSession {
utmContent: string;
utmTerm: string;
revenue: number;
properties: Record<string, string>;
profile?: IServiceProfile;
}
@@ -144,7 +141,6 @@ export function transformSession(session: IClickhouseSession): IServiceSession {
utmContent: session.utm_content,
utmTerm: session.utm_term,
revenue: session.revenue,
properties: session.properties,
profile: undefined,
};
}
@@ -200,12 +196,13 @@ export async function getSessionList({
if (cursor) {
const cAt = sqlstring.escape(cursor.createdAt);
// TODO: remove id from cursor
const cId = sqlstring.escape(cursor.id);
sb.where.cursor = `(created_at < toDateTime64(${cAt}, 3) OR (created_at = toDateTime64(${cAt}, 3) AND id < ${cId}))`;
sb.where.cursor = `created_at < toDateTime64(${cAt}, 3)`;
sb.where.cursorWindow = `created_at >= toDateTime64(${cAt}, 3) - INTERVAL ${dateIntervalInDays} DAY`;
sb.orderBy.created_at = 'toDate(created_at) DESC, created_at DESC, id DESC';
sb.orderBy.created_at = 'created_at DESC';
} else {
sb.orderBy.created_at = 'toDate(created_at) DESC, created_at DESC, id DESC';
sb.orderBy.created_at = 'created_at DESC';
sb.where.created_at = `created_at > now() - INTERVAL ${dateIntervalInDays} DAY`;
}