import { getRedisCache } from '@openpanel/redis';

/** Sentinel written over list entries that are about to be removed. */
export const DELETE = '__DELETE__';

/** A parsed buffer entry together with its position in the Redis list. */
export type QueueItem<T> = {
  event: T;
  index: number;
};

/** Hook invoked synchronously with every value passed to `insert`. */
export type OnInsert<T> = (data: T) => unknown;

/** Hook invoked with the events whose indexes `processQueue` returned. */
export type OnCompleted<T> =
  | ((data: T[]) => Promise<unknown[]>)
  | ((data: T[]) => unknown[]);

/**
 * Processes a batch and resolves with the list indexes (`QueueItem.index`)
 * of the entries that were handled and may be deleted from the buffer.
 */
export type ProcessQueue<T> = (data: QueueItem<T>[]) => Promise<number[]>;

// NOTE(review): the result type parameter `R` of Find/FindMany is
// reconstructed (defaulting to `unknown`) — confirm against the subclasses.
export type Find<T, R = unknown> = (
  callback: (item: QueueItem<T>) => boolean
) => Promise<R | null>;

export type FindMany<T, R = unknown> = (
  callback: (item: QueueItem<T>) => boolean
) => Promise<R[]>;

/** Renders a caught value as a multi-line string for storage in Redis. */
const getError = (e: unknown) => {
  if (e instanceof Error) {
    return [
      'Name: ' + e.name,
      'Message: ' + e.message,
      'Stack: ' + e.stack,
      'Cause: ' + (e.cause ? String(e.cause) : ''),
    ].join('\n');
  }
  return 'Unknown error';
};

/**
 * Base class for a buffer stored as a Redis list under
 * `op:buffer:<table>`. Values are appended as JSON with `insert` and drained
 * in batches with `flush`, which delegates the actual work to the
 * subclass-provided `processQueue`.
 */
export abstract class RedisBuffer<T> {
  // constructor
  public prefix = 'op:buffer';
  public table: string;
  public batchSize?: number;

  // abstract methods
  public abstract onInsert?: OnInsert<T>;
  public abstract onCompleted?: OnCompleted<T>;
  public abstract processQueue: ProcessQueue<T>;
  public abstract find: Find<T>;
  public abstract findMany: FindMany<T>;

  constructor(options: { table: string; batchSize?: number }) {
    this.table = options.table;
    this.batchSize = options.batchSize;
  }

  /**
   * Builds the Redis key for this buffer.
   *
   * @param name Optional suffix appended as `:<name>`.
   * @returns `<prefix>:<table>` or `<prefix>:<table>:<name>`.
   */
  public getKey(name?: string) {
    const key = this.prefix + ':' + this.table;
    if (name) {
      return `${key}:${name}`;
    }
    return key;
  }

  /**
   * Appends a value to the buffer and starts a flush once the list has
   * reached `batchSize` entries.
   *
   * @param value Value to buffer; stored as `JSON.stringify(value)`.
   */
  public async insert(value: T) {
    this.onInsert?.(value);
    // RPUSH replies with the list length after the push, so no extra LLEN
    // round trip is needed.
    const length = await getRedisCache().rpush(
      this.getKey(),
      JSON.stringify(value)
    );
    if (this.batchSize && length >= this.batchSize) {
      // Fire-and-forget: flush handles its own errors and never rejects.
      void this.flush();
    }
  }

  /**
   * Reads up to `batchSize` entries (all entries when no batch size is set),
   * hands them to `processQueue`, deletes the indexes it returns and passes
   * the corresponding events to `onCompleted` when that hook is defined.
   *
   * @returns `{ count, data }` describing what was completed, or `undefined`
   *   when reading or processing the queue failed. On a processing failure
   *   the whole batch is stored in a `failed:<timestamp>` hash.
   */
  public async flush() {
    try {
      const queue = await this.getQueue(this.batchSize || -1);

      if (queue.length === 0) {
        return {
          count: 0,
          data: [],
        };
      }

      try {
        const indexes = await this.processQueue(queue);
        await this.deleteIndexes(indexes);

        // getQueue drops unparsable entries, so an item's array position can
        // differ from its list index; resolve events by `item.index`.
        const eventsByIndex = new Map(
          queue.map((item) => [item.index, item.event] as const)
        );
        const data = indexes
          .map((index) => eventsByIndex.get(index))
          .filter((event): event is T => event != null);

        if (this.onCompleted) {
          const res = await this.onCompleted(data);
          return {
            count: res.length,
            data: res,
          };
        }

        return {
          count: indexes.length,
          data: indexes,
        };
      } catch (e) {
        console.log(
          `[${this.getKey()}] Failed to processQueue while flushing:`,
          e
        );
        const timestamp = new Date().getTime();
        await getRedisCache().hset(this.getKey(`failed:${timestamp}`), {
          error: getError(e),
          data: JSON.stringify(queue.map((item) => item.event)),
          retries: 0,
        });
      }
    } catch (e) {
      console.log(`[${this.getKey()}] Failed to getQueue while flushing:`, e);
    }
  }

  /**
   * Removes the given list indexes in a single MULTI: each index is
   * overwritten with the `DELETE` sentinel via LSET, then every sentinel
   * entry is removed with LREM.
   *
   * @param indexes List indexes to delete.
   */
  public async deleteIndexes(indexes: number[]) {
    const multi = getRedisCache().multi();
    indexes.forEach((index) => {
      multi.lset(this.getKey(), index, DELETE);
    });
    multi.lrem(this.getKey(), 0, DELETE);
    await multi.exec();
  }

  /**
   * Reads entries from the head of the list and parses them.
   *
   * @param limit Maximum number of entries to read when positive. Zero and
   *   negative values are passed to LRANGE unchanged as the stop offset
   *   (so `-1` reads the whole list).
   * @returns The entries that parsed to a non-null value, each tagged with
   *   its list index. Unparsable entries are skipped.
   */
  public async getQueue(limit: number): Promise<QueueItem<T>[]> {
    // LRANGE's stop offset is inclusive: reading `limit` items means
    // stopping at index `limit - 1`.
    const stop = limit > 0 ? limit - 1 : limit;
    const queue = await getRedisCache().lrange(this.getKey(), 0, stop);
    return queue
      .map((item, index) => ({
        event: this.transformQueueItem(item),
        index,
      }))
      .filter((item): item is QueueItem<T> => item.event !== null);
  }

  /** Parses a stored JSON string, returning `null` when it is not valid JSON. */
  private transformQueueItem(item: string): T | null {
    try {
      return JSON.parse(item);
    } catch (e) {
      return null;
    }
  }
}