add: admin cli

This commit is contained in:
Carl-Gerhard Lindesvärd
2025-11-12 14:51:08 +01:00
parent 9cafd61b25
commit 723ba3ef6c
18 changed files with 1641 additions and 47 deletions

View File

@@ -1,10 +1,9 @@
import { logger } from '@/utils/logger';
import { TABLE_NAMES, ch, db, getReplicatedTableName } from '@openpanel/db';
import { db, deleteFromClickhouse, deleteProjects } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import type { Job } from 'bullmq';
import sqlstring from 'sqlstring';
export async function deleteProjects(job: Job<CronQueuePayload>) {
export async function jobdeleteProjects(job: Job<CronQueuePayload>) {
const projects = await db.project.findMany({
where: {
deleteAt: {
@@ -17,44 +16,13 @@ export async function deleteProjects(job: Job<CronQueuePayload>) {
return;
}
for (const project of projects) {
await db.project.delete({
where: {
id: project.id,
},
});
}
await deleteProjects(projects.map((project) => project.id));
logger.info('Deleting projects', {
projects,
});
projects.forEach((project) => {
job.log(`Delete project: "${project.id}"`);
});
const where = `project_id IN (${projects.map((project) => sqlstring.escape(project.id)).join(',')})`;
const tables = [
TABLE_NAMES.events,
TABLE_NAMES.profiles,
TABLE_NAMES.events_bots,
TABLE_NAMES.sessions,
TABLE_NAMES.cohort_events_mv,
TABLE_NAMES.dau_mv,
TABLE_NAMES.event_names_mv,
TABLE_NAMES.event_property_values_mv,
];
for (const table of tables) {
const query = `ALTER TABLE ${getReplicatedTableName(table)} DELETE WHERE ${where};`;
await ch.command({
query,
clickhouse_settings: {
lightweight_deletes_sync: '0',
},
});
}
await deleteFromClickhouse(projects.map((project) => project.id));
logger.info(`Deleted ${projects.length} projects`, {
projects,

View File

@@ -3,7 +3,7 @@ import type { Job } from 'bullmq';
import { eventBuffer, profileBuffer, sessionBuffer } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import { deleteProjects } from './cron.delete-projects';
import { jobdeleteProjects } from './cron.delete-projects';
import { ping } from './cron.ping';
import { salt } from './cron.salt';
@@ -25,7 +25,7 @@ export async function cronJob(job: Job<CronQueuePayload>) {
return await ping();
}
case 'deleteProjects': {
return await deleteProjects(job);
return await jobdeleteProjects(job);
}
}
}