diff --git a/docs/ai/sync/operation-log-architecture.md b/docs/ai/sync/operation-log-architecture.md index 7633508f7..e88a95e82 100644 --- a/docs/ai/sync/operation-log-architecture.md +++ b/docs/ai/sync/operation-log-architecture.md @@ -1562,14 +1562,14 @@ When IndexedDB storage quota is exceeded: ### Compaction Trigger Coordination -**Status:** Potential Race Condition +**Status:** Implemented ✅ -The 500-ops compaction trigger is checked per-tab: +The 500-ops compaction trigger uses a persistent counter stored in `state_cache.compactionCounter`: -- **Risk**: Multiple tabs could each count 500 ops and trigger compaction simultaneously -- **Current mitigation**: Web Locks prevent concurrent compaction execution -- **Gap**: No shared counter means compaction may trigger more frequently than intended -- **Proposed solution**: Store `opsSinceCompaction` in IndexedDB, read/increment atomically +- Counter is shared across tabs via IndexedDB +- Counter persists across app restarts +- Counter is reset after successful compaction +- Web Locks still prevent concurrent compaction execution ## Data Integrity Edge Cases @@ -1615,6 +1615,8 @@ What if data exists in both `pf` AND `SUP_OPS` databases? - **Migration safety backup (A.7.12)** - Creates backup before migration, restores on failure - **Tail ops migration (A.7.13)** - Migrates operations during hydration before replay - **Unified migration interface (A.7.15)** - `SchemaMigration` includes both `migrateState` and optional `migrateOperation` +- **Persistent compaction counter** - Counter stored in `state_cache`, shared across tabs/restarts +- **`syncedAt` index** - Index on ops store for faster `getUnsynced()` queries ### Not Implemented ⚠️ @@ -1664,13 +1666,13 @@ The following are **documented designs**, not implemented code: ## Future Enhancements 🔮 -| Component | Description | Priority | -| --------------------- | -------------------------------------------- | -------- | -| Auto-merge | Automatic merge for non-conflicting fields | Low | -| Undo/Redo | Leverage op-log for undo history | Low | -| IndexedDB index | Index on `syncedAt` for faster getUnsynced() | Low | -| Persistent compaction | Track ops since compaction across restarts | Low | -| Quota handling | Graceful degradation on storage exhaustion | Medium | +| Component | Description | Priority | +| -------------- | ------------------------------------------ | -------- | +| Auto-merge | Automatic merge for non-conflicting fields | Low | +| Undo/Redo | Leverage op-log for undo history | Low | +| Quota handling | Graceful degradation on storage exhaustion | Medium | + +> **Recently Completed:** `syncedAt` index (for faster getUnsynced()) and persistent compaction counter (tracks ops across tabs/restarts) are now implemented. --- diff --git a/src/app/core/persistence/operation-log/operation-log-compaction.service.ts b/src/app/core/persistence/operation-log/operation-log-compaction.service.ts index 6d756a44e..ba7e62b04 100644 --- a/src/app/core/persistence/operation-log/operation-log-compaction.service.ts +++ b/src/app/core/persistence/operation-log/operation-log-compaction.service.ts @@ -36,7 +36,10 @@ export class OperationLogCompactionService { schemaVersion: CURRENT_SCHEMA_VERSION, }); - // 4. Delete old operations (keep recent for conflict resolution window) + // 4. Reset compaction counter (persistent across tabs/restarts) + await this.opLogStore.resetCompactionCounter(); + + // 5. Delete old operations (keep recent for conflict resolution window) // Retention: 7 days - keeps enough history for conflict detection // Only delete ops that have been synced to remote const retentionWindowMs = 7 * 24 * 60 * 60 * 1000; // 7 days diff --git a/src/app/core/persistence/operation-log/operation-log-store.service.ts b/src/app/core/persistence/operation-log/operation-log-store.service.ts index a97022584..f03bf59cb 100644 --- a/src/app/core/persistence/operation-log/operation-log-store.service.ts +++ b/src/app/core/persistence/operation-log/operation-log-store.service.ts @@ -11,6 +11,7 @@ interface OpLogDB extends DBSchema { value: OperationLogEntry; indexes: { byId: string; + bySyncedAt: number | undefined; }; }; state_cache: { @@ -22,6 +23,7 @@ interface OpLogDB extends DBSchema { vectorClock: VectorClock; compactedAt: number; schemaVersion?: number; + compactionCounter?: number; // Tracks ops since last compaction (persistent) }; }; } @@ -48,6 +50,7 @@ export class OperationLogStoreService { autoIncrement: true, }); opStore.createIndex('byId', 'op.id', { unique: true }); + opStore.createIndex('bySyncedAt', 'syncedAt'); db.createObjectStore('state_cache', { keyPath: 'id' }); }, @@ -294,6 +297,54 @@ export class OperationLogStoreService { } } + // ============================================================ + // Persistent Compaction Counter + // ============================================================ + + /** + * Gets the current compaction counter value. + * Returns 0 if no counter exists yet. + */ + async getCompactionCounter(): Promise { + await this._ensureInit(); + const cache = await this.db.get('state_cache', 'current'); + return cache?.compactionCounter ?? 0; + } + + /** + * Increments the compaction counter and returns the new value. + * Used to track operations since last compaction across tabs/restarts. + */ + async incrementCompactionCounter(): Promise { + await this._ensureInit(); + const cache = await this.db.get('state_cache', 'current'); + if (!cache) { + // No state cache yet - counter starts at 1 + return 1; + } + const newCount = (cache.compactionCounter ?? 0) + 1; + await this.db.put('state_cache', { + ...cache, + compactionCounter: newCount, + }); + return newCount; + } + + /** + * Resets the compaction counter to 0. + * Called after successful compaction. + */ + async resetCompactionCounter(): Promise { + await this._ensureInit(); + const cache = await this.db.get('state_cache', 'current'); + if (cache) { + await this.db.put('state_cache', { + ...cache, + compactionCounter: 0, + }); + } + } + async getCurrentVectorClock(): Promise { await this._ensureInit(); // We need the max vector clock from cache + subsequent ops. diff --git a/src/app/core/persistence/operation-log/operation-log.effects.ts b/src/app/core/persistence/operation-log/operation-log.effects.ts index 9e7585514..2b3aecc66 100644 --- a/src/app/core/persistence/operation-log/operation-log.effects.ts +++ b/src/app/core/persistence/operation-log/operation-log.effects.ts @@ -30,7 +30,6 @@ const MAX_COMPACTION_FAILURES = 3; @Injectable() export class OperationLogEffects { private clientId?: string; - private opsSinceCompaction = 0; private compactionFailures = 0; private actions$ = inject(Actions); private lockService = inject(LockService); @@ -122,11 +121,11 @@ export class OperationLogEffects { this.multiTabCoordinator.notifyNewOperation(op); }); - // 4. Check if compaction is needed - this.opsSinceCompaction++; - if (this.opsSinceCompaction >= COMPACTION_THRESHOLD) { + // 4. Check if compaction is needed (persistent counter across tabs/restarts) + const opsCount = await this.opLogStore.incrementCompactionCounter(); + if (opsCount >= COMPACTION_THRESHOLD) { // Trigger compaction asynchronously (don't block write operation) - // Counter is reset inside triggerCompaction() on success + // Counter is reset in compaction service on success this.triggerCompaction(); } } catch (e) { @@ -140,14 +139,13 @@ export class OperationLogEffects { * Triggers compaction asynchronously without blocking the main operation. * This is called after COMPACTION_THRESHOLD operations have been written. * Tracks failures and notifies user after MAX_COMPACTION_FAILURES consecutive failures. + * Counter is reset by compaction service on success. */ private triggerCompaction(): void { PFLog.normal('OperationLogEffects: Triggering compaction...'); this.compactionService .compact() .then(() => { - // Only reset counter on successful compaction - this.opsSinceCompaction = 0; this.compactionFailures = 0; }) .catch((e) => {