feature(worker+api): improve buffer
This commit is contained in:
@@ -1,181 +1,219 @@
|
||||
import { v4 as uuidv4 } from 'uuid';
|
||||
|
||||
import type { ILogger } from '@openpanel/logger';
|
||||
import { createLogger } from '@openpanel/logger';
|
||||
import { getRedisCache } from '@openpanel/redis';
|
||||
|
||||
export const DELETE = '__DELETE__';
|
||||
|
||||
export type QueueItem<T> = {
|
||||
event: T;
|
||||
index: number;
|
||||
};
|
||||
|
||||
export type OnInsert<T> = (data: T) => unknown;
|
||||
|
||||
export type OnCompleted<T> =
|
||||
| ((data: T[]) => Promise<unknown[]>)
|
||||
| ((data: T[]) => unknown[]);
|
||||
|
||||
export type ProcessQueue<T> = (data: QueueItem<T>[]) => Promise<number[]>;
|
||||
|
||||
export type Find<T, R = unknown> = (
|
||||
callback: (item: QueueItem<T>) => boolean,
|
||||
callback: (item: T) => boolean,
|
||||
) => Promise<R | null>;
|
||||
|
||||
export type FindMany<T, R = unknown> = (
|
||||
callback: (item: QueueItem<T>) => boolean,
|
||||
callback: (item: T) => boolean,
|
||||
) => Promise<R[]>;
|
||||
|
||||
const getError = (e: unknown) => {
|
||||
if (e instanceof Error) {
|
||||
return [
|
||||
`Name: ${e.name}`,
|
||||
`Message: ${e.message}`,
|
||||
`Stack: ${e.stack}`,
|
||||
`Cause: ${e.cause ? String(e.cause) : ''}`,
|
||||
].join('\n');
|
||||
}
|
||||
return 'Unknown error';
|
||||
};
|
||||
export class RedisBuffer<T> {
|
||||
protected prefix = 'op:buffer';
|
||||
protected bufferKey: string;
|
||||
private lockKey: string;
|
||||
protected maxBufferSize: number | null;
|
||||
protected logger: ILogger;
|
||||
|
||||
export abstract class RedisBuffer<T> {
|
||||
// constructor
|
||||
public prefix = 'op:buffer';
|
||||
public table: string;
|
||||
public batchSize?: number;
|
||||
public logger: ReturnType<typeof createLogger>;
|
||||
public disableAutoFlush?: boolean;
|
||||
|
||||
// abstract methods
|
||||
public abstract onInsert?: OnInsert<T>;
|
||||
public abstract onCompleted?: OnCompleted<T>;
|
||||
public abstract processQueue: ProcessQueue<T>;
|
||||
public abstract find: Find<T, unknown>;
|
||||
public abstract findMany: FindMany<T, unknown>;
|
||||
|
||||
constructor(options: {
|
||||
table: string;
|
||||
batchSize?: number;
|
||||
disableAutoFlush?: boolean;
|
||||
}) {
|
||||
this.table = options.table;
|
||||
this.batchSize = options.batchSize;
|
||||
this.disableAutoFlush = options.disableAutoFlush;
|
||||
constructor(bufferName: string, maxBufferSize: number | null) {
|
||||
this.bufferKey = bufferName;
|
||||
this.lockKey = `lock:${bufferName}`;
|
||||
this.maxBufferSize = maxBufferSize;
|
||||
this.logger = createLogger({ name: 'buffer' }).child({
|
||||
table: this.table,
|
||||
buffer: bufferName,
|
||||
});
|
||||
}
|
||||
|
||||
public getKey(name?: string) {
|
||||
const key = `${this.prefix}:${this.table}`;
|
||||
protected getKey(name?: string) {
|
||||
const key = `${this.prefix}:${this.bufferKey}`;
|
||||
if (name) {
|
||||
return `${key}:${name}`;
|
||||
}
|
||||
return key;
|
||||
}
|
||||
|
||||
public async insert(value: T) {
|
||||
this.onInsert?.(value);
|
||||
await getRedisCache().rpush(this.getKey(), JSON.stringify(value));
|
||||
async add(item: T): Promise<void> {
|
||||
try {
|
||||
this.onAdd(item);
|
||||
await getRedisCache().rpush(this.getKey(), JSON.stringify(item));
|
||||
const bufferSize = await getRedisCache().llen(this.getKey());
|
||||
|
||||
const length = await getRedisCache().llen(this.getKey());
|
||||
this.logger.debug(
|
||||
`Inserted item into buffer ${this.table}. Current length: ${length}`,
|
||||
);
|
||||
this.logger.debug(`Item added. Current size: ${bufferSize}`);
|
||||
|
||||
if (!this.disableAutoFlush && this.batchSize && length >= this.batchSize) {
|
||||
this.logger.info(
|
||||
`Buffer ${this.table} reached batch size (${this.batchSize}). Flushing...`,
|
||||
);
|
||||
this.flush();
|
||||
if (this.maxBufferSize && bufferSize >= this.maxBufferSize) {
|
||||
await this.tryFlush();
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to add item to buffer', { error, item });
|
||||
}
|
||||
}
|
||||
|
||||
public async flush() {
|
||||
try {
|
||||
const queue = await this.getQueue(this.batchSize || -1);
|
||||
|
||||
if (queue.length === 0) {
|
||||
this.logger.debug(`Flush called on empty buffer ${this.table}`);
|
||||
return { count: 0, data: [] };
|
||||
}
|
||||
|
||||
this.logger.info(
|
||||
`Flushing ${queue.length} items from buffer ${this.table}`,
|
||||
);
|
||||
public async tryFlush(): Promise<void> {
|
||||
const lockId = uuidv4();
|
||||
const acquired = await getRedisCache().set(
|
||||
this.lockKey,
|
||||
lockId,
|
||||
'EX',
|
||||
8,
|
||||
'NX',
|
||||
);
|
||||
|
||||
if (acquired === 'OK') {
|
||||
this.logger.debug('Lock acquired. Attempting to flush.');
|
||||
try {
|
||||
const indexes = await this.processQueue(queue);
|
||||
await this.deleteIndexes(indexes);
|
||||
const data = indexes
|
||||
.map((index) => queue[index]?.event)
|
||||
.filter((event): event is T => event !== null);
|
||||
await this.flush();
|
||||
} finally {
|
||||
await this.releaseLock(lockId);
|
||||
}
|
||||
} else {
|
||||
this.logger.debug('Failed to acquire lock for. Skipping flush.');
|
||||
}
|
||||
}
|
||||
|
||||
if (this.onCompleted) {
|
||||
const res = await this.onCompleted(data);
|
||||
this.logger.info(
|
||||
`Completed processing ${res.length} items from buffer ${this.table}`,
|
||||
);
|
||||
return { count: res.length, data: res };
|
||||
}
|
||||
protected async waitForReleasedLock(
|
||||
maxWaitTime = 8000,
|
||||
checkInterval = 500,
|
||||
): Promise<boolean> {
|
||||
const startTime = performance.now();
|
||||
|
||||
this.logger.info(
|
||||
`Processed ${indexes.length} items from buffer ${this.table}`,
|
||||
);
|
||||
return { count: indexes.length, data: indexes };
|
||||
while (performance.now() - startTime < maxWaitTime) {
|
||||
const lock = await getRedisCache().get(this.lockKey);
|
||||
if (!lock) {
|
||||
return true;
|
||||
}
|
||||
|
||||
await new Promise((resolve) => setTimeout(resolve, checkInterval));
|
||||
}
|
||||
|
||||
this.logger.warn('Timeout waiting for lock release');
|
||||
return false;
|
||||
}
|
||||
|
||||
private async retryOnce(cb: () => Promise<void>) {
|
||||
try {
|
||||
await cb();
|
||||
} catch (e) {
|
||||
this.logger.error(`#1 Failed to execute callback: ${cb.name}`, e);
|
||||
await new Promise((resolve) => setTimeout(resolve, 1000));
|
||||
try {
|
||||
await cb();
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to process queue while flushing buffer ${this.table}:`,
|
||||
e,
|
||||
);
|
||||
const timestamp = new Date().getTime();
|
||||
await getRedisCache().hset(this.getKey(`failed:${timestamp}`), {
|
||||
error: getError(e),
|
||||
data: JSON.stringify(queue.map((item) => item.event)),
|
||||
retries: 0,
|
||||
});
|
||||
this.logger.warn(
|
||||
`Stored ${queue.length} failed items in ${this.getKey(`failed:${timestamp}`)}`,
|
||||
this.logger.error(`#2 Failed to execute callback: ${cb.name}`, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private async flush(): Promise<void> {
|
||||
// Use a transaction to ensure atomicity
|
||||
const result = await getRedisCache()
|
||||
.multi()
|
||||
.lrange(this.getKey(), 0, -1)
|
||||
.del(this.getKey())
|
||||
.exec();
|
||||
|
||||
if (!result) {
|
||||
throw new Error('Redis transaction failed');
|
||||
}
|
||||
|
||||
const lrange = result[0];
|
||||
|
||||
if (!lrange || lrange[0] instanceof Error) {
|
||||
throw new Error('Redis transaction failed');
|
||||
}
|
||||
|
||||
const items = lrange[1] as string[];
|
||||
|
||||
const parsedItems = items.map((item) => JSON.parse(item) as T);
|
||||
|
||||
if (parsedItems.length === 0) {
|
||||
this.logger.debug('No items to flush');
|
||||
return;
|
||||
}
|
||||
|
||||
this.logger.info(`Flushing ${parsedItems.length} items`);
|
||||
|
||||
try {
|
||||
const { toInsert, toKeep } = await this.processItems(parsedItems);
|
||||
|
||||
if (toInsert.length) {
|
||||
await this.retryOnce(() => this.insertIntoDB(toInsert));
|
||||
this.onInsert(toInsert);
|
||||
}
|
||||
|
||||
// Add back items to keep
|
||||
if (toKeep.length > 0) {
|
||||
await getRedisCache().lpush(
|
||||
this.getKey(),
|
||||
...toKeep.map((item) => JSON.stringify(item)),
|
||||
);
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.error(
|
||||
`Failed to get queue while flushing buffer ${this.table}:`,
|
||||
e,
|
||||
|
||||
this.logger.info(
|
||||
`Inserted ${toInsert.length} items into DB, kept ${toKeep.length} items in buffer`,
|
||||
{
|
||||
toInsert: toInsert.length,
|
||||
toKeep: toKeep.length,
|
||||
},
|
||||
);
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to process queue while flushing buffer}:', {
|
||||
error,
|
||||
queueSize: items.length,
|
||||
});
|
||||
|
||||
if (items.length > 0) {
|
||||
// Add back items to keep
|
||||
this.logger.debug('Adding all items back to buffer');
|
||||
await getRedisCache().lpush(
|
||||
this.getKey(),
|
||||
...items.map((item) => JSON.stringify(item)),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public async deleteIndexes(indexes: number[]) {
|
||||
const multi = getRedisCache().multi();
|
||||
indexes.forEach((index) => {
|
||||
multi.lset(this.getKey(), index, DELETE);
|
||||
});
|
||||
multi.lrem(this.getKey(), 0, DELETE);
|
||||
await multi.exec();
|
||||
this.logger.debug(
|
||||
`Deleted ${indexes.length} items from buffer ${this.table}`,
|
||||
);
|
||||
private async releaseLock(lockId: string): Promise<void> {
|
||||
this.logger.debug(`Released lock for ${this.getKey()}`);
|
||||
const script = `
|
||||
if redis.call("get", KEYS[1]) == ARGV[1] then
|
||||
return redis.call("del", KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
`;
|
||||
await getRedisCache().eval(script, 1, this.lockKey, lockId);
|
||||
}
|
||||
|
||||
public async getQueue(limit: number): Promise<QueueItem<T>[]> {
|
||||
const queue = await getRedisCache().lrange(this.getKey(), 0, limit);
|
||||
const result = queue
|
||||
.map((item, index) => ({
|
||||
event: this.transformQueueItem(item),
|
||||
index,
|
||||
}))
|
||||
.filter((item): item is QueueItem<T> => item.event !== null);
|
||||
this.logger.debug(
|
||||
`Retrieved ${result.length} items from buffer ${this.table}`,
|
||||
);
|
||||
return result;
|
||||
protected async getQueue(count?: number): Promise<T[]> {
|
||||
const items = await getRedisCache().lrange(this.getKey(), 0, count ?? -1);
|
||||
return items.map((item) => JSON.parse(item) as T);
|
||||
}
|
||||
|
||||
private transformQueueItem(item: string): T | null {
|
||||
try {
|
||||
return JSON.parse(item);
|
||||
} catch (e) {
|
||||
this.logger.warn(`Failed to parse item in buffer ${this.table}:`, e);
|
||||
return null;
|
||||
}
|
||||
protected processItems(items: T[]): Promise<{ toInsert: T[]; toKeep: T[] }> {
|
||||
return Promise.resolve({ toInsert: items, toKeep: [] });
|
||||
}
|
||||
|
||||
protected insertIntoDB(_items: T[]): Promise<void> {
|
||||
throw new Error('Not implemented');
|
||||
}
|
||||
|
||||
protected onAdd(_item: T): void {
|
||||
// Override in subclass
|
||||
}
|
||||
|
||||
protected onInsert(_item: T[]): void {
|
||||
// Override in subclass
|
||||
}
|
||||
|
||||
public findMany: FindMany<T, unknown> = () => {
|
||||
return Promise.resolve([]);
|
||||
};
|
||||
|
||||
public find: Find<T, unknown> = () => {
|
||||
return Promise.resolve(null);
|
||||
};
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user