fix(buffer): increase lock timer
This commit is contained in:
@@ -1,6 +1,4 @@
|
|||||||
import { v4 as uuidv4 } from 'uuid';
|
import { generateId, getSafeJson } from '@openpanel/common';
|
||||||
|
|
||||||
import { getSafeJson } from '@openpanel/common';
|
|
||||||
import type { ILogger } from '@openpanel/logger';
|
import type { ILogger } from '@openpanel/logger';
|
||||||
import { createLogger } from '@openpanel/logger';
|
import { createLogger } from '@openpanel/logger';
|
||||||
import { getRedisCache } from '@openpanel/redis';
|
import { getRedisCache } from '@openpanel/redis';
|
||||||
@@ -57,26 +55,27 @@ export class RedisBuffer<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async tryFlush(): Promise<void> {
|
public async tryFlush(): Promise<void> {
|
||||||
const lockId = uuidv4();
|
const lockId = generateId();
|
||||||
const acquired = await getRedisCache().set(
|
const acquired = await getRedisCache().set(
|
||||||
this.lockKey,
|
this.lockKey,
|
||||||
lockId,
|
lockId,
|
||||||
'EX',
|
'EX',
|
||||||
8,
|
60,
|
||||||
'NX',
|
'NX',
|
||||||
);
|
);
|
||||||
|
|
||||||
if (acquired === 'OK') {
|
if (acquired === 'OK') {
|
||||||
this.logger.info('Lock acquired. Attempting to flush.');
|
this.logger.info(`Lock acquired. Attempting to flush. ID: ${lockId}`);
|
||||||
try {
|
try {
|
||||||
await this.flush();
|
await this.flush();
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.logger.error('Failed to flush buffer', { error });
|
this.logger.error(`Failed to flush buffer. ID: ${lockId}`, { error });
|
||||||
} finally {
|
} finally {
|
||||||
|
this.logger.info(`Releasing lock. ID: ${lockId}`);
|
||||||
await this.releaseLock(lockId);
|
await this.releaseLock(lockId);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
this.logger.warn('Failed to acquire lock. Skipping flush.');
|
this.logger.warn(`Failed to acquire lock. Skipping flush. ID: ${lockId}`);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user