feat(email): route all email sends through Cloudflare Queue

Introduces a CF Queue binding (kk-email-queue) to decouple email
delivery from request handlers, preventing slow responses and
providing automatic retries. All send*Email calls now go through
the queue when the binding is available, with direct-send fallbacks
for local dev. Reminder fan-outs mark DB rows optimistically before
enqueueing to prevent re-enqueue on subsequent cron ticks.
This commit is contained in:
2026-03-11 17:13:35 +01:00
parent 7f431c0091
commit e5d2b13b21
12 changed files with 524 additions and 106 deletions

View File

@@ -9,6 +9,7 @@ import { user } from "@kk/db/schema/auth";
import { ORPCError } from "@orpc/server";
import { and, desc, eq, gte, lte, sql } from "drizzle-orm";
import { z } from "zod";
import type { EmailMessage } from "../email-queue";
import { adminProcedure, protectedProcedure } from "../index";
import { sendDeductionEmail } from "../lib/drinkkaart-email";
import {
@@ -339,7 +340,7 @@ export const drinkkaartRouter = {
createdAt: now,
});
// Fire-and-forget deduction email
// Fire-and-forget deduction email (via queue when available)
const cardUser = await db
.select({ email: user.email, name: user.name })
.from(user)
@@ -348,14 +349,21 @@ export const drinkkaartRouter = {
.then((r) => r[0]);
if (cardUser) {
await sendDeductionEmail({
const deductionMsg: EmailMessage = {
type: "deduction",
to: cardUser.email,
firstName: cardUser.name.split(" ")[0] ?? cardUser.name,
amountCents: input.amountCents,
newBalanceCents: balanceAfter,
}).catch((err) =>
console.error("Failed to send deduction email:", err),
);
};
if (context.emailQueue) {
await context.emailQueue.send(deductionMsg);
} else {
await sendDeductionEmail(deductionMsg).catch((err) =>
console.error("Failed to send deduction email:", err),
);
}
}
return {

View File

@@ -29,6 +29,20 @@ import {
sendSubscriptionConfirmationEmail,
sendUpdateEmail,
} from "../email";
import type { EmailMessage } from "../email-queue";
// Minimal CF Queue binding shape (mirrors context.ts)
type EmailQueue = {
send(
message: EmailMessage,
options?: { contentType?: string },
): Promise<void>;
sendBatch(
messages: Array<{ body: EmailMessage }>,
options?: { contentType?: string },
): Promise<void>;
};
import { adminProcedure, protectedProcedure, publicProcedure } from "../index";
import { generateQrSecret } from "../lib/drinkkaart-utils";
import { drinkkaartRouter } from "./drinkkaart";
@@ -294,7 +308,7 @@ export const appRouter = {
submitRegistration: publicProcedure
.input(submitRegistrationSchema)
.handler(async ({ input }) => {
.handler(async ({ input, context }) => {
const managementToken = randomUUID();
const isPerformer = input.registrationType === "performer";
const guests = isPerformer ? [] : (input.guests ?? []);
@@ -316,7 +330,8 @@ export const appRouter = {
managementToken,
});
await sendConfirmationEmail({
const confirmationMsg: EmailMessage = {
type: "registrationConfirmation",
to: input.email,
firstName: input.firstName,
managementToken,
@@ -324,12 +339,18 @@ export const appRouter = {
artForm: input.artForm,
giftAmount: input.giftAmount,
drinkCardValue: isPerformer ? 0 : drinkCardEuros(guests.length),
}).catch((err) =>
emailLog("error", "email.catch", {
type: "confirmation",
error: String(err),
}),
);
};
if (context.emailQueue) {
await context.emailQueue.send(confirmationMsg);
} else {
await sendConfirmationEmail(confirmationMsg).catch((err) =>
emailLog("error", "email.catch", {
type: "confirmation",
error: String(err),
}),
);
}
return { success: true, managementToken };
}),
@@ -352,7 +373,7 @@ export const appRouter = {
updateRegistration: publicProcedure
.input(updateRegistrationSchema)
.handler(async ({ input }) => {
.handler(async ({ input, context }) => {
const row = await getActiveRegistration(input.token);
const isPerformer = input.registrationType === "performer";
@@ -408,7 +429,8 @@ export const appRouter = {
})
.where(eq(registration.managementToken, input.token));
await sendUpdateEmail({
const updateMsg: EmailMessage = {
type: "updateConfirmation",
to: input.email,
firstName: input.firstName,
managementToken: input.token,
@@ -416,19 +438,25 @@ export const appRouter = {
artForm: input.artForm,
giftAmount: input.giftAmount,
drinkCardValue: isPerformer ? 0 : drinkCardEuros(guests.length),
}).catch((err) =>
emailLog("error", "email.catch", {
type: "update",
error: String(err),
}),
);
};
if (context.emailQueue) {
await context.emailQueue.send(updateMsg);
} else {
await sendUpdateEmail(updateMsg).catch((err) =>
emailLog("error", "email.catch", {
type: "update",
error: String(err),
}),
);
}
return { success: true };
}),
cancelRegistration: publicProcedure
.input(z.object({ token: z.string().uuid() }))
.handler(async ({ input }) => {
.handler(async ({ input, context }) => {
const row = await getActiveRegistration(input.token);
await db
@@ -436,15 +464,22 @@ export const appRouter = {
.set({ cancelledAt: new Date() })
.where(eq(registration.managementToken, input.token));
await sendCancellationEmail({
const cancellationMsg: EmailMessage = {
type: "cancellation",
to: row.email,
firstName: row.firstName,
}).catch((err) =>
emailLog("error", "email.catch", {
type: "cancellation",
error: String(err),
}),
);
};
if (context.emailQueue) {
await context.emailQueue.send(cancellationMsg);
} else {
await sendCancellationEmail(cancellationMsg).catch((err) =>
emailLog("error", "email.catch", {
type: "cancellation",
error: String(err),
}),
);
}
return { success: true };
}),
@@ -854,7 +889,7 @@ export const appRouter = {
*/
subscribeReminder: publicProcedure
.input(z.object({ email: z.string().email() }))
.handler(async ({ input }) => {
.handler(async ({ input, context }) => {
const now = Date.now();
// Registration is already open — no point subscribing
@@ -879,16 +914,25 @@ export const appRouter = {
email: input.email,
});
// Awaited — CF Workers abandon unawaited promises when the response
// returns. Mail errors are caught so they don't fail the request.
await sendSubscriptionConfirmationEmail({ to: input.email }).catch(
(err) =>
emailLog("error", "email.catch", {
type: "subscription_confirmation",
to: input.email,
error: String(err),
}),
);
const subMsg: EmailMessage = {
type: "subscriptionConfirmation",
to: input.email,
};
if (context.emailQueue) {
await context.emailQueue.send(subMsg);
} else {
// Awaited — CF Workers abandon unawaited promises when the response
// returns. Mail errors are caught so they don't fail the request.
await sendSubscriptionConfirmationEmail({ to: input.email }).catch(
(err) =>
emailLog("error", "email.catch", {
type: "subscription_confirmation",
to: input.email,
error: String(err),
}),
);
}
return { ok: true };
}),
@@ -1104,8 +1148,14 @@ export type AppRouterClient = RouterClient<typeof appRouter>;
* Standalone function that sends pending reminder emails.
* Exported so that the Cloudflare Cron HTTP handler can call it directly
* without needing drizzle-orm as a direct dependency of apps/web.
*
* When `emailQueue` is provided, reminder fan-outs mark the DB rows
* optimistically before calling `sendBatch()` — preventing re-enqueue on
* the next cron tick while still letting the queue handle email delivery
* and retries. Payment reminders always run directly since they need
* per-row DB updates tightly coupled to the send.
*/
export async function runSendReminders(): Promise<{
export async function runSendReminders(emailQueue?: EmailQueue): Promise<{
sent: number;
skipped: boolean;
reason?: string;
@@ -1137,21 +1187,41 @@ export async function runSendReminders(): Promise<{
emailLog("info", "reminders.24h.pending", { count: pending.length });
for (const row of pending) {
try {
await sendReminder24hEmail({ to: row.email });
if (emailQueue && pending.length > 0) {
// Mark rows optimistically before enqueuing — prevents re-enqueue on the
// next cron tick if the queue send succeeds but the consumer hasn't run yet.
for (const row of pending) {
await db
.update(reminder)
.set({ sent24hAt: new Date() })
.where(eq(reminder.id, row.id));
sent++;
} catch (err) {
errors.push(`24h ${row.email}: ${String(err)}`);
emailLog("error", "email.catch", {
type: "reminder_24h",
to: row.email,
error: String(err),
});
}
await emailQueue.sendBatch(
pending.map((row) => ({
body: {
type: "reminder24h" as const,
to: row.email,
} satisfies EmailMessage,
})),
);
sent += pending.length;
} else {
for (const row of pending) {
try {
await sendReminder24hEmail({ to: row.email });
await db
.update(reminder)
.set({ sent24hAt: new Date() })
.where(eq(reminder.id, row.id));
sent++;
} catch (err) {
errors.push(`24h ${row.email}: ${String(err)}`);
emailLog("error", "email.catch", {
type: "reminder_24h",
to: row.email,
error: String(err),
});
}
}
}
}
@@ -1164,26 +1234,47 @@ export async function runSendReminders(): Promise<{
emailLog("info", "reminders.1h.pending", { count: pending.length });
for (const row of pending) {
try {
await sendReminderEmail({ to: row.email });
if (emailQueue && pending.length > 0) {
// Mark rows optimistically before enqueuing — prevents re-enqueue on the
// next cron tick if the queue send succeeds but the consumer hasn't run yet.
for (const row of pending) {
await db
.update(reminder)
.set({ sentAt: new Date() })
.where(eq(reminder.id, row.id));
sent++;
} catch (err) {
errors.push(`1h ${row.email}: ${String(err)}`);
emailLog("error", "email.catch", {
type: "reminder_1h",
to: row.email,
error: String(err),
});
}
await emailQueue.sendBatch(
pending.map((row) => ({
body: {
type: "reminder1h" as const,
to: row.email,
} satisfies EmailMessage,
})),
);
sent += pending.length;
} else {
for (const row of pending) {
try {
await sendReminderEmail({ to: row.email });
await db
.update(reminder)
.set({ sentAt: new Date() })
.where(eq(reminder.id, row.id));
sent++;
} catch (err) {
errors.push(`1h ${row.email}: ${String(err)}`);
emailLog("error", "email.catch", {
type: "reminder_1h",
to: row.email,
error: String(err),
});
}
}
}
}
// Payment reminders: watchers with pending payment older than 3 days
// Payment reminders: watchers with pending payment older than 3 days.
// Always run directly (not via queue) — each row needs a DB update after send.
const threeDaysAgo = now - 3 * 24 * 60 * 60 * 1000;
const unpaidWatchers = await db
.select()