diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index a264b672..87b78606 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -7,6 +7,7 @@ import { eventsQueue } from '@openpanel/queue'; import { getLock } from '@openpanel/redis'; import type { PostEventPayload } from '@openpanel/sdk'; +import { checkDuplicatedEvent } from '@/utils/deduplicate'; import { getStringHeaders, getTimestamp } from './track.controller'; export async function postEvent( @@ -39,6 +40,21 @@ export async function postEvent( ua, }); + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + timestamp, + previousDeviceId, + currentDeviceId, + }, + projectId, + }) + ) { + return; + } + const isScreenView = request.body.name === 'screen_view'; // this will ensure that we don't have multiple events creating sessions const LOCK_DURATION = 1000; diff --git a/apps/api/src/controllers/profile.controller.ts b/apps/api/src/controllers/profile.controller.ts index b58d6df6..3da6bee5 100644 --- a/apps/api/src/controllers/profile.controller.ts +++ b/apps/api/src/controllers/profile.controller.ts @@ -2,6 +2,7 @@ import { getClientIp, parseIp } from '@/utils/parse-ip'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr } from 'ramda'; +import { checkDuplicatedEvent, isDuplicatedEvent } from '@/utils/deduplicate'; import { parseUserAgent } from '@openpanel/common/server'; import { getProfileById, upsertProfile } from '@openpanel/db'; import type { @@ -25,6 +26,18 @@ export async function updateProfile( const uaInfo = parseUserAgent(ua, properties); const geo = await parseIp(ip); + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + }, + projectId, + }) + ) { + return; + } + await upsertProfile({ id: profileId, isExternal: true, @@ -52,6 +65,18 @@ export async function incrementProfileProperty( return reply.status(400).send('No projectId'); } + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + }, + projectId, + }) + ) { + return; + } + const profile = await getProfileById(profileId, projectId); if (!profile) { return reply.status(404).send('Not found'); @@ -94,6 +119,18 @@ export async function decrementProfileProperty( return reply.status(400).send('No projectId'); } + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + }, + projectId, + }) + ) { + return; + } + const profile = await getProfileById(profileId, projectId); if (!profile) { return reply.status(404).send('Not found'); diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index 34a9f50e..398c59b4 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -3,6 +3,7 @@ import { getClientIp, parseIp } from '@/utils/parse-ip'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { path, assocPath, pathOr, pick } from 'ramda'; +import { checkDuplicatedEvent, isDuplicatedEvent } from '@/utils/deduplicate'; import { generateDeviceId, parseUserAgent } from '@openpanel/common/server'; import { getProfileById, getSalts, upsertProfile } from '@openpanel/db'; import { eventsQueue } from '@openpanel/queue'; @@ -131,6 +132,21 @@ export async function handler( }) : ''; + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + timestamp, + previousDeviceId, + currentDeviceId, + }, + projectId, + }) + ) { + return; + } + const promises = [ track({ payload: request.body.payload, @@ -161,6 +177,19 @@ export async function handler( break; } case 'identify': { + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + timestamp, + }, + projectId, + }) + ) { + return; + } + const geo = await parseIp(ip); await identify({ payload: request.body.payload, @@ -179,6 +208,19 @@ export async function handler( break; } case 'increment': { + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + timestamp, + }, + projectId, + }) + ) { + return; + } + await increment({ payload: request.body.payload, projectId, @@ -186,6 +228,19 @@ export async function handler( break; } case 'decrement': { + if ( + await checkDuplicatedEvent({ + reply, + payload: { + ...request.body, + timestamp, + }, + projectId, + }) + ) { + return; + } + await decrement({ payload: request.body.payload, projectId, @@ -201,6 +256,8 @@ export async function handler( break; } } + + reply.status(200).send('ok'); } type TrackPayload = { diff --git a/apps/api/src/hooks/deduplicate.hook.ts b/apps/api/src/hooks/deduplicate.hook.ts deleted file mode 100644 index 15f2e635..00000000 --- a/apps/api/src/hooks/deduplicate.hook.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { getLock } from '@openpanel/redis'; -import fastJsonStableHash from 'fast-json-stable-hash'; -import type { FastifyReply, FastifyRequest } from 'fastify'; - -export async function deduplicateHook( - request: FastifyRequest, - reply: FastifyReply, -) { - if (typeof request.body === 'object') { - const locked = await getLock( - `fastify:deduplicate:${fastJsonStableHash.hash(request.body, 'md5')}`, - '1', - 100, - ); - - if (locked) { - return; - } - } - reply.status(200).send('Duplicated event'); -} diff --git a/apps/api/src/routes/event.router.ts b/apps/api/src/routes/event.router.ts index b84a553f..2a3a169c 100644 --- a/apps/api/src/routes/event.router.ts +++ b/apps/api/src/routes/event.router.ts @@ -2,11 +2,9 @@ import * as controller from '@/controllers/event.controller'; import type { FastifyPluginCallback } from 'fastify'; import { clientHook } from '@/hooks/client.hook'; -import { deduplicateHook } from '@/hooks/deduplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; const eventRouter: FastifyPluginCallback = async (fastify) => { - fastify.addHook('preHandler', deduplicateHook); fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); diff --git a/apps/api/src/routes/profile.router.ts b/apps/api/src/routes/profile.router.ts index 9715a997..990f8b10 100644 --- a/apps/api/src/routes/profile.router.ts +++ b/apps/api/src/routes/profile.router.ts @@ -1,11 +1,9 @@ import * as controller from '@/controllers/profile.controller'; import { clientHook } from '@/hooks/client.hook'; -import { deduplicateHook } from '@/hooks/deduplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; import type { FastifyPluginCallback } from 'fastify'; const eventRouter: FastifyPluginCallback = async (fastify) => { - fastify.addHook('preHandler', deduplicateHook); fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 9bed8818..c2456db4 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -2,11 +2,9 @@ import { handler } from '@/controllers/track.controller'; import type { FastifyPluginCallback } from 'fastify'; import { clientHook } from '@/hooks/client.hook'; -import { deduplicateHook } from '@/hooks/deduplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; const trackRouter: FastifyPluginCallback = (fastify) => { - fastify.addHook('preHandler', deduplicateHook); fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); diff --git a/apps/api/src/utils/deduplicate.ts b/apps/api/src/utils/deduplicate.ts new file mode 100644 index 00000000..f6ca99a0 --- /dev/null +++ b/apps/api/src/utils/deduplicate.ts @@ -0,0 +1,50 @@ +import { getLock } from '@openpanel/redis'; +import fastJsonStableHash from 'fast-json-stable-hash'; +import type { FastifyReply } from 'fastify'; + +export async function isDuplicatedEvent({ + payload, + projectId, +}: { + payload: Record; + projectId: string; +}) { + const locked = await getLock( + `fastify:deduplicate:${fastJsonStableHash.hash( + { + ...payload, + projectId, + }, + 'md5', + )}`, + '1', + 100, + ); + + if (locked) { + return false; + } + + return true; +} + +export async function checkDuplicatedEvent({ + reply, + payload, + projectId, +}: { + reply: FastifyReply; + payload: Record; + projectId: string; +}) { + if (await isDuplicatedEvent({ payload, projectId })) { + reply.log.info('duplicated event', { + payload, + projectId, + }); + reply.status(200).send('duplicated'); + return true; + } + + return false; +}