This commit is contained in:
Carl-Gerhard Lindesvärd
2026-03-06 13:59:03 +01:00
parent 70ca44f039
commit 2981638893
21 changed files with 1609 additions and 1 deletions

View File

@@ -16,3 +16,9 @@ export const google = new Arctic.Google(
process.env.GOOGLE_CLIENT_SECRET ?? '',
process.env.GOOGLE_REDIRECT_URI ?? '',
);
// Arctic Google OAuth client dedicated to the Google Search Console (GSC) flow.
// It is built from GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET but uses its own
// callback, GSC_GOOGLE_REDIRECT_URI. Every value falls back to an empty string
// when the environment variable is not set.
export const googleGsc = new Arctic.Google(
process.env.GOOGLE_CLIENT_ID ?? '',
process.env.GOOGLE_CLIENT_SECRET ?? '',
process.env.GSC_GOOGLE_REDIRECT_URI ?? '',
);

View File

@@ -0,0 +1,85 @@
import fs from 'node:fs';
import path from 'node:path';
import { createTable, runClickhouseMigrationCommands } from '../src/clickhouse/migration';
import { getIsCluster } from './helpers';
/**
 * Migration: creates the three ClickHouse tables that hold Google Search
 * Console data (`gsc_daily`, `gsc_pages_daily`, `gsc_queries_daily`).
 *
 * The generated statements are always written to a `.sql` file derived from
 * this file's name. They are executed against ClickHouse unless the process
 * was started with `--dry`.
 */
export async function up() {
  const isClustered = getIsCluster();

  // Key columns present in every GSC table.
  const keyColumns = [
    '`project_id` String CODEC(ZSTD(3))',
    '`date` Date CODEC(Delta(2), LZ4)',
  ];
  // Metric columns present in every GSC table; `synced_at` is the version
  // column handed to ReplacingMergeTree below.
  const metricColumns = [
    '`clicks` UInt32 CODEC(Delta(4), LZ4)',
    '`impressions` UInt32 CODEC(Delta(4), LZ4)',
    '`ctr` Float32 CODEC(Gorilla, LZ4)',
    '`position` Float32 CODEC(Gorilla, LZ4)',
    '`synced_at` DateTime DEFAULT now() CODEC(Delta(4), LZ4)',
  ];

  // Builds the statements for one GSC table. `extraColumns` are inserted
  // between the key and metric columns; `extraOrderBy` extends the sort key.
  const gscTable = (
    name: string,
    extraColumns: string[] = [],
    extraOrderBy: string[] = [],
  ) =>
    createTable({
      name,
      columns: [...keyColumns, ...extraColumns, ...metricColumns],
      orderBy: ['project_id', 'date', ...extraOrderBy],
      partitionBy: 'toYYYYMM(date)',
      engine: 'ReplacingMergeTree(synced_at)',
      distributionHash: 'cityHash64(project_id)',
      replicatedVersion: '1',
      isClustered,
    });

  // Daily totals — accurate overview numbers
  const dailyStatements = gscTable('gsc_daily');
  // Per-page breakdown
  const pageStatements = gscTable(
    'gsc_pages_daily',
    ['`page` String CODEC(ZSTD(3))'],
    ['page'],
  );
  // Per-query breakdown
  const queryStatements = gscTable(
    'gsc_queries_daily',
    ['`query` String CODEC(ZSTD(3))'],
    ['query'],
  );

  const sqls: string[] = [
    ...dailyStatements,
    ...pageStatements,
    ...queryStatements,
  ];

  // Normalise each statement for the .sql dump: exactly one trailing
  // semicolon and no runs of blank lines.
  const printable = sqls.map((statement) => {
    const body = statement
      .trim()
      .replace(/;$/, '')
      .replace(/\n{2,}/g, '\n');
    return `${body};`;
  });
  const sqlFile = path.join(__filename.replace('.ts', '.sql'));
  fs.writeFileSync(sqlFile, printable.join('\n\n---\n\n'));

  const isDryRun = process.argv.includes('--dry');
  if (isDryRun) {
    return;
  }
  await runClickhouseMigrationCommands(sqls);
}

View File

@@ -31,3 +31,4 @@ export * from './src/services/overview.service';
export * from './src/services/pages.service';
export * from './src/services/insights';
export * from './src/session-context';
export * from './src/gsc';

View File

@@ -203,6 +203,7 @@ model Project {
notificationRules NotificationRule[]
notifications Notification[]
imports Import[]
gscConnection GscConnection?
// When deleteAt > now(), the project will be deleted
deleteAt DateTime?
@@ -612,6 +613,24 @@ model InsightEvent {
@@map("insight_events")
}
/// Google Search Console connection for a project.
/// At most one row per project (`projectId` is unique); the row is removed
/// together with its project (`onDelete: Cascade`).
model GscConnection {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
projectId String @unique
project Project @relation(fields: [projectId], references: [id], onDelete: Cascade)
/// Selected Search Console property. Defaults to "" until a site is chosen;
/// the sync code treats an empty value as "not configured".
siteUrl String @default("")
/// OAuth tokens used to call the Google APIs.
/// NOTE(review): stored as plain String columns here — confirm whether
/// encryption at rest is handled elsewhere.
accessToken String
refreshToken String
/// When the stored access token expires; null means it is always refreshed
/// before use.
accessTokenExpiresAt DateTime?
lastSyncedAt DateTime?
/// Free-form status/error strings (no enum is declared for them).
lastSyncStatus String?
lastSyncError String?
/// Free-form backfill state; the tRPC router writes 'pending' when a site is selected.
backfillStatus String?
createdAt DateTime @default(now())
updatedAt DateTime @default(now()) @updatedAt
@@map("gsc_connections")
}
model EmailUnsubscribe {
id String @id @default(dbgenerated("gen_random_uuid()")) @db.Uuid
email String

View File

@@ -58,6 +58,9 @@ export const TABLE_NAMES = {
sessions: 'sessions',
events_imports: 'events_imports',
session_replay_chunks: 'session_replay_chunks',
gsc_daily: 'gsc_daily',
gsc_pages_daily: 'gsc_pages_daily',
gsc_queries_daily: 'gsc_queries_daily',
};
/**

341
packages/db/src/gsc.ts Normal file
View File

@@ -0,0 +1,341 @@
import { originalCh } from './clickhouse/client';
import { db } from './prisma-client';
/** One Search Console property, as returned by the `sites` list endpoint. */
export interface GscSite {
// Property identifier as reported by Google.
siteUrl: string;
// Permission level string as reported by Google for the authorised account.
permissionLevel: string;
}
/**
 * Exchanges a Google OAuth refresh token for a new access token.
 *
 * Uses GOOGLE_CLIENT_ID / GOOGLE_CLIENT_SECRET from the environment (empty
 * string when unset) and the `refresh_token` grant of Google's token endpoint.
 *
 * @param refreshToken - refresh token previously issued for the connection
 * @returns the new access token and the instant at which it expires
 * @throws Error when the endpoint answers with a non-2xx status, or when a
 *   2xx response lacks a usable `access_token` / `expires_in`. Without this
 *   check the caller would receive `undefined` and an Invalid Date.
 */
async function refreshGscToken(
  refreshToken: string
): Promise<{ accessToken: string; expiresAt: Date }> {
  const params = new URLSearchParams({
    client_id: process.env.GOOGLE_CLIENT_ID ?? '',
    client_secret: process.env.GOOGLE_CLIENT_SECRET ?? '',
    refresh_token: refreshToken,
    grant_type: 'refresh_token',
  });
  const res = await fetch('https://oauth2.googleapis.com/token', {
    method: 'POST',
    headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
    body: params.toString(),
  });
  if (!res.ok) {
    const text = await res.text();
    throw new Error(`Failed to refresh GSC token: ${text}`);
  }

  // The body is external input: treat every field as unknown until checked.
  const data = (await res.json()) as {
    access_token?: unknown;
    expires_in?: unknown;
  } | null;

  const accessToken = data?.access_token;
  if (typeof accessToken !== 'string' || accessToken === '') {
    throw new Error(
      'Failed to refresh GSC token: response did not contain an access token'
    );
  }

  // Number() tolerates a numeric string; undefined becomes NaN and null
  // becomes 0, both of which are rejected below.
  const expiresInSeconds = Number(data?.expires_in);
  if (!Number.isFinite(expiresInSeconds) || expiresInSeconds <= 0) {
    throw new Error(
      'Failed to refresh GSC token: response did not contain a valid expires_in'
    );
  }

  const expiresAt = new Date(Date.now() + expiresInSeconds * 1000);
  return { accessToken, expiresAt };
}
/**
 * Returns an access token for the project's GSC connection.
 *
 * The stored token is reused while it still has more than 60 seconds left.
 * Otherwise (including when no expiry is recorded) it is refreshed and the
 * new token and expiry are written back to the connection row.
 *
 * @param projectId - project whose connection should be used
 * @returns a bearer token for the Google APIs
 * @throws when the project has no connection row (`findUniqueOrThrow`) or the
 *   refresh request fails
 */
export async function getGscAccessToken(projectId: string): Promise<string> {
  const connection = await db.gscConnection.findUniqueOrThrow({
    where: { projectId },
  });

  const expiryMs = connection.accessTokenExpiresAt?.getTime();
  const stillValid = expiryMs !== undefined && expiryMs > Date.now() + 60_000;
  if (stillValid) {
    return connection.accessToken;
  }

  const refreshed = await refreshGscToken(connection.refreshToken);
  await db.gscConnection.update({
    where: { projectId },
    data: {
      accessToken: refreshed.accessToken,
      accessTokenExpiresAt: refreshed.expiresAt,
    },
  });
  return refreshed.accessToken;
}
/**
 * Lists the Search Console properties visible to the Google account
 * connected to the project.
 *
 * @param projectId - project whose GSC connection supplies the access token
 * @returns the `siteEntry` array from the API, or an empty array when absent
 * @throws Error with the response body when the API call is not successful
 */
export async function listGscSites(projectId: string): Promise<GscSite[]> {
  const token = await getGscAccessToken(projectId);
  const response = await fetch(
    'https://www.googleapis.com/webmaster/v3/sites',
    { headers: { Authorization: `Bearer ${token}` } }
  );

  if (response.ok) {
    const payload = (await response.json()) as {
      siteEntry?: Array<{ siteUrl: string; permissionLevel: string }>;
    };
    return payload.siteEntry ?? [];
  }

  const text = await response.text();
  throw new Error(`Failed to list GSC sites: ${text}`);
}
/** One row of a Search Analytics query response. */
interface GscApiRow {
// Dimension values, in the order the dimensions were requested
// (the sync code reads keys[0] as the date and keys[1] as page/query).
keys: string[];
clicks: number;
impressions: number;
ctr: number;
position: number;
}
/**
 * Runs a Search Analytics query and returns every row, following pagination.
 *
 * Pages of 25000 rows are requested with an increasing `startRow` until a
 * page comes back shorter than the page size.
 *
 * @param accessToken - bearer token for the Google API
 * @param siteUrl - Search Console property; URL-encoded into the request path
 * @param startDate - first day of the range, passed through as given
 * @param endDate - last day of the range, passed through as given
 * @param dimensions - dimensions to group by, e.g. `['date', 'page']`
 * @returns all rows from all pages, in response order
 * @throws Error with the response body when any page request is not successful
 */
async function queryGscSearchAnalytics(
  accessToken: string,
  siteUrl: string,
  startDate: string,
  endDate: string,
  dimensions: string[]
): Promise<GscApiRow[]> {
  const pageSize = 25000;
  const endpoint = `https://www.googleapis.com/webmaster/v3/sites/${encodeURIComponent(siteUrl)}/searchAnalytics/query`;
  const collected: GscApiRow[] = [];

  for (let offset = 0; ; offset += pageSize) {
    const response = await fetch(endpoint, {
      method: 'POST',
      headers: {
        Authorization: `Bearer ${accessToken}`,
        'Content-Type': 'application/json',
      },
      body: JSON.stringify({
        startDate,
        endDate,
        dimensions,
        rowLimit: pageSize,
        startRow: offset,
        dataState: 'all',
      }),
    });

    if (!response.ok) {
      const text = await response.text();
      throw new Error(`GSC query failed for dimensions [${dimensions.join(',')}]: ${text}`);
    }

    const payload = (await response.json()) as { rows?: GscApiRow[] };
    const page = payload.rows ?? [];
    for (const row of page) {
      collected.push(row);
    }

    // A short page means there is nothing left to fetch.
    if (page.length < pageSize) {
      return collected;
    }
  }
}
/**
 * Formats a date as its UTC calendar day, `YYYY-MM-DD`.
 *
 * @param date - instant to format; an invalid Date makes `toISOString` throw
 * @returns the first ten characters of the ISO-8601 representation
 */
function formatDate(date: Date): string {
  const iso = date.toISOString();
  return iso.substring(0, 10);
}
/**
 * Returns the current UTC time as `YYYY-MM-DD HH:mm:ss.SSS`, i.e. the
 * ISO-8601 string with the `T` separator replaced by a space and the
 * trailing `Z` removed.
 */
function nowString(): string {
  const iso = new Date().toISOString();
  const spaced = iso.replace('T', ' ');
  return spaced.replace('Z', '');
}
/**
 * Pulls Search Console data for a project and date range from Google and
 * writes it into the three GSC ClickHouse tables.
 *
 * Three queries are run one after another: daily totals (`gsc_daily`), the
 * per-page breakdown (`gsc_pages_daily`) and the per-query breakdown
 * (`gsc_queries_daily`). Each result set is inserted only when it is
 * non-empty, and all rows of one call share the same `synced_at` value.
 *
 * @param projectId - project whose GSC connection and site are used
 * @param startDate - first day to sync (formatted as a UTC day)
 * @param endDate - last day to sync (formatted as a UTC day)
 * @throws when the project has no connection, no site URL is configured, or
 *   any Google / ClickHouse call fails
 */
export async function syncGscData(
  projectId: string,
  startDate: Date,
  endDate: Date
): Promise<void> {
  const connection = await db.gscConnection.findUniqueOrThrow({
    where: { projectId },
  });
  const { siteUrl } = connection;
  if (!siteUrl) {
    throw new Error('No GSC site URL configured for this project');
  }

  const accessToken = await getGscAccessToken(projectId);
  const from = formatDate(startDate);
  const to = formatDate(endDate);
  const syncedAt = nowString();

  // Queries one breakdown and stores it. Without `dimension` only the daily
  // totals are fetched; with it, keys[1] is stored under that column name.
  const syncTable = async (
    table: string,
    dimension?: 'page' | 'query'
  ): Promise<void> => {
    const dimensions = dimension ? ['date', dimension] : ['date'];
    const rows = await queryGscSearchAnalytics(
      accessToken,
      siteUrl,
      from,
      to,
      dimensions
    );
    if (rows.length === 0) {
      return;
    }
    await originalCh.insert({
      table,
      values: rows.map((row) => ({
        project_id: projectId,
        date: row.keys[0] ?? '',
        ...(dimension ? { [dimension]: row.keys[1] ?? '' } : {}),
        clicks: row.clicks,
        impressions: row.impressions,
        ctr: row.ctr,
        position: row.position,
        synced_at: syncedAt,
      })),
      format: 'JSONEachRow',
    });
  };

  // 1. Daily totals — authoritative numbers for overview chart
  await syncTable('gsc_daily');
  // 2. Per-page breakdown
  await syncTable('gsc_pages_daily', 'page');
  // 3. Per-query breakdown
  await syncTable('gsc_queries_daily', 'query');
}
/**
 * Reads the per-day GSC totals of a project from `gsc_daily`.
 *
 * The query reads with FINAL, filters on the project and the inclusive date
 * range, groups by day and orders ascending by date.
 *
 * @param projectId - project to read
 * @param startDate - inclusive lower bound, bound as a String parameter
 * @param endDate - inclusive upper bound, bound as a String parameter
 * @returns one entry per day with clicks, impressions, ctr and position
 */
export async function getGscOverview(
  projectId: string,
  startDate: string,
  endDate: string
): Promise<
  Array<{
    date: string;
    clicks: number;
    impressions: number;
    ctr: number;
    position: number;
  }>
> {
  const overviewSql = `
SELECT
date,
sum(clicks) as clicks,
sum(impressions) as impressions,
avg(ctr) as ctr,
avg(position) as position
FROM gsc_daily
FINAL
WHERE project_id = {projectId: String}
AND date >= {startDate: String}
AND date <= {endDate: String}
GROUP BY date
ORDER BY date ASC
`;
  const queryParams = { projectId, startDate, endDate };

  const resultSet = await originalCh.query({
    query: overviewSql,
    query_params: queryParams,
    format: 'JSONEachRow',
  });
  return resultSet.json();
}
/**
 * Returns the top pages of a project by clicks over a date range, read from
 * `gsc_pages_daily`.
 *
 * CTR and position are aggregated over the whole range rather than averaged
 * per day: `ctr = sum(clicks) / sum(impressions)` and `position` is weighted
 * by impressions. A plain `avg()` of the daily values would give a day with
 * one impression the same weight as a day with thousands.
 *
 * The aggregation runs in a subquery with distinct column names so that the
 * output aliases (`clicks`, `impressions`, ...) cannot shadow the source
 * columns inside the aggregate expressions.
 *
 * @param projectId - project to read
 * @param startDate - inclusive lower bound, bound as a String parameter
 * @param endDate - inclusive upper bound, bound as a String parameter
 * @param limit - maximum number of pages to return (bound as UInt32)
 * @returns pages ordered by clicks, descending
 */
export async function getGscPages(
  projectId: string,
  startDate: string,
  endDate: string,
  limit = 100
): Promise<
  Array<{
    page: string;
    clicks: number;
    impressions: number;
    ctr: number;
    position: number;
  }>
> {
  const result = await originalCh.query({
    query: `
      SELECT
        page,
        total_clicks AS clicks,
        total_impressions AS impressions,
        if(total_impressions = 0, 0, total_clicks / total_impressions) AS ctr,
        if(total_impressions = 0, 0, weighted_position / total_impressions) AS position
      FROM (
        SELECT
          page,
          sum(clicks) AS total_clicks,
          sum(impressions) AS total_impressions,
          sum(position * impressions) AS weighted_position
        FROM gsc_pages_daily
        FINAL
        WHERE project_id = {projectId: String}
          AND date >= {startDate: String}
          AND date <= {endDate: String}
        GROUP BY page
      )
      ORDER BY clicks DESC
      LIMIT {limit: UInt32}
    `,
    query_params: { projectId, startDate, endDate, limit },
    format: 'JSONEachRow',
  });
  return result.json();
}
/**
 * Returns the top search queries of a project by clicks over a date range,
 * read from `gsc_queries_daily`.
 *
 * CTR and position are aggregated over the whole range rather than averaged
 * per day: `ctr = sum(clicks) / sum(impressions)` and `position` is weighted
 * by impressions. A plain `avg()` of the daily values would give a day with
 * one impression the same weight as a day with thousands.
 *
 * The aggregation runs in a subquery with distinct column names so that the
 * output aliases (`clicks`, `impressions`, ...) cannot shadow the source
 * columns inside the aggregate expressions.
 *
 * @param projectId - project to read
 * @param startDate - inclusive lower bound, bound as a String parameter
 * @param endDate - inclusive upper bound, bound as a String parameter
 * @param limit - maximum number of queries to return (bound as UInt32)
 * @returns queries ordered by clicks, descending
 */
export async function getGscQueries(
  projectId: string,
  startDate: string,
  endDate: string,
  limit = 100
): Promise<
  Array<{
    query: string;
    clicks: number;
    impressions: number;
    ctr: number;
    position: number;
  }>
> {
  const result = await originalCh.query({
    query: `
      SELECT
        query,
        total_clicks AS clicks,
        total_impressions AS impressions,
        if(total_impressions = 0, 0, total_clicks / total_impressions) AS ctr,
        if(total_impressions = 0, 0, weighted_position / total_impressions) AS position
      FROM (
        SELECT
          query,
          sum(clicks) AS total_clicks,
          sum(impressions) AS total_impressions,
          sum(position * impressions) AS weighted_position
        FROM gsc_queries_daily
        FINAL
        WHERE project_id = {projectId: String}
          AND date >= {startDate: String}
          AND date <= {endDate: String}
        GROUP BY query
      )
      ORDER BY clicks DESC
      LIMIT {limit: UInt32}
    `,
    query_params: { projectId, startDate, endDate, limit },
    format: 'JSONEachRow',
  });
  return result.json();
}

View File

@@ -126,6 +126,10 @@ export type CronQueuePayloadFlushReplay = {
type: 'flushReplay';
payload: undefined;
};
/**
 * Cron queue message of type `gscSync`; it carries no payload.
 * NOTE(review): presumably the scheduled trigger for syncing GSC data — the
 * handler is not visible here, confirm against the worker.
 */
export type CronQueuePayloadGscSync = {
type: 'gscSync';
payload: undefined;
};
export type CronQueuePayload =
| CronQueuePayloadSalt
| CronQueuePayloadFlushEvents
@@ -136,7 +140,8 @@ export type CronQueuePayload =
| CronQueuePayloadPing
| CronQueuePayloadProject
| CronQueuePayloadInsightsDaily
| CronQueuePayloadOnboarding;
| CronQueuePayloadOnboarding
| CronQueuePayloadGscSync;
export type MiscQueuePayloadTrialEndingSoon = {
type: 'trialEndingSoon';
@@ -268,3 +273,21 @@ export const insightsQueue = new Queue<InsightsQueuePayloadProject>(
},
}
);
/** GSC queue job of type `gscProjectSync`, addressed to one project. */
export type GscQueuePayloadSync = {
type: 'gscProjectSync';
payload: { projectId: string };
};
/**
 * GSC queue job of type `gscProjectBackfill`, addressed to one project.
 * The tRPC router enqueues it when a site is selected for the project.
 */
export type GscQueuePayloadBackfill = {
type: 'gscProjectBackfill';
payload: { projectId: string };
};
/** Every job shape accepted by the GSC queue, discriminated by `type`. */
export type GscQueuePayload = GscQueuePayloadSync | GscQueuePayloadBackfill;
/**
 * Queue for per-project GSC jobs (sync and backfill), named via
 * `getQueueName('gsc')` and connected through `getRedisQueue()`.
 */
export const gscQueue = new Queue<GscQueuePayload>(getQueueName('gsc'), {
connection: getRedisQueue(),
defaultJobOptions: {
// NOTE(review): presumably retention counts for finished / failed jobs —
// confirm against the queue library's job options.
removeOnComplete: 50,
removeOnFail: 100,
},
});

View File

@@ -1,4 +1,5 @@
import { authRouter } from './routers/auth';
import { gscRouter } from './routers/gsc';
import { chartRouter } from './routers/chart';
import { chatRouter } from './routers/chat';
import { clientRouter } from './routers/client';
@@ -53,6 +54,7 @@ export const appRouter = createTRPCRouter({
insight: insightRouter,
widget: widgetRouter,
email: emailRouter,
gsc: gscRouter,
});
// export type definition of API

View File

@@ -0,0 +1,201 @@
import { Arctic, googleGsc } from '@openpanel/auth';
import {
db,
getGscOverview,
getGscPages,
getGscQueries,
listGscSites,
} from '@openpanel/db';
import { gscQueue } from '@openpanel/queue';
import { z } from 'zod';
import { getProjectAccess } from '../access';
import { TRPCAccessError, TRPCNotFoundError } from '../errors';
import { createTRPCRouter, protectedProcedure } from '../trpc';
/** Input shared by every procedure: the project being acted on. */
const gscProjectInput = z.object({ projectId: z.string() });

/** Project plus a date range; the dates are passed through to ClickHouse as strings. */
const gscDateRangeInput = gscProjectInput.extend({
  startDate: z.string(),
  endDate: z.string(),
});

/**
 * Date range plus a row limit. The limit must be an integer because it is
 * bound to a `UInt32` query parameter; a fractional value would otherwise
 * only fail once it reaches ClickHouse.
 */
const gscTopListInput = gscDateRangeInput.extend({
  limit: z.number().int().min(1).max(1000).optional().default(100),
});

/**
 * Guard used by every procedure: resolves when the user has access to the
 * project and throws a TRPC access error otherwise.
 */
async function assertGscProjectAccess(
  params: Parameters<typeof getProjectAccess>[0]
): Promise<void> {
  const access = await getProjectAccess(params);
  if (!access) {
    throw TRPCAccessError('You do not have access to this project');
  }
}

/**
 * tRPC router for the Google Search Console integration. Every procedure
 * requires an authenticated session and access to the given project.
 */
export const gscRouter = createTRPCRouter({
  /** Returns the project's connection without its tokens, or null when none exists. */
  getConnection: protectedProcedure
    .input(gscProjectInput)
    .query(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      return db.gscConnection.findUnique({
        where: { projectId: input.projectId },
        // Explicit select: accessToken / refreshToken are deliberately left out.
        select: {
          id: true,
          siteUrl: true,
          lastSyncedAt: true,
          lastSyncStatus: true,
          lastSyncError: true,
          backfillStatus: true,
          createdAt: true,
          updatedAt: true,
        },
      });
    }),

  /**
   * Builds the Google authorization URL for the read-only Search Console
   * scope. `access_type=offline` and `prompt=consent` are set on the URL.
   * NOTE(review): `state` and `codeVerifier` are returned to the caller, which
   * presumably persists them for the OAuth callback — confirm in the client.
   */
  initiateOAuth: protectedProcedure
    .input(gscProjectInput)
    .mutation(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      const state = Arctic.generateState();
      const codeVerifier = Arctic.generateCodeVerifier();
      const url = googleGsc.createAuthorizationURL(state, codeVerifier, [
        'https://www.googleapis.com/auth/webmaster.readonly',
      ]);
      url.searchParams.set('access_type', 'offline');
      url.searchParams.set('prompt', 'consent');
      return {
        url: url.toString(),
        state,
        codeVerifier,
        projectId: input.projectId,
      };
    }),

  /** Lists the Search Console properties available to the connected account. */
  getSites: protectedProcedure
    .input(gscProjectInput)
    .query(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      return listGscSites(input.projectId);
    }),

  /**
   * Stores the chosen site on the connection, marks the backfill as
   * 'pending' and enqueues a backfill job for the project.
   */
  selectSite: protectedProcedure
    .input(gscProjectInput.extend({ siteUrl: z.string() }))
    .mutation(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      const conn = await db.gscConnection.findUnique({
        where: { projectId: input.projectId },
      });
      if (!conn) {
        throw TRPCNotFoundError('GSC connection not found');
      }
      await db.gscConnection.update({
        where: { projectId: input.projectId },
        data: {
          siteUrl: input.siteUrl,
          backfillStatus: 'pending',
        },
      });
      await gscQueue.add('gscProjectBackfill', {
        type: 'gscProjectBackfill',
        payload: { projectId: input.projectId },
      });
      return { ok: true };
    }),

  /** Removes the project's connection; succeeds even when none exists (deleteMany). */
  disconnect: protectedProcedure
    .input(gscProjectInput)
    .mutation(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      await db.gscConnection.deleteMany({
        where: { projectId: input.projectId },
      });
      return { ok: true };
    }),

  /** Per-day totals for the date range. */
  getOverview: protectedProcedure
    .input(gscDateRangeInput)
    .query(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      return getGscOverview(input.projectId, input.startDate, input.endDate);
    }),

  /** Top pages by clicks for the date range (limit 1–1000, default 100). */
  getPages: protectedProcedure
    .input(gscTopListInput)
    .query(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      return getGscPages(
        input.projectId,
        input.startDate,
        input.endDate,
        input.limit
      );
    }),

  /** Top search queries by clicks for the date range (limit 1–1000, default 100). */
  getQueries: protectedProcedure
    .input(gscTopListInput)
    .query(async ({ input, ctx }) => {
      await assertGscProjectAccess({
        projectId: input.projectId,
        userId: ctx.session.userId,
      });
      return getGscQueries(
        input.projectId,
        input.startDate,
        input.endDate,
        input.limit
      );
    }),
});