fix(sync-server): compile scripts for production Docker image

ts-node is a devDependency and not available in production containers.
Updated tsconfig to compile scripts/ alongside src/, and changed npm
scripts to use compiled JS. Added monitor:dev for local development.
Johannes Millan 2025-12-19 15:58:21 +01:00
parent 754f7fb676
commit 8a449fb944
15 changed files with 1016 additions and 35 deletions

View file

@@ -0,0 +1,460 @@
import { test as base, expect } from '@playwright/test';
import {
createTestUser,
getSuperSyncConfig,
createSimulatedClient,
closeClient,
waitForTask,
isServerHealthy,
type SimulatedE2EClient,
} from '../../utils/supersync-helpers';
/**
* SuperSync Snapshot Vector Clock E2E Tests
*
* Tests the snapshot optimization scenario where the server skips returning
* old operations when a SYNC_IMPORT/BACKUP_IMPORT/REPAIR exists.
*
* The fix being tested: When using snapshot optimization, the server returns
* a `snapshotVectorClock` which is an aggregate of all vector clocks from
* skipped operations. This allows fresh clients to create merged updates
* that properly dominate all known clocks.
*
* Without this fix, fresh clients would get stuck in infinite loops because
* their merged updates would never dominate (missing clock entries from
* other clients whose ops were skipped).
*/
const generateTestRunId = (workerIndex: number): string => {
return `${Date.now()}-${workerIndex}`;
};
base.describe('@supersync SuperSync Snapshot Vector Clock', () => {
let serverHealthy: boolean | null = null;
base.beforeEach(async ({}, testInfo) => {
if (serverHealthy === null) {
serverHealthy = await isServerHealthy();
if (!serverHealthy) {
console.warn(
'SuperSync server not healthy at http://localhost:1901 - skipping tests',
);
}
}
testInfo.skip(!serverHealthy, 'SuperSync server not running');
});
/**
* Scenario 1: Fresh client joins after snapshot optimization
*
* This tests the core fix: a fresh client (Client C) joining after Client A
* has done a full-state sync (SYNC_IMPORT) and Client B has made changes.
* Client C's local changes should sync correctly without infinite loops.
*
* The scenario that caused the bug:
* 1. Client A syncs full state (SYNC_IMPORT at seq 316)
* 2. Client B makes changes (ops 317-319)
* 3. Client C joins fresh (sinceSeq=0)
* 4. Server uses snapshot optimization: skips ops 0-315, returns 316-319
* 5. Client C makes a change that conflicts with something from before seq 316
* 6. Without snapshotVectorClock, Client C's merged update can never dominate
* (missing clock entries from skipped ops) -> infinite loop
*
* With the fix, Client C receives snapshotVectorClock and can create
* dominating updates.
*/
base(
'Fresh client syncs correctly after snapshot optimization (no infinite loop)',
async ({ browser, baseURL }, testInfo) => {
testInfo.setTimeout(120000);
const testRunId = generateTestRunId(testInfo.workerIndex);
const appUrl = baseURL || 'http://localhost:4242';
let clientA: SimulatedE2EClient | null = null;
let clientB: SimulatedE2EClient | null = null;
let clientC: SimulatedE2EClient | null = null;
try {
const user = await createTestUser(testRunId);
const syncConfig = getSuperSyncConfig(user);
// === Phase 1: Client A creates initial data and syncs ===
console.log('[SnapshotClock] Phase 1: Client A creates initial data');
clientA = await createSimulatedClient(browser, appUrl, 'A', testRunId);
await clientA.sync.setupSuperSync(syncConfig);
// Create several tasks to build up operation history
const taskA1 = `A1-${testRunId}`;
const taskA2 = `A2-${testRunId}`;
const taskA3 = `A3-${testRunId}`;
await clientA.workView.addTask(taskA1);
await waitForTask(clientA.page, taskA1);
await clientA.workView.addTask(taskA2);
await waitForTask(clientA.page, taskA2);
await clientA.workView.addTask(taskA3);
await waitForTask(clientA.page, taskA3);
// Sync to server
await clientA.sync.syncAndWait();
console.log('[SnapshotClock] Client A synced initial tasks');
// === Phase 2: Client B joins and makes changes ===
console.log('[SnapshotClock] Phase 2: Client B joins and makes changes');
clientB = await createSimulatedClient(browser, appUrl, 'B', testRunId);
await clientB.sync.setupSuperSync(syncConfig);
await clientB.sync.syncAndWait();
// Verify B has all tasks
await waitForTask(clientB.page, taskA1);
await waitForTask(clientB.page, taskA2);
await waitForTask(clientB.page, taskA3);
// Client B creates more tasks (these will be after any potential snapshot)
const taskB1 = `B1-${testRunId}`;
const taskB2 = `B2-${testRunId}`;
await clientB.workView.addTask(taskB1);
await waitForTask(clientB.page, taskB1);
await clientB.workView.addTask(taskB2);
await waitForTask(clientB.page, taskB2);
// Client B syncs
await clientB.sync.syncAndWait();
console.log('[SnapshotClock] Client B created and synced tasks');
// Client A syncs to get B's changes
await clientA.sync.syncAndWait();
await waitForTask(clientA.page, taskB1);
await waitForTask(clientA.page, taskB2);
// === Phase 3: Client C joins fresh (simulates snapshot optimization) ===
console.log('[SnapshotClock] Phase 3: Client C joins fresh');
clientC = await createSimulatedClient(browser, appUrl, 'C', testRunId);
await clientC.sync.setupSuperSync(syncConfig);
// Initial sync - Client C downloads all data
// Server may use snapshot optimization here depending on server state
await clientC.sync.syncAndWait();
// Verify C has all existing tasks
await waitForTask(clientC.page, taskA1);
await waitForTask(clientC.page, taskA2);
await waitForTask(clientC.page, taskA3);
await waitForTask(clientC.page, taskB1);
await waitForTask(clientC.page, taskB2);
console.log('[SnapshotClock] Client C downloaded all tasks');
// === Phase 4: Client C makes changes ===
// This is the critical part - C's changes need to sync without getting stuck
console.log('[SnapshotClock] Phase 4: Client C makes changes');
const taskC1 = `C1-${testRunId}`;
const taskC2 = `C2-${testRunId}`;
await clientC.workView.addTask(taskC1);
await waitForTask(clientC.page, taskC1);
await clientC.workView.addTask(taskC2);
await waitForTask(clientC.page, taskC2);
// Client C syncs - without the fix, this could get stuck in infinite loop
// Set a shorter timeout to catch infinite loops quickly
const syncPromise = clientC.sync.syncAndWait();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(
() => reject(new Error('Sync timed out - possible infinite loop')),
30000,
),
);
await Promise.race([syncPromise, timeoutPromise]);
console.log('[SnapshotClock] Client C synced successfully (no infinite loop)');
// === Phase 5: Verify all clients converge ===
console.log('[SnapshotClock] Phase 5: Verifying convergence');
// Sync all clients to converge
await clientA.sync.syncAndWait();
await clientB.sync.syncAndWait();
await clientC.sync.syncAndWait();
// Wait for UI to settle
await clientA.page.waitForTimeout(1000);
await clientB.page.waitForTimeout(1000);
await clientC.page.waitForTimeout(1000);
// All tasks should be present on all clients
const allTasks = [taskA1, taskA2, taskA3, taskB1, taskB2, taskC1, taskC2];
for (const task of allTasks) {
await waitForTask(clientA.page, task);
await waitForTask(clientB.page, task);
await waitForTask(clientC.page, task);
}
// Count tasks to ensure consistency
const countA = await clientA.page
.locator(`task:has-text("${testRunId}")`)
.count();
const countB = await clientB.page
.locator(`task:has-text("${testRunId}")`)
.count();
const countC = await clientC.page
.locator(`task:has-text("${testRunId}")`)
.count();
expect(countA).toBe(7);
expect(countB).toBe(7);
expect(countC).toBe(7);
console.log('[SnapshotClock] ✓ All clients converged with 7 tasks each');
} finally {
if (clientA) await closeClient(clientA);
if (clientB) await closeClient(clientB);
if (clientC) await closeClient(clientC);
}
},
);
/**
* Scenario 2: Multiple fresh clients joining sequentially
*
* Tests that multiple clients can join fresh and sync correctly,
* each receiving proper snapshotVectorClock values.
*/
base(
'Multiple fresh clients join and sync correctly after snapshot',
async ({ browser, baseURL }, testInfo) => {
testInfo.setTimeout(180000);
const testRunId = generateTestRunId(testInfo.workerIndex);
const appUrl = baseURL || 'http://localhost:4242';
let clientA: SimulatedE2EClient | null = null;
let clientB: SimulatedE2EClient | null = null;
let clientC: SimulatedE2EClient | null = null;
let clientD: SimulatedE2EClient | null = null;
try {
const user = await createTestUser(testRunId);
const syncConfig = getSuperSyncConfig(user);
// Client A: Creates initial data
clientA = await createSimulatedClient(browser, appUrl, 'A', testRunId);
await clientA.sync.setupSuperSync(syncConfig);
const taskA1 = `A1-${testRunId}`;
await clientA.workView.addTask(taskA1);
await waitForTask(clientA.page, taskA1);
await clientA.sync.syncAndWait();
console.log('[MultiClient] Client A created initial task');
// Client B: Joins, adds data
clientB = await createSimulatedClient(browser, appUrl, 'B', testRunId);
await clientB.sync.setupSuperSync(syncConfig);
await clientB.sync.syncAndWait();
await waitForTask(clientB.page, taskA1);
const taskB1 = `B1-${testRunId}`;
await clientB.workView.addTask(taskB1);
await waitForTask(clientB.page, taskB1);
await clientB.sync.syncAndWait();
console.log('[MultiClient] Client B joined and added task');
// Close Client B to simulate it going offline
await closeClient(clientB);
clientB = null;
// Client A continues working
const taskA2 = `A2-${testRunId}`;
await clientA.workView.addTask(taskA2);
await waitForTask(clientA.page, taskA2);
await clientA.sync.syncAndWait();
// Client C: Joins fresh (never saw Client B's changes locally)
clientC = await createSimulatedClient(browser, appUrl, 'C', testRunId);
await clientC.sync.setupSuperSync(syncConfig);
await clientC.sync.syncAndWait();
// C should have all tasks from A and B
await waitForTask(clientC.page, taskA1);
await waitForTask(clientC.page, taskA2);
await waitForTask(clientC.page, taskB1);
// C adds its own task
const taskC1 = `C1-${testRunId}`;
await clientC.workView.addTask(taskC1);
await waitForTask(clientC.page, taskC1);
await clientC.sync.syncAndWait();
console.log('[MultiClient] Client C joined and added task');
// Client D: Another fresh client joins
clientD = await createSimulatedClient(browser, appUrl, 'D', testRunId);
await clientD.sync.setupSuperSync(syncConfig);
await clientD.sync.syncAndWait();
// D should have all tasks
await waitForTask(clientD.page, taskA1);
await waitForTask(clientD.page, taskA2);
await waitForTask(clientD.page, taskB1);
await waitForTask(clientD.page, taskC1);
// D adds its own task
const taskD1 = `D1-${testRunId}`;
await clientD.workView.addTask(taskD1);
await waitForTask(clientD.page, taskD1);
await clientD.sync.syncAndWait();
console.log('[MultiClient] Client D joined and added task');
// Final sync for all active clients
await clientA.sync.syncAndWait();
await clientC.sync.syncAndWait();
await clientD.sync.syncAndWait();
// Verify all active clients have all 5 tasks
const allTasks = [taskA1, taskA2, taskB1, taskC1, taskD1];
for (const task of allTasks) {
await waitForTask(clientA.page, task);
await waitForTask(clientC.page, task);
await waitForTask(clientD.page, task);
}
const countA = await clientA.page
.locator(`task:has-text("${testRunId}")`)
.count();
const countC = await clientC.page
.locator(`task:has-text("${testRunId}")`)
.count();
const countD = await clientD.page
.locator(`task:has-text("${testRunId}")`)
.count();
expect(countA).toBe(5);
expect(countC).toBe(5);
expect(countD).toBe(5);
console.log('[MultiClient] ✓ All clients converged with 5 tasks each');
} finally {
if (clientA) await closeClient(clientA);
if (clientB) await closeClient(clientB);
if (clientC) await closeClient(clientC);
if (clientD) await closeClient(clientD);
}
},
);
/**
* Scenario 3: Fresh client with concurrent modifications
*
* Tests the scenario where a fresh client joins and immediately
* has concurrent modifications with an existing client. This is
* the most likely scenario to trigger the infinite loop bug.
*/
base(
'Fresh client handles concurrent modifications after snapshot',
async ({ browser, baseURL }, testInfo) => {
testInfo.setTimeout(120000);
const testRunId = generateTestRunId(testInfo.workerIndex);
const appUrl = baseURL || 'http://localhost:4242';
let clientA: SimulatedE2EClient | null = null;
let clientB: SimulatedE2EClient | null = null;
try {
const user = await createTestUser(testRunId);
const syncConfig = getSuperSyncConfig(user);
// Client A: Establishes baseline
clientA = await createSimulatedClient(browser, appUrl, 'A', testRunId);
await clientA.sync.setupSuperSync(syncConfig);
// Create initial tasks
const sharedTask = `Shared-${testRunId}`;
await clientA.workView.addTask(sharedTask);
await waitForTask(clientA.page, sharedTask);
await clientA.sync.syncAndWait();
console.log('[ConcurrentFresh] Client A created shared task');
// Add more operations to build up history
for (let i = 0; i < 5; i++) {
await clientA.workView.addTask(`Filler-${i}-${testRunId}`);
}
await clientA.sync.syncAndWait();
console.log('[ConcurrentFresh] Client A added filler tasks');
// Client B: Joins fresh
clientB = await createSimulatedClient(browser, appUrl, 'B', testRunId);
await clientB.sync.setupSuperSync(syncConfig);
await clientB.sync.syncAndWait();
// Verify B has all tasks
await waitForTask(clientB.page, sharedTask);
console.log('[ConcurrentFresh] Client B joined and synced');
// Now create concurrent modifications
// Client A: Marks shared task as done
const taskLocatorA = clientA.page
.locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
.first();
await taskLocatorA.hover();
await taskLocatorA.locator('.task-done-btn').click();
await expect(taskLocatorA).toHaveClass(/isDone/, { timeout: 5000 });
console.log('[ConcurrentFresh] Client A marked shared task done');
// Client B: Also marks shared task as done (concurrent change)
const taskLocatorB = clientB.page
.locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
.first();
await taskLocatorB.hover();
await taskLocatorB.locator('.task-done-btn').click();
await expect(taskLocatorB).toHaveClass(/isDone/, { timeout: 5000 });
console.log('[ConcurrentFresh] Client B marked shared task done (concurrent)');
// Client A syncs first
await clientA.sync.syncAndWait();
// Client B syncs - this is where the infinite loop could occur
// without the snapshotVectorClock fix
const syncPromise = clientB.sync.syncAndWait();
const timeoutPromise = new Promise((_, reject) =>
setTimeout(
() => reject(new Error('Sync timed out - possible infinite loop')),
30000,
),
);
await Promise.race([syncPromise, timeoutPromise]);
console.log('[ConcurrentFresh] Client B synced without infinite loop');
// Final sync to ensure convergence
await clientA.sync.syncAndWait();
await clientB.sync.syncAndWait();
// Wait for UI to settle
await clientA.page.waitForTimeout(500);
await clientB.page.waitForTimeout(500);
// Verify both clients show the shared task as done
await waitForTask(clientA.page, sharedTask);
await waitForTask(clientB.page, sharedTask);
const finalTaskA = clientA.page
.locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
.first();
const finalTaskB = clientB.page
.locator(`task:not(.ng-animating):has-text("${sharedTask}")`)
.first();
await expect(finalTaskA).toHaveClass(/isDone/, { timeout: 5000 });
await expect(finalTaskB).toHaveClass(/isDone/, { timeout: 5000 });
// Verify task counts match
const countA = await clientA.page
.locator(`task:has-text("${testRunId}")`)
.count();
const countB = await clientB.page
.locator(`task:has-text("${testRunId}")`)
.count();
expect(countA).toBe(countB);
console.log('[ConcurrentFresh] ✓ Concurrent modifications resolved correctly');
} finally {
if (clientA) await closeClient(clientA);
if (clientB) await closeClient(clientB);
}
},
);
});
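
For readers following the vector-clock reasoning in the header comment above: informally, an update dominates another clock when every component is at least as large (the strict-inequality detail is glossed over here). A minimal TypeScript sketch of why one missing entry stalls Client C forever - the `dominates` helper and the literal clocks are invented for illustration, not taken from the app:

type VectorClock = Record<string, number>;

// Dominates = greater-or-equal in every component of the other clock.
const dominates = (candidate: VectorClock, other: VectorClock): boolean =>
  Object.entries(other).every(([clientId, v]) => (candidate[clientId] ?? 0) >= v);

// Client C only saw ops 316-319, so its merged clock lacks clientB's entry
// from an op that the snapshot optimization skipped:
const mergedWithoutFix: VectorClock = { clientC: 1, clientA: 12 };
const skippedOpClock: VectorClock = { clientB: 7 };

console.log(dominates(mergedWithoutFix, skippedOpClock)); // false -> rejected, retried, loop
// With snapshotVectorClock merged in first, the missing entry is present:
const mergedWithFix: VectorClock = { ...mergedWithoutFix, clientB: 7 };
console.log(dominates(mergedWithFix, skippedOpClock)); // true -> accepted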

View file

@@ -39,7 +39,7 @@ RUN apk add --no-cache openssl libc6-compat wget && \
 WORKDIR /app
-# Copy built artifacts from builder
+# Copy built artifacts from builder (note: tsconfig rootDir is "." so structure is dist/src/ and dist/scripts/)
 COPY --from=builder --chown=supersync:nodejs /repo/packages/super-sync-server/dist ./dist
 COPY --from=builder --chown=supersync:nodejs /repo/packages/super-sync-server/public ./public
 COPY --from=builder --chown=supersync:nodejs /repo/packages/super-sync-server/prisma ./prisma
@@ -64,4 +64,4 @@ ENV DATA_DIR=/app/data
 HEALTHCHECK --interval=30s --timeout=5s --start-period=10s --retries=3 \
   CMD wget --no-verbose --tries=1 --spider http://localhost:${PORT}/health || exit 1
-CMD ["sh", "-c", "npx prisma db push --skip-generate && node dist/index.js"]
+CMD ["sh", "-c", "npx prisma db push --skip-generate && node dist/src/index.js"]

View file

@@ -2,10 +2,10 @@
   "name": "@super-productivity/super-sync-server",
   "version": "1.0.0",
   "description": "SuperSync server for Super Productivity",
-  "main": "dist/index.js",
+  "main": "dist/src/index.js",
   "scripts": {
     "build": "tsc",
-    "start": "node dist/index.js",
+    "start": "node dist/src/index.js",
     "dev": "nodemon --watch src --ext ts --exec ts-node src/index.ts",
     "test": "vitest run",
     "docker:build": "./scripts/build-and-push.sh",
@@ -15,7 +15,8 @@
     "docker:logs": "docker logs supersync-server -f",
     "delete-user": "ts-node scripts/delete-user.ts",
     "clear-data": "ts-node scripts/clear-data.ts",
-    "monitor": "ts-node scripts/monitor.ts"
+    "monitor": "node dist/scripts/monitor.js",
+    "monitor:dev": "ts-node scripts/monitor.ts"
   },
   "dependencies": {
     "@fastify/cors": "^11.1.0",

View file

@@ -329,7 +329,7 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
     // Use atomic read to get ops and latestSeq in one transaction
     // This prevents race conditions where new ops arrive between the two reads
-    const { ops, latestSeq, gapDetected, latestSnapshotSeq } =
+    const { ops, latestSeq, gapDetected, latestSnapshotSeq, snapshotVectorClock } =
      await syncService.getOpsSinceWithSeq(
        userId,
        sinceSeq,
@@ -358,6 +358,7 @@
      latestSeq,
      gapDetected: gapDetected || undefined, // Only include if true
      latestSnapshotSeq, // Optimization: tells client where effective state starts
+      snapshotVectorClock, // Aggregated clock from skipped ops for conflict resolution
    };
    return reply.send(response);
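
For illustration, a response to a fresh client (sinceSeq=0) with a snapshot at seq 316 might then look roughly like this - all values below are invented:

const exampleResponse = {
  ops: [/* seq 316 (the SYNC_IMPORT) through 319; ops before 316 are skipped */],
  hasMore: false,
  latestSeq: 319,
  latestSnapshotSeq: 316,
  // Component-wise max over the vector clocks of every skipped op (seq <= 316):
  snapshotVectorClock: { clientA: 12, clientB: 7 },
};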

View file

@@ -547,6 +547,7 @@ export class SyncService {
    latestSeq: number;
    gapDetected: boolean;
    latestSnapshotSeq?: number;
+    snapshotVectorClock?: VectorClock;
  }> {
    return prisma.$transaction(async (tx) => {
      // Find the latest full-state operation (SYNC_IMPORT, BACKUP_IMPORT, REPAIR)
@@ -566,6 +567,8 @@
      // start from the full-state op instead. Pre-import ops are superseded and will
      // be filtered out by the client anyway.
      let effectiveSinceSeq = sinceSeq;
+      let snapshotVectorClock: VectorClock | undefined;
+
      if (latestSnapshotSeq !== undefined && sinceSeq < latestSnapshotSeq) {
        // Start from one before the snapshot so it's included in results
        effectiveSinceSeq = latestSnapshotSeq - 1;
@@ -573,6 +576,35 @@
          `[user:${userId}] Optimized download: skipping from sinceSeq=${sinceSeq} to ${effectiveSinceSeq} ` +
            `(latest snapshot at seq ${latestSnapshotSeq})`,
        );
+
+        // Compute aggregated vector clock from all ops up to and including the snapshot.
+        // This ensures clients know about all clock entries from skipped ops.
+        const skippedOps = await tx.operation.findMany({
+          where: {
+            userId,
+            serverSeq: { lte: latestSnapshotSeq },
+          },
+          select: { vectorClock: true },
+        });
+
+        snapshotVectorClock = {};
+        for (const op of skippedOps) {
+          const clock = op.vectorClock as unknown as VectorClock;
+          if (clock && typeof clock === 'object') {
+            for (const [clientId, value] of Object.entries(clock)) {
+              if (typeof value === 'number') {
+                snapshotVectorClock[clientId] = Math.max(
+                  snapshotVectorClock[clientId] ?? 0,
+                  value,
+                );
+              }
+            }
+          }
+        }
+
+        Logger.info(
+          `[user:${userId}] Computed snapshotVectorClock from ${skippedOps.length} ops: ${JSON.stringify(snapshotVectorClock)}`,
+        );
      }
      const ops = await tx.operation.findMany({
@@ -661,7 +693,7 @@
        receivedAt: Number(row.receivedAt),
      }));
-      return { ops: mappedOps, latestSeq, gapDetected, latestSnapshotSeq };
+      return { ops: mappedOps, latestSeq, gapDetected, latestSnapshotSeq, snapshotVectorClock };
    });
  }
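
The aggregation above is just a component-wise max over the skipped ops' clocks. As a standalone sketch with made-up sample clocks (the helper name is illustrative):

type VectorClock = Record<string, number>;

const aggregateClocks = (clocks: VectorClock[]): VectorClock => {
  const result: VectorClock = {};
  for (const clock of clocks) {
    for (const [clientId, value] of Object.entries(clock)) {
      result[clientId] = Math.max(result[clientId] ?? 0, value);
    }
  }
  return result;
};

// Three skipped ops' clocks collapse into one aggregate:
console.log(aggregateClocks([{ A: 3 }, { A: 5, B: 2 }, { B: 4, C: 1 }]));
// -> { A: 5, B: 4, C: 1 }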

View file

@@ -203,6 +203,12 @@ export interface DownloadOpsResponse {
   * Operations before this seq are superseded by the full-state operation.
   */
  latestSnapshotSeq?: number;
+  /**
+   * Aggregated vector clock from all ops before and including the snapshot.
+   * Only set when snapshot optimization is used (sinceSeq < latestSnapshotSeq).
+   * Clients need this to create merged updates that dominate all known clocks.
+   */
+  snapshotVectorClock?: VectorClock;
 }

 // Snapshot types

View file

@@ -3,11 +3,11 @@
    "target": "ES2020",
    "module": "commonjs",
    "outDir": "./dist",
-    "rootDir": "./src",
+    "rootDir": ".",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true,
    "forceConsistentCasingInFileNames": true
  },
-  "include": ["src/**/*"]
+  "include": ["src/**/*", "scripts/**/*"]
 }
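
With rootDir at the package root, tsc mirrors both included folders under dist/, which is why the Dockerfile CMD and the npm scripts above now reference dist/src/ and dist/scripts/:

src/index.ts        -> dist/src/index.js
scripts/monitor.ts  -> dist/scripts/monitor.js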

View file

@@ -350,6 +350,7 @@ export class OperationLogStoreService {
async markRejected(opIds: string[]): Promise<void> {
await this._ensureInit();
const tx = this.db.transaction('ops', 'readwrite');
const store = tx.objectStore('ops');
const index = store.index('byId');

View file

@@ -577,6 +577,227 @@
});
});
describe('snapshotVectorClock handling', () => {
it('should capture snapshotVectorClock from first response', async () => {
const snapshotClock = { clientA: 10, clientB: 5, clientC: 3 };
mockApiProvider.downloadOps.and.returnValue(
Promise.resolve({
ops: [
{
serverSeq: 317,
receivedAt: Date.now(),
op: {
id: 'op-317',
clientId: 'c1',
actionType: '[Task] Add',
opType: OpType.Create,
entityType: 'TASK',
payload: {},
vectorClock: { c1: 1 },
timestamp: Date.now(),
schemaVersion: 1,
},
},
],
hasMore: false,
latestSeq: 317,
snapshotVectorClock: snapshotClock,
}),
);
const result = await service.downloadRemoteOps(mockApiProvider);
expect(result.snapshotVectorClock).toEqual(snapshotClock);
});
it('should return snapshotVectorClock even when no new ops', async () => {
const snapshotClock = { clientA: 10, clientB: 5 };
mockApiProvider.downloadOps.and.returnValue(
Promise.resolve({
ops: [],
hasMore: false,
latestSeq: 316,
snapshotVectorClock: snapshotClock,
}),
);
const result = await service.downloadRemoteOps(mockApiProvider);
expect(result.snapshotVectorClock).toEqual(snapshotClock);
expect(result.newOps.length).toBe(0);
});
it('should capture snapshotVectorClock from first page in paginated download', async () => {
const snapshotClock = { clientA: 10, clientB: 5 };
mockApiProvider.downloadOps.and.returnValues(
// First page - has snapshotVectorClock
Promise.resolve({
ops: [
{
serverSeq: 317,
receivedAt: Date.now(),
op: {
id: 'op-317',
clientId: 'c1',
actionType: '[Task] Add',
opType: OpType.Create,
entityType: 'TASK',
payload: {},
vectorClock: { c1: 1 },
timestamp: Date.now(),
schemaVersion: 1,
},
},
],
hasMore: true,
latestSeq: 320,
snapshotVectorClock: snapshotClock,
}),
// Second page - no snapshotVectorClock (only first response has it)
Promise.resolve({
ops: [
{
serverSeq: 318,
receivedAt: Date.now(),
op: {
id: 'op-318',
clientId: 'c2',
actionType: '[Task] Update',
opType: OpType.Update,
entityType: 'TASK',
payload: {},
vectorClock: { c2: 1 },
timestamp: Date.now(),
schemaVersion: 1,
},
},
],
hasMore: false,
latestSeq: 320,
}),
);
const result = await service.downloadRemoteOps(mockApiProvider);
// Should have captured snapshotVectorClock from first response
expect(result.snapshotVectorClock).toEqual(snapshotClock);
expect(result.newOps.length).toBe(2);
});
it('should not have snapshotVectorClock when server does not send it', async () => {
mockApiProvider.downloadOps.and.returnValue(
Promise.resolve({
ops: [
{
serverSeq: 1,
receivedAt: Date.now(),
op: {
id: 'op-1',
clientId: 'c1',
actionType: '[Task] Add',
opType: OpType.Create,
entityType: 'TASK',
payload: {},
vectorClock: { c1: 1 },
timestamp: Date.now(),
schemaVersion: 1,
},
},
],
hasMore: false,
latestSeq: 1,
// No snapshotVectorClock
}),
);
const result = await service.downloadRemoteOps(mockApiProvider);
expect(result.snapshotVectorClock).toBeUndefined();
});
it('should clear snapshotVectorClock when gap is detected and re-downloading', async () => {
const staleSnapshotClock = { clientA: 5 };
const freshSnapshotClock = { clientA: 10, clientB: 3 };
mockApiProvider.getLastServerSeq.and.returnValue(Promise.resolve(100));
mockApiProvider.downloadOps.and.returnValues(
// First response: gap detected with stale clock
Promise.resolve({
ops: [],
hasMore: false,
latestSeq: 50,
gapDetected: true,
snapshotVectorClock: staleSnapshotClock,
}),
// After reset: fresh clock
Promise.resolve({
ops: [
{
serverSeq: 1,
receivedAt: Date.now(),
op: {
id: 'op-1',
clientId: 'c1',
actionType: '[Task] Add',
opType: OpType.Create,
entityType: 'TASK',
payload: {},
vectorClock: { c1: 1 },
timestamp: Date.now(),
schemaVersion: 1,
},
},
],
hasMore: false,
latestSeq: 50,
snapshotVectorClock: freshSnapshotClock,
}),
);
const result = await service.downloadRemoteOps(mockApiProvider);
// Should have the fresh clock from after the reset
expect(result.snapshotVectorClock).toEqual(freshSnapshotClock);
});
it('should work with forceFromSeq0 option and return snapshotVectorClock', async () => {
const snapshotClock = { clientA: 100, clientB: 50 };
mockApiProvider.getLastServerSeq.and.returnValue(Promise.resolve(500));
mockApiProvider.downloadOps.and.returnValue(
Promise.resolve({
ops: [
{
serverSeq: 316,
receivedAt: Date.now(),
op: {
id: 'op-316',
clientId: 'c1',
actionType: '[All] Sync Import',
opType: 'SYNC_IMPORT' as OpType,
entityType: 'ALL',
payload: {},
vectorClock: snapshotClock,
timestamp: Date.now(),
schemaVersion: 1,
},
},
],
hasMore: false,
latestSeq: 316,
snapshotVectorClock: snapshotClock,
}),
);
const result = await service.downloadRemoteOps(mockApiProvider, {
forceFromSeq0: true,
});
expect(result.snapshotVectorClock).toEqual(snapshotClock);
expect(result.allOpClocks).toBeDefined();
expect(result.allOpClocks!.length).toBe(1);
});
});
describe('gap detection handling', () => {
it('should reset lastServerSeq to 0 when gap is detected', async () => {
// First call returns gap detected (client has stale sinceSeq)

View file

@@ -50,6 +50,12 @@ export interface DownloadResult {
   * vector clock state from all known ops on the server.
   */
  allOpClocks?: import('../operation.types').VectorClock[];
+  /**
+   * Aggregated vector clock from all ops before and including the snapshot.
+   * Only set when snapshot optimization is used (sinceSeq < latestSnapshotSeq).
+   * Clients need this to create merged updates that dominate all known clocks.
+   */
+  snapshotVectorClock?: import('../operation.types').VectorClock;
 }
/**
@@ -116,6 +122,7 @@
    let downloadFailed = false;
    let needsFullStateUpload = false;
    let finalLatestSeq = 0;
+    let snapshotVectorClock: import('../operation.types').VectorClock | undefined;

    // Get encryption key upfront
    const privateCfg =
@@ -152,6 +159,14 @@
      const response = await syncProvider.downloadOps(sinceSeq, undefined, 500);
      finalLatestSeq = response.latestSeq;
+
+      // Capture snapshot vector clock from first response (only present when snapshot optimization used)
+      if (!snapshotVectorClock && response.snapshotVectorClock) {
+        snapshotVectorClock = response.snapshotVectorClock;
+        OpLog.normal(
+          `OperationLogDownloadService: Received snapshotVectorClock with ${Object.keys(snapshotVectorClock).length} entries`,
+        );
+      }
// Handle gap detection: server was reset or client has stale lastServerSeq
if (response.gapDetected && !hasResetForGap) {
OpLog.warn(
@@ -163,6 +178,7 @@
        hasResetForGap = true;
        allNewOps.length = 0; // Clear any ops we may have accumulated
        allOpClocks.length = 0; // Clear clocks too
+        snapshotVectorClock = undefined; // Clear snapshot clock to capture fresh one after reset
        // NOTE: Don't persist lastServerSeq=0 here - caller will persist the final value
        // after ops are stored in IndexedDB. This ensures localStorage and IndexedDB stay in sync.
        continue;
@@ -301,6 +317,8 @@
      latestServerSeq: finalLatestSeq,
      // Include all op clocks when force downloading from seq 0
      ...(forceFromSeq0 && allOpClocks.length > 0 ? { allOpClocks } : {}),
+      // Include snapshot vector clock when snapshot optimization was used
+      ...(snapshotVectorClock ? { snapshotVectorClock } : {}),
    };
  }
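
A condensed model of the capture rules implemented above - latch the clock from the first page only, and discard it when a gap forces a re-download. The types and loop here are illustrative, not the service's real control flow:

interface Page {
  snapshotVectorClock?: Record<string, number>;
  gapDetected?: boolean;
}

const captureSnapshotClock = (pages: Page[]): Record<string, number> | undefined => {
  let clock: Record<string, number> | undefined;
  for (const page of pages) {
    if (page.gapDetected) {
      clock = undefined; // gap reset: a fresh clock arrives after re-downloading from seq 0
      continue;
    }
    if (!clock && page.snapshotVectorClock) {
      clock = page.snapshotVectorClock; // only the first response carries it
    }
  }
  return clock;
};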

View file

@@ -3035,9 +3035,239 @@
      await service.uploadPendingOps(mockProvider);

      // Verify _resolveStaleLocalOps was called with the clocks from force download
+      // Third arg is snapshotVectorClock (undefined in this test)
      expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
        jasmine.any(Array),
        forceDownloadClocks,
+        undefined,
      );
    });
it('should pass snapshotVectorClock to _resolveStaleLocalOps when present', async () => {
const localOp: Operation = {
id: 'local-op-1',
clientId: 'client-A',
actionType: 'test',
opType: OpType.Update,
entityType: 'TAG',
entityId: 'TODAY',
payload: { taskIds: ['task-1'] },
vectorClock: { clientA: 1 },
timestamp: Date.now(),
schemaVersion: 1,
};
uploadServiceSpy.uploadPendingOps.and.returnValue(
Promise.resolve({
uploadedCount: 0,
piggybackedOps: [],
rejectedCount: 1,
rejectedOps: [
{
opId: 'local-op-1',
error: 'Concurrent modification detected for TAG:TODAY',
},
],
}),
);
opLogStoreSpy.getOpById.and.returnValue(
Promise.resolve({
seq: 1,
op: localOp,
appliedAt: Date.now(),
source: 'local' as const,
}),
);
opLogStoreSpy.getUnsynced.and.returnValue(
Promise.resolve([
{ seq: 1, op: localOp, appliedAt: Date.now(), source: 'local' as const },
]),
);
const snapshotVectorClock = { clientA: 10, clientB: 5, clientC: 3 };
let downloadCallCount = 0;
spyOn(service, 'downloadRemoteOps').and.callFake(() => {
downloadCallCount++;
// First download returns no new ops but has snapshotVectorClock
return Promise.resolve({
serverMigrationHandled: false,
localWinOpsCreated: 0,
newOpsCount: 0,
snapshotVectorClock,
});
});
// Spy on _resolveStaleLocalOps to verify it receives snapshotVectorClock
const resolveStaleOpsSpy = spyOn<any>(
service,
'_resolveStaleLocalOps',
).and.returnValue(Promise.resolve(0));
await service.uploadPendingOps(mockProvider);
// Verify _resolveStaleLocalOps was called with snapshotVectorClock
expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
jasmine.any(Array),
undefined, // No allOpClocks from regular download
snapshotVectorClock,
);
});
it('should pass both allOpClocks and snapshotVectorClock to _resolveStaleLocalOps', async () => {
const localOp: Operation = {
id: 'local-op-1',
clientId: 'client-A',
actionType: 'test',
opType: OpType.Update,
entityType: 'TAG',
entityId: 'TODAY',
payload: { taskIds: ['task-1'] },
vectorClock: { clientA: 1 },
timestamp: Date.now(),
schemaVersion: 1,
};
uploadServiceSpy.uploadPendingOps.and.returnValue(
Promise.resolve({
uploadedCount: 0,
piggybackedOps: [],
rejectedCount: 1,
rejectedOps: [
{
opId: 'local-op-1',
error: 'Concurrent modification detected for TAG:TODAY',
},
],
}),
);
opLogStoreSpy.getOpById.and.returnValue(
Promise.resolve({
seq: 1,
op: localOp,
appliedAt: Date.now(),
source: 'local' as const,
}),
);
opLogStoreSpy.getUnsynced.and.returnValue(
Promise.resolve([
{ seq: 1, op: localOp, appliedAt: Date.now(), source: 'local' as const },
]),
);
const allOpClocks = [
{ clientA: 3, clientB: 10 },
{ clientA: 5, clientC: 2 },
];
const snapshotVectorClock = { clientA: 15, clientB: 8, clientD: 1 };
let downloadCallCount = 0;
spyOn(service, 'downloadRemoteOps').and.callFake(() => {
downloadCallCount++;
if (downloadCallCount === 1) {
// First download returns no ops (triggers force download)
return Promise.resolve({
serverMigrationHandled: false,
localWinOpsCreated: 0,
newOpsCount: 0,
});
} else {
// Force download returns both allOpClocks and snapshotVectorClock
return Promise.resolve({
serverMigrationHandled: false,
localWinOpsCreated: 0,
newOpsCount: 0,
allOpClocks,
snapshotVectorClock,
});
}
});
// Spy on _resolveStaleLocalOps to verify it receives both
const resolveStaleOpsSpy = spyOn<any>(
service,
'_resolveStaleLocalOps',
).and.returnValue(Promise.resolve(0));
await service.uploadPendingOps(mockProvider);
// Verify _resolveStaleLocalOps was called with both allOpClocks and snapshotVectorClock
expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
jasmine.any(Array),
allOpClocks,
snapshotVectorClock,
);
});
it('should use snapshotVectorClock when download returns new ops but concurrent ops still pending', async () => {
const localOp: Operation = {
id: 'local-op-1',
clientId: 'client-A',
actionType: 'test',
opType: OpType.Update,
entityType: 'TIME_TRACKING', // Different entity type than downloaded op
entityId: 'tt-123',
payload: { data: 'test' },
vectorClock: { clientA: 1 },
timestamp: Date.now(),
schemaVersion: 1,
};
uploadServiceSpy.uploadPendingOps.and.returnValue(
Promise.resolve({
uploadedCount: 0,
piggybackedOps: [],
rejectedCount: 1,
rejectedOps: [
{
opId: 'local-op-1',
error: 'Concurrent modification detected for TIME_TRACKING:tt-123',
},
],
}),
);
opLogStoreSpy.getOpById.and.returnValue(
Promise.resolve({
seq: 1,
op: localOp,
appliedAt: Date.now(),
source: 'local' as const,
}),
);
opLogStoreSpy.getUnsynced.and.returnValue(
Promise.resolve([
{ seq: 1, op: localOp, appliedAt: Date.now(), source: 'local' as const },
]),
);
const snapshotVectorClock = { clientA: 20, clientB: 15 };
// Download returns new ops for a different entity (TASK), but TIME_TRACKING op still pending
spyOn(service, 'downloadRemoteOps').and.returnValue(
Promise.resolve({
serverMigrationHandled: false,
localWinOpsCreated: 0,
newOpsCount: 5, // Got new ops, but for different entity
snapshotVectorClock,
}),
);
// Spy on _resolveStaleLocalOps
const resolveStaleOpsSpy = spyOn<any>(
service,
'_resolveStaleLocalOps',
).and.returnValue(Promise.resolve(0));
await service.uploadPendingOps(mockProvider);
// Verify _resolveStaleLocalOps was called with snapshotVectorClock
expect(resolveStaleOpsSpy).toHaveBeenCalledWith(
jasmine.any(Array),
undefined,
snapshotVectorClock,
);
});
});

View file

@@ -378,6 +378,17 @@ export class OperationLogSyncService {
          mergedOpsCreated += await this._resolveStaleLocalOps(
            stillPendingOps,
            forceDownloadResult.allOpClocks,
+            forceDownloadResult.snapshotVectorClock,
          );
+        } else if (forceDownloadResult.snapshotVectorClock) {
+          // Force download returned no individual clocks but we have snapshot clock
+          OpLog.normal(
+            `OperationLogSyncService: Using snapshotVectorClock from force download`,
+          );
+          mergedOpsCreated += await this._resolveStaleLocalOps(
+            stillPendingOps,
+            undefined,
+            forceDownloadResult.snapshotVectorClock,
+          );
        } else {
          // Force download returned no clocks but we have concurrent ops.
@@ -407,7 +418,11 @@
            `OperationLogSyncService: Download got ${downloadResult.newOpsCount} ops but ${stillPendingOps.length} ` +
              `concurrent ops still pending. Resolving locally with merged clocks...`,
          );
-          mergedOpsCreated += await this._resolveStaleLocalOps(stillPendingOps);
+          mergedOpsCreated += await this._resolveStaleLocalOps(
+            stillPendingOps,
+            undefined,
+            downloadResult.snapshotVectorClock,
+          );
        }
      }
} catch (e) {
@@ -433,11 +448,13 @@
   *
   * @param staleOps - Operations that were rejected due to concurrent modification
   * @param extraClocks - Additional clocks to merge (from force download)
+   * @param snapshotVectorClock - Aggregated clock from snapshot optimization (if available)
   * @returns Number of merged ops created
   */
  private async _resolveStaleLocalOps(
    staleOps: Array<{ opId: string; op: Operation }>,
    extraClocks?: VectorClock[],
+    snapshotVectorClock?: VectorClock,
  ): Promise<number> {
    const clientId = await this._getPfapiService().pf.metaModel.loadClientId();
    if (!clientId) {
@@ -449,6 +466,15 @@
    // This ensures we have all known clocks, not just entity-specific ones
    let globalClock = await this.vectorClockService.getCurrentVectorClock();

+    // Merge snapshot vector clock if available (from server's snapshot optimization)
+    // This ensures we have the clocks from ops that were skipped during download
+    if (snapshotVectorClock && Object.keys(snapshotVectorClock).length > 0) {
+      OpLog.normal(
+        `OperationLogSyncService: Merging snapshotVectorClock with ${Object.keys(snapshotVectorClock).length} entries`,
+      );
+      globalClock = mergeVectorClocks(globalClock, snapshotVectorClock);
+    }
+
    // If extra clocks were provided (from force download), merge them all
    // This helps recover from situations where our local clock is missing entries
    if (extraClocks && extraClocks.length > 0) {
@@ -583,6 +609,7 @@
    localWinOpsCreated: number;
    newOpsCount: number;
    allOpClocks?: VectorClock[];
+    snapshotVectorClock?: VectorClock;
  }> {
    const result = await this.downloadService.downloadRemoteOps(syncProvider, options);
@@ -614,6 +641,8 @@
        newOpsCount: 0,
        // Include all op clocks from forced download (even though no new ops)
        allOpClocks: result.allOpClocks,
+        // Include snapshot vector clock for stale op resolution
+        snapshotVectorClock: result.snapshotVectorClock,
      };
    }
@@ -662,6 +691,7 @@
      localWinOpsCreated: processResult.localWinOpsCreated,
      newOpsCount: result.newOps.length,
      allOpClocks: result.allOpClocks,
+      snapshotVectorClock: result.snapshotVectorClock,
    };
  }
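
Putting the client side together: the merged "local wins" op gets a clock built from the local global clock, the snapshot clock, and any extra clocks, then advances its own component. A simplified sketch assuming mergeVectorClocks is the usual component-wise max (the literal clocks and the inline merge are illustrative):

type VectorClock = Record<string, number>;

const mergeVectorClocks = (a: VectorClock, b: VectorClock): VectorClock => {
  const merged: VectorClock = { ...a };
  for (const [clientId, value] of Object.entries(b)) {
    merged[clientId] = Math.max(merged[clientId] ?? 0, value);
  }
  return merged;
};

// Fresh Client C: local clock knows only itself...
let clock: VectorClock = { clientC: 1 };
// ...fold in the server's snapshotVectorClock (entries from skipped ops)...
clock = mergeVectorClocks(clock, { clientA: 12, clientB: 7 });
// ...then bump our own component so the merged op strictly advances:
clock = { ...clock, clientC: (clock.clientC ?? 0) + 1 };
console.log(clock); // { clientC: 2, clientA: 12, clientB: 7 } - dominates all known clocks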

View file

@@ -97,13 +97,6 @@ export const globalConfigReducer = createReducer<GlobalConfigState>(
    const syncProvider =
      oldState.sync.syncProvider ?? appDataComplete.globalConfig.sync.syncProvider;

-    // Debug logging for hot reload investigation
-    console.warn('[HotReloadDebug] loadAllData reducer:', {
-      oldSyncProvider: oldState.sync.syncProvider,
-      newSyncProvider: appDataComplete.globalConfig?.sync?.syncProvider,
-      resolvedProvider: syncProvider,
-    });
-
    return {
      ...appDataComplete.globalConfig,
      sync: {
View file

@@ -191,6 +191,12 @@ export interface OpDownloadResponse {
   * When true, the client should reset lastServerSeq to 0 and re-download.
   */
  gapDetected?: boolean;
+  /**
+   * Aggregated vector clock from all ops before and including the snapshot.
+   * Only set when snapshot optimization is used (sinceSeq < latestSnapshotSeq).
+   * Clients need this to create merged updates that dominate all known clocks.
+   */
+  snapshotVectorClock?: Record<string, number>;
 }
/**

View file

@@ -136,31 +136,13 @@ export class PfapiService {
      : null;
    const currentProvider = this.pf.getActiveSyncProvider();

-    // Debug logging for hot reload investigation
-    PFLog.warn('[HotReloadDebug] syncConfig subscription fired:', {
-      isEnabled: cfg.isEnabled,
-      newSyncProvider: newProviderId,
-      currentActiveProvider: currentProvider?.id || null,
-    });
    // Guard against hot reload race condition:
    // Don't clear provider if we had a valid one and the new config has sync disabled
    // This protects against temporary state resets during HMR
    if (currentProvider && !newProviderId) {
-      PFLog.warn(
-        '[HotReloadDebug] Ignoring attempt to clear sync provider - likely hot reload race condition',
-      );
      return;
    }
-    // Log provider changes for debugging
-    if (currentProvider && newProviderId && currentProvider.id !== newProviderId) {
-      PFLog.warn('[HotReloadDebug] Provider change detected:', {
-        from: currentProvider.id,
-        to: newProviderId,
-      });
-    }
    this.pf.setActiveSyncProvider(newProviderId);
    if (cfg.isEnabled) {
      this.pf.setEncryptAndCompressCfg({