feat(superSync): implement prisma and postgres

This commit is contained in:
Johannes Millan 2025-12-10 11:33:13 +01:00
parent 703dc8e3f0
commit b6fc65831c
10 changed files with 680 additions and 1080 deletions

View file

@ -1,4 +1,22 @@
services:
# PostgreSQL database for SuperSync
db:
image: postgres:15-alpine
restart: always
environment:
POSTGRES_USER: supersync
POSTGRES_PASSWORD: superpassword
POSTGRES_DB: supersync_db
ports:
- '5432:5432'
volumes:
- db_data:/var/lib/postgresql/data
healthcheck:
test: ['CMD-SHELL', 'pg_isready -U supersync']
interval: 5s
timeout: 5s
retries: 5
# SuperSync server for E2E testing
# Start with: docker compose up -d supersync
# Required for @supersync e2e tests
@ -8,6 +26,9 @@ services:
dockerfile: packages/super-sync-server/Dockerfile.test
ports:
- '1900:1900'
depends_on:
db:
condition: service_healthy
environment:
- PORT=1900
- TEST_MODE=true
@ -15,8 +36,8 @@ services:
- JWT_SECRET=e2e-test-secret-minimum-32-chars-long-for-security
- CORS_ORIGINS=*
- DATA_DIR=/data
tmpfs:
- /data # In-memory SQLite for test isolation
# Prisma connection string
- DATABASE_URL=postgresql://supersync:superpassword@db:5432/supersync_db
healthcheck:
test:
[

View file

@ -9,9 +9,6 @@
FROM node:22-alpine
# Install build dependencies for native modules (better-sqlite3)
RUN apk add --no-cache python3 make g++
WORKDIR /app
# Copy package files for workspace setup
@ -22,9 +19,6 @@ COPY packages/super-sync-server/package.json ./packages/super-sync-server/
# Install dependencies (ignore scripts to skip husky/prepare from root)
RUN npm ci --workspace=packages/shared-schema --workspace=packages/super-sync-server --ignore-scripts
# Rebuild better-sqlite3 native module for this platform
RUN cd /app/packages/super-sync-server && npm rebuild better-sqlite3
# Copy source files
COPY packages/shared-schema/ ./packages/shared-schema/
COPY packages/super-sync-server/ ./packages/super-sync-server/
@ -35,6 +29,8 @@ RUN npm run build
# Build super-sync-server
WORKDIR /app/packages/super-sync-server
# Generate Prisma Client
RUN npx prisma generate
# Clean dist folder to remove any stale files that might conflict
RUN rm -rf dist && npm run build
@ -50,4 +46,5 @@ ENV TEST_MODE_CONFIRM=yes-i-understand-the-risks
ENV CORS_ORIGINS=*
ENV JWT_SECRET=e2e-test-secret-minimum-32-chars-long-for-security
CMD ["node", "dist/index.js"]
# Push schema to DB and start server
CMD sh -c "npx prisma db push && node dist/index.js"

View file

@ -12,13 +12,13 @@
"clear-data": "ts-node scripts/clear-data.ts"
},
"dependencies": {
"@sp/shared-schema": "*",
"@fastify/cors": "^11.1.0",
"@fastify/helmet": "^13.0.2",
"@fastify/rate-limit": "^10.3.0",
"@fastify/static": "^8.3.0",
"@prisma/client": "5.22.0",
"@sp/shared-schema": "*",
"bcryptjs": "^3.0.3",
"better-sqlite3": "^12.4.6",
"dotenv": "^17.2.3",
"fastify": "^5.6.2",
"jsonwebtoken": "^9.0.2",
@ -34,6 +34,7 @@
"@types/nodemailer": "^7.0.4",
"@types/supertest": "^6.0.3",
"nodemon": "^3.1.11",
"prisma": "5.22.0",
"supertest": "^7.1.4",
"ts-node": "^10.9.1",
"typescript": "^5.0.0",

View file

@ -1,9 +1,10 @@
import { getDb, User } from './db';
import { prisma, User } from './db';
import * as bcrypt from 'bcryptjs';
import * as jwt from 'jsonwebtoken';
import { Logger } from './logger';
import { randomBytes } from 'crypto';
import { sendVerificationEmail } from './email';
import { Prisma } from '@prisma/client';
// Auth constants
const MIN_JWT_SECRET_LENGTH = 32;
@ -40,88 +41,95 @@ export const registerUser = async (
): Promise<{ message: string }> => {
// Password strength validation is handled by Zod in api.ts
const db = getDb();
const passwordHash = await bcrypt.hash(password, BCRYPT_ROUNDS);
const verificationToken = randomBytes(32).toString('hex');
const expiresAt = Date.now() + VERIFICATION_TOKEN_EXPIRY_MS;
const acceptedAt = termsAcceptedAt || Date.now();
const expiresAt = BigInt(Date.now() + VERIFICATION_TOKEN_EXPIRY_MS);
const acceptedAt = termsAcceptedAt ? BigInt(termsAcceptedAt) : BigInt(Date.now());
try {
const info = db
.prepare(
`
INSERT INTO users (email, password_hash, verification_token, verification_token_expires_at, terms_accepted_at)
VALUES (?, ?, ?, ?, ?)
`,
)
.run(email, passwordHash, verificationToken, expiresAt, acceptedAt);
const user = await prisma.user.create({
data: {
email,
passwordHash,
verificationToken,
verificationTokenExpiresAt: expiresAt,
termsAcceptedAt: acceptedAt,
},
});
Logger.info(`User registered (ID: ${info.lastInsertRowid})`);
Logger.info(`User registered (ID: ${user.id})`);
// Send verification email asynchronously
const emailSent = await sendVerificationEmail(email, verificationToken);
if (!emailSent) {
// Clean up the newly created account to prevent unusable, un-verifiable entries
try {
db.prepare('DELETE FROM users WHERE id = ?').run(info.lastInsertRowid);
Logger.info(`Cleaned up failed registration (ID: ${info.lastInsertRowid})`);
await prisma.user.delete({ where: { id: user.id } });
Logger.info(`Cleaned up failed registration (ID: ${user.id})`);
} catch (cleanupErr) {
// Log but don't mask the original email failure
Logger.error(
`Failed to clean up user ${info.lastInsertRowid} after email failure:`,
`Failed to clean up user ${user.id} after email failure:`,
cleanupErr,
);
}
throw new Error('Failed to send verification email. Please try again later.');
}
} catch (err: unknown) {
if ((err as { code?: string })?.code === 'SQLITE_CONSTRAINT_UNIQUE') {
const existingUser = db
.prepare('SELECT * FROM users WHERE email = ?')
.get(email) as User | undefined;
if (
err instanceof Prisma.PrismaClientKnownRequestError &&
err.code === 'P2002' // Unique constraint violation (email)
) {
const existingUser = await prisma.user.findUnique({
where: { email },
});
if (!existingUser) {
Logger.warn('Unique constraint hit but user not found');
} else if (existingUser.is_verified === 1) {
} else if (existingUser.isVerified === 1) {
Logger.info(
`Registration attempt for already verified account (ID: ${existingUser.id})`,
);
} else {
const now = BigInt(Date.now());
const tokenStillValid =
!!existingUser.verification_token &&
!!existingUser.verification_token_expires_at &&
existingUser.verification_token_expires_at > Date.now();
!!existingUser.verificationToken &&
!!existingUser.verificationTokenExpiresAt &&
existingUser.verificationTokenExpiresAt > now;
const newToken =
tokenStillValid && existingUser.verification_token
? existingUser.verification_token
tokenStillValid && existingUser.verificationToken
? existingUser.verificationToken
: randomBytes(32).toString('hex');
const newExpiresAt = tokenStillValid
? existingUser.verification_token_expires_at
: Date.now() + VERIFICATION_TOKEN_EXPIRY_MS;
const newExpiresAt =
tokenStillValid && existingUser.verificationTokenExpiresAt
? existingUser.verificationTokenExpiresAt
: BigInt(Date.now() + VERIFICATION_TOKEN_EXPIRY_MS);
const previousToken = existingUser.verification_token;
const previousExpiresAt = existingUser.verification_token_expires_at;
const previousResendCount = existingUser.verification_resend_count;
const previousToken = existingUser.verificationToken;
const previousExpiresAt = existingUser.verificationTokenExpiresAt;
const previousResendCount = existingUser.verificationResendCount;
db.prepare(
`
UPDATE users
SET verification_token = ?, verification_token_expires_at = ?, verification_resend_count = verification_resend_count + 1
WHERE id = ?
`,
).run(newToken, newExpiresAt, existingUser.id);
await prisma.user.update({
where: { id: existingUser.id },
data: {
verificationToken: newToken,
verificationTokenExpiresAt: newExpiresAt,
verificationResendCount: { increment: 1 },
},
});
const emailSent = await sendVerificationEmail(email, newToken);
if (!emailSent) {
try {
db.prepare(
`
UPDATE users
SET verification_token = ?, verification_token_expires_at = ?, verification_resend_count = ?
WHERE id = ?
`,
).run(previousToken, previousExpiresAt, previousResendCount, existingUser.id);
await prisma.user.update({
where: { id: existingUser.id },
data: {
verificationToken: previousToken,
verificationTokenExpiresAt: previousExpiresAt,
verificationResendCount: previousResendCount,
},
});
Logger.info(
`Rolled back token update for user ${existingUser.id} after email failure`,
);
@ -152,31 +160,31 @@ export const registerUser = async (
};
};
export const verifyEmail = (token: string): boolean => {
const db = getDb();
const user = db
.prepare('SELECT * FROM users WHERE verification_token = ?')
.get(token) as User | undefined;
export const verifyEmail = async (token: string): Promise<boolean> => {
const user = await prisma.user.findFirst({
where: { verificationToken: token },
});
if (!user) {
throw new Error('Invalid verification token');
}
if (
user.verification_token_expires_at &&
user.verification_token_expires_at < Date.now()
user.verificationTokenExpiresAt &&
user.verificationTokenExpiresAt < BigInt(Date.now())
) {
throw new Error('Verification token has expired');
}
db.prepare(
`
UPDATE users
SET is_verified = 1, verification_token = NULL, verification_token_expires_at = NULL, verification_resend_count = 0
WHERE id = ?
`,
).run(user.id);
await prisma.user.update({
where: { id: user.id },
data: {
isVerified: 1,
verificationToken: null,
verificationTokenExpiresAt: null,
verificationResendCount: 0,
},
});
Logger.info(`User verified (ID: ${user.id})`);
return true;
@ -186,15 +194,15 @@ export const loginUser = async (
email: string,
password: string,
): Promise<{ token: string; user: { id: number; email: string } }> => {
const db = getDb();
const user = db.prepare('SELECT * FROM users WHERE email = ?').get(email) as
| User
| undefined;
const user = await prisma.user.findUnique({
where: { email },
});
// Check if account is locked (do this after fetching user but before password check)
if (user && user.locked_until && user.locked_until > Date.now()) {
const remainingMinutes = Math.ceil((user.locked_until - Date.now()) / 60000);
if (user && user.lockedUntil && user.lockedUntil > BigInt(Date.now())) {
const remainingMinutes = Math.ceil(
Number(user.lockedUntil - BigInt(Date.now())) / 60000,
);
Logger.warn(
`Login attempt for locked account (ID: ${user.id}), ${remainingMinutes}min remaining`,
);
@ -205,24 +213,25 @@ export const loginUser = async (
// Timing attack mitigation: always perform a comparison
// Even if the user is not found, we hash and compare against a dummy hash.
// This ensures the response time is roughly the same for valid and invalid emails,
// preventing attackers from enumerating valid email addresses based on timing differences.
// This is a valid bcrypt hash (12 rounds) of the string "dummy"
const dummyHash = '$2a$12$R9h/cIPz0gi.URNNX3kh2OPST9/PgBkqquzi.Ss7KIUgO2t0jWMUW';
const hashToCompare = user ? user.password_hash : dummyHash;
const hashToCompare = user ? user.passwordHash : dummyHash;
const isMatch = await bcrypt.compare(password, hashToCompare);
if (!user || !isMatch) {
// Increment failed attempts if user exists
if (user) {
const newFailedAttempts = (user.failed_login_attempts || 0) + 1;
const newFailedAttempts = (user.failedLoginAttempts || 0) + 1;
const shouldLock = newFailedAttempts >= MAX_FAILED_LOGIN_ATTEMPTS;
const lockedUntil = shouldLock ? Date.now() + LOCKOUT_DURATION_MS : null;
const lockedUntil = shouldLock ? BigInt(Date.now() + LOCKOUT_DURATION_MS) : null;
db.prepare(
'UPDATE users SET failed_login_attempts = ?, locked_until = ? WHERE id = ?',
).run(newFailedAttempts, lockedUntil, user.id);
await prisma.user.update({
where: { id: user.id },
data: {
failedLoginAttempts: newFailedAttempts,
lockedUntil,
},
});
if (shouldLock) {
Logger.warn(
@ -237,19 +246,23 @@ export const loginUser = async (
throw new Error('Invalid credentials');
}
if (user.is_verified === 0) {
if (user.isVerified === 0) {
throw new Error('Email not verified');
}
// Reset failed attempts on successful login
if (user.failed_login_attempts > 0 || user.locked_until) {
db.prepare(
'UPDATE users SET failed_login_attempts = 0, locked_until = NULL WHERE id = ?',
).run(user.id);
if (user.failedLoginAttempts > 0 || user.lockedUntil) {
await prisma.user.update({
where: { id: user.id },
data: {
failedLoginAttempts: 0,
lockedUntil: null,
},
});
}
// Include token_version in JWT for revocation support
const tokenVersion = user.token_version ?? 0;
const tokenVersion = user.tokenVersion ?? 0;
const token = jwt.sign(
{ userId: user.id, email: user.email, tokenVersion },
JWT_SECRET,
@ -265,43 +278,32 @@ export const loginUser = async (
* Revoke all existing tokens for a user by incrementing their token version.
* Call this when the user changes their password or explicitly logs out all devices.
*/
export const revokeAllTokens = (userId: number): void => {
const db = getDb();
db.prepare('UPDATE users SET token_version = token_version + 1 WHERE id = ?').run(
userId,
);
export const revokeAllTokens = async (userId: number): Promise<void> => {
await prisma.user.update({
where: { id: userId },
data: { tokenVersion: { increment: 1 } },
});
Logger.info(`All tokens revoked for user ${userId}`);
};
/**
* Replace the current JWT with a new one.
* This invalidates all existing tokens (including the current one) and returns a fresh token.
* Use this when a token was accidentally shared or compromised.
*/
export const replaceToken = (
export const replaceToken = async (
userId: number,
email: string,
): { token: string; user: { id: number; email: string } } => {
const db = getDb();
): Promise<{ token: string; user: { id: number; email: string } }> => {
// Use transaction to ensure atomicity of version increment and read
const newTokenVersion = db.transaction(() => {
const newTokenVersion = await prisma.$transaction(async (tx) => {
// Increment token version to invalidate all existing tokens
db.prepare('UPDATE users SET token_version = token_version + 1 WHERE id = ?').run(
userId,
);
// Get the new token version
const user = db
.prepare('SELECT token_version FROM users WHERE id = ?')
.get(userId) as { token_version: number } | undefined;
if (!user) {
throw new Error('User not found');
}
return user.token_version;
})();
const user = await tx.user.update({
where: { id: userId },
data: { tokenVersion: { increment: 1 } },
select: { tokenVersion: true },
});
return user.tokenVersion;
});
const token = jwt.sign({ userId, email, tokenVersion: newTokenVersion }, JWT_SECRET, {
expiresIn: JWT_EXPIRY,
@ -328,10 +330,10 @@ export const verifyToken = async (
});
// Verify user exists and token version matches
const db = getDb();
const user = db
.prepare('SELECT id, token_version FROM users WHERE id = ?')
.get(payload.userId) as { id: number; token_version: number } | undefined;
const user = await prisma.user.findUnique({
where: { id: payload.userId },
select: { id: true, tokenVersion: true },
});
if (!user) {
Logger.warn(`Token verification failed: User ${payload.userId} not found in DB`);
@ -341,7 +343,7 @@ export const verifyToken = async (
// Check token version - if it doesn't match, the token has been revoked
// (e.g., user changed password). Tokens without version are treated as version 0.
const tokenVersion = payload.tokenVersion ?? 0;
const currentVersion = user.token_version ?? 0;
const currentVersion = user.tokenVersion ?? 0;
if (tokenVersion !== currentVersion) {
Logger.warn(
`Token verification failed: Token version mismatch for user ${payload.userId} ` +

View file

@ -1,262 +1,24 @@
import Database from 'better-sqlite3';
import * as path from 'path';
import * as fs from 'fs';
import { Logger } from './logger';
import { PrismaClient } from '@prisma/client';
export interface User {
id: number;
email: string;
password_hash: string;
is_verified: number; // 0 or 1
verification_token: string | null;
verification_token_expires_at: number | null; // Unix timestamp
verification_resend_count: number; // number of times verification mail was resent
failed_login_attempts: number; // count of consecutive failed logins
locked_until: number | null; // Unix timestamp when lockout expires
token_version: number; // Incremented on password change to invalidate old tokens
terms_accepted_at: number | null; // Unix timestamp when terms were accepted
created_at: string;
}
// Initialize Prisma Client
// Log queries in development for debugging
export const prisma = new PrismaClient({
log:
process.env.NODE_ENV === 'development'
? ['query', 'info', 'warn', 'error']
: ['error'],
});
// Sync-related database types
export interface DbOperation {
id: string;
user_id: number;
client_id: string;
server_seq: number;
action_type: string;
op_type: string;
entity_type: string;
entity_id: string | null;
payload: string; // JSON
vector_clock: string; // JSON
schema_version: number;
client_timestamp: number;
received_at: number;
parent_op_id: string | null; // For conflict resolution chain tracking
}
// Re-export types for convenience
export type {
User,
Operation,
UserSyncState,
SyncDevice,
Tombstone,
} from '@prisma/client';
export interface DbUserSyncState {
user_id: number;
last_seq: number;
last_snapshot_seq: number | null;
snapshot_data: Buffer | null;
snapshot_at: number | null;
snapshot_schema_version: number | null;
}
export interface DbSyncDevice {
client_id: string;
user_id: number;
device_name: string | null;
user_agent: string | null;
last_seen_at: number;
last_acked_seq: number;
created_at: number;
}
export interface DbTombstone {
user_id: number;
entity_type: string;
entity_id: string;
deleted_at: number;
deleted_by_op_id: string;
expires_at: number;
}
let db: Database.Database;
export const initDb = (dataDir: string, inMemory = false): void => {
let dbPath: string;
if (inMemory) {
db = new Database(':memory:');
dbPath = ':memory:';
} else {
dbPath = path.join(dataDir, 'database.sqlite');
// Ensure data directory exists
if (!fs.existsSync(dataDir)) {
fs.mkdirSync(dataDir, { recursive: true });
}
db = new Database(dbPath);
db.pragma('journal_mode = WAL');
}
// Enforce foreign key constraints to guarantee cascading deletes
db.pragma('foreign_keys = ON');
// Create users table
db.exec(`
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
email TEXT UNIQUE NOT NULL,
password_hash TEXT NOT NULL,
is_verified INTEGER DEFAULT 0,
verification_token TEXT,
verification_token_expires_at INTEGER,
verification_resend_count INTEGER DEFAULT 0,
failed_login_attempts INTEGER DEFAULT 0,
locked_until INTEGER,
token_version INTEGER DEFAULT 0,
terms_accepted_at INTEGER,
created_at TEXT DEFAULT CURRENT_TIMESTAMP
)
`);
// Create index for verification token lookups
db.exec(`
CREATE INDEX IF NOT EXISTS idx_users_verification_token
ON users(verification_token)
WHERE verification_token IS NOT NULL
`);
// ===== SYNC TABLES =====
// Operations table (append-only operation log)
db.exec(`
CREATE TABLE IF NOT EXISTS operations (
id TEXT PRIMARY KEY,
user_id INTEGER NOT NULL,
client_id TEXT NOT NULL,
server_seq INTEGER NOT NULL,
action_type TEXT NOT NULL,
op_type TEXT NOT NULL,
entity_type TEXT NOT NULL,
entity_id TEXT,
payload TEXT NOT NULL,
vector_clock TEXT NOT NULL,
schema_version INTEGER NOT NULL,
client_timestamp INTEGER NOT NULL,
received_at INTEGER NOT NULL,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
`);
// Indexes for efficient sync queries
db.exec(`
CREATE INDEX IF NOT EXISTS idx_ops_user_seq ON operations(user_id, server_seq);
CREATE INDEX IF NOT EXISTS idx_ops_user_entity ON operations(user_id, entity_type, entity_id);
CREATE INDEX IF NOT EXISTS idx_ops_client ON operations(user_id, client_id);
CREATE UNIQUE INDEX IF NOT EXISTS idx_ops_user_seq_unique ON operations(user_id, server_seq);
CREATE INDEX IF NOT EXISTS idx_ops_received_at ON operations(user_id, received_at);
`);
// Per-user sequence counter and snapshot cache
db.exec(`
CREATE TABLE IF NOT EXISTS user_sync_state (
user_id INTEGER PRIMARY KEY,
last_seq INTEGER NOT NULL DEFAULT 0,
last_snapshot_seq INTEGER,
snapshot_data BLOB,
snapshot_at INTEGER,
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
`);
// Device/client tracking
db.exec(`
CREATE TABLE IF NOT EXISTS sync_devices (
client_id TEXT NOT NULL,
user_id INTEGER NOT NULL,
device_name TEXT,
user_agent TEXT,
last_seen_at INTEGER NOT NULL,
last_acked_seq INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL,
PRIMARY KEY (user_id, client_id),
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
`);
// Tombstones for deleted entities (prevents resurrection)
db.exec(`
CREATE TABLE IF NOT EXISTS tombstones (
user_id INTEGER NOT NULL,
entity_type TEXT NOT NULL,
entity_id TEXT NOT NULL,
deleted_at INTEGER NOT NULL,
deleted_by_op_id TEXT NOT NULL,
expires_at INTEGER NOT NULL,
PRIMARY KEY (user_id, entity_type, entity_id),
FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
)
`);
// Index for tombstone cleanup
db.exec(`
CREATE INDEX IF NOT EXISTS idx_tombstones_expires ON tombstones(expires_at)
`);
// Migration: Check if verification_token_expires_at exists
const columns = db.pragma('table_info(users)') as { name: string }[];
const hasExpiresAt = columns.some(
(col) => col.name === 'verification_token_expires_at',
);
if (!hasExpiresAt) {
Logger.info('Migrating database: adding verification_token_expires_at column');
db.exec('ALTER TABLE users ADD COLUMN verification_token_expires_at INTEGER');
}
const hasResendCount = columns.some((col) => col.name === 'verification_resend_count');
if (!hasResendCount) {
Logger.info('Migrating database: adding verification_resend_count column');
db.exec('ALTER TABLE users ADD COLUMN verification_resend_count INTEGER DEFAULT 0');
}
// Migration: Add account lockout columns
const hasFailedLoginAttempts = columns.some(
(col) => col.name === 'failed_login_attempts',
);
if (!hasFailedLoginAttempts) {
Logger.info('Migrating database: adding account lockout columns');
db.exec('ALTER TABLE users ADD COLUMN failed_login_attempts INTEGER DEFAULT 0');
db.exec('ALTER TABLE users ADD COLUMN locked_until INTEGER');
}
// Migration: Add token versioning for token revocation
const hasTokenVersion = columns.some((col) => col.name === 'token_version');
if (!hasTokenVersion) {
Logger.info('Migrating database: adding token_version column');
db.exec('ALTER TABLE users ADD COLUMN token_version INTEGER DEFAULT 0');
}
// Migration: Add terms_accepted_at column
const hasTermsAcceptedAt = columns.some((col) => col.name === 'terms_accepted_at');
if (!hasTermsAcceptedAt) {
Logger.info('Migrating database: adding terms_accepted_at column');
db.exec('ALTER TABLE users ADD COLUMN terms_accepted_at INTEGER');
}
// Migration: Add schema version tracking for snapshots
const syncStateColumns = db.pragma('table_info(user_sync_state)') as { name: string }[];
const hasSnapshotSchemaVersion = syncStateColumns.some(
(col) => col.name === 'snapshot_schema_version',
);
if (!hasSnapshotSchemaVersion) {
Logger.info('Migrating database: adding snapshot_schema_version column');
db.exec(
'ALTER TABLE user_sync_state ADD COLUMN snapshot_schema_version INTEGER DEFAULT 1',
);
}
// Migration: Add parent_op_id column for conflict resolution chain tracking
const opsColumns = db.pragma('table_info(operations)') as { name: string }[];
const hasParentOpId = opsColumns.some((col) => col.name === 'parent_op_id');
if (!hasParentOpId) {
Logger.info('Migrating database: adding parent_op_id column to operations');
db.exec('ALTER TABLE operations ADD COLUMN parent_op_id TEXT');
}
Logger.info(`Database initialized at ${dbPath}`);
};
export const getDb = (): Database.Database => {
if (!db) {
throw new Error('Database not initialized');
}
return db;
// Helper to disconnect on shutdown
export const disconnectDb = async (): Promise<void> => {
await prisma.$disconnect();
};

View file

@ -7,7 +7,7 @@ import fastifyStatic from '@fastify/static';
import * as path from 'path';
import { loadConfigFromEnv, ServerConfig } from './config';
import { Logger } from './logger';
import { initDb, getDb } from './db';
import { prisma, disconnectDb } from './db';
import { apiRoutes } from './api';
import { pageRoutes } from './pages';
import { syncRoutes, startCleanupJobs, stopCleanupJobs } from './sync';
@ -24,9 +24,6 @@ export const createServer = (
} => {
const fullConfig = loadConfigFromEnv(config);
// Initialize Database
initDb(fullConfig.dataDir);
// Ensure data directory exists
if (!fs.existsSync(fullConfig.dataDir)) {
fs.mkdirSync(fullConfig.dataDir, { recursive: true });
@ -87,9 +84,8 @@ export const createServer = (
// Health Check - verifies database connectivity
fastifyServer.get('/health', async (_, reply) => {
try {
const db = getDb();
// Simple query to verify DB is responsive
db.prepare('SELECT 1').get();
await prisma.$queryRaw`SELECT 1`;
return { status: 'ok', db: 'connected' };
} catch (err) {
Logger.error('Health check failed: DB not responsive', err);
@ -137,6 +133,7 @@ export const createServer = (
await fastifyServer.close();
fastifyServer = undefined;
}
await disconnectDb();
},
};
};

View file

@ -10,7 +10,7 @@ import {
interface CleanupJob {
name: string;
interval: number;
run: () => number;
run: () => Promise<number> | number;
}
const CLEANUP_JOBS: CleanupJob[] = [
@ -49,9 +49,9 @@ const CLEANUP_JOBS: CleanupJob[] = [
const timers: Map<string, NodeJS.Timeout> = new Map();
const runJob = (job: CleanupJob): void => {
const runJob = async (job: CleanupJob): Promise<void> => {
try {
const deleted = job.run();
const deleted = await job.run();
if (deleted > 0) {
Logger.info(`Cleanup [${job.name}]: removed ${deleted} entries`);
}
@ -66,7 +66,7 @@ export const startCleanupJobs = (): void => {
// Run initial cleanup after a short delay
setTimeout(() => {
for (const job of CLEANUP_JOBS) {
runJob(job);
void runJob(job);
}
}, 10_000);
@ -74,7 +74,9 @@ export const startCleanupJobs = (): void => {
for (const job of CLEANUP_JOBS) {
timers.set(
job.name,
setInterval(() => runJob(job), job.interval),
setInterval(() => {
void runJob(job);
}, job.interval),
);
}

View file

@ -155,7 +155,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
Logger.info(
`[user:${userId}] Returning cached results for request ${requestId}`,
);
const latestSeq = syncService.getLatestSeq(userId);
const latestSeq = await syncService.getLatestSeq(userId);
return reply.send({
results: cachedResults,
latestSeq,
@ -165,7 +165,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
}
// Process operations - cast to Operation[] since Zod validates the structure
const results = syncService.uploadOps(
const results = await syncService.uploadOps(
userId,
clientId,
ops as unknown as import('./sync.types').Operation[],
@ -195,7 +195,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
if (lastKnownServerSeq !== undefined) {
// Use atomic read to get ops and latestSeq together
const opsResult = syncService.getOpsSinceWithSeq(
const opsResult = await syncService.getOpsSinceWithSeq(
userId,
lastKnownServerSeq,
clientId,
@ -209,7 +209,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
);
}
} else {
latestSeq = syncService.getLatestSeq(userId);
latestSeq = await syncService.getLatestSeq(userId);
}
const response: UploadOpsResponse = {
@ -272,7 +272,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
// Use atomic read to get ops and latestSeq in one transaction
// This prevents race conditions where new ops arrive between the two reads
const { ops, latestSeq, gapDetected } = syncService.getOpsSinceWithSeq(
const { ops, latestSeq, gapDetected } = await syncService.getOpsSinceWithSeq(
userId,
sinceSeq,
excludeClient,
@ -317,7 +317,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
Logger.info(`[user:${userId}] Snapshot requested`);
// Check if we have a cached snapshot
const cached = syncService.getCachedSnapshot(userId);
const cached = await syncService.getCachedSnapshot(userId);
if (
cached &&
Date.now() - cached.generatedAt < DEFAULT_SYNC_CONFIG.snapshotCacheTtlMs
@ -330,7 +330,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
// Generate fresh snapshot by replaying ops
Logger.info(`[user:${userId}] Generating fresh snapshot...`);
const snapshot = syncService.generateSnapshot(userId);
const snapshot = await syncService.generateSnapshot(userId);
Logger.info(`[user:${userId}] Snapshot generated (seq=${snapshot.serverSeq})`);
return reply.send(snapshot as SnapshotResponse);
} catch (err) {
@ -413,12 +413,12 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
schemaVersion: schemaVersion ?? 1,
};
const results = syncService.uploadOps(userId, clientId, [op]);
const results = await syncService.uploadOps(userId, clientId, [op]);
const result = results[0];
if (result.accepted && result.serverSeq !== undefined) {
// Cache the snapshot
syncService.cacheSnapshot(userId, state, result.serverSeq);
await syncService.cacheSnapshot(userId, state, result.serverSeq);
}
Logger.info(`Snapshot uploaded for user ${userId}, reason: ${reason}`);
@ -441,10 +441,10 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
const userId = getAuthUser(req).userId;
const syncService = getSyncService();
const latestSeq = syncService.getLatestSeq(userId);
const devicesOnline = syncService.getOnlineDeviceCount(userId);
const latestSeq = await syncService.getLatestSeq(userId);
const devicesOnline = await syncService.getOnlineDeviceCount(userId);
const cached = syncService.getCachedSnapshot(userId);
const cached = await syncService.getCachedSnapshot(userId);
const snapshotAge = cached ? Date.now() - cached.generatedAt : undefined;
Logger.debug(`[user:${userId}] Status: seq=${latestSeq}, devices=${devicesOnline}`);

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@
import { FastifyInstance } from 'fastify';
import * as bcrypt from 'bcryptjs';
import * as jwt from 'jsonwebtoken';
import { getDb } from './db';
import { prisma } from './db';
import { Logger } from './logger';
const BCRYPT_ROUNDS = 12;
@ -48,46 +48,45 @@ export const testRoutes = async (fastify: FastifyInstance): Promise<void> => {
async (request, reply) => {
const { email, password } = request.body;
const db = getDb();
const passwordHash = await bcrypt.hash(password, BCRYPT_ROUNDS);
try {
// Check if user already exists
const existingUser = db
.prepare('SELECT id, email FROM users WHERE email = ?')
.get(email) as { id: number; email: string } | undefined;
const existingUser = await prisma.user.findUnique({
where: { email },
});
let userId: number;
let tokenVersion: number;
if (existingUser) {
// User exists, get their current token version
const user = db
.prepare('SELECT token_version FROM users WHERE id = ?')
.get(existingUser.id) as { token_version: number } | undefined;
userId = existingUser.id;
tokenVersion = user?.token_version ?? 0;
tokenVersion = existingUser.tokenVersion ?? 0;
Logger.info(
`[TEST] Returning existing user: ${email} (ID: ${userId}) - Clearing old data`,
);
// Clear old data for this user to ensure clean state
db.prepare('DELETE FROM operations WHERE user_id = ?').run(userId);
db.prepare('DELETE FROM sync_devices WHERE user_id = ?').run(userId);
db.prepare('DELETE FROM user_sync_state WHERE user_id = ?').run(userId);
db.prepare('DELETE FROM tombstones WHERE user_id = ?').run(userId);
await prisma.$transaction([
prisma.operation.deleteMany({ where: { userId } }),
prisma.syncDevice.deleteMany({ where: { userId } }),
prisma.userSyncState.deleteMany({ where: { userId } }),
prisma.tombstone.deleteMany({ where: { userId } }),
]);
} else {
// Create user with is_verified=1 (skip email verification)
const info = db
.prepare(
`
INSERT INTO users (email, password_hash, is_verified, verification_token, verification_token_expires_at, token_version)
VALUES (?, ?, 1, NULL, NULL, 0)
`,
)
.run(email, passwordHash);
// Create user with isVerified=1 (skip email verification)
const user = await prisma.user.create({
data: {
email,
passwordHash,
isVerified: 1,
verificationToken: null,
verificationTokenExpiresAt: null,
tokenVersion: 0,
},
});
userId = info.lastInsertRowid as number;
userId = user.id;
tokenVersion = 0;
Logger.info(`[TEST] Created test user: ${email} (ID: ${userId})`);
}
@ -117,15 +116,15 @@ export const testRoutes = async (fastify: FastifyInstance): Promise<void> => {
* Wipes users, operations, sync state, and devices.
*/
fastify.post('/cleanup', async (_request, reply) => {
const db = getDb();
try {
// Delete in correct order due to foreign key constraints
db.exec('DELETE FROM operations');
db.exec('DELETE FROM sync_devices');
db.exec('DELETE FROM user_sync_state');
db.exec('DELETE FROM tombstones');
db.exec('DELETE FROM users');
// Delete in correct order due to foreign key constraints (cascades usually handle it, but explicit is safer)
await prisma.$transaction([
prisma.operation.deleteMany(),
prisma.syncDevice.deleteMany(),
prisma.userSyncState.deleteMany(),
prisma.tombstone.deleteMany(),
prisma.user.deleteMany(),
]);
Logger.info('[TEST] All test data cleaned up');