fix(api): ensure we always have profile in cache (before inserted to clickhouse)
This commit is contained in:
@@ -3,7 +3,7 @@ import { escape } from 'sqlstring';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import { DateTime, toDots } from '@openpanel/common';
|
||||
import { cacheable, getCache } from '@openpanel/redis';
|
||||
import { cacheable } from '@openpanel/redis';
|
||||
import type { IChartEventFilter } from '@openpanel/validation';
|
||||
|
||||
import { botBuffer, eventBuffer, sessionBuffer } from '../buffers';
|
||||
@@ -19,7 +19,7 @@ import type { EventMeta, Prisma } from '../prisma-client';
|
||||
import { db } from '../prisma-client';
|
||||
import { createSqlBuilder } from '../sql-builder';
|
||||
import { getEventFiltersWhereClause } from './chart.service';
|
||||
import type { IServiceProfile } from './profile.service';
|
||||
import type { IServiceProfile, IServiceUpsertProfile } from './profile.service';
|
||||
import { getProfileById, getProfiles, upsertProfile } from './profile.service';
|
||||
|
||||
export type IImportedEvent = Omit<
|
||||
@@ -325,7 +325,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
|
||||
await Promise.all([sessionBuffer.add(event), eventBuffer.add(event)]);
|
||||
|
||||
if (payload.profileId) {
|
||||
const profile = {
|
||||
const profile: IServiceUpsertProfile = {
|
||||
id: String(payload.profileId),
|
||||
isExternal: payload.profileId !== payload.deviceId,
|
||||
projectId: payload.projectId,
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import { omit, uniq } from 'ramda';
|
||||
import { escape } from 'sqlstring';
|
||||
|
||||
import { toObject } from '@openpanel/common';
|
||||
import { strip, toObject } from '@openpanel/common';
|
||||
import { cacheable } from '@openpanel/redis';
|
||||
import type { IChartEventFilter } from '@openpanel/validation';
|
||||
|
||||
@@ -69,7 +69,7 @@ export async function getProfileById(id: string, projectId: string) {
|
||||
return transformProfile(profile);
|
||||
}
|
||||
|
||||
export const getProfileByIdCached = getProfileById; //cacheable(getProfileById, 60 * 30);
|
||||
export const getProfileByIdCached = cacheable(getProfileById, 60 * 30);
|
||||
|
||||
interface GetProfileListOptions {
|
||||
projectId: string;
|
||||
@@ -142,14 +142,15 @@ export async function getProfileListCount({
|
||||
return data[0]?.count ?? 0;
|
||||
}
|
||||
|
||||
export type IServiceProfile = Omit<
|
||||
IClickhouseProfile,
|
||||
'created_at' | 'properties' | 'first_name' | 'last_name' | 'is_external'
|
||||
> & {
|
||||
export type IServiceProfile = {
|
||||
id: string;
|
||||
email: string;
|
||||
avatar: string;
|
||||
firstName: string;
|
||||
lastName: string;
|
||||
createdAt: Date;
|
||||
isExternal: boolean;
|
||||
projectId: string;
|
||||
properties: Record<string, unknown> & {
|
||||
region?: string;
|
||||
country?: string;
|
||||
@@ -197,14 +198,15 @@ export function transformProfile({
|
||||
...profile
|
||||
}: IClickhouseProfile): IServiceProfile {
|
||||
return {
|
||||
...profile,
|
||||
firstName: first_name,
|
||||
lastName: last_name,
|
||||
isExternal: profile.is_external,
|
||||
properties: profile.properties
|
||||
? omit(['browserVersion', 'osVersion'], toObject(profile.properties))
|
||||
: {},
|
||||
properties: toObject(profile.properties),
|
||||
createdAt: new Date(created_at),
|
||||
projectId: profile.project_id,
|
||||
id: profile.id,
|
||||
email: profile.email,
|
||||
avatar: profile.avatar,
|
||||
};
|
||||
}
|
||||
|
||||
@@ -221,18 +223,22 @@ export async function upsertProfile(
|
||||
}: IServiceUpsertProfile,
|
||||
isFromEvent = false,
|
||||
) {
|
||||
return profileBuffer.add(
|
||||
{
|
||||
id,
|
||||
first_name: firstName!,
|
||||
last_name: lastName!,
|
||||
email: email!,
|
||||
avatar: avatar!,
|
||||
properties: properties as Record<string, string | undefined>,
|
||||
project_id: projectId,
|
||||
created_at: formatClickhouseDate(new Date()),
|
||||
is_external: isExternal,
|
||||
},
|
||||
isFromEvent,
|
||||
);
|
||||
const profile: IClickhouseProfile = {
|
||||
id,
|
||||
first_name: firstName || '',
|
||||
last_name: lastName || '',
|
||||
email: email || '',
|
||||
avatar: avatar || '',
|
||||
properties: strip((properties as Record<string, string | undefined>) || {}),
|
||||
project_id: projectId,
|
||||
created_at: formatClickhouseDate(new Date()),
|
||||
is_external: isExternal,
|
||||
};
|
||||
|
||||
if (!isFromEvent) {
|
||||
// Save to cache directly since the profile might be used before its saved in clickhouse
|
||||
getProfileByIdCached.set(id, projectId)(transformProfile(profile));
|
||||
}
|
||||
|
||||
return profileBuffer.add(profile, isFromEvent);
|
||||
}
|
||||
|
||||
@@ -87,6 +87,12 @@ export function cacheable<T extends (...args: any) => any>(
|
||||
const key = getKey(...args);
|
||||
return getRedisCache().del(key);
|
||||
};
|
||||
cachedFn.set =
|
||||
(...args: Parameters<T>) =>
|
||||
async (payload: Awaited<ReturnType<T>>) => {
|
||||
const key = getKey(...args);
|
||||
return getRedisCache().setex(key, expireInSec, JSON.stringify(payload));
|
||||
};
|
||||
|
||||
return cachedFn;
|
||||
}
|
||||
|
||||
@@ -121,6 +121,7 @@ export const notificationRouter = createTRPCRouter({
|
||||
.map((id) => ({ id })),
|
||||
},
|
||||
config: input.config,
|
||||
template: input.template || null,
|
||||
},
|
||||
});
|
||||
}),
|
||||
|
||||
Reference in New Issue
Block a user