improve(api): how we check duplicate requests
This commit is contained in:
@@ -7,6 +7,7 @@ import { eventsQueue } from '@openpanel/queue';
|
|||||||
import { getLock } from '@openpanel/redis';
|
import { getLock } from '@openpanel/redis';
|
||||||
import type { PostEventPayload } from '@openpanel/sdk';
|
import type { PostEventPayload } from '@openpanel/sdk';
|
||||||
|
|
||||||
|
import { checkDuplicatedEvent } from '@/utils/deduplicate';
|
||||||
import { getStringHeaders, getTimestamp } from './track.controller';
|
import { getStringHeaders, getTimestamp } from './track.controller';
|
||||||
|
|
||||||
export async function postEvent(
|
export async function postEvent(
|
||||||
@@ -39,6 +40,21 @@ export async function postEvent(
|
|||||||
ua,
|
ua,
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
timestamp,
|
||||||
|
previousDeviceId,
|
||||||
|
currentDeviceId,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const isScreenView = request.body.name === 'screen_view';
|
const isScreenView = request.body.name === 'screen_view';
|
||||||
// this will ensure that we don't have multiple events creating sessions
|
// this will ensure that we don't have multiple events creating sessions
|
||||||
const LOCK_DURATION = 1000;
|
const LOCK_DURATION = 1000;
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import { getClientIp, parseIp } from '@/utils/parse-ip';
|
|||||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||||
import { assocPath, pathOr } from 'ramda';
|
import { assocPath, pathOr } from 'ramda';
|
||||||
|
|
||||||
|
import { checkDuplicatedEvent, isDuplicatedEvent } from '@/utils/deduplicate';
|
||||||
import { parseUserAgent } from '@openpanel/common/server';
|
import { parseUserAgent } from '@openpanel/common/server';
|
||||||
import { getProfileById, upsertProfile } from '@openpanel/db';
|
import { getProfileById, upsertProfile } from '@openpanel/db';
|
||||||
import type {
|
import type {
|
||||||
@@ -25,6 +26,18 @@ export async function updateProfile(
|
|||||||
const uaInfo = parseUserAgent(ua, properties);
|
const uaInfo = parseUserAgent(ua, properties);
|
||||||
const geo = await parseIp(ip);
|
const geo = await parseIp(ip);
|
||||||
|
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
await upsertProfile({
|
await upsertProfile({
|
||||||
id: profileId,
|
id: profileId,
|
||||||
isExternal: true,
|
isExternal: true,
|
||||||
@@ -52,6 +65,18 @@ export async function incrementProfileProperty(
|
|||||||
return reply.status(400).send('No projectId');
|
return reply.status(400).send('No projectId');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const profile = await getProfileById(profileId, projectId);
|
const profile = await getProfileById(profileId, projectId);
|
||||||
if (!profile) {
|
if (!profile) {
|
||||||
return reply.status(404).send('Not found');
|
return reply.status(404).send('Not found');
|
||||||
@@ -94,6 +119,18 @@ export async function decrementProfileProperty(
|
|||||||
return reply.status(400).send('No projectId');
|
return reply.status(400).send('No projectId');
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const profile = await getProfileById(profileId, projectId);
|
const profile = await getProfileById(profileId, projectId);
|
||||||
if (!profile) {
|
if (!profile) {
|
||||||
return reply.status(404).send('Not found');
|
return reply.status(404).send('Not found');
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ import { getClientIp, parseIp } from '@/utils/parse-ip';
|
|||||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
import type { FastifyReply, FastifyRequest } from 'fastify';
|
||||||
import { path, assocPath, pathOr, pick } from 'ramda';
|
import { path, assocPath, pathOr, pick } from 'ramda';
|
||||||
|
|
||||||
|
import { checkDuplicatedEvent, isDuplicatedEvent } from '@/utils/deduplicate';
|
||||||
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
|
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
|
||||||
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
|
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
|
||||||
import { eventsQueue } from '@openpanel/queue';
|
import { eventsQueue } from '@openpanel/queue';
|
||||||
@@ -131,6 +132,21 @@ export async function handler(
|
|||||||
})
|
})
|
||||||
: '';
|
: '';
|
||||||
|
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
timestamp,
|
||||||
|
previousDeviceId,
|
||||||
|
currentDeviceId,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const promises = [
|
const promises = [
|
||||||
track({
|
track({
|
||||||
payload: request.body.payload,
|
payload: request.body.payload,
|
||||||
@@ -161,6 +177,19 @@ export async function handler(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'identify': {
|
case 'identify': {
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
timestamp,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
const geo = await parseIp(ip);
|
const geo = await parseIp(ip);
|
||||||
await identify({
|
await identify({
|
||||||
payload: request.body.payload,
|
payload: request.body.payload,
|
||||||
@@ -179,6 +208,19 @@ export async function handler(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'increment': {
|
case 'increment': {
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
timestamp,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
await increment({
|
await increment({
|
||||||
payload: request.body.payload,
|
payload: request.body.payload,
|
||||||
projectId,
|
projectId,
|
||||||
@@ -186,6 +228,19 @@ export async function handler(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 'decrement': {
|
case 'decrement': {
|
||||||
|
if (
|
||||||
|
await checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload: {
|
||||||
|
...request.body,
|
||||||
|
timestamp,
|
||||||
|
},
|
||||||
|
projectId,
|
||||||
|
})
|
||||||
|
) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
await decrement({
|
await decrement({
|
||||||
payload: request.body.payload,
|
payload: request.body.payload,
|
||||||
projectId,
|
projectId,
|
||||||
@@ -201,6 +256,8 @@ export async function handler(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
reply.status(200).send('ok');
|
||||||
}
|
}
|
||||||
|
|
||||||
type TrackPayload = {
|
type TrackPayload = {
|
||||||
|
|||||||
@@ -1,21 +0,0 @@
|
|||||||
import { getLock } from '@openpanel/redis';
|
|
||||||
import fastJsonStableHash from 'fast-json-stable-hash';
|
|
||||||
import type { FastifyReply, FastifyRequest } from 'fastify';
|
|
||||||
|
|
||||||
export async function deduplicateHook(
|
|
||||||
request: FastifyRequest,
|
|
||||||
reply: FastifyReply,
|
|
||||||
) {
|
|
||||||
if (typeof request.body === 'object') {
|
|
||||||
const locked = await getLock(
|
|
||||||
`fastify:deduplicate:${fastJsonStableHash.hash(request.body, 'md5')}`,
|
|
||||||
'1',
|
|
||||||
100,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (locked) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
reply.status(200).send('Duplicated event');
|
|
||||||
}
|
|
||||||
@@ -2,11 +2,9 @@ import * as controller from '@/controllers/event.controller';
|
|||||||
import type { FastifyPluginCallback } from 'fastify';
|
import type { FastifyPluginCallback } from 'fastify';
|
||||||
|
|
||||||
import { clientHook } from '@/hooks/client.hook';
|
import { clientHook } from '@/hooks/client.hook';
|
||||||
import { deduplicateHook } from '@/hooks/deduplicate.hook';
|
|
||||||
import { isBotHook } from '@/hooks/is-bot.hook';
|
import { isBotHook } from '@/hooks/is-bot.hook';
|
||||||
|
|
||||||
const eventRouter: FastifyPluginCallback = async (fastify) => {
|
const eventRouter: FastifyPluginCallback = async (fastify) => {
|
||||||
fastify.addHook('preHandler', deduplicateHook);
|
|
||||||
fastify.addHook('preHandler', clientHook);
|
fastify.addHook('preHandler', clientHook);
|
||||||
fastify.addHook('preHandler', isBotHook);
|
fastify.addHook('preHandler', isBotHook);
|
||||||
|
|
||||||
|
|||||||
@@ -1,11 +1,9 @@
|
|||||||
import * as controller from '@/controllers/profile.controller';
|
import * as controller from '@/controllers/profile.controller';
|
||||||
import { clientHook } from '@/hooks/client.hook';
|
import { clientHook } from '@/hooks/client.hook';
|
||||||
import { deduplicateHook } from '@/hooks/deduplicate.hook';
|
|
||||||
import { isBotHook } from '@/hooks/is-bot.hook';
|
import { isBotHook } from '@/hooks/is-bot.hook';
|
||||||
import type { FastifyPluginCallback } from 'fastify';
|
import type { FastifyPluginCallback } from 'fastify';
|
||||||
|
|
||||||
const eventRouter: FastifyPluginCallback = async (fastify) => {
|
const eventRouter: FastifyPluginCallback = async (fastify) => {
|
||||||
fastify.addHook('preHandler', deduplicateHook);
|
|
||||||
fastify.addHook('preHandler', clientHook);
|
fastify.addHook('preHandler', clientHook);
|
||||||
fastify.addHook('preHandler', isBotHook);
|
fastify.addHook('preHandler', isBotHook);
|
||||||
|
|
||||||
|
|||||||
@@ -2,11 +2,9 @@ import { handler } from '@/controllers/track.controller';
|
|||||||
import type { FastifyPluginCallback } from 'fastify';
|
import type { FastifyPluginCallback } from 'fastify';
|
||||||
|
|
||||||
import { clientHook } from '@/hooks/client.hook';
|
import { clientHook } from '@/hooks/client.hook';
|
||||||
import { deduplicateHook } from '@/hooks/deduplicate.hook';
|
|
||||||
import { isBotHook } from '@/hooks/is-bot.hook';
|
import { isBotHook } from '@/hooks/is-bot.hook';
|
||||||
|
|
||||||
const trackRouter: FastifyPluginCallback = (fastify) => {
|
const trackRouter: FastifyPluginCallback = (fastify) => {
|
||||||
fastify.addHook('preHandler', deduplicateHook);
|
|
||||||
fastify.addHook('preHandler', clientHook);
|
fastify.addHook('preHandler', clientHook);
|
||||||
fastify.addHook('preHandler', isBotHook);
|
fastify.addHook('preHandler', isBotHook);
|
||||||
|
|
||||||
|
|||||||
50
apps/api/src/utils/deduplicate.ts
Normal file
50
apps/api/src/utils/deduplicate.ts
Normal file
@@ -0,0 +1,50 @@
|
|||||||
|
import { getLock } from '@openpanel/redis';
|
||||||
|
import fastJsonStableHash from 'fast-json-stable-hash';
|
||||||
|
import type { FastifyReply } from 'fastify';
|
||||||
|
|
||||||
|
export async function isDuplicatedEvent({
|
||||||
|
payload,
|
||||||
|
projectId,
|
||||||
|
}: {
|
||||||
|
payload: Record<string, any>;
|
||||||
|
projectId: string;
|
||||||
|
}) {
|
||||||
|
const locked = await getLock(
|
||||||
|
`fastify:deduplicate:${fastJsonStableHash.hash(
|
||||||
|
{
|
||||||
|
...payload,
|
||||||
|
projectId,
|
||||||
|
},
|
||||||
|
'md5',
|
||||||
|
)}`,
|
||||||
|
'1',
|
||||||
|
100,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (locked) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function checkDuplicatedEvent({
|
||||||
|
reply,
|
||||||
|
payload,
|
||||||
|
projectId,
|
||||||
|
}: {
|
||||||
|
reply: FastifyReply;
|
||||||
|
payload: Record<string, any>;
|
||||||
|
projectId: string;
|
||||||
|
}) {
|
||||||
|
if (await isDuplicatedEvent({ payload, projectId })) {
|
||||||
|
reply.log.info('duplicated event', {
|
||||||
|
payload,
|
||||||
|
projectId,
|
||||||
|
});
|
||||||
|
reply.status(200).send('duplicated');
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user