fix(worker): better deletion of project

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-04-01 10:39:33 +02:00
parent 58c4a6a741
commit d38ccb4717
4 changed files with 88 additions and 23 deletions

View File

@@ -1,9 +1,10 @@
import { logger } from '@/utils/logger';
import { generateSalt } from '@openpanel/common/server';
import { TABLE_NAMES, ch, chQuery, db } from '@openpanel/db';
import { TABLE_NAMES, ch, db } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import type { Job } from 'bullmq';
import { escape } from 'sqlstring';
export async function deleteProjects() {
export async function deleteProjects(job: Job<CronQueuePayload>) {
const projects = await db.project.findMany({
where: {
deleteAt: {
@@ -24,16 +25,33 @@ export async function deleteProjects() {
});
}
if (process.env.SELF_HOSTED) {
logger.info('Deleting projects', {
projects,
});
projects.forEach((project) => {
job.log(`Delete project: "${project.id}"`);
});
const where = `project_id IN (${projects.map((project) => escape(project.id)).join(',')})`;
const tables = [
TABLE_NAMES.events,
TABLE_NAMES.profiles,
TABLE_NAMES.events_bots,
TABLE_NAMES.sessions,
TABLE_NAMES.cohort_events_mv,
TABLE_NAMES.dau_mv,
TABLE_NAMES.event_names_mv,
TABLE_NAMES.event_property_values_mv,
];
for (const table of tables) {
const query = process.env.SELF_HOSTED
? `ALTER TABLE ${table} DELETE WHERE ${where};`
: `ALTER TABLE ${table}_replicated ON CLUSTER '{cluster}' DELETE WHERE ${where};`;
await ch.command({
query: `DELETE FROM ${TABLE_NAMES.events} WHERE project_id IN (${projects.map((project) => escape(project.id)).join(',')});`,
clickhouse_settings: {
lightweight_deletes_sync: 0,
},
});
} else {
await ch.command({
query: `DELETE FROM ${TABLE_NAMES.events}_replicated ON CLUSTER '{cluster}' WHERE project_id IN (${projects.map((project) => escape(project.id)).join(',')});`,
query,
clickhouse_settings: {
lightweight_deletes_sync: 0,
},

View File

@@ -25,7 +25,7 @@ export async function cronJob(job: Job<CronQueuePayload>) {
return await ping();
}
case 'deleteProjects': {
return await deleteProjects();
return await deleteProjects(job);
}
}
}