diff --git a/e2e/tests/sync/supersync-snapshot-vector-clock.spec.ts b/e2e/tests/sync/supersync-snapshot-vector-clock.spec.ts
new file mode 100644
index 000000000..740c8c265
--- /dev/null
+++ b/e2e/tests/sync/supersync-snapshot-vector-clock.spec.ts
@@ -0,0 +1,460 @@
+import { test as base, expect } from '@playwright/test';
+import {
+  createTestUser,
+  getSuperSyncConfig,
+  createSimulatedClient,
+  closeClient,
+  waitForTask,
+  isServerHealthy,
+  type SimulatedE2EClient,
+} from '../../utils/supersync-helpers';
+
+/**
+ * SuperSync Snapshot Vector Clock E2E Tests
+ *
+ * Tests the snapshot optimization scenario in which the server skips returning
+ * old operations when a SYNC_IMPORT/BACKUP_IMPORT/REPAIR exists.
+ *
+ * The fix being tested: when using the snapshot optimization, the server
+ * returns a `snapshotVectorClock`, an aggregate of all vector clocks from the
+ * skipped operations. This allows fresh clients to create merged updates that
+ * properly dominate all known clocks.
+ *
+ * Without this fix, fresh clients would get stuck in infinite loops because
+ * their merged updates would never dominate (they would be missing clock
+ * entries from other clients whose ops were skipped).
+ */
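+
+/*
+ * Illustration of the aggregation (hypothetical clock values, not asserted by
+ * any test below): if the skipped ops carry clocks {A: 40}, {A: 41, B: 6} and
+ * {B: 7}, the aggregate is the component-wise max {A: 41, B: 7}. A fresh
+ * Client C can merge its local {C: 1} into that aggregate and stamp updates
+ * with {A: 41, B: 7, C: 2}, which dominates every skipped clock - a bare
+ * {C: 2} never would, hence the old infinite retry loop.
+ */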
+
+const generateTestRunId = (workerIndex: number): string => {
+  return `${Date.now()}-${workerIndex}`;
+};
+
+base.describe('@supersync SuperSync Snapshot Vector Clock', () => {
+  let serverHealthy: boolean | null = null;
+
+  base.beforeEach(async ({}, testInfo) => {
+    if (serverHealthy === null) {
+      serverHealthy = await isServerHealthy();
+      if (!serverHealthy) {
+        console.warn(
+          'SuperSync server not healthy at http://localhost:1901 - skipping tests',
+        );
+      }
+    }
+    testInfo.skip(!serverHealthy, 'SuperSync server not running');
+  });
+
+  /**
+   * Scenario 1: Fresh client joins after snapshot optimization
+   *
+   * This tests the core fix: a fresh client (Client C) joining after Client A
+   * has done a full-state sync (SYNC_IMPORT) and Client B has made changes.
+   * Client C's local changes should sync correctly without infinite loops.
+   *
+   * The scenario that caused the bug:
+   * 1. Client A syncs full state (SYNC_IMPORT at seq 316)
+   * 2. Client B makes changes (ops 317-319)
+   * 3. Client C joins fresh (sinceSeq=0)
+   * 4. Server uses snapshot optimization: skips ops 0-315, returns 316-319
+   * 5. Client C makes a change that conflicts with something from before seq 316
+   * 6. Without snapshotVectorClock, Client C's merged update can never dominate
+   *    (missing clock entries from skipped ops) -> infinite loop
+   *
+   * With the fix, Client C receives snapshotVectorClock and can create
+   * dominating updates.
+   */
+  base(
+    'Fresh client syncs correctly after snapshot optimization (no infinite loop)',
+    async ({ browser, baseURL }, testInfo) => {
+      testInfo.setTimeout(120000);
+      const testRunId = generateTestRunId(testInfo.workerIndex);
+      const appUrl = baseURL || 'http://localhost:4242';
+      let clientA: SimulatedE2EClient | null = null;
+      let clientB: SimulatedE2EClient | null = null;
+      let clientC: SimulatedE2EClient | null = null;
+
+      try {
+        const user = await createTestUser(testRunId);
+        const syncConfig = getSuperSyncConfig(user);
+
+        // === Phase 1: Client A creates initial data and syncs ===
+        console.log('[SnapshotClock] Phase 1: Client A creates initial data');
+        clientA = await createSimulatedClient(browser, appUrl, 'A', testRunId);
+        await clientA.sync.setupSuperSync(syncConfig);
+
+        // Create several tasks to build up operation history
+        const taskA1 = `A1-${testRunId}`;
+        const taskA2 = `A2-${testRunId}`;
+        const taskA3 = `A3-${testRunId}`;
+        await clientA.workView.addTask(taskA1);
+        await waitForTask(clientA.page, taskA1);
+        await clientA.workView.addTask(taskA2);
+        await waitForTask(clientA.page, taskA2);
+        await clientA.workView.addTask(taskA3);
+        await waitForTask(clientA.page, taskA3);
+
+        // Sync to server
+        await clientA.sync.syncAndWait();
+        console.log('[SnapshotClock] Client A synced initial tasks');
+
+        // === Phase 2: Client B joins and makes changes ===
+        console.log('[SnapshotClock] Phase 2: Client B joins and makes changes');
+        clientB = await createSimulatedClient(browser, appUrl, 'B', testRunId);
+        await clientB.sync.setupSuperSync(syncConfig);
+        await clientB.sync.syncAndWait();
+
+        // Verify B has all tasks
+        await waitForTask(clientB.page, taskA1);
+        await waitForTask(clientB.page, taskA2);
+        await waitForTask(clientB.page, taskA3);
+
+        // Client B creates more tasks (these will be after any potential snapshot)
+        const taskB1 = `B1-${testRunId}`;
+        const taskB2 = `B2-${testRunId}`;
+        await clientB.workView.addTask(taskB1);
+        await waitForTask(clientB.page, taskB1);
+        await clientB.workView.addTask(taskB2);
+        await waitForTask(clientB.page, taskB2);
+
+        // Client B syncs
+        await clientB.sync.syncAndWait();
+        console.log('[SnapshotClock] Client B created and synced tasks');
+
+        // Client A syncs to get B's changes
+        await clientA.sync.syncAndWait();
+        await waitForTask(clientA.page, taskB1);
+        await waitForTask(clientA.page, taskB2);
+
+        // === Phase 3: Client C joins fresh (simulates snapshot optimization) ===
+        console.log('[SnapshotClock] Phase 3: Client C joins fresh');
+        clientC = await createSimulatedClient(browser, appUrl, 'C', testRunId);
+        await clientC.sync.setupSuperSync(syncConfig);
+
+        // Initial sync - Client C downloads all data
+        // Server may use snapshot optimization here depending on server state
+        await clientC.sync.syncAndWait();
+
+        // Verify C has all existing tasks
+        await waitForTask(clientC.page, taskA1);
+        await waitForTask(clientC.page, taskA2);
+        await waitForTask(clientC.page, taskA3);
+        await waitForTask(clientC.page, taskB1);
+        await waitForTask(clientC.page, taskB2);
+        console.log('[SnapshotClock] Client C downloaded all tasks');
+
+        // === Phase 4: Client C makes changes ===
+        // This is the critical part - C's changes need to sync without getting stuck
+        console.log('[SnapshotClock] Phase 4: Client C makes changes');
+        const taskC1 = `C1-${testRunId}`;
+        const taskC2 = `C2-${testRunId}`;
+        await clientC.workView.addTask(taskC1);
+        await waitForTask(clientC.page, taskC1);
+        await clientC.workView.addTask(taskC2);
+        await waitForTask(clientC.page, taskC2);
+
+        // Client C syncs - without the fix, this could get stuck in an infinite loop.
+        // Use a shorter timeout to catch infinite loops quickly.
+        const syncPromise = clientC.sync.syncAndWait();
+        const timeoutPromise = new Promise((_, reject) =>
+          setTimeout(
+            () => reject(new Error('Sync timed out - possible infinite loop')),
+            30000,
+          ),
+        );
+
+        await Promise.race([syncPromise, timeoutPromise]);
+        console.log('[SnapshotClock] Client C synced successfully (no infinite loop)');
+
+        // === Phase 5: Verify all clients converge ===
+        console.log('[SnapshotClock] Phase 5: Verifying convergence');
+
+        // Sync all clients to converge
+        await clientA.sync.syncAndWait();
+        await clientB.sync.syncAndWait();
+        await clientC.sync.syncAndWait();
+
+        // Wait for UI to settle
+        await clientA.page.waitForTimeout(1000);
+        await clientB.page.waitForTimeout(1000);
+        await clientC.page.waitForTimeout(1000);
+
+        // All tasks should be present on all clients
+        const allTasks = [taskA1, taskA2, taskA3, taskB1, taskB2, taskC1, taskC2];
+
+        for (const task of allTasks) {
+          await waitForTask(clientA.page, task);
+          await waitForTask(clientB.page, task);
+          await waitForTask(clientC.page, task);
+        }
+
+        // Count tasks to ensure consistency
+        const countA = await clientA.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+        const countB = await clientB.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+        const countC = await clientC.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+
+        expect(countA).toBe(7);
+        expect(countB).toBe(7);
+        expect(countC).toBe(7);
+
+        console.log('[SnapshotClock] ✓ All clients converged with 7 tasks each');
+      } finally {
+        if (clientA) await closeClient(clientA);
+        if (clientB) await closeClient(clientB);
+        if (clientC) await closeClient(clientC);
+      }
+    },
+  );
+
+  /**
+   * Scenario 2: Multiple fresh clients joining sequentially
+   *
+   * Tests that multiple clients can join fresh and sync correctly,
+   * each receiving proper snapshotVectorClock values.
+   */
+  base(
+    'Multiple fresh clients join and sync correctly after snapshot',
+    async ({ browser, baseURL }, testInfo) => {
+      testInfo.setTimeout(180000);
+      const testRunId = generateTestRunId(testInfo.workerIndex);
+      const appUrl = baseURL || 'http://localhost:4242';
+      let clientA: SimulatedE2EClient | null = null;
+      let clientB: SimulatedE2EClient | null = null;
+      let clientC: SimulatedE2EClient | null = null;
+      let clientD: SimulatedE2EClient | null = null;
+
+      try {
+        const user = await createTestUser(testRunId);
+        const syncConfig = getSuperSyncConfig(user);
+
+        // Client A: Creates initial data
+        clientA = await createSimulatedClient(browser, appUrl, 'A', testRunId);
+        await clientA.sync.setupSuperSync(syncConfig);
+
+        const taskA1 = `A1-${testRunId}`;
+        await clientA.workView.addTask(taskA1);
+        await waitForTask(clientA.page, taskA1);
+        await clientA.sync.syncAndWait();
+        console.log('[MultiClient] Client A created initial task');
+
+        // Client B: Joins, adds data
+        clientB = await createSimulatedClient(browser, appUrl, 'B', testRunId);
+        await clientB.sync.setupSuperSync(syncConfig);
+        await clientB.sync.syncAndWait();
+        await waitForTask(clientB.page, taskA1);
+
+        const taskB1 = `B1-${testRunId}`;
+        await clientB.workView.addTask(taskB1);
+        await waitForTask(clientB.page, taskB1);
+        await clientB.sync.syncAndWait();
+        console.log('[MultiClient] Client B joined and added task');
+
+        // Close Client B to simulate it going offline
+        await closeClient(clientB);
+        clientB = null;
+
+        // Client A continues working
+        const taskA2 = `A2-${testRunId}`;
+        await clientA.workView.addTask(taskA2);
+        await waitForTask(clientA.page, taskA2);
+        await clientA.sync.syncAndWait();
+
+        // Client C: Joins fresh (never saw Client B's changes locally)
+        clientC = await createSimulatedClient(browser, appUrl, 'C', testRunId);
+        await clientC.sync.setupSuperSync(syncConfig);
+        await clientC.sync.syncAndWait();
+
+        // C should have all tasks from A and B
+        await waitForTask(clientC.page, taskA1);
+        await waitForTask(clientC.page, taskA2);
+        await waitForTask(clientC.page, taskB1);
+
+        // C adds its own task
+        const taskC1 = `C1-${testRunId}`;
+        await clientC.workView.addTask(taskC1);
+        await waitForTask(clientC.page, taskC1);
+        await clientC.sync.syncAndWait();
+        console.log('[MultiClient] Client C joined and added task');
+
+        // Client D: Another fresh client joins
+        clientD = await createSimulatedClient(browser, appUrl, 'D', testRunId);
+        await clientD.sync.setupSuperSync(syncConfig);
+        await clientD.sync.syncAndWait();
+
+        // D should have all tasks
+        await waitForTask(clientD.page, taskA1);
+        await waitForTask(clientD.page, taskA2);
+        await waitForTask(clientD.page, taskB1);
+        await waitForTask(clientD.page, taskC1);
+
+        // D adds its own task
+        const taskD1 = `D1-${testRunId}`;
+        await clientD.workView.addTask(taskD1);
+        await waitForTask(clientD.page, taskD1);
+        await clientD.sync.syncAndWait();
+        console.log('[MultiClient] Client D joined and added task');
+
+        // Final sync for all active clients
+        await clientA.sync.syncAndWait();
+        await clientC.sync.syncAndWait();
+        await clientD.sync.syncAndWait();
+
+        // Verify all active clients have all 5 tasks
+        const allTasks = [taskA1, taskA2, taskB1, taskC1, taskD1];
+
+        for (const task of allTasks) {
+          await waitForTask(clientA.page, task);
+          await waitForTask(clientC.page, task);
+          await waitForTask(clientD.page, task);
+        }
+
+        const countA = await clientA.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+        const countC = await clientC.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+        const countD = await clientD.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+
+        expect(countA).toBe(5);
+        expect(countC).toBe(5);
+        expect(countD).toBe(5);
+
+        console.log('[MultiClient] ✓ All clients converged with 5 tasks each');
+      } finally {
+        if (clientA) await closeClient(clientA);
+        if (clientB) await closeClient(clientB);
+        if (clientC) await closeClient(clientC);
+        if (clientD) await closeClient(clientD);
+      }
+    },
+  );
+
+  /**
+   * Scenario 3: Fresh client with concurrent modifications
+   *
+   * Tests the scenario where a fresh client joins and immediately makes
+   * modifications concurrent with an existing client's. This is the scenario
+   * most likely to trigger the infinite-loop bug.
+   */
+  base(
+    'Fresh client handles concurrent modifications after snapshot',
+    async ({ browser, baseURL }, testInfo) => {
+      testInfo.setTimeout(120000);
+      const testRunId = generateTestRunId(testInfo.workerIndex);
+      const appUrl = baseURL || 'http://localhost:4242';
+      let clientA: SimulatedE2EClient | null = null;
+      let clientB: SimulatedE2EClient | null = null;
+
+      try {
+        const user = await createTestUser(testRunId);
+        const syncConfig = getSuperSyncConfig(user);
+
+        // Client A: Establishes baseline
+        clientA = await createSimulatedClient(browser, appUrl, 'A', testRunId);
+        await clientA.sync.setupSuperSync(syncConfig);
+
+        // Create initial tasks
+        const sharedTask = `Shared-${testRunId}`;
+        await clientA.workView.addTask(sharedTask);
+        await waitForTask(clientA.page, sharedTask);
+        await clientA.sync.syncAndWait();
+        console.log('[ConcurrentFresh] Client A created shared task');
+
+        // Add more operations to build up history
+        for (let i = 0; i < 5; i++) {
+          await clientA.workView.addTask(`Filler-${i}-${testRunId}`);
+        }
+        await clientA.sync.syncAndWait();
+        console.log('[ConcurrentFresh] Client A added filler tasks');
+
+        // Client B: Joins fresh
+        clientB = await createSimulatedClient(browser, appUrl, 'B', testRunId);
+        await clientB.sync.setupSuperSync(syncConfig);
+        await clientB.sync.syncAndWait();
+
+        // Verify B has all tasks
+        await waitForTask(clientB.page, sharedTask);
+        console.log('[ConcurrentFresh] Client B joined and synced');
+
+        // Now create concurrent modifications
+        // Client A: Marks shared task as done
+        const taskLocatorA = clientA.page
+          .locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
+          .first();
+        await taskLocatorA.hover();
+        await taskLocatorA.locator('.task-done-btn').click();
+        await expect(taskLocatorA).toHaveClass(/isDone/, { timeout: 5000 });
+        console.log('[ConcurrentFresh] Client A marked shared task done');
+
+        // Client B: Also marks shared task as done (concurrent change)
+        const taskLocatorB = clientB.page
+          .locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
+          .first();
+        await taskLocatorB.hover();
+        await taskLocatorB.locator('.task-done-btn').click();
+        await expect(taskLocatorB).toHaveClass(/isDone/, { timeout: 5000 });
+        console.log('[ConcurrentFresh] Client B marked shared task done (concurrent)');
+
+        // Client A syncs first
+        await clientA.sync.syncAndWait();
+
+        // Client B syncs - this is where the infinite loop could occur
+        // without the snapshotVectorClock fix
+        const syncPromise = clientB.sync.syncAndWait();
+        const timeoutPromise = new Promise((_, reject) =>
+          setTimeout(
+            () => reject(new Error('Sync timed out - possible infinite loop')),
+            30000,
+          ),
+        );
+
+        await Promise.race([syncPromise, timeoutPromise]);
+        console.log('[ConcurrentFresh] Client B synced without infinite loop');
+
+        // Final sync to ensure convergence
+        await clientA.sync.syncAndWait();
+        await clientB.sync.syncAndWait();
+
+        // Wait for UI to settle
+        await clientA.page.waitForTimeout(500);
+        await clientB.page.waitForTimeout(500);
+
+        // Verify both clients show the shared task as done
+        await waitForTask(clientA.page, sharedTask);
+        await waitForTask(clientB.page, sharedTask);
+
+        const finalTaskA = clientA.page
+          .locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
+          .first();
+        const finalTaskB = clientB.page
+          .locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
+          .first();
+
+        await expect(finalTaskA).toHaveClass(/isDone/, { timeout: 5000 });
+        await expect(finalTaskB).toHaveClass(/isDone/, { timeout: 5000 });
+
+        // Verify task counts match
+        const countA = await clientA.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+        const countB = await clientB.page
+          .locator(`task:has-text("${testRunId}")`)
+          .count();
+        expect(countA).toBe(countB);
+
+        console.log('[ConcurrentFresh] ✓ Concurrent modifications resolved correctly');
+      } finally {
+        if (clientA) await closeClient(clientA);
+        if (clientB) await closeClient(clientB);
+      }
+    },
+  );
+});
diff --git a/packages/super-sync-server/Dockerfile b/packages/super-sync-server/Dockerfile
index 4d9fd0b87..62ce32f3c 100644
--- a/packages/super-sync-server/Dockerfile
+++ b/packages/super-sync-server/Dockerfile
@@ -39,7 +39,7 @@ RUN apk add --no-cache openssl libc6-compat wget && \
 
 WORKDIR /app
 
-# Copy built artifacts from builder
+# Copy built artifacts from builder (note: tsconfig rootDir is "." so structure is dist/src/ and dist/scripts/)
 COPY --from=builder --chown=supersync:nodejs /repo/packages/super-sync-server/dist ./dist
 COPY --from=builder --chown=supersync:nodejs /repo/packages/super-sync-server/public ./public
 COPY --from=builder --chown=supersync:nodejs /repo/packages/super-sync-server/prisma ./prisma
@@ -64,4 +64,4 @@ ENV DATA_DIR=/app/data
 HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
   CMD wget --no-verbose --tries=1 --spider http://localhost:${PORT}/health || exit 1
 
-CMD ["sh", "-c", "npx prisma db push --skip-generate && node dist/index.js"]
\ No newline at end of file
+CMD ["sh", "-c", "npx prisma db push --skip-generate && node dist/src/index.js"]
\ No newline at end of file
diff --git a/packages/super-sync-server/package.json b/packages/super-sync-server/package.json
index 551776d03..2a80fefe1 100644
--- a/packages/super-sync-server/package.json
+++ b/packages/super-sync-server/package.json
@@ -2,10 +2,10 @@
   "name": "@super-productivity/super-sync-server",
   "version": "1.0.0",
   "description": "SuperSync server for Super Productivity",
-  "main": "dist/index.js",
+  "main": "dist/src/index.js",
   "scripts": {
     "build": "tsc",
-    "start": "node dist/index.js",
+    "start": "node dist/src/index.js",
     "dev": "nodemon --watch src --ext ts --exec ts-node src/index.ts",
     "test": "vitest run",
     "docker:build": "./scripts/build-and-push.sh",
@@ -15,7 +15,8 @@
     "docker:logs": "docker logs supersync-server -f",
     "delete-user": "ts-node scripts/delete-user.ts",
     "clear-data": "ts-node scripts/clear-data.ts",
-    "monitor": "ts-node scripts/monitor.ts"
+    "monitor": "node dist/scripts/monitor.js",
+    "monitor:dev": "ts-node scripts/monitor.ts"
   },
   "dependencies": {
     "@fastify/cors": "^11.1.0",
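The entrypoint and monitor path changes above follow from the tsconfig change later in this patch: with `rootDir` set to `.`, tsc mirrors the whole source tree under `dist/` instead of flattening `src/` into it. A sketch of the assumed compiled layout (the exact file list is inferred, not shown in the diff):

// Assumed output of `npm run build` (tsc with rootDir "."):
// dist/
//   src/index.js        <- server entrypoint (was dist/index.js)
//   src/sync/...        <- other compiled sources keep their folders
//   scripts/monitor.js  <- compiled monitor script for "npm run monitor"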
diff --git a/packages/super-sync-server/src/sync/sync.routes.ts b/packages/super-sync-server/src/sync/sync.routes.ts
index dc52b3e6f..20e3afa14 100644
--- a/packages/super-sync-server/src/sync/sync.routes.ts
+++ b/packages/super-sync-server/src/sync/sync.routes.ts
@@ -329,7 +329,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
 
     // Use atomic read to get ops and latestSeq in one transaction
     // This prevents race conditions where new ops arrive between the two reads
-    const { ops, latestSeq, gapDetected, latestSnapshotSeq } =
+    const { ops, latestSeq, gapDetected, latestSnapshotSeq, snapshotVectorClock } =
      await syncService.getOpsSinceWithSeq(
        userId,
        sinceSeq,
@@ -358,6 +358,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
       latestSeq,
       gapDetected: gapDetected || undefined, // Only include if true
       latestSnapshotSeq, // Optimization: tells client where effective state starts
+      snapshotVectorClock, // Aggregated clock from skipped ops for conflict resolution
     };
 
     return reply.send(response);
diff --git a/packages/super-sync-server/src/sync/sync.service.ts b/packages/super-sync-server/src/sync/sync.service.ts
index 56ccc1867..446360e29 100644
--- a/packages/super-sync-server/src/sync/sync.service.ts
+++ b/packages/super-sync-server/src/sync/sync.service.ts
@@ -547,6 +547,7 @@ export class SyncService {
     latestSeq: number;
     gapDetected: boolean;
     latestSnapshotSeq?: number;
+    snapshotVectorClock?: VectorClock;
   }> {
     return prisma.$transaction(async (tx) => {
       // Find the latest full-state operation (SYNC_IMPORT, BACKUP_IMPORT, REPAIR)
@@ -566,6 +567,8 @@
       // start from the full-state op instead. Pre-import ops are superseded and will
       // be filtered out by the client anyway.
       let effectiveSinceSeq = sinceSeq;
+      let snapshotVectorClock: VectorClock | undefined;
+
       if (latestSnapshotSeq !== undefined && sinceSeq < latestSnapshotSeq) {
         // Start from one before the snapshot so it's included in results
         effectiveSinceSeq = latestSnapshotSeq - 1;
@@ -573,6 +576,35 @@
           `[user:${userId}] Optimized download: skipping from sinceSeq=${sinceSeq} to ${effectiveSinceSeq} ` +
             `(latest snapshot at seq ${latestSnapshotSeq})`,
         );
+
+        // Compute aggregated vector clock from all ops up to and including the snapshot.
+        // This ensures clients know about all clock entries from skipped ops.
+        const skippedOps = await tx.operation.findMany({
+          where: {
+            userId,
+            serverSeq: { lte: latestSnapshotSeq },
+          },
+          select: { vectorClock: true },
+        });
+
+        snapshotVectorClock = {};
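+        // The aggregate is the component-wise max over every entry (the
+        // standard vector-clock join); `?? 0` treats a client missing from
+        // the accumulator as zero, so its first value is simply adopted.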
+        for (const op of skippedOps) {
+          const clock = op.vectorClock as unknown as VectorClock;
+          if (clock && typeof clock === 'object') {
+            for (const [clientId, value] of Object.entries(clock)) {
+              if (typeof value === 'number') {
+                snapshotVectorClock[clientId] = Math.max(
+                  snapshotVectorClock[clientId] ?? 0,
+                  value,
+                );
+              }
+            }
+          }
+        }
+
+        Logger.info(
+          `[user:${userId}] Computed snapshotVectorClock from ${skippedOps.length} ops: ${JSON.stringify(snapshotVectorClock)}`,
+        );
       }
 
       const ops = await tx.operation.findMany({
@@ -661,7 +693,7 @@ export class SyncService {
         receivedAt: Number(row.receivedAt),
       }));
 
-      return { ops: mappedOps, latestSeq, gapDetected, latestSnapshotSeq };
+      return { ops: mappedOps, latestSeq, gapDetected, latestSnapshotSeq, snapshotVectorClock };
     });
   }
diff --git a/packages/super-sync-server/src/sync/sync.types.ts b/packages/super-sync-server/src/sync/sync.types.ts
index 7e9fd8316..072aca10a 100644
--- a/packages/super-sync-server/src/sync/sync.types.ts
+++ b/packages/super-sync-server/src/sync/sync.types.ts
@@ -203,6 +203,12 @@ export interface DownloadOpsResponse {
    * Operations before this seq are superseded by the full-state operation.
    */
   latestSnapshotSeq?: number;
+  /**
+   * Aggregated vector clock from all ops before and including the snapshot.
+   * Only set when snapshot optimization is used (sinceSeq < latestSnapshotSeq).
+   * Clients need this to create merged updates that dominate all known clocks.
+   */
+  snapshotVectorClock?: VectorClock;
 }
 
 // Snapshot types
diff --git a/packages/super-sync-server/tsconfig.json b/packages/super-sync-server/tsconfig.json
index 1a12b4e79..855d67547 100644
--- a/packages/super-sync-server/tsconfig.json
+++ b/packages/super-sync-server/tsconfig.json
@@ -3,11 +3,11 @@
     "target": "ES2020",
     "module": "commonjs",
     "outDir": "./dist",
-    "rootDir": "./src",
+    "rootDir": ".",
     "strict": true,
     "esModuleInterop": true,
     "skipLibCheck": true,
     "forceConsistentCasingInFileNames": true
   },
-  "include": ["src/**/*"]
+  "include": ["src/**/*", "scripts/**/*"]
 }
diff --git a/src/app/core/persistence/operation-log/store/operation-log-store.service.ts b/src/app/core/persistence/operation-log/store/operation-log-store.service.ts
index ad4478297..8894eb3f5 100644
--- a/src/app/core/persistence/operation-log/store/operation-log-store.service.ts
+++ b/src/app/core/persistence/operation-log/store/operation-log-store.service.ts
@@ -350,6 +350,7 @@ async markRejected(opIds: string[]): Promise<void> {
     await this._ensureInit();
+
     const tx = this.db.transaction('ops', 'readwrite');
     const store = tx.objectStore('ops');
     const index = store.index('byId');
diff --git a/src/app/core/persistence/operation-log/sync/operation-log-download.service.spec.ts b/src/app/core/persistence/operation-log/sync/operation-log-download.service.spec.ts
index 96590ee5f..f780bc482 100644
--- a/src/app/core/persistence/operation-log/sync/operation-log-download.service.spec.ts
+++ b/src/app/core/persistence/operation-log/sync/operation-log-download.service.spec.ts
@@ -577,6 +577,227 @@ describe('OperationLogDownloadService', () => {
     });
   });
 
+  describe('snapshotVectorClock handling', () => {
+    it('should capture snapshotVectorClock from first response', async () => {
+      const snapshotClock = { clientA: 10, clientB: 5, clientC: 3 };
+      mockApiProvider.downloadOps.and.returnValue(
+        Promise.resolve({
+          ops: [
+            {
+              serverSeq: 317,
+              receivedAt: Date.now(),
+              op: {
+                id: 'op-317',
+                clientId: 'c1',
+                actionType: '[Task] Add',
+                opType: OpType.Create,
+                entityType: 'TASK',
+                payload: {},
+                vectorClock: { c1: 1 },
+                timestamp: Date.now(),
+                schemaVersion: 1,
+              },
+            },
+          ],
+          hasMore: false,
+          latestSeq: 317,
+          snapshotVectorClock: snapshotClock,
+        }),
+      );
+
+      const result = await service.downloadRemoteOps(mockApiProvider);
+
+      expect(result.snapshotVectorClock).toEqual(snapshotClock);
+    });
+
+    it('should return snapshotVectorClock even when no new ops', async () => {
+      const snapshotClock = { clientA: 10, clientB: 5 };
+      mockApiProvider.downloadOps.and.returnValue(
+        Promise.resolve({
+          ops: [],
+          hasMore: false,
+          latestSeq: 316,
+          snapshotVectorClock: snapshotClock,
+        }),
+      );
+
+      const result = await service.downloadRemoteOps(mockApiProvider);
+
+      expect(result.snapshotVectorClock).toEqual(snapshotClock);
+      expect(result.newOps.length).toBe(0);
+    });
+
+    it('should capture snapshotVectorClock from first page in paginated download', async () => {
+      const snapshotClock = { clientA: 10, clientB: 5 };
+      mockApiProvider.downloadOps.and.returnValues(
+        // First page - has snapshotVectorClock
+        Promise.resolve({
+          ops: [
+            {
+              serverSeq: 317,
+              receivedAt: Date.now(),
+              op: {
+                id: 'op-317',
+                clientId: 'c1',
+                actionType: '[Task] Add',
+                opType: OpType.Create,
+                entityType: 'TASK',
+                payload: {},
+                vectorClock: { c1: 1 },
+                timestamp: Date.now(),
+                schemaVersion: 1,
+              },
+            },
+          ],
+          hasMore: true,
+          latestSeq: 320,
+          snapshotVectorClock: snapshotClock,
+        }),
+        // Second page - no snapshotVectorClock (only first response has it)
+        Promise.resolve({
+          ops: [
+            {
+              serverSeq: 318,
+              receivedAt: Date.now(),
+              op: {
+                id: 'op-318',
+                clientId: 'c2',
+                actionType: '[Task] Update',
+                opType: OpType.Update,
+                entityType: 'TASK',
+                payload: {},
+                vectorClock: { c2: 1 },
+                timestamp: Date.now(),
+                schemaVersion: 1,
+              },
+            },
+          ],
+          hasMore: false,
+          latestSeq: 320,
+        }),
+      );
+
+      const result = await service.downloadRemoteOps(mockApiProvider);
+
+      // Should have captured snapshotVectorClock from first response
+      expect(result.snapshotVectorClock).toEqual(snapshotClock);
+      expect(result.newOps.length).toBe(2);
+    });
+
+    it('should not have snapshotVectorClock when server does not send it', async () => {
+      mockApiProvider.downloadOps.and.returnValue(
+        Promise.resolve({
+          ops: [
+            {
+              serverSeq: 1,
+              receivedAt: Date.now(),
+              op: {
+                id: 'op-1',
+                clientId: 'c1',
+                actionType: '[Task] Add',
+                opType: OpType.Create,
+                entityType: 'TASK',
+                payload: {},
+                vectorClock: { c1: 1 },
+                timestamp: Date.now(),
+                schemaVersion: 1,
+              },
+            },
+          ],
+          hasMore: false,
+          latestSeq: 1,
+          // No snapshotVectorClock
+        }),
+      );
+
+      const result = await service.downloadRemoteOps(mockApiProvider);
+
+      expect(result.snapshotVectorClock).toBeUndefined();
+    });
+
+    it('should clear snapshotVectorClock when gap is detected and re-downloading', async () => {
+      const staleSnapshotClock = { clientA: 5 };
+      const freshSnapshotClock = { clientA: 10, clientB: 3 };
+
+      mockApiProvider.getLastServerSeq.and.returnValue(Promise.resolve(100));
+      mockApiProvider.downloadOps.and.returnValues(
+        // First response: gap detected with stale clock
+        Promise.resolve({
+          ops: [],
+          hasMore: false,
+          latestSeq: 50,
+          gapDetected: true,
+          snapshotVectorClock: staleSnapshotClock,
+        }),
+        // After reset: fresh clock
+        Promise.resolve({
+          ops: [
+            {
+              serverSeq: 1,
+              receivedAt: Date.now(),
+              op: {
+                id: 'op-1',
+                clientId: 'c1',
+                actionType: '[Task] Add',
+                opType: OpType.Create,
+                entityType: 'TASK',
+                payload: {},
+                vectorClock: { c1: 1 },
+                timestamp: Date.now(),
+                schemaVersion: 1,
+              },
+            },
+          ],
+          hasMore: false,
+          latestSeq: 50,
+          snapshotVectorClock: freshSnapshotClock,
+        }),
+      );
+
+      const result = await service.downloadRemoteOps(mockApiProvider);
+
+      // Should have the fresh clock from after the reset
+      expect(result.snapshotVectorClock).toEqual(freshSnapshotClock);
+    });
+
+    it('should work with forceFromSeq0 option and return snapshotVectorClock', async () => {
+      const snapshotClock = { clientA: 100, clientB: 50 };
+      mockApiProvider.getLastServerSeq.and.returnValue(Promise.resolve(500));
+      mockApiProvider.downloadOps.and.returnValue(
+        Promise.resolve({
+          ops: [
+            {
+              serverSeq: 316,
+              receivedAt: Date.now(),
+              op: {
+                id: 'op-316',
+                clientId: 'c1',
+                actionType: '[All] Sync Import',
+                opType: 'SYNC_IMPORT' as OpType,
+                entityType: 'ALL',
+                payload: {},
+                vectorClock: snapshotClock,
+                timestamp: Date.now(),
+                schemaVersion: 1,
+              },
+            },
+          ],
+          hasMore: false,
+          latestSeq: 316,
+          snapshotVectorClock: snapshotClock,
+        }),
+      );
+
+      const result = await service.downloadRemoteOps(mockApiProvider, {
+        forceFromSeq0: true,
+      });
+
+      expect(result.snapshotVectorClock).toEqual(snapshotClock);
+      expect(result.allOpClocks).toBeDefined();
+      expect(result.allOpClocks!.length).toBe(1);
+    });
+  });
+
   describe('gap detection handling', () => {
     it('should reset lastServerSeq to 0 when gap is detected', async () => {
       // First call returns gap detected (client has stale sinceSeq)
diff --git a/src/app/core/persistence/operation-log/sync/operation-log-download.service.ts b/src/app/core/persistence/operation-log/sync/operation-log-download.service.ts
index 16837d2f3..82b28c944 100644
--- a/src/app/core/persistence/operation-log/sync/operation-log-download.service.ts
+++ b/src/app/core/persistence/operation-log/sync/operation-log-download.service.ts
@@ -50,6 +50,12 @@ export interface DownloadResult {
    * vector clock state from all known ops on the server.
    */
   allOpClocks?: import('../operation.types').VectorClock[];
+  /**
+   * Aggregated vector clock from all ops before and including the snapshot.
+   * Only set when snapshot optimization is used (sinceSeq < latestSnapshotSeq).
+   * Clients need this to create merged updates that dominate all known clocks.
+   */
+  snapshotVectorClock?: import('../operation.types').VectorClock;
 }
 
 /**
@@ -116,6 +122,7 @@ export class OperationLogDownloadService {
     let downloadFailed = false;
     let needsFullStateUpload = false;
     let finalLatestSeq = 0;
+    let snapshotVectorClock: import('../operation.types').VectorClock | undefined;
 
     // Get encryption key upfront
     const privateCfg =
@@ -152,6 +159,14 @@ export class OperationLogDownloadService {
       const response = await syncProvider.downloadOps(sinceSeq, undefined, 500);
       finalLatestSeq = response.latestSeq;
 
+      // Capture snapshot vector clock from first response (only present when snapshot optimization used)
+      if (!snapshotVectorClock && response.snapshotVectorClock) {
+        snapshotVectorClock = response.snapshotVectorClock;
+        OpLog.normal(
+          `OperationLogDownloadService: Received snapshotVectorClock with ${Object.keys(snapshotVectorClock).length} entries`,
+        );
+      }
+
       // Handle gap detection: server was reset or client has stale lastServerSeq
       if (response.gapDetected && !hasResetForGap) {
         OpLog.warn(
@@ -163,6 +178,7 @@ export class OperationLogDownloadService {
         hasResetForGap = true;
         allNewOps.length = 0; // Clear any ops we may have accumulated
         allOpClocks.length = 0; // Clear clocks too
+        snapshotVectorClock = undefined; // Clear snapshot clock to capture fresh one after reset
         // NOTE: Don't persist lastServerSeq=0 here - caller will persist the final value
         // after ops are stored in IndexedDB. This ensures localStorage and IndexedDB stay in sync.
         continue;
@@ -301,6 +317,8 @@ export class OperationLogDownloadService {
       latestServerSeq: finalLatestSeq,
       // Include all op clocks when force downloading from seq 0
       ...(forceFromSeq0 && allOpClocks.length > 0 ? { allOpClocks } : {}),
+      // Include snapshot vector clock when snapshot optimization was used
+      ...(snapshotVectorClock ? { snapshotVectorClock } : {}),
     };
   }
diff --git a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts
index 835e3586e..be4e6d063 100644
--- a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts
+++ b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts
@@ -3035,9 +3035,239 @@ describe('OperationLogSyncService', () => {
       await service.uploadPendingOps(mockProvider);
 
       // Verify _resolveStaleLocalOps was called with the clocks from force download
+      // Third arg is snapshotVectorClock (undefined in this test)
       expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
         jasmine.any(Array),
         forceDownloadClocks,
+        undefined,
       );
     });
+
+    it('should pass snapshotVectorClock to _resolveStaleLocalOps when present', async () => {
+      const localOp: Operation = {
+        id: 'local-op-1',
+        clientId: 'client-A',
+        actionType: 'test',
+        opType: OpType.Update,
+        entityType: 'TAG',
+        entityId: 'TODAY',
+        payload: { taskIds: ['task-1'] },
+        vectorClock: { clientA: 1 },
+        timestamp: Date.now(),
+        schemaVersion: 1,
+      };
+
+      uploadServiceSpy.uploadPendingOps.and.returnValue(
+        Promise.resolve({
+          uploadedCount: 0,
+          piggybackedOps: [],
+          rejectedCount: 1,
+          rejectedOps: [
+            {
+              opId: 'local-op-1',
+              error: 'Concurrent modification detected for TAG:TODAY',
+            },
+          ],
+        }),
+      );
+
+      opLogStoreSpy.getOpById.and.returnValue(
+        Promise.resolve({
+          seq: 1,
+          op: localOp,
+          appliedAt: Date.now(),
+          source: 'local' as const,
+        }),
+      );
+      opLogStoreSpy.getUnsynced.and.returnValue(
+        Promise.resolve([
+          { seq: 1, op: localOp, appliedAt: Date.now(), source: 'local' as const },
+        ]),
+      );
+
+      const snapshotVectorClock = { clientA: 10, clientB: 5, clientC: 3 };
+
+      let downloadCallCount = 0;
+      spyOn(service, 'downloadRemoteOps').and.callFake(() => {
+        downloadCallCount++;
+        // First download returns no new ops but has snapshotVectorClock
+        return Promise.resolve({
+          serverMigrationHandled: false,
+          localWinOpsCreated: 0,
+          newOpsCount: 0,
+          snapshotVectorClock,
+        });
+      });
+
+      // Spy on _resolveStaleLocalOps to verify it receives snapshotVectorClock
+      const resolveStaleOpsSpy = spyOn(
+        service,
+        '_resolveStaleLocalOps',
+      ).and.returnValue(Promise.resolve(0));
+
+      await service.uploadPendingOps(mockProvider);
+
+      // Verify _resolveStaleLocalOps was called with snapshotVectorClock
+      expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
+        jasmine.any(Array),
+        undefined, // No allOpClocks from regular download
+        snapshotVectorClock,
+      );
+    });
+
+    it('should pass both allOpClocks and snapshotVectorClock to _resolveStaleLocalOps', async () => {
+      const localOp: Operation = {
+        id: 'local-op-1',
+        clientId: 'client-A',
+        actionType: 'test',
+        opType: OpType.Update,
+        entityType: 'TAG',
+        entityId: 'TODAY',
+        payload: { taskIds: ['task-1'] },
+        vectorClock: { clientA: 1 },
+        timestamp: Date.now(),
+        schemaVersion: 1,
+      };
+
+      uploadServiceSpy.uploadPendingOps.and.returnValue(
+        Promise.resolve({
+          uploadedCount: 0,
+          piggybackedOps: [],
+          rejectedCount: 1,
+          rejectedOps: [
+            {
+              opId: 'local-op-1',
+              error: 'Concurrent modification detected for TAG:TODAY',
+            },
+          ],
+        }),
+      );
+
+      opLogStoreSpy.getOpById.and.returnValue(
+        Promise.resolve({
+          seq: 1,
+          op: localOp,
+          appliedAt: Date.now(),
+          source: 'local' as const,
+        }),
+      );
+      opLogStoreSpy.getUnsynced.and.returnValue(
+        Promise.resolve([
+          { seq: 1, op: localOp, appliedAt: Date.now(), source: 'local' as const },
+        ]),
+      );
+
+      const allOpClocks = [
+        { clientA: 3, clientB: 10 },
+        { clientA: 5, clientC: 2 },
+      ];
+      const snapshotVectorClock = { clientA: 15, clientB: 8, clientD: 1 };
+
+      let downloadCallCount = 0;
+      spyOn(service, 'downloadRemoteOps').and.callFake(() => {
+        downloadCallCount++;
+        if (downloadCallCount === 1) {
+          // First download returns no ops (triggers force download)
+          return Promise.resolve({
+            serverMigrationHandled: false,
+            localWinOpsCreated: 0,
+            newOpsCount: 0,
+          });
+        } else {
+          // Force download returns both allOpClocks and snapshotVectorClock
+          return Promise.resolve({
+            serverMigrationHandled: false,
+            localWinOpsCreated: 0,
+            newOpsCount: 0,
+            allOpClocks,
+            snapshotVectorClock,
+          });
+        }
+      });
+
+      // Spy on _resolveStaleLocalOps to verify it receives both
+      const resolveStaleOpsSpy = spyOn(
+        service,
+        '_resolveStaleLocalOps',
+      ).and.returnValue(Promise.resolve(0));
+
+      await service.uploadPendingOps(mockProvider);
+
+      // Verify _resolveStaleLocalOps was called with both allOpClocks and snapshotVectorClock
+      expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
+        jasmine.any(Array),
+        allOpClocks,
+        snapshotVectorClock,
+      );
+    });
+
+    it('should use snapshotVectorClock when download returns new ops but concurrent ops still pending', async () => {
+      const localOp: Operation = {
+        id: 'local-op-1',
+        clientId: 'client-A',
+        actionType: 'test',
+        opType: OpType.Update,
+        entityType: 'TIME_TRACKING', // Different entity type than downloaded op
+        entityId: 'tt-123',
+        payload: { data: 'test' },
+        vectorClock: { clientA: 1 },
+        timestamp: Date.now(),
+        schemaVersion: 1,
+      };
+
+      uploadServiceSpy.uploadPendingOps.and.returnValue(
+        Promise.resolve({
+          uploadedCount: 0,
+          piggybackedOps: [],
+          rejectedCount: 1,
+          rejectedOps: [
+            {
+              opId: 'local-op-1',
+              error: 'Concurrent modification detected for TIME_TRACKING:tt-123',
+            },
+          ],
+        }),
+      );
+
+      opLogStoreSpy.getOpById.and.returnValue(
+        Promise.resolve({
+          seq: 1,
+          op: localOp,
+          appliedAt: Date.now(),
+          source: 'local' as const,
+        }),
+      );
+      opLogStoreSpy.getUnsynced.and.returnValue(
+        Promise.resolve([
+          { seq: 1, op: localOp, appliedAt: Date.now(), source: 'local' as const },
+        ]),
+      );
+
+      const snapshotVectorClock = { clientA: 20, clientB: 15 };
+
+      // Download returns new ops for a different entity (TASK), but TIME_TRACKING op still pending
+      spyOn(service, 'downloadRemoteOps').and.returnValue(
+        Promise.resolve({
+          serverMigrationHandled: false,
+          localWinOpsCreated: 0,
+          newOpsCount: 5, // Got new ops, but for different entity
+          snapshotVectorClock,
+        }),
+      );
+
+      // Spy on _resolveStaleLocalOps
+      const resolveStaleOpsSpy = spyOn(
+        service,
+        '_resolveStaleLocalOps',
+      ).and.returnValue(Promise.resolve(0));
+
+      await service.uploadPendingOps(mockProvider);
+
+      // Verify _resolveStaleLocalOps was called with snapshotVectorClock
+      expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
+        jasmine.any(Array),
+        undefined,
+        snapshotVectorClock,
+      );
+    });
   });
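The hunks below thread `snapshotVectorClock` into `_resolveStaleLocalOps`. A minimal self-contained sketch of the clock-merge order they implement - `mergeVectorClocks` is imported, not shown, in this diff, so the component-wise-max implementation here is an assumption, and all clock values are hypothetical:

type VectorClock = Record<string, number>;

// Assumed implementation of the imported helper: component-wise max (join).
const mergeVectorClocks = (a: VectorClock, b: VectorClock): VectorClock => {
  const merged: VectorClock = { ...a };
  for (const [id, v] of Object.entries(b)) {
    merged[id] = Math.max(merged[id] ?? 0, v);
  }
  return merged;
};

// Merge order from the hunks below: local global clock, then the snapshot
// aggregate, then any per-op clocks from a force download. The result is >=
// every clock the server knows about, so an update stamped with it can dominate.
let globalClock: VectorClock = { clientC: 2 }; // fresh client's own clock
globalClock = mergeVectorClocks(globalClock, { clientA: 41, clientB: 7 }); // snapshotVectorClock
for (const extra of [{ clientB: 9 }]) {
  globalClock = mergeVectorClocks(globalClock, extra); // extraClocks
}
console.log(globalClock); // -> { clientC: 2, clientA: 41, clientB: 9 }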
diff --git a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts
index 92540e979..e42db02c6 100644
--- a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts
+++ b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts
@@ -378,6 +378,17 @@ export class OperationLogSyncService {
         mergedOpsCreated += await this._resolveStaleLocalOps(
           stillPendingOps,
           forceDownloadResult.allOpClocks,
+          forceDownloadResult.snapshotVectorClock,
+        );
+      } else if (forceDownloadResult.snapshotVectorClock) {
+        // Force download returned no individual clocks but we have snapshot clock
+        OpLog.normal(
+          `OperationLogSyncService: Using snapshotVectorClock from force download`,
+        );
+        mergedOpsCreated += await this._resolveStaleLocalOps(
+          stillPendingOps,
+          undefined,
+          forceDownloadResult.snapshotVectorClock,
         );
       } else {
         // Force download returned no clocks but we have concurrent ops.
@@ -407,7 +418,11 @@ export class OperationLogSyncService {
           `OperationLogSyncService: Download got ${downloadResult.newOpsCount} ops but ${stillPendingOps.length} ` +
             `concurrent ops still pending. Resolving locally with merged clocks...`,
         );
-        mergedOpsCreated += await this._resolveStaleLocalOps(stillPendingOps);
+        mergedOpsCreated += await this._resolveStaleLocalOps(
+          stillPendingOps,
+          undefined,
+          downloadResult.snapshotVectorClock,
+        );
       }
     }
   } catch (e) {
@@ -433,11 +448,13 @@ export class OperationLogSyncService {
    *
    * @param staleOps - Operations that were rejected due to concurrent modification
    * @param extraClocks - Additional clocks to merge (from force download)
+   * @param snapshotVectorClock - Aggregated clock from snapshot optimization (if available)
    * @returns Number of merged ops created
    */
   private async _resolveStaleLocalOps(
     staleOps: Array<{ opId: string; op: Operation }>,
     extraClocks?: VectorClock[],
+    snapshotVectorClock?: VectorClock,
   ): Promise<number> {
     const clientId = await this._getPfapiService().pf.metaModel.loadClientId();
     if (!clientId) {
@@ -449,6 +466,15 @@ export class OperationLogSyncService {
     // This ensures we have all known clocks, not just entity-specific ones
     let globalClock = await this.vectorClockService.getCurrentVectorClock();
 
+    // Merge snapshot vector clock if available (from server's snapshot optimization)
+    // This ensures we have the clocks from ops that were skipped during download
+    if (snapshotVectorClock && Object.keys(snapshotVectorClock).length > 0) {
+      OpLog.normal(
+        `OperationLogSyncService: Merging snapshotVectorClock with ${Object.keys(snapshotVectorClock).length} entries`,
+      );
+      globalClock = mergeVectorClocks(globalClock, snapshotVectorClock);
+    }
+
     // If extra clocks were provided (from force download), merge them all
     // This helps recover from situations where our local clock is missing entries
     if (extraClocks && extraClocks.length > 0) {
@@ -583,6 +609,7 @@ export class OperationLogSyncService {
     localWinOpsCreated: number;
     newOpsCount: number;
     allOpClocks?: VectorClock[];
+    snapshotVectorClock?: VectorClock;
   }> {
     const result = await this.downloadService.downloadRemoteOps(syncProvider, options);
 
@@ -614,6 +641,8 @@ export class OperationLogSyncService {
         newOpsCount: 0,
         // Include all op clocks from forced download (even though no new ops)
         allOpClocks: result.allOpClocks,
+        // Include snapshot vector clock for stale op resolution
+        snapshotVectorClock: result.snapshotVectorClock,
       };
     }
 
@@ -662,6 +691,7 @@ export class OperationLogSyncService {
       localWinOpsCreated: processResult.localWinOpsCreated,
       newOpsCount: result.newOps.length,
      allOpClocks: result.allOpClocks,
+      snapshotVectorClock: result.snapshotVectorClock,
     };
   }
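For reference, the wire shape a fresh client now receives when the optimization applies - field names come from `DownloadOpsResponse` above, sequence numbers are borrowed from the bug scenario documented in the e2e spec, and all values are hypothetical:

// Example download response under the snapshot optimization (illustrative only):
const exampleResponse = {
  ops: [/* ops 316-319: the SYNC_IMPORT op plus everything after it */],
  hasMore: false,
  latestSeq: 319,
  latestSnapshotSeq: 316,
  // Component-wise max over the clocks of every op with serverSeq <= 316
  snapshotVectorClock: { clientA: 41, clientB: 7 },
};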
diff --git a/src/app/features/config/store/global-config.reducer.ts b/src/app/features/config/store/global-config.reducer.ts
index 2cd5a580e..f206da9be 100644
--- a/src/app/features/config/store/global-config.reducer.ts
+++ b/src/app/features/config/store/global-config.reducer.ts
@@ -97,13 +97,6 @@ export const globalConfigReducer = createReducer(
     const syncProvider =
       oldState.sync.syncProvider ?? appDataComplete.globalConfig.sync.syncProvider;
 
-    // Debug logging for hot reload investigation
-    console.warn('[HotReloadDebug] loadAllData reducer:', {
-      oldSyncProvider: oldState.sync.syncProvider,
-      newSyncProvider: appDataComplete.globalConfig?.sync?.syncProvider,
-      resolvedProvider: syncProvider,
-    });
-
     return {
       ...appDataComplete.globalConfig,
       sync: {
diff --git a/src/app/pfapi/api/sync/sync-provider.interface.ts b/src/app/pfapi/api/sync/sync-provider.interface.ts
index 22288df57..2919c853d 100644
--- a/src/app/pfapi/api/sync/sync-provider.interface.ts
+++ b/src/app/pfapi/api/sync/sync-provider.interface.ts
@@ -191,6 +191,12 @@ export interface OpDownloadResponse {
    * When true, the client should reset lastServerSeq to 0 and re-download.
    */
   gapDetected?: boolean;
+  /**
+   * Aggregated vector clock from all ops before and including the snapshot.
+   * Only set when snapshot optimization is used (sinceSeq < latestSnapshotSeq).
+   * Clients need this to create merged updates that dominate all known clocks.
+   */
+  snapshotVectorClock?: Record<string, number>;
 }
 
 /**
diff --git a/src/app/pfapi/pfapi.service.ts b/src/app/pfapi/pfapi.service.ts
index 84de5a9f7..b8036f65b 100644
--- a/src/app/pfapi/pfapi.service.ts
+++ b/src/app/pfapi/pfapi.service.ts
@@ -136,31 +136,13 @@ export class PfapiService {
         : null;
       const currentProvider = this.pf.getActiveSyncProvider();
 
-      // Debug logging for hot reload investigation
-      PFLog.warn('[HotReloadDebug] syncConfig subscription fired:', {
-        isEnabled: cfg.isEnabled,
-        newSyncProvider: newProviderId,
-        currentActiveProvider: currentProvider?.id || null,
-      });
-
       // Guard against hot reload race condition:
       // Don't clear provider if we had a valid one and the new config has sync disabled
      // This protects against temporary state resets during HMR
       if (currentProvider && !newProviderId) {
-        PFLog.warn(
-          '[HotReloadDebug] Ignoring attempt to clear sync provider - likely hot reload race condition',
-        );
         return;
       }
 
-      // Log provider changes for debugging
-      if (currentProvider && newProviderId && currentProvider.id !== newProviderId) {
-        PFLog.warn('[HotReloadDebug] Provider change detected:', {
-          from: currentProvider.id,
-          to: newProviderId,
-        });
-      }
-
       this.pf.setActiveSyncProvider(newProviderId);
 
       if (cfg.isEnabled) {
         this.pf.setEncryptAndCompressCfg({