fix(api,worker): general improvements for workers, debugging and logging
This commit is contained in:
@@ -20,7 +20,6 @@ export class RedisBuffer<T> {
|
||||
private lockKey: string;
|
||||
protected maxBufferSize: number | null;
|
||||
protected logger: ILogger;
|
||||
private isCurrentlyFlushing = false;
|
||||
|
||||
constructor(bufferName: string, maxBufferSize: number | null) {
|
||||
this.bufferKey = bufferName;
|
||||
@@ -58,11 +57,6 @@ export class RedisBuffer<T> {
|
||||
}
|
||||
|
||||
public async tryFlush(): Promise<void> {
|
||||
if (this.isCurrentlyFlushing) {
|
||||
this.logger.debug('Already flushing. Skipping additional flush attempt.');
|
||||
return;
|
||||
}
|
||||
|
||||
const lockId = uuidv4();
|
||||
const acquired = await getRedisCache().set(
|
||||
this.lockKey,
|
||||
@@ -73,18 +67,16 @@ export class RedisBuffer<T> {
|
||||
);
|
||||
|
||||
if (acquired === 'OK') {
|
||||
this.logger.debug('Lock acquired. Attempting to flush.');
|
||||
this.isCurrentlyFlushing = true;
|
||||
this.logger.info('Lock acquired. Attempting to flush.');
|
||||
try {
|
||||
await this.flush();
|
||||
} catch (error) {
|
||||
this.logger.error('Failed to flush buffer', { error });
|
||||
} finally {
|
||||
this.isCurrentlyFlushing = false;
|
||||
await this.releaseLock(lockId);
|
||||
}
|
||||
} else {
|
||||
this.logger.debug('Failed to acquire lock. Skipping flush.');
|
||||
this.logger.warn('Failed to acquire lock. Skipping flush.');
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -48,14 +48,32 @@ const cleanQuery = (query?: string) =>
|
||||
? query.replace(/\n/g, '').replace(/\s+/g, ' ').trim()
|
||||
: undefined;
|
||||
|
||||
const createChildLogger = (property: string, args?: any[]) => {
|
||||
if (property === 'insert') {
|
||||
return logger.child({
|
||||
property,
|
||||
table: args?.[0]?.table,
|
||||
values: (args?.[0]?.values || []).length,
|
||||
});
|
||||
}
|
||||
|
||||
return logger.child({
|
||||
property,
|
||||
table: args?.[0]?.table,
|
||||
query: cleanQuery(args?.[0]?.query),
|
||||
});
|
||||
};
|
||||
|
||||
export const ch = new Proxy(originalCh, {
|
||||
get(target, property, receiver) {
|
||||
if (property === 'insert' || property === 'query') {
|
||||
return async (...args: any[]) => {
|
||||
const childLogger = logger.child({
|
||||
query: cleanQuery(args[0].query),
|
||||
property,
|
||||
});
|
||||
const childLogger = createChildLogger(property, args);
|
||||
|
||||
if (property === 'insert') {
|
||||
childLogger.info('insert info');
|
||||
}
|
||||
|
||||
try {
|
||||
// First attempt
|
||||
if (property in target) {
|
||||
@@ -69,23 +87,23 @@ export const ch = new Proxy(originalCh, {
|
||||
error.message.includes('socket hang up') ||
|
||||
error.message.includes('Timeout error'))
|
||||
) {
|
||||
childLogger.error('Captured error', {
|
||||
childLogger.error('First failed attempt', {
|
||||
error,
|
||||
});
|
||||
await new Promise((resolve) => setTimeout(resolve, 500));
|
||||
try {
|
||||
// Retry once
|
||||
childLogger.info('Retrying query');
|
||||
childLogger.info(`Retrying ${property}`);
|
||||
if (property in target) {
|
||||
// @ts-expect-error
|
||||
return await target[property](...args);
|
||||
}
|
||||
} catch (retryError) {
|
||||
logger.error('Retry failed', retryError);
|
||||
childLogger.error('Second failed attempt', retryError);
|
||||
throw retryError; // Rethrow or handle as needed
|
||||
}
|
||||
} else {
|
||||
logger.error('query failed', {
|
||||
childLogger.error('Failed without retry', {
|
||||
...args[0],
|
||||
error,
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user