diff --git a/apps/worker/src/jobs/cron.delete-projects.ts b/apps/worker/src/jobs/cron.delete-projects.ts index eab4f84c..9b675b1e 100644 --- a/apps/worker/src/jobs/cron.delete-projects.ts +++ b/apps/worker/src/jobs/cron.delete-projects.ts @@ -1,5 +1,5 @@ import { logger } from '@/utils/logger'; -import { TABLE_NAMES, ch, db } from '@openpanel/db'; +import { TABLE_NAMES, ch, db, getReplicatedTableName } from '@openpanel/db'; import type { CronQueuePayload } from '@openpanel/queue'; import type { Job } from 'bullmq'; import sqlstring from 'sqlstring'; @@ -46,10 +46,7 @@ export async function deleteProjects(job: Job<CronQueuePayload>) { ]; for (const table of tables) { - const query = - process.env.SELF_HOSTED === 'true' - ? `ALTER TABLE ${table} DELETE WHERE ${where};` - : `ALTER TABLE ${table}_replicated ON CLUSTER '{cluster}' DELETE WHERE ${where};`; + const query = `ALTER TABLE ${getReplicatedTableName(table)} DELETE WHERE ${where};`; await ch.command({ query, diff --git a/packages/db/src/clickhouse/client.ts b/packages/db/src/clickhouse/client.ts index 373a6573..3c79f9e5 100644 --- a/packages/db/src/clickhouse/client.ts +++ b/packages/db/src/clickhouse/client.ts @@ -57,6 +57,29 @@ export const TABLE_NAMES = { events_imports: 'events_imports', }; +/** + * Check if ClickHouse is running in clustered mode + * Clustered mode = production (not self-hosted) + * Non-clustered mode = self-hosted environments + */ +export function isClickhouseClustered(): boolean { + return !( + process.env.SELF_HOSTED === 'true' || process.env.SELF_HOSTED === '1' + ); +} + +/** + * Get the table reference to use for mutations (ALTER TABLE ... UPDATE/DELETE) + * In clustered mode, returns "table_name_replicated ON CLUSTER '{cluster}'" so the mutation runs on every replica + * In non-clustered mode, returns the original table name unchanged + */ +export function getReplicatedTableName(tableName: string): string { + if (isClickhouseClustered()) { + return `${tableName}_replicated ON CLUSTER '{cluster}'`; + } + return tableName; +} + export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = { 
max_open_connections: 30, request_timeout: 300000, diff --git a/packages/db/src/services/import.service.ts b/packages/db/src/services/import.service.ts index 6d0b8f55..84caf10b 100644 --- a/packages/db/src/services/import.service.ts +++ b/packages/db/src/services/import.service.ts @@ -6,6 +6,7 @@ import { chInsertCSV, convertClickhouseDateToJs, formatClickhouseDate, + getReplicatedTableName, } from '../clickhouse/client'; import { csvEscapeField, csvEscapeJson } from '../clickhouse/csv'; import { type Prisma, db } from '../prisma-client'; @@ -107,8 +108,10 @@ export async function generateSessionIds( // Use SQL to generate deterministic session IDs based on device_id + 30-min time windows // This ensures same events always get same session IDs regardless of import order + // In clustered mode, we must use the replicated table for mutations + const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports); const updateQuery = ` - ALTER TABLE ${TABLE_NAMES.events_imports} + ALTER TABLE ${mutationTableName} UPDATE session_id = lower(hex(MD5(concat( device_id, '-', @@ -572,8 +575,10 @@ export async function backfillSessionsToProduction( * Mark import as complete by updating status */ export async function markImportComplete(importId: string): Promise<void> { + // In clustered mode, we must use the replicated table for mutations + const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports); const updateQuery = ` - ALTER TABLE ${TABLE_NAMES.events_imports} + ALTER TABLE ${mutationTableName} UPDATE import_status = 'processed' WHERE import_id = {importId:String} `;