From 936f374bdeea8a697d830fe569a0f5c042fe257b Mon Sep 17 00:00:00 2001 From: Johannes Millan Date: Fri, 2 Jan 2026 13:48:22 +0100 Subject: [PATCH] refactor(sync): address code review findings for operation log Phase 1 - Fix archive circular dependency: - Create ArchiveDbAdapter for direct IndexedDB access to archive data - Update ArchiveOperationHandler to use ArchiveDbAdapter instead of lazy-injecting PfapiService, breaking the circular dependency chain - Update tests to mock ArchiveDbAdapter Phase 2 - Address testing gaps: - Add 3 timeout exhaustion tests for compaction service (25s timeout) - Add 5 clock skew edge case tests for conflict resolution: - Far future/past timestamps - Zero and negative timestamps - Client ID tie-breaker for identical timestamps Phase 3 - Document cross-version migration (A.7.11): - Add comprehensive implementation guide in architecture doc covering: - When to bump CURRENT_SCHEMA_VERSION - Operation transformation strategy - Conflict detection across versions - Backward compatibility guarantees - Migration rollout strategy - Example migration and testing requirements All 1399 op-log tests pass. --- docs/sync-and-op-log/README.md | 2 +- .../operation-log-architecture.md | 196 +++++++++++++++++- .../persistence/archive-db-adapter.service.ts | 142 +++++++++++++ .../archive-operation-handler.service.spec.ts | 121 +++++------ .../archive-operation-handler.service.ts | 94 ++++----- .../operation-log-compaction.service.spec.ts | 70 +++++++ .../sync/conflict-resolution.service.spec.ts | 171 +++++++++++++++ 7 files changed, 667 insertions(+), 129 deletions(-) create mode 100644 src/app/core/persistence/archive-db-adapter.service.ts diff --git a/docs/sync-and-op-log/README.md b/docs/sync-and-op-log/README.md index c220d95fb..e830d8bad 100644 --- a/docs/sync-and-op-log/README.md +++ b/docs/sync-and-op-log/README.md @@ -128,7 +128,7 @@ This prevents duplicate side effects when syncing operations from other clients. | Server Sync (Part C) | ✅ Complete (single-version) | | Validation & Repair (Part D) | ✅ Complete | | End-to-End Encryption | ✅ Complete (AES-256-GCM + Argon2id) | -| Cross-version Sync (A.7.11) | ⚠️ Not implemented | +| Cross-version Sync (A.7.11) | 📋 Documented (not yet implemented) | | Schema Migrations | ✅ Infrastructure ready (no migrations defined yet) | See [operation-log-architecture.md#implementation-status](./operation-log-architecture.md#implementation-status) for detailed status. diff --git a/docs/sync-and-op-log/operation-log-architecture.md b/docs/sync-and-op-log/operation-log-architecture.md index ed063b7b0..36db35b08 100644 --- a/docs/sync-and-op-log/operation-log-architecture.md +++ b/docs/sync-and-op-log/operation-log-architecture.md @@ -1,8 +1,8 @@ # Operation Log Architecture -**Status:** Parts A, B, C, D Complete (single-version; cross-version sync requires A.7.11) +**Status:** Parts A, B, C, D Complete (single-version; cross-version sync A.7.11 documented, not implemented) **Branch:** `feat/operation-logs` -**Last Updated:** December 27, 2025 +**Last Updated:** January 2, 2026 --- @@ -807,6 +807,198 @@ No, not yet. It provides the bridge from older versions of the app to the Operat **Required before:** Any schema migration that renames/removes fields. +### A.7.11 Cross-Version Sync Implementation Guide + +> **Status:** Not yet implemented. This section documents the design for when `CURRENT_SCHEMA_VERSION > 1`. + +This guide provides the implementation roadmap for supporting sync between clients on different schema versions. + +#### When to Bump CURRENT_SCHEMA_VERSION + +Bump the schema version when: + +| Change Type | Bump Version? | Reason | +| ------------------------------- | ------------- | -------------------------------------------------------------------- | +| Add optional field with default | ✅ Yes | Old clients won't set it; new clients need to know to apply defaults | +| Rename field | ✅ Yes | Operations need payload transformation | +| Remove field/feature | ✅ Yes | Operations may reference removed entities | +| Change field type | ✅ Yes | Payload values need conversion | +| Add new entity type | ✅ Yes | Old snapshots need initialization | +| Add new action type | ❌ No | Old clients ignore unknown actions | +| Bug fix in reducer | ❌ No | Not a schema change | + +**Decision rule:** If the change affects how `state_cache` snapshots or operation payloads are structured, bump the version. + +#### Operation Transformation Strategy + +When receiving operations from older versions: + +```typescript +// In SchemaMigrationService.migrateOperation() +async migrateOperation(op: Operation): Promise { + const opVersion = op.schemaVersion ?? 1; + + if (opVersion >= CURRENT_SCHEMA_VERSION) { + return op; // Already current + } + + // Run through migration chain + let migratedPayload = op.payload; + for (let v = opVersion; v < CURRENT_SCHEMA_VERSION; v++) { + const migration = MIGRATIONS.find(m => m.fromVersion === v); + if (migration?.migrateOperation) { + const result = migration.migrateOperation(op.actionType, migratedPayload); + if (result === null) { + // Operation should be dropped (removed feature) + return null; + } + migratedPayload = result; + } + } + + return { + ...op, + payload: migratedPayload, + schemaVersion: CURRENT_SCHEMA_VERSION, + }; +} +``` + +#### Conflict Detection Across Versions + +The migration shield ensures conflict detection always compares apples-to-apples: + +``` +Remote Op (v1) Local Op (v2) + │ │ + ▼ │ +┌─────────────────┐ │ +│ Migration Layer │ │ +│ (v1 → v2) │ │ +└────────┬────────┘ │ + │ │ + ▼ ▼ + ┌────────────────────────────┐ + │ Conflict Detection │ + │ (Both ops now v2) │ + └────────────────────────────┘ +``` + +**Key invariant:** Operations are ALWAYS migrated to current version BEFORE conflict detection. This ensures: + +- Vector clock comparison is valid (same logical schema) +- LWW timestamp comparison is fair (same field semantics) +- Entity IDs are comparable (no renamed references) + +#### Backward Compatibility Guarantees + +| Scenario | Behavior | User Experience | +| ------------------------------------------ | ---------------------------------------------------- | ------------------------- | +| Newer client → Older client | Ops uploaded as-is; older client migrates on receive | Seamless | +| Older client → Newer client | Newer client migrates incoming ops | Seamless | +| Client too old (> MAX_VERSION_SKIP behind) | Reject ops, prompt update | "Please update app" modal | +| Client too new (server rejects) | N/A - server doesn't validate schema | No issue | + +**MAX_VERSION_SKIP = 5**: Clients more than 5 versions behind cannot sync until updated. This bounds the migration chain complexity. + +#### Migration Rollout Strategy + +When deploying a schema migration: + +1. **Release new version with migration code** + + - Add migration to `MIGRATIONS` array + - Bump `CURRENT_SCHEMA_VERSION` + - Migration handles both state and operations + +2. **Graceful degradation period** + + - Old clients continue working (they don't know about new schema) + - New clients migrate incoming old ops seamlessly + - Mixed-version sync works via receiver-side migration + +3. **Monitoring** (future) + + - Track `op.schemaVersion` distribution in server logs + - Alert if many clients are > 2 versions behind + +4. **Cleanup** (optional, after many versions) + - Remove migrations for versions < `MIN_SUPPORTED_SCHEMA_VERSION` + - Update `MIN_SUPPORTED_SCHEMA_VERSION` + - Old clients will see "update required" prompt + +#### Example Migration: Renaming a Field + +```typescript +// packages/shared-schema/src/migrations.ts +export const MIGRATIONS: SchemaMigration[] = [ + { + fromVersion: 1, + toVersion: 2, + description: 'Rename task.estimate to task.timeEstimate', + + // Migrate state snapshot + migrateState: (state: unknown): unknown => { + const s = state as AppDataComplete; + return { + ...s, + task: { + ...s.task, + entities: Object.fromEntries( + Object.entries(s.task.entities).map(([id, task]) => [ + id, + { + ...task, + timeEstimate: (task as any).estimate, // Copy old field + estimate: undefined, // Remove old field + }, + ]), + ), + }, + }; + }, + + // Migrate operation payload + requiresOperationMigration: true, + migrateOperation: (actionType: string, payload: unknown): unknown | null => { + if (actionType.includes('[Task]') && payload && typeof payload === 'object') { + const p = payload as Record; + if ('estimate' in p) { + return { + ...p, + timeEstimate: p.estimate, + estimate: undefined, + }; + } + } + return payload; // No change for other actions + }, + }, +]; +``` + +#### Testing Cross-Version Sync + +Before releasing any migration: + +1. **Unit tests** in `schema-migration.service.spec.ts`: + + - State migration correctness + - Operation migration correctness + - Null return for dropped operations + +2. **Integration tests** in `cross-version-sync.integration.spec.ts`: + + - Client A (v1) syncs with Client B (v2) + - Both clients converge to same state + - No data loss during migration + +3. **E2E tests** (manual or automated): + - Install old app version, create data + - Update to new version + - Verify data migrated correctly + - Sync with another device on new version + --- # Part B: Legacy Sync Bridge diff --git a/src/app/core/persistence/archive-db-adapter.service.ts b/src/app/core/persistence/archive-db-adapter.service.ts new file mode 100644 index 000000000..c74acdefd --- /dev/null +++ b/src/app/core/persistence/archive-db-adapter.service.ts @@ -0,0 +1,142 @@ +import { Injectable } from '@angular/core'; +import { DBSchema, IDBPDatabase, openDB } from 'idb'; +import { ArchiveModel } from '../../features/time-tracking/time-tracking.model'; +import { PFLog } from '../log'; + +/** + * Database key constants for archive storage. + */ +const DB_KEY_ARCHIVE_YOUNG = 'archiveYoung' as const; +const DB_KEY_ARCHIVE_OLD = 'archiveOld' as const; + +/** + * Database configuration matching PFAPI's IndexedDbAdapter. + * The 'pf' database with 'main' object store is shared with PFAPI. + */ +const DB_NAME = 'pf'; +const DB_MAIN_NAME = 'main'; +const DB_VERSION = 1; + +/** + * Minimal schema for the PFAPI database. + * We only access archive keys, but the database may contain other data. + */ +interface PfapiDb extends DBSchema { + [DB_MAIN_NAME]: { + key: string; + value: unknown; + }; +} + +/** + * Low-level IndexedDB adapter for archive storage. + * + * ## Purpose + * + * This service provides direct IndexedDB access to archive data (archiveYoung, archiveOld) + * WITHOUT going through PfapiService. This breaks the circular dependency: + * + * ``` + * DataInitService → OperationLogHydratorService → OperationApplierService + * → ArchiveOperationHandler → [THIS SERVICE instead of PfapiService] + * ``` + * + * ## Database Sharing + * + * This service opens a connection to the same 'pf' database that PFAPI uses. + * IndexedDB supports multiple connections to the same database, and since we + * only use this for archive operations that specify `isIgnoreDBLock: true`, + * there's no conflict with PFAPI's lock mechanism. + * + * ## Usage + * + * Used by `ArchiveOperationHandler` for: + * - `_handleFlushYoungToOld()`: Reading/writing archiveYoung and archiveOld + * - `_handleLoadAllData()`: Writing archive data from SYNC_IMPORT/BACKUP_IMPORT + * + * @see src/app/op-log/apply/archive-operation-handler.service.ts + */ +@Injectable({ + providedIn: 'root', +}) +export class ArchiveDbAdapter { + private _db?: IDBPDatabase; + private _initPromise?: Promise; + + /** + * Initializes the database connection. + * Safe to call multiple times - subsequent calls return the same promise. + */ + async init(): Promise { + if (this._initPromise) { + return this._initPromise; + } + + this._initPromise = this._doInit(); + return this._initPromise; + } + + private async _doInit(): Promise { + try { + // Open connection to existing PFAPI database + // Note: We don't create stores here - they're created by PFAPI + this._db = await openDB(DB_NAME, DB_VERSION, { + // No upgrade needed - PFAPI handles schema creation + // If this is called before PFAPI, the database won't have the store yet + // but that's fine because ArchiveOperationHandler is only called after data init + }); + PFLog.normal('[ArchiveDbAdapter] Database connection initialized'); + } catch (e) { + PFLog.err('[ArchiveDbAdapter] Failed to initialize database', e); + this._initPromise = undefined; // Allow retry + throw e; + } + } + + /** + * Ensures the database is initialized before use. + */ + private async _ensureDb(): Promise> { + if (!this._db) { + await this.init(); + } + if (!this._db) { + throw new Error('[ArchiveDbAdapter] Database not initialized'); + } + return this._db; + } + + /** + * Loads archiveYoung data from IndexedDB. + */ + async loadArchiveYoung(): Promise { + const db = await this._ensureDb(); + const data = await db.get(DB_MAIN_NAME, DB_KEY_ARCHIVE_YOUNG); + return data as ArchiveModel | undefined; + } + + /** + * Saves archiveYoung data to IndexedDB. + */ + async saveArchiveYoung(data: ArchiveModel): Promise { + const db = await this._ensureDb(); + await db.put(DB_MAIN_NAME, data, DB_KEY_ARCHIVE_YOUNG); + } + + /** + * Loads archiveOld data from IndexedDB. + */ + async loadArchiveOld(): Promise { + const db = await this._ensureDb(); + const data = await db.get(DB_MAIN_NAME, DB_KEY_ARCHIVE_OLD); + return data as ArchiveModel | undefined; + } + + /** + * Saves archiveOld data to IndexedDB. + */ + async saveArchiveOld(data: ArchiveModel): Promise { + const db = await this._ensureDb(); + await db.put(DB_MAIN_NAME, data, DB_KEY_ARCHIVE_OLD); + } +} diff --git a/src/app/op-log/apply/archive-operation-handler.service.spec.ts b/src/app/op-log/apply/archive-operation-handler.service.spec.ts index 9ecd9d0c4..26e768c94 100644 --- a/src/app/op-log/apply/archive-operation-handler.service.spec.ts +++ b/src/app/op-log/apply/archive-operation-handler.service.spec.ts @@ -6,7 +6,6 @@ import { import { PersistentAction } from '../core/persistent-action.interface'; import { ArchiveService } from '../../features/time-tracking/archive.service'; import { TaskArchiveService } from '../../features/time-tracking/task-archive.service'; -import { PfapiService } from '../../pfapi/pfapi.service'; import { Task, TaskWithSubTasks } from '../../features/tasks/task.model'; import { ArchiveModel } from '../../features/time-tracking/time-tracking.model'; import { TaskSharedActions } from '../../root-store/meta/task-shared.actions'; @@ -14,6 +13,7 @@ import { flushYoungToOld } from '../../features/time-tracking/store/archive.acti import { deleteTag, deleteTags } from '../../features/tag/store/tag.actions'; import { TimeTrackingService } from '../../features/time-tracking/time-tracking.service'; import { loadAllData } from '../../root-store/meta/load-all-data.action'; +import { ArchiveDbAdapter } from '../../core/persistence/archive-db-adapter.service'; describe('isArchiveAffectingAction', () => { it('should return true for moveToArchive action', () => { @@ -91,7 +91,7 @@ describe('ArchiveOperationHandler', () => { let service: ArchiveOperationHandler; let mockArchiveService: jasmine.SpyObj; let mockTaskArchiveService: jasmine.SpyObj; - let mockPfapiService: jasmine.SpyObj; + let mockArchiveDbAdapter: jasmine.SpyObj; let mockTimeTrackingService: jasmine.SpyObj; const createMockTaskWithSubTasks = ( @@ -136,22 +136,21 @@ describe('ArchiveOperationHandler', () => { 'cleanupDataEverywhereForProject', 'cleanupArchiveDataForTag', ]); - mockPfapiService = jasmine.createSpyObj('PfapiService', [], { - m: { - archiveYoung: { - load: jasmine - .createSpy('load') - .and.returnValue(Promise.resolve(createEmptyArchiveModel())), - save: jasmine.createSpy('save').and.returnValue(Promise.resolve()), - }, - archiveOld: { - load: jasmine - .createSpy('load') - .and.returnValue(Promise.resolve(createEmptyArchiveModel())), - save: jasmine.createSpy('save').and.returnValue(Promise.resolve()), - }, - }, - }); + mockArchiveDbAdapter = jasmine.createSpyObj('ArchiveDbAdapter', [ + 'loadArchiveYoung', + 'loadArchiveOld', + 'saveArchiveYoung', + 'saveArchiveOld', + ]); + // Default returns for ArchiveDbAdapter + mockArchiveDbAdapter.loadArchiveYoung.and.returnValue( + Promise.resolve(createEmptyArchiveModel()), + ); + mockArchiveDbAdapter.loadArchiveOld.and.returnValue( + Promise.resolve(createEmptyArchiveModel()), + ); + mockArchiveDbAdapter.saveArchiveYoung.and.returnValue(Promise.resolve()); + mockArchiveDbAdapter.saveArchiveOld.and.returnValue(Promise.resolve()); // Set up default resolved promises mockArchiveService.writeTasksToArchiveForRemoteSync.and.returnValue( @@ -181,7 +180,7 @@ describe('ArchiveOperationHandler', () => { ArchiveOperationHandler, { provide: ArchiveService, useValue: mockArchiveService }, { provide: TaskArchiveService, useValue: mockTaskArchiveService }, - { provide: PfapiService, useValue: mockPfapiService }, + { provide: ArchiveDbAdapter, useValue: mockArchiveDbAdapter }, { provide: TimeTrackingService, useValue: mockTimeTrackingService }, ], }); @@ -477,8 +476,8 @@ describe('ArchiveOperationHandler', () => { } // Verify it tried to load archives (confirming action was recognized) - expect(mockPfapiService.m.archiveYoung.load).toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.load).toHaveBeenCalled(); + expect(mockArchiveDbAdapter.loadArchiveYoung).toHaveBeenCalled(); + expect(mockArchiveDbAdapter.loadArchiveOld).toHaveBeenCalled(); }); }); @@ -809,9 +808,8 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).toHaveBeenCalledWith( + expect(mockArchiveDbAdapter.saveArchiveYoung).toHaveBeenCalledWith( archiveYoungData, - { isUpdateRevAndLastUpdate: false, isIgnoreDBLock: true }, ); }); @@ -826,10 +824,7 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveOld.save).toHaveBeenCalledWith(archiveOldData, { - isUpdateRevAndLastUpdate: false, - isIgnoreDBLock: true, - }); + expect(mockArchiveDbAdapter.saveArchiveOld).toHaveBeenCalledWith(archiveOldData); }); it('should write both archiveYoung and archiveOld for remote operations', async () => { @@ -844,14 +839,10 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).toHaveBeenCalledWith( + expect(mockArchiveDbAdapter.saveArchiveYoung).toHaveBeenCalledWith( archiveYoungData, - { isUpdateRevAndLastUpdate: false, isIgnoreDBLock: true }, ); - expect(mockPfapiService.m.archiveOld.save).toHaveBeenCalledWith(archiveOldData, { - isUpdateRevAndLastUpdate: false, - isIgnoreDBLock: true, - }); + expect(mockArchiveDbAdapter.saveArchiveOld).toHaveBeenCalledWith(archiveOldData); }); it('should NOT write archive for local operations (already done by PfapiService)', async () => { @@ -865,8 +856,8 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.save).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveYoung).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveOld).not.toHaveBeenCalled(); }); it('should NOT write archive when isRemote is undefined (treated as local)', async () => { @@ -880,8 +871,8 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.save).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveYoung).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveOld).not.toHaveBeenCalled(); }); it('should handle missing archiveYoung gracefully', async () => { @@ -895,11 +886,8 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.save).toHaveBeenCalledWith(archiveOldData, { - isUpdateRevAndLastUpdate: false, - isIgnoreDBLock: true, - }); + expect(mockArchiveDbAdapter.saveArchiveYoung).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveOld).toHaveBeenCalledWith(archiveOldData); }); it('should handle missing archiveOld gracefully', async () => { @@ -913,11 +901,10 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).toHaveBeenCalledWith( + expect(mockArchiveDbAdapter.saveArchiveYoung).toHaveBeenCalledWith( archiveYoungData, - { isUpdateRevAndLastUpdate: false, isIgnoreDBLock: true }, ); - expect(mockPfapiService.m.archiveOld.save).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveOld).not.toHaveBeenCalled(); }); it('should handle empty appDataComplete gracefully', async () => { @@ -929,8 +916,8 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - expect(mockPfapiService.m.archiveYoung.save).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.save).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveYoung).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveOld).not.toHaveBeenCalled(); }); it('should preserve timeTracking data in archive', async () => { @@ -951,9 +938,8 @@ describe('ArchiveOperationHandler', () => { await service.handleOperation(action); - const savedData = ( - mockPfapiService.m.archiveYoung.save as jasmine.Spy - ).calls.mostRecent().args[0]; + const savedData = + mockArchiveDbAdapter.saveArchiveYoung.calls.mostRecent().args[0]; expect(savedData.timeTracking.project.proj1.date20240115.s).toBe(3600000); expect(savedData.timeTracking.tag.tag1.date20240115.s).toBe(1800000); expect(savedData.lastTimeTrackingFlush).toBe(1234567890); @@ -1003,7 +989,7 @@ describe('ArchiveOperationHandler', () => { expect( mockTaskArchiveService.unlinkIssueProviderFromArchiveTasks, ).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveYoung.load).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.loadArchiveYoung).not.toHaveBeenCalled(); }); it('should not throw for unknown action types', async () => { @@ -1083,9 +1069,7 @@ describe('ArchiveOperationHandler', () => { it('should propagate errors from archive load in flushYoungToOld', async () => { const error = new Error('Load failed'); - (mockPfapiService.m.archiveYoung.load as jasmine.Spy).and.returnValue( - Promise.reject(error), - ); + mockArchiveDbAdapter.loadArchiveYoung.and.returnValue(Promise.reject(error)); const timestamp = Date.now(); const action = { @@ -1273,21 +1257,21 @@ describe('ArchiveOperationHandler', () => { } as unknown as PersistentAction; // Reset spies to verify nothing is called - (mockPfapiService.m.archiveYoung.load as jasmine.Spy).calls.reset(); - (mockPfapiService.m.archiveYoung.save as jasmine.Spy).calls.reset(); - (mockPfapiService.m.archiveOld.load as jasmine.Spy).calls.reset(); - (mockPfapiService.m.archiveOld.save as jasmine.Spy).calls.reset(); + mockArchiveDbAdapter.loadArchiveYoung.calls.reset(); + mockArchiveDbAdapter.saveArchiveYoung.calls.reset(); + mockArchiveDbAdapter.loadArchiveOld.calls.reset(); + mockArchiveDbAdapter.saveArchiveOld.calls.reset(); await service.handleOperation(action); // Verify NO archive operations were performed for local actions - expect(mockPfapiService.m.archiveYoung.load).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveYoung.save).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.load).not.toHaveBeenCalled(); - expect(mockPfapiService.m.archiveOld.save).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.loadArchiveYoung).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveYoung).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.loadArchiveOld).not.toHaveBeenCalled(); + expect(mockArchiveDbAdapter.saveArchiveOld).not.toHaveBeenCalled(); }); - it('should pass isIgnoreDBLock: true for remote operations', async () => { + it('should use ArchiveDbAdapter for remote operations', async () => { const timestamp = Date.now(); const action = { type: flushYoungToOld.type, @@ -1301,14 +1285,9 @@ describe('ArchiveOperationHandler', () => { // Expected - sort function returns undefined in tests } - // Verify save was called with isIgnoreDBLock: true - const saveCall = ( - mockPfapiService.m.archiveYoung.save as jasmine.Spy - ).calls.mostRecent(); - if (saveCall) { - const options = saveCall.args[1]; - expect(options.isIgnoreDBLock).toBe(true); - } + // Verify ArchiveDbAdapter was used for loading archives + expect(mockArchiveDbAdapter.loadArchiveYoung).toHaveBeenCalled(); + expect(mockArchiveDbAdapter.loadArchiveOld).toHaveBeenCalled(); }); }); }); diff --git a/src/app/op-log/apply/archive-operation-handler.service.ts b/src/app/op-log/apply/archive-operation-handler.service.ts index ce5c51988..f747bde3e 100644 --- a/src/app/op-log/apply/archive-operation-handler.service.ts +++ b/src/app/op-log/apply/archive-operation-handler.service.ts @@ -10,7 +10,6 @@ import { } from '../../features/time-tracking/store/archive.actions'; import { ArchiveService } from '../../features/time-tracking/archive.service'; import { TaskArchiveService } from '../../features/time-tracking/task-archive.service'; -import { PfapiService } from '../../pfapi/pfapi.service'; import { sortTimeTrackingAndTasksFromArchiveYoungToOld } from '../../features/time-tracking/sort-data-to-flush'; import { ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD } from '../../features/time-tracking/archive.service'; import { OpLog } from '../../core/log'; @@ -20,6 +19,17 @@ import { TimeTrackingService } from '../../features/time-tracking/time-tracking. import { ArchiveCompressionService } from '../../features/time-tracking/archive-compression.service'; import { loadAllData } from '../../root-store/meta/load-all-data.action'; import { ArchiveModel } from '../../features/time-tracking/time-tracking.model'; +import { ArchiveDbAdapter } from '../../core/persistence/archive-db-adapter.service'; + +/** + * Creates an empty ArchiveModel with default values. + * Used when archive has never been written (first-time usage). + */ +const createEmptyArchiveModel = (): ArchiveModel => ({ + task: { ids: [], entities: {} }, + timeTracking: { project: {}, tag: {} }, + lastTimeTrackingFlush: 0, +}); /** * Action types that affect archive storage and require special handling. @@ -91,28 +101,25 @@ export const isArchiveAffectingAction = (action: Action): action is PersistentAc }) export class ArchiveOperationHandler { // ═══════════════════════════════════════════════════════════════════════════ - // ARCHITECTURAL DEBT: Lazy Injection for Circular Dependencies + // DEPENDENCY INJECTION NOTES // ═══════════════════════════════════════════════════════════════════════════ // - // These services use lazyInject() to break circular dependency chains: + // Some services use lazyInject() to break circular dependency chains: // DataInitService -> OperationLogHydratorService -> OperationApplierService // -> ArchiveOperationHandler -> ArchiveService/TaskArchiveService -> PfapiService // DataInitService also injects PfapiService directly, causing the cycle. // - // POTENTIAL REFACTORING APPROACHES: - // 1. Extract archive storage operations into a lower-level service that doesn't - // depend on PfapiService directly, only on the database adapter - // 2. Create an event-based notification system where archive operations emit - // events and a dedicated handler picks them up (decouples dependencies) - // 3. Move archive storage into NgRx state instead of IndexedDB (would require - // significant architectural changes and increase memory usage) + // ArchiveDbAdapter is used for direct IndexedDB access to break the PfapiService + // dependency for archive operations (_handleFlushYoungToOld, _handleLoadAllData). + // This avoids the circular dependency while providing the same functionality. // - // For now, lazyInject works correctly and the pattern is well-documented. + // Other services still use lazyInject because they have their own complex + // dependency chains through PfapiService that would require deeper refactoring. // ═══════════════════════════════════════════════════════════════════════════ private _injector = inject(Injector); + private _archiveDbAdapter = inject(ArchiveDbAdapter); private _getArchiveService = lazyInject(this._injector, ArchiveService); private _getTaskArchiveService = lazyInject(this._injector, TaskArchiveService); - private _getPfapiService = lazyInject(this._injector, PfapiService); private _getTimeTrackingService = lazyInject(this._injector, TimeTrackingService); private _getArchiveCompressionService = lazyInject( this._injector, @@ -276,7 +283,7 @@ export class ArchiveOperationHandler { * it will produce the same result on all clients. * * @localBehavior SKIP - Flush performed by ArchiveService.moveTasksToArchiveAndFlushArchiveIfDue() before dispatch - * @remoteBehavior Executes - Performs flush with isIgnoreDBLock (sync has DB locked) + * @remoteBehavior Executes - Uses ArchiveDbAdapter for direct IndexedDB access (bypasses PfapiService) */ private async _handleFlushYoungToOld(action: PersistentAction): Promise { if (!action.meta?.isRemote) { @@ -284,11 +291,13 @@ export class ArchiveOperationHandler { } const timestamp = (action as ReturnType).timestamp; - const pfapi = this._getPfapiService(); - // Load original state for potential rollback - const originalArchiveYoung = await pfapi.m.archiveYoung.load(); - const originalArchiveOld = await pfapi.m.archiveOld.load(); + // Load original state for potential rollback using ArchiveDbAdapter + // Default to empty archives if they don't exist (first-time usage) + const originalArchiveYoung = + (await this._archiveDbAdapter.loadArchiveYoung()) ?? createEmptyArchiveModel(); + const originalArchiveOld = + (await this._archiveDbAdapter.loadArchiveOld()) ?? createEmptyArchiveModel(); const newSorted = sortTimeTrackingAndTasksFromArchiveYoungToOld({ archiveYoung: originalArchiveYoung, @@ -298,27 +307,15 @@ export class ArchiveOperationHandler { }); try { - await pfapi.m.archiveYoung.save( - { - ...newSorted.archiveYoung, - lastTimeTrackingFlush: timestamp, - }, - { - isUpdateRevAndLastUpdate: true, - isIgnoreDBLock: true, // Remote ops: DB is locked during sync processing - }, - ); + await this._archiveDbAdapter.saveArchiveYoung({ + ...newSorted.archiveYoung, + lastTimeTrackingFlush: timestamp, + }); - await pfapi.m.archiveOld.save( - { - ...newSorted.archiveOld, - lastTimeTrackingFlush: timestamp, - }, - { - isUpdateRevAndLastUpdate: true, - isIgnoreDBLock: true, // Remote ops: DB is locked during sync processing - }, - ); + await this._archiveDbAdapter.saveArchiveOld({ + ...newSorted.archiveOld, + lastTimeTrackingFlush: timestamp, + }); } catch (e) { // Attempt rollback: restore BOTH archiveYoung and archiveOld to original state OpLog.err('Archive flush failed, attempting rollback...', e); @@ -327,10 +324,7 @@ export class ArchiveOperationHandler { // Rollback archiveYoung try { if (originalArchiveYoung) { - await pfapi.m.archiveYoung.save(originalArchiveYoung, { - isUpdateRevAndLastUpdate: true, - isIgnoreDBLock: true, - }); + await this._archiveDbAdapter.saveArchiveYoung(originalArchiveYoung); } } catch (rollbackErr) { rollbackErrors.push(rollbackErr as Error); @@ -339,10 +333,7 @@ export class ArchiveOperationHandler { // Rollback archiveOld try { if (originalArchiveOld) { - await pfapi.m.archiveOld.save(originalArchiveOld, { - isUpdateRevAndLastUpdate: true, - isIgnoreDBLock: true, - }); + await this._archiveDbAdapter.saveArchiveOld(originalArchiveOld); } } catch (rollbackErr) { rollbackErrors.push(rollbackErr as Error); @@ -485,7 +476,7 @@ export class ArchiveOperationHandler { * data to IndexedDB on remote client, causing data loss on restart. * * @localBehavior SKIP - Archive written by PfapiService._updateModelCtrlCaches() - * @remoteBehavior Executes - Writes archiveYoung/archiveOld to IndexedDB + * @remoteBehavior Executes - Uses ArchiveDbAdapter for direct IndexedDB access (bypasses PfapiService) */ private async _handleLoadAllData(action: PersistentAction): Promise { if (!action.meta?.isRemote) { @@ -494,25 +485,18 @@ export class ArchiveOperationHandler { const loadAllDataAction = action as unknown as ReturnType; const appDataComplete = loadAllDataAction.appDataComplete; - const pfapi = this._getPfapiService(); // Write archiveYoung if present in the import data const archiveYoung = (appDataComplete as { archiveYoung?: ArchiveModel }) .archiveYoung; if (archiveYoung !== undefined) { - await pfapi.m.archiveYoung.save(archiveYoung, { - isUpdateRevAndLastUpdate: false, // Preserve rev from import - isIgnoreDBLock: true, - }); + await this._archiveDbAdapter.saveArchiveYoung(archiveYoung); } // Write archiveOld if present in the import data const archiveOld = (appDataComplete as { archiveOld?: ArchiveModel }).archiveOld; if (archiveOld !== undefined) { - await pfapi.m.archiveOld.save(archiveOld, { - isUpdateRevAndLastUpdate: false, - isIgnoreDBLock: true, - }); + await this._archiveDbAdapter.saveArchiveOld(archiveOld); } OpLog.log( diff --git a/src/app/op-log/store/operation-log-compaction.service.spec.ts b/src/app/op-log/store/operation-log-compaction.service.spec.ts index c00370b70..c44812eba 100644 --- a/src/app/op-log/store/operation-log-compaction.service.spec.ts +++ b/src/app/op-log/store/operation-log-compaction.service.spec.ts @@ -967,4 +967,74 @@ describe('OperationLogCompactionService', () => { expect(callOrder).not.toContain('deleteOpsWhere'); }); }); + + // ========================================================================= + // Compaction timeout tests + // ========================================================================= + // These tests verify compaction handles timeout scenarios gracefully + // to prevent data corruption from lock expiration. + + describe('compaction timeout handling', () => { + it('should throw error when compaction exceeds timeout', async () => { + // Simulate slow state retrieval that exceeds the 25s timeout + const originalDateNow = Date.now; + let callCount = 0; + + // Mock Date.now to simulate time passing during compaction + spyOn(Date, 'now').and.callFake(() => { + callCount++; + // First call is startTime, subsequent calls simulate 26s elapsed + if (callCount === 1) { + return 0; + } + return 26000; // 26 seconds - exceeds 25s timeout + }); + + // Make state retrieval trigger a timeout check + mockStoreDelegate.getAllSyncModelDataFromStore.and.callFake(async () => { + // This will trigger timeout check after "26 seconds" + return mockState; + }); + + await expectAsync(service.compact()).toBeRejectedWithError( + /Compaction timeout after.*Aborting to prevent lock expiration/, + ); + + // Restore original Date.now + (Date.now as jasmine.Spy).and.callFake(originalDateNow); + }); + + it('should not throw when compaction completes within timeout', async () => { + // Normal operation should complete without timeout + mockStoreDelegate.getAllSyncModelDataFromStore.and.returnValue( + Promise.resolve(mockState), + ); + + await expectAsync(service.compact()).toBeResolved(); + }); + + it('should include phase info in timeout error message', async () => { + let callCount = 0; + + spyOn(Date, 'now').and.callFake(() => { + callCount++; + if (callCount === 1) { + return 0; + } + return 26000; + }); + + mockStoreDelegate.getAllSyncModelDataFromStore.and.callFake(async () => { + return mockState; + }); + + try { + await service.compact(); + fail('Expected error to be thrown'); + } catch (e: any) { + expect(e.message).toContain('during'); + expect(e.message).toContain('Consider reducing state size'); + } + }); + }); }); diff --git a/src/app/op-log/sync/conflict-resolution.service.spec.ts b/src/app/op-log/sync/conflict-resolution.service.spec.ts index 40c62369a..db0e6bcb9 100644 --- a/src/app/op-log/sync/conflict-resolution.service.spec.ts +++ b/src/app/op-log/sync/conflict-resolution.service.spec.ts @@ -1763,4 +1763,175 @@ describe('ConflictResolutionService', () => { expect(mockSnackService.open).toHaveBeenCalled(); }); }); + + // ========================================================================= + // Clock skew edge cases + // ========================================================================= + // These tests verify LWW conflict resolution handles edge cases where + // clients have significant clock differences. + + describe('clock skew edge cases', () => { + const ONE_YEAR_MS = 365 * 24 * 60 * 60 * 1000; + + const createOpWithTimestamp = ( + id: string, + clientId: string, + timestamp: number, + opType: OpType = OpType.Update, + entityId: string = 'task-1', + ): Operation => ({ + id, + actionType: '[Task] Update Task' as ActionType, + opType, + entityType: 'TASK', + entityId, + payload: { source: clientId, timestamp }, + clientId, + timestamp, + vectorClock: { [clientId]: 1 }, + schemaVersion: 1, + }); + + it('should handle timestamps in far future (client clock ahead)', async () => { + const now = Date.now(); + const futureTime = now + ONE_YEAR_MS; // 1 year in future + + const conflicts: EntityConflict[] = [ + { + entityType: 'TASK', + entityId: 'task-1', + localOps: [createOpWithTimestamp('local-1', 'client-a', now)], + remoteOps: [createOpWithTimestamp('remote-1', 'client-b', futureTime)], + suggestedResolution: 'remote', + }, + ]; + + mockOpLogStore.hasOp.and.resolveTo(false); + mockOpLogStore.append.and.resolveTo(1); + mockOpLogStore.markApplied.and.resolveTo(undefined); + mockOpLogStore.markRejected.and.resolveTo(undefined); + mockOperationApplier.applyOperations.and.resolveTo({ + appliedOps: [conflicts[0].remoteOps[0]], + }); + + // Remote wins because its timestamp is newer (even if unrealistic) + await service.autoResolveConflictsLWW(conflicts); + + expect(mockOpLogStore.append).toHaveBeenCalledWith( + jasmine.objectContaining({ id: 'remote-1' }), + 'remote', + jasmine.any(Object), + ); + }); + + it('should handle timestamps in far past (client clock behind)', async () => { + const now = Date.now(); + const pastTime = now - ONE_YEAR_MS; // 1 year in past + + const conflicts: EntityConflict[] = [ + { + entityType: 'TASK', + entityId: 'task-1', + localOps: [createOpWithTimestamp('local-1', 'client-a', now)], + remoteOps: [createOpWithTimestamp('remote-1', 'client-b', pastTime)], + suggestedResolution: 'local', + }, + ]; + + mockOpLogStore.hasOp.and.resolveTo(false); + mockOpLogStore.append.and.resolveTo(1); + mockOpLogStore.markApplied.and.resolveTo(undefined); + mockOpLogStore.markRejected.and.resolveTo(undefined); + mockOperationApplier.applyOperations.and.resolveTo({ appliedOps: [] }); + + // Local wins because its timestamp is newer + await service.autoResolveConflictsLWW(conflicts); + + expect(mockOpLogStore.markRejected).toHaveBeenCalledWith(['remote-1']); + }); + + it('should handle zero timestamps gracefully', async () => { + const now = Date.now(); + + const conflicts: EntityConflict[] = [ + { + entityType: 'TASK', + entityId: 'task-1', + localOps: [createOpWithTimestamp('local-1', 'client-a', now)], + remoteOps: [createOpWithTimestamp('remote-1', 'client-b', 0)], + suggestedResolution: 'local', + }, + ]; + + mockOpLogStore.hasOp.and.resolveTo(false); + mockOpLogStore.append.and.resolveTo(1); + mockOpLogStore.markApplied.and.resolveTo(undefined); + mockOpLogStore.markRejected.and.resolveTo(undefined); + mockOperationApplier.applyOperations.and.resolveTo({ appliedOps: [] }); + + // Local wins (0 is earlier than now) + await service.autoResolveConflictsLWW(conflicts); + + expect(mockOpLogStore.markRejected).toHaveBeenCalledWith(['remote-1']); + }); + + it('should handle negative timestamps gracefully (system clock errors)', async () => { + const now = Date.now(); + + const conflicts: EntityConflict[] = [ + { + entityType: 'TASK', + entityId: 'task-1', + localOps: [createOpWithTimestamp('local-1', 'client-a', now)], + remoteOps: [createOpWithTimestamp('remote-1', 'client-b', -1000)], + suggestedResolution: 'local', + }, + ]; + + mockOpLogStore.hasOp.and.resolveTo(false); + mockOpLogStore.append.and.resolveTo(1); + mockOpLogStore.markApplied.and.resolveTo(undefined); + mockOpLogStore.markRejected.and.resolveTo(undefined); + mockOperationApplier.applyOperations.and.resolveTo({ appliedOps: [] }); + + // Should not throw, local wins + await service.autoResolveConflictsLWW(conflicts); + + expect(mockOpLogStore.markRejected).toHaveBeenCalledWith(['remote-1']); + }); + + it('should use client ID as stable tie-breaker for identical timestamps', async () => { + const now = Date.now(); + + // Both have exactly the same timestamp + const conflicts: EntityConflict[] = [ + { + entityType: 'TASK', + entityId: 'task-1', + // client-a < client-b alphabetically, but we test that remote wins on tie + localOps: [createOpWithTimestamp('local-1', 'client-a', now)], + remoteOps: [createOpWithTimestamp('remote-1', 'client-b', now)], + suggestedResolution: 'remote', // Remote wins on tie + }, + ]; + + mockOpLogStore.hasOp.and.resolveTo(false); + mockOpLogStore.append.and.resolveTo(1); + mockOpLogStore.markApplied.and.resolveTo(undefined); + mockOpLogStore.markRejected.and.resolveTo(undefined); + mockOperationApplier.applyOperations.and.resolveTo({ + appliedOps: [conflicts[0].remoteOps[0]], + }); + + await service.autoResolveConflictsLWW(conflicts); + + // Remote should win on tie + expect(mockOpLogStore.append).toHaveBeenCalledWith( + jasmine.objectContaining({ id: 'remote-1' }), + 'remote', + jasmine.any(Object), + ); + expect(mockOpLogStore.markRejected).toHaveBeenCalledWith(['local-1']); + }); + }); });