fix redis timeout connection on serverless (avoid init redis directly)
This commit is contained in:
@@ -1,4 +1,4 @@
|
|||||||
import type { Redis } from '@openpanel/redis';
|
import { getRedisCache } from '@openpanel/redis';
|
||||||
|
|
||||||
export const DELETE = '__DELETE__';
|
export const DELETE = '__DELETE__';
|
||||||
|
|
||||||
@@ -40,7 +40,6 @@ export abstract class RedisBuffer<T> {
|
|||||||
public prefix = 'op:buffer';
|
public prefix = 'op:buffer';
|
||||||
public table: string;
|
public table: string;
|
||||||
public batchSize?: number;
|
public batchSize?: number;
|
||||||
public redis: Redis;
|
|
||||||
|
|
||||||
// abstract methods
|
// abstract methods
|
||||||
public abstract onInsert?: OnInsert<T>;
|
public abstract onInsert?: OnInsert<T>;
|
||||||
@@ -49,9 +48,8 @@ export abstract class RedisBuffer<T> {
|
|||||||
public abstract find: Find<T, unknown>;
|
public abstract find: Find<T, unknown>;
|
||||||
public abstract findMany: FindMany<T, unknown>;
|
public abstract findMany: FindMany<T, unknown>;
|
||||||
|
|
||||||
constructor(options: { table: string; redis: Redis; batchSize?: number }) {
|
constructor(options: { table: string; batchSize?: number }) {
|
||||||
this.table = options.table;
|
this.table = options.table;
|
||||||
this.redis = options.redis;
|
|
||||||
this.batchSize = options.batchSize;
|
this.batchSize = options.batchSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -65,9 +63,9 @@ export abstract class RedisBuffer<T> {
|
|||||||
|
|
||||||
public async insert(value: T) {
|
public async insert(value: T) {
|
||||||
this.onInsert?.(value);
|
this.onInsert?.(value);
|
||||||
await this.redis.rpush(this.getKey(), JSON.stringify(value));
|
await getRedisCache().rpush(this.getKey(), JSON.stringify(value));
|
||||||
|
|
||||||
const length = await this.redis.llen(this.getKey());
|
const length = await getRedisCache().llen(this.getKey());
|
||||||
if (this.batchSize && length >= this.batchSize) {
|
if (this.batchSize && length >= this.batchSize) {
|
||||||
this.flush();
|
this.flush();
|
||||||
}
|
}
|
||||||
@@ -109,7 +107,7 @@ export abstract class RedisBuffer<T> {
|
|||||||
e
|
e
|
||||||
);
|
);
|
||||||
const timestamp = new Date().getTime();
|
const timestamp = new Date().getTime();
|
||||||
await this.redis.hset(this.getKey(`failed:${timestamp}`), {
|
await getRedisCache().hset(this.getKey(`failed:${timestamp}`), {
|
||||||
error: getError(e),
|
error: getError(e),
|
||||||
data: JSON.stringify(queue.map((item) => item.event)),
|
data: JSON.stringify(queue.map((item) => item.event)),
|
||||||
retries: 0,
|
retries: 0,
|
||||||
@@ -121,7 +119,7 @@ export abstract class RedisBuffer<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async deleteIndexes(indexes: number[]) {
|
public async deleteIndexes(indexes: number[]) {
|
||||||
const multi = this.redis.multi();
|
const multi = getRedisCache().multi();
|
||||||
indexes.forEach((index) => {
|
indexes.forEach((index) => {
|
||||||
multi.lset(this.getKey(), index, DELETE);
|
multi.lset(this.getKey(), index, DELETE);
|
||||||
});
|
});
|
||||||
@@ -130,7 +128,7 @@ export abstract class RedisBuffer<T> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async getQueue(limit: number): Promise<QueueItem<T>[]> {
|
public async getQueue(limit: number): Promise<QueueItem<T>[]> {
|
||||||
const queue = await this.redis.lrange(this.getKey(), 0, limit);
|
const queue = await getRedisCache().lrange(this.getKey(), 0, limit);
|
||||||
return queue
|
return queue
|
||||||
.map((item, index) => ({
|
.map((item, index) => ({
|
||||||
event: this.transformQueueItem(item),
|
event: this.transformQueueItem(item),
|
||||||
|
|||||||
@@ -31,7 +31,6 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
|
|||||||
constructor() {
|
constructor() {
|
||||||
super({
|
super({
|
||||||
table: TABLE_NAMES.events,
|
table: TABLE_NAMES.events,
|
||||||
redis: getRedisCache(),
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -41,7 +40,7 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
|
|||||||
SuperJSON.stringify(transformEvent(event))
|
SuperJSON.stringify(transformEvent(event))
|
||||||
);
|
);
|
||||||
if (event.profile_id) {
|
if (event.profile_id) {
|
||||||
this.redis.set(
|
getRedisCache().set(
|
||||||
`live:event:${event.project_id}:${event.profile_id}`,
|
`live:event:${event.project_id}:${event.profile_id}`,
|
||||||
'',
|
'',
|
||||||
'EX',
|
'EX',
|
||||||
@@ -168,7 +167,7 @@ export class EventBuffer extends RedisBuffer<IClickhouseEvent> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
if (itemsToStalled.size > 0) {
|
if (itemsToStalled.size > 0) {
|
||||||
const multi = this.redis.multi();
|
const multi = getRedisCache().multi();
|
||||||
for (const item of itemsToStalled) {
|
for (const item of itemsToStalled) {
|
||||||
multi.rpush(this.getKey('stalled'), JSON.stringify(item.event));
|
multi.rpush(this.getKey('stalled'), JSON.stringify(item.event));
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -22,7 +22,6 @@ import { RedisBuffer } from './buffer';
|
|||||||
export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
|
export class ProfileBuffer extends RedisBuffer<IClickhouseProfile> {
|
||||||
constructor() {
|
constructor() {
|
||||||
super({
|
super({
|
||||||
redis: getRedisCache(),
|
|
||||||
table: 'profiles',
|
table: 'profiles',
|
||||||
batchSize: 100,
|
batchSize: 100,
|
||||||
});
|
});
|
||||||
|
|||||||
Reference in New Issue
Block a user