From e5d2b13b2117ade06feee68aeb0cfc15d0c3bc17 Mon Sep 17 00:00:00 2001 From: zias Date: Wed, 11 Mar 2026 17:13:35 +0100 Subject: [PATCH] feat(email): route all email sends through Cloudflare Queue Introduces a CF Queue binding (kk-email-queue) to decouple email delivery from request handlers, preventing slow responses and providing automatic retries. All send*Email calls now go through the queue when the binding is available, with direct-send fallbacks for local dev. Reminder fan-outs mark DB rows optimistically before enqueueing to prevent re-enqueue on subsequent cron ticks. --- apps/web/package.json | 1 + apps/web/src/router.tsx | 19 ++ apps/web/src/routes/api/rpc/$.ts | 28 ++- apps/web/src/routes/api/webhook/mollie.ts | 41 ++++- apps/web/src/server.ts | 64 ++++++- bun.lock | 1 + packages/api/src/context.ts | 22 ++- packages/api/src/email-queue.ts | 147 +++++++++++++++ packages/api/src/lib/drinkkaart-email.ts | 60 ++++--- packages/api/src/routers/drinkkaart.ts | 18 +- packages/api/src/routers/index.ts | 209 ++++++++++++++++------ packages/infra/alchemy.run.ts | 20 ++- 12 files changed, 524 insertions(+), 106 deletions(-) create mode 100644 packages/api/src/email-queue.ts diff --git a/apps/web/package.json b/apps/web/package.json index 599def1..9af967a 100644 --- a/apps/web/package.json +++ b/apps/web/package.json @@ -52,6 +52,7 @@ }, "devDependencies": { "@cloudflare/vite-plugin": "^1.17.1", + "@tanstack/router-core": "^1.141.1", "@kk/config": "workspace:*", "@tanstack/react-query-devtools": "^5.91.1", "@tanstack/react-router-devtools": "^1.141.1", diff --git a/apps/web/src/router.tsx b/apps/web/src/router.tsx index c1394fa..040d613 100644 --- a/apps/web/src/router.tsx +++ b/apps/web/src/router.tsx @@ -1,3 +1,4 @@ +import type { EmailMessage } from "@kk/api/email-queue"; import { QueryClientProvider } from "@tanstack/react-query"; import { createRouter as createTanStackRouter } from "@tanstack/react-router"; @@ -6,6 +7,18 @@ import Loader from "./components/loader"; import { routeTree } from "./routeTree.gen"; import { orpc, queryClient } from "./utils/orpc"; +// Minimal CF Queue binding shape needed for type inference +type Queue = { + send( + message: EmailMessage, + options?: { contentType?: string }, + ): Promise; + sendBatch( + messages: Array<{ body: EmailMessage }>, + options?: { contentType?: string }, + ): Promise; +}; + export const getRouter = () => { const router = createTanStackRouter({ routeTree, @@ -26,3 +39,9 @@ declare module "@tanstack/react-router" { router: ReturnType; } } + +declare module "@tanstack/router-core" { + interface Register { + server: { requestContext: { emailQueue?: Queue } }; + } +} diff --git a/apps/web/src/routes/api/rpc/$.ts b/apps/web/src/routes/api/rpc/$.ts index 108e0df..865ad23 100644 --- a/apps/web/src/routes/api/rpc/$.ts +++ b/apps/web/src/routes/api/rpc/$.ts @@ -1,4 +1,5 @@ import { createContext } from "@kk/api/context"; +import type { EmailMessage } from "@kk/api/email-queue"; import { appRouter } from "@kk/api/routers/index"; import { OpenAPIHandler } from "@orpc/openapi/fetch"; import { OpenAPIReferencePlugin } from "@orpc/openapi/plugins"; @@ -7,6 +8,18 @@ import { RPCHandler } from "@orpc/server/fetch"; import { ZodToJsonSchemaConverter } from "@orpc/zod/zod4"; import { createFileRoute } from "@tanstack/react-router"; +// Minimal CF Queue binding shape — mirrors the declaration in router.tsx / context.ts +type Queue = { + send( + message: EmailMessage, + options?: { contentType?: string }, + ): Promise; + sendBatch( + messages: Array<{ body: EmailMessage }>, + options?: { contentType?: string }, + ): Promise; +}; + const rpcHandler = new RPCHandler(appRouter, { interceptors: [ onError((error) => { @@ -28,16 +41,21 @@ const apiHandler = new OpenAPIHandler(appRouter, { ], }); -async function handle({ request }: { request: Request }) { - const rpcResult = await rpcHandler.handle(request, { +async function handle(ctx: { request: Request; context?: unknown }) { + // emailQueue is threaded in via the fetch wrapper in server.ts + const emailQueue = (ctx.context as { emailQueue?: Queue } | undefined) + ?.emailQueue; + const context = await createContext({ req: ctx.request, emailQueue }); + + const rpcResult = await rpcHandler.handle(ctx.request, { prefix: "/api/rpc", - context: await createContext({ req: request }), + context, }); if (rpcResult.response) return rpcResult.response; - const apiResult = await apiHandler.handle(request, { + const apiResult = await apiHandler.handle(ctx.request, { prefix: "/api/rpc/api-reference", - context: await createContext({ req: request }), + context, }); if (apiResult.response) return apiResult.response; diff --git a/apps/web/src/routes/api/webhook/mollie.ts b/apps/web/src/routes/api/webhook/mollie.ts index 6031d58..d110266 100644 --- a/apps/web/src/routes/api/webhook/mollie.ts +++ b/apps/web/src/routes/api/webhook/mollie.ts @@ -1,5 +1,6 @@ import { randomUUID } from "node:crypto"; import { sendPaymentConfirmationEmail } from "@kk/api/email"; +import type { EmailMessage } from "@kk/api/email-queue"; import { creditRegistrationToAccount } from "@kk/api/routers/index"; import { db } from "@kk/db"; import { drinkkaart, drinkkaartTopup, registration } from "@kk/db/schema"; @@ -8,6 +9,14 @@ import { env } from "@kk/env/server"; import { createFileRoute } from "@tanstack/react-router"; import { and, eq } from "drizzle-orm"; +// Minimal CF Queue binding shape used to enqueue emails +type EmailQueue = { + send( + message: EmailMessage, + options?: { contentType?: string }, + ): Promise; +}; + // Mollie payment object (relevant fields only) interface MolliePayment { id: string; @@ -41,7 +50,15 @@ async function fetchMolliePayment(paymentId: string): Promise { return response.json() as Promise; } -async function handleWebhook({ request }: { request: Request }) { +async function handleWebhook({ + request, + context, +}: { + request: Request; + context?: unknown; +}) { + const emailQueue = (context as { emailQueue?: EmailQueue } | undefined) + ?.emailQueue; if (!env.MOLLIE_API_KEY) { console.error("MOLLIE_API_KEY not configured"); return new Response("Payment provider not configured", { status: 500 }); @@ -207,19 +224,25 @@ async function handleWebhook({ request }: { request: Request }) { `Payment successful for registration ${registrationToken}, payment ${payment.id}`, ); - // Send payment confirmation email (fire-and-forget; don't block webhook) - sendPaymentConfirmationEmail({ + // Send payment confirmation email via queue when available, otherwise direct. + const confirmMsg: EmailMessage = { + type: "paymentConfirmation", to: regRow.email, firstName: regRow.firstName, managementToken: regRow.managementToken ?? registrationToken, drinkCardValue: regRow.drinkCardValue ?? undefined, giftAmount: regRow.giftAmount ?? undefined, - }).catch((err) => - console.error( - `Failed to send payment confirmation email for ${regRow.email}:`, - err, - ), - ); + }; + if (emailQueue) { + await emailQueue.send(confirmMsg); + } else { + sendPaymentConfirmationEmail(confirmMsg).catch((err) => + console.error( + `Failed to send payment confirmation email for ${regRow.email}:`, + err, + ), + ); + } // If this is a watcher with a drink card value, try to credit their // drinkkaart immediately — but only if they already have an account. diff --git a/apps/web/src/server.ts b/apps/web/src/server.ts index 5f0cd15..7003ca8 100644 --- a/apps/web/src/server.ts +++ b/apps/web/src/server.ts @@ -1,18 +1,72 @@ +import type { EmailMessage } from "@kk/api/email-queue"; +import { dispatchEmailMessage } from "@kk/api/email-queue"; import { runSendReminders } from "@kk/api/routers/index"; import { createStartHandler, defaultStreamHandler, } from "@tanstack/react-start/server"; -const fetch = createStartHandler(defaultStreamHandler); +// --------------------------------------------------------------------------- +// CF Workers globals — not in DOM/ES2022 lib +// --------------------------------------------------------------------------- +type MessageItem = { + body: Body; + ack(): void; + retry(): void; +}; +type MessageBatch = { messages: Array> }; +type ExecutionContext = { waitUntil(promise: Promise): void }; + +// --------------------------------------------------------------------------- +// Minimal CF Queue binding shape +// --------------------------------------------------------------------------- +type EmailQueue = { + send( + message: EmailMessage, + options?: { contentType?: string }, + ): Promise; + sendBatch( + messages: Array<{ body: EmailMessage }>, + options?: { contentType?: string }, + ): Promise; +}; + +type Env = { + EMAIL_QUEUE?: EmailQueue; +}; + +const startHandler = createStartHandler(defaultStreamHandler); export default { - fetch, + fetch(request: Request, env: Env) { + // Cast required: TanStack Start's BaseContext doesn't know about emailQueue, + // but it threads the value through to route handlers via requestContext. + return startHandler(request, { + context: { emailQueue: env.EMAIL_QUEUE } as Record, + }); + }, + + async queue( + batch: MessageBatch, + _env: Env, + _ctx: ExecutionContext, + ) { + for (const msg of batch.messages) { + try { + await dispatchEmailMessage(msg.body); + msg.ack(); + } catch (err) { + console.error("Queue dispatch failed:", err); + msg.retry(); + } + } + }, + async scheduled( _event: { cron: string; scheduledTime: number }, - _env: Record, - ctx: { waitUntil: (promise: Promise) => void }, + env: Env, + ctx: ExecutionContext, ) { - ctx.waitUntil(runSendReminders()); + ctx.waitUntil(runSendReminders(env.EMAIL_QUEUE)); }, }; diff --git a/bun.lock b/bun.lock index 8d8666b..58309a8 100644 --- a/bun.lock +++ b/bun.lock @@ -67,6 +67,7 @@ "@kk/config": "workspace:*", "@tanstack/react-query-devtools": "^5.91.1", "@tanstack/react-router-devtools": "^1.141.1", + "@tanstack/router-core": "^1.141.1", "@testing-library/dom": "^10.4.0", "@testing-library/react": "^16.2.0", "@types/canvas-confetti": "^1.9.0", diff --git a/packages/api/src/context.ts b/packages/api/src/context.ts index 9a77b41..79affc9 100644 --- a/packages/api/src/context.ts +++ b/packages/api/src/context.ts @@ -1,13 +1,33 @@ import { auth } from "@kk/auth"; import { env } from "@kk/env/server"; +import type { EmailMessage } from "./email-queue"; -export async function createContext({ req }: { req: Request }) { +// CF Workers runtime Queue type (not the alchemy resource type) +type Queue = { + send( + message: EmailMessage, + options?: { contentType?: string }, + ): Promise; + sendBatch( + messages: Array<{ body: EmailMessage }>, + options?: { contentType?: string }, + ): Promise; +}; + +export async function createContext({ + req, + emailQueue, +}: { + req: Request; + emailQueue?: Queue; +}) { const session = await auth.api.getSession({ headers: req.headers, }); return { session, env, + emailQueue, }; } diff --git a/packages/api/src/email-queue.ts b/packages/api/src/email-queue.ts new file mode 100644 index 0000000..1b5ce62 --- /dev/null +++ b/packages/api/src/email-queue.ts @@ -0,0 +1,147 @@ +/** + * Email queue types and dispatcher. + * + * All email sends are modelled as a discriminated union so the Cloudflare Queue + * consumer can pattern-match on `msg.type` and call the right send*Email(). + * + * The `Queue` type used in context.ts / server.ts refers to the + * CF runtime binding type (`import type { Queue } from "@cloudflare/workers-types"`). + */ + +import { + sendCancellationEmail, + sendConfirmationEmail, + sendPaymentConfirmationEmail, + sendPaymentReminderEmail, + sendReminder24hEmail, + sendReminderEmail, + sendSubscriptionConfirmationEmail, + sendUpdateEmail, +} from "./email"; +import { sendDeductionEmail } from "./lib/drinkkaart-email"; + +// --------------------------------------------------------------------------- +// Message types — one variant per send*Email function +// --------------------------------------------------------------------------- + +export type ConfirmationMessage = { + type: "registrationConfirmation"; + to: string; + firstName: string; + managementToken: string; + wantsToPerform: boolean; + artForm?: string | null; + giftAmount?: number; + drinkCardValue?: number; + signupUrl?: string; +}; + +export type UpdateMessage = { + type: "updateConfirmation"; + to: string; + firstName: string; + managementToken: string; + wantsToPerform: boolean; + artForm?: string | null; + giftAmount?: number; + drinkCardValue?: number; +}; + +export type CancellationMessage = { + type: "cancellation"; + to: string; + firstName: string; +}; + +export type SubscriptionConfirmationMessage = { + type: "subscriptionConfirmation"; + to: string; +}; + +export type Reminder24hMessage = { + type: "reminder24h"; + to: string; + firstName?: string | null; +}; + +export type Reminder1hMessage = { + type: "reminder1h"; + to: string; + firstName?: string | null; +}; + +export type PaymentReminderMessage = { + type: "paymentReminder"; + to: string; + firstName: string; + managementToken: string; + drinkCardValue?: number; + giftAmount?: number; +}; + +export type PaymentConfirmationMessage = { + type: "paymentConfirmation"; + to: string; + firstName: string; + managementToken: string; + drinkCardValue?: number; + giftAmount?: number; +}; + +export type DeductionMessage = { + type: "deduction"; + to: string; + firstName: string; + amountCents: number; + newBalanceCents: number; +}; + +export type EmailMessage = + | ConfirmationMessage + | UpdateMessage + | CancellationMessage + | SubscriptionConfirmationMessage + | Reminder24hMessage + | Reminder1hMessage + | PaymentReminderMessage + | PaymentConfirmationMessage + | DeductionMessage; + +// --------------------------------------------------------------------------- +// Consumer-side dispatcher — called once per queue message +// --------------------------------------------------------------------------- + +export async function dispatchEmailMessage(msg: EmailMessage): Promise { + switch (msg.type) { + case "registrationConfirmation": + await sendConfirmationEmail(msg); + break; + case "updateConfirmation": + await sendUpdateEmail(msg); + break; + case "cancellation": + await sendCancellationEmail(msg); + break; + case "subscriptionConfirmation": + await sendSubscriptionConfirmationEmail({ to: msg.to }); + break; + case "reminder24h": + await sendReminder24hEmail(msg); + break; + case "reminder1h": + await sendReminderEmail(msg); + break; + case "paymentReminder": + await sendPaymentReminderEmail(msg); + break; + case "paymentConfirmation": + await sendPaymentConfirmationEmail(msg); + break; + case "deduction": + await sendDeductionEmail(msg); + break; + default: + // Exhaustiveness check — TypeScript will catch unhandled variants at compile time + msg satisfies never; + } +} diff --git a/packages/api/src/lib/drinkkaart-email.ts b/packages/api/src/lib/drinkkaart-email.ts index ae5be43..e82746a 100644 --- a/packages/api/src/lib/drinkkaart-email.ts +++ b/packages/api/src/lib/drinkkaart-email.ts @@ -1,13 +1,15 @@ import { env } from "@kk/env/server"; import nodemailer from "nodemailer"; +import { emailLog } from "../email"; -// Re-use the same SMTP transport strategy as email.ts. -let _transport: nodemailer.Transporter | null | undefined; +// Re-use the same SMTP transport strategy as email.ts: +// only cache a successfully-created transporter; never cache null so that a +// cold isolate that receives env vars after module init can still pick them up. +let _transport: nodemailer.Transporter | undefined; function getTransport(): nodemailer.Transporter | null { - if (_transport !== undefined) return _transport; + if (_transport) return _transport; if (!env.SMTP_HOST || !env.SMTP_USER || !env.SMTP_PASS) { - _transport = null; return null; } _transport = nodemailer.createTransport({ @@ -22,9 +24,6 @@ function getTransport(): nodemailer.Transporter | null { return _transport; } -const from = env.SMTP_FROM ?? "Kunstenkamp "; -const baseUrl = env.BETTER_AUTH_URL ?? "https://kunstenkamp.be"; - function formatEuro(cents: number): string { return new Intl.NumberFormat("nl-BE", { style: "currency", @@ -116,7 +115,7 @@ function deductionHtml(params: { /** * Send a deduction notification email to the cardholder. - * Fire-and-forget: errors are logged but not re-thrown. + * Throws on SMTP error so the queue consumer can retry. */ export async function sendDeductionEmail(params: { to: string; @@ -124,9 +123,18 @@ export async function sendDeductionEmail(params: { amountCents: number; newBalanceCents: number; }): Promise { + emailLog("info", "email.attempt", { type: "deduction", to: params.to }); + const transport = getTransport(); if (!transport) { - console.warn("SMTP not configured — skipping deduction email"); + emailLog("warn", "email.skipped", { + type: "deduction", + to: params.to, + reason: "smtp_not_configured", + hasHost: !!env.SMTP_HOST, + hasUser: !!env.SMTP_USER, + hasPass: !!env.SMTP_PASS, + }); return; } @@ -144,16 +152,26 @@ export async function sendDeductionEmail(params: { currency: "EUR", }).format(params.amountCents / 100); - await transport.sendMail({ - from, - to: params.to, - subject: `Drinkkaart — ${amountFormatted} afgeschreven`, - html: deductionHtml({ - firstName: params.firstName, - amountCents: params.amountCents, - newBalanceCents: params.newBalanceCents, - dateTime, - drinkkaartUrl: `${baseUrl}/drinkkaart`, - }), - }); + try { + await transport.sendMail({ + from: env.SMTP_FROM, + to: params.to, + subject: `Drinkkaart — ${amountFormatted} afgeschreven`, + html: deductionHtml({ + firstName: params.firstName, + amountCents: params.amountCents, + newBalanceCents: params.newBalanceCents, + dateTime, + drinkkaartUrl: `${env.BETTER_AUTH_URL}/drinkkaart`, + }), + }); + emailLog("info", "email.sent", { type: "deduction", to: params.to }); + } catch (err) { + emailLog("error", "email.error", { + type: "deduction", + to: params.to, + error: String(err), + }); + throw err; + } } diff --git a/packages/api/src/routers/drinkkaart.ts b/packages/api/src/routers/drinkkaart.ts index 6e3626d..324aa5b 100644 --- a/packages/api/src/routers/drinkkaart.ts +++ b/packages/api/src/routers/drinkkaart.ts @@ -9,6 +9,7 @@ import { user } from "@kk/db/schema/auth"; import { ORPCError } from "@orpc/server"; import { and, desc, eq, gte, lte, sql } from "drizzle-orm"; import { z } from "zod"; +import type { EmailMessage } from "../email-queue"; import { adminProcedure, protectedProcedure } from "../index"; import { sendDeductionEmail } from "../lib/drinkkaart-email"; import { @@ -339,7 +340,7 @@ export const drinkkaartRouter = { createdAt: now, }); - // Fire-and-forget deduction email + // Fire-and-forget deduction email (via queue when available) const cardUser = await db .select({ email: user.email, name: user.name }) .from(user) @@ -348,14 +349,21 @@ export const drinkkaartRouter = { .then((r) => r[0]); if (cardUser) { - await sendDeductionEmail({ + const deductionMsg: EmailMessage = { + type: "deduction", to: cardUser.email, firstName: cardUser.name.split(" ")[0] ?? cardUser.name, amountCents: input.amountCents, newBalanceCents: balanceAfter, - }).catch((err) => - console.error("Failed to send deduction email:", err), - ); + }; + + if (context.emailQueue) { + await context.emailQueue.send(deductionMsg); + } else { + await sendDeductionEmail(deductionMsg).catch((err) => + console.error("Failed to send deduction email:", err), + ); + } } return { diff --git a/packages/api/src/routers/index.ts b/packages/api/src/routers/index.ts index 8e213ca..073b95c 100644 --- a/packages/api/src/routers/index.ts +++ b/packages/api/src/routers/index.ts @@ -29,6 +29,20 @@ import { sendSubscriptionConfirmationEmail, sendUpdateEmail, } from "../email"; +import type { EmailMessage } from "../email-queue"; + +// Minimal CF Queue binding shape (mirrors context.ts) +type EmailQueue = { + send( + message: EmailMessage, + options?: { contentType?: string }, + ): Promise; + sendBatch( + messages: Array<{ body: EmailMessage }>, + options?: { contentType?: string }, + ): Promise; +}; + import { adminProcedure, protectedProcedure, publicProcedure } from "../index"; import { generateQrSecret } from "../lib/drinkkaart-utils"; import { drinkkaartRouter } from "./drinkkaart"; @@ -294,7 +308,7 @@ export const appRouter = { submitRegistration: publicProcedure .input(submitRegistrationSchema) - .handler(async ({ input }) => { + .handler(async ({ input, context }) => { const managementToken = randomUUID(); const isPerformer = input.registrationType === "performer"; const guests = isPerformer ? [] : (input.guests ?? []); @@ -316,7 +330,8 @@ export const appRouter = { managementToken, }); - await sendConfirmationEmail({ + const confirmationMsg: EmailMessage = { + type: "registrationConfirmation", to: input.email, firstName: input.firstName, managementToken, @@ -324,12 +339,18 @@ export const appRouter = { artForm: input.artForm, giftAmount: input.giftAmount, drinkCardValue: isPerformer ? 0 : drinkCardEuros(guests.length), - }).catch((err) => - emailLog("error", "email.catch", { - type: "confirmation", - error: String(err), - }), - ); + }; + + if (context.emailQueue) { + await context.emailQueue.send(confirmationMsg); + } else { + await sendConfirmationEmail(confirmationMsg).catch((err) => + emailLog("error", "email.catch", { + type: "confirmation", + error: String(err), + }), + ); + } return { success: true, managementToken }; }), @@ -352,7 +373,7 @@ export const appRouter = { updateRegistration: publicProcedure .input(updateRegistrationSchema) - .handler(async ({ input }) => { + .handler(async ({ input, context }) => { const row = await getActiveRegistration(input.token); const isPerformer = input.registrationType === "performer"; @@ -408,7 +429,8 @@ export const appRouter = { }) .where(eq(registration.managementToken, input.token)); - await sendUpdateEmail({ + const updateMsg: EmailMessage = { + type: "updateConfirmation", to: input.email, firstName: input.firstName, managementToken: input.token, @@ -416,19 +438,25 @@ export const appRouter = { artForm: input.artForm, giftAmount: input.giftAmount, drinkCardValue: isPerformer ? 0 : drinkCardEuros(guests.length), - }).catch((err) => - emailLog("error", "email.catch", { - type: "update", - error: String(err), - }), - ); + }; + + if (context.emailQueue) { + await context.emailQueue.send(updateMsg); + } else { + await sendUpdateEmail(updateMsg).catch((err) => + emailLog("error", "email.catch", { + type: "update", + error: String(err), + }), + ); + } return { success: true }; }), cancelRegistration: publicProcedure .input(z.object({ token: z.string().uuid() })) - .handler(async ({ input }) => { + .handler(async ({ input, context }) => { const row = await getActiveRegistration(input.token); await db @@ -436,15 +464,22 @@ export const appRouter = { .set({ cancelledAt: new Date() }) .where(eq(registration.managementToken, input.token)); - await sendCancellationEmail({ + const cancellationMsg: EmailMessage = { + type: "cancellation", to: row.email, firstName: row.firstName, - }).catch((err) => - emailLog("error", "email.catch", { - type: "cancellation", - error: String(err), - }), - ); + }; + + if (context.emailQueue) { + await context.emailQueue.send(cancellationMsg); + } else { + await sendCancellationEmail(cancellationMsg).catch((err) => + emailLog("error", "email.catch", { + type: "cancellation", + error: String(err), + }), + ); + } return { success: true }; }), @@ -854,7 +889,7 @@ export const appRouter = { */ subscribeReminder: publicProcedure .input(z.object({ email: z.string().email() })) - .handler(async ({ input }) => { + .handler(async ({ input, context }) => { const now = Date.now(); // Registration is already open — no point subscribing @@ -879,16 +914,25 @@ export const appRouter = { email: input.email, }); - // Awaited — CF Workers abandon unawaited promises when the response - // returns. Mail errors are caught so they don't fail the request. - await sendSubscriptionConfirmationEmail({ to: input.email }).catch( - (err) => - emailLog("error", "email.catch", { - type: "subscription_confirmation", - to: input.email, - error: String(err), - }), - ); + const subMsg: EmailMessage = { + type: "subscriptionConfirmation", + to: input.email, + }; + + if (context.emailQueue) { + await context.emailQueue.send(subMsg); + } else { + // Awaited — CF Workers abandon unawaited promises when the response + // returns. Mail errors are caught so they don't fail the request. + await sendSubscriptionConfirmationEmail({ to: input.email }).catch( + (err) => + emailLog("error", "email.catch", { + type: "subscription_confirmation", + to: input.email, + error: String(err), + }), + ); + } return { ok: true }; }), @@ -1104,8 +1148,14 @@ export type AppRouterClient = RouterClient; * Standalone function that sends pending reminder emails. * Exported so that the Cloudflare Cron HTTP handler can call it directly * without needing drizzle-orm as a direct dependency of apps/web. + * + * When `emailQueue` is provided, reminder fan-outs mark the DB rows + * optimistically before calling `sendBatch()` — preventing re-enqueue on + * the next cron tick while still letting the queue handle email delivery + * and retries. Payment reminders always run directly since they need + * per-row DB updates tightly coupled to the send. */ -export async function runSendReminders(): Promise<{ +export async function runSendReminders(emailQueue?: EmailQueue): Promise<{ sent: number; skipped: boolean; reason?: string; @@ -1137,21 +1187,41 @@ export async function runSendReminders(): Promise<{ emailLog("info", "reminders.24h.pending", { count: pending.length }); - for (const row of pending) { - try { - await sendReminder24hEmail({ to: row.email }); + if (emailQueue && pending.length > 0) { + // Mark rows optimistically before enqueuing — prevents re-enqueue on the + // next cron tick if the queue send succeeds but the consumer hasn't run yet. + for (const row of pending) { await db .update(reminder) .set({ sent24hAt: new Date() }) .where(eq(reminder.id, row.id)); - sent++; - } catch (err) { - errors.push(`24h ${row.email}: ${String(err)}`); - emailLog("error", "email.catch", { - type: "reminder_24h", - to: row.email, - error: String(err), - }); + } + await emailQueue.sendBatch( + pending.map((row) => ({ + body: { + type: "reminder24h" as const, + to: row.email, + } satisfies EmailMessage, + })), + ); + sent += pending.length; + } else { + for (const row of pending) { + try { + await sendReminder24hEmail({ to: row.email }); + await db + .update(reminder) + .set({ sent24hAt: new Date() }) + .where(eq(reminder.id, row.id)); + sent++; + } catch (err) { + errors.push(`24h ${row.email}: ${String(err)}`); + emailLog("error", "email.catch", { + type: "reminder_24h", + to: row.email, + error: String(err), + }); + } } } } @@ -1164,26 +1234,47 @@ export async function runSendReminders(): Promise<{ emailLog("info", "reminders.1h.pending", { count: pending.length }); - for (const row of pending) { - try { - await sendReminderEmail({ to: row.email }); + if (emailQueue && pending.length > 0) { + // Mark rows optimistically before enqueuing — prevents re-enqueue on the + // next cron tick if the queue send succeeds but the consumer hasn't run yet. + for (const row of pending) { await db .update(reminder) .set({ sentAt: new Date() }) .where(eq(reminder.id, row.id)); - sent++; - } catch (err) { - errors.push(`1h ${row.email}: ${String(err)}`); - emailLog("error", "email.catch", { - type: "reminder_1h", - to: row.email, - error: String(err), - }); + } + await emailQueue.sendBatch( + pending.map((row) => ({ + body: { + type: "reminder1h" as const, + to: row.email, + } satisfies EmailMessage, + })), + ); + sent += pending.length; + } else { + for (const row of pending) { + try { + await sendReminderEmail({ to: row.email }); + await db + .update(reminder) + .set({ sentAt: new Date() }) + .where(eq(reminder.id, row.id)); + sent++; + } catch (err) { + errors.push(`1h ${row.email}: ${String(err)}`); + emailLog("error", "email.catch", { + type: "reminder_1h", + to: row.email, + error: String(err), + }); + } } } } - // Payment reminders: watchers with pending payment older than 3 days + // Payment reminders: watchers with pending payment older than 3 days. + // Always run directly (not via queue) — each row needs a DB update after send. const threeDaysAgo = now - 3 * 24 * 60 * 60 * 1000; const unpaidWatchers = await db .select() diff --git a/packages/infra/alchemy.run.ts b/packages/infra/alchemy.run.ts index ebf6583..ab5d0ad 100644 --- a/packages/infra/alchemy.run.ts +++ b/packages/infra/alchemy.run.ts @@ -1,5 +1,5 @@ import alchemy from "alchemy"; -import { TanStackStart } from "alchemy/cloudflare"; +import { Queue, TanStackStart } from "alchemy/cloudflare"; import { config } from "dotenv"; config({ path: "./.env" }); @@ -16,6 +16,10 @@ function getEnvVar(name: string): string { return value; } +const emailQueue = await Queue("email-queue", { + name: "kk-email-queue", +}); + export const web = await TanStackStart("web", { cwd: "../../apps/web", bindings: { @@ -34,7 +38,21 @@ export const web = await TanStackStart("web", { MOLLIE_API_KEY: getEnvVar("MOLLIE_API_KEY"), // Cron secret for protected scheduled endpoints CRON_SECRET: getEnvVar("CRON_SECRET"), + // Queue binding for async email sends + EMAIL_QUEUE: emailQueue, }, + // Queue consumer: the worker's queue() handler processes EmailMessage batches + eventSources: [ + { + queue: emailQueue, + settings: { + batchSize: 10, + maxRetries: 3, + retryDelay: 60, // seconds before retrying a failed message + maxWaitTimeMs: 1000, + }, + }, + ], // Fire every hour so reminder checks can run at 19:00 on 2026-03-15 (24h) and 18:00 on 2026-03-16 (1h) crons: ["0 * * * *"], domains: ["kunstenkamp.be", "www.kunstenkamp.be"],