35 Commits

Author SHA1 Message Date
Carl-Gerhard Lindesvärd
d0abf16e73 remove wait_for_async_insert 2025-11-15 22:03:57 +01:00
Carl-Gerhard Lindesvärd
fc531ce971 fix lock 2025-11-15 20:09:59 +01:00
Carl-Gerhard Lindesvärd
e6c7dec048 remove reqid and user agent 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
2ddc1754e0 better logger 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
3d1cdadb6f wip 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
2286c331d3 fix 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
9ee3d61a25 remove cluster names and add it behind env flag (if someone want to scale) 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
d8661acd66 fix: sync cachable 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
bb0e413b06 wip 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
37246f57f0 fix: groupmq 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
843f95f237 fix 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
ed8deeec3c fix comments 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
310a867cfa fix: comments 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
9bae0fb2db add: cleanup scripts 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
730c953bbc fix: default to 1 events queue shard 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
742ee8dc1c fix: simply event buffer 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
1285ad85a2 fix: performance related fixes 2025-11-15 20:09:04 +01:00
Carl-Gerhard Lindesvärd
8bb0c87ec9 fix: ignore private ips 2025-11-15 20:07:57 +01:00
Carl-Gerhard Lindesvärd
38cc53890a fix: update ip headers order 2025-11-14 14:30:40 +01:00
Carl-Gerhard Lindesvärd
e4fac81d27 fix: use latest sdks and proxy both request and op1.js in public 2025-11-14 13:30:39 +01:00
Carl-Gerhard Lindesvärd
7719985ad1 bump: @openpanel/nextjs 1.0.12 2025-11-14 13:29:01 +01:00
Carl-Gerhard Lindesvärd
c0cefe704b fix: exports in nextjs package 2025-11-14 13:28:30 +01:00
Carl-Gerhard Lindesvärd
5dbb462578 bump: @openpanel/nextjs 1.0.11 2025-11-14 13:26:40 +01:00
Carl-Gerhard Lindesvärd
2b0b62d64c fix: ts issue for nextjs sdk 2025-11-14 13:26:08 +01:00
Carl-Gerhard Lindesvärd
87e98baeb3 fix: lock file 2025-11-14 13:25:08 +01:00
Carl-Gerhard Lindesvärd
2abb44831c bump: @openpanel/nextjs 1.0.10 2025-11-14 13:20:33 +01:00
Carl-Gerhard Lindesvärd
f990cfcc18 bump: @openpanel/express 1.0.1 2025-11-14 13:20:00 +01:00
Carl-Gerhard Lindesvärd
8fbe944df0 fix: use correct client ip header 2025-11-14 13:16:57 +01:00
Carl-Gerhard Lindesvärd
c1801adaa2 fix: better formula support 2025-11-12 23:15:21 +01:00
Carl-Gerhard Lindesvärd
84fd5ce22f fix: metric chart total count 2025-11-12 23:15:20 +01:00
Carl-Gerhard Lindesvärd
447b7668fd fix: show prompt if usage limit is exceeded 2025-11-12 15:10:28 +01:00
Carl-Gerhard Lindesvärd
e505c0ea45 fix: max age on ui cookies 2025-11-12 15:10:12 +01:00
Carl-Gerhard Lindesvärd
e613a4e01c fix: dont count session events for current billing period 2025-11-12 14:57:03 +01:00
Carl-Gerhard Lindesvärd
723ba3ef6c add: admin cli 2025-11-12 14:57:02 +01:00
Carl-Gerhard Lindesvärd
9cafd61b25 feat: new billing and restrict access when trial has ended
* fix: simply billing

* fix usage graph

* imporve billing more + supporter prompt on self-hosting

* revert service change

* revert query builder

* fix: comments
2025-11-11 11:09:11 +01:00
119 changed files with 7761 additions and 4178 deletions

161
admin/README.md Normal file
View File

@@ -0,0 +1,161 @@
# OpenPanel Admin CLI
An interactive CLI tool to help manage and lookup OpenPanel organizations, projects, and clients.
## Setup
First, install dependencies:
```bash
cd admin
pnpm install
```
## Usage
Run the CLI from the admin directory:
```bash
pnpm start
```
Or use the convenient shell script from anywhere:
```bash
./admin/cli
```
## Features
The CLI provides 4 focused lookup commands for easier navigation:
### 🏢 Lookup by Organization
Search and view detailed information about an organization.
- Fuzzy search across all organizations by name or ID
- Shows full organization details with all projects, clients, and members
### 📊 Lookup by Project
Search for a specific project and view its organization context.
- Fuzzy search across all projects by name or ID
- Highlights the selected project in the organization view
- Displays: `org → project`
### 🔑 Lookup by Client ID
Search for a specific client and view its full context.
- Fuzzy search across all clients by name or ID
- Highlights the selected client and its project
- Displays: `org → project → client`
### 📧 Lookup by Email
Search for a member by email address.
- Fuzzy search across all member emails
- Shows which organization(s) the member belongs to
- Displays member role (👑 owner, ⭐ admin, 👤 member)
**All lookups display:**
- Organization information (ID, name, subscription status, timezone, event usage)
- Organization members and their roles
- All projects with their settings (domain, CORS, event counts)
- All clients for each project (ID, name, type, credentials)
- Deletion warnings if scheduled
---
### 🗑️ Clear Cache
Clear cache for an organization and all its projects.
- Fuzzy search to find the organization
- Shows organization details and all projects
- Confirms before clearing cache
- Provides organization ID and all project IDs for cache clearing logic
**Use when:**
- You need to invalidate cache after data changes
- Troubleshooting caching issues
- After manual database updates
**Note:** The cache clearing logic needs to be implemented. The command provides the organization and project data structure for you to add your cache clearing calls.
---
### 🔴 Delete Organization
Permanently delete an organization and all its data.
- Fuzzy search to find the organization
- Shows detailed preview of what will be deleted (projects, members, events)
- Requires **3 confirmations**:
1. Initial confirmation
2. Type organization name to confirm
3. Final warning confirmation
- Deletes from both PostgreSQL and ClickHouse
**Use when:**
- Removing organizations that are no longer needed
- Cleaning up test/demo organizations
- Handling deletion requests
**⚠️ WARNING:** This action is PERMANENT and cannot be undone!
**What gets deleted:**
- Organization record
- All projects and their settings
- All clients and credentials
- All events and analytics data (from ClickHouse)
- All member associations
- All dashboards and reports
---
### 🔴 Delete User
Permanently delete a user account and remove them from all organizations.
- Fuzzy search by email or name
- Shows which organizations the user belongs to
- Shows if user created any organizations (won't delete those orgs)
- Requires **3 confirmations**:
1. Initial confirmation
2. Type user email to confirm
3. Final warning confirmation
**Use when:**
- Removing user accounts at user request
- Cleaning up inactive accounts
- Handling GDPR/data deletion requests
**⚠️ WARNING:** This action is PERMANENT and cannot be undone!
**What gets deleted:**
- User account
- All auth sessions and tokens
- All memberships (removed from all orgs)
- All personal data
**What is NOT deleted:**
- Organizations created by the user (only the creator reference is removed)
## Environment Variables
Make sure you have the proper environment variables set up:
- `DATABASE_URL` - PostgreSQL connection string
- `DATABASE_URL_REPLICA` (optional) - Read replica connection string
## Development
The CLI uses:
- **jiti** - Direct TypeScript execution without build step
- **inquirer** - Interactive prompts
- **inquirer-autocomplete-prompt** - Fuzzy search functionality
- **chalk** - Colored terminal output
- **@openpanel/db** - Direct Prisma database access

25
admin/package.json Normal file
View File

@@ -0,0 +1,25 @@
{
"name": "@openpanel/admin",
"version": "0.0.1",
"type": "module",
"private": true,
"scripts": {
"start": "dotenv -e .env -c -- jiti src/cli.ts",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@openpanel/common": "workspace:*",
"@openpanel/db": "workspace:*",
"chalk": "^5.3.0",
"fuzzy": "^0.1.3",
"inquirer": "^9.3.5",
"inquirer-autocomplete-prompt": "^3.0.1",
"jiti": "^2.4.2"
},
"devDependencies": {
"@types/inquirer": "^9.0.7",
"@types/inquirer-autocomplete-prompt": "^3.0.3",
"@types/node": "catalog:",
"typescript": "catalog:"
}
}

118
admin/src/cli.ts Normal file
View File

@@ -0,0 +1,118 @@
#!/usr/bin/env node
import inquirer from 'inquirer';
import { clearCache } from './commands/clear-cache';
import { deleteOrganization } from './commands/delete-organization';
import { deleteUser } from './commands/delete-user';
import { lookupByClient } from './commands/lookup-client';
import { lookupByEmail } from './commands/lookup-email';
import { lookupByOrg } from './commands/lookup-org';
import { lookupByProject } from './commands/lookup-project';
const secureEnv = (url: string) => {
const parsed = new URL(url);
if (parsed.username && parsed.password) {
return `${parsed.protocol}//${parsed.username}:${parsed.password.slice(0, 1)}...${parsed.password.slice(-1)}@${parsed.hostname}:${parsed.port}`;
}
return url;
};
async function main() {
console.log('\n🔧 OpenPanel Admin CLI\n');
const DATABASE_URL = process.env.DATABASE_URL;
const CLICKHOUSE_URL = process.env.CLICKHOUSE_URL;
const REDIS_URL = process.env.REDIS_URL;
if (!DATABASE_URL || !CLICKHOUSE_URL || !REDIS_URL) {
console.error('Environment variables are not set');
process.exit(1);
}
// Log environment variables for debugging
console.log('Environment:', {
NODE_ENV: process.env.NODE_ENV,
SELF_HOSTED: process.env.SELF_HOSTED ? 'Yes' : 'No',
DATABASE_URL: secureEnv(DATABASE_URL),
CLICKHOUSE_URL: secureEnv(CLICKHOUSE_URL),
REDIS_URL: secureEnv(REDIS_URL),
});
console.log('');
const { command } = await inquirer.prompt([
{
type: 'list',
name: 'command',
message: 'What would you like to do?',
pageSize: 20,
choices: [
{
name: '🏢 Lookup by Organization',
value: 'lookup-org',
},
{
name: '📊 Lookup by Project',
value: 'lookup-project',
},
{
name: '🔑 Lookup by Client ID',
value: 'lookup-client',
},
{
name: '📧 Lookup by Email',
value: 'lookup-email',
},
{
name: '🗑️ Clear Cache',
value: 'clear-cache',
},
{ name: '─────────────────────', value: 'separator', disabled: true },
{
name: '🔴 Delete Organization',
value: 'delete-org',
},
{
name: '🔴 Delete User',
value: 'delete-user',
},
{ name: '─────────────────────', value: 'separator', disabled: true },
{ name: '❌ Exit', value: 'exit' },
],
},
]);
switch (command) {
case 'lookup-org':
await lookupByOrg();
break;
case 'lookup-project':
await lookupByProject();
break;
case 'lookup-client':
await lookupByClient();
break;
case 'lookup-email':
await lookupByEmail();
break;
case 'clear-cache':
await clearCache();
break;
case 'delete-org':
await deleteOrganization();
break;
case 'delete-user':
await deleteUser();
break;
case 'exit':
console.log('Goodbye! 👋');
process.exit(0);
}
// Loop back to main menu
await main();
}
main().catch((error) => {
console.error('Error:', error);
process.exit(1);
});

View File

@@ -0,0 +1,162 @@
import {
db,
getOrganizationAccess,
getOrganizationByProjectIdCached,
getProjectAccess,
getProjectByIdCached,
} from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface OrgSearchItem {
id: string;
name: string;
displayText: string;
}
export async function clearCache() {
console.log(chalk.blue('\n🗑 Clear Cache\n'));
console.log('Loading organizations...\n');
const organizations = await db.organization.findMany({
orderBy: {
name: 'asc',
},
});
if (organizations.length === 0) {
console.log(chalk.red('No organizations found.'));
return;
}
const searchItems: OrgSearchItem[] = organizations.map((org) => ({
id: org.id,
name: org.name,
displayText: `${org.name} ${chalk.gray(`(${org.id})`)}`,
}));
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: OrgSearchItem) => `${item.name} ${item.id}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<OrgSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedOrg } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedOrg',
message: 'Search for an organization:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedOrg: OrgSearchItem };
// Fetch organization with all projects
const organization = await db.organization.findUnique({
where: {
id: selectedOrg.id,
},
include: {
projects: {
orderBy: {
name: 'asc',
},
},
members: {
include: {
user: true,
},
},
},
});
if (!organization) {
console.log(chalk.red('Organization not found.'));
return;
}
console.log(chalk.yellow('\n📋 Organization Details:\n'));
console.log(` ${chalk.gray('ID:')} ${organization.id}`);
console.log(` ${chalk.gray('Name:')} ${organization.name}`);
console.log(` ${chalk.gray('Projects:')} ${organization.projects.length}`);
if (organization.projects.length > 0) {
console.log(chalk.yellow('\n📊 Projects:\n'));
for (const project of organization.projects) {
console.log(
` - ${project.name} ${chalk.gray(`(${project.id})`)} - ${chalk.cyan(`${project.eventsCount.toLocaleString()} events`)}`,
);
}
}
// Confirm before clearing cache
const { confirm } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirm',
message: `Clear cache for organization "${organization.name}" and all ${organization.projects.length} projects?`,
default: false,
},
]);
if (!confirm) {
console.log(chalk.yellow('\nCache clear cancelled.'));
return;
}
console.log(chalk.blue('\n🔄 Clearing cache...\n'));
for (const project of organization.projects) {
// Clear project access cache for each member
for (const member of organization.members) {
if (!member.user?.id) continue;
console.log(
`Clearing cache for project: ${project.name} and member: ${member.user?.email}`,
);
await getProjectAccess.clear({
userId: member.user?.id,
projectId: project.id,
});
await getOrganizationAccess.clear({
userId: member.user?.id,
organizationId: organization.id,
});
}
console.log(`Clearing cache for project: ${project.name}`);
await getOrganizationByProjectIdCached.clear(project.id);
await getProjectByIdCached.clear(project.id);
}
console.log(chalk.gray(`Organization ID: ${organization.id}`));
console.log(
chalk.gray(
`Project IDs: ${organization.projects.map((p) => p.id).join(', ')}`,
),
);
// Example of what you might do:
/*
for (const project of organization.projects) {
console.log(`Clearing cache for project: ${project.name}...`);
// await clearProjectCache(project.id);
// await redis.del(`project:${project.id}:*`);
}
// Clear organization-level cache
// await clearOrganizationCache(organization.id);
// await redis.del(`organization:${organization.id}:*`);
console.log(chalk.green('\n✅ Cache cleared successfully!'));
*/
}

View File

@@ -0,0 +1,215 @@
import {
db,
deleteFromClickhouse,
deleteOrganization as deleteOrg,
} from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface OrgSearchItem {
id: string;
name: string;
displayText: string;
}
export async function deleteOrganization() {
console.log(chalk.red('\n🗑 Delete Organization\n'));
console.log(
chalk.yellow(
'⚠️ WARNING: This will permanently delete the organization and all its data!\n',
),
);
console.log('Loading organizations...\n');
const organizations = await db.organization.findMany({
include: {
projects: true,
members: {
include: {
user: true,
},
},
},
orderBy: {
name: 'asc',
},
});
if (organizations.length === 0) {
console.log(chalk.red('No organizations found.'));
return;
}
const searchItems: OrgSearchItem[] = organizations.map((org) => ({
id: org.id,
name: org.name,
displayText: `${org.name} ${chalk.gray(`(${org.id})`)} ${chalk.cyan(`- ${org.projects.length} projects, ${org.members.length} members`)}`,
}));
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: OrgSearchItem) => `${item.name} ${item.id}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<OrgSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedOrg } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedOrg',
message: 'Search for an organization to delete:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedOrg: OrgSearchItem };
// Fetch full organization details
const organization = await db.organization.findUnique({
where: {
id: selectedOrg.id,
},
include: {
projects: {
include: {
clients: true,
},
},
members: {
include: {
user: true,
},
},
},
});
if (!organization) {
console.log(chalk.red('Organization not found.'));
return;
}
// Display what will be deleted
console.log(chalk.red('\n⚠ YOU ARE ABOUT TO DELETE:\n'));
console.log(` ${chalk.bold('Organization:')} ${organization.name}`);
console.log(` ${chalk.gray('ID:')} ${organization.id}`);
console.log(` ${chalk.gray('Projects:')} ${organization.projects.length}`);
console.log(` ${chalk.gray('Members:')} ${organization.members.length}`);
if (organization.projects.length > 0) {
console.log(chalk.red('\n Projects that will be deleted:'));
for (const project of organization.projects) {
console.log(
` - ${project.name} ${chalk.gray(`(${project.eventsCount.toLocaleString()} events, ${project.clients.length} clients)`)}`,
);
}
}
if (organization.members.length > 0) {
console.log(chalk.red('\n Members who will lose access:'));
for (const member of organization.members) {
const email = member.user?.email || member.email || 'Unknown';
console.log(` - ${email} ${chalk.gray(`(${member.role})`)}`);
}
}
console.log(
chalk.red(
'\n⚠ This will delete ALL projects, clients, events, and data associated with this organization!',
),
);
// First confirmation
const { confirmFirst } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirmFirst',
message: chalk.red(
`Are you ABSOLUTELY SURE you want to delete "${organization.name}"?`,
),
default: false,
},
]);
if (!confirmFirst) {
console.log(chalk.yellow('\nDeletion cancelled.'));
return;
}
// Second confirmation - type organization name
const { confirmName } = await inquirer.prompt([
{
type: 'input',
name: 'confirmName',
message: `Type the organization name "${organization.name}" to confirm deletion:`,
},
]);
if (confirmName !== organization.name) {
console.log(
chalk.red('\n❌ Organization name does not match. Deletion cancelled.'),
);
return;
}
// Final confirmation
const { confirmFinal } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirmFinal',
message: chalk.red(
'FINAL WARNING: This action CANNOT be undone. Delete now?',
),
default: false,
},
]);
if (!confirmFinal) {
console.log(chalk.yellow('\nDeletion cancelled.'));
return;
}
console.log(chalk.red('\n🗑 Deleting organization...\n'));
try {
const projectIds = organization.projects.map((p) => p.id);
// Step 1: Delete from ClickHouse (events, profiles, etc.)
if (projectIds.length > 0) {
console.log(
chalk.yellow(
`Deleting data from ClickHouse for ${projectIds.length} projects...`,
),
);
await deleteFromClickhouse(projectIds);
console.log(chalk.green('✓ ClickHouse data deletion initiated'));
}
// Step 2: Delete the organization from PostgreSQL (cascade will handle related records)
console.log(chalk.yellow('Deleting organization from database...'));
await deleteOrg(organization.id);
console.log(chalk.green('✓ Organization deleted from database'));
console.log(chalk.green('\n✅ Organization deleted successfully!'));
console.log(
chalk.gray(
`Deleted: ${organization.name} with ${organization.projects.length} projects and ${organization.members.length} members`,
),
);
console.log(
chalk.gray(
'\nNote: ClickHouse deletions are processed asynchronously and may take a few moments to complete.',
),
);
} catch (error) {
console.error(chalk.red('\n❌ Error deleting organization:'), error);
throw error;
}
}

View File

@@ -0,0 +1,220 @@
import { db } from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface UserSearchItem {
id: string;
email: string;
firstName: string | null;
lastName: string | null;
displayText: string;
}
export async function deleteUser() {
console.log(chalk.red('\n🗑 Delete User\n'));
console.log(
chalk.yellow(
'⚠️ WARNING: This will permanently delete the user and remove them from all organizations!\n',
),
);
console.log('Loading users...\n');
const users = await db.user.findMany({
include: {
membership: {
include: {
organization: true,
},
},
accounts: true,
},
orderBy: {
email: 'asc',
},
});
if (users.length === 0) {
console.log(chalk.red('No users found.'));
return;
}
const searchItems: UserSearchItem[] = users.map((user) => {
const fullName =
user.firstName || user.lastName
? `${user.firstName || ''} ${user.lastName || ''}`.trim()
: '';
const orgCount = user.membership.length;
return {
id: user.id,
email: user.email,
firstName: user.firstName,
lastName: user.lastName,
displayText: `${user.email} ${fullName ? chalk.gray(`(${fullName})`) : ''} ${chalk.cyan(`- ${orgCount} orgs`)}`,
};
});
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: UserSearchItem) =>
`${item.email} ${item.firstName || ''} ${item.lastName || ''}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<UserSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedUser } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedUser',
message: 'Search for a user to delete:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedUser: UserSearchItem };
// Fetch full user details
const user = await db.user.findUnique({
where: {
id: selectedUser.id,
},
include: {
membership: {
include: {
organization: true,
},
},
accounts: true,
createdOrganizations: true,
},
});
if (!user) {
console.log(chalk.red('User not found.'));
return;
}
// Display what will be deleted
console.log(chalk.red('\n⚠ YOU ARE ABOUT TO DELETE:\n'));
console.log(` ${chalk.bold('User:')} ${user.email}`);
if (user.firstName || user.lastName) {
console.log(
` ${chalk.gray('Name:')} ${user.firstName || ''} ${user.lastName || ''}`,
);
}
console.log(` ${chalk.gray('ID:')} ${user.id}`);
console.log(
` ${chalk.gray('Member of:')} ${user.membership.length} organizations`,
);
console.log(` ${chalk.gray('Auth accounts:')} ${user.accounts.length}`);
if (user.createdOrganizations.length > 0) {
console.log(
chalk.red(
`\n ⚠️ This user CREATED ${user.createdOrganizations.length} organization(s):`,
),
);
for (const org of user.createdOrganizations) {
console.log(` - ${org.name} ${chalk.gray(`(${org.id})`)}`);
}
console.log(
chalk.yellow(
' Note: These organizations will NOT be deleted, only the user reference.',
),
);
}
if (user.membership.length > 0) {
console.log(
chalk.red('\n Organizations where user will be removed from:'),
);
for (const member of user.membership) {
console.log(
` - ${member.organization.name} ${chalk.gray(`(${member.role})`)}`,
);
}
}
console.log(
chalk.red(
'\n⚠ This will delete the user account, all sessions, and remove them from all organizations!',
),
);
// First confirmation
const { confirmFirst } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirmFirst',
message: chalk.red(
`Are you ABSOLUTELY SURE you want to delete user "${user.email}"?`,
),
default: false,
},
]);
if (!confirmFirst) {
console.log(chalk.yellow('\nDeletion cancelled.'));
return;
}
// Second confirmation - type email
const { confirmEmail } = await inquirer.prompt([
{
type: 'input',
name: 'confirmEmail',
message: `Type the user email "${user.email}" to confirm deletion:`,
},
]);
if (confirmEmail !== user.email) {
console.log(chalk.red('\n❌ Email does not match. Deletion cancelled.'));
return;
}
// Final confirmation
const { confirmFinal } = await inquirer.prompt([
{
type: 'confirm',
name: 'confirmFinal',
message: chalk.red(
'FINAL WARNING: This action CANNOT be undone. Delete now?',
),
default: false,
},
]);
if (!confirmFinal) {
console.log(chalk.yellow('\nDeletion cancelled.'));
return;
}
console.log(chalk.red('\n🗑 Deleting user...\n'));
try {
// Delete the user (cascade will handle related records like sessions, accounts, memberships)
await db.user.delete({
where: {
id: user.id,
},
});
console.log(chalk.green('\n✅ User deleted successfully!'));
console.log(
chalk.gray(
`Deleted: ${user.email} (removed from ${user.membership.length} organizations)`,
),
);
} catch (error) {
console.error(chalk.red('\n❌ Error deleting user:'), error);
throw error;
}
}

View File

@@ -0,0 +1,104 @@
import { db } from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
import { displayOrganizationDetails } from '../utils/display';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface ClientSearchItem {
id: string;
name: string;
organizationId: string;
organizationName: string;
projectId: string | null;
projectName: string | null;
displayText: string;
}
export async function lookupByClient() {
console.log(chalk.blue('\n🔑 Lookup by Client ID\n'));
console.log('Loading clients...\n');
const clients = await db.client.findMany({
include: {
organization: true,
project: true,
},
orderBy: {
name: 'asc',
},
});
if (clients.length === 0) {
console.log(chalk.red('No clients found.'));
return;
}
const searchItems: ClientSearchItem[] = clients.map((client) => ({
id: client.id,
name: client.name,
organizationId: client.organizationId,
organizationName: client.organization.name,
projectId: client.projectId,
projectName: client.project?.name || null,
displayText: `${client.organization.name}${client.project?.name || '[No Project]'}${client.name} ${chalk.gray(`(${client.id})`)}`,
}));
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: ClientSearchItem) =>
`${item.organizationName} ${item.projectName || ''} ${item.name} ${item.id}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<ClientSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedClient } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedClient',
message: 'Search for a client:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedClient: ClientSearchItem };
// Fetch full organization details
const organization = await db.organization.findUnique({
where: {
id: selectedClient.organizationId,
},
include: {
projects: {
include: {
clients: true,
},
orderBy: {
name: 'asc',
},
},
members: {
include: {
user: true,
},
},
},
});
if (!organization) {
console.log(chalk.red('Organization not found.'));
return;
}
displayOrganizationDetails(organization, {
highlightProjectId: selectedClient.projectId || undefined,
highlightClientId: selectedClient.id,
});
}

View File

@@ -0,0 +1,112 @@
import { db } from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
import { displayOrganizationDetails } from '../utils/display';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface EmailSearchItem {
email: string;
organizationId: string;
organizationName: string;
role: string;
userId: string | null;
displayText: string;
}
export async function lookupByEmail() {
console.log(chalk.blue('\n📧 Lookup by Email\n'));
console.log('Loading members...\n');
const members = await db.member.findMany({
include: {
organization: true,
user: true,
},
orderBy: {
email: 'asc',
},
});
if (members.length === 0) {
console.log(chalk.red('No members found.'));
return;
}
// Group by email (in case same email is in multiple orgs)
const searchItems: EmailSearchItem[] = members.map((member) => {
const email = member.user?.email || member.email || 'Unknown';
const roleBadge =
member.role === 'owner' ? '👑' : member.role === 'admin' ? '⭐' : '👤';
return {
email,
organizationId: member.organizationId,
organizationName: member.organization.name,
role: member.role,
userId: member.userId,
displayText: `${email} ${chalk.gray(`${member.organization.name}`)} ${roleBadge}`,
};
});
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: EmailSearchItem) =>
`${item.email} ${item.organizationName}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<EmailSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedMember } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedMember',
message: 'Search for a member by email:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedMember: EmailSearchItem };
// Fetch full organization details
const organization = await db.organization.findUnique({
where: {
id: selectedMember.organizationId,
},
include: {
projects: {
include: {
clients: true,
},
orderBy: {
name: 'asc',
},
},
members: {
include: {
user: true,
},
},
},
});
if (!organization) {
console.log(chalk.red('Organization not found.'));
return;
}
console.log(
chalk.yellow(
`\nShowing organization for: ${selectedMember.email} (${selectedMember.role})\n`,
),
);
displayOrganizationDetails(organization);
}

View File

@@ -0,0 +1,88 @@
import { db } from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
import { displayOrganizationDetails } from '../utils/display';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface OrgSearchItem {
id: string;
name: string;
displayText: string;
}
export async function lookupByOrg() {
console.log(chalk.blue('\n🏢 Lookup by Organization\n'));
console.log('Loading organizations...\n');
const organizations = await db.organization.findMany({
orderBy: {
name: 'asc',
},
});
if (organizations.length === 0) {
console.log(chalk.red('No organizations found.'));
return;
}
const searchItems: OrgSearchItem[] = organizations.map((org) => ({
id: org.id,
name: org.name,
displayText: `${org.name} ${chalk.gray(`(${org.id})`)}`,
}));
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: OrgSearchItem) => `${item.name} ${item.id}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<OrgSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedOrg } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedOrg',
message: 'Search for an organization:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedOrg: OrgSearchItem };
// Fetch full organization details
const organization = await db.organization.findUnique({
where: {
id: selectedOrg.id,
},
include: {
projects: {
include: {
clients: true,
},
orderBy: {
name: 'asc',
},
},
members: {
include: {
user: true,
},
},
},
});
if (!organization) {
console.log(chalk.red('Organization not found.'));
return;
}
displayOrganizationDetails(organization);
}

View File

@@ -0,0 +1,98 @@
import { db } from '@openpanel/db';
import chalk from 'chalk';
import fuzzy from 'fuzzy';
import inquirer from 'inquirer';
import autocomplete from 'inquirer-autocomplete-prompt';
import { displayOrganizationDetails } from '../utils/display';
// Register autocomplete prompt
inquirer.registerPrompt('autocomplete', autocomplete);
interface ProjectSearchItem {
id: string;
name: string;
organizationId: string;
organizationName: string;
displayText: string;
}
export async function lookupByProject() {
console.log(chalk.blue('\n📊 Lookup by Project\n'));
console.log('Loading projects...\n');
const projects = await db.project.findMany({
include: {
organization: true,
},
orderBy: {
name: 'asc',
},
});
if (projects.length === 0) {
console.log(chalk.red('No projects found.'));
return;
}
const searchItems: ProjectSearchItem[] = projects.map((project) => ({
id: project.id,
name: project.name,
organizationId: project.organizationId,
organizationName: project.organization.name,
displayText: `${project.organization.name}${project.name} ${chalk.gray(`(${project.id})`)}`,
}));
const searchFunction = async (_answers: unknown, input = '') => {
const fuzzyResult = fuzzy.filter(input, searchItems, {
extract: (item: ProjectSearchItem) =>
`${item.organizationName} ${item.name} ${item.id}`,
});
return fuzzyResult.map((result: fuzzy.FilterResult<ProjectSearchItem>) => ({
name: result.original.displayText,
value: result.original,
}));
};
const { selectedProject } = (await inquirer.prompt([
{
type: 'autocomplete',
name: 'selectedProject',
message: 'Search for a project:',
source: searchFunction,
pageSize: 15,
},
])) as { selectedProject: ProjectSearchItem };
// Fetch full organization details
const organization = await db.organization.findUnique({
where: {
id: selectedProject.organizationId,
},
include: {
projects: {
include: {
clients: true,
},
orderBy: {
name: 'asc',
},
},
members: {
include: {
user: true,
},
},
},
});
if (!organization) {
console.log(chalk.red('Organization not found.'));
return;
}
displayOrganizationDetails(organization, {
highlightProjectId: selectedProject.id,
});
}

206
admin/src/utils/display.ts Normal file
View File

@@ -0,0 +1,206 @@
import type {
Client,
Member,
Organization,
Project,
User,
} from '@openpanel/db';
import chalk from 'chalk';
type OrganizationWithDetails = Organization & {
projects: (Project & {
clients: Client[];
})[];
members: (Member & {
user: User | null;
})[];
};
interface DisplayOptions {
highlightProjectId?: string;
highlightClientId?: string;
}
export function displayOrganizationDetails(
organization: OrganizationWithDetails,
options: DisplayOptions = {},
) {
console.log(`\n${'='.repeat(80)}`);
console.log(chalk.bold.yellow(`\n📊 ORGANIZATION: ${organization.name}`));
console.log(`${'='.repeat(80)}\n`);
// Organization Details
console.log(chalk.bold('Organization Details:'));
console.log(` ${chalk.gray('ID:')} ${organization.id}`);
console.log(` ${chalk.gray('Name:')} ${organization.name}`);
console.log(
` ${chalk.gray('Created:')} ${organization.createdAt.toISOString()}`,
);
console.log(` ${chalk.gray('Timezone:')} ${organization.timezone || 'UTC'}`);
// Subscription info
if (organization.subscriptionStatus) {
console.log(
` ${chalk.gray('Subscription Status:')} ${getSubscriptionStatusColor(organization.subscriptionStatus)}`,
);
if (organization.subscriptionPriceId) {
console.log(
` ${chalk.gray('Price ID:')} ${organization.subscriptionPriceId}`,
);
}
if (organization.subscriptionPeriodEventsLimit) {
const usage = `${organization.subscriptionPeriodEventsCount}/${organization.subscriptionPeriodEventsLimit}`;
const percentage =
(organization.subscriptionPeriodEventsCount /
organization.subscriptionPeriodEventsLimit) *
100;
const color =
percentage > 90
? chalk.red
: percentage > 70
? chalk.yellow
: chalk.green;
console.log(
` ${chalk.gray('Event Usage:')} ${color(usage)} (${percentage.toFixed(1)}%)`,
);
}
if (organization.subscriptionStartsAt) {
console.log(
` ${chalk.gray('Starts:')} ${organization.subscriptionStartsAt.toISOString()}`,
);
}
if (organization.subscriptionEndsAt) {
console.log(
` ${chalk.gray('Ends:')} ${organization.subscriptionEndsAt.toISOString()}`,
);
}
}
if (organization.deleteAt) {
console.log(
` ${chalk.red.bold('⚠️ Scheduled for deletion:')} ${organization.deleteAt.toISOString()}`,
);
}
// Members
console.log(`\n${chalk.bold('Members:')}`);
if (organization.members.length === 0) {
console.log(' No members');
} else {
for (const member of organization.members) {
const roleBadge = getRoleBadge(member.role);
console.log(
` ${roleBadge} ${member.user?.email || member.email || 'Unknown'} ${chalk.gray(`(${member.role})`)}`,
);
}
}
// Projects
console.log(`\n${chalk.bold(`Projects (${organization.projects.length}):`)}`);
if (organization.projects.length === 0) {
console.log(' No projects');
} else {
for (const project of organization.projects) {
const isHighlighted = project.id === options.highlightProjectId;
const projectPrefix = isHighlighted ? chalk.yellow.bold('→ ') : ' ';
console.log(`\n${projectPrefix}${chalk.bold.green(project.name)}`);
console.log(` ${chalk.gray('ID:')} ${project.id}`);
console.log(
` ${chalk.gray('Events Count:')} ${project.eventsCount.toLocaleString()}`,
);
if (project.domain) {
console.log(` ${chalk.gray('Domain:')} ${project.domain}`);
}
if (project.cors.length > 0) {
console.log(` ${chalk.gray('CORS:')} ${project.cors.join(', ')}`);
}
console.log(
` ${chalk.gray('Cross Domain:')} ${project.crossDomain ? chalk.green('✓') : chalk.red('✗')}`,
);
console.log(
` ${chalk.gray('Created:')} ${project.createdAt.toISOString()}`,
);
if (project.deleteAt) {
console.log(
` ${chalk.red.bold('⚠️ Scheduled for deletion:')} ${project.deleteAt.toISOString()}`,
);
}
// Clients for this project
if (project.clients.length > 0) {
console.log(` ${chalk.gray('Clients:')}`);
for (const client of project.clients) {
const isClientHighlighted = client.id === options.highlightClientId;
const clientPrefix = isClientHighlighted
? chalk.yellow.bold(' → ')
: ' ';
const typeBadge = getClientTypeBadge(client.type);
console.log(`${clientPrefix}${typeBadge} ${chalk.cyan(client.name)}`);
console.log(` ${chalk.gray('ID:')} ${client.id}`);
console.log(` ${chalk.gray('Type:')} ${client.type}`);
console.log(
` ${chalk.gray('Has Secret:')} ${client.secret ? chalk.green('✓') : chalk.red('✗')}`,
);
console.log(
` ${chalk.gray('Ignore CORS/Secret:')} ${client.ignoreCorsAndSecret ? chalk.yellow('✓') : chalk.gray('✗')}`,
);
}
} else {
console.log(` ${chalk.gray('Clients:')} None`);
}
}
}
// Clients without projects (organization-level clients)
const orgLevelClients = organization.projects.length > 0 ? [] : []; // We need to query these separately
console.log(`\n${'='.repeat(80)}\n`);
}
function getSubscriptionStatusColor(status: string): string {
switch (status) {
case 'active':
return chalk.green(status);
case 'trialing':
return chalk.blue(status);
case 'canceled':
return chalk.red(status);
case 'past_due':
return chalk.yellow(status);
default:
return chalk.gray(status);
}
}
function getRoleBadge(role: string): string {
switch (role) {
case 'owner':
return chalk.red.bold('👑');
case 'admin':
return chalk.yellow.bold('⭐');
case 'member':
return chalk.blue('👤');
default:
return chalk.gray('•');
}
}
function getClientTypeBadge(type: string): string {
switch (type) {
case 'root':
return chalk.red.bold('[ROOT]');
case 'write':
return chalk.green('[WRITE]');
case 'read':
return chalk.blue('[READ]');
default:
return chalk.gray('[UNKNOWN]');
}
}

12
admin/tsconfig.json Normal file
View File

@@ -0,0 +1,12 @@
{
"extends": "../tooling/typescript/base.json",
"compilerOptions": {
"outDir": "dist",
"rootDir": "src",
"target": "ES2022",
"lib": ["ES2022"],
"types": ["node"]
},
"include": ["src"]
}

View File

@@ -13,11 +13,11 @@
"dependencies": {
"@ai-sdk/anthropic": "^1.2.10",
"@ai-sdk/openai": "^1.3.12",
"@fastify/compress": "^8.0.1",
"@fastify/compress": "^8.1.0",
"@fastify/cookie": "^11.0.2",
"@fastify/cors": "^11.0.0",
"@fastify/rate-limit": "^10.2.2",
"@fastify/websocket": "^11.0.2",
"@fastify/cors": "^11.1.0",
"@fastify/rate-limit": "^10.3.0",
"@fastify/websocket": "^11.2.0",
"@node-rs/argon2": "^2.0.2",
"@openpanel/auth": "workspace:^",
"@openpanel/common": "workspace:*",
@@ -35,13 +35,12 @@
"@trpc/server": "^11.6.0",
"ai": "^4.2.10",
"fast-json-stable-hash": "^1.0.3",
"fastify": "^5.2.1",
"fastify": "^5.6.1",
"fastify-metrics": "^12.1.0",
"fastify-raw-body": "^5.0.0",
"groupmq": "1.0.0-next.19",
"groupmq": "1.1.0-next.6",
"jsonwebtoken": "^9.0.2",
"ramda": "^0.29.1",
"request-ip": "^3.3.0",
"sharp": "^0.33.5",
"source-map-support": "^0.5.21",
"sqlstring": "^2.3.3",
@@ -58,7 +57,6 @@
"@types/js-yaml": "^4.0.9",
"@types/jsonwebtoken": "^9.0.9",
"@types/ramda": "^0.30.2",
"@types/request-ip": "^0.0.41",
"@types/source-map-support": "^0.5.10",
"@types/sqlstring": "^2.3.2",
"@types/uuid": "^10.0.0",

View File

@@ -7,6 +7,23 @@ const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
import yaml from 'js-yaml';
// Regex special characters that indicate we need actual regex
const regexSpecialChars = /[|^$.*+?(){}\[\]\\]/;
function transformBots(bots: any[]): any[] {
return bots.map((bot) => {
const { regex, ...rest } = bot;
const hasRegexChars = regexSpecialChars.test(regex);
if (hasRegexChars) {
// Keep as regex
return { regex, ...rest };
}
// Convert to includes
return { includes: regex, ...rest };
});
}
async function main() {
// Get document, or throw exception on error
try {
@@ -14,6 +31,9 @@ async function main() {
'https://raw.githubusercontent.com/matomo-org/device-detector/master/regexes/bots.yml',
).then((res) => res.text());
const parsedData = yaml.load(data) as any[];
const transformedBots = transformBots(parsedData);
fs.writeFileSync(
path.resolve(__dirname, '../src/bots/bots.ts'),
[
@@ -21,11 +41,20 @@ async function main() {
'',
'// The data is fetch from device-detector https://raw.githubusercontent.com/matomo-org/device-detector/master/regexes/bots.yml',
'',
`const bots = ${JSON.stringify(yaml.load(data))} as const;`,
`const bots = ${JSON.stringify(transformedBots, null, 2)} as const;`,
'export default bots;',
'',
].join('\n'),
'utf-8',
);
console.log(
`✅ Generated bots.ts with ${transformedBots.length} bot entries`,
);
const regexCount = transformedBots.filter((b) => 'regex' in b).length;
const includesCount = transformedBots.filter((b) => 'includes' in b).length;
console.log(` - ${includesCount} simple string matches (includes)`);
console.log(` - ${regexCount} regex patterns`);
} catch (e) {
console.log(e);
}

View File

@@ -40,8 +40,6 @@ async function main() {
properties: {
hash: 'test-hash',
'query.utm_source': 'test',
__reqId: `req_${Math.floor(Math.random() * 1000)}`,
__user_agent: 'Mozilla/5.0 (Test)',
},
created_at: formatClickhouseDate(eventTime),
country: 'US',

File diff suppressed because it is too large Load Diff

View File

@@ -1,19 +1,47 @@
import { cacheable, cacheableLru } from '@openpanel/redis';
import bots from './bots';
export function isBot(ua: string) {
const res = bots.find((bot) => {
if (new RegExp(bot.regex).test(ua)) {
return true;
}
return false;
});
if (!res) {
return null;
// Pre-compile regex patterns at module load time
const compiledBots = bots.map((bot) => {
if ('regex' in bot) {
return {
...bot,
compiledRegex: new RegExp(bot.regex),
};
}
return bot;
});
return {
name: res.name,
type: 'category' in res ? res.category : 'Unknown',
};
}
const regexBots = compiledBots.filter((bot) => 'compiledRegex' in bot);
const includesBots = compiledBots.filter((bot) => 'includes' in bot);
export const isBot = cacheableLru(
'is-bot',
(ua: string) => {
// Check simple string patterns first (fast)
for (const bot of includesBots) {
if (ua.includes(bot.includes)) {
return {
name: bot.name,
type: 'category' in bot ? bot.category : 'Unknown',
};
}
}
// Check regex patterns (slower)
for (const bot of regexBots) {
if (bot.compiledRegex.test(ua)) {
return {
name: bot.name,
type: 'category' in bot ? bot.category : 'Unknown',
};
}
}
return null;
},
{
maxSize: 1000,
ttl: 60 * 5,
},
);

View File

@@ -1,12 +1,10 @@
import { getClientIp } from '@/utils/get-client-ip';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getSalts } from '@openpanel/db';
import { eventsGroupQueue } from '@openpanel/queue';
import { getEventsGroupQueueShard } from '@openpanel/queue';
import type { PostEventPayload } from '@openpanel/sdk';
import { checkDuplicatedEvent } from '@/utils/deduplicate';
import { generateId } from '@openpanel/common';
import { getGeoLocation } from '@openpanel/geo';
import { getStringHeaders, getTimestamp } from './track.controller';
@@ -21,7 +19,7 @@ export async function postEvent(
request.timestamp,
request.body,
);
const ip = getClientIp(request)!;
const ip = request.clientIp;
const ua = request.headers['user-agent']!;
const projectId = request.client?.projectId;
const headers = getStringHeaders(request.headers);
@@ -45,28 +43,22 @@ export async function postEvent(
ua,
});
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
timestamp,
previousDeviceId,
currentDeviceId,
},
projectId,
})
) {
return;
}
const uaInfo = parseUserAgent(ua, request.body?.properties);
const groupId = uaInfo.isServer
? request.body?.profileId
? `${projectId}:${request.body?.profileId}`
: `${projectId}:${generateId()}`
: currentDeviceId;
await eventsGroupQueue.add({
const jobId = [
request.body.name,
timestamp,
projectId,
currentDeviceId,
groupId,
]
.filter(Boolean)
.join('-');
await getEventsGroupQueueShard(groupId).add({
orderMs: new Date(timestamp).getTime(),
data: {
projectId,
@@ -76,11 +68,13 @@ export async function postEvent(
timestamp,
isTimestampFromThePast,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
},
groupId,
jobId,
});
reply.status(202).send('ok');

View File

@@ -4,7 +4,7 @@ import superjson from 'superjson';
import type { WebSocket } from '@fastify/websocket';
import {
eventBuffer,
getProfileByIdCached,
getProfileById,
transformMinimalEvent,
} from '@openpanel/db';
import { setSuperJson } from '@openpanel/json';
@@ -92,10 +92,7 @@ export async function wsProjectEvents(
type,
async (event) => {
if (event.projectId === params.projectId) {
const profile = await getProfileByIdCached(
event.profileId,
event.projectId,
);
const profile = await getProfileById(event.profileId, event.projectId);
socket.send(
superjson.stringify(
access

View File

@@ -4,9 +4,12 @@ import { parseUrlMeta } from '@/utils/parseUrlMeta';
import type { FastifyReply, FastifyRequest } from 'fastify';
import sharp from 'sharp';
import { getClientIp } from '@/utils/get-client-ip';
import {
DEFAULT_HEADER_ORDER,
getClientIpFromHeaders,
} from '@openpanel/common/server/get-client-ip';
import { TABLE_NAMES, ch, chQuery, formatClickhouseDate } from '@openpanel/db';
import { getGeoLocation } from '@openpanel/geo';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { getCache, getRedisCache } from '@openpanel/redis';
interface GetFaviconParams {
@@ -129,7 +132,7 @@ async function processImage(
): Promise<Buffer> {
// If it's an ICO file, just return it as-is (no conversion needed)
if (originalUrl && isIcoFile(originalUrl, contentType)) {
logger.info('Serving ICO file directly', {
logger.debug('Serving ICO file directly', {
originalUrl,
bufferSize: buffer.length,
});
@@ -137,7 +140,7 @@ async function processImage(
}
if (originalUrl && isSvgFile(originalUrl, contentType)) {
logger.info('Serving SVG file directly', {
logger.debug('Serving SVG file directly', {
originalUrl,
bufferSize: buffer.length,
});
@@ -146,7 +149,7 @@ async function processImage(
// If buffer isnt to big just return it as well
if (buffer.length < 5000) {
logger.info('Serving image directly without processing', {
logger.debug('Serving image directly without processing', {
originalUrl,
bufferSize: buffer.length,
});
@@ -190,7 +193,7 @@ async function processOgImage(
): Promise<Buffer> {
// If buffer is small enough, return it as-is
if (buffer.length < 10000) {
logger.info('Serving OG image directly without processing', {
logger.debug('Serving OG image directly without processing', {
originalUrl,
bufferSize: buffer.length,
});
@@ -394,12 +397,35 @@ export async function stats(request: FastifyRequest, reply: FastifyReply) {
}
export async function getGeo(request: FastifyRequest, reply: FastifyReply) {
const ip = getClientIp(request);
const ip = getClientIpFromHeaders(request.headers);
const others = await Promise.all(
DEFAULT_HEADER_ORDER.map(async (header) => {
const ip = getClientIpFromHeaders(request.headers, header);
return {
header,
ip,
geo: await getGeoLocation(ip),
};
}),
);
if (!ip) {
return reply.status(400).send('Bad Request');
}
const geo = await getGeoLocation(ip);
return reply.status(200).send(geo);
return reply.status(200).send({
selected: {
geo,
ip,
},
...others.reduce(
(acc, other) => {
acc[other.header] = other;
return acc;
},
{} as Record<string, { ip: string; geo: GeoLocation }>,
),
});
}
export async function getOgImage(

View File

@@ -1,8 +1,6 @@
import { getClientIp } from '@/utils/get-client-ip';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { assocPath, pathOr } from 'ramda';
import { checkDuplicatedEvent, isDuplicatedEvent } from '@/utils/deduplicate';
import { parseUserAgent } from '@openpanel/common/server';
import { getProfileById, upsertProfile } from '@openpanel/db';
import { getGeoLocation } from '@openpanel/geo';
@@ -17,41 +15,39 @@ export async function updateProfile(
}>,
reply: FastifyReply,
) {
const { profileId, properties, ...rest } = request.body;
const payload = request.body;
const projectId = request.client!.projectId;
if (!projectId) {
return reply.status(400).send('No projectId');
}
const ip = getClientIp(request)!;
const ip = request.clientIp;
const ua = request.headers['user-agent']!;
const uaInfo = parseUserAgent(ua, properties);
const uaInfo = parseUserAgent(ua, payload.properties);
const geo = await getGeoLocation(ip);
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
},
projectId,
})
) {
return;
}
await upsertProfile({
id: profileId,
...payload,
id: payload.profileId,
isExternal: true,
projectId,
properties: {
...(properties ?? {}),
...(ip ? geo : {}),
...uaInfo,
...(payload.properties ?? {}),
country: geo.country,
city: geo.city,
region: geo.region,
longitude: geo.longitude,
latitude: geo.latitude,
os: uaInfo.os,
os_version: uaInfo.osVersion,
browser: uaInfo.browser,
browser_version: uaInfo.browserVersion,
device: uaInfo.device,
brand: uaInfo.brand,
model: uaInfo.model,
},
...rest,
});
reply.status(202).send(profileId);
reply.status(202).send(payload.profileId);
}
export async function incrementProfileProperty(
@@ -66,18 +62,6 @@ export async function incrementProfileProperty(
return reply.status(400).send('No projectId');
}
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
},
projectId,
})
) {
return;
}
const profile = await getProfileById(profileId, projectId);
if (!profile) {
return reply.status(404).send('Not found');
@@ -120,18 +104,6 @@ export async function decrementProfileProperty(
return reply.status(400).send('No projectId');
}
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
},
projectId,
})
) {
return;
}
const profile = await getProfileById(profileId, projectId);
if (!profile) {
return reply.status(404).send('Not found');

View File

@@ -1,13 +1,11 @@
import { getClientIp } from '@/utils/get-client-ip';
import type { FastifyReply, FastifyRequest } from 'fastify';
import { path, assocPath, pathOr, pick } from 'ramda';
import { assocPath, pathOr, pick } from 'ramda';
import { checkDuplicatedEvent } from '@/utils/deduplicate';
import { generateId } from '@openpanel/common';
import { generateDeviceId, parseUserAgent } from '@openpanel/common/server';
import { getProfileById, getSalts, upsertProfile } from '@openpanel/db';
import { type GeoLocation, getGeoLocation } from '@openpanel/geo';
import { eventsGroupQueue } from '@openpanel/queue';
import { getEventsGroupQueueShard } from '@openpanel/queue';
import type {
DecrementPayload,
IdentifyPayload,
@@ -38,10 +36,10 @@ export function getStringHeaders(headers: FastifyRequest['headers']) {
}
function getIdentity(body: TrackHandlerPayload): IdentifyPayload | undefined {
const identity = path<IdentifyPayload>(
['properties', '__identify'],
body.payload,
);
const identity =
'properties' in body.payload
? (body.payload?.properties?.__identify as IdentifyPayload | undefined)
: undefined;
return (
identity ||
@@ -57,27 +55,28 @@ export function getTimestamp(
timestamp: FastifyRequest['timestamp'],
payload: TrackHandlerPayload['payload'],
) {
const safeTimestamp = new Date(timestamp || Date.now()).toISOString();
const userDefinedTimestamp = path<string>(
['properties', '__timestamp'],
payload,
);
const safeTimestamp = timestamp || Date.now();
const userDefinedTimestamp =
'properties' in payload
? (payload?.properties?.__timestamp as string | undefined)
: undefined;
if (!userDefinedTimestamp) {
return { timestamp: safeTimestamp, isTimestampFromThePast: false };
}
const clientTimestamp = new Date(userDefinedTimestamp);
const clientTimestampNumber = clientTimestamp.getTime();
if (
Number.isNaN(clientTimestamp.getTime()) ||
clientTimestamp > new Date(safeTimestamp)
Number.isNaN(clientTimestampNumber) ||
clientTimestampNumber > safeTimestamp
) {
return { timestamp: safeTimestamp, isTimestampFromThePast: false };
}
return {
timestamp: clientTimestamp.toISOString(),
timestamp: clientTimestampNumber,
isTimestampFromThePast: true,
};
}
@@ -90,18 +89,19 @@ export async function handler(
) {
const timestamp = getTimestamp(request.timestamp, request.body.payload);
const ip =
path<string>(['properties', '__ip'], request.body.payload) ||
getClientIp(request)!;
'properties' in request.body.payload &&
request.body.payload.properties?.__ip
? (request.body.payload.properties.__ip as string)
: request.clientIp;
const ua = request.headers['user-agent']!;
const projectId = request.client?.projectId;
if (!projectId) {
reply.status(400).send({
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Missing projectId',
});
return;
}
const identity = getIdentity(request.body);
@@ -133,33 +133,7 @@ export async function handler(
})
: '';
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
timestamp,
previousDeviceId,
currentDeviceId,
},
projectId,
})
) {
return;
}
const promises = [
track({
payload: request.body.payload,
currentDeviceId,
previousDeviceId,
projectId,
geo,
headers: getStringHeaders(request.headers),
timestamp: timestamp.timestamp,
isTimestampFromThePast: timestamp.isTimestampFromThePast,
}),
];
const promises = [];
// If we have more than one property in the identity object, we should identify the user
// Otherwise its only a profileId and we should not identify the user
@@ -174,23 +148,23 @@ export async function handler(
);
}
promises.push(
track({
payload: request.body.payload,
currentDeviceId,
previousDeviceId,
projectId,
geo,
headers: getStringHeaders(request.headers),
timestamp: timestamp.timestamp,
isTimestampFromThePast: timestamp.isTimestampFromThePast,
}),
);
await Promise.all(promises);
break;
}
case 'identify': {
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
timestamp,
},
projectId,
})
) {
return;
}
const geo = await getGeoLocation(ip);
await identify({
payload: request.body.payload,
@@ -201,27 +175,13 @@ export async function handler(
break;
}
case 'alias': {
reply.status(400).send({
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Alias is not supported',
});
break;
}
case 'increment': {
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
timestamp,
},
projectId,
})
) {
return;
}
await increment({
payload: request.body.payload,
projectId,
@@ -229,19 +189,6 @@ export async function handler(
break;
}
case 'decrement': {
if (
await checkDuplicatedEvent({
reply,
payload: {
...request.body,
timestamp,
},
projectId,
})
) {
return;
}
await decrement({
payload: request.body.payload,
projectId,
@@ -249,12 +196,11 @@ export async function handler(
break;
}
default: {
reply.status(400).send({
return reply.status(400).send({
status: 400,
error: 'Bad Request',
message: 'Invalid type',
});
break;
}
}
@@ -277,7 +223,7 @@ async function track({
projectId: string;
geo: GeoLocation;
headers: Record<string, string | undefined>;
timestamp: string;
timestamp: number;
isTimestampFromThePast: boolean;
}) {
const uaInfo = parseUserAgent(headers['user-agent'], payload.properties);
@@ -286,8 +232,11 @@ async function track({
? `${projectId}:${payload.profileId}`
: `${projectId}:${generateId()}`
: currentDeviceId;
await eventsGroupQueue.add({
orderMs: new Date(timestamp).getTime(),
const jobId = [payload.name, timestamp, projectId, currentDeviceId, groupId]
.filter(Boolean)
.join('-');
await getEventsGroupQueueShard(groupId).add({
orderMs: timestamp,
data: {
projectId,
headers,
@@ -296,11 +245,13 @@ async function track({
timestamp,
isTimestampFromThePast,
},
uaInfo,
geo,
currentDeviceId,
previousDeviceId,
},
groupId,
jobId,
});
}
@@ -323,8 +274,18 @@ async function identify({
projectId,
properties: {
...(payload.properties ?? {}),
...(geo ?? {}),
...uaInfo,
country: geo.country,
city: geo.city,
region: geo.region,
longitude: geo.longitude,
latitude: geo.latitude,
os: uaInfo.os,
os_version: uaInfo.osVersion,
browser: uaInfo.browser,
browser_version: uaInfo.browserVersion,
device: uaInfo.device,
brand: uaInfo.brand,
model: uaInfo.model,
},
});
}

View File

@@ -0,0 +1,28 @@
import { isDuplicatedEvent } from '@/utils/deduplicate';
import type { PostEventPayload, TrackHandlerPayload } from '@openpanel/sdk';
import type { FastifyReply, FastifyRequest } from 'fastify';
export async function duplicateHook(
req: FastifyRequest<{
Body: PostEventPayload | TrackHandlerPayload;
}>,
reply: FastifyReply,
) {
const ip = req.clientIp;
const origin = req.headers.origin;
const clientId = req.headers['openpanel-client-id'];
const shouldCheck = ip && origin && clientId;
const isDuplicate = shouldCheck
? await isDuplicatedEvent({
ip,
origin,
payload: req.body,
projectId: clientId as string,
})
: false;
if (isDuplicate) {
return reply.status(200).send('Duplicate event');
}
}

View File

@@ -1,16 +0,0 @@
import type { FastifyRequest } from 'fastify';
export async function fixHook(request: FastifyRequest) {
const ua = request.headers['user-agent'];
// Swift SDK issue: https://github.com/Openpanel-dev/swift-sdk/commit/d588fa761a36a33f3b78eb79d83bfd524e3c7144
if (ua) {
const regex = /OpenPanel\/(\d+\.\d+\.\d+)\sOpenPanel\/(\d+\.\d+\.\d+)/;
const match = ua.match(regex);
if (match) {
request.headers['user-agent'] = ua.replace(
regex,
`OpenPanel/${match[1]}`,
);
}
}
}

View File

@@ -1,13 +1,12 @@
import { getClientIp } from '@/utils/get-client-ip';
import type {
FastifyReply,
FastifyRequest,
HookHandlerDoneFunction,
} from 'fastify';
import { getClientIpFromHeaders } from '@openpanel/common/server/get-client-ip';
import type { FastifyRequest } from 'fastify';
export async function ipHook(request: FastifyRequest) {
const ip = getClientIp(request);
const ip = getClientIpFromHeaders(request.headers);
if (ip) {
request.clientIp = ip;
} else {
request.clientIp = '';
}
}

View File

@@ -28,7 +28,6 @@ import {
liveness,
readiness,
} from './controllers/healthcheck.controller';
import { fixHook } from './hooks/fix.hook';
import { ipHook } from './hooks/ip.hook';
import { requestIdHook } from './hooks/request-id.hook';
import { requestLoggingHook } from './hooks/request-logging.hook';
@@ -55,7 +54,7 @@ process.env.TZ = 'UTC';
declare module 'fastify' {
interface FastifyRequest {
client: IServiceClientWithProject | null;
clientIp?: string;
clientIp: string;
timestamp?: number;
session: SessionValidationResult;
}
@@ -125,7 +124,6 @@ const startServer = async () => {
fastify.addHook('onRequest', requestIdHook);
fastify.addHook('onRequest', timestampHook);
fastify.addHook('onRequest', ipHook);
fastify.addHook('onRequest', fixHook);
fastify.addHook('onResponse', requestLoggingHook);
fastify.register(compress, {

View File

@@ -2,9 +2,11 @@ import * as controller from '@/controllers/event.controller';
import type { FastifyPluginCallback } from 'fastify';
import { clientHook } from '@/hooks/client.hook';
import { duplicateHook } from '@/hooks/duplicate.hook';
import { isBotHook } from '@/hooks/is-bot.hook';
const eventRouter: FastifyPluginCallback = async (fastify) => {
fastify.addHook('preValidation', duplicateHook);
fastify.addHook('preHandler', clientHook);
fastify.addHook('preHandler', isBotHook);

View File

@@ -2,9 +2,11 @@ import { handler } from '@/controllers/track.controller';
import type { FastifyPluginCallback } from 'fastify';
import { clientHook } from '@/hooks/client.hook';
import { duplicateHook } from '@/hooks/duplicate.hook';
import { isBotHook } from '@/hooks/is-bot.hook';
const trackRouter: FastifyPluginCallback = async (fastify) => {
fastify.addHook('preValidation', duplicateHook);
fastify.addHook('preHandler', clientHook);
fastify.addHook('preHandler', isBotHook);

View File

@@ -3,6 +3,7 @@ import type { FastifyRequest, RawRequestDefaultExpression } from 'fastify';
import { verifyPassword } from '@openpanel/common/server';
import type { IServiceClientWithProject } from '@openpanel/db';
import { ClientType, getClientByIdCached } from '@openpanel/db';
import { getCache } from '@openpanel/redis';
import type { PostEventPayload, TrackHandlerPayload } from '@openpanel/sdk';
import type {
IProjectFilterIp,
@@ -135,7 +136,13 @@ export async function validateSdkRequest(
}
if (client.secret && clientSecret) {
if (await verifyPassword(clientSecret, client.secret)) {
const isVerified = await getCache(
`client:auth:${clientId}:${Buffer.from(clientSecret).toString('base64')}`,
60 * 5,
async () => await verifyPassword(clientSecret, client.secret!),
true,
);
if (isVerified) {
return client;
}
}

View File

@@ -1,11 +1,14 @@
import { getLock } from '@openpanel/redis';
import fastJsonStableHash from 'fast-json-stable-hash';
import type { FastifyReply } from 'fastify';
export async function isDuplicatedEvent({
ip,
origin,
payload,
projectId,
}: {
ip: string;
origin: string;
payload: Record<string, any>;
projectId: string;
}) {
@@ -13,6 +16,8 @@ export async function isDuplicatedEvent({
`fastify:deduplicate:${fastJsonStableHash.hash(
{
...payload,
ip,
origin,
projectId,
},
'md5',
@@ -27,24 +32,3 @@ export async function isDuplicatedEvent({
return true;
}
export async function checkDuplicatedEvent({
reply,
payload,
projectId,
}: {
reply: FastifyReply;
payload: Record<string, any>;
projectId: string;
}) {
if (await isDuplicatedEvent({ payload, projectId })) {
reply.log.info('duplicated event', {
payload,
projectId,
});
reply.status(200).send('duplicated');
return true;
}
return false;
}

View File

@@ -1,8 +0,0 @@
import type { FastifyRequest } from 'fastify';
import requestIp from 'request-ip';
const ignore = ['127.0.0.1', '::1'];
export function getClientIp(req: FastifyRequest) {
return requestIp.getClientIp(req);
}

View File

@@ -1,7 +1,7 @@
import { ch, db } from '@openpanel/db';
import {
cronQueue,
eventsGroupQueue,
eventsGroupQueues,
miscQueue,
notificationQueue,
sessionsQueue,
@@ -71,7 +71,7 @@ export async function shutdown(
// Step 6: Close Bull queues (graceful shutdown of queue state)
try {
await Promise.all([
eventsGroupQueue.close(),
...eventsGroupQueues.map((queue) => queue.close()),
sessionsQueue.close(),
cronQueue.close(),
miscQueue.close(),

View File

@@ -0,0 +1,7 @@
import {
createNextRouteHandler,
createScriptHandler,
} from '@openpanel/nextjs/server';
export const POST = createNextRouteHandler();
export const GET = createScriptHandler();

View File

@@ -61,12 +61,9 @@ export default async function Layout({ children }: { children: ReactNode }) {
<RootProvider>
<TooltipProvider>{children}</TooltipProvider>
</RootProvider>
<Script
defer
src="http://localhost:3000/script.js"
data-website-id="44d65df1-e9cb-4c2c-917d-4bf1c7850948"
/>
<OpenPanelComponent
apiUrl="/api/op"
cdnUrl="/api/op/op1.js"
clientId="301c6dc1-424c-4bc3-9886-a8beab09b615"
trackAttributes
trackScreenViews

View File

@@ -273,19 +273,21 @@ export function GET() {
### Proxy events
With `createNextRouteHandler` you can proxy your events through your server, this will ensure all events are tracked since there is a lot of adblockers that block requests to third party domains.
With `createNextRouteHandler` you can proxy your events through your server, this will ensure all events are tracked since there is a lot of adblockers that block requests to third party domains. You'll also need to either host our tracking script or you can use `createScriptHandler` function which proxies this as well.
```typescript title="/app/api/[...op]/route.ts"
import { createNextRouteHandler } from '@openpanel/nextjs/server';
import { createNextRouteHandler, createScriptHandler } from '@openpanel/nextjs/server';
export const POST = createNextRouteHandler();
export const GET = createScriptHandler()
```
Remember to change the `apiUrl` in the `OpenPanelComponent` to your own server.
Remember to change the `apiUrl` and `cdnUrl` in the `OpenPanelComponent` to your own server.
```tsx
<OpenPanelComponent
apiUrl="/api/op" // [!code highlight]
cdnUrl="/api/op/op1.js" // [!code highlight]
clientId="your-client-id"
trackScreenViews={true}
/>

View File

@@ -13,7 +13,8 @@
"dependencies": {
"@hyperdx/node-opentelemetry": "^0.8.1",
"@number-flow/react": "0.3.5",
"@openpanel/nextjs": "^1.0.5",
"@openpanel/common": "workspace:*",
"@openpanel/nextjs": "^1.0.12",
"@openpanel/payments": "workspace:^",
"@openpanel/sdk-info": "workspace:^",
"@openstatus/react": "0.0.3",

View File

@@ -19,7 +19,6 @@
},
"dependencies": {
"@ai-sdk/react": "^1.2.5",
"@clickhouse/client": "^1.2.0",
"@dnd-kit/core": "^6.3.1",
"@dnd-kit/sortable": "^10.0.0",
"@dnd-kit/utilities": "^3.2.2",

View File

@@ -2,6 +2,34 @@ import { createContext, useContext as useBaseContext } from 'react';
import { Tooltip as RechartsTooltip, type TooltipProps } from 'recharts';
export const ChartTooltipContainer = ({
children,
}: { children: React.ReactNode }) => {
return (
<div className="min-w-[180px] col gap-2 rounded-xl border bg-background/80 p-3 shadow-xl backdrop-blur-sm">
{children}
</div>
);
};
export const ChartTooltipHeader = ({
children,
}: { children: React.ReactNode }) => {
return <div className="flex justify-between gap-8">{children}</div>;
};
export const ChartTooltipItem = ({
children,
color,
}: { children: React.ReactNode; color: string }) => {
return (
<div className="flex gap-2">
<div className="w-[3px] rounded-full" style={{ background: color }} />
<div className="col flex-1 gap-1">{children}</div>
</div>
);
};
export function createChartTooltip<
PropsFromTooltip,
PropsFromContext extends Record<string, unknown>,
@@ -31,9 +59,9 @@ export function createChartTooltip<
}
return (
<div className="flex min-w-[180px] flex-col gap-2 rounded-xl border bg-background/80 p-3 shadow-xl backdrop-blur-sm">
<ChartTooltipContainer>
<Tooltip data={data} context={context} {...tooltip} />
</div>
</ChartTooltipContainer>
);
};

View File

@@ -1,10 +1,17 @@
import { fancyMinutes, useNumber } from '@/hooks/use-numer-formatter';
import { cn } from '@/utils/cn';
import AutoSizer from 'react-virtualized-auto-sizer';
import { Area, AreaChart } from 'recharts';
import { Area, AreaChart, Tooltip } from 'recharts';
import { formatDate, timeAgo } from '@/utils/date';
import { getChartColor } from '@/utils/theme';
import { getPreviousMetric } from '@openpanel/common';
import { useState } from 'react';
import {
ChartTooltipContainer,
ChartTooltipHeader,
ChartTooltipItem,
} from '../charts/chart-tooltip';
import {
PreviousDiffIndicatorPure,
getDiffIndicator,
@@ -41,6 +48,7 @@ export function OverviewMetricCard({
inverted = false,
isLoading = false,
}: MetricCardProps) {
const [value, setValue] = useState(metric.current);
const number = useNumber();
const { current, previous } = metric;
@@ -79,7 +87,7 @@ export function OverviewMetricCard({
<span>
{label}:{' '}
<span className="font-semibold">
{renderValue(current, 'ml-1 font-light text-xl', false)}
{renderValue(value, 'ml-1 font-light text-xl', false)}
</span>
</span>
}
@@ -97,7 +105,7 @@ export function OverviewMetricCard({
<div className={cn('group relative p-4')}>
<div
className={cn(
'pointer-events-none absolute -left-1 -right-1 bottom-0 top-0 z-0 opacity-50 transition-opacity duration-300 group-hover:opacity-100',
'absolute -left-1 -right-1 bottom-0 top-0 z-0 opacity-50 transition-opacity duration-300 group-hover:opacity-100',
)}
>
<AutoSizer>
@@ -107,6 +115,11 @@ export function OverviewMetricCard({
height={height / 4}
data={data}
style={{ marginTop: (height / 4) * 3 }}
onMouseMove={(event) => {
setValue(
event.activePayload?.[0]?.payload?.current ?? current,
);
}}
>
<defs>
<linearGradient
@@ -128,6 +141,7 @@ export function OverviewMetricCard({
/>
</linearGradient>
</defs>
<Tooltip content={() => null} />
<Area
dataKey={'current'}
type="step"

View File

@@ -75,7 +75,7 @@ export function RealtimeGeo({ projectId }: RealtimeGeoProps) {
},
{
name: 'Events',
width: '84px',
width: '60px',
render(item) {
return (
<div className="row gap-2 justify-end">
@@ -86,6 +86,19 @@ export function RealtimeGeo({ projectId }: RealtimeGeoProps) {
);
},
},
{
name: 'Sessions',
width: '82px',
render(item) {
return (
<div className="row gap-2 justify-end">
<span className="font-semibold">
{number.short(item.unique_sessions)}
</span>
</div>
);
},
},
]}
/>
</div>

View File

@@ -82,7 +82,7 @@ export function RealtimePaths({ projectId }: RealtimePathsProps) {
},
{
name: 'Events',
width: '84px',
width: '60px',
render(item) {
return (
<div className="row gap-2 justify-end">
@@ -93,6 +93,19 @@ export function RealtimePaths({ projectId }: RealtimePathsProps) {
);
},
},
{
name: 'Sessions',
width: '82px',
render(item) {
return (
<div className="row gap-2 justify-end">
<span className="font-semibold">
{number.short(item.unique_sessions)}
</span>
</div>
);
},
},
]}
/>
</div>

View File

@@ -65,7 +65,7 @@ export function RealtimeReferrals({ projectId }: RealtimeReferralsProps) {
},
{
name: 'Events',
width: '84px',
width: '60px',
render(item) {
return (
<div className="row gap-2 justify-end">
@@ -76,6 +76,19 @@ export function RealtimeReferrals({ projectId }: RealtimeReferralsProps) {
);
},
},
{
name: 'Sessions',
width: '82px',
render(item) {
return (
<div className="row gap-2 justify-end">
<span className="font-semibold">
{number.short(item.unique_sessions)}
</span>
</div>
);
},
},
]}
/>
</div>

View File

@@ -8,7 +8,7 @@ import { getChartColor } from '@/utils/theme';
import { useQuery } from '@tanstack/react-query';
import { isSameDay, isSameHour, isSameMonth, isSameWeek } from 'date-fns';
import { last } from 'ramda';
import React, { useCallback } from 'react';
import { useCallback } from 'react';
import {
Area,
CartesianGrid,
@@ -25,7 +25,6 @@ import {
import { useDashedStroke } from '@/hooks/use-dashed-stroke';
import { useXAxisProps, useYAxisProps } from '../common/axis';
import { SolidToDashedGradient } from '../common/linear-gradient';
import { ReportChartTooltip } from '../common/report-chart-tooltip';
import { ReportTable } from '../common/report-table';
import { SerieIcon } from '../common/serie-icon';

View File

@@ -35,7 +35,9 @@ export function Chart({ data }: Props) {
() => (isEditMode ? data.series : data.series.slice(0, limit || 10)),
[data, isEditMode, limit],
);
const maxCount = Math.max(...series.map((serie) => serie.metrics[metric]));
const maxCount = Math.max(
...series.map((serie) => serie.metrics[metric] ?? 0),
);
const tableColumns = [
{

View File

@@ -3,7 +3,11 @@ import { useNumber } from '@/hooks/use-numer-formatter';
import type { IRechartPayloadItem } from '@/hooks/use-rechart-data-model';
import React from 'react';
import { createChartTooltip } from '@/components/charts/chart-tooltip';
import {
ChartTooltipHeader,
ChartTooltipItem,
createChartTooltip,
} from '@/components/charts/chart-tooltip';
import type { RouterOutputs } from '@/trpc/client';
import type { IInterval } from '@openpanel/validation';
import {
@@ -88,37 +92,31 @@ export const ReportChartTooltip = createChartTooltip<Data, Context>(
const hidden = sorted.slice(limit);
return (
<div className="flex min-w-[180px] flex-col gap-2">
<>
{visible.map((item, index) => (
<React.Fragment key={item.id}>
{index === 0 && item.date && (
<div className="flex justify-between gap-8">
<ChartTooltipHeader>
<div>{formatDate(new Date(item.date))}</div>
</div>
</ChartTooltipHeader>
)}
<div className="flex gap-2">
<div
className="w-[3px] rounded-full"
style={{ background: item.color }}
/>
<div className="col flex-1 gap-1">
<div className="flex items-center gap-1">
<SerieIcon name={item.names} />
<SerieName name={item.names} />
</div>
<div className="flex justify-between gap-8 font-mono font-medium">
<div className="row gap-1">
{number.formatWithUnit(item.count, unit)}
{!!item.previous && (
<span className="text-muted-foreground">
({number.formatWithUnit(item.previous.value, unit)})
</span>
)}
</div>
<PreviousDiffIndicator {...item.previous} />
</div>
<ChartTooltipItem color={item.color}>
<div className="flex items-center gap-1">
<SerieIcon name={item.names} />
<SerieName name={item.names} />
</div>
</div>
<div className="flex justify-between gap-8 font-mono font-medium">
<div className="row gap-1">
{number.formatWithUnit(item.count, unit)}
{!!item.previous && (
<span className="text-muted-foreground">
({number.formatWithUnit(item.previous.value, unit)})
</span>
)}
</div>
<PreviousDiffIndicator {...item.previous} />
</div>
</ChartTooltipItem>
</React.Fragment>
))}
{hidden.length > 0 && (
@@ -142,7 +140,7 @@ export const ReportChartTooltip = createChartTooltip<Data, Context>(
))}
</>
)}
</div>
</>
);
},
);

View File

@@ -22,7 +22,7 @@ export function Chart({ data }: Props) {
() =>
series.map((s) => ({
country: s.names[0]?.toLowerCase() ?? '',
value: s.metrics[metric],
value: s.metrics[metric] ?? 0,
})),
[series, metric],
);

View File

@@ -12,7 +12,7 @@ interface Props {
export function Chart({ data }: Props) {
const {
isEditMode,
report: { metric, unit },
report: { unit },
} = useReportChartContext();
const { series } = useVisibleSeries(data, isEditMode ? 20 : 4);
return (
@@ -27,7 +27,7 @@ export function Chart({ data }: Props) {
<MetricCard
key={serie.id}
serie={serie}
metric={metric}
metric={'count'}
unit={unit}
/>
);

View File

@@ -2,10 +2,17 @@ import { fancyMinutes, useNumber } from '@/hooks/use-numer-formatter';
import type { IChartData } from '@/trpc/client';
import { cn } from '@/utils/cn';
import AutoSizer from 'react-virtualized-auto-sizer';
import { Area, AreaChart } from 'recharts';
import { Area, AreaChart, Tooltip } from 'recharts';
import type { IChartMetric } from '@openpanel/validation';
import {
ChartTooltipContainer,
ChartTooltipHeader,
ChartTooltipItem,
} from '@/components/charts/chart-tooltip';
import { formatDate } from '@/utils/date';
import { getChartColor } from '@/utils/theme';
import {
PreviousDiffIndicator,
getDiffIndicator,
@@ -20,6 +27,27 @@ interface MetricCardProps {
unit?: string;
}
const TooltipContent = (props: { payload?: any[] }) => {
const number = useNumber();
return (
<ChartTooltipContainer>
{props.payload?.map((item) => {
const { date, count } = item.payload;
return (
<div key={item.id} className="col gap-2">
<ChartTooltipHeader>
<div>{formatDate(new Date(date))}</div>
</ChartTooltipHeader>
<ChartTooltipItem color={getChartColor(0)}>
<div>{number.format(count)}</div>
</ChartTooltipItem>
</div>
);
})}
</ChartTooltipContainer>
);
};
export function MetricCard({
serie,
color: _color,
@@ -32,7 +60,11 @@ export function MetricCard({
} = useReportChartContext();
const number = useNumber();
const renderValue = (value: number, unitClassName?: string) => {
const renderValue = (value: number | undefined, unitClassName?: string) => {
if (!value) {
return <div className="text-muted-foreground">N/A</div>;
}
if (unit === 'min') {
return <>{fancyMinutes(value)}</>;
}
@@ -62,7 +94,7 @@ export function MetricCard({
>
<div
className={cn(
'pointer-events-none absolute -left-1 -right-1 bottom-0 top-0 z-0 opacity-50 transition-opacity duration-300 group-hover:opacity-100',
'absolute -left-1 -right-1 bottom-0 top-0 z-0 opacity-100 transition-opacity duration-300 group-hover:opacity-100',
)}
>
<AutoSizer>
@@ -89,6 +121,7 @@ export function MetricCard({
/>
</linearGradient>
</defs>
<Tooltip content={TooltipContent} />
<Area
dataKey="count"
type="step"

View File

@@ -7,6 +7,11 @@ import { truncate } from '@/utils/truncate';
import { Fragment } from 'react';
import { Cell, Pie, PieChart, ResponsiveContainer, Tooltip } from 'recharts';
import {
ChartTooltipContainer,
ChartTooltipHeader,
ChartTooltipItem,
} from '@/components/charts/chart-tooltip';
import { useNumber } from '@/hooks/use-numer-formatter';
import { formatDate } from '@/utils/date';
import { AXIS_FONT_PROPS } from '../common/axis';
@@ -24,43 +29,37 @@ interface Props {
const PieTooltip = (props: { payload?: any[] }) => {
const number = useNumber();
return (
<div className="bg-background/80 p-2 rounded-md backdrop-blur-md border min-w-[180px]">
<ChartTooltipContainer>
{props.payload?.map((serie, index) => {
const item = serie.payload;
return (
<Fragment key={item.id}>
{index === 0 && item.date && (
<div className="flex justify-between gap-8">
<ChartTooltipHeader>
<div>{formatDate(new Date(item.date))}</div>
</div>
</ChartTooltipHeader>
)}
<div className="flex gap-2">
<div
className="w-[3px] rounded-full"
style={{ background: item.color }}
/>
<div className="col flex-1 gap-1">
<div className="flex items-center gap-1">
<SerieIcon name={item.name} />
<SerieName name={item.names} className="font-medium" />
</div>
<div className="flex justify-between gap-8 font-mono font-medium">
<div className="row gap-1">
{number.formatWithUnit(item.count)}
{!!item.previous && (
<span className="text-muted-foreground">
({number.formatWithUnit(item.previous.sum.value)})
</span>
)}
</div>
<PreviousDiffIndicator {...item.previous?.sum} />
</div>
<ChartTooltipItem color={item.color}>
<div className="flex items-center gap-1">
<SerieIcon name={item.name} />
<SerieName name={item.names} className="font-medium" />
</div>
</div>
<div className="flex justify-between gap-8 font-mono font-medium">
<div className="row gap-1">
{number.formatWithUnit(item.count)}
{!!item.previous && (
<span className="text-muted-foreground">
({number.formatWithUnit(item.previous.sum.value)})
</span>
)}
</div>
<PreviousDiffIndicator {...item.previous?.sum} />
</div>
</ChartTooltipItem>
</Fragment>
);
})}
</div>
</ChartTooltipContainer>
);
};

View File

@@ -4,8 +4,8 @@ import type {
VisibilityState,
} from '@tanstack/react-table';
import { parseAsInteger, useQueryState } from 'nuqs';
import { useState } from 'react';
import { useLocalStorage } from 'usehooks-ts';
import { useEffect, useState } from 'react';
import { useLocalStorage, useReadLocalStorage } from 'usehooks-ts';
export const useDataTablePagination = (pageSize = 10) => {
const [page, setPage] = useQueryState(
@@ -22,6 +22,12 @@ export const useDataTablePagination = (pageSize = 10) => {
return { page, setPage, state };
};
export const useReadColumnVisibility = (persistentKey: string) => {
return useReadLocalStorage<Record<string, boolean>>(
`@op:${persistentKey}-column-visibility`,
);
};
export const useDataTableColumnVisibility = <TData,>(
columns: ColumnDef<TData>[],
persistentKey: string,
@@ -43,6 +49,13 @@ export const useDataTableColumnVisibility = <TData,>(
}, {} as VisibilityState),
);
// somewhat hack
// Set initial column visibility,
// otherwise will not useReadColumnVisibility be updated
useEffect(() => {
setColumnVisibility(columnVisibility);
}, []);
const [columnOrder, setColumnOrder] = useLocalStorage<string[]>(
`@op:${persistentKey}-column-order`,
columns.map((column) => column.id!),

View File

@@ -19,7 +19,12 @@ const setCookieFn = createServerFn({ method: 'POST' })
if (!VALID_COOKIES.includes(key)) {
return;
}
setCookie(key, value);
const maxAge = 60 * 60 * 24 * 365 * 10;
setCookie(key, value, {
maxAge,
path: '/',
expires: new Date(Date.now() + maxAge),
});
});
// Called in __root.tsx beforeLoad hook to get cookies from the server

View File

@@ -1,4 +1,5 @@
import { EventsTable } from '@/components/events/table';
import { useReadColumnVisibility } from '@/components/ui/data-table/data-table-hooks';
import { useEventQueryNamesFilter } from '@/hooks/use-event-query-filters';
import { useTRPC } from '@/integrations/trpc/react';
import { useInfiniteQuery } from '@tanstack/react-query';
@@ -20,6 +21,7 @@ function Component() {
);
const [endDate, setEndDate] = useQueryState('endDate', parseAsIsoDateTime);
const [eventNames] = useEventQueryNamesFilter();
const columnVisibility = useReadColumnVisibility('events');
const query = useInfiniteQuery(
trpc.event.conversions.infiniteQueryOptions(
{
@@ -27,6 +29,7 @@ function Component() {
startDate: startDate || undefined,
endDate: endDate || undefined,
events: eventNames,
columnVisibility: columnVisibility ?? {},
},
{
getNextPageParam: (lastPage) => lastPage.meta.next,

View File

@@ -1,4 +1,5 @@
import { EventsTable } from '@/components/events/table';
import { useReadColumnVisibility } from '@/components/ui/data-table/data-table-hooks';
import {
useEventQueryFilters,
useEventQueryNamesFilter,
@@ -21,6 +22,8 @@ function Component() {
const [startDate] = useQueryState('startDate', parseAsIsoDateTime);
const [endDate] = useQueryState('endDate', parseAsIsoDateTime);
const [eventNames] = useEventQueryNamesFilter();
const columnVisibility = useReadColumnVisibility('events');
const query = useInfiniteQuery(
trpc.event.events.infiniteQueryOptions(
{
@@ -30,8 +33,10 @@ function Component() {
profileId: '',
startDate: startDate || undefined,
endDate: endDate || undefined,
columnVisibility: columnVisibility ?? {},
},
{
enabled: columnVisibility !== null,
getNextPageParam: (lastPage) => lastPage.meta.next,
},
),

View File

@@ -1,4 +1,5 @@
import { EventsTable } from '@/components/events/table';
import { useReadColumnVisibility } from '@/components/ui/data-table/data-table-hooks';
import {
useEventQueryFilters,
useEventQueryNamesFilter,
@@ -21,6 +22,7 @@ function Component() {
const [startDate] = useQueryState('startDate', parseAsIsoDateTime);
const [endDate] = useQueryState('endDate', parseAsIsoDateTime);
const [eventNames] = useEventQueryNamesFilter();
const columnVisibility = useReadColumnVisibility('events');
const query = useInfiniteQuery(
trpc.event.events.infiniteQueryOptions(
{
@@ -30,8 +32,10 @@ function Component() {
startDate: startDate || undefined,
endDate: endDate || undefined,
events: eventNames,
columnVisibility: columnVisibility ?? {},
},
{
enabled: columnVisibility !== null,
getNextPageParam: (lastPage) => lastPage.meta.next,
},
),

View File

@@ -3,12 +3,11 @@ import FullPageLoadingState from '@/components/full-page-loading-state';
import { PageContainer } from '@/components/page-container';
import { PageHeader } from '@/components/page-header';
import { SerieIcon } from '@/components/report-chart/common/serie-icon';
import { useDataTablePagination } from '@/components/ui/data-table/data-table-hooks';
import { useReadColumnVisibility } from '@/components/ui/data-table/data-table-hooks';
import {
useEventQueryFilters,
useEventQueryNamesFilter,
} from '@/hooks/use-event-query-filters';
import { useSearchQueryState } from '@/hooks/use-search-query-state';
import { useTRPC } from '@/integrations/trpc/react';
import { createProjectTitle } from '@/utils/title';
import { useInfiniteQuery, useSuspenseQuery } from '@tanstack/react-query';
@@ -46,8 +45,6 @@ function Component() {
const trpc = useTRPC();
const LIMIT = 50;
const { page } = useDataTablePagination(LIMIT);
const { debouncedSearch } = useSearchQueryState();
const { data: session } = useSuspenseQuery(
trpc.session.byId.queryOptions({
@@ -60,7 +57,7 @@ function Component() {
const [startDate] = useQueryState('startDate', parseAsIsoDateTime);
const [endDate] = useQueryState('endDate', parseAsIsoDateTime);
const [eventNames] = useEventQueryNamesFilter();
const columnVisibility = useReadColumnVisibility('events');
const query = useInfiniteQuery(
trpc.event.events.infiniteQueryOptions(
{
@@ -70,8 +67,10 @@ function Component() {
events: eventNames,
startDate: startDate || undefined,
endDate: endDate || undefined,
columnVisibility: columnVisibility ?? {},
},
{
enabled: columnVisibility !== null,
getNextPageParam: (lastPage) => lastPage.meta.next,
},
),

View File

@@ -136,6 +136,22 @@ function Component() {
</LinkButton>
</Alert>
)}
{organization.subscriptionPeriodEventsCountExceededAt &&
organization.isActive && (
<Alert
title="Events limit exceeded"
description={`Your subscription has exceeded the limit on ${format(organization.subscriptionPeriodEventsCountExceededAt, 'PPP')}`}
>
<LinkButton
to="/$organizationId/billing"
params={{
organizationId: organizationId,
}}
>
Upgrade now
</LinkButton>
</Alert>
)}
<Outlet />
<SupporterPrompt />
</>

View File

@@ -11,8 +11,8 @@
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@bull-board/api": "6.13.1",
"@bull-board/express": "6.13.1",
"@bull-board/api": "6.14.0",
"@bull-board/express": "6.14.0",
"@openpanel/common": "workspace:*",
"@openpanel/db": "workspace:*",
"@openpanel/email": "workspace:*",
@@ -22,9 +22,9 @@
"@openpanel/importer": "workspace:*",
"@openpanel/queue": "workspace:*",
"@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7",
"bullmq": "^5.63.0",
"express": "^4.18.2",
"groupmq": "1.0.0-next.19",
"groupmq": "1.1.0-next.6",
"prom-client": "^15.1.3",
"ramda": "^0.29.1",
"source-map-support": "^0.5.21",

View File

@@ -44,43 +44,30 @@ export async function bootCron() {
});
}
// Add repeatable jobs
for (const job of jobs) {
await cronQueue.add(
job.name,
{
type: job.type,
payload: undefined,
},
{
jobId: job.type,
repeat:
typeof job.pattern === 'number'
? {
every: job.pattern,
}
: {
pattern: job.pattern,
},
},
);
logger.info('Updating cron jobs');
const jobSchedulers = await cronQueue.getJobSchedulers();
for (const jobScheduler of jobSchedulers) {
await cronQueue.removeJobScheduler(jobScheduler.key);
}
// Remove outdated repeatable jobs
const repeatableJobs = await cronQueue.getRepeatableJobs();
for (const repeatableJob of repeatableJobs) {
const match = jobs.find(
(job) => `${job.name}:${job.type}:::${job.pattern}` === repeatableJob.key,
// Add repeatable jobs
for (const job of jobs) {
await cronQueue.upsertJobScheduler(
job.type,
typeof job.pattern === 'number'
? {
every: job.pattern,
}
: {
pattern: job.pattern,
},
{
data: {
type: job.type,
payload: undefined,
},
},
);
if (match) {
logger.info('Repeatable job exists', {
key: repeatableJob.key,
});
} else {
logger.info('Removing repeatable job', {
key: repeatableJob.key,
});
cronQueue.removeRepeatableByKey(repeatableJob.key);
}
}
}

View File

@@ -2,9 +2,10 @@ import type { Queue, WorkerOptions } from 'bullmq';
import { Worker } from 'bullmq';
import {
EVENTS_GROUP_QUEUES_SHARDS,
type EventsQueuePayloadIncomingEvent,
cronQueue,
eventsGroupQueue,
eventsGroupQueues,
importQueue,
miscQueue,
notificationQueue,
@@ -18,59 +19,179 @@ import { setTimeout as sleep } from 'node:timers/promises';
import { Worker as GroupWorker } from 'groupmq';
import { cronJob } from './jobs/cron';
import { eventsJob } from './jobs/events';
import { incomingEventPure } from './jobs/events.incoming-event';
import { incomingEvent } from './jobs/events.incoming-event';
import { importJob } from './jobs/import';
import { miscJob } from './jobs/misc';
import { notificationJob } from './jobs/notification';
import { sessionsJob } from './jobs/sessions';
import { eventsGroupJobDuration } from './metrics';
import { logger } from './utils/logger';
const workerOptions: WorkerOptions = {
connection: getRedisQueue(),
};
export async function bootWorkers() {
const eventsGroupWorker = new GroupWorker<
EventsQueuePayloadIncomingEvent['payload']
>({
concurrency: Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '1', 10),
logger: queueLogger,
queue: eventsGroupQueue,
handler: async (job) => {
logger.info('processing event (group queue)', {
groupId: job.groupId,
timestamp: job.data.event.timestamp,
});
await incomingEventPure(job.data);
},
});
eventsGroupWorker.run();
const sessionsWorker = new Worker(
sessionsQueue.name,
sessionsJob,
workerOptions,
);
const cronWorker = new Worker(cronQueue.name, cronJob, workerOptions);
const notificationWorker = new Worker(
notificationQueue.name,
notificationJob,
workerOptions,
);
const miscWorker = new Worker(miscQueue.name, miscJob, workerOptions);
const importWorker = new Worker(importQueue.name, importJob, {
...workerOptions,
concurrency: Number.parseInt(process.env.IMPORT_JOB_CONCURRENCY || '1', 10),
});
type QueueName = string; // Can be: events, events_N (where N is 0 to shards-1), sessions, cron, notification, misc
const workers = [
sessionsWorker,
cronWorker,
notificationWorker,
miscWorker,
importWorker,
// eventsGroupWorker,
];
/**
* Parses the ENABLED_QUEUES environment variable and returns an array of queue names to start.
* If no env var is provided, returns all queues.
*
* Supported queue names:
* - events - All event shards (events_0, events_1, ..., events_N)
* - events_N - Individual event shard (where N is 0 to EVENTS_GROUP_QUEUES_SHARDS-1)
* - sessions, cron, notification, misc
*/
function getEnabledQueues(): QueueName[] {
const enabledQueuesEnv = process.env.ENABLED_QUEUES?.trim();
if (!enabledQueuesEnv) {
logger.info('No ENABLED_QUEUES specified, starting all queues', {
totalEventShards: EVENTS_GROUP_QUEUES_SHARDS,
});
return ['events', 'sessions', 'cron', 'notification', 'misc', 'import'];
}
const queues = enabledQueuesEnv
.split(',')
.map((q) => q.trim())
.filter(Boolean);
logger.info('Starting queues from ENABLED_QUEUES', {
queues,
totalEventShards: EVENTS_GROUP_QUEUES_SHARDS,
});
return queues;
}
/**
* Gets the concurrency setting for a queue from environment variables.
* Env var format: {QUEUE_NAME}_CONCURRENCY (e.g., EVENTS_0_CONCURRENCY=32)
*/
function getConcurrencyFor(queueName: string, defaultValue = 1): number {
const envKey = `${queueName.toUpperCase().replace(/[^A-Z0-9]/g, '_')}_CONCURRENCY`;
const value = process.env[envKey];
if (value) {
const parsed = Number.parseInt(value, 10);
if (!Number.isNaN(parsed) && parsed > 0) {
return parsed;
}
}
return defaultValue;
}
export async function bootWorkers() {
const enabledQueues = getEnabledQueues();
const workers: (Worker | GroupWorker<any>)[] = [];
// Start event workers based on enabled queues
const eventQueuesToStart: number[] = [];
if (enabledQueues.includes('events')) {
// Start all event shards
for (let i = 0; i < EVENTS_GROUP_QUEUES_SHARDS; i++) {
eventQueuesToStart.push(i);
}
} else {
// Start specific event shards (events_0, events_1, etc.)
for (let i = 0; i < EVENTS_GROUP_QUEUES_SHARDS; i++) {
if (enabledQueues.includes(`events_${i}`)) {
eventQueuesToStart.push(i);
}
}
}
for (const index of eventQueuesToStart) {
const queue = eventsGroupQueues[index];
if (!queue) continue;
const queueName = `events_${index}`;
const concurrency = getConcurrencyFor(
queueName,
Number.parseInt(process.env.EVENT_JOB_CONCURRENCY || '10', 10),
);
const worker = new GroupWorker<EventsQueuePayloadIncomingEvent['payload']>({
queue,
concurrency,
logger: queueLogger,
blockingTimeoutSec: Number.parseFloat(
process.env.EVENT_BLOCKING_TIMEOUT_SEC || '1',
),
handler: async (job) => {
return await incomingEvent(job.data);
},
});
worker.run();
workers.push(worker);
logger.info(`Started worker for ${queueName}`, { concurrency });
}
// Start sessions worker
if (enabledQueues.includes('sessions')) {
const concurrency = getConcurrencyFor('sessions');
const sessionsWorker = new Worker(sessionsQueue.name, sessionsJob, {
...workerOptions,
concurrency,
});
workers.push(sessionsWorker);
logger.info('Started worker for sessions', { concurrency });
}
// Start cron worker
if (enabledQueues.includes('cron')) {
const concurrency = getConcurrencyFor('cron');
const cronWorker = new Worker(cronQueue.name, cronJob, {
...workerOptions,
concurrency,
});
workers.push(cronWorker);
logger.info('Started worker for cron', { concurrency });
}
// Start notification worker
if (enabledQueues.includes('notification')) {
const concurrency = getConcurrencyFor('notification');
const notificationWorker = new Worker(
notificationQueue.name,
notificationJob,
{ ...workerOptions, concurrency },
);
workers.push(notificationWorker);
logger.info('Started worker for notification', { concurrency });
}
// Start misc worker
if (enabledQueues.includes('misc')) {
const concurrency = getConcurrencyFor('misc');
const miscWorker = new Worker(miscQueue.name, miscJob, {
...workerOptions,
concurrency,
});
workers.push(miscWorker);
logger.info('Started worker for misc', { concurrency });
}
// Start import worker
if (enabledQueues.includes('import')) {
const concurrency = getConcurrencyFor('import');
const importWorker = new Worker(importQueue.name, importJob, {
...workerOptions,
concurrency,
});
workers.push(importWorker);
logger.info('Started worker for import', { concurrency });
}
if (workers.length === 0) {
logger.warn(
'No workers started. Check ENABLED_QUEUES environment variable.',
);
}
workers.forEach((worker) => {
(worker as Worker).on('error', (error) => {
@@ -94,6 +215,13 @@ export async function bootWorkers() {
(worker as Worker).on('failed', (job) => {
if (job) {
if (job.processedOn && job.finishedOn) {
const elapsed = job.finishedOn - job.processedOn;
eventsGroupJobDuration.observe(
{ name: worker.name, status: 'failed' },
elapsed,
);
}
logger.error('job failed', {
jobId: job.id,
worker: worker.name,
@@ -106,15 +234,18 @@ export async function bootWorkers() {
(worker as Worker).on('completed', (job) => {
if (job) {
logger.info('job completed', {
jobId: job.id,
worker: worker.name,
data: job.data,
elapsed:
job.processedOn && job.finishedOn
? job.finishedOn - job.processedOn
: undefined,
});
if (job.processedOn && job.finishedOn) {
const elapsed = job.finishedOn - job.processedOn;
logger.info('job completed', {
jobId: job.id,
worker: worker.name,
elapsed,
});
eventsGroupJobDuration.observe(
{ name: worker.name, status: 'success' },
elapsed,
);
}
}
});
@@ -135,8 +266,14 @@ export async function bootWorkers() {
});
try {
const time = performance.now();
await waitForQueueToEmpty(cronQueue);
// Wait for cron queue to empty if it's running
if (enabledQueues.includes('cron')) {
await waitForQueueToEmpty(cronQueue);
}
await Promise.all(workers.map((worker) => worker.close()));
logger.info('workers closed successfully', {
elapsed: performance.now() - time,
});
@@ -155,15 +292,7 @@ export async function bootWorkers() {
['uncaughtException', 'unhandledRejection', 'SIGTERM', 'SIGINT'].forEach(
(evt) => {
process.on(evt, (code) => {
if (process.env.NODE_ENV === 'production') {
exitHandler(evt, code);
} else {
logger.info('Shutting down for development', {
event: evt,
code,
});
process.exit(0);
}
exitHandler(evt, code);
});
},
);

View File

@@ -4,7 +4,7 @@ import { ExpressAdapter } from '@bull-board/express';
import { createInitialSalts } from '@openpanel/db';
import {
cronQueue,
eventsGroupQueue,
eventsGroupQueues,
importQueue,
miscQueue,
notificationQueue,
@@ -34,7 +34,9 @@ async function start() {
serverAdapter.setBasePath('/');
createBullBoard({
queues: [
new BullBoardGroupMQAdapter(eventsGroupQueue) as any,
...eventsGroupQueues.map(
(queue) => new BullBoardGroupMQAdapter(queue) as any,
),
new BullMQAdapter(sessionsQueue),
new BullMQAdapter(cronQueue),
new BullMQAdapter(notificationQueue),

View File

@@ -1,10 +1,9 @@
import { logger } from '@/utils/logger';
import { TABLE_NAMES, ch, db, getReplicatedTableName } from '@openpanel/db';
import { db, deleteFromClickhouse, deleteProjects } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import type { Job } from 'bullmq';
import sqlstring from 'sqlstring';
export async function deleteProjects(job: Job<CronQueuePayload>) {
export async function jobdeleteProjects(job: Job<CronQueuePayload>) {
const projects = await db.project.findMany({
where: {
deleteAt: {
@@ -17,44 +16,13 @@ export async function deleteProjects(job: Job<CronQueuePayload>) {
return;
}
for (const project of projects) {
await db.project.delete({
where: {
id: project.id,
},
});
}
await deleteProjects(projects.map((project) => project.id));
logger.info('Deleting projects', {
projects,
});
projects.forEach((project) => {
job.log(`Delete project: "${project.id}"`);
});
const where = `project_id IN (${projects.map((project) => sqlstring.escape(project.id)).join(',')})`;
const tables = [
TABLE_NAMES.events,
TABLE_NAMES.profiles,
TABLE_NAMES.events_bots,
TABLE_NAMES.sessions,
TABLE_NAMES.cohort_events_mv,
TABLE_NAMES.dau_mv,
TABLE_NAMES.event_names_mv,
TABLE_NAMES.event_property_values_mv,
];
for (const table of tables) {
const query = `ALTER TABLE ${getReplicatedTableName(table)} DELETE WHERE ${where};`;
await ch.command({
query,
clickhouse_settings: {
lightweight_deletes_sync: '0',
},
});
}
await deleteFromClickhouse(projects.map((project) => project.id));
logger.info(`Deleted ${projects.length} projects`, {
projects,

View File

@@ -3,7 +3,7 @@ import type { Job } from 'bullmq';
import { eventBuffer, profileBuffer, sessionBuffer } from '@openpanel/db';
import type { CronQueuePayload } from '@openpanel/queue';
import { deleteProjects } from './cron.delete-projects';
import { jobdeleteProjects } from './cron.delete-projects';
import { ping } from './cron.ping';
import { salt } from './cron.salt';
@@ -25,7 +25,7 @@ export async function cronJob(job: Job<CronQueuePayload>) {
return await ping();
}
case 'deleteProjects': {
return await deleteProjects(job);
return await jobdeleteProjects(job);
}
}
}

View File

@@ -1,13 +1,13 @@
import type { Job } from 'bullmq';
import { logger as baseLogger } from '@/utils/logger';
import { getTime } from '@openpanel/common';
import {
type IClickhouseSession,
type IServiceCreateEventPayload,
type IServiceEvent,
TABLE_NAMES,
checkNotificationRulesForSessionEnd,
convertClickhouseDateToJs,
createEvent,
eventBuffer,
formatClickhouseDate,
@@ -65,10 +65,9 @@ export async function createSessionEnd(
const logger = baseLogger.child({
payload,
jobId: job.id,
reqId: payload.properties?.__reqId ?? 'unknown',
});
logger.info('Processing session end job');
logger.debug('Processing session end job');
const session = await sessionBuffer.getExistingSession(payload.sessionId);
@@ -77,7 +76,7 @@ export async function createSessionEnd(
}
try {
handleSessionEndNotifications({
await handleSessionEndNotifications({
session,
payload,
});
@@ -103,7 +102,9 @@ export async function createSessionEnd(
name: 'session_end',
duration: session.duration ?? 0,
path: lastScreenView?.path ?? '',
createdAt: new Date(getTime(session.ended_at) + 1000),
createdAt: new Date(
convertClickhouseDateToJs(session.ended_at).getTime() + 100,
),
profileId: lastScreenView?.profileId || payload.profileId,
});
}

View File

@@ -18,9 +18,7 @@ import {
} from '@openpanel/db';
import type { ILogger } from '@openpanel/logger';
import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue';
import type { Job } from 'bullmq';
import * as R from 'ramda';
import { omit } from 'ramda';
import { v4 as uuid } from 'uuid';
const GLOBAL_PROPERTIES = ['__path', '__referrer'];
@@ -33,10 +31,9 @@ const merge = <A, B>(a: Partial<A>, b: Partial<B>): A & B =>
async function createEventAndNotify(
payload: IServiceCreateEventPayload,
jobData: Job<EventsQueuePayloadIncomingEvent>['data']['payload'],
logger: ILogger,
) {
logger.info('Creating event', { event: payload, jobData });
logger.info('Creating event', { event: payload });
const [event] = await Promise.all([
createEvent(payload),
checkNotificationRulesForEvent(payload).catch(() => {}),
@@ -45,16 +42,7 @@ async function createEventAndNotify(
}
export async function incomingEvent(
job: Job<EventsQueuePayloadIncomingEvent>,
token?: string,
) {
return incomingEventPure(job.data.payload, job, token);
}
export async function incomingEventPure(
jobPayload: EventsQueuePayloadIncomingEvent['payload'],
job?: Job<EventsQueuePayloadIncomingEvent>,
token?: string,
) {
const {
geo,
@@ -63,6 +51,7 @@ export async function incomingEventPure(
projectId,
currentDeviceId,
previousDeviceId,
uaInfo: _uaInfo,
} = jobPayload;
const properties = body.properties ?? {};
const reqId = headers['request-id'] ?? 'unknown';
@@ -93,18 +82,17 @@ export async function incomingEventPure(
const userAgent = headers['user-agent'];
const sdkName = headers['openpanel-sdk-name'];
const sdkVersion = headers['openpanel-sdk-version'];
const uaInfo = parseUserAgent(userAgent, properties);
// TODO: Remove both user-agent and parseUserAgent
const uaInfo = _uaInfo ?? parseUserAgent(userAgent, properties);
const baseEvent = {
name: body.name,
profileId,
projectId,
properties: omit(GLOBAL_PROPERTIES, {
properties: R.omit(GLOBAL_PROPERTIES, {
...properties,
__user_agent: userAgent,
__hash: hash,
__query: query,
__reqId: reqId,
}),
createdAt,
duration: 0,
@@ -161,7 +149,7 @@ export async function incomingEventPure(
origin: screenView?.origin ?? baseEvent.origin,
};
return createEventAndNotify(payload as IServiceEvent, jobPayload, logger);
return createEventAndNotify(payload as IServiceEvent, logger);
}
const sessionEnd = await getSessionEnd({
@@ -197,7 +185,7 @@ export async function incomingEventPure(
});
}
const event = await createEventAndNotify(payload, jobPayload, logger);
const event = await createEventAndNotify(payload, logger);
if (!sessionEnd) {
logger.info('Creating session end job', { event: payload });

View File

@@ -1,6 +1,9 @@
import { type IServiceEvent, createEvent } from '@openpanel/db';
import { eventBuffer } from '@openpanel/db';
import { sessionsQueue } from '@openpanel/queue';
import {
type EventsQueuePayloadIncomingEvent,
sessionsQueue,
} from '@openpanel/queue';
import type { Job } from 'bullmq';
import { type Mock, beforeEach, describe, expect, it, vi } from 'vitest';
import { incomingEvent } from './events.incoming-event';
@@ -32,6 +35,28 @@ const geo = {
latitude: 0,
};
const uaInfo: EventsQueuePayloadIncomingEvent['payload']['uaInfo'] = {
isServer: false,
device: 'desktop',
os: 'Windows',
osVersion: '10',
browser: 'Chrome',
browserVersion: '91.0.4472.124',
brand: '',
model: '',
};
const uaInfoServer: EventsQueuePayloadIncomingEvent['payload']['uaInfo'] = {
isServer: true,
device: 'server',
os: '',
osVersion: '',
browser: '',
browserVersion: '',
brand: '',
model: '',
};
describe('incomingEvent', () => {
beforeEach(() => {
vi.clearAllMocks();
@@ -41,31 +66,29 @@ describe('incomingEvent', () => {
const spySessionsQueueAdd = vi.spyOn(sessionsQueue, 'add');
const timestamp = new Date();
// Mock job data
const jobData = {
payload: {
geo,
event: {
name: 'test_event',
timestamp: timestamp.toISOString(),
properties: { __path: 'https://example.com/test' },
},
headers: {
'request-id': '123',
'user-agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'openpanel-sdk-name': 'web',
'openpanel-sdk-version': '1.0.0',
},
projectId,
currentDeviceId,
previousDeviceId,
const jobData: EventsQueuePayloadIncomingEvent['payload'] = {
geo,
event: {
name: 'test_event',
timestamp: timestamp.toISOString(),
isTimestampFromThePast: false,
properties: { __path: 'https://example.com/test' },
},
uaInfo,
headers: {
'request-id': '123',
'user-agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'openpanel-sdk-name': 'web',
'openpanel-sdk-version': '1.0.0',
},
projectId,
currentDeviceId,
previousDeviceId,
};
const job = { data: jobData } as Job;
// Execute the job
await incomingEvent(job);
await incomingEvent(jobData);
const event = {
name: 'test_event',
@@ -78,8 +101,6 @@ describe('incomingEvent', () => {
properties: {
__hash: undefined,
__query: undefined,
__user_agent: jobData.payload.headers['user-agent'],
__reqId: jobData.payload.headers['request-id'],
},
createdAt: timestamp,
country: 'US',
@@ -92,16 +113,16 @@ describe('incomingEvent', () => {
browser: 'Chrome',
browserVersion: '91.0.4472.124',
device: 'desktop',
brand: undefined,
model: undefined,
brand: '',
model: '',
duration: 0,
path: '/test',
origin: 'https://example.com',
referrer: '',
referrerName: '',
referrerType: '',
sdkName: jobData.payload.headers['openpanel-sdk-name'],
sdkVersion: jobData.payload.headers['openpanel-sdk-version'],
sdkName: jobData.headers['openpanel-sdk-name'],
sdkVersion: jobData.headers['openpanel-sdk-version'],
};
expect(spySessionsQueueAdd).toHaveBeenCalledWith(
@@ -135,29 +156,27 @@ describe('incomingEvent', () => {
const timestamp = new Date();
// Mock job data
const jobData = {
payload: {
geo,
event: {
name: 'test_event',
timestamp: timestamp.toISOString(),
properties: { __path: 'https://example.com/test' },
},
headers: {
'request-id': '123',
'user-agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'openpanel-sdk-name': 'web',
'openpanel-sdk-version': '1.0.0',
},
projectId,
currentDeviceId,
previousDeviceId,
const jobData: EventsQueuePayloadIncomingEvent['payload'] = {
geo,
event: {
name: 'test_event',
timestamp: timestamp.toISOString(),
properties: { __path: 'https://example.com/test' },
isTimestampFromThePast: false,
},
headers: {
'request-id': '123',
'user-agent':
'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'openpanel-sdk-name': 'web',
'openpanel-sdk-version': '1.0.0',
},
uaInfo,
projectId,
currentDeviceId,
previousDeviceId,
};
const job = { data: jobData } as Job;
const changeDelay = vi.fn();
const updateData = vi.fn();
spySessionsQueueGetJob.mockResolvedValueOnce({
@@ -175,7 +194,7 @@ describe('incomingEvent', () => {
},
} as Partial<Job> as Job);
// Execute the job
await incomingEvent(job);
await incomingEvent(jobData);
const event = {
name: 'test_event',
@@ -186,8 +205,6 @@ describe('incomingEvent', () => {
properties: {
__hash: undefined,
__query: undefined,
__user_agent: jobData.payload.headers['user-agent'],
__reqId: jobData.payload.headers['request-id'],
},
createdAt: timestamp,
country: 'US',
@@ -200,16 +217,16 @@ describe('incomingEvent', () => {
browser: 'Chrome',
browserVersion: '91.0.4472.124',
device: 'desktop',
brand: undefined,
model: undefined,
brand: '',
model: '',
duration: 0,
path: '/test',
origin: 'https://example.com',
referrer: '',
referrerName: '',
referrerType: '',
sdkName: jobData.payload.headers['openpanel-sdk-name'],
sdkVersion: jobData.payload.headers['openpanel-sdk-version'],
sdkName: jobData.headers['openpanel-sdk-name'],
sdkVersion: jobData.headers['openpanel-sdk-version'],
};
expect(spySessionsQueueAdd).toHaveBeenCalledTimes(0);
@@ -220,29 +237,27 @@ describe('incomingEvent', () => {
it('should handle server events (with existing screen view)', async () => {
const timestamp = new Date();
const jobData = {
payload: {
geo,
event: {
name: 'server_event',
timestamp: timestamp.toISOString(),
properties: { custom_property: 'test_value' },
profileId: 'profile-123',
},
headers: {
'user-agent': 'OpenPanel Server/1.0',
'openpanel-sdk-name': 'server',
'openpanel-sdk-version': '1.0.0',
'request-id': '123',
},
projectId,
currentDeviceId: '',
previousDeviceId: '',
const jobData: EventsQueuePayloadIncomingEvent['payload'] = {
geo,
event: {
name: 'server_event',
timestamp: timestamp.toISOString(),
properties: { custom_property: 'test_value' },
profileId: 'profile-123',
isTimestampFromThePast: false,
},
headers: {
'user-agent': 'OpenPanel Server/1.0',
'openpanel-sdk-name': 'server',
'openpanel-sdk-version': '1.0.0',
'request-id': '123',
},
projectId,
currentDeviceId: '',
previousDeviceId: '',
uaInfo: uaInfoServer,
};
const job = { data: jobData } as Job;
const mockLastScreenView = {
deviceId: 'last-device-123',
sessionId: 'last-session-456',
@@ -268,7 +283,7 @@ describe('incomingEvent', () => {
mockLastScreenView as IServiceEvent,
);
await incomingEvent(job);
await incomingEvent(jobData);
expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({
name: 'server_event',
@@ -278,8 +293,6 @@ describe('incomingEvent', () => {
projectId,
properties: {
custom_property: 'test_value',
__user_agent: 'OpenPanel Server/1.0',
__reqId: '123',
__hash: undefined,
__query: undefined,
},
@@ -311,33 +324,31 @@ describe('incomingEvent', () => {
it('should handle server events (without existing screen view)', async () => {
const timestamp = new Date();
const jobData = {
payload: {
geo,
event: {
name: 'server_event',
timestamp: timestamp.toISOString(),
properties: { custom_property: 'test_value' },
profileId: 'profile-123',
},
headers: {
'user-agent': 'OpenPanel Server/1.0',
'openpanel-sdk-name': 'server',
'openpanel-sdk-version': '1.0.0',
'request-id': '123',
},
projectId,
currentDeviceId: '',
previousDeviceId: '',
const jobData: EventsQueuePayloadIncomingEvent['payload'] = {
geo,
event: {
name: 'server_event',
timestamp: timestamp.toISOString(),
properties: { custom_property: 'test_value' },
profileId: 'profile-123',
isTimestampFromThePast: false,
},
headers: {
'user-agent': 'OpenPanel Server/1.0',
'openpanel-sdk-name': 'server',
'openpanel-sdk-version': '1.0.0',
'request-id': '123',
},
projectId,
currentDeviceId: '',
previousDeviceId: '',
uaInfo: uaInfoServer,
};
const job = { data: jobData } as Job;
// Mock getLastScreenView to return null
vi.mocked(eventBuffer.getLastScreenView).mockResolvedValueOnce(null);
await incomingEvent(job);
await incomingEvent(jobData);
expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({
name: 'server_event',
@@ -347,8 +358,6 @@ describe('incomingEvent', () => {
projectId,
properties: {
custom_property: 'test_value',
__user_agent: 'OpenPanel Server/1.0',
__reqId: '123',
__hash: undefined,
__query: undefined,
},

View File

@@ -1,15 +0,0 @@
import type { Job } from 'bullmq';
import type {
EventsQueuePayload,
EventsQueuePayloadIncomingEvent,
} from '@openpanel/queue';
import { incomingEvent } from './events.incoming-event';
export async function eventsJob(job: Job<EventsQueuePayload>, token?: string) {
return await incomingEvent(
job as Job<EventsQueuePayloadIncomingEvent>,
token,
);
}

View File

@@ -2,23 +2,32 @@ import client from 'prom-client';
import {
botBuffer,
db,
eventBuffer,
profileBuffer,
sessionBuffer,
} from '@openpanel/db';
import { cronQueue, eventsGroupQueue, sessionsQueue } from '@openpanel/queue';
import { cronQueue, eventsGroupQueues, sessionsQueue } from '@openpanel/queue';
const Registry = client.Registry;
export const register = new Registry();
const queues = [sessionsQueue, cronQueue, eventsGroupQueue];
const queues = [sessionsQueue, cronQueue, ...eventsGroupQueues];
// Histogram to track job processing time for eventsGroupQueues
export const eventsGroupJobDuration = new client.Histogram({
name: 'job_duration_ms',
help: 'Duration of job processing (in ms)',
labelNames: ['name', 'status'],
buckets: [10, 25, 50, 100, 250, 500, 750, 1000, 2000, 5000, 10000, 30000], // 10ms to 30s
});
register.registerMetric(eventsGroupJobDuration);
queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_active_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_active_count`,
help: 'Active count',
async collect() {
const metric = await queue.getActiveCount();
@@ -29,7 +38,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_delayed_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_delayed_count`,
help: 'Delayed count',
async collect() {
const metric = await queue.getDelayedCount();
@@ -40,7 +49,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_failed_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_failed_count`,
help: 'Failed count',
async collect() {
const metric = await queue.getFailedCount();
@@ -51,7 +60,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_completed_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_completed_count`,
help: 'Completed count',
async collect() {
const metric = await queue.getCompletedCount();
@@ -62,7 +71,7 @@ queues.forEach((queue) => {
register.registerMetric(
new client.Gauge({
name: `${queue.name}_waiting_count`,
name: `${queue.name.replace(/[\{\}]/g, '')}_waiting_count`,
help: 'Waiting count',
async collect() {
const metric = await queue.getWaitingCount();

View File

@@ -113,13 +113,12 @@ export async function getSessionEndJob(args: {
} | null> {
const state = await job.getState();
if (state !== 'delayed') {
logger.info(`[session-handler] Session end job is in "${state}" state`, {
logger.debug(`[session-handler] Session end job is in "${state}" state`, {
state,
retryCount,
jobTimestamp: new Date(job.timestamp).toISOString(),
jobDelta: Date.now() - job.timestamp,
jobId: job.id,
reqId: job.data.payload.properties?.__reqId ?? 'unknown',
payload: job.data.payload,
});
}

View File

@@ -1,4 +1,4 @@
version: '3'
version: "3"
services:
op-db:
@@ -12,12 +12,25 @@ services:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
op-df:
image: docker.dragonflydb.io/dragonflydb/dragonfly:latest
container_name: op-df
restart: always
ports:
- "6380:6379"
ulimits:
memlock: -1
nofile: 65535
command:
- "--cluster_mode=emulated"
- "--lock_on_hashtags"
op-kv:
image: redis:7.2.5-alpine
restart: always
volumes:
- ./docker/data/op-kv-data:/data
command: [ 'redis-server', '--maxmemory-policy', 'noeviction' ]
command: ["redis-server", "--maxmemory-policy", "noeviction"]
ports:
- 6379:6379

View File

@@ -9,3 +9,4 @@ export * from './src/url';
export * from './src/id';
export * from './src/get-previous-metric';
export * from './src/group-by-labels';
export * from './server/get-client-ip';

View File

@@ -5,7 +5,8 @@
"main": "index.ts",
"exports": {
".": "./index.ts",
"./server": "./server/index.ts"
"./server": "./server/index.ts",
"./server/get-client-ip": "./server/get-client-ip.ts"
},
"scripts": {
"test": "vitest",
@@ -15,6 +16,7 @@
"dependencies": {
"@openpanel/constants": "workspace:*",
"date-fns": "^3.3.1",
"lru-cache": "^11.2.2",
"luxon": "^3.6.1",
"mathjs": "^12.3.2",
"nanoid": "^5.0.7",

View File

@@ -0,0 +1,94 @@
/**
* Get client IP from headers
*
* Can be configured via IP_HEADER_ORDER env variable
* Example: IP_HEADER_ORDER="cf-connecting-ip,x-real-ip,x-forwarded-for"
*/
export const DEFAULT_HEADER_ORDER = [
'cf-connecting-ip',
'true-client-ip',
'x-forwarded-for',
'x-client-ip',
'x-real-ip',
'fastly-client-ip',
'x-cluster-client-ip',
'x-appengine-user-ip',
'do-connecting-ip',
'x-nf-client-connection-ip',
'x-forwarded',
'forwarded',
];
function isPublicIp(ip: string): boolean {
return (
!ip.startsWith('10.') &&
!ip.startsWith('172.16.') &&
!ip.startsWith('192.168.')
);
}
function getHeaderOrder(): string[] {
if (typeof process !== 'undefined' && process.env?.IP_HEADER_ORDER) {
return process.env.IP_HEADER_ORDER.split(',').map((h) => h.trim());
}
return DEFAULT_HEADER_ORDER;
}
function isValidIp(ip: string): boolean {
// Basic IP validation
const ipv4 = /^(\d{1,3}\.){3}\d{1,3}$/;
const ipv6 = /^([0-9a-fA-F]{0,4}:){2,7}[0-9a-fA-F]{0,4}$/;
return isPublicIp(ip) && (ipv4.test(ip) || ipv6.test(ip));
}
export function getClientIpFromHeaders(
headers: Record<string, string | string[] | undefined> | Headers,
overrideHeaderName?: string,
): string {
let headerOrder = getHeaderOrder();
if (overrideHeaderName) {
headerOrder = [overrideHeaderName];
}
for (const headerName of headerOrder) {
let value: string | null = null;
// Get header value
if (headers instanceof Headers) {
value = headers.get(headerName);
} else {
const headerValue = headers[headerName];
if (Array.isArray(headerValue)) {
value = headerValue[0] || null;
} else {
value = headerValue || null;
}
}
if (!value) continue;
// Handle x-forwarded-for (comma separated)
if (headerName === 'x-forwarded-for') {
const firstIp = value.split(',')[0]?.trim();
if (firstIp && isValidIp(firstIp)) {
return firstIp;
}
}
// Handle forwarded header (RFC 7239)
else if (headerName === 'forwarded') {
const match = value.match(/for=(?:"?\[?([^\]"]+)\]?"?)/i);
const ip = match?.[1];
if (ip && isValidIp(ip)) {
return ip;
}
}
// Regular headers
else if (isValidIp(value)) {
return value;
}
}
return '';
}

View File

@@ -1,3 +1,4 @@
import { LRUCache } from 'lru-cache';
import { UAParser } from 'ua-parser-js';
const parsedServerUa = {
@@ -11,8 +12,30 @@ const parsedServerUa = {
model: '',
} as const;
// Pre-compile all regex patterns for better performance
const IPHONE_MODEL_REGEX = /(iPhone|iPad)\s*([0-9,]+)/i;
const IOS_MODEL_REGEX = /(iOS)\s*([0-9\.]+)/i;
const IPAD_OS_VERSION_REGEX = /iPadOS\s*([0-9_]+)/i;
const SINGLE_NAME_VERSION_REGEX = /^[^\/]+\/[\d.]+$/;
// Device detection regexes
const SAMSUNG_MOBILE_REGEX = /SM-[ABDEFGJMNRWZ][0-9]+/i;
const SAMSUNG_TABLET_REGEX = /SM-T[0-9]+/i;
const LG_MOBILE_REGEX = /LG-[A-Z0-9]+/i;
const MOBILE_REGEX_1 =
/(android|bb\d+|meego).+mobile|avantgo|bada\/|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od)|iris|kindle|lge |maemo|midp|mmp|mobile.+firefox|netfront|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\/|plucker|pocket|psp|series(4|6)0|symbian|treo|up\.(browser|link)|vodafone|wap|windows ce|xda|xiino/i;
const MOBILE_REGEX_2 =
/1207|6310|6590|3gso|4thp|50[1-6]i|770s|802s|a wa|abac|ac(er|oo|s-)|ai(ko|rn)|al(av|ca|co)|amoi|an(ex|ny|yw)|aptu|ar(ch|go)|as(te|us)|attw|au(di|-m|r |s )|avan|be(ck|ll|nq)|bi(lb|rd)|bl(ac|az)|br(e|v)w|bumb|bw-(n|u)|c55\/|capi|ccwa|cdm-|cell|chtm|cldc|cmd-|co(mp|nd)|craw|da(it|ll|ng)|dbte|dc-s|devi|dica|dmob|do(c|p)o|ds(12|-d)|el(49|ai)|em(l2|ul)|er(ic|k0)|esl8|ez([4-7]0|os|wa|ze)|fetc|fly(-|_)|g1 u|g560|gene|gf-5|g-mo|go(\.w|od)|gr(ad|un)|haie|hcit|hd-(m|p|t)|hei-|hi(pt|ta)|hp( i|ip)|hs-c|ht(c(-| |_|a|g|p|s|t)|tp)|hu(aw|tc)|i-(20|go|ma)|i230|iac( |-|\/)|ibro|idea|ig01|ikom|im1k|inno|ipaq|iris|ja(t|v)a|jbro|jemu|jigs|kddi|keji|kgt( |\/)|klon|kpt |kwc-|kyo(c|k)|le(no|xi)|lg( g|\/(k|l|u)|50|54|-[a-w])|libw|lynx|m1-w|m3ga|m50\/|ma(te|ui|xo)|mc(01|21|ca)|m-cr|me(rc|ri)|mi(o8|oa|ts)|mmef|mo(01|02|bi|de|do|t(-| |o|v)|zz)|mt(50|p1|v )|mwbp|mywa|n10[0-2]|n20[2-3]|n30(0|2)|n50(0|2|5)|n7(0(0|1)|10)|ne((c|m)-|on|tf|wf|wg|wt)|nok(6|i)|nzph|o2im|op(ti|wv)|oran|owg1|p800|pan(a|d|t)|pdxg|pg(13|-([1-8]|c))|phil|pire|pl(ay|uc)|pn-2|po(ck|rt|se)|prox|psio|pt-g|qa-a|qc(07|12|21|32|60|-[2-7]|i-)|qtek|r380|r600|raks|rim9|ro(ve|zo)|s55\/|sa(ge|ma|mm|ms|ny|va)|sc(01|h-|oo|p-)|sdk\/|se(c(-|0|1)|47|mc|nd|ri)|sgh-|shar|sie(-|m)|sk-0|sl(45|id)|sm(al|ar|b3|it|t5)|so(ft|ny)|sp(01|h-|v-|v )|sy(01|mb)|t2(18|50)|t6(00|10|18)|ta(gt|lk)|tcl-|tdg-|tel(i|m)|tim-|t-mo|to(pl|sh)|ts(70|m-|m3|m5)|tx-9|up(\.b|g1|si)|utst|v400|v750|veri|vi(rg|te)|vk(40|5[0-3]|-v)|vm40|voda|vulc|vx(52|53|60|61|70|80|81|83|85|98)|w3c(-| )|webc|whit|wi(g |nc|nw)|wmlb|wonu|x700|yas-|your|zeto|zte-/i;
const TABLET_REGEX = /tablet|ipad|xoom|sch-i800|kindle|silk|playbook/i;
const ANDROID_REGEX = /android/i;
const MOBILE_KEYWORD_REGEX = /mobile/i;
// Cache for parsed results - stores up to 1000 unique user agents
const parseCache = new LRUCache<string, UAParser.IResult>({
ttl: 1000 * 60 * 5,
ttlAutopurge: true,
max: 1000,
});
const isIphone = (ua: string) => {
const model = ua.match(IPHONE_MODEL_REGEX);
@@ -27,6 +50,12 @@ const isIphone = (ua: string) => {
};
const parse = (ua: string): UAParser.IResult => {
// Check cache first
const cached = parseCache.get(ua);
if (cached) {
return cached;
}
const parser = new UAParser(ua);
const res = parser.getResult();
@@ -35,7 +64,7 @@ const parse = (ua: string): UAParser.IResult => {
if (!res.device.model && !res.os.name) {
const iphone = isIphone(ua);
if (iphone) {
return {
const result = {
...res,
device: {
...res.device,
@@ -48,27 +77,34 @@ const parse = (ua: string): UAParser.IResult => {
version: iphone.osVersion,
},
};
parseCache.set(ua, result);
return result;
}
}
// Mozilla/5.0 (iPad; iPadOS 18_0; like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/18.0
if (res.device.model === 'iPad' && !res.os.version) {
const osVersion = ua.match(/iPadOS\s*([0-9_]+)/i);
const osVersion = ua.match(IPAD_OS_VERSION_REGEX);
if (osVersion) {
return {
const result = {
...res,
os: {
...res.os,
version: osVersion[1]!.replace('_', '.'),
version: osVersion[1]!.replace(/_/g, '.'),
},
};
parseCache.set(ua, result);
return result;
}
}
// Cache the result
parseCache.set(ua, res);
return res;
};
export type UserAgentInfo = ReturnType<typeof parseUserAgent>;
export type UserAgentResult = ReturnType<typeof parseUserAgent>;
export function parseUserAgent(
ua?: string | null,
overrides?: Record<string, unknown>,
@@ -117,8 +153,7 @@ export function parseUserAgent(
function isServer(res: UAParser.IResult) {
// Matches user agents like "Go-http-client/1.0" or "Go Http Client/1.0"
// It should just match the first name (with optional spaces) and version
const isSingleNameWithVersion = !!res.ua.match(/^[^\/]+\/[\d.]+$/);
if (isSingleNameWithVersion) {
if (SINGLE_NAME_VERSION_REGEX.test(res.ua)) {
return true;
}
@@ -133,39 +168,39 @@ function isServer(res: UAParser.IResult) {
export function getDevice(ua: string) {
// Samsung mobile devices use SM-[A,G,N,etc]XXX pattern
if (/SM-[ABDEFGJMNRWZ][0-9]+/i.test(ua)) {
const isSamsungMobile = SAMSUNG_MOBILE_REGEX.test(ua);
if (isSamsungMobile) {
return 'mobile';
}
// Samsung tablets use SM-TXXX pattern
if (/SM-T[0-9]+/i.test(ua)) {
if (SAMSUNG_TABLET_REGEX.test(ua)) {
return 'tablet';
}
// LG mobile devices use LG-XXXX pattern
if (/LG-[A-Z0-9]+/i.test(ua)) {
const isLGMobile = LG_MOBILE_REGEX.test(ua);
if (isLGMobile) {
return 'mobile';
}
const mobile1 =
/(android|bb\d+|meego).+mobile|avantgo|bada\/|blackberry|blazer|compal|elaine|fennec|hiptop|iemobile|ip(hone|od)|iris|kindle|lge |maemo|midp|mmp|mobile.+firefox|netfront|opera m(ob|in)i|palm( os)?|phone|p(ixi|re)\/|plucker|pocket|psp|series(4|6)0|symbian|treo|up\.(browser|link)|vodafone|wap|windows ce|xda|xiino/i.test(
ua,
);
const mobile2 =
/1207|6310|6590|3gso|4thp|50[1-6]i|770s|802s|a wa|abac|ac(er|oo|s-)|ai(ko|rn)|al(av|ca|co)|amoi|an(ex|ny|yw)|aptu|ar(ch|go)|as(te|us)|attw|au(di|-m|r |s )|avan|be(ck|ll|nq)|bi(lb|rd)|bl(ac|az)|br(e|v)w|bumb|bw-(n|u)|c55\/|capi|ccwa|cdm-|cell|chtm|cldc|cmd-|co(mp|nd)|craw|da(it|ll|ng)|dbte|dc-s|devi|dica|dmob|do(c|p)o|ds(12|-d)|el(49|ai)|em(l2|ul)|er(ic|k0)|esl8|ez([4-7]0|os|wa|ze)|fetc|fly(-|_)|g1 u|g560|gene|gf-5|g-mo|go(\.w|od)|gr(ad|un)|haie|hcit|hd-(m|p|t)|hei-|hi(pt|ta)|hp( i|ip)|hs-c|ht(c(-| |_|a|g|p|s|t)|tp)|hu(aw|tc)|i-(20|go|ma)|i230|iac( |-|\/)|ibro|idea|ig01|ikom|im1k|inno|ipaq|iris|ja(t|v)a|jbro|jemu|jigs|kddi|keji|kgt( |\/)|klon|kpt |kwc-|kyo(c|k)|le(no|xi)|lg( g|\/(k|l|u)|50|54|-[a-w])|libw|lynx|m1-w|m3ga|m50\/|ma(te|ui|xo)|mc(01|21|ca)|m-cr|me(rc|ri)|mi(o8|oa|ts)|mmef|mo(01|02|bi|de|do|t(-| |o|v)|zz)|mt(50|p1|v )|mwbp|mywa|n10[0-2]|n20[2-3]|n30(0|2)|n50(0|2|5)|n7(0(0|1)|10)|ne((c|m)-|on|tf|wf|wg|wt)|nok(6|i)|nzph|o2im|op(ti|wv)|oran|owg1|p800|pan(a|d|t)|pdxg|pg(13|-([1-8]|c))|phil|pire|pl(ay|uc)|pn-2|po(ck|rt|se)|prox|psio|pt-g|qa-a|qc(07|12|21|32|60|-[2-7]|i-)|qtek|r380|r600|raks|rim9|ro(ve|zo)|s55\/|sa(ge|ma|mm|ms|ny|va)|sc(01|h-|oo|p-)|sdk\/|se(c(-|0|1)|47|mc|nd|ri)|sgh-|shar|sie(-|m)|sk-0|sl(45|id)|sm(al|ar|b3|it|t5)|so(ft|ny)|sp(01|h-|v-|v )|sy(01|mb)|t2(18|50)|t6(00|10|18)|ta(gt|lk)|tcl-|tdg-|tel(i|m)|tim-|t-mo|to(pl|sh)|ts(70|m-|m3|m5)|tx-9|up(\.b|g1|si)|utst|v400|v750|veri|vi(rg|te)|vk(40|5[0-3]|-v)|vm40|voda|vulc|vx(52|53|60|61|70|80|81|83|85|98)|w3c(-| )|webc|whit|wi(g |nc|nw)|wmlb|wonu|x700|yas-|your|zeto|zte-/i.test(
ua.slice(0, 4),
);
const tablet =
/tablet|ipad|xoom|sch-i800|kindle|silk|playbook/i.test(ua) ||
(/android/i.test(ua) &&
!/mobile/i.test(ua) &&
!/SM-[ABDEFGJMNRWZ][0-9]+/i.test(ua) &&
!/LG-[A-Z0-9]+/i.test(ua));
// Check for mobile patterns
const mobile1 = MOBILE_REGEX_1.test(ua);
const mobile2 = MOBILE_REGEX_2.test(ua.slice(0, 4));
if (mobile1 || mobile2) {
return 'mobile';
}
// Check for tablet patterns
// Note: We already checked for Samsung mobile/tablet and LG mobile above
const isAndroid = ANDROID_REGEX.test(ua);
const hasMobileKeyword = MOBILE_KEYWORD_REGEX.test(ua);
const tablet =
TABLET_REGEX.test(ua) ||
(isAndroid && !hasMobileKeyword && !isSamsungMobile && !isLGMobile);
if (tablet) {
return 'tablet';
}

View File

@@ -4,12 +4,14 @@ export interface ISerieDataItem {
label_2?: string | null | undefined;
label_3?: string | null | undefined;
count: number;
total_count?: number;
date: string;
}
interface GroupedDataPoint {
date: string;
count: number;
total_count?: number;
}
interface GroupedResult {
@@ -45,6 +47,7 @@ export function groupByLabels(data: ISerieDataItem[]): GroupedResult[] {
group.data.push({
date: row.date,
count: row.count,
total_count: row.total_count,
});
});
@@ -63,7 +66,7 @@ export function groupByLabels(data: ISerieDataItem[]): GroupedResult[] {
// This will ensure that all dates are present in the data array
data: Array.from(timestamps).map((date) => {
const dataPoint = group.data.find((dp) => dp.date === date);
return dataPoint || { date, count: 0 };
return dataPoint || { date, count: 0, total_count: 0 };
}),
};
});

View File

@@ -180,6 +180,7 @@ export const deprecated_timeRanges = {
};
export const metrics = {
count: 'count',
sum: 'sum',
average: 'average',
min: 'min',

View File

@@ -21,6 +21,7 @@ export * from './src/services/id.service';
export * from './src/services/retention.service';
export * from './src/services/notification.service';
export * from './src/services/access.service';
export * from './src/services/delete.service';
export * from './src/buffers';
export * from './src/types';
export * from './src/clickhouse/query-builder';

View File

@@ -8,18 +8,21 @@ export class BaseBuffer {
lockKey: string;
lockTimeout = 60;
onFlush: () => void;
enableParallelProcessing: boolean;
protected bufferCounterKey: string;
constructor(options: {
name: string;
onFlush: () => Promise<void>;
enableParallelProcessing?: boolean;
}) {
this.logger = createLogger({ name: options.name });
this.name = options.name;
this.lockKey = `lock:${this.name}`;
this.onFlush = options.onFlush;
this.bufferCounterKey = `${this.name}:buffer:count`;
this.enableParallelProcessing = options.enableParallelProcessing ?? false;
}
protected chunks<T>(items: T[], size: number) {
@@ -91,6 +94,26 @@ export class BaseBuffer {
async tryFlush() {
const now = performance.now();
// Parallel mode: No locking, multiple workers can process simultaneously
if (this.enableParallelProcessing) {
try {
this.logger.debug('Processing buffer (parallel mode)...');
await this.onFlush();
this.logger.debug('Flush completed (parallel mode)', {
elapsed: performance.now() - now,
});
} catch (error) {
this.logger.error('Failed to process buffer (parallel mode)', {
error,
});
// In parallel mode, we can't safely reset counter as other workers might be active
// Counter will be resynced automatically by the periodic job
}
return;
}
// Sequential mode: Use lock to ensure only one worker processes at a time
const lockId = generateSecureId('lock');
const acquired = await getRedisCache().set(
this.lockKey,
@@ -101,7 +124,7 @@ export class BaseBuffer {
);
if (acquired === 'OK') {
try {
this.logger.info('Acquired lock. Processing buffer...', {
this.logger.debug('Acquired lock. Processing buffer...', {
lockId,
});
await this.onFlush();
@@ -117,7 +140,7 @@ export class BaseBuffer {
}
} finally {
await this.releaseLock(lockId);
this.logger.info('Flush completed', {
this.logger.debug('Flush completed', {
elapsed: performance.now() - now,
lockId,
});

View File

@@ -71,7 +71,7 @@ export class BotBuffer extends BaseBuffer {
.decrby(this.bufferCounterKey, events.length)
.exec();
this.logger.info('Processed bot events', {
this.logger.debug('Processed bot events', {
count: events.length,
});
} catch (error) {

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@@ -12,12 +12,12 @@ export class ProfileBuffer extends BaseBuffer {
private batchSize = process.env.PROFILE_BUFFER_BATCH_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_BATCH_SIZE, 10)
: 200;
private daysToKeep = process.env.PROFILE_BUFFER_DAYS_TO_KEEP
? Number.parseInt(process.env.PROFILE_BUFFER_DAYS_TO_KEEP, 10)
: 7;
private chunkSize = process.env.PROFILE_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.PROFILE_BUFFER_CHUNK_SIZE, 10)
: 1000;
private ttlInSeconds = process.env.PROFILE_BUFFER_TTL_IN_SECONDS
? Number.parseInt(process.env.PROFILE_BUFFER_TTL_IN_SECONDS, 10)
: 60 * 60;
private readonly redisKey = 'profile-buffer';
private readonly redisProfilePrefix = 'profile-cache:';
@@ -49,7 +49,7 @@ export class ProfileBuffer extends BaseBuffer {
profileId: profile.id,
projectId: profile.project_id,
});
return (await getRedisCache().exists(cacheKey)) === 1;
return (await this.redis.exists(cacheKey)) === 1;
}
async add(profile: IClickhouseProfile, isFromEvent = false) {
@@ -90,9 +90,6 @@ export class ProfileBuffer extends BaseBuffer {
profile,
});
const cacheTtl = profile.is_external
? 60 * 60 * 24 * this.daysToKeep
: 60 * 60; // 1 hour for internal profiles
const cacheKey = this.getProfileCacheKey({
profileId: profile.id,
projectId: profile.project_id,
@@ -100,7 +97,7 @@ export class ProfileBuffer extends BaseBuffer {
const result = await this.redis
.multi()
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', cacheTtl)
.set(cacheKey, JSON.stringify(mergedProfile), 'EX', this.ttlInSeconds)
.rpush(this.redisKey, JSON.stringify(mergedProfile))
.incr(this.bufferCounterKey)
.llen(this.redisKey)
@@ -120,7 +117,6 @@ export class ProfileBuffer extends BaseBuffer {
batchSize: this.batchSize,
});
if (bufferLength >= this.batchSize) {
this.logger.info('Buffer full, initiating flush');
await this.tryFlush();
}
} catch (error) {
@@ -137,18 +133,33 @@ export class ProfileBuffer extends BaseBuffer {
projectId: profile.project_id,
});
const existingProfile = await getRedisCache().get(cacheKey);
const existingProfile = await this.fetchFromCache(
profile.id,
profile.project_id,
);
if (existingProfile) {
const parsedProfile = getSafeJson<IClickhouseProfile>(existingProfile);
if (parsedProfile) {
logger.debug('Profile found in Redis');
return parsedProfile;
}
logger.debug('Profile found in Redis');
return existingProfile;
}
return this.fetchFromClickhouse(profile, logger);
}
public async fetchFromCache(
profileId: string,
projectId: string,
): Promise<IClickhouseProfile | null> {
const cacheKey = this.getProfileCacheKey({
profileId,
projectId,
});
const existingProfile = await this.redis.get(cacheKey);
if (!existingProfile) {
return null;
}
return getSafeJson<IClickhouseProfile>(existingProfile);
}
private async fetchFromClickhouse(
profile: IClickhouseProfile,
logger: ILogger,
@@ -176,7 +187,7 @@ export class ProfileBuffer extends BaseBuffer {
async processBuffer() {
try {
this.logger.info('Starting profile buffer processing');
this.logger.debug('Starting profile buffer processing');
const profiles = await this.redis.lrange(
this.redisKey,
0,
@@ -188,7 +199,7 @@ export class ProfileBuffer extends BaseBuffer {
return;
}
this.logger.info(`Processing ${profiles.length} profiles in buffer`);
this.logger.debug(`Processing ${profiles.length} profiles in buffer`);
const parsedProfiles = profiles.map((p) =>
getSafeJson<IClickhouseProfile>(p),
);
@@ -208,7 +219,7 @@ export class ProfileBuffer extends BaseBuffer {
.decrby(this.bufferCounterKey, profiles.length)
.exec();
this.logger.info('Successfully completed profile processing', {
this.logger.debug('Successfully completed profile processing', {
totalProfiles: profiles.length,
});
} catch (error) {

View File

@@ -12,6 +12,9 @@ export class SessionBuffer extends BaseBuffer {
private batchSize = process.env.SESSION_BUFFER_BATCH_SIZE
? Number.parseInt(process.env.SESSION_BUFFER_BATCH_SIZE, 10)
: 1000;
private chunkSize = process.env.SESSION_BUFFER_CHUNK_SIZE
? Number.parseInt(process.env.SESSION_BUFFER_CHUNK_SIZE, 10)
: 1000;
private readonly redisKey = 'session-buffer';
private redis: Redis;
@@ -209,7 +212,7 @@ export class SessionBuffer extends BaseBuffer {
};
});
for (const chunk of this.chunks(sessions, 1000)) {
for (const chunk of this.chunks(sessions, this.chunkSize)) {
// Insert to ClickHouse
await ch.insert({
table: TABLE_NAMES.sessions,
@@ -225,7 +228,7 @@ export class SessionBuffer extends BaseBuffer {
.decrby(this.bufferCounterKey, events.length);
await multi.exec();
this.logger.info('Processed sessions', {
this.logger.debug('Processed sessions', {
count: events.length,
});
} catch (error) {

View File

@@ -24,10 +24,13 @@ type WarnLogParams = LogParams & { err?: Error };
class CustomLogger implements Logger {
trace({ message, args }: LogParams) {
logger.info(message, args);
logger.debug(message, args);
}
debug({ message, args }: LogParams) {
logger.info(message, args);
if (message.includes('Query:') && args?.response_status === 200) {
return;
}
logger.debug(message, args);
}
info({ message, args }: LogParams) {
logger.info(message, args);
@@ -157,8 +160,6 @@ export const ch = new Proxy(originalCh, {
return (...args: any[]) =>
withRetry(() => {
args[0].clickhouse_settings = {
// Allow bigger HTTP payloads/time to stream rows
wait_for_async_insert: 1,
// Increase insert timeouts and buffer sizes for large batches
max_execution_time: 300,
max_insert_block_size: '500000',

View File

@@ -62,6 +62,7 @@ export function getChartSql({
projectId,
limit,
timezone,
chartType,
}: IGetChartDataInput & { timezone: string }) {
const {
sb,
@@ -209,6 +210,17 @@ export function getChartSql({
return sql;
}
// Add total unique count for user segment using a scalar subquery
if (event.segment === 'user') {
const totalUniqueSubquery = `(
SELECT ${sb.select.count}
FROM ${sb.from}
${getJoins()}
${getWhere()}
)`;
sb.select.total_unique_count = `${totalUniqueSubquery} as total_count`;
}
const sql = `${getSelect()} ${getFrom()} ${getJoins()} ${getWhere()} ${getGroupBy()} ${getOrderBy()} ${getFill()}`;
console.log('-- Report --');
console.log(sql.replaceAll(/[\n\r]/g, ' '));

View File

@@ -1,4 +1,4 @@
import { cacheable } from '@openpanel/redis';
import { cacheable, cacheableLru } from '@openpanel/redis';
import type { Client, Prisma } from '../prisma-client';
import { db } from '../prisma-client';
@@ -34,4 +34,7 @@ export async function getClientById(
});
}
export const getClientByIdCached = cacheable(getClientById, 60 * 60 * 24);
export const getClientByIdCached = cacheableLru(getClientById, {
maxSize: 1000,
ttl: 60 * 5,
});

View File

@@ -0,0 +1,66 @@
import { TABLE_NAMES, ch, getReplicatedTableName } from '../clickhouse/client';
import { logger } from '../logger';
import { db } from '../prisma-client';
import sqlstring from 'sqlstring';
export async function deleteOrganization(organizationId: string) {
return await db.organization.delete({
where: {
id: organizationId,
},
});
}
export async function deleteProjects(projectIds: string[]) {
const projects = await db.project.findMany({
where: {
id: {
in: projectIds,
},
},
});
if (projects.length === 0) {
return;
}
for (const project of projects) {
await db.project.delete({
where: {
id: project.id,
},
});
}
return projects;
}
export async function deleteFromClickhouse(projectIds: string[]) {
const where = `project_id IN (${projectIds.map((projectId) => sqlstring.escape(projectId)).join(',')})`;
const tables = [
TABLE_NAMES.events,
TABLE_NAMES.profiles,
TABLE_NAMES.events_bots,
TABLE_NAMES.sessions,
TABLE_NAMES.cohort_events_mv,
TABLE_NAMES.dau_mv,
TABLE_NAMES.event_names_mv,
TABLE_NAMES.event_property_values_mv,
];
for (const table of tables) {
// If materialized view, use ALTER TABLE since DELETE is not supported
const query = table.endsWith('_mv')
? `ALTER TABLE ${getReplicatedTableName(table)} DELETE WHERE ${where};`
: `DELETE FROM ${getReplicatedTableName(table)} WHERE ${where};`;
logger.info('Deleting from ClickHouse table:', { query });
await ch.command({
query,
clickhouse_settings: {
lightweight_deletes_sync: '0',
},
});
}
}

View File

@@ -19,12 +19,9 @@ import type { EventMeta, Prisma } from '../prisma-client';
import { db } from '../prisma-client';
import { type SqlBuilderObject, createSqlBuilder } from '../sql-builder';
import { getEventFiltersWhereClause } from './chart.service';
import { getOrganizationByProjectIdCached } from './organization.service';
import type { IServiceProfile, IServiceUpsertProfile } from './profile.service';
import {
getProfileById,
getProfileByIdCached,
getProfiles,
getProfilesCached,
upsertProfile,
} from './profile.service';
@@ -156,8 +153,6 @@ export interface IServiceEvent {
properties: Record<string, unknown> & {
hash?: string;
query?: Record<string, unknown>;
__reqId?: string;
__user_agent?: string;
};
createdAt: Date;
country?: string | undefined;
@@ -343,7 +338,7 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
sdk_version: payload.sdkVersion ?? '',
};
await Promise.all([sessionBuffer.add(event), eventBuffer.add(event)]);
const promises = [sessionBuffer.add(event), eventBuffer.add(event)];
if (payload.profileId) {
const profile: IServiceUpsertProfile = {
@@ -374,10 +369,12 @@ export async function createEvent(payload: IServiceCreateEventPayload) {
profile.isExternal ||
(profile.isExternal === false && payload.name === 'session_start')
) {
await upsertProfile(profile, true);
promises.push(upsertProfile(profile, true));
}
}
await Promise.all(promises);
return {
document: event,
};
@@ -395,6 +392,7 @@ export interface GetEventListOptions {
endDate?: Date;
select?: SelectHelper<IServiceEvent>;
custom?: (sb: SqlBuilderObject) => void;
dateIntervalInDays?: number;
}
export async function getEventList(options: GetEventListOptions) {
@@ -408,28 +406,28 @@ export async function getEventList(options: GetEventListOptions) {
filters,
startDate,
endDate,
select: incomingSelect,
custom,
select: incomingSelect,
dateIntervalInDays = 0.5,
} = options;
const { sb, getSql, join } = createSqlBuilder();
const organization = await getOrganizationByProjectIdCached(projectId);
// This will speed up the query quite a lot for big organizations
const dateIntervalInDays =
organization?.subscriptionPeriodEventsLimit &&
organization?.subscriptionPeriodEventsLimit > 1_000_000
? 1
: 7;
const MAX_DATE_INTERVAL_IN_DAYS = 365;
// Cap the date interval to prevent infinity
const safeDateIntervalInDays = Math.min(
dateIntervalInDays,
MAX_DATE_INTERVAL_IN_DAYS,
);
if (typeof cursor === 'number') {
sb.offset = Math.max(0, (cursor ?? 0) * take);
} else if (cursor instanceof Date) {
sb.where.cursorWindow = `created_at >= toDateTime64(${sqlstring.escape(formatClickhouseDate(cursor))}, 3) - INTERVAL ${dateIntervalInDays} DAY`;
sb.where.cursorWindow = `created_at >= toDateTime64(${sqlstring.escape(formatClickhouseDate(cursor))}, 3) - INTERVAL ${safeDateIntervalInDays} DAY`;
sb.where.cursor = `created_at <= ${sqlstring.escape(formatClickhouseDate(cursor))}`;
}
if (!cursor) {
sb.where.cursorWindow = `created_at >= toDateTime64(${sqlstring.escape(formatClickhouseDate(new Date()))}, 3) - INTERVAL ${dateIntervalInDays} DAY`;
sb.where.cursorWindow = `created_at >= toDateTime64(${sqlstring.escape(formatClickhouseDate(new Date()))}, 3) - INTERVAL ${safeDateIntervalInDays} DAY`;
}
sb.limit = take;
@@ -453,6 +451,9 @@ export async function getEventList(options: GetEventListOptions) {
incomingSelect ?? {},
);
sb.select.createdAt = 'created_at';
sb.select.projectId = 'project_id';
if (select.id) {
sb.select.id = 'id';
}
@@ -474,9 +475,6 @@ export async function getEventList(options: GetEventListOptions) {
if (select.properties) {
sb.select.properties = 'properties';
}
if (select.createdAt) {
sb.select.createdAt = 'created_at';
}
if (select.country) {
sb.select.country = 'country';
}
@@ -583,21 +581,20 @@ export async function getEventList(options: GetEventListOptions) {
custom(sb);
}
console.log('getSql()', getSql());
const data = await getEvents(getSql(), {
profile: select.profile ?? true,
meta: select.meta ?? true,
});
// If we dont get any events, try without the cursor window
if (data.length === 0 && sb.where.cursorWindow) {
if (
data.length === 0 &&
sb.where.cursorWindow &&
safeDateIntervalInDays < MAX_DATE_INTERVAL_IN_DAYS
) {
return getEventList({
...options,
custom(sb) {
options.custom?.(sb);
delete sb.where.cursorWindow;
},
dateIntervalInDays: dateIntervalInDays * 2,
});
}
@@ -945,7 +942,7 @@ class EventService {
]);
if (event?.profileId) {
const profile = await getProfileByIdCached(event?.profileId, projectId);
const profile = await getProfileById(event?.profileId, projectId);
if (profile) {
event.profile = profile;
}

View File

@@ -13,7 +13,7 @@ import type {
IServiceCreateEventPayload,
IServiceEvent,
} from './event.service';
import { getProfileById, getProfileByIdCached } from './profile.service';
import { getProfileById } from './profile.service';
import { getProjectByIdCached } from './project.service';
type ICreateNotification = Pick<
@@ -264,10 +264,7 @@ export async function checkNotificationRulesForEvent(
payload.profileId &&
rules.some((rule) => rule.template?.match(/{{profile\.[^}]*}}/))
) {
const profile = await getProfileByIdCached(
payload.profileId,
payload.projectId,
);
const profile = await getProfileById(payload.profileId, payload.projectId);
if (profile) {
(payload as any).profile = profile;
}

View File

@@ -218,6 +218,7 @@ export async function getOrganizationBillingEventsCount(
sb.select.count = 'COUNT(*) AS count';
sb.where.projectIds = `project_id IN (${organization.projects.map((project) => sqlstring.escape(project.id)).join(',')})`;
sb.where.createdAt = `created_at BETWEEN ${sqlstring.escape(formatClickhouseDate(organization.subscriptionCurrentPeriodStart))} AND ${sqlstring.escape(formatClickhouseDate(organization.subscriptionCurrentPeriodEnd))}`;
sb.where.names = `name NOT IN ('session_start', 'session_end')`;
const res = await chQuery<{ count: number }>(getSql());
return res[0]?.count;
@@ -242,6 +243,7 @@ export async function getOrganizationBillingEventsCountSerie(
sb.orderBy.day = `${interval} WITH FILL FROM toDate(${sqlstring.escape(formatClickhouseDate(startDate, true))}) TO toDate(${sqlstring.escape(formatClickhouseDate(endDate, true))}) STEP INTERVAL 1 ${interval.toUpperCase()}`;
sb.where.projectIds = `project_id IN (${organization.projects.map((project) => sqlstring.escape(project.id)).join(',')})`;
sb.where.createdAt = `${interval} BETWEEN ${sqlstring.escape(formatClickhouseDate(startDate, true))} AND ${sqlstring.escape(formatClickhouseDate(endDate, true))}`;
sb.where.names = `name NOT IN ('session_start', 'session_end')`;
const res = await chQuery<{ count: number; day: string }>(getSql());
return res;

View File

@@ -83,62 +83,12 @@ export type IGetTopGenericInput = z.infer<typeof zGetTopGenericInput> & {
};
export class OverviewService {
private pendingQueries: Map<string, Promise<number | null>> = new Map();
constructor(private client: typeof ch) {}
isPageFilter(filters: IChartEventFilter[]) {
return filters.some((filter) => filter.name === 'path' && filter.value);
}
getTotalSessions({
projectId,
startDate,
endDate,
filters,
timezone,
}: {
projectId: string;
startDate: string;
endDate: string;
filters: IChartEventFilter[];
timezone: string;
}) {
const where = this.getRawWhereClause('sessions', filters);
const key = `total_sessions_${projectId}_${startDate}_${endDate}_${JSON.stringify(filters)}`;
// Check if there's already a pending query for this key
const pendingQuery = this.pendingQueries.get(key);
if (pendingQuery) {
return pendingQuery.then((res) => res ?? 0);
}
// Create new query promise and store it
const queryPromise = getCache(key, 15, async () => {
try {
const result = await clix(this.client, timezone)
.select<{
total_sessions: number;
}>(['sum(sign) as total_sessions'])
.from(TABLE_NAMES.sessions, true)
.where('project_id', '=', projectId)
.where('created_at', 'BETWEEN', [
clix.datetime(startDate, 'toDateTime'),
clix.datetime(endDate, 'toDateTime'),
])
.rawWhere(where)
.having('sum(sign)', '>', 0)
.execute();
return result?.[0]?.total_sessions ?? 0;
} catch (error) {
return 0;
}
});
this.pendingQueries.set(key, queryPromise);
return queryPromise;
}
getMetrics({
projectId,
filters,
@@ -483,14 +433,6 @@ export class OverviewService {
.orderBy('sessions', 'DESC')
.limit(limit);
const totalSessions = await this.getTotalSessions({
projectId,
startDate,
endDate,
filters,
timezone,
});
return mainQuery.execute();
}
@@ -556,14 +498,6 @@ export class OverviewService {
);
}
const totalSessions = await this.getTotalSessions({
projectId,
startDate,
endDate,
filters,
timezone,
});
return mainQuery.execute();
}
@@ -666,16 +600,7 @@ export class OverviewService {
mainQuery.rawWhere(this.getRawWhereClause('sessions', filters));
}
const [res, totalSessions] = await Promise.all([
mainQuery.execute(),
this.getTotalSessions({
projectId,
startDate,
endDate,
filters,
timezone,
}),
]);
const res = await mainQuery.execute();
return res;
}

View File

@@ -106,6 +106,11 @@ export async function getProfileById(id: string, projectId: string) {
return null;
}
const cachedProfile = await profileBuffer.fetchFromCache(id, projectId);
if (cachedProfile) {
return transformProfile(cachedProfile);
}
const [profile] = await chQuery<IClickhouseProfile>(
`SELECT
id,
@@ -127,8 +132,6 @@ export async function getProfileById(id: string, projectId: string) {
return transformProfile(profile);
}
export const getProfileByIdCached = cacheable(getProfileById, 60 * 30);
interface GetProfileListOptions {
projectId: string;
take: number;
@@ -306,10 +309,5 @@ export async function upsertProfile(
is_external: isExternal,
};
if (!isFromEvent) {
// Save to cache directly since the profile might be used before its saved in clickhouse
getProfileByIdCached.set(id, projectId)(transformProfile(profile));
}
return profileBuffer.add(profile, isFromEvent);
}

View File

@@ -104,7 +104,7 @@ export async function getProjects({
export const getProjectEventsCount = async (projectId: string) => {
const res = await chQuery<{ count: number }>(
`SELECT count(*) as count FROM ${TABLE_NAMES.events} WHERE project_id = ${sqlstring.escape(projectId)}`,
`SELECT count(*) as count FROM ${TABLE_NAMES.events} WHERE project_id = ${sqlstring.escape(projectId)} AND name NOT IN ('session_start', 'session_end')`,
);
return res[0]?.count;
};

View File

@@ -1,6 +1,6 @@
import { generateSalt } from '@openpanel/common/server';
import { getRedisCache } from '@openpanel/redis';
import { cacheableLru } from '@openpanel/redis';
import { db } from '../prisma-client';
export async function getCurrentSalt() {
@@ -17,36 +17,36 @@ export async function getCurrentSalt() {
return salt.salt;
}
export async function getSalts() {
const cache = await getRedisCache().get('op:salt');
if (cache) {
return JSON.parse(cache);
}
export const getSalts = cacheableLru(
'op:salt',
async () => {
const [curr, prev] = await db.salt.findMany({
orderBy: {
createdAt: 'desc',
},
take: 2,
});
const [curr, prev] = await db.salt.findMany({
orderBy: {
createdAt: 'desc',
},
take: 2,
});
if (!curr) {
throw new Error('No salt found');
}
if (!curr) {
throw new Error('No salt found');
}
if (!prev) {
throw new Error('No salt found');
}
if (!prev) {
throw new Error('No salt found');
}
const salts = {
current: curr.salt,
previous: prev.salt,
};
const salts = {
current: curr.salt,
previous: prev.salt,
};
await getRedisCache().set('op:salt', JSON.stringify(salts), 'EX', 60 * 10);
return salts;
}
return salts;
},
{
maxSize: 2,
ttl: 60 * 5,
},
);
export async function createInitialSalts() {
const MAX_RETRIES = 5;

View File

@@ -7,14 +7,15 @@
"codegen": "jiti scripts/download.ts"
},
"dependencies": {
"@maxmind/geoip2-node": "^6.1.0"
"@maxmind/geoip2-node": "^6.1.0",
"lru-cache": "^11.2.2"
},
"devDependencies": {
"@openpanel/tsconfig": "workspace:*",
"@types/node": "catalog:",
"fast-extract": "^1.4.3",
"jiti": "^2.4.1",
"tar": "^7.4.3",
"typescript": "catalog:",
"jiti": "^2.4.1"
"typescript": "catalog:"
}
}

View File

@@ -2,11 +2,12 @@ import { readFile } from 'node:fs/promises';
import path from 'node:path';
import { dirname } from 'node:path';
import { fileURLToPath } from 'node:url';
import type { ReaderModel } from '@maxmind/geoip2-node';
import { Reader } from '@maxmind/geoip2-node';
import { LRUCache } from 'lru-cache';
const __filename = fileURLToPath(import.meta.url);
const __dirname = dirname(__filename);
import type { ReaderModel } from '@maxmind/geoip2-node';
import { Reader } from '@maxmind/geoip2-node';
const filename = 'GeoLite2-City.mmdb';
// From api or worker package
@@ -50,24 +51,37 @@ const DEFAULT_GEO: GeoLocation = {
const ignore = ['127.0.0.1', '::1'];
const cache = new LRUCache<string, GeoLocation>({
max: 1000,
ttl: 1000 * 60 * 5,
ttlAutopurge: true,
});
export async function getGeoLocation(ip?: string): Promise<GeoLocation> {
if (!ip || ignore.includes(ip)) {
return DEFAULT_GEO;
}
const cached = cache.get(ip);
if (cached) {
return cached;
}
if (!reader) {
await loadDatabase(dbPath);
}
try {
const response = await reader?.city(ip);
return {
const res = {
city: response?.city?.names.en,
country: response?.country?.isoCode,
region: response?.subdivisions?.[0]?.names.en,
longitude: response?.location?.longitude,
latitude: response?.location?.latitude,
};
cache.set(ip, res);
return res;
} catch (error) {
return DEFAULT_GEO;
}

View File

@@ -6,9 +6,12 @@ export { winston };
export type ILogger = winston.Logger;
const logLevel = process.env.LOG_LEVEL ?? 'info';
const silent = process.env.LOG_SILENT === 'true';
export function createLogger({ name }: { name: string }): ILogger {
const service = `${name}-${process.env.NODE_ENV ?? 'dev'}`;
const service = [process.env.LOG_PREFIX, name, process.env.NODE_ENV ?? 'dev']
.filter(Boolean)
.join('-');
const prettyError = (error: Error) => ({
...error,
@@ -64,13 +67,9 @@ export function createLogger({ name }: { name: string }): ILogger {
return Object.assign({}, info, redactObject(info));
});
const format = winston.format.combine(
errorFormatter(),
redactSensitiveInfo(),
winston.format.json(),
);
const transports: winston.transport[] = [];
let format: winston.Logform.Format;
const transports: winston.transport[] = [new winston.transports.Console()];
if (process.env.HYPERDX_API_KEY) {
transports.push(
HyperDX.getWinstonTransport(logLevel, {
@@ -78,6 +77,24 @@ export function createLogger({ name }: { name: string }): ILogger {
service,
}),
);
format = winston.format.combine(
errorFormatter(),
redactSensitiveInfo(),
winston.format.json(),
);
} else {
transports.push(new winston.transports.Console());
format = winston.format.combine(
errorFormatter(),
redactSensitiveInfo(),
winston.format.colorize(),
winston.format.printf((info) => {
const { level, message, service, ...meta } = info;
const metaStr =
Object.keys(meta).length > 0 ? ` ${JSON.stringify(meta)}` : '';
return `${level} ${message}${metaStr}`;
}),
);
}
const logger = winston.createLogger({
@@ -85,7 +102,7 @@ export function createLogger({ name }: { name: string }): ILogger {
level: logLevel,
format,
transports,
silent: process.env.NODE_ENV === 'test',
silent,
// Add ISO levels of logging from PINO
levels: Object.assign(
{ fatal: 0, warn: 4, trace: 7 },

View File

@@ -10,8 +10,8 @@
"@openpanel/db": "workspace:*",
"@openpanel/logger": "workspace:*",
"@openpanel/redis": "workspace:*",
"bullmq": "^5.8.7",
"groupmq": "1.0.0-next.19"
"bullmq": "^5.63.0",
"groupmq": "1.1.0-next.6"
},
"devDependencies": {
"@openpanel/sdk": "workspace:*",

View File

@@ -1,5 +1,6 @@
import { Queue, QueueEvents } from 'bullmq';
import { createHash } from 'node:crypto';
import type {
IServiceCreateEventPayload,
IServiceEvent,
@@ -10,6 +11,21 @@ import { getRedisGroupQueue, getRedisQueue } from '@openpanel/redis';
import type { TrackPayload } from '@openpanel/sdk';
import { Queue as GroupQueue } from 'groupmq';
export const EVENTS_GROUP_QUEUES_SHARDS = Number.parseInt(
process.env.EVENTS_GROUP_QUEUES_SHARDS || '1',
10,
);
export const getQueueName = (name: string) =>
process.env.QUEUE_CLUSTER ? `{${name}}` : name;
function pickShard(projectId: string) {
const h = createHash('sha1').update(projectId).digest(); // 20 bytes
// take first 4 bytes as unsigned int
const x = h.readUInt32BE(0);
return x % EVENTS_GROUP_QUEUES_SHARDS; // 0..n-1
}
export const queueLogger = createLogger({ name: 'queue' });
export interface EventsQueuePayloadIncomingEvent {
@@ -17,9 +33,30 @@ export interface EventsQueuePayloadIncomingEvent {
payload: {
projectId: string;
event: TrackPayload & {
timestamp: string;
timestamp: string | number;
isTimestampFromThePast: boolean;
};
uaInfo:
| {
readonly isServer: true;
readonly device: 'server';
readonly os: '';
readonly osVersion: '';
readonly browser: '';
readonly browserVersion: '';
readonly brand: '';
readonly model: '';
}
| {
readonly os: string | undefined;
readonly osVersion: string | undefined;
readonly browser: string | undefined;
readonly browserVersion: string | undefined;
readonly device: string;
readonly brand: string | undefined;
readonly model: string | undefined;
readonly isServer: false;
};
geo: {
country: string | undefined;
city: string | undefined;
@@ -93,54 +130,70 @@ export type MiscQueuePayload = MiscQueuePayloadTrialEndingSoon;
export type CronQueueType = CronQueuePayload['type'];
const orderingWindowMs = Number.parseInt(
process.env.ORDERING_WINDOW_MS || '50',
10,
);
const orderingGracePeriodDecay = Number.parseFloat(
process.env.ORDERING_GRACE_PERIOD_DECAY || '0.9',
);
const orderingMaxWaitMultiplier = Number.parseInt(
process.env.ORDERING_MAX_WAIT_MULTIPLIER || '8',
const orderingDelayMs = Number.parseInt(
process.env.ORDERING_DELAY_MS || '100',
10,
);
export const eventsGroupQueue = new GroupQueue<
EventsQueuePayloadIncomingEvent['payload']
>({
logger: queueLogger,
namespace: 'group_events',
redis: getRedisGroupQueue(),
orderingMethod: 'in-memory',
orderingWindowMs,
orderingGracePeriodDecay,
orderingMaxWaitMultiplier,
keepCompleted: 10,
keepFailed: 10_000,
});
const autoBatchMaxWaitMs = Number.parseInt(
process.env.AUTO_BATCH_MAX_WAIT_MS || '0',
10,
);
const autoBatchSize = Number.parseInt(process.env.AUTO_BATCH_SIZE || '0', 10);
export const sessionsQueue = new Queue<SessionsQueuePayload>('sessions', {
// @ts-ignore
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
export const eventsGroupQueues = Array.from({
length: EVENTS_GROUP_QUEUES_SHARDS,
}).map(
(_, index, list) =>
new GroupQueue<EventsQueuePayloadIncomingEvent['payload']>({
logger: queueLogger,
namespace: getQueueName(
list.length === 1 ? 'group_events' : `group_events_${index}`,
),
redis: getRedisGroupQueue(),
keepCompleted: 1_000,
keepFailed: 10_000,
orderingDelayMs: orderingDelayMs,
autoBatch:
autoBatchMaxWaitMs && autoBatchSize
? {
maxWaitMs: autoBatchMaxWaitMs,
size: autoBatchSize,
}
: undefined,
}),
);
export const getEventsGroupQueueShard = (groupId: string) => {
const shard = pickShard(groupId);
const queue = eventsGroupQueues[shard];
if (!queue) {
throw new Error(`Queue not found for group ${groupId}`);
}
return queue;
};
export const sessionsQueue = new Queue<SessionsQueuePayload>(
getQueueName('sessions'),
{
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
},
},
});
export const sessionsQueueEvents = new QueueEvents('sessions', {
// @ts-ignore
);
export const sessionsQueueEvents = new QueueEvents(getQueueName('sessions'), {
connection: getRedisQueue(),
});
export const cronQueue = new Queue<CronQueuePayload>('cron', {
// @ts-ignore
export const cronQueue = new Queue<CronQueuePayload>(getQueueName('cron'), {
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
},
});
export const miscQueue = new Queue<MiscQueuePayload>('misc', {
// @ts-ignore
export const miscQueue = new Queue<MiscQueuePayload>(getQueueName('misc'), {
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
@@ -155,9 +208,8 @@ export type NotificationQueuePayload = {
};
export const notificationQueue = new Queue<NotificationQueuePayload>(
'notification',
getQueueName('notification'),
{
// @ts-ignore
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
@@ -172,13 +224,16 @@ export type ImportQueuePayload = {
};
};
export const importQueue = new Queue<ImportQueuePayload>('import', {
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 50,
export const importQueue = new Queue<ImportQueuePayload>(
getQueueName('import'),
{
connection: getRedisQueue(),
defaultJobOptions: {
removeOnComplete: 10,
removeOnFail: 50,
},
},
});
);
export function addTrialEndingSoonJob(organizationId: string, delay: number) {
return miscQueue.add(

View File

@@ -446,12 +446,6 @@ describe('cachable', () => {
expect(cached).toBe(JSON.stringify(payload));
});
it('should throw error when function is not provided', () => {
expect(() => {
cacheable('test', 3600);
}).toThrow('fn is not a function');
});
it('should throw error when expire time is not provided', () => {
const fn = async (arg1: string, arg2: string) => ({});
expect(() => {

View File

@@ -1,17 +1,34 @@
import { LRUCache } from 'lru-cache';
import { getRedisCache } from './redis';
export const deleteCache = async (key: string) => {
return getRedisCache().del(key);
};
// Global LRU cache for getCache function
const globalLruCache = new LRUCache<string, any>({
max: 5000, // Store up to 5000 entries
ttl: 1000 * 60, // 1 minutes default TTL
});
export async function getCache<T>(
key: string,
expireInSec: number,
fn: () => Promise<T>,
useLruCache?: boolean,
): Promise<T> {
// L1 Cache: Check global LRU cache first (in-memory, instant)
if (useLruCache) {
const lruHit = globalLruCache.get(key);
if (lruHit !== undefined) {
return lruHit as T;
}
}
// L2 Cache: Check Redis cache (shared across instances)
const hit = await getRedisCache().get(key);
if (hit) {
return JSON.parse(hit, (_, value) => {
const parsed = JSON.parse(hit, (_, value) => {
if (
typeof value === 'string' &&
/^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}.*Z$/.test(value)
@@ -20,13 +37,49 @@ export async function getCache<T>(
}
return value;
});
// Store in LRU cache for next time
if (useLruCache) {
globalLruCache.set(key, parsed, {
ttl: expireInSec * 1000, // Use the same TTL as Redis
});
}
return parsed;
}
// Cache miss: Execute function
const data = await fn();
await getRedisCache().setex(key, expireInSec, JSON.stringify(data));
// Store in both caches
if (useLruCache) {
globalLruCache.set(key, data, {
ttl: expireInSec * 1000,
});
}
// Fire and forget Redis write for better performance
getRedisCache().setex(key, expireInSec, JSON.stringify(data));
return data;
}
// Helper functions for managing global LRU cache
export function clearGlobalLruCache(key?: string) {
if (key) {
return globalLruCache.delete(key);
}
globalLruCache.clear();
return true;
}
export function getGlobalLruCacheStats() {
return {
size: globalLruCache.size,
max: globalLruCache.max,
calculatedSize: globalLruCache.calculatedSize,
};
}
function stringify(obj: unknown): string {
if (obj === null) return 'null';
if (obj === undefined) return 'undefined';
@@ -75,6 +128,39 @@ function hasResult(result: unknown): boolean {
return true;
}
export interface CacheableLruOptions {
/** TTL in seconds for LRU cache */
ttl: number;
/** Maximum number of entries in LRU cache */
maxSize?: number;
}
// Overload 1: cacheable(fn, expireInSec)
export function cacheable<T extends (...args: any) => any>(
fn: T,
expireInSec: number,
): T & {
getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => Promise<number>;
set: (
...args: Parameters<T>
) => (payload: Awaited<ReturnType<T>>) => Promise<'OK'>;
};
// Overload 2: cacheable(name, fn, expireInSec)
export function cacheable<T extends (...args: any) => any>(
name: string,
fn: T,
expireInSec: number,
): T & {
getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => Promise<number>;
set: (
...args: Parameters<T>
) => (payload: Awaited<ReturnType<T>>) => Promise<'OK'>;
};
// Implementation for cacheable (Redis-only - async)
export function cacheable<T extends (...args: any) => any>(
fnOrName: T | string,
fnOrExpireInSec: number | T,
@@ -87,12 +173,17 @@ export function cacheable<T extends (...args: any) => any>(
: typeof fnOrExpireInSec === 'function'
? fnOrExpireInSec
: null;
const expireInSec =
typeof fnOrExpireInSec === 'number'
? fnOrExpireInSec
: typeof _expireInSec === 'number'
? _expireInSec
: null;
let expireInSec: number | null = null;
// Parse parameters based on function signature
if (typeof fnOrName === 'function') {
// Overload 1: cacheable(fn, expireInSec)
expireInSec = typeof fnOrExpireInSec === 'number' ? fnOrExpireInSec : null;
} else {
// Overload 2: cacheable(name, fn, expireInSec)
expireInSec = typeof _expireInSec === 'number' ? _expireInSec : null;
}
if (typeof fn !== 'function') {
throw new Error('fn is not a function');
@@ -105,11 +196,14 @@ export function cacheable<T extends (...args: any) => any>(
const cachePrefix = `cachable:${name}`;
const getKey = (...args: Parameters<T>) =>
`${cachePrefix}:${stringify(args)}`;
// Redis-only mode: asynchronous implementation
const cachedFn = async (
...args: Parameters<T>
): Promise<Awaited<ReturnType<T>>> => {
// JSON.stringify here is not bullet proof since ordering of object keys matters etc
const key = getKey(...args);
// Check Redis cache (shared across instances)
const cached = await getRedisCache().get(key);
if (cached) {
try {
@@ -129,10 +223,15 @@ export function cacheable<T extends (...args: any) => any>(
console.error('Failed to parse cache', e);
}
}
// Cache miss: Execute function
const result = await fn(...(args as any));
if (hasResult(result)) {
getRedisCache().setex(key, expireInSec, JSON.stringify(result));
// Don't await Redis write - fire and forget for better performance
getRedisCache()
.setex(key, expireInSec, JSON.stringify(result))
.catch(() => {});
}
return result;
@@ -147,7 +246,134 @@ export function cacheable<T extends (...args: any) => any>(
(...args: Parameters<T>) =>
async (payload: Awaited<ReturnType<T>>) => {
const key = getKey(...args);
return getRedisCache().setex(key, expireInSec, JSON.stringify(payload));
return getRedisCache()
.setex(key, expireInSec, JSON.stringify(payload))
.catch(() => {});
};
return cachedFn;
}
// Overload 1: cacheableLru(fn, options)
export function cacheableLru<T extends (...args: any) => any>(
fn: T,
options: CacheableLruOptions,
): T & {
getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => boolean;
set: (...args: Parameters<T>) => (payload: ReturnType<T>) => void;
};
// Overload 2: cacheableLru(name, fn, options)
export function cacheableLru<T extends (...args: any) => any>(
name: string,
fn: T,
options: CacheableLruOptions,
): T & {
getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => boolean;
set: (...args: Parameters<T>) => (payload: ReturnType<T>) => void;
};
// Implementation for cacheableLru (LRU-only - synchronous)
export function cacheableLru<T extends (...args: any) => any>(
fnOrName: T | string,
fnOrOptions: T | CacheableLruOptions,
_options?: CacheableLruOptions,
) {
const name = typeof fnOrName === 'string' ? fnOrName : fnOrName.name;
const fn =
typeof fnOrName === 'function'
? fnOrName
: typeof fnOrOptions === 'function'
? fnOrOptions
: null;
let options: CacheableLruOptions;
// Parse parameters based on function signature
if (typeof fnOrName === 'function') {
// Overload 1: cacheableLru(fn, options)
options =
typeof fnOrOptions === 'object' && fnOrOptions !== null
? fnOrOptions
: ({} as CacheableLruOptions);
} else {
// Overload 2: cacheableLru(name, fn, options)
options =
typeof _options === 'object' && _options !== null
? _options
: ({} as CacheableLruOptions);
}
if (typeof fn !== 'function') {
throw new Error('fn is not a function');
}
if (typeof options.ttl !== 'number') {
throw new Error('options.ttl is required and must be a number');
}
const cachePrefix = `cachable:${name}`;
const getKey = (...args: Parameters<T>) =>
`${cachePrefix}:${stringify(args)}`;
const maxSize = options.maxSize ?? 1000;
const ttl = options.ttl;
// Create function-specific LRU cache
const functionLruCache = new LRUCache<string, any>({
max: maxSize,
ttl: ttl * 1000, // Convert seconds to milliseconds for LRU
});
// LRU-only mode: synchronous implementation (or returns promise if fn is async)
const cachedFn = ((...args: Parameters<T>): ReturnType<T> => {
const key = getKey(...args);
// Check LRU cache
const lruHit = functionLruCache.get(key);
if (lruHit !== undefined && hasResult(lruHit)) {
return lruHit as ReturnType<T>;
}
// Cache miss: Execute function
const result = fn(...(args as any)) as ReturnType<T>;
// If result is a Promise, handle it asynchronously but cache the resolved value
if (result && typeof (result as any).then === 'function') {
return (result as Promise<any>).then((resolved: any) => {
if (hasResult(resolved)) {
functionLruCache.set(key, resolved);
}
return resolved;
}) as ReturnType<T>;
}
// Synchronous result: cache and return
if (hasResult(result)) {
functionLruCache.set(key, result);
}
return result;
}) as T & {
getKey: (...args: Parameters<T>) => string;
clear: (...args: Parameters<T>) => boolean;
set: (...args: Parameters<T>) => (payload: ReturnType<T>) => void;
};
cachedFn.getKey = getKey;
cachedFn.clear = (...args: Parameters<T>) => {
const key = getKey(...args);
return functionLruCache.delete(key);
};
cachedFn.set =
(...args: Parameters<T>) =>
(payload: ReturnType<T>) => {
const key = getKey(...args);
if (hasResult(payload)) {
functionLruCache.set(key, payload);
}
};
return cachedFn;

Some files were not shown because too many files have changed in this diff Show More