import { Queue, QueueEvents } from 'bullmq';

import type { IServiceEvent, Notification } from '@openpanel/db';
import { getRedisQueue } from '@openpanel/redis';
import type { TrackPayload } from '@openpanel/sdk';

/**
 * Job for the `events` queue describing an event as it was received,
 * together with the request context captured alongside it.
 */
export interface EventsQueuePayloadIncomingEvent {
  type: 'incomingEvent';
  payload: {
    projectId: string;
    /** The SDK track payload extended with timestamp bookkeeping. */
    event: TrackPayload & {
      timestamp: string;
      isTimestampFromThePast: boolean;
    };
    /** Location fields; each one may be `undefined`. */
    geo: {
      country: string | undefined;
      city: string | undefined;
      region: string | undefined;
      longitude: number | undefined;
      latitude: number | undefined;
    };
    // NOTE(review): the type arguments were missing from this file and have
    // been reconstructed as a string map with optional values — confirm.
    headers: Record<string, string | undefined>;
    currentDeviceId: string;
    previousDeviceId: string;
    priority: boolean;
  };
}

/** Job for the `events` queue carrying a service event to create. */
export interface EventsQueuePayloadCreateEvent {
  type: 'createEvent';
  // NOTE(review): the omitted key was missing from this file and has been
  // reconstructed as `'id'` — confirm against `IServiceEvent`.
  payload: Omit<IServiceEvent, 'id'>;
}

/** Keys of `IServiceEvent` that a session-end job must always provide. */
type SessionEndRequired =
  | 'sessionId'
  | 'deviceId'
  | 'profileId'
  | 'projectId';

/**
 * Job describing the end of a session. Only the `SessionEndRequired` keys are
 * mandatory; every other `IServiceEvent` field is optional.
 */
export interface EventsQueuePayloadCreateSessionEnd {
  type: 'createSessionEnd';
  payload: Partial<Omit<IServiceEvent, SessionEndRequired>> &
    Pick<IServiceEvent, SessionEndRequired>;
}

// TODO: Rename `EventsQueuePayloadCreateSessionEnd`
/** Payload accepted by the `sessions` queue. */
export type SessionsQueuePayload = EventsQueuePayloadCreateSessionEnd;

/** Union of every job payload declared for the `events` queue. */
export type EventsQueuePayload =
  | EventsQueuePayloadCreateEvent
  | EventsQueuePayloadCreateSessionEnd
  | EventsQueuePayloadIncomingEvent;

/** Cron job of type `salt`; carries no data. */
export type CronQueuePayloadSalt = {
  type: 'salt';
  payload: undefined;
};

/** Cron job of type `flushEvents`; carries no data. */
export type CronQueuePayloadFlushEvents = {
  type: 'flushEvents';
  payload: undefined;
};

/** Cron job of type `flushProfiles`; carries no data. */
export type CronQueuePayloadFlushProfiles = {
  type: 'flushProfiles';
  payload: undefined;
};

/** Cron job of type `ping`; carries no data. */
export type CronQueuePayloadPing = {
  type: 'ping';
  payload: undefined;
};

/** Union of every job payload declared for the `cron` queue. */
export type CronQueuePayload =
  | CronQueuePayloadSalt
  | CronQueuePayloadFlushEvents
  | CronQueuePayloadFlushProfiles
  | CronQueuePayloadPing;

/** The `type` discriminants of all cron jobs. */
export type CronQueueType = CronQueuePayload['type'];

/**
 * Queue named `events`. Unlike the other queues in this module it also
 * configures retries: 3 attempts with exponential backoff and a 1000 ms delay.
 */
export const eventsQueue = new Queue<EventsQueuePayload>('events', {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
    attempts: 3,
    backoff: {
      type: 'exponential',
      delay: 1000,
    },
  },
});

/** Queue named `sessions`, carrying session-end jobs. */
export const sessionsQueue = new Queue<SessionsQueuePayload>('sessions', {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
  },
});

/** Event listener bound to the same `sessions` queue name as `sessionsQueue`. */
export const sessionsQueueEvents = new QueueEvents('sessions', {
  connection: getRedisQueue(),
});

/** Queue named `cron`, carrying the data-less cron jobs declared above. */
export const cronQueue = new Queue<CronQueuePayload>('cron', {
  connection: getRedisQueue(),
  defaultJobOptions: {
    removeOnComplete: 10,
  },
});

/** Job for the `notification` queue wrapping a single `Notification`. */
export type NotificationQueuePayload = {
  type: 'sendNotification';
  payload: {
    notification: Notification;
  };
};

/** Queue named `notification`, carrying `sendNotification` jobs. */
export const notificationQueue = new Queue<NotificationQueuePayload>(
  'notification',
  {
    connection: getRedisQueue(),
    defaultJobOptions: {
      removeOnComplete: 10,
    },
  },
);