a lot
This commit is contained in:
@@ -5,10 +5,12 @@ import { Worker } from 'bullmq';
|
||||
import express from 'express';
|
||||
|
||||
import { connection, eventsQueue } from '@mixan/queue';
|
||||
import { cronQueue } from '@mixan/queue/src/queues';
|
||||
|
||||
import { cronJob } from './jobs/cron';
|
||||
import { eventsJob } from './jobs/events';
|
||||
|
||||
const PORT = process.env.PORT || 3001;
|
||||
const PORT = process.env.PORT || 3000;
|
||||
const serverAdapter = new ExpressAdapter();
|
||||
serverAdapter.setBasePath('/');
|
||||
const app = express();
|
||||
@@ -17,13 +19,43 @@ new Worker(eventsQueue.name, eventsJob, {
|
||||
connection,
|
||||
});
|
||||
|
||||
createBullBoard({
|
||||
queues: [new BullMQAdapter(eventsQueue)],
|
||||
serverAdapter: serverAdapter,
|
||||
new Worker(cronQueue.name, cronJob, {
|
||||
connection,
|
||||
});
|
||||
|
||||
app.use('/', serverAdapter.getRouter());
|
||||
async function start() {
|
||||
createBullBoard({
|
||||
queues: [new BullMQAdapter(eventsQueue), new BullMQAdapter(cronQueue)],
|
||||
serverAdapter: serverAdapter,
|
||||
});
|
||||
|
||||
app.listen(PORT, () => {
|
||||
console.log(`For the UI, open http://localhost:${PORT}/`);
|
||||
});
|
||||
app.use('/', serverAdapter.getRouter());
|
||||
|
||||
app.listen(PORT, () => {
|
||||
console.log(`For the UI, open http://localhost:${PORT}/`);
|
||||
});
|
||||
|
||||
const repeatableJobs = await cronQueue.getRepeatableJobs();
|
||||
|
||||
console.log(repeatableJobs);
|
||||
|
||||
await cronQueue.add(
|
||||
'salt',
|
||||
{
|
||||
type: 'salt',
|
||||
payload: undefined,
|
||||
},
|
||||
{
|
||||
jobId: 'salt',
|
||||
repeat: {
|
||||
utc: true,
|
||||
pattern: '0 0 * * *',
|
||||
},
|
||||
}
|
||||
);
|
||||
// if (!repeatableJobs.find((job) => job.name === 'salt')) {
|
||||
// console.log('Add salt job to queue');
|
||||
// }
|
||||
}
|
||||
|
||||
start();
|
||||
|
||||
22
apps/worker/src/jobs/cron.salt.ts
Normal file
22
apps/worker/src/jobs/cron.salt.ts
Normal file
@@ -0,0 +1,22 @@
|
||||
import { generateSalt } from '@mixan/common';
|
||||
import { db, getCurrentSalt } from '@mixan/db';
|
||||
|
||||
export async function salt() {
|
||||
const oldSalt = await getCurrentSalt();
|
||||
const newSalt = await db.salt.create({
|
||||
data: {
|
||||
salt: generateSalt(),
|
||||
},
|
||||
});
|
||||
|
||||
// Delete rest of the salts
|
||||
await db.salt.deleteMany({
|
||||
where: {
|
||||
salt: {
|
||||
notIn: [newSalt.salt, oldSalt],
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
return newSalt;
|
||||
}
|
||||
13
apps/worker/src/jobs/cron.ts
Normal file
13
apps/worker/src/jobs/cron.ts
Normal file
@@ -0,0 +1,13 @@
|
||||
import type { Job } from 'bullmq';
|
||||
|
||||
import type { CronQueuePayload } from '@mixan/queue/src/queues';
|
||||
|
||||
import { salt } from './cron.salt';
|
||||
|
||||
export async function cronJob(job: Job<CronQueuePayload>) {
|
||||
switch (job.data.type) {
|
||||
case 'salt': {
|
||||
return await salt();
|
||||
}
|
||||
}
|
||||
}
|
||||
56
apps/worker/src/jobs/events.create-session-end.ts
Normal file
56
apps/worker/src/jobs/events.create-session-end.ts
Normal file
@@ -0,0 +1,56 @@
|
||||
import type { Job } from 'bullmq';
|
||||
|
||||
import { getTime, toISOString } from '@mixan/common';
|
||||
import { createEvent, getEvents } from '@mixan/db';
|
||||
import type { EventsQueuePayloadCreateSessionEnd } from '@mixan/queue/src/queues';
|
||||
|
||||
export async function createSessionEnd(
|
||||
job: Job<EventsQueuePayloadCreateSessionEnd>
|
||||
) {
|
||||
const payload = job.data.payload;
|
||||
|
||||
const sql = `
|
||||
SELECT * FROM events
|
||||
WHERE
|
||||
profile_id = '${payload.profileId}'
|
||||
AND created_at >= (
|
||||
SELECT created_at
|
||||
FROM events
|
||||
WHERE
|
||||
profile_id = '${payload.profileId}'
|
||||
AND name = 'session_start'
|
||||
ORDER BY created_at DESC
|
||||
LIMIT 1
|
||||
)
|
||||
ORDER BY created_at DESC
|
||||
`;
|
||||
job.log(sql);
|
||||
const events = await getEvents(sql);
|
||||
|
||||
const sessionDuration = events.reduce((acc, event) => {
|
||||
return acc + event.duration;
|
||||
}, 0);
|
||||
|
||||
const sessionStart = events.find((event) => event.name === 'session_start');
|
||||
const lastScreenView = events.find((event) => event.name === 'screen_view');
|
||||
const screenViews = events.filter((event) => event.name === 'screen_view');
|
||||
|
||||
if (!sessionStart) {
|
||||
throw new Error('Failed to find a session_start');
|
||||
}
|
||||
|
||||
if (!lastScreenView) {
|
||||
throw new Error('Failed to find a screen_view');
|
||||
}
|
||||
|
||||
return createEvent({
|
||||
...sessionStart,
|
||||
properties: {
|
||||
_bounce: screenViews.length === 1,
|
||||
},
|
||||
name: 'session_end',
|
||||
duration: sessionDuration,
|
||||
path: lastScreenView?.path ?? '',
|
||||
createdAt: toISOString(getTime(lastScreenView.createdAt) + 100),
|
||||
});
|
||||
}
|
||||
@@ -1,257 +1,24 @@
|
||||
import { getDevice, getOS } from '@/utils/user-agent';
|
||||
import type { Job } from 'bullmq';
|
||||
import { mergeDeepRight } from 'ramda';
|
||||
|
||||
import { db } from '@mixan/db';
|
||||
import type { EventsQueuePayload } from '@mixan/queue/src/queues';
|
||||
import type { BatchPayload } from '@mixan/types';
|
||||
import { createEvent } from '@mixan/db';
|
||||
import type {
|
||||
EventsQueuePayload,
|
||||
EventsQueuePayloadCreateSessionEnd,
|
||||
} from '@mixan/queue/src/queues';
|
||||
|
||||
import { createSessionEnd } from './events.create-session-end';
|
||||
|
||||
export async function eventsJob(job: Job<EventsQueuePayload>) {
|
||||
const projectId = job.data.projectId;
|
||||
const body = job.data.payload;
|
||||
|
||||
const profileIds = new Set<string>(
|
||||
body
|
||||
.map((item) => item.payload.profileId)
|
||||
.filter((id): id is string => typeof id === 'string' && id.length > 0)
|
||||
);
|
||||
|
||||
if (profileIds.size === 0) {
|
||||
return null;
|
||||
switch (job.data.type) {
|
||||
case 'createEvent': {
|
||||
return await createEvent(job.data.payload);
|
||||
}
|
||||
}
|
||||
|
||||
const profiles = await db.profile.findMany({
|
||||
where: {
|
||||
id: {
|
||||
in: Array.from(profileIds),
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
async function getProfile(profileId: string) {
|
||||
const profile = profiles.find((profile) => profile.id === profileId);
|
||||
if (profile) {
|
||||
return profile;
|
||||
switch (job.data.type) {
|
||||
case 'createSessionEnd': {
|
||||
return await createSessionEnd(
|
||||
job as Job<EventsQueuePayloadCreateSessionEnd>
|
||||
);
|
||||
}
|
||||
|
||||
const created = await db.profile.create({
|
||||
data: {
|
||||
id: profileId,
|
||||
properties: {},
|
||||
project_id: projectId,
|
||||
},
|
||||
});
|
||||
|
||||
profiles.push(created);
|
||||
|
||||
return created;
|
||||
}
|
||||
|
||||
const mergedBody: BatchPayload[] = body.reduce((acc, item) => {
|
||||
const canMerge =
|
||||
item.type === 'update_profile' || item.type === 'update_session';
|
||||
|
||||
if (!canMerge) {
|
||||
return [...acc, item];
|
||||
}
|
||||
|
||||
const match = acc.findIndex(
|
||||
(i) =>
|
||||
i.type === item.type && i.payload.profileId === item.payload.profileId
|
||||
);
|
||||
|
||||
if (acc[match]) {
|
||||
acc[match]!.payload = mergeDeepRight(acc[match]!.payload, item.payload);
|
||||
} else {
|
||||
acc.push(item);
|
||||
}
|
||||
|
||||
return acc;
|
||||
}, [] as BatchPayload[]);
|
||||
|
||||
const failedEvents: BatchPayload[] = [];
|
||||
|
||||
for (const item of mergedBody) {
|
||||
try {
|
||||
const { type, payload } = item;
|
||||
const profile = await getProfile(payload.profileId);
|
||||
switch (type) {
|
||||
case 'create_profile': {
|
||||
profile.properties = {
|
||||
...(typeof profile.properties === 'object'
|
||||
? profile.properties ?? {}
|
||||
: {}),
|
||||
...(payload.properties ?? {}),
|
||||
};
|
||||
await db.profile.update({
|
||||
where: {
|
||||
id: payload.profileId,
|
||||
},
|
||||
data: {
|
||||
properties: profile.properties,
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
case 'update_profile': {
|
||||
profile.properties = {
|
||||
...(typeof profile.properties === 'object'
|
||||
? profile.properties ?? {}
|
||||
: {}),
|
||||
...(payload.properties ?? {}),
|
||||
};
|
||||
await db.profile.update({
|
||||
where: {
|
||||
id: payload.profileId,
|
||||
},
|
||||
data: {
|
||||
external_id: payload.id,
|
||||
email: payload.email,
|
||||
first_name: payload.first_name,
|
||||
last_name: payload.last_name,
|
||||
avatar: payload.avatar,
|
||||
properties: profile.properties,
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
case 'set_profile_property': {
|
||||
if (
|
||||
typeof (profile.properties as Record<string, unknown>)[
|
||||
payload.name
|
||||
] === 'undefined'
|
||||
) {
|
||||
(profile.properties as Record<string, unknown>)[payload.name] =
|
||||
payload.value;
|
||||
|
||||
await db.profile.update({
|
||||
where: {
|
||||
id: payload.profileId,
|
||||
},
|
||||
data: {
|
||||
// @ts-expect-error
|
||||
properties: profile.properties,
|
||||
},
|
||||
});
|
||||
}
|
||||
break;
|
||||
}
|
||||
case 'increment': {
|
||||
await tickProfileProperty({
|
||||
profileId: payload.profileId,
|
||||
name: payload.name,
|
||||
tick: payload.value,
|
||||
});
|
||||
break;
|
||||
}
|
||||
case 'decrement': {
|
||||
await tickProfileProperty({
|
||||
profileId: payload.profileId,
|
||||
name: payload.name,
|
||||
tick: -Math.abs(payload.value),
|
||||
});
|
||||
break;
|
||||
}
|
||||
case 'event': {
|
||||
const userAgent = payload.properties.ua as string | undefined;
|
||||
if (userAgent) {
|
||||
payload.properties.device = getDevice(userAgent);
|
||||
payload.properties.os = getOS(userAgent);
|
||||
delete payload.properties.ua;
|
||||
}
|
||||
await db.event.create({
|
||||
data: {
|
||||
name: payload.name,
|
||||
properties: payload.properties,
|
||||
createdAt: payload.time,
|
||||
project_id: projectId,
|
||||
profile_id: payload.profileId,
|
||||
},
|
||||
});
|
||||
break;
|
||||
}
|
||||
case 'update_session': {
|
||||
const session = await db.event.findFirst({
|
||||
where: {
|
||||
profile_id: payload.profileId,
|
||||
project_id: projectId,
|
||||
name: 'session_start',
|
||||
},
|
||||
orderBy: {
|
||||
createdAt: 'desc',
|
||||
},
|
||||
});
|
||||
if (session) {
|
||||
await db.$executeRawUnsafe(
|
||||
`UPDATE events SET properties = '${JSON.stringify(
|
||||
payload.properties
|
||||
)}' || properties WHERE "createdAt" >= '${session.createdAt.toISOString()}' AND profile_id = '${
|
||||
payload.profileId
|
||||
}'`
|
||||
);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
job.log(`Failed to create "${item.type}"`);
|
||||
job.log(` > Payload: ${JSON.stringify(item.payload)}`);
|
||||
if (error instanceof Error) {
|
||||
job.log(` > Error: ${error.message.trim()}`);
|
||||
job.log(` > Stack: ${error.stack}`);
|
||||
}
|
||||
failedEvents.push(item);
|
||||
job.log(`---`);
|
||||
}
|
||||
} // end for
|
||||
|
||||
await db.eventFailed.createMany({
|
||||
data: failedEvents.map((item) => ({
|
||||
data: item as Record<string, any>,
|
||||
})),
|
||||
});
|
||||
|
||||
return body;
|
||||
}
|
||||
|
||||
export async function tickProfileProperty({
|
||||
profileId,
|
||||
tick,
|
||||
name,
|
||||
}: {
|
||||
profileId: string;
|
||||
tick: number;
|
||||
name: string;
|
||||
}) {
|
||||
const profile = await db.profile.findUniqueOrThrow({
|
||||
where: {
|
||||
id: profileId,
|
||||
},
|
||||
});
|
||||
|
||||
const properties = (
|
||||
typeof profile.properties === 'object' ? profile.properties ?? {} : {}
|
||||
) as Record<string, number>;
|
||||
const value = name in properties ? properties[name] : 0;
|
||||
|
||||
if (typeof value !== 'number') {
|
||||
return `Property "${name}" on user is of type ${typeof value}`;
|
||||
}
|
||||
|
||||
if (typeof tick !== 'number') {
|
||||
// eslint-disable-next-line @typescript-eslint/restrict-template-expressions
|
||||
return `Value is not a number ${tick} (${typeof tick})`;
|
||||
}
|
||||
|
||||
await db.profile.update({
|
||||
where: {
|
||||
id: profileId,
|
||||
},
|
||||
data: {
|
||||
properties: {
|
||||
...properties,
|
||||
[name]: value + tick,
|
||||
},
|
||||
},
|
||||
});
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user