This commit is contained in:
Carl-Gerhard Lindesvärd
2026-01-21 08:25:32 +01:00
parent 56f1c5e894
commit a58761e8d7
29 changed files with 777 additions and 298 deletions

View File

@@ -20,6 +20,7 @@
"@openpanel/json": "workspace:*",
"@openpanel/logger": "workspace:*",
"@openpanel/importer": "workspace:*",
"@openpanel/payments": "workspace:*",
"@openpanel/queue": "workspace:*",
"@openpanel/redis": "workspace:*",
"bullmq": "^5.63.0",

View File

@@ -1,180 +1,265 @@
import { differenceInDays } from 'date-fns';
import type { Job } from 'bullmq';
import { differenceInDays } from 'date-fns';
import { db } from '@openpanel/db';
import { sendEmail } from '@openpanel/email';
import {
type EmailData,
type EmailTemplate,
sendEmail,
} from '@openpanel/email';
import type { CronQueuePayload } from '@openpanel/queue';
import { getRecommendedPlan } from '@openpanel/payments';
import { logger } from '../utils/logger';
const EMAIL_SCHEDULE = {
1: 0, // Welcome email - Day 0
2: 2, // What to track - Day 2
3: 6, // Dashboards - Day 6
4: 14, // Replace stack - Day 14
5: 26, // Trial ending - Day 26
// Types for the onboarding email system
const orgQuery = {
include: {
createdBy: {
select: {
id: true,
email: true,
firstName: true,
deletedAt: true,
},
},
},
} as const;
type OrgWithCreator = Awaited<
ReturnType<typeof db.organization.findMany<typeof orgQuery>>
>[number];
type OnboardingContext = {
org: OrgWithCreator;
user: NonNullable<OrgWithCreator['createdBy']>;
};
type OnboardingEmail<T extends EmailTemplate = EmailTemplate> = {
day: number;
template: T;
shouldSend?: (ctx: OnboardingContext) => Promise<boolean | 'complete'>;
data: (ctx: OnboardingContext) => EmailData<T>;
};
// Helper to create type-safe email entries with correlated template/data types
function email<T extends EmailTemplate>(config: OnboardingEmail<T>) {
return config;
}
const getters = {
firstName: (ctx: OnboardingContext) => ctx.user.firstName || undefined,
organizationName: (ctx: OnboardingContext) => ctx.org.name,
dashboardUrl: (ctx: OnboardingContext) => {
return `${process.env.DASHBOARD_URL}/${ctx.org.id}`;
},
billingUrl: (ctx: OnboardingContext) => {
return `${process.env.DASHBOARD_URL}/${ctx.org.id}/billing`;
},
recommendedPlan: (ctx: OnboardingContext) => {
return getRecommendedPlan(
ctx.org.subscriptionPeriodEventsCount,
(plan) =>
`${plan.formattedEvents} events per month for ${plan.formattedPrice}`,
);
},
} as const;
// Declarative email schedule - easy to add, remove, or reorder
const ONBOARDING_EMAILS = [
email({
day: 0,
template: 'onboarding-welcome',
data: (ctx) => ({
firstName: getters.firstName(ctx),
dashboardUrl: getters.dashboardUrl(ctx),
}),
}),
email({
day: 2,
template: 'onboarding-what-to-track',
data: (ctx) => ({
firstName: getters.firstName(ctx),
}),
}),
email({
day: 6,
template: 'onboarding-dashboards',
data: (ctx) => ({
firstName: getters.firstName(ctx),
dashboardUrl: getters.dashboardUrl(ctx),
}),
}),
email({
day: 14,
template: 'onboarding-featue-request',
data: (ctx) => ({
firstName: getters.firstName(ctx),
}),
}),
email({
day: 26,
template: 'onboarding-trial-ending',
shouldSend: async ({ org }) => {
if (org.subscriptionStatus === 'active') {
return 'complete';
}
return true;
},
data: (ctx) => {
return {
firstName: getters.firstName(ctx),
organizationName: getters.organizationName(ctx),
billingUrl: getters.billingUrl(ctx),
recommendedPlan: getters.recommendedPlan(ctx),
};
},
}),
email({
day: 30,
template: 'onboarding-trial-ended',
shouldSend: async ({ org }) => {
if (org.subscriptionStatus === 'active') {
return 'complete';
}
return true;
},
data: (ctx) => {
return {
firstName: getters.firstName(ctx),
billingUrl: getters.billingUrl(ctx),
recommendedPlan: getters.recommendedPlan(ctx),
};
},
}),
];
export async function onboardingJob(job: Job<CronQueuePayload>) {
logger.info('Starting onboarding email job');
// Fetch organizations with their creators who are in onboarding
const organizations = await db.organization.findMany({
// Fetch organizations that are in onboarding (not completed)
const orgs = await db.organization.findMany({
where: {
createdByUserId: {
not: null,
onboarding: {
not: 'completed',
},
deleteAt: null,
createdBy: {
onboarding: {
not: null,
gte: 1,
lte: 5,
},
deletedAt: null,
},
},
include: {
createdBy: {
select: {
id: true,
email: true,
firstName: true,
lastName: true,
onboarding: true,
},
},
},
...orgQuery,
});
logger.info(`Found ${organizations.length} organizations with creators in onboarding`);
logger.info(`Found ${orgs.length} organizations in onboarding`);
let emailsSent = 0;
let usersCompleted = 0;
let usersSkipped = 0;
let orgsCompleted = 0;
let orgsSkipped = 0;
for (const org of organizations) {
if (!org.createdBy || !org.createdByUserId) {
for (const org of orgs) {
// Skip if no creator or creator is deleted
if (!org.createdBy || org.createdBy.deletedAt) {
orgsSkipped++;
continue;
}
const user = org.createdBy;
// Check if organization has active subscription
if (org.subscriptionStatus === 'active') {
// Stop onboarding for users with active subscriptions
await db.user.update({
where: { id: user.id },
data: { onboarding: null },
});
usersCompleted++;
logger.info(`Stopped onboarding for user ${user.id} (active subscription)`);
continue;
}
if (!user.onboarding || user.onboarding < 1 || user.onboarding > 5) {
continue;
}
// Use organization creation date instead of user registration date
const daysSinceOrgCreation = differenceInDays(new Date(), org.createdAt);
const requiredDays = EMAIL_SCHEDULE[user.onboarding as keyof typeof EMAIL_SCHEDULE];
if (daysSinceOrgCreation < requiredDays) {
usersSkipped++;
// Find the next email to send
// If org.onboarding is empty string, they haven't received any email yet
const lastSentIndex = org.onboarding
? ONBOARDING_EMAILS.findIndex((e) => e.template === org.onboarding)
: -1;
const nextEmailIndex = lastSentIndex + 1;
// No more emails to send
if (nextEmailIndex >= ONBOARDING_EMAILS.length) {
await db.organization.update({
where: { id: org.id },
data: { onboarding: 'completed' },
});
orgsCompleted++;
logger.info(
`Completed onboarding for organization ${org.id} (all emails sent)`,
);
continue;
}
const dashboardUrl = `${process.env.DASHBOARD_URL || process.env.NEXT_PUBLIC_DASHBOARD_URL || 'https://dashboard.openpanel.dev'}/${org.id}`;
const billingUrl = `${process.env.DASHBOARD_URL || process.env.NEXT_PUBLIC_DASHBOARD_URL || 'https://dashboard.openpanel.dev'}/${org.id}/billing`;
const nextEmail = ONBOARDING_EMAILS[nextEmailIndex];
if (!nextEmail) {
continue;
}
try {
// Send appropriate email based on onboarding step
switch (user.onboarding) {
case 1: {
// Welcome email
await sendEmail('onboarding-welcome', {
to: user.email,
data: {
firstName: user.firstName || undefined,
dashboardUrl,
},
});
break;
}
case 2: {
// What to track email
await sendEmail('onboarding-what-to-track', {
to: user.email,
data: {
firstName: user.firstName || undefined,
},
});
break;
}
case 3: {
// Dashboards email
await sendEmail('onboarding-dashboards', {
to: user.email,
data: {
firstName: user.firstName || undefined,
dashboardUrl,
},
});
break;
}
case 4: {
// Replace stack email
await sendEmail('onboarding-replace-stack', {
to: user.email,
data: {
firstName: user.firstName || undefined,
},
});
break;
}
case 5: {
// Trial ending email
await sendEmail('onboarding-trial-ending', {
to: user.email,
data: {
firstName: user.firstName || undefined,
organizationName: org.name,
billingUrl,
recommendedPlan: undefined, // TODO: Calculate based on usage
},
});
break;
}
// Check if enough days have passed
if (daysSinceOrgCreation < nextEmail.day) {
orgsSkipped++;
continue;
}
// Check shouldSend callback if defined
if (nextEmail.shouldSend) {
const result = await nextEmail.shouldSend({ org, user });
if (result === 'complete') {
await db.organization.update({
where: { id: org.id },
data: { onboarding: 'completed' },
});
orgsCompleted++;
logger.info(
`Completed onboarding for organization ${org.id} (shouldSend returned complete)`,
);
continue;
}
// Increment onboarding state
const nextOnboardingState = user.onboarding + 1;
await db.user.update({
where: { id: user.id },
data: {
onboarding: nextOnboardingState > 5 ? null : nextOnboardingState,
},
if (result === false) {
orgsSkipped++;
continue;
}
}
try {
const emailData = nextEmail.data({ org, user });
await sendEmail(nextEmail.template, {
to: user.email,
data: emailData as never,
});
// Update onboarding to the template name we just sent
await db.organization.update({
where: { id: org.id },
data: { onboarding: nextEmail.template },
});
emailsSent++;
logger.info(`Sent onboarding email ${user.onboarding} to user ${user.id} for org ${org.id}`);
if (nextOnboardingState > 5) {
usersCompleted++;
}
logger.info(
`Sent onboarding email "${nextEmail.template}" to organization ${org.id} (user ${user.id})`,
);
} catch (error) {
logger.error(`Failed to send onboarding email to user ${user.id}`, {
error,
onboardingStep: user.onboarding,
organizationId: org.id,
});
logger.error(
`Failed to send onboarding email to organization ${org.id}`,
{
error,
template: nextEmail.template,
},
);
}
}
logger.info('Completed onboarding email job', {
totalOrganizations: organizations.length,
totalOrgs: orgs.length,
emailsSent,
usersCompleted,
usersSkipped,
orgsCompleted,
orgsSkipped,
});
return {
totalOrgs: orgs.length,
emailsSent,
orgsCompleted,
orgsSkipped,
};
}