fix: add getReplicatedTableName
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
import { logger } from '@/utils/logger';
|
import { logger } from '@/utils/logger';
|
||||||
import { TABLE_NAMES, ch, db } from '@openpanel/db';
|
import { TABLE_NAMES, ch, db, getReplicatedTableName } from '@openpanel/db';
|
||||||
import type { CronQueuePayload } from '@openpanel/queue';
|
import type { CronQueuePayload } from '@openpanel/queue';
|
||||||
import type { Job } from 'bullmq';
|
import type { Job } from 'bullmq';
|
||||||
import sqlstring from 'sqlstring';
|
import sqlstring from 'sqlstring';
|
||||||
@@ -46,10 +46,7 @@ export async function deleteProjects(job: Job<CronQueuePayload>) {
|
|||||||
];
|
];
|
||||||
|
|
||||||
for (const table of tables) {
|
for (const table of tables) {
|
||||||
const query =
|
const query = `ALTER TABLE ${getReplicatedTableName(table)} DELETE WHERE ${where};`;
|
||||||
process.env.SELF_HOSTED === 'true'
|
|
||||||
? `ALTER TABLE ${table} DELETE WHERE ${where};`
|
|
||||||
: `ALTER TABLE ${table}_replicated ON CLUSTER '{cluster}' DELETE WHERE ${where};`;
|
|
||||||
|
|
||||||
await ch.command({
|
await ch.command({
|
||||||
query,
|
query,
|
||||||
|
|||||||
@@ -57,6 +57,29 @@ export const TABLE_NAMES = {
|
|||||||
events_imports: 'events_imports',
|
events_imports: 'events_imports',
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if ClickHouse is running in clustered mode
|
||||||
|
* Clustered mode = production (not self-hosted)
|
||||||
|
* Non-clustered mode = self-hosted environments
|
||||||
|
*/
|
||||||
|
export function isClickhouseClustered(): boolean {
|
||||||
|
return !(
|
||||||
|
process.env.SELF_HOSTED === 'true' || process.env.SELF_HOSTED === '1'
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the replicated table name for mutations
|
||||||
|
* In clustered mode, returns table_name_replicated
|
||||||
|
* In non-clustered mode, returns the original table name
|
||||||
|
*/
|
||||||
|
export function getReplicatedTableName(tableName: string): string {
|
||||||
|
if (isClickhouseClustered()) {
|
||||||
|
return `${tableName}_replicated ON CLUSTER '{cluster}'`;
|
||||||
|
}
|
||||||
|
return tableName;
|
||||||
|
}
|
||||||
|
|
||||||
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
|
export const CLICKHOUSE_OPTIONS: NodeClickHouseClientConfigOptions = {
|
||||||
max_open_connections: 30,
|
max_open_connections: 30,
|
||||||
request_timeout: 300000,
|
request_timeout: 300000,
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import {
|
|||||||
chInsertCSV,
|
chInsertCSV,
|
||||||
convertClickhouseDateToJs,
|
convertClickhouseDateToJs,
|
||||||
formatClickhouseDate,
|
formatClickhouseDate,
|
||||||
|
getReplicatedTableName,
|
||||||
} from '../clickhouse/client';
|
} from '../clickhouse/client';
|
||||||
import { csvEscapeField, csvEscapeJson } from '../clickhouse/csv';
|
import { csvEscapeField, csvEscapeJson } from '../clickhouse/csv';
|
||||||
import { type Prisma, db } from '../prisma-client';
|
import { type Prisma, db } from '../prisma-client';
|
||||||
@@ -107,8 +108,10 @@ export async function generateSessionIds(
|
|||||||
|
|
||||||
// Use SQL to generate deterministic session IDs based on device_id + 30-min time windows
|
// Use SQL to generate deterministic session IDs based on device_id + 30-min time windows
|
||||||
// This ensures same events always get same session IDs regardless of import order
|
// This ensures same events always get same session IDs regardless of import order
|
||||||
|
// In clustered mode, we must use the replicated table for mutations
|
||||||
|
const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports);
|
||||||
const updateQuery = `
|
const updateQuery = `
|
||||||
ALTER TABLE ${TABLE_NAMES.events_imports}
|
ALTER TABLE ${mutationTableName}
|
||||||
UPDATE session_id = lower(hex(MD5(concat(
|
UPDATE session_id = lower(hex(MD5(concat(
|
||||||
device_id,
|
device_id,
|
||||||
'-',
|
'-',
|
||||||
@@ -572,8 +575,10 @@ export async function backfillSessionsToProduction(
|
|||||||
* Mark import as complete by updating status
|
* Mark import as complete by updating status
|
||||||
*/
|
*/
|
||||||
export async function markImportComplete(importId: string): Promise<void> {
|
export async function markImportComplete(importId: string): Promise<void> {
|
||||||
|
// In clustered mode, we must use the replicated table for mutations
|
||||||
|
const mutationTableName = getReplicatedTableName(TABLE_NAMES.events_imports);
|
||||||
const updateQuery = `
|
const updateQuery = `
|
||||||
ALTER TABLE ${TABLE_NAMES.events_imports}
|
ALTER TABLE ${mutationTableName}
|
||||||
UPDATE import_status = 'processed'
|
UPDATE import_status = 'processed'
|
||||||
WHERE import_id = {importId:String}
|
WHERE import_id = {importId:String}
|
||||||
`;
|
`;
|
||||||
|
|||||||
Reference in New Issue
Block a user