fix: address code review findings from Dec 27-29 changes
ESLint rule improvements:
- Fix require-hydration-guard to detect guards on combineLatest/forkJoin/zip results (eliminates 20+ false positive warnings)

Server improvements:
- Add OperationDownloadService unit tests (21 tests covering vector clock aggregation, gap detection, excludeClient parameter, snapshot optimization)
- Split ZIP bomb size limits: 10MB for /ops, 30MB for /snapshot
- Document storage quota update as intentionally non-atomic

E2E test improvements:
- Add waitForUISettle() helper using Angular stability instead of fixed timeouts
- Update supersync-cross-entity tests to use dynamic waits

Unit test improvements:
- Add HydrationStateService edge case tests (concurrent calls, cooldown cycles, timer cleanup, interleaved operations)
parent b3022c7285
commit 1da70487f9
8 changed files with 677 additions and 10 deletions
@@ -8,6 +8,7 @@ import {
  isServerHealthy,
  type SimulatedE2EClient,
} from '../../utils/supersync-helpers';
import { waitForUISettle } from '../../utils/waits';

/**
 * SuperSync Cross-Entity Operations E2E Tests

@@ -64,7 +65,7 @@ base.describe('@supersync Cross-Entity Operations Sync', () => {
      const taskName = `BatchTask${i}-${uniqueId}`;
      taskNames.push(taskName);
      await clientA.workView.addTask(taskName);
-     await clientA.page.waitForTimeout(200);
+     await waitForUISettle(clientA.page);
    }
    console.log('[Multi Task Test] Client A created 5 tasks');
@@ -126,6 +126,25 @@ export const waitForAppReady = async (
  }
};

/**
 * Wait for UI to settle after an action (e.g., adding a task).
 * Uses Angular stability as the primary signal rather than fixed timeouts.
 * Falls back to a minimal timeout if Angular stability check fails.
 */
export const waitForUISettle = async (page: Page): Promise<void> => {
  if (page.isClosed()) {
    return;
  }
  try {
    await waitForAngularStability(page, 2000);
  } catch {
    // Fall back to minimal fixed timeout if stability check fails
    if (!page.isClosed()) {
      await page.waitForTimeout(200);
    }
  }
};

/**
 * Wait for local state changes to persist before triggering sync.
 * This ensures IndexedDB writes have completed after UI state changes.
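waitForUISettle delegates to waitForAngularStability, which already exists in waits.ts and is not part of this diff. Purely for context, here is a minimal sketch of how such a check can be built on Playwright's waitForFunction and Angular's testability API; the helper name and details below are assumptions for illustration, not the repository's actual implementation.

// Hypothetical sketch (assumption): wait until all Angular testabilities report stable.
import type { Page } from '@playwright/test';

const waitForAngularStabilitySketch = async (page: Page, timeout: number): Promise<void> => {
  await page.waitForFunction(
    () => {
      // Angular exposes getAllAngularTestabilities() on window when testability is enabled.
      const w = window as unknown as {
        getAllAngularTestabilities?: () => { isStable: () => boolean }[];
      };
      const testabilities = w.getAllAngularTestabilities?.();
      return !!testabilities && testabilities.length > 0 && testabilities.every((t) => t.isStable());
    },
    undefined,
    { timeout },
  );
};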
@@ -229,7 +229,11 @@ module.exports = {
    };

    /**
-     * Find the full pipe chain starting from a selector and check for guards
+     * Find the full pipe chain starting from a selector and check for guards.
     *
     * IMPORTANT: When a selector is inside combineLatest([sel1, sel2]).pipe(...),
     * the guard is on the combineLatest result, not the individual selector.
     * We need to walk up through the array and combineLatest call to find the full chain.
     */
    const hasGuardInChain = (selectorNode) => {
      const sourceCode = context.getSourceCode();

@@ -237,10 +241,27 @@
      // Walk up from selector to find the full pipe chain
      let current = selectorNode;

-      // Follow the chain: select().pipe().pipe()...
+      // First, walk up through any combineLatest/forkJoin/zip wrappers
+      // This handles: combineLatest([sel1, sel2]).pipe(guard, ...)
      while (current.parent) {
        const parent = current.parent;

        // If we're inside an array that's an argument to combineLatest/forkJoin/zip,
        // move up to the call expression
        if (parent.type === 'ArrayExpression') {
          const arrayParent = parent.parent;
          if (arrayParent && arrayParent.type === 'CallExpression') {
            const callee = arrayParent.callee;
            if (
              callee.type === 'Identifier' &&
              ['combineLatest', 'forkJoin', 'zip'].includes(callee.name)
            ) {
              current = arrayParent;
              continue;
            }
          }
        }

        // Check if parent is a member access for .pipe
        if (
          parent.type === 'MemberExpression' &&
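For orientation, the walk-up the rule now performs can be restated over a simplified node shape as follows. This is a sketch of the traversal only (assumption: minimal node type); the real rule operates on full ESLint AST nodes and also inspects the operators passed to .pipe().

// Simplified restatement of the parent-chain walk for a guarded combineLatest.
interface MiniNode {
  type: string;
  name?: string;
  parent?: MiniNode;
  callee?: MiniNode;
  property?: MiniNode;
}

const WRAPPER_FNS = ['combineLatest', 'forkJoin', 'zip'];

// Walk up from a store.select(...) call to the CallExpression for .pipe(...),
// hopping over an enclosing combineLatest/forkJoin/zip([...]) wrapper so the
// guard check runs against the combined stream's pipe chain.
const findPipeCallFor = (selectorNode: MiniNode): MiniNode | undefined => {
  let current = selectorNode;
  while (current.parent) {
    const parent = current.parent;

    // select() is an element of combineLatest([...]): continue from the wrapper call.
    if (parent.type === 'ArrayExpression') {
      const wrapperCall = parent.parent;
      if (
        wrapperCall?.type === 'CallExpression' &&
        wrapperCall.callee?.type === 'Identifier' &&
        WRAPPER_FNS.includes(wrapperCall.callee.name ?? '')
      ) {
        current = wrapperCall;
        continue;
      }
    }

    // something.pipe(...): the surrounding CallExpression holds the guard operators.
    if (
      parent.type === 'MemberExpression' &&
      parent.property?.name === 'pipe' &&
      parent.parent?.type === 'CallExpression'
    ) {
      return parent.parent;
    }

    current = parent;
  }
  return undefined;
};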
@@ -165,6 +165,55 @@ ruleTester.run('require-hydration-guard', rule, {
        )
      `,
    },

    // combineLatest at ROOT with guard on the combineLatest result - should NOT flag
    // This is the key fix: guards applied to combineLatest protect all inner selectors
    {
      code: `
        createEffect(() =>
          combineLatest([
            this._store$.select(selectorA),
            this._store$.select(selectorB)
          ]).pipe(
            skipWhileApplyingRemoteOps(),
            map(([a, b]) => someAction({ a, b }))
          )
        )
      `,
    },

    // combineLatest at ROOT with isApplyingRemoteOps guard - should NOT flag
    {
      code: `
        createEffect(() =>
          combineLatest([
            this.store.select(selectFocusModeConfig),
            this.store.select(selectIsFocusModeEnabled),
          ]).pipe(
            filter(() => !this.hydrationState.isApplyingRemoteOps()),
            switchMap(([cfg, enabled]) => enabled ? of(action()) : EMPTY)
          )
        )
      `,
    },

    // Many selectors in combineLatest with guard - should NOT flag any
    {
      code: `
        createEffect(() =>
          combineLatest([
            this.store.select(selectA),
            this.store.select(selectB),
            this.store.select(selectC),
            this.store.select(selectD),
            this.store.select(selectE),
          ]).pipe(
            skipDuringSync(),
            tap(([a, b, c, d, e]) => this.doSomething())
          )
        )
      `,
    },
  ],

  invalid: [
@@ -71,6 +71,17 @@ export class StorageQuotaService {
  /**
   * Update the cached storage usage for a user.
   * Called after successful uploads to keep the cache accurate.
   *
   * NOTE: This is intentionally NOT wrapped in a transaction with calculateStorageUsage().
   * While this creates a theoretical race condition where another process could modify
   * storage between calculate and update, this is acceptable because:
   *
   * 1. Storage quota is recalculated from scratch before each upload in checkStorageQuota()
   * 2. The cached value is only used for performance optimization, not hard enforcement
   * 3. Wrapping in a transaction would add unnecessary overhead for a non-critical cache
   * 4. The worst case is a slightly stale cache value that gets corrected on next upload
   *
   * If strict accuracy becomes important, wrap both calls in a SERIALIZABLE transaction.
   */
  async updateStorageUsage(userId: number): Promise<void> {
    const { totalBytes } = await this.calculateStorageUsage(userId);
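Should strict accuracy ever be needed, the calculate-and-cache pair could be wrapped in a serializable interactive transaction, as the doc comment suggests. A minimal sketch with Prisma follows; the userStorageCache model and the payloadSize column are hypothetical placeholders, not the project's actual schema.

import { Prisma, PrismaClient } from '@prisma/client';

const prisma = new PrismaClient();

// Hypothetical: recompute usage and persist the cached value atomically.
// Model and field names are placeholders; adapt to the real schema.
const updateStorageUsageSerializable = async (userId: number): Promise<void> => {
  await prisma.$transaction(
    async (tx) => {
      const usage = await tx.operation.aggregate({
        where: { userId },
        _sum: { payloadSize: true }, // hypothetical size column
      });
      await tx.userStorageCache.update({
        // hypothetical cache model
        where: { userId },
        data: { totalBytes: usage._sum.payloadSize ?? 0 },
      });
    },
    { isolationLevel: Prisma.TransactionIsolationLevel.Serializable },
  );
};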
@@ -22,10 +22,16 @@ const gunzipAsync = promisify(zlib.gunzip);
// Validation constants
const CLIENT_ID_REGEX = /^[a-zA-Z0-9_-]+$/;
const MAX_CLIENT_ID_LENGTH = 255;

// Two-stage protection against zip bombs:
-// 1. Pre-check: Reject compressed data > 10MB (typical ratio ~10:1, so protects against ~100MB)
-// 2. Post-check: Reject decompressed data > 100MB (catches edge cases)
-const MAX_COMPRESSED_SIZE = 10 * 1024 * 1024; // 10MB - prevents memory exhaustion during decompression
+// 1. Pre-check: Reject compressed data > limit (typical ratio ~10:1)
+// 2. Post-check: Reject decompressed data > limit (catches edge cases)
+//
+// Different limits for ops vs snapshots:
+// - Ops uploads are incremental and smaller
+// - Snapshots can be larger for backup/repair imports
+const MAX_COMPRESSED_SIZE_OPS = 10 * 1024 * 1024; // 10MB for /ops
+const MAX_COMPRESSED_SIZE_SNAPSHOT = 30 * 1024 * 1024; // 30MB for /snapshot (matches bodyLimit)
const MAX_DECOMPRESSED_SIZE = 100 * 1024 * 1024; // 100MB - catches malicious high-ratio compression

// Zod Schemas

@@ -162,9 +168,9 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
        | undefined;

      // Pre-check: reject if compressed size exceeds limit (prevents memory exhaustion)
-     if (rawBody.length > MAX_COMPRESSED_SIZE) {
+     if (rawBody.length > MAX_COMPRESSED_SIZE_OPS) {
        Logger.warn(
-         `[user:${userId}] Compressed upload too large: ${rawBody.length} bytes (max ${MAX_COMPRESSED_SIZE})`,
+         `[user:${userId}] Compressed upload too large: ${rawBody.length} bytes (max ${MAX_COMPRESSED_SIZE_OPS})`,
        );
        return reply.status(413).send({
          error: 'Compressed payload too large',

@@ -560,9 +566,9 @@ export const syncRoutes = async (fastify: FastifyInstance): Promise<void> => {
        | undefined;

      // Pre-check: reject if compressed size exceeds limit (prevents memory exhaustion)
-     if (rawBody.length > MAX_COMPRESSED_SIZE) {
+     if (rawBody.length > MAX_COMPRESSED_SIZE_SNAPSHOT) {
        Logger.warn(
-         `[user:${userId}] Compressed snapshot too large: ${rawBody.length} bytes (max ${MAX_COMPRESSED_SIZE})`,
+         `[user:${userId}] Compressed snapshot too large: ${rawBody.length} bytes (max ${MAX_COMPRESSED_SIZE_SNAPSHOT})`,
        );
        return reply.status(413).send({
          error: 'Compressed payload too large',
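The decompressed-size post-check against MAX_DECOMPRESSED_SIZE is not shown in the hunks above. For context, the two stages fit together roughly as follows; this is a condensed sketch with a hypothetical checkAndDecompress helper and exception-based error handling, not the route's actual code.

import { promisify } from 'util';
import * as zlib from 'zlib';

const gunzipAsync = promisify(zlib.gunzip);

// Condensed sketch of the two-stage zip-bomb guard. Limits mirror the constants above.
const checkAndDecompress = async (
  rawBody: Buffer,
  maxCompressedSize: number, // MAX_COMPRESSED_SIZE_OPS or MAX_COMPRESSED_SIZE_SNAPSHOT
  maxDecompressedSize: number, // MAX_DECOMPRESSED_SIZE
): Promise<Buffer> => {
  // Stage 1: cheap pre-check on the compressed payload before touching zlib.
  if (rawBody.length > maxCompressedSize) {
    throw new Error('Compressed payload too large');
  }
  // Stage 2: decompress, then reject high-ratio payloads.
  const decompressed = (await gunzipAsync(rawBody)) as Buffer;
  if (decompressed.length > maxDecompressedSize) {
    throw new Error('Decompressed payload too large');
  }
  return decompressed;
};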
@@ -0,0 +1,482 @@
import { describe, it, expect, beforeEach, vi, afterEach } from 'vitest';
import { OperationDownloadService } from '../src/sync/services/operation-download.service';
import { Operation, ServerOperation } from '../src/sync/sync.types';

// Mock prisma
vi.mock('../src/db', () => ({
  prisma: {
    operation: {
      findMany: vi.fn(),
      findFirst: vi.fn(),
      aggregate: vi.fn(),
    },
    userSyncState: {
      findUnique: vi.fn(),
    },
    $transaction: vi.fn(),
  },
}));

// Mock logger to avoid console noise in tests
vi.mock('../src/logger', () => ({
  Logger: {
    info: vi.fn(),
    warn: vi.fn(),
    error: vi.fn(),
  },
}));

import { prisma } from '../src/db';

// Helper to create a mock operation row (as returned by Prisma)
const createMockOpRow = (
  serverSeq: number,
  clientId: string = 'client-1',
  overrides: Partial<{
    id: string;
    opType: string;
    actionType: string;
    entityType: string;
    entityId: string | null;
    payload: unknown;
    vectorClock: Record<string, number>;
    schemaVersion: number;
    clientTimestamp: bigint;
    receivedAt: bigint;
    isPayloadEncrypted: boolean;
  }> = {},
) => ({
  id: overrides.id ?? `op-${serverSeq}`,
  serverSeq,
  clientId,
  actionType: overrides.actionType ?? '[Task] Add',
  opType: overrides.opType ?? 'ADD',
  entityType: overrides.entityType ?? 'Task',
  // Use 'in' check to allow null to be explicitly set
  entityId: 'entityId' in overrides ? overrides.entityId : `task-${serverSeq}`,
  payload: overrides.payload ?? { title: `Task ${serverSeq}` },
  vectorClock: overrides.vectorClock ?? { [clientId]: serverSeq },
  schemaVersion: overrides.schemaVersion ?? 1,
  clientTimestamp: overrides.clientTimestamp ?? BigInt(Date.now()),
  receivedAt: overrides.receivedAt ?? BigInt(Date.now()),
  isPayloadEncrypted: overrides.isPayloadEncrypted ?? false,
});

describe('OperationDownloadService', () => {
  let service: OperationDownloadService;

  beforeEach(() => {
    vi.clearAllMocks();
    service = new OperationDownloadService();
  });

  afterEach(() => {
    vi.useRealTimers();
  });

  describe('getOpsSince', () => {
    it('should return empty array when no operations exist', async () => {
      vi.mocked(prisma.operation.findMany).mockResolvedValue([]);

      const result = await service.getOpsSince(1, 0);

      expect(result).toEqual([]);
      expect(prisma.operation.findMany).toHaveBeenCalledWith({
        where: {
          userId: 1,
          serverSeq: { gt: 0 },
        },
        orderBy: { serverSeq: 'asc' },
        take: 500,
      });
    });

    it('should return operations mapped to ServerOperation format', async () => {
      const mockOps = [createMockOpRow(1), createMockOpRow(2)];
      vi.mocked(prisma.operation.findMany).mockResolvedValue(mockOps as any);

      const result = await service.getOpsSince(1, 0);

      expect(result).toHaveLength(2);
      expect(result[0].serverSeq).toBe(1);
      expect(result[0].op.id).toBe('op-1');
      expect(result[0].op.opType).toBe('ADD');
      expect(result[1].serverSeq).toBe(2);
    });

    it('should exclude operations from specified client', async () => {
      vi.mocked(prisma.operation.findMany).mockResolvedValue([]);

      await service.getOpsSince(1, 0, 'excluded-client');

      expect(prisma.operation.findMany).toHaveBeenCalledWith({
        where: {
          userId: 1,
          serverSeq: { gt: 0 },
          clientId: { not: 'excluded-client' },
        },
        orderBy: { serverSeq: 'asc' },
        take: 500,
      });
    });

    it('should respect custom limit', async () => {
      vi.mocked(prisma.operation.findMany).mockResolvedValue([]);

      await service.getOpsSince(1, 0, undefined, 100);

      expect(prisma.operation.findMany).toHaveBeenCalledWith(
        expect.objectContaining({ take: 100 }),
      );
    });

    it('should correctly map operation fields including optional entityId', async () => {
      const opWithNullEntityId = createMockOpRow(1, 'client-1', { entityId: null });
      vi.mocked(prisma.operation.findMany).mockResolvedValue([opWithNullEntityId as any]);

      const result = await service.getOpsSince(1, 0);

      expect(result[0].op.entityId).toBeUndefined();
    });

    it('should convert timestamps from bigint to number', async () => {
      const mockOp = createMockOpRow(1, 'client-1', {
        clientTimestamp: BigInt(1700000000000),
        receivedAt: BigInt(1700000001000),
      });
      vi.mocked(prisma.operation.findMany).mockResolvedValue([mockOp as any]);

      const result = await service.getOpsSince(1, 0);

      expect(result[0].op.timestamp).toBe(1700000000000);
      expect(result[0].receivedAt).toBe(1700000001000);
    });

    it('should handle operations with encrypted payloads', async () => {
      const encryptedOp = createMockOpRow(1, 'client-1', { isPayloadEncrypted: true });
      vi.mocked(prisma.operation.findMany).mockResolvedValue([encryptedOp as any]);

      const result = await service.getOpsSince(1, 0);

      expect(result[0].op.isPayloadEncrypted).toBe(true);
    });
  });

  describe('getOpsSinceWithSeq', () => {
    // Helper to set up transaction mock
    const setupTransactionMock = (mockFn: (tx: any) => Promise<any>) => {
      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn(),
            findMany: vi.fn(),
            aggregate: vi.fn(),
          },
          userSyncState: {
            findUnique: vi.fn(),
          },
        };
        return mockFn(mockTx);
      });
    };

    it('should return empty ops with correct latestSeq', async () => {
      setupTransactionMock(async (tx) => {
        tx.operation.findFirst.mockResolvedValue(null); // No full-state op
        tx.operation.findMany.mockResolvedValue([]);
        tx.userSyncState.findUnique.mockResolvedValue({ lastSeq: 5 });
        tx.operation.aggregate.mockResolvedValue({ _min: { serverSeq: 1 } });

        return {
          ops: [],
          latestSeq: 5,
          gapDetected: false,
          latestSnapshotSeq: undefined,
          snapshotVectorClock: undefined,
        };
      });

      const result = await service.getOpsSinceWithSeq(1, 4);

      expect(result.ops).toEqual([]);
      expect(result.latestSeq).toBe(5);
      expect(result.gapDetected).toBe(false);
    });

    it('should detect gap when client is ahead of server', async () => {
      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(null),
            findMany: vi.fn().mockResolvedValue([]),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 5 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 10); // Client at seq 10, server at 5

      expect(result.gapDetected).toBe(true);
    });

    it('should detect gap when client has history but server is empty', async () => {
      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(null),
            findMany: vi.fn().mockResolvedValue([]),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: null } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 0 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 5); // Client at seq 5, server empty

      expect(result.gapDetected).toBe(true);
    });

    it('should detect gap when requested seq is purged', async () => {
      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(null),
            findMany: vi.fn().mockResolvedValue([]),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 50 } }), // Min is 50
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 100 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 10); // Client at seq 10, min is 50

      expect(result.gapDetected).toBe(true);
    });

    it('should detect gap when there is a gap in returned operations', async () => {
      const mockOps = [createMockOpRow(15)]; // Gap: requested sinceSeq + 1 = 11, but got 15

      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(null),
            findMany: vi.fn().mockResolvedValue(mockOps),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 20 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 10);

      expect(result.gapDetected).toBe(true);
    });

    it('should NOT detect gap when excludeClient filters cause apparent gaps', async () => {
      const mockOps = [createMockOpRow(15, 'other-client')]; // From different client

      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(null),
            findMany: vi.fn().mockResolvedValue(mockOps),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 20 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 10, 'excluded-client');

      // Gap detection is disabled when excludeClient is used
      expect(result.gapDetected).toBe(false);
    });

    it('should optimize download when latest snapshot exists', async () => {
      const snapshotOp = { serverSeq: 50 };
      const skippedOps = [
        { vectorClock: { 'client-1': 10, 'client-2': 5 } },
        { vectorClock: { 'client-1': 15, 'client-3': 8 } },
      ];
      const opsAfterSnapshot = [createMockOpRow(51)];

      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(snapshotOp), // Latest SYNC_IMPORT at seq 50
            findMany: vi
              .fn()
              .mockResolvedValueOnce(skippedOps) // Skipped ops for vector clock
              .mockResolvedValueOnce(opsAfterSnapshot as any), // Actual ops to return
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 60 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 10); // Client at seq 10, snapshot at 50

      expect(result.latestSnapshotSeq).toBe(50);
      expect(result.snapshotVectorClock).toEqual({
        'client-1': 15, // Max of 10 and 15
        'client-2': 5,
        'client-3': 8,
      });
    });

    it('should aggregate vector clocks correctly from skipped ops', async () => {
      const skippedOps = [
        { vectorClock: { a: 1, b: 2 } },
        { vectorClock: { a: 3, c: 1 } },
        { vectorClock: { b: 5, c: 2 } },
      ];

      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue({ serverSeq: 10 }),
            findMany: vi.fn().mockResolvedValueOnce(skippedOps).mockResolvedValueOnce([]),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 20 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 0);

      expect(result.snapshotVectorClock).toEqual({
        a: 3, // Max of 1 and 3
        b: 5, // Max of 2 and 5
        c: 2, // Max of 1 and 2
      });
    });

    it('should handle malformed vector clocks gracefully', async () => {
      const skippedOps = [
        { vectorClock: { a: 1 } },
        { vectorClock: null }, // Invalid
        { vectorClock: 'not an object' }, // Invalid
        { vectorClock: { b: 'not a number', c: 2 } }, // Partially invalid
      ];

      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue({ serverSeq: 10 }),
            findMany: vi.fn().mockResolvedValueOnce(skippedOps).mockResolvedValueOnce([]),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 20 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 0);

      // Should only include valid entries
      expect(result.snapshotVectorClock).toEqual({
        a: 1,
        c: 2, // String 'not a number' is skipped
      });
    });

    it('should not optimize when client is already past snapshot', async () => {
      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue({ serverSeq: 50 }), // Snapshot at 50
            findMany: vi.fn().mockResolvedValue([createMockOpRow(61)] as any),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: 1 } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue({ lastSeq: 70 }),
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 60); // Client already past snapshot

      // findMany for skipped ops should not be called when sinceSeq >= latestSnapshotSeq
      expect(result.snapshotVectorClock).toBeUndefined();
    });

    it('should return latestSeq as 0 when no sync state exists', async () => {
      vi.mocked(prisma.$transaction).mockImplementation(async (fn: any) => {
        const mockTx = {
          operation: {
            findFirst: vi.fn().mockResolvedValue(null),
            findMany: vi.fn().mockResolvedValue([]),
            aggregate: vi.fn().mockResolvedValue({ _min: { serverSeq: null } }),
          },
          userSyncState: {
            findUnique: vi.fn().mockResolvedValue(null), // No sync state
          },
        };
        return fn(mockTx);
      });

      const result = await service.getOpsSinceWithSeq(1, 0);

      expect(result.latestSeq).toBe(0);
    });
  });

  describe('getLatestSeq', () => {
    it('should return latest sequence from userSyncState', async () => {
      vi.mocked(prisma.userSyncState.findUnique).mockResolvedValue({
        lastSeq: 42,
      } as any);

      const result = await service.getLatestSeq(1);

      expect(result).toBe(42);
      expect(prisma.userSyncState.findUnique).toHaveBeenCalledWith({
        where: { userId: 1 },
        select: { lastSeq: true },
      });
    });

    it('should return 0 when no sync state exists', async () => {
      vi.mocked(prisma.userSyncState.findUnique).mockResolvedValue(null);

      const result = await service.getLatestSeq(1);

      expect(result).toBe(0);
    });

    it('should handle large sequence numbers', async () => {
      vi.mocked(prisma.userSyncState.findUnique).mockResolvedValue({
        lastSeq: 999999999,
      } as any);

      const result = await service.getLatestSeq(1);

      expect(result).toBe(999999999);
    });
  });
});
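The vector-clock expectations in these tests boil down to an element-wise maximum over the skipped operations' clocks, with malformed clocks and non-numeric entries dropped. A standalone restatement of that merge (a sketch matching the tested behavior, not the service's actual code):

type VectorClock = Record<string, number>;

// Merge a list of (possibly malformed) vector clocks by taking the per-client maximum.
// Non-object clocks and non-numeric entries are skipped, mirroring the tests above.
const aggregateVectorClocks = (clocks: unknown[]): VectorClock => {
  const result: VectorClock = {};
  for (const clock of clocks) {
    if (!clock || typeof clock !== 'object') {
      continue;
    }
    for (const [clientId, value] of Object.entries(clock as Record<string, unknown>)) {
      if (typeof value !== 'number') {
        continue;
      }
      result[clientId] = Math.max(result[clientId] ?? 0, value);
    }
  }
  return result;
};

// Example matching the "aggregate vector clocks" expectation:
// aggregateVectorClocks([{ a: 1, b: 2 }, { a: 3, c: 1 }, { b: 5, c: 2 }])
//   -> { a: 3, b: 5, c: 2 }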
@@ -196,4 +196,82 @@ describe('HydrationStateService', () => {
      }, 100);
    });
  });

  describe('edge cases', () => {
    it('should handle concurrent start calls without issues', () => {
      // Multiple starts should just keep it true
      service.startApplyingRemoteOps();
      service.startApplyingRemoteOps();
      service.startApplyingRemoteOps();

      expect(service.isApplyingRemoteOps()).toBeTrue();
      expect(getIsApplyingRemoteOps()).toBeTrue();

      // Single end should clear it
      service.endApplyingRemoteOps();
      expect(service.isApplyingRemoteOps()).toBeFalse();
    });

    it('should handle multiple rapid cooldown starts', (done) => {
      // Start multiple cooldowns in rapid succession - only the last one matters
      service.startPostSyncCooldown(50);
      service.startPostSyncCooldown(100);
      service.startPostSyncCooldown(50); // Last one: 50ms

      expect(service.isInSyncWindow()).toBeTrue();

      // After 75ms, should still be in window (last cooldown was 50ms, but timer restarted)
      setTimeout(() => {
        expect(service.isInSyncWindow()).toBeFalse();
        done();
      }, 100);
    });

    it('should properly cleanup when clearPostSyncCooldown is called during active cooldown', (done) => {
      service.startPostSyncCooldown(200);
      expect(service.isInSyncWindow()).toBeTrue();

      // Clear immediately
      service.clearPostSyncCooldown();
      expect(service.isInSyncWindow()).toBeFalse();

      // Wait past the original timeout - should still be false (timer was cleared)
      setTimeout(() => {
        expect(service.isInSyncWindow()).toBeFalse();
        done();
      }, 250);
    });

    it('should handle interleaved start/end/cooldown calls', () => {
      // Complex sequence that might occur during rapid sync operations
      service.startApplyingRemoteOps();
      expect(service.isInSyncWindow()).toBeTrue();

      service.startPostSyncCooldown(1000);
      expect(service.isInSyncWindow()).toBeTrue();

      service.endApplyingRemoteOps();
      // Should still be in window due to cooldown
      expect(service.isInSyncWindow()).toBeTrue();
      expect(service.isApplyingRemoteOps()).toBeFalse();

      service.clearPostSyncCooldown();
      // Now should be fully out of window
      expect(service.isInSyncWindow()).toBeFalse();
    });

    it('should handle clearPostSyncCooldown when no cooldown is active', () => {
      // Should not throw or cause issues
      expect(() => service.clearPostSyncCooldown()).not.toThrow();
      expect(service.isInSyncWindow()).toBeFalse();
    });

    it('should handle endApplyingRemoteOps when not started', () => {
      // Should not throw or cause issues
      expect(service.isApplyingRemoteOps()).toBeFalse();
      expect(() => service.endApplyingRemoteOps()).not.toThrow();
      expect(service.isApplyingRemoteOps()).toBeFalse();
      expect(getIsApplyingRemoteOps()).toBeFalse();
    });
  });
});
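For readers unfamiliar with the service under test: the edge cases above pin down a small state machine in which the sync window is open while remote ops are being applied or while a post-sync cooldown timer is running. A minimal sketch of state that would satisfy these tests (an illustration only; the real Angular service differs):

// Minimal sketch (assumption) of the state the edge-case tests exercise.
class HydrationStateSketch {
  private _isApplying = false;
  private _cooldownTimer: ReturnType<typeof setTimeout> | null = null;
  private _inCooldown = false;

  startApplyingRemoteOps(): void {
    this._isApplying = true;
  }

  endApplyingRemoteOps(): void {
    this._isApplying = false;
  }

  isApplyingRemoteOps(): boolean {
    return this._isApplying;
  }

  // Restart the cooldown window; only the most recent timer stays active.
  startPostSyncCooldown(durationMs: number): void {
    this.clearPostSyncCooldown();
    this._inCooldown = true;
    this._cooldownTimer = setTimeout(() => {
      this._inCooldown = false;
      this._cooldownTimer = null;
    }, durationMs);
  }

  clearPostSyncCooldown(): void {
    if (this._cooldownTimer) {
      clearTimeout(this._cooldownTimer);
      this._cooldownTimer = null;
    }
    this._inCooldown = false;
  }

  // The "sync window" is open while applying remote ops or during cooldown.
  isInSyncWindow(): boolean {
    return this._isApplying || this._inCooldown;
  }
}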