feat(oplog): add persistent compaction counter and syncedAt index

- Store compaction counter in state_cache to share across tabs/restarts
- Add bySyncedAt index for faster getUnsynced() queries
- Remove in-memory opsSinceCompaction counter from effects
- Reset counter in compaction service after successful compaction
This commit is contained in:
Johannes Millan 2025-12-03 20:11:13 +01:00
parent 108a37962c
commit 6818ae8d9f
4 changed files with 75 additions and 21 deletions

View file

@ -1562,14 +1562,14 @@ When IndexedDB storage quota is exceeded:
### Compaction Trigger Coordination
**Status:** Potential Race Condition
**Status:** Implemented ✅
The 500-ops compaction trigger is checked per-tab:
The 500-ops compaction trigger uses a persistent counter stored in `state_cache.compactionCounter`:
- **Risk**: Multiple tabs could each count 500 ops and trigger compaction simultaneously
- **Current mitigation**: Web Locks prevent concurrent compaction execution
- **Gap**: No shared counter means compaction may trigger more frequently than intended
- **Proposed solution**: Store `opsSinceCompaction` in IndexedDB, read/increment atomically
- Counter is shared across tabs via IndexedDB
- Counter persists across app restarts
- Counter is reset after successful compaction
- Web Locks still prevent concurrent compaction execution
## Data Integrity Edge Cases
@ -1615,6 +1615,8 @@ What if data exists in both `pf` AND `SUP_OPS` databases?
- **Migration safety backup (A.7.12)** - Creates backup before migration, restores on failure
- **Tail ops migration (A.7.13)** - Migrates operations during hydration before replay
- **Unified migration interface (A.7.15)** - `SchemaMigration` includes both `migrateState` and optional `migrateOperation`
- **Persistent compaction counter** - Counter stored in `state_cache`, shared across tabs/restarts
- **`syncedAt` index** - Index on ops store for faster `getUnsynced()` queries
### Not Implemented ⚠️
@ -1664,13 +1666,13 @@ The following are **documented designs**, not implemented code:
## Future Enhancements 🔮
| Component | Description | Priority |
| --------------------- | -------------------------------------------- | -------- |
| Auto-merge | Automatic merge for non-conflicting fields | Low |
| Undo/Redo | Leverage op-log for undo history | Low |
| IndexedDB index | Index on `syncedAt` for faster getUnsynced() | Low |
| Persistent compaction | Track ops since compaction across restarts | Low |
| Quota handling | Graceful degradation on storage exhaustion | Medium |
| Component | Description | Priority |
| -------------- | ------------------------------------------ | -------- |
| Auto-merge | Automatic merge for non-conflicting fields | Low |
| Undo/Redo | Leverage op-log for undo history | Low |
| Quota handling | Graceful degradation on storage exhaustion | Medium |
> **Recently Completed:** `syncedAt` index (for faster getUnsynced()) and persistent compaction counter (tracks ops across tabs/restarts) are now implemented.
---

View file

@ -36,7 +36,10 @@ export class OperationLogCompactionService {
schemaVersion: CURRENT_SCHEMA_VERSION,
});
// 4. Delete old operations (keep recent for conflict resolution window)
// 4. Reset compaction counter (persistent across tabs/restarts)
await this.opLogStore.resetCompactionCounter();
// 5. Delete old operations (keep recent for conflict resolution window)
// Retention: 7 days - keeps enough history for conflict detection
// Only delete ops that have been synced to remote
const retentionWindowMs = 7 * 24 * 60 * 60 * 1000; // 7 days

View file

@ -11,6 +11,7 @@ interface OpLogDB extends DBSchema {
value: OperationLogEntry;
indexes: {
byId: string;
bySyncedAt: number | undefined;
};
};
state_cache: {
@ -22,6 +23,7 @@ interface OpLogDB extends DBSchema {
vectorClock: VectorClock;
compactedAt: number;
schemaVersion?: number;
compactionCounter?: number; // Tracks ops since last compaction (persistent)
};
};
}
@ -48,6 +50,7 @@ export class OperationLogStoreService {
autoIncrement: true,
});
opStore.createIndex('byId', 'op.id', { unique: true });
opStore.createIndex('bySyncedAt', 'syncedAt');
db.createObjectStore('state_cache', { keyPath: 'id' });
},
@ -294,6 +297,54 @@ export class OperationLogStoreService {
}
}
// ============================================================
// Persistent Compaction Counter
// ============================================================
/**
* Gets the current compaction counter value.
* Returns 0 if no counter exists yet.
*/
async getCompactionCounter(): Promise<number> {
await this._ensureInit();
const cache = await this.db.get('state_cache', 'current');
return cache?.compactionCounter ?? 0;
}
/**
* Increments the compaction counter and returns the new value.
* Used to track operations since last compaction across tabs/restarts.
*/
async incrementCompactionCounter(): Promise<number> {
await this._ensureInit();
const cache = await this.db.get('state_cache', 'current');
if (!cache) {
// No state cache yet - counter starts at 1
return 1;
}
const newCount = (cache.compactionCounter ?? 0) + 1;
await this.db.put('state_cache', {
...cache,
compactionCounter: newCount,
});
return newCount;
}
/**
* Resets the compaction counter to 0.
* Called after successful compaction.
*/
async resetCompactionCounter(): Promise<void> {
await this._ensureInit();
const cache = await this.db.get('state_cache', 'current');
if (cache) {
await this.db.put('state_cache', {
...cache,
compactionCounter: 0,
});
}
}
async getCurrentVectorClock(): Promise<VectorClock> {
await this._ensureInit();
// We need the max vector clock from cache + subsequent ops.

View file

@ -30,7 +30,6 @@ const MAX_COMPACTION_FAILURES = 3;
@Injectable()
export class OperationLogEffects {
private clientId?: string;
private opsSinceCompaction = 0;
private compactionFailures = 0;
private actions$ = inject(Actions);
private lockService = inject(LockService);
@ -122,11 +121,11 @@ export class OperationLogEffects {
this.multiTabCoordinator.notifyNewOperation(op);
});
// 4. Check if compaction is needed
this.opsSinceCompaction++;
if (this.opsSinceCompaction >= COMPACTION_THRESHOLD) {
// 4. Check if compaction is needed (persistent counter across tabs/restarts)
const opsCount = await this.opLogStore.incrementCompactionCounter();
if (opsCount >= COMPACTION_THRESHOLD) {
// Trigger compaction asynchronously (don't block write operation)
// Counter is reset inside triggerCompaction() on success
// Counter is reset in compaction service on success
this.triggerCompaction();
}
} catch (e) {
@ -140,14 +139,13 @@ export class OperationLogEffects {
* Triggers compaction asynchronously without blocking the main operation.
* This is called after COMPACTION_THRESHOLD operations have been written.
* Tracks failures and notifies user after MAX_COMPACTION_FAILURES consecutive failures.
* Counter is reset by compaction service on success.
*/
private triggerCompaction(): void {
PFLog.normal('OperationLogEffects: Triggering compaction...');
this.compactionService
.compact()
.then(() => {
// Only reset counter on successful compaction
this.opsSinceCompaction = 0;
this.compactionFailures = 0;
})
.catch((e) => {