fix: duplicate session start (race condition) + remove old device id handling

This commit is contained in:
Carl-Gerhard Lindesvärd
2026-02-27 09:56:51 +01:00
parent a42adcdbfb
commit 928c44ef6a
6 changed files with 87 additions and 105 deletions

View File

@@ -61,8 +61,6 @@ export async function postEvent(
}, },
uaInfo, uaInfo,
geo, geo,
currentDeviceId: '',
previousDeviceId: '',
deviceId, deviceId,
sessionId: sessionId ?? '', sessionId: sessionId ?? '',
}, },

View File

@@ -217,8 +217,6 @@ async function handleTrack(
geo, geo,
deviceId, deviceId,
sessionId, sessionId,
currentDeviceId: '', // TODO: Remove
previousDeviceId: '', // TODO: Remove
}, },
groupId, groupId,
jobId, jobId,

View File

@@ -14,7 +14,8 @@ import {
} from '@openpanel/db'; } from '@openpanel/db';
import type { ILogger } from '@openpanel/logger'; import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import * as R from 'ramda'; import { getLock } from '@openpanel/redis';
import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda';
import { logger as baseLogger } from '@/utils/logger'; import { logger as baseLogger } from '@/utils/logger';
import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler'; import { createSessionEndJob, getSessionEnd } from '@/utils/session-handler';
@@ -24,7 +25,22 @@ const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue'];
// First it will strip '' and undefined/null from B // First it will strip '' and undefined/null from B
// Then it will merge the two objects with a standard ramda merge function // Then it will merge the two objects with a standard ramda merge function
const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B => const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B =>
R.mergeDeepRight(a, R.reject(R.anyPass([R.isEmpty, R.isNil]))(b)) as A & B; mergeDeepRight(a, reject(anyPass([isEmpty, isNil]))(b)) as A & B;
/** Check if payload matches project-level event exclude filters */
async function isEventExcludedByProjectFilter(
payload: IServiceCreateEventPayload,
projectId: string
): Promise<boolean> {
const project = await getProjectByIdCached(projectId);
const eventExcludeFilters = (project?.filters ?? []).filter(
(f) => f.type === 'event'
);
if (eventExcludeFilters.length === 0) {
return false;
}
return eventExcludeFilters.some((filter) => matchEvent(payload, filter));
}
async function createEventAndNotify( async function createEventAndNotify(
payload: IServiceCreateEventPayload, payload: IServiceCreateEventPayload,
@@ -32,21 +48,13 @@ async function createEventAndNotify(
projectId: string projectId: string
) { ) {
// Check project-level event exclude filters // Check project-level event exclude filters
const project = await getProjectByIdCached(projectId); const isExcluded = await isEventExcludedByProjectFilter(payload, projectId);
const eventExcludeFilters = (project?.filters ?? []).filter( if (isExcluded) {
(f) => f.type === 'event' logger.info('Event excluded by project filter', {
); event: payload.name,
if (eventExcludeFilters.length > 0) { projectId,
const isExcluded = eventExcludeFilters.some((filter) => });
matchEvent(payload, filter) return null;
);
if (isExcluded) {
logger.info('Event excluded by project filter', {
event: payload.name,
projectId,
});
return null;
}
} }
logger.info('Creating event', { event: payload }); logger.info('Creating event', { event: payload });
@@ -83,8 +91,6 @@ export async function incomingEvent(
event: body, event: body,
headers, headers,
projectId, projectId,
currentDeviceId,
previousDeviceId,
deviceId, deviceId,
sessionId, sessionId,
uaInfo: _uaInfo, uaInfo: _uaInfo,
@@ -125,7 +131,7 @@ export async function incomingEvent(
name: body.name, name: body.name,
profileId, profileId,
projectId, projectId,
properties: R.omit(GLOBAL_PROPERTIES, { properties: omit(GLOBAL_PROPERTIES, {
...properties, ...properties,
__hash: hash, __hash: hash,
__query: query, __query: query,
@@ -194,8 +200,6 @@ export async function incomingEvent(
const sessionEnd = await getSessionEnd({ const sessionEnd = await getSessionEnd({
projectId, projectId,
currentDeviceId,
previousDeviceId,
deviceId, deviceId,
profileId, profileId,
}); });
@@ -216,20 +220,44 @@ export async function incomingEvent(
origin: baseEvent.origin || activeSession?.exit_origin || '', origin: baseEvent.origin || activeSession?.exit_origin || '',
} as Partial<IServiceCreateEventPayload>) as IServiceCreateEventPayload; } as Partial<IServiceCreateEventPayload>) as IServiceCreateEventPayload;
if (!sessionEnd) { // If the triggering event is filtered, do not create session_start or the event (issue #2)
logger.info('Creating session start event', { event: payload }); const isExcluded = await isEventExcludedByProjectFilter(payload, projectId);
await createEventAndNotify( if (isExcluded) {
logger.info(
'Skipping session_start and event (excluded by project filter)',
{ {
...payload, event: payload.name,
name: 'session_start', projectId,
createdAt: new Date(getTime(payload.createdAt) - 100), }
}, );
logger, return null;
projectId }
).catch((error) => {
logger.error('Error creating session start event', { event: payload }); if (!sessionEnd) {
throw error; const locked = await getLock(
}); `session_start:${projectId}:${sessionId}`,
'1',
1000
);
if (locked) {
logger.info('Creating session start event', { event: payload });
await createEventAndNotify(
{
...payload,
name: 'session_start',
createdAt: new Date(getTime(payload.createdAt) - 100),
},
logger,
projectId
).catch((error) => {
logger.error('Error creating session start event', { event: payload });
throw error;
});
} else {
logger.info('Session start already claimed by another worker', {
event: payload,
});
}
} }
const event = await createEventAndNotify(payload, logger, projectId); const event = await createEventAndNotify(payload, logger, projectId);

View File

@@ -30,8 +30,7 @@ vi.mock('@openpanel/db', async () => {
// 30 minutes // 30 minutes
const SESSION_TIMEOUT = 30 * 60 * 1000; const SESSION_TIMEOUT = 30 * 60 * 1000;
const projectId = 'test-project'; const projectId = 'test-project';
const currentDeviceId = 'device-123'; const deviceId = 'device-123';
const previousDeviceId = 'device-456';
// Valid UUID used when creating a new session in tests // Valid UUID used when creating a new session in tests
const newSessionId = 'a1b2c3d4-e5f6-4789-a012-345678901234'; const newSessionId = 'a1b2c3d4-e5f6-4789-a012-345678901234';
const geo = { const geo = {
@@ -90,14 +89,12 @@ describe('incomingEvent', () => {
'openpanel-sdk-version': '1.0.0', 'openpanel-sdk-version': '1.0.0',
}, },
projectId, projectId,
currentDeviceId, deviceId,
previousDeviceId,
deviceId: currentDeviceId,
sessionId: newSessionId, sessionId: newSessionId,
}; };
const event = { const event = {
name: 'test_event', name: 'test_event',
deviceId: currentDeviceId, deviceId,
profileId: '', profileId: '',
sessionId: expect.stringMatching( sessionId: expect.stringMatching(
// biome-ignore lint/performance/useTopLevelRegex: test // biome-ignore lint/performance/useTopLevelRegex: test
@@ -145,7 +142,7 @@ describe('incomingEvent', () => {
}, },
{ {
delay: SESSION_TIMEOUT, delay: SESSION_TIMEOUT,
jobId: `sessionEnd:${projectId}:${currentDeviceId}`, jobId: `sessionEnd:${projectId}:${deviceId}`,
attempts: 3, attempts: 3,
backoff: { backoff: {
delay: 200, delay: 200,
@@ -185,9 +182,7 @@ describe('incomingEvent', () => {
}, },
uaInfo, uaInfo,
projectId, projectId,
currentDeviceId, deviceId,
previousDeviceId,
deviceId: currentDeviceId,
sessionId: 'session-123', sessionId: 'session-123',
}; };
@@ -201,9 +196,7 @@ describe('incomingEvent', () => {
type: 'createSessionEnd', type: 'createSessionEnd',
payload: { payload: {
sessionId: 'session-123', sessionId: 'session-123',
deviceId: currentDeviceId, deviceId,
profileId: currentDeviceId,
projectId,
}, },
}, },
} as Partial<Job> as Job); } as Partial<Job> as Job);
@@ -212,7 +205,7 @@ describe('incomingEvent', () => {
const event = { const event = {
name: 'test_event', name: 'test_event',
deviceId: currentDeviceId, deviceId,
profileId: '', profileId: '',
sessionId: 'session-123', sessionId: 'session-123',
projectId, projectId,
@@ -268,8 +261,6 @@ describe('incomingEvent', () => {
'request-id': '123', 'request-id': '123',
}, },
projectId, projectId,
currentDeviceId: '',
previousDeviceId: '',
deviceId: '', deviceId: '',
sessionId: '', sessionId: '',
uaInfo: uaInfoServer, uaInfo: uaInfoServer,
@@ -374,8 +365,6 @@ describe('incomingEvent', () => {
'request-id': '123', 'request-id': '123',
}, },
projectId, projectId,
currentDeviceId: '',
previousDeviceId: '',
deviceId: '', deviceId: '',
sessionId: '', sessionId: '',
uaInfo: uaInfoServer, uaInfo: uaInfoServer,

View File

@@ -1,5 +1,4 @@
import { getTime } from '@openpanel/common'; import type { IServiceCreateEventPayload } from '@openpanel/db';
import { type IServiceCreateEventPayload, createEvent } from '@openpanel/db';
import { import {
type EventsQueuePayloadCreateSessionEnd, type EventsQueuePayloadCreateSessionEnd,
sessionsQueue, sessionsQueue,
@@ -12,7 +11,7 @@ export const SESSION_TIMEOUT = 1000 * 60 * 30;
const getSessionEndJobId = (projectId: string, deviceId: string) => const getSessionEndJobId = (projectId: string, deviceId: string) =>
`sessionEnd:${projectId}:${deviceId}`; `sessionEnd:${projectId}:${deviceId}`;
export async function createSessionEndJob({ export function createSessionEndJob({
payload, payload,
}: { }: {
payload: IServiceCreateEventPayload; payload: IServiceCreateEventPayload;
@@ -31,27 +30,21 @@ export async function createSessionEndJob({
type: 'exponential', type: 'exponential',
delay: 200, delay: 200,
}, },
}, }
); );
} }
export async function getSessionEnd({ export async function getSessionEnd({
projectId, projectId,
currentDeviceId,
previousDeviceId,
deviceId, deviceId,
profileId, profileId,
}: { }: {
projectId: string; projectId: string;
currentDeviceId: string;
previousDeviceId: string;
deviceId: string; deviceId: string;
profileId: string; profileId: string;
}) { }) {
const sessionEnd = await getSessionEndJob({ const sessionEnd = await getSessionEndJob({
projectId, projectId,
currentDeviceId,
previousDeviceId,
deviceId, deviceId,
}); });
@@ -82,8 +75,6 @@ export async function getSessionEnd({
export async function getSessionEndJob(args: { export async function getSessionEndJob(args: {
projectId: string; projectId: string;
currentDeviceId: string;
previousDeviceId: string;
deviceId: string; deviceId: string;
retryCount?: number; retryCount?: number;
}): Promise<{ }): Promise<{
@@ -98,7 +89,7 @@ export async function getSessionEndJob(args: {
async function handleJobStates( async function handleJobStates(
job: Job<EventsQueuePayloadCreateSessionEnd>, job: Job<EventsQueuePayloadCreateSessionEnd>,
deviceId: string, deviceId: string
): Promise<{ ): Promise<{
deviceId: string; deviceId: string;
job: Job<EventsQueuePayloadCreateSessionEnd>; job: Job<EventsQueuePayloadCreateSessionEnd>;
@@ -134,28 +125,9 @@ export async function getSessionEndJob(args: {
return null; return null;
} }
// TODO: Remove this when migrated to deviceId
if (args.currentDeviceId && args.previousDeviceId) {
// Check current device job
const currentJob = await sessionsQueue.getJob(
getSessionEndJobId(args.projectId, args.currentDeviceId),
);
if (currentJob) {
return await handleJobStates(currentJob, args.currentDeviceId);
}
// Check previous device job
const previousJob = await sessionsQueue.getJob(
getSessionEndJobId(args.projectId, args.previousDeviceId),
);
if (previousJob) {
return await handleJobStates(previousJob, args.previousDeviceId);
}
}
// Check current device job // Check current device job
const currentJob = await sessionsQueue.getJob( const currentJob = await sessionsQueue.getJob(
getSessionEndJobId(args.projectId, args.deviceId), getSessionEndJobId(args.projectId, args.deviceId)
); );
if (currentJob) { if (currentJob) {
return await handleJobStates(currentJob, args.deviceId); return await handleJobStates(currentJob, args.deviceId);

View File

@@ -1,5 +1,3 @@
import { Queue, QueueEvents } from 'bullmq';
import { createHash } from 'node:crypto'; import { createHash } from 'node:crypto';
import type { import type {
IServiceCreateEventPayload, IServiceCreateEventPayload,
@@ -8,12 +6,13 @@ import type {
} from '@openpanel/db'; } from '@openpanel/db';
import { createLogger } from '@openpanel/logger'; import { createLogger } from '@openpanel/logger';
import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis'; import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis';
import { Queue, QueueEvents } from 'bullmq';
import { Queue as GroupQueue } from 'groupmq'; import { Queue as GroupQueue } from 'groupmq';
import type { ITrackPayload } from '../../validation'; import type { ITrackPayload } from '../../validation';
export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt( export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt(
process.env.EVENTS_GROUP_QUEUES_SHARDS || '1', process.env.EVENTS_GROUP_QUEUES_SHARDS || '1',
10, 10
); );
export const getQueueName = (name: string) => export const getQueueName = (name: string) =>
@@ -65,8 +64,6 @@ export interface EventsQueuePayloadIncomingEvent {
latitude: number | undefined; latitude: number | undefined;
}; };
headers: Record<string, string | undefined>; headers: Record<string, string | undefined>;
currentDeviceId: string; // TODO: Remove
previousDeviceId: string; // TODO: Remove
deviceId: string; deviceId: string;
sessionId: string; sessionId: string;
}; };
@@ -154,12 +151,12 @@ export type CronQueueType = CronQueuePayload['type'];
const orderingDelayMs = Number.parseInt( const orderingDelayMs = Number.parseInt(
process.env.ORDERING_DELAY_MS || '100', process.env.ORDERING_DELAY_MS || '100',
10, 10
); );
const autoBatchMaxWaitMs = Number.parseInt( const autoBatchMaxWaitMs = Number.parseInt(
process.env.AUTO_BATCH_MAX_WAIT_MS || '0', process.env.AUTO_BATCH_MAX_WAIT_MS || '0',
10, 10
); );
const autoBatchSize = Number.parseInt(process.env.AUTO_BATCH_SIZE || '0', 10); const autoBatchSize = Number.parseInt(process.env.AUTO_BATCH_SIZE || '0', 10);
@@ -170,12 +167,12 @@ export const eventsGroupQueues = Array.from({
new GroupQueue<EventsQueuePayloadIncomingEvent['payload']>({ new GroupQueue<EventsQueuePayloadIncomingEvent['payload']>({
logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined, logger: process.env.NODE_ENV === 'production' ? queueLogger : undefined,
namespace: getQueueName( namespace: getQueueName(
list.length === 1 ? 'group_events' : `group_events_${index}`, list.length === 1 ? 'group_events' : `group_events_${index}`
), ),
redis: getRedisGroupQueue(), redis: getRedisGroupQueue(),
keepCompleted: 1_000, keepCompleted: 1000,
keepFailed: 10_000, keepFailed: 10_000,
orderingDelayMs: orderingDelayMs, orderingDelayMs,
autoBatch: autoBatch:
autoBatchMaxWaitMs && autoBatchSize autoBatchMaxWaitMs && autoBatchSize
? { ? {
@@ -183,7 +180,7 @@ export const eventsGroupQueues = Array.from({
size: autoBatchSize, size: autoBatchSize,
} }
: undefined, : undefined,
}), })
); );
export const getEventsGroupQueueShard = (groupId: string) => { export const getEventsGroupQueueShard = (groupId: string) => {
@@ -202,7 +199,7 @@ export const sessionsQueue = new Queue<SessionsQueuePayload>(
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
}, },
}, }
); );
export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), { export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), {
connection: getRedisQueue(), connection: getRedisQueue(),
@@ -236,7 +233,7 @@ export const notificationQueue = new Queue<NotificationQueuePayload>(
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 10, removeOnComplete: 10,
}, },
}, }
); );
export type ImportQueuePayload = { export type ImportQueuePayload = {
@@ -254,7 +251,7 @@ export const importQueue = new Queue<ImportQueuePayload>(
removeOnComplete: 10, removeOnComplete: 10,
removeOnFail: 50, removeOnFail: 50,
}, },
}, }
); );
export type InsightsQueuePayloadProject = { export type InsightsQueuePayloadProject = {
@@ -269,5 +266,5 @@ export const insightsQueue = new Queue<InsightsQueuePayloadProject>(
defaultJobOptions: { defaultJobOptions: {
removeOnComplete: 100, removeOnComplete: 100,
}, },
}, }
); );