mirror of
https://github.com/johannesjo/super-productivity.git
synced 2026-01-23 02:36:05 +00:00
feat(sync-server): implement operation log sync API
Add server-side operation log synchronization:

- Database schema: operations, user_sync_state, sync_devices, tombstones
- SyncService: upload/download ops, snapshots, rate limiting, validation
- REST API: POST/GET /api/sync/ops, /snapshot, /status, /devices/:id/ack
- Cleanup jobs: daily tombstone/old ops cleanup, hourly stale device cleanup
- Compression: gzip for snapshots
- Security: timestamp validation, payload size limits, idempotency

Endpoints:

- POST /api/sync/ops - Upload operations batch
- GET /api/sync/ops?sinceSeq=N - Download ops since sequence
- GET /api/sync/snapshot - Get full state snapshot
- POST /api/sync/snapshot - Upload full state (migration/recovery)
- GET /api/sync/status - Get sync status
- POST /api/sync/devices/:clientId/ack - Acknowledge sequences
This commit is contained in:
parent 5d7bc5d7fb
commit cdaedf4fb8
8 changed files with 614 additions and 54 deletions
@@ -75,40 +75,66 @@ Replaying 100,000 operations to build the initial state is slow.

- **Snapshots:** Periodically (e.g., every 1,000 ops), generate a full state snapshot.
- **Hybrid Sync:** New devices download the latest snapshot + any operations occurring _after_ that snapshot (see the bootstrap sketch below).
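For illustration, a minimal sketch of that hybrid bootstrap against the endpoints added in this commit; `applySnapshot` and `applyOp` are hypothetical client-side helpers, and the response shapes are assumed.

```typescript
// Hypothetical client bootstrap: fetch the latest snapshot, then only the
// operations recorded after it.
declare function applySnapshot(state: unknown): void;
declare function applyOp(op: unknown): void;

async function bootstrapNewDevice(baseUrl: string, token: string): Promise<void> {
  const headers = { Authorization: `Bearer ${token}` };

  // 1. Full state snapshot (assumed shape: { state, serverSeq }).
  const snapshot = await (
    await fetch(`${baseUrl}/api/sync/snapshot`, { headers })
  ).json();
  applySnapshot(snapshot.state);

  // 2. Operation tail since that snapshot's sequence number.
  const { ops } = await (
    await fetch(`${baseUrl}/api/sync/ops?sinceSeq=${snapshot.serverSeq}`, { headers })
  ).json();
  for (const op of ops) {
    applyOp(op);
  }
}
```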
## 4. Client-Side Optimization: Coalescing & Batching

### 4.1. Write Coalescing (Squashing)

Coalescing is the practice of merging multiple granular updates to the same entity into a single operation _before_ it is synced to the server (a minimal sketch follows the list below).

- **When to use:**
  - **Rapid Text Entry:** If a user types "Hello" (5 ops), squash into 1 op ("Hello") after a debounce period (e.g., 500ms).
  - **Slider/Drag Adjustments:** If a user drags a progress bar from 0% to 50%, typically only the final value (50%) matters.
- **Benefits:**
  - Reduces server storage growth.
  - Speeds up replay/initial sync for other clients.
  - Reduces network overhead.
- **Risks:**
  - **History Loss:** You lose the detailed audit trail of intermediate keystrokes. (Usually acceptable.)
  - **Conflict Complexity:** If two users edit the same field simultaneously, coalesced ops can make character-level merging (OT/CRDT) harder if you rely on simple LWW. For field-level LWW, however, coalescing is actually helpful.
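A minimal sketch of debounce-based coalescing; the `PendingOp` shape, the 500 ms window, and the `uploadOps` call are illustrative assumptions, not the app's actual client code.

```typescript
// Hypothetical pending-op buffer that squashes rapid field updates per entity.
declare function uploadOps(ops: PendingOp[]): void; // e.g., POST /api/sync/ops

interface PendingOp {
  entityType: string;
  entityId: string;
  changes: Record<string, unknown>;
}

const pending = new Map<string, PendingOp>(); // key: `${entityType}:${entityId}`
let flushTimer: ReturnType<typeof setTimeout> | undefined;

function queueUpdate(
  entityType: string,
  entityId: string,
  changes: Record<string, unknown>,
): void {
  const key = `${entityType}:${entityId}`;
  const existing = pending.get(key);
  // Coalesce: later field values overwrite earlier ones for the same entity.
  pending.set(key, {
    entityType,
    entityId,
    changes: { ...existing?.changes, ...changes },
  });

  // Debounce the flush (e.g., 500 ms after the last edit).
  if (flushTimer) clearTimeout(flushTimer);
  flushTimer = setTimeout(flush, 500);
}

function flush(): void {
  const ops = [...pending.values()]; // one op per entity instead of one per keystroke
  pending.clear();
  uploadOps(ops);
}
```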

### 4.2. Logical Batching

A "Batch" operation is a container for multiple distinct actions that should be treated atomically.

- **Use Case:** "Create Task" + "Add Tag" + "Move to Project X".
- **Implementation:** The client sends a single `BATCH` op containing an array of sub-operations (see the sketch below).
- **Benefit:** Ensures database consistency. If the network fails, the task isn't created without its tag.
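A hedged sketch of what such a `BATCH` container could look like on the wire; the top-level fields mirror the `OperationSchema` used by the upload route later in this commit, but the payload layout and identifiers are illustrative.

```typescript
// Illustrative BATCH operation: three sub-operations that must apply atomically.
const batchOp = {
  id: '01936b7e-0000-7000-8000-000000000001', // client-generated, time-ordered id (example)
  clientId: 'device-abc',
  actionType: 'TASK_CREATE_WITH_CONTEXT',
  opType: 'BATCH' as const,
  entityType: 'TASK',
  entityId: 'task-1',
  payload: {
    ops: [
      { opType: 'CRT', entityType: 'TASK', entityId: 'task-1', payload: { title: 'Write report' } },
      { opType: 'UPD', entityType: 'TASK', entityId: 'task-1', payload: { tagIds: ['tag-urgent'] } },
      { opType: 'MOV', entityType: 'TASK', entityId: 'task-1', payload: { projectId: 'project-x' } },
    ],
  },
  vectorClock: { 'device-abc': 7 },
  timestamp: Date.now(),
  schemaVersion: 1,
};
// The server persists the whole batch as one log entry, so either all three
// sub-operations become visible to other clients or none of them do.
```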
## 5. Conflict Resolution Strategies

### 5.1. Detection

- **Server-side:** "I received an update for Entity X based on v1, but I am already at v2." (See the sketch after this list.)
- **Client-side:** "I downloaded Op A (ServerSeq 50) which modifies Task 1, but I have a local pending Op B that also modifies Task 1."
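A minimal sketch of the server-side check using a hypothetical per-entity version counter; the commit itself carries vector clocks on each operation, so this is only the simplified version of the same idea.

```typescript
// Hypothetical server-side conflict detection via per-entity versions.
interface IncomingUpdate {
  entityId: string;
  baseVersion: number; // the version the client last saw
  changes: Record<string, unknown>;
}

function detectConflict(
  currentVersions: Map<string, number>,
  update: IncomingUpdate,
): boolean {
  const current = currentVersions.get(update.entityId) ?? 0;
  // "I received an update based on v1, but I am already at v2."
  return update.baseVersion < current;
}
```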
### 5.2. Resolution Models

1. **Last-Write-Wins (LWW):** Simple and robust, but data loss is possible. Uses wall-clock timestamps.
2. **Three-Way Merge:** If the payload is diff-able (e.g., JSON Patch), try to merge.
3. **Manual:** Flag the conflict and ask the user (complex UI).
4. **Hybrid:** LWW for simple fields (Title), merge for collections (Tags), append for logs (time tracking). See the sketch below.
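A hedged sketch of the hybrid model for a task-like record: LWW for scalar fields, a set-union merge for tags, and append for time-tracking entries. The field names are illustrative, not the app's actual model.

```typescript
// Illustrative hybrid merge of two concurrent versions of the same task.
interface TaskVersion {
  title: string;
  updatedAt: number; // wall-clock timestamp used for LWW
  tagIds: string[];
  timeEntries: { start: number; end: number }[];
}

function mergeTask(local: TaskVersion, remote: TaskVersion): TaskVersion {
  const newer = remote.updatedAt >= local.updatedAt ? remote : local;
  return {
    // LWW for simple fields: the newer write wins.
    title: newer.title,
    updatedAt: Math.max(local.updatedAt, remote.updatedAt),
    // Merge for collections: union of both tag sets.
    tagIds: [...new Set([...local.tagIds, ...remote.tagIds])],
    // Append for logs: keep every entry from both sides (deduplication omitted).
    timeEntries: [...local.timeEntries, ...remote.timeEntries],
  };
}
```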
## 6. Security & Reliability

### 6.1. Authentication

- **Token-Based:** JWTs are standard.
- **Scope:** Tokens should restrict access to specific `user_id` partitions (see the sketch below).
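A minimal sketch of that scoping idea. The server in this commit imports a `verifyToken` helper; its signature here is an assumption, and the point is only that the user id comes from the verified token, never from the request itself.

```typescript
// Hypothetical request guard: derive the user id from the verified JWT claims.
declare function verifyToken(token: string): { userId: number } | null; // signature assumed

function requireUser(authHeader: string | undefined): number {
  const token = authHeader?.replace(/^Bearer /, '');
  const claims = token ? verifyToken(token) : null;
  if (!claims) {
    throw new Error('Unauthorized');
  }
  return claims.userId; // every sync query below is scoped to WHERE user_id = ?
}
```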
### 6.2. Encryption

- **Transport:** HTTPS/WSS is mandatory.
- **At Rest:** If the server is untrusted, clients should encrypt the `payload` field before sending; the server then syncs only encrypted blobs (End-to-End Encryption, E2EE). A sketch follows below.
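A hedged sketch of client-side payload encryption with Node's built-in `crypto` module (AES-256-GCM); key management and the blob encoding are assumptions, and the server only ever sees the opaque string.

```typescript
import { createCipheriv, createDecipheriv, randomBytes } from 'crypto';

// Encrypt an operation payload before it leaves the device (E2EE sketch).
// `key` is a 32-byte symmetric key derived and held on the client only.
function encryptPayload(key: Buffer, payload: unknown): string {
  const iv = randomBytes(12);
  const cipher = createCipheriv('aes-256-gcm', key, iv);
  const ciphertext = Buffer.concat([
    cipher.update(JSON.stringify(payload), 'utf-8'),
    cipher.final(),
  ]);
  // iv + auth tag + ciphertext, base64-encoded into the `payload` field.
  return Buffer.concat([iv, cipher.getAuthTag(), ciphertext]).toString('base64');
}

function decryptPayload(key: Buffer, blob: string): unknown {
  const raw = Buffer.from(blob, 'base64');
  const iv = raw.subarray(0, 12);
  const tag = raw.subarray(12, 28);
  const ciphertext = raw.subarray(28);
  const decipher = createDecipheriv('aes-256-gcm', key, iv);
  decipher.setAuthTag(tag);
  return JSON.parse(
    Buffer.concat([decipher.update(ciphertext), decipher.final()]).toString('utf-8'),
  );
}
```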
### 6.3. Rate Limiting

- Essential to prevent a single malfunctioning client from flooding the log (an example route config is sketched below).
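The sync routes added in this commit carry per-route `rateLimit` configs (100 uploads and 200 downloads per minute), which follow `@fastify/rate-limit` conventions; the plugin registration itself is not shown in this diff, so the setup below is an assumption.

```typescript
import Fastify from 'fastify';
import rateLimit from '@fastify/rate-limit';

// Assumed setup: register the plugin once, then tighten limits per route.
async function buildApp() {
  const app = Fastify();
  await app.register(rateLimit, { global: false });

  app.post(
    '/api/sync/ops',
    {
      config: {
        // Mirrors the per-route limit used by the upload route later in this commit.
        rateLimit: { max: 100, timeWindow: '1 minute' },
      },
    },
    async () => ({ ok: true }),
  );

  return app;
}
```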
## 7. Summary Checklist

- [ ] **Database:** SQLite (Start) / Postgres (Scale).
- [ ] **ID Generation:** UUID v7 (Time-ordered).
- [ ] **Protocol:** HTTP (Reliable) + WebSocket (Notify).
- [ ] **Optimization:** Coalesce rapid local edits; Batch atomic actions.
- [ ] **Conflict:** Server assigns order; Client reconciles.
- [ ] **Safety:** Tombstones for deletions, Snapshots for speed.
@@ -1400,7 +1400,7 @@ async deleteStaleDevices(beforeTime: number): Promise<{ changes: number }> {

## 12. Compression

Large payloads are compressed to reduce bandwidth and storage.

### Request/Response Compression
@@ -1422,41 +1422,6 @@ async function start(): Promise<void> {
}
```

### Payload Compression in Service

```typescript
import * as zlib from 'zlib';

// Compress large operation payloads before storage
function compressIfNeeded(payload: unknown): { data: Buffer; compressed: boolean } {
  const json = JSON.stringify(payload);

  if (json.length > COMPRESSION_THRESHOLD) {
    return {
      data: zlib.gzipSync(json),
      compressed: true,
    };
  }

  return {
    data: Buffer.from(json, 'utf-8'),
    compressed: false,
  };
}

// Decompress when reading
function decompressPayload(data: Buffer, compressed: boolean): unknown {
  if (compressed) {
    const decompressed = zlib.gunzipSync(data).toString('utf-8');
    return JSON.parse(decompressed);
  }
  return JSON.parse(data.toString('utf-8'));
}

// Updated schema to track compression
// ALTER TABLE operations ADD COLUMN payload_compressed BOOLEAN DEFAULT FALSE;
```
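For illustration only, a quick round-trip with the two helpers above, assuming `COMPRESSION_THRESHOLD` (defined elsewhere in the service config) is well below ~10 kB:

```typescript
// Large payloads cross the threshold and are gzipped; small ones stay plain UTF-8.
const bigPayload = { notes: 'x'.repeat(10_000) };
const stored = compressIfNeeded(bigPayload);
console.log(stored.compressed); // true

const restored = decompressPayload(stored.data, stored.compressed);
console.log(JSON.stringify(restored) === JSON.stringify(bigPayload)); // true
```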
### Snapshot Compression

Snapshots are always compressed (see `cacheSnapshot` method in Section 3).

@@ -1482,12 +1447,6 @@ It keeps the server fast, simple, and agnostic to domain logic. It supports End-

## References

- [Server Sync Architecture](./server-sync-architecture.md) - Detailed algorithms and design
- [Operation Log Sync Research](./operation-log-sync-research.md) - Industry best practices (Replicache, Linear, Figma)
- [Operation Log Best Practices](./operation-log-sync-best-practices.md) - Industry standards and architectural decisions
- [Operation Log Architecture](./operation-log-architecture.md) - Client-side system
- [Execution Plan](./operation-log-execution-plan.md) - Client implementation tasks

### External References

- [Replicache Documentation](https://doc.replicache.dev/)
- [Linear Sync Engine](https://github.com/wzhudev/reverse-linear-sync-engine)
- [Figma LiveGraph](https://www.figma.com/blog/livegraph-real-time-data-fetching-at-figma/)
@@ -1782,7 +1782,7 @@ Our operation log follows CRDT principles from Bartosz Sypytkowski's research:

## References

- [Operation Log Sync Research](./operation-log-sync-research.md) - Industry best practices
- [Operation Log Best Practices](./operation-log-sync-best-practices.md) - Industry best practices
- [Operation Log Architecture](./operation-log-architecture.md) - Full system design
- [Execution Plan](./operation-log-execution-plan.md) - Implementation tasks
- [PFAPI Architecture](./pfapi-sync-persistence-architecture.md) - Legacy sync system
@@ -14,6 +14,50 @@ export interface User {
  created_at: string;
}

// Sync-related database types
export interface DbOperation {
  id: string;
  user_id: number;
  client_id: string;
  server_seq: number;
  action_type: string;
  op_type: string;
  entity_type: string;
  entity_id: string | null;
  payload: string; // JSON
  vector_clock: string; // JSON
  schema_version: number;
  client_timestamp: number;
  received_at: number;
}

export interface DbUserSyncState {
  user_id: number;
  last_seq: number;
  last_snapshot_seq: number | null;
  snapshot_data: Buffer | null;
  snapshot_at: number | null;
}

export interface DbSyncDevice {
  client_id: string;
  user_id: number;
  device_name: string | null;
  user_agent: string | null;
  last_seen_at: number;
  last_acked_seq: number;
  created_at: number;
}

export interface DbTombstone {
  user_id: number;
  entity_type: string;
  entity_id: string;
  deleted_at: number;
  deleted_by_op_id: string;
  expires_at: number;
}

let db: Database.Database;

export const initDb = (dataDir: string): void => {
@@ -42,11 +86,87 @@ export const initDb = (dataDir: string): void => {

  // Create index for verification token lookups
  db.exec(`
    CREATE INDEX IF NOT EXISTS idx_users_verification_token
    ON users(verification_token)
    WHERE verification_token IS NOT NULL
  `);

  // ===== SYNC TABLES =====

  // Operations table (append-only operation log)
  db.exec(`
    CREATE TABLE IF NOT EXISTS operations (
      id TEXT PRIMARY KEY,
      user_id INTEGER NOT NULL,
      client_id TEXT NOT NULL,
      server_seq INTEGER NOT NULL,
      action_type TEXT NOT NULL,
      op_type TEXT NOT NULL,
      entity_type TEXT NOT NULL,
      entity_id TEXT,
      payload TEXT NOT NULL,
      vector_clock TEXT NOT NULL,
      schema_version INTEGER NOT NULL,
      client_timestamp INTEGER NOT NULL,
      received_at INTEGER NOT NULL,
      FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
    )
  `);

  // Indexes for efficient sync queries
  db.exec(`
    CREATE INDEX IF NOT EXISTS idx_ops_user_seq ON operations(user_id, server_seq);
    CREATE INDEX IF NOT EXISTS idx_ops_user_entity ON operations(user_id, entity_type, entity_id);
    CREATE INDEX IF NOT EXISTS idx_ops_client ON operations(user_id, client_id);
    CREATE UNIQUE INDEX IF NOT EXISTS idx_ops_user_seq_unique ON operations(user_id, server_seq);
  `);

  // Per-user sequence counter and snapshot cache
  db.exec(`
    CREATE TABLE IF NOT EXISTS user_sync_state (
      user_id INTEGER PRIMARY KEY,
      last_seq INTEGER NOT NULL DEFAULT 0,
      last_snapshot_seq INTEGER,
      snapshot_data BLOB,
      snapshot_at INTEGER,
      FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
    )
  `);

  // Device/client tracking
  db.exec(`
    CREATE TABLE IF NOT EXISTS sync_devices (
      client_id TEXT NOT NULL,
      user_id INTEGER NOT NULL,
      device_name TEXT,
      user_agent TEXT,
      last_seen_at INTEGER NOT NULL,
      last_acked_seq INTEGER NOT NULL DEFAULT 0,
      created_at INTEGER NOT NULL,
      PRIMARY KEY (user_id, client_id),
      FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
    )
  `);

  // Tombstones for deleted entities (prevents resurrection)
  db.exec(`
    CREATE TABLE IF NOT EXISTS tombstones (
      user_id INTEGER NOT NULL,
      entity_type TEXT NOT NULL,
      entity_id TEXT NOT NULL,
      deleted_at INTEGER NOT NULL,
      deleted_by_op_id TEXT NOT NULL,
      expires_at INTEGER NOT NULL,
      PRIMARY KEY (user_id, entity_type, entity_id),
      FOREIGN KEY(user_id) REFERENCES users(id) ON DELETE CASCADE
    )
  `);

  // Index for tombstone cleanup
  db.exec(`
    CREATE INDEX IF NOT EXISTS idx_tombstones_expires ON tombstones(expires_at)
  `);

  // Migration: Check if verification_token_expires_at exists
  const columns = db.pragma('table_info(users)') as { name: string }[];
  const hasExpiresAt = columns.some(
@@ -12,6 +12,7 @@ import { initDb } from './db';
import { apiRoutes } from './api';
import { pageRoutes } from './pages';
import { verifyToken } from './auth';
import { syncRoutes, startCleanupJobs, stopCleanupJobs } from './sync';

export { ServerConfig, loadConfigFromEnv };

@@ -210,9 +211,15 @@ export const createServer = (
  // API Routes
  await fastifyServer.register(apiRoutes, { prefix: '/api' });

  // Sync Routes
  await fastifyServer.register(syncRoutes, { prefix: '/api/sync' });

  // Page Routes
  await fastifyServer.register(pageRoutes, { prefix: '/' });

  // Start cleanup jobs
  startCleanupJobs();

  // WebDAV Handler (Catch-all via hook)
  // We use a hook because Fastify's router validates HTTP methods and might not support all WebDAV methods
  fastifyServer.addHook('onRequest', async (req, reply) => {

@@ -295,6 +302,7 @@ export const createServer = (
    }
  },
  stop: async (): Promise<void> => {
    stopCleanupJobs();
    if (fastifyServer) {
      await fastifyServer.close();
      fastifyServer = undefined;
128 packages/super-sync-server/src/sync/cleanup.ts Normal file
@@ -0,0 +1,128 @@
import { getSyncService } from './sync.service';
import { Logger } from '../logger';
import { DEFAULT_SYNC_CONFIG } from './sync.types';

// Cleanup intervals
const TOMBSTONE_CLEANUP_INTERVAL = 24 * 60 * 60 * 1000; // Daily
const OLD_OPS_CLEANUP_INTERVAL = 24 * 60 * 60 * 1000; // Daily
const STALE_DEVICES_CLEANUP_INTERVAL = 60 * 60 * 1000; // Hourly

let tombstoneCleanupTimer: NodeJS.Timeout | null = null;
let oldOpsCleanupTimer: NodeJS.Timeout | null = null;
let staleDevicesCleanupTimer: NodeJS.Timeout | null = null;

/**
 * Clean up expired tombstones
 */
async function cleanupExpiredTombstones(): Promise<void> {
  try {
    const syncService = getSyncService();
    const deleted = syncService.deleteExpiredTombstones();
    if (deleted > 0) {
      Logger.info(`Tombstone cleanup: deleted ${deleted} expired tombstones`);
    }
  } catch (error) {
    Logger.error(`Tombstone cleanup failed: ${error}`);
  }
}

/**
 * Clean up old operations that have been acknowledged by all devices
 */
async function cleanupOldOperations(): Promise<void> {
  try {
    const syncService = getSyncService();
    const cutoffTime = Date.now() - DEFAULT_SYNC_CONFIG.opRetentionMs;
    const userIds = syncService.getAllUserIds();

    let totalDeleted = 0;

    for (const userId of userIds) {
      // Find minimum acknowledged sequence across all devices
      const minAckedSeq = syncService.getMinAckedSeq(userId);

      if (minAckedSeq === null) {
        // No devices have acknowledged - skip this user
        continue;
      }

      // Only delete operations that:
      // 1. Are older than cutoff time (90 days)
      // 2. Have been acknowledged by all devices (seq < minAckedSeq)
      const deleted = syncService.deleteOldSyncedOps(userId, minAckedSeq, cutoffTime);

      totalDeleted += deleted;
    }

    if (totalDeleted > 0) {
      Logger.info(`Old operations cleanup: deleted ${totalDeleted} operations`);
    }
  } catch (error) {
    Logger.error(`Old operations cleanup failed: ${error}`);
  }
}

/**
 * Clean up stale devices (not seen in 90 days)
 */
async function cleanupStaleDevices(): Promise<void> {
  try {
    const syncService = getSyncService();
    const staleThreshold = Date.now() - 90 * 24 * 60 * 60 * 1000; // 90 days

    const deleted = syncService.deleteStaleDevices(staleThreshold);
    if (deleted > 0) {
      Logger.info(`Stale device cleanup: removed ${deleted} devices`);
    }
  } catch (error) {
    Logger.error(`Stale device cleanup failed: ${error}`);
  }
}

/**
 * Start all cleanup jobs
 */
export function startCleanupJobs(): void {
  Logger.info('Starting sync cleanup jobs...');

  // Run initial cleanup after a short delay
  setTimeout(() => {
    cleanupExpiredTombstones();
    cleanupOldOperations();
    cleanupStaleDevices();
  }, 10_000);

  // Schedule recurring jobs
  tombstoneCleanupTimer = setInterval(
    cleanupExpiredTombstones,
    TOMBSTONE_CLEANUP_INTERVAL,
  );

  oldOpsCleanupTimer = setInterval(cleanupOldOperations, OLD_OPS_CLEANUP_INTERVAL);

  staleDevicesCleanupTimer = setInterval(
    cleanupStaleDevices,
    STALE_DEVICES_CLEANUP_INTERVAL,
  );

  Logger.info('Cleanup jobs scheduled');
}

/**
 * Stop all cleanup jobs
 */
export function stopCleanupJobs(): void {
  if (tombstoneCleanupTimer) {
    clearInterval(tombstoneCleanupTimer);
    tombstoneCleanupTimer = null;
  }
  if (oldOpsCleanupTimer) {
    clearInterval(oldOpsCleanupTimer);
    oldOpsCleanupTimer = null;
  }
  if (staleDevicesCleanupTimer) {
    clearInterval(staleDevicesCleanupTimer);
    staleDevicesCleanupTimer = null;
  }
  Logger.info('Cleanup jobs stopped');
}
4 packages/super-sync-server/src/sync/index.ts Normal file
@@ -0,0 +1,4 @@
export { syncRoutes } from './sync.routes';
export { SyncService, getSyncService, initSyncService } from './sync.service';
export { startCleanupJobs, stopCleanupJobs } from './cleanup';
export * from './sync.types';
315 packages/super-sync-server/src/sync/sync.routes.ts Normal file
@@ -0,0 +1,315 @@
import { FastifyInstance, FastifyRequest, FastifyReply } from 'fastify';
import { z } from 'zod';
import { authenticate } from '../middleware';
import { getSyncService } from './sync.service';
import { Logger } from '../logger';
import {
  UploadOpsRequest,
  UploadOpsResponse,
  DownloadOpsResponse,
  SnapshotResponse,
  SyncStatusResponse,
  DEFAULT_SYNC_CONFIG,
} from './sync.types';

// Zod Schemas
const OperationSchema = z.object({
  id: z.string().min(1),
  clientId: z.string().min(1),
  actionType: z.string().min(1),
  opType: z.enum(['CRT', 'UPD', 'DEL', 'MOV', 'BATCH', 'SYNC_IMPORT']),
  entityType: z.string().min(1),
  entityId: z.string().optional(),
  payload: z.unknown(),
  vectorClock: z.record(z.string(), z.number()),
  timestamp: z.number(),
  schemaVersion: z.number(),
});

const UploadOpsSchema = z.object({
  ops: z.array(OperationSchema).min(1).max(DEFAULT_SYNC_CONFIG.maxOpsPerUpload),
  clientId: z.string().min(1),
  lastKnownServerSeq: z.number().optional(),
});

const DownloadOpsQuerySchema = z.object({
  sinceSeq: z.coerce.number().int().min(0),
  limit: z.coerce.number().int().min(1).max(1000).optional(),
  excludeClient: z.string().optional(),
});

const UploadSnapshotSchema = z.object({
  state: z.unknown(),
  clientId: z.string().min(1),
  reason: z.enum(['initial', 'recovery', 'migration']),
});

const AckSchema = z.object({
  lastSeq: z.number().int().min(0),
});

// Error helper
const errorMessage = (err: unknown): string =>
  err instanceof Error ? err.message : 'Unknown error';

export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
  // All sync routes require authentication
  fastify.addHook('preHandler', authenticate);

  // POST /api/sync/ops - Upload operations
  fastify.post<{ Body: UploadOpsRequest }>(
    '/ops',
    {
      config: {
        rateLimit: {
          max: 100,
          timeWindow: '1 minute',
        },
      },
    },
    async (req: FastifyRequest<{ Body: UploadOpsRequest }>, reply: FastifyReply) => {
      try {
        const userId = req.user!.userId;

        // Validate request body
        const parseResult = UploadOpsSchema.safeParse(req.body);
        if (!parseResult.success) {
          return reply.status(400).send({
            error: 'Validation failed',
            details: parseResult.error.issues,
          });
        }

        const { ops, clientId, lastKnownServerSeq } = parseResult.data;
        const syncService = getSyncService();

        // Rate limit check
        if (syncService.isRateLimited(userId)) {
          return reply.status(429).send({ error: 'Rate limited' });
        }

        // Process operations - cast to Operation[] since Zod validates the structure
        const results = syncService.uploadOps(
          userId,
          clientId,
          ops as unknown as import('./sync.types').Operation[],
        );

        // Optionally include new ops from other clients
        let newOps: import('./sync.types').ServerOperation[] | undefined;
        if (lastKnownServerSeq !== undefined) {
          newOps = syncService.getOpsSince(userId, lastKnownServerSeq, clientId, 100);
        }

        const response: UploadOpsResponse = {
          results,
          newOps: newOps && newOps.length > 0 ? newOps : undefined,
          latestSeq: syncService.getLatestSeq(userId),
        };

        return reply.send(response);
      } catch (err) {
        Logger.error(`Upload ops error: ${errorMessage(err)}`);
        return reply.status(500).send({ error: 'Internal server error' });
      }
    },
  );

  // GET /api/sync/ops - Download operations
  fastify.get<{
    Querystring: { sinceSeq: string; limit?: string; excludeClient?: string };
  }>(
    '/ops',
    {
      config: {
        rateLimit: {
          max: 200,
          timeWindow: '1 minute',
        },
      },
    },
    async (req, reply) => {
      try {
        const userId = req.user!.userId;

        // Validate query params
        const parseResult = DownloadOpsQuerySchema.safeParse(req.query);
        if (!parseResult.success) {
          return reply.status(400).send({
            error: 'Validation failed',
            details: parseResult.error.issues,
          });
        }

        const { sinceSeq, limit = 500, excludeClient } = parseResult.data;
        const syncService = getSyncService();

        const maxLimit = Math.min(limit, 1000);
        const ops = syncService.getOpsSince(
          userId,
          sinceSeq,
          excludeClient,
          maxLimit + 1,
        );

        const hasMore = ops.length > maxLimit;
        if (hasMore) ops.pop();

        const response: DownloadOpsResponse = {
          ops,
          hasMore,
          latestSeq: syncService.getLatestSeq(userId),
        };

        return reply.send(response);
      } catch (err) {
        Logger.error(`Download ops error: ${errorMessage(err)}`);
        return reply.status(500).send({ error: 'Internal server error' });
      }
    },
  );

  // GET /api/sync/snapshot - Get full state snapshot
  fastify.get('/snapshot', async (req, reply) => {
    try {
      const userId = req.user!.userId;
      const syncService = getSyncService();

      // Check if we have a cached snapshot
      const cached = syncService.getCachedSnapshot(userId);
      if (
        cached &&
        Date.now() - cached.generatedAt < DEFAULT_SYNC_CONFIG.snapshotCacheTtlMs
      ) {
        return reply.send(cached as SnapshotResponse);
      }

      // Generate fresh snapshot by replaying ops
      const snapshot = syncService.generateSnapshot(userId);
      return reply.send(snapshot as SnapshotResponse);
    } catch (err) {
      Logger.error(`Get snapshot error: ${errorMessage(err)}`);
      return reply.status(500).send({ error: 'Internal server error' });
    }
  });

  // POST /api/sync/snapshot - Upload full state
  fastify.post<{ Body: { state: unknown; clientId: string; reason: string } }>(
    '/snapshot',
    async (req, reply) => {
      try {
        const userId = req.user!.userId;

        // Validate request body
        const parseResult = UploadSnapshotSchema.safeParse(req.body);
        if (!parseResult.success) {
          return reply.status(400).send({
            error: 'Validation failed',
            details: parseResult.error.issues,
          });
        }

        const { state, clientId, reason } = parseResult.data;
        const syncService = getSyncService();

        // Create a SYNC_IMPORT operation
        const op = {
          id: generateOpId(),
          clientId,
          actionType: 'SYNC_IMPORT',
          opType: 'SYNC_IMPORT' as const,
          entityType: 'ALL',
          payload: state,
          vectorClock: {},
          timestamp: Date.now(),
          schemaVersion: 1,
        };

        const results = syncService.uploadOps(userId, clientId, [op]);
        const result = results[0];

        if (result.accepted && result.serverSeq !== undefined) {
          // Cache the snapshot
          syncService.cacheSnapshot(userId, state, result.serverSeq);
        }

        Logger.info(`Snapshot uploaded for user ${userId}, reason: ${reason}`);

        return reply.send({
          accepted: result.accepted,
          serverSeq: result.serverSeq,
          error: result.error,
        });
      } catch (err) {
        Logger.error(`Upload snapshot error: ${errorMessage(err)}`);
        return reply.status(500).send({ error: 'Internal server error' });
      }
    },
  );

  // GET /api/sync/status - Get sync status
  fastify.get('/status', async (req, reply) => {
    try {
      const userId = req.user!.userId;
      const syncService = getSyncService();

      const latestSeq = syncService.getLatestSeq(userId);
      const minAckedSeq = syncService.getMinAckedSeq(userId);
      const pendingOps = minAckedSeq !== null ? latestSeq - minAckedSeq : 0;

      const cached = syncService.getCachedSnapshot(userId);
      const snapshotAge = cached ? Date.now() - cached.generatedAt : undefined;

      const response: SyncStatusResponse = {
        latestSeq,
        devicesOnline: syncService.getOnlineDeviceCount(userId),
        pendingOps,
        snapshotAge,
      };

      return reply.send(response);
    } catch (err) {
      Logger.error(`Get status error: ${errorMessage(err)}`);
      return reply.status(500).send({ error: 'Internal server error' });
    }
  });

  // POST /api/sync/devices/:clientId/ack - Acknowledge received sequences
  fastify.post<{ Params: { clientId: string }; Body: { lastSeq: number } }>(
    '/devices/:clientId/ack',
    async (req, reply) => {
      try {
        const userId = req.user!.userId;
        const { clientId } = req.params;

        // Validate body
        const parseResult = AckSchema.safeParse(req.body);
        if (!parseResult.success) {
          return reply.status(400).send({
            error: 'Validation failed',
            details: parseResult.error.issues,
          });
        }

        const { lastSeq } = parseResult.data;
        const syncService = getSyncService();

        syncService.updateDeviceAck(userId, clientId, lastSeq);

        return reply.send({ acknowledged: true });
      } catch (err) {
        Logger.error(`Ack error: ${errorMessage(err)}`);
        return reply.status(500).send({ error: 'Internal server error' });
      }
    },
  );
};

// Simple UUID v7-like ID generator
function generateOpId(): string {
  const timestamp = Date.now().toString(16).padStart(12, '0');
  const random = Array.from({ length: 20 }, () =>
    Math.floor(Math.random() * 16).toString(16),
  ).join('');
  return `${timestamp}${random}`;
}