mirror of
https://github.com/johannesjo/super-productivity.git
synced 2026-01-23 02:36:05 +00:00
docs(op-log): update architecture doc and fix compaction service
- Update architecture doc to reflect actual implementation state: - Replace action blacklist with isPersistent pattern - Mark Part C (server sync) as complete (was incorrectly "not started") - Update file reference section with all current files - Update compaction config to show 7-day retention window - Remove obsolete "models requiring migration" section - Fix compaction service: - Use PfapiStoreDelegateService for consistency - Add schemaVersion to state cache saves
This commit is contained in:
parent
f4df0731e3
commit
94c1123c1f
2 changed files with 327 additions and 212 deletions
|
|
@ -1,8 +1,8 @@
|
|||
# Operation Log Architecture
|
||||
|
||||
**Status:** Part A/B Complete (100%), Part C Not Started
|
||||
**Status:** Parts A, B, C Implemented
|
||||
**Branch:** `feat/operation-logs`
|
||||
**Last Updated:** December 2, 2025
|
||||
**Last Updated:** December 3, 2025
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -10,18 +10,18 @@
|
|||
|
||||
The Operation Log serves **three distinct purposes**:
|
||||
|
||||
| Purpose | Description | Status |
|
||||
| ------------------------- | --------------------------------------------- | ------ |
|
||||
| **A. Local Persistence** | Fast writes, crash recovery, event sourcing | Active |
|
||||
| **B. Legacy Sync Bridge** | Vector clock updates for PFAPI sync detection | Active |
|
||||
| **C. Server Sync** | Upload/download individual operations | Future |
|
||||
| Purpose | Description | Status |
|
||||
| ------------------------- | --------------------------------------------- | ----------- |
|
||||
| **A. Local Persistence** | Fast writes, crash recovery, event sourcing | Complete ✅ |
|
||||
| **B. Legacy Sync Bridge** | Vector clock updates for PFAPI sync detection | Complete ✅ |
|
||||
| **C. Server Sync** | Upload/download individual operations | Complete ✅ |
|
||||
|
||||
This document is structured around these three purposes. Most complexity lives in **Part A** (local persistence). **Part B** is a thin bridge to PFAPI. **Part C** is future work.
|
||||
This document is structured around these three purposes. Most complexity lives in **Part A** (local persistence). **Part B** is a thin bridge to PFAPI. **Part C** handles operation-based sync with servers.
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────────┐
|
||||
│ User Action │
|
||||
└────────────────────────────┬────────────────────────────────────────┘
|
||||
└────────────────────────────────────────────────────────────────────┘
|
||||
▼
|
||||
NgRx Store
|
||||
(Runtime Source of Truth)
|
||||
|
|
@ -60,7 +60,7 @@ interface OperationLogEntry {
|
|||
op: Operation; // The operation
|
||||
appliedAt: number; // When applied locally
|
||||
source: 'local' | 'remote';
|
||||
syncedAt?: number; // For future server sync (Part C)
|
||||
syncedAt?: number; // For server sync (Part C)
|
||||
}
|
||||
|
||||
// state_cache table - periodic snapshots
|
||||
|
|
@ -106,7 +106,11 @@ NgRx Dispatch (action)
|
|||
│
|
||||
└──► OperationLogEffects
|
||||
│
|
||||
├──► Filter: Is action blacklisted? → Skip
|
||||
├──► Filter: action.meta.isPersistent === true?
|
||||
│ └──► Skip if false or missing
|
||||
│
|
||||
├──► Filter: action.meta.isRemote === true?
|
||||
│ └──► Skip (prevents re-logging sync/replay)
|
||||
│
|
||||
├──► Convert action to Operation
|
||||
│
|
||||
|
|
@ -137,22 +141,36 @@ interface Operation {
|
|||
type OpType = 'CRT' | 'UPD' | 'DEL' | 'MOV' | 'BATCH' | 'SYNC_IMPORT' | 'BACKUP_IMPORT';
|
||||
```
|
||||
|
||||
### Action Blacklist
|
||||
### Persistent Action Pattern
|
||||
|
||||
UI-only actions are excluded from persistence:
|
||||
Actions are persisted based on explicit `meta.isPersistent: true`:
|
||||
|
||||
```typescript
|
||||
// action-blacklist.ts
|
||||
export const BLACKLISTED_ACTION_TYPES: Set<string> = new Set([
|
||||
'[Layout] Toggle Sidebar',
|
||||
'[Task] SetCurrentTask',
|
||||
'[Task] SetSelectedTask',
|
||||
'[Task] UnsetCurrentTask',
|
||||
'[Task] Update Task Ui',
|
||||
// ... other transient UI actions
|
||||
]);
|
||||
// persistent-action.interface.ts
|
||||
export interface PersistentActionMeta {
|
||||
isPersistent?: boolean; // When true, action is persisted
|
||||
entityType: EntityType;
|
||||
entityId?: string;
|
||||
entityIds?: string[]; // For batch operations
|
||||
opType: OpType;
|
||||
isRemote?: boolean; // TRUE if from Sync (prevents re-logging)
|
||||
isBulk?: boolean; // TRUE for batch operations
|
||||
}
|
||||
|
||||
// Type guard - only actions with explicit isPersistent: true are persisted
|
||||
export const isPersistentAction = (action: Action): action is PersistentAction => {
|
||||
const a = action as PersistentAction;
|
||||
return !!a.meta && a.meta.isPersistent === true;
|
||||
};
|
||||
```
|
||||
|
||||
Actions that should NOT be persisted:
|
||||
|
||||
- UI-only actions (selectedTaskId, currentTaskId, toggle sidebar, etc.)
|
||||
- Load/hydration actions (data already in log)
|
||||
- Upsert actions (typically from sync/import)
|
||||
- Internal cleanup actions
|
||||
|
||||
## A.3 Read Path (Hydration)
|
||||
|
||||
```
|
||||
|
|
@ -213,36 +231,42 @@ Without compaction, the op log grows unbounded. Compaction:
|
|||
```typescript
|
||||
async compact(): Promise<void> {
|
||||
// 1. Acquire lock
|
||||
await this.lockService.acquireCompactionLock();
|
||||
await this.lockService.request('sp_op_log_compact', async () => {
|
||||
// 2. Read current state from NgRx (via delegate)
|
||||
const currentState = await this.storeDelegate.getAllSyncModelDataFromStore();
|
||||
|
||||
// 2. Read current state from NgRx
|
||||
const currentState = await this.storeDelegateService.getAllSyncModelDataFromStore();
|
||||
// 3. Save new snapshot
|
||||
const lastSeq = await this.opLogStore.getLastSeq();
|
||||
await this.opLogStore.saveStateCache({
|
||||
state: currentState,
|
||||
lastAppliedOpSeq: lastSeq,
|
||||
vectorClock: await this.opLogStore.getCurrentVectorClock(),
|
||||
compactedAt: Date.now(),
|
||||
schemaVersion: CURRENT_SCHEMA_VERSION
|
||||
});
|
||||
|
||||
// 3. Save new snapshot
|
||||
const lastSeq = await this.opLogStore.getLastSeq();
|
||||
await this.opLogStore.saveStateCache({
|
||||
state: currentState,
|
||||
lastAppliedOpSeq: lastSeq,
|
||||
savedAt: Date.now(),
|
||||
schemaVersion: CURRENT_SCHEMA_VERSION
|
||||
// 4. Delete old ops (sync-aware)
|
||||
// Only delete ops that have been synced AND are older than retention window
|
||||
const retentionWindowMs = 7 * 24 * 60 * 60 * 1000; // 7 days
|
||||
const cutoff = Date.now() - retentionWindowMs;
|
||||
|
||||
await this.opLogStore.deleteOpsWhere(
|
||||
(entry) =>
|
||||
!!entry.syncedAt && // never drop unsynced ops
|
||||
entry.appliedAt < cutoff &&
|
||||
entry.seq <= lastSeq
|
||||
);
|
||||
});
|
||||
|
||||
// 4. Delete old ops
|
||||
// For local-only: delete all ops before snapshot
|
||||
// For server sync (Part C): keep unsynced ops
|
||||
await this.opLogStore.deleteOpsBefore(lastSeq - RETENTION_BUFFER);
|
||||
|
||||
// 5. Release lock
|
||||
this.lockService.releaseCompactionLock();
|
||||
}
|
||||
```
|
||||
|
||||
### Configuration
|
||||
|
||||
| Setting | Value | Description |
|
||||
| ------------------ | ------- | -------------------------- |
|
||||
| Compaction trigger | 500 ops | Ops before snapshot |
|
||||
| Retention buffer | 100 ops | Keep recent ops for safety |
|
||||
| Setting | Value | Description |
|
||||
| ------------------ | ------- | ------------------------- |
|
||||
| Compaction trigger | 500 ops | Ops before snapshot |
|
||||
| Retention window | 7 days | Keep recent synced ops |
|
||||
| Unsynced ops | ∞ | Never delete unsynced ops |
|
||||
|
||||
## A.5 Multi-Tab Coordination
|
||||
|
||||
|
|
@ -267,12 +291,13 @@ When one tab writes an operation:
|
|||
|
||||
```typescript
|
||||
// Tab A writes
|
||||
this.broadcastChannel.postMessage({ type: 'OP_WRITTEN', op });
|
||||
this.broadcastChannel.postMessage({ type: 'NEW_OP', op });
|
||||
|
||||
// Tab B receives
|
||||
this.broadcastChannel.onmessage = (event) => {
|
||||
if (event.data.type === 'OP_WRITTEN') {
|
||||
this.applyOperation(event.data.op, { isRemote: true });
|
||||
if (event.data.type === 'NEW_OP') {
|
||||
const action = convertOpToAction(event.data.op); // Sets isRemote: true
|
||||
this.store.dispatch(action);
|
||||
}
|
||||
};
|
||||
```
|
||||
|
|
@ -283,9 +308,11 @@ this.broadcastChannel.onmessage = (event) => {
|
|||
|
||||
```
|
||||
1. Detect: Hydration fails or returns empty/invalid state
|
||||
2. Check remote sync for data
|
||||
3. If remote has data: Force sync download
|
||||
4. If all else fails: User must restore from backup
|
||||
2. Check legacy 'pf' database for data
|
||||
3. If found: Run recovery migration with that data
|
||||
4. If not: Check remote sync for data
|
||||
5. If remote has data: Force sync download
|
||||
6. If all else fails: User must restore from backup
|
||||
```
|
||||
|
||||
### Implementation
|
||||
|
|
@ -306,9 +333,9 @@ async hydrateStore(): Promise<void> {
|
|||
|
||||
private async attemptRecovery(): Promise<void> {
|
||||
// 1. Try legacy database
|
||||
const legacyData = await this.pfapi.getAllSyncModelData();
|
||||
const legacyData = await this.pfapi.getAllSyncModelDataFromModelCtrls();
|
||||
if (legacyData && this.hasData(legacyData)) {
|
||||
await this.runGenesisMigration(legacyData);
|
||||
await this.recoverFromLegacyData(legacyData);
|
||||
return;
|
||||
}
|
||||
// 2. Try remote sync
|
||||
|
|
@ -343,14 +370,17 @@ Delete old ops (baked into v2 snapshot)
|
|||
|
||||
```typescript
|
||||
const MIGRATIONS: SchemaMigration[] = [
|
||||
{
|
||||
fromVersion: 1,
|
||||
toVersion: 2,
|
||||
migrate: (state) => ({
|
||||
...state,
|
||||
task: migrateTasksV1ToV2(state.task),
|
||||
}),
|
||||
},
|
||||
// No migrations yet - schema version 1 is initial
|
||||
// Add migrations here as needed:
|
||||
// {
|
||||
// fromVersion: 1,
|
||||
// toVersion: 2,
|
||||
// description: 'Add new field to tasks',
|
||||
// migrate: (state) => ({
|
||||
// ...state,
|
||||
// task: migrateTasksV1ToV2(state.task),
|
||||
// }),
|
||||
// },
|
||||
];
|
||||
|
||||
async migrateIfNeeded(snapshot: StateCache): Promise<StateCache> {
|
||||
|
|
@ -411,7 +441,7 @@ private async writeOperation(op: Operation): Promise<void> {
|
|||
await this.opLogStore.appendOperation(op);
|
||||
|
||||
// 2. Bridge to PFAPI (Part B)
|
||||
await this.pfapiService.pf.metaModel.incrementVectorClock(this.clientId);
|
||||
await this.pfapiService.pf.metaModel.incrementVectorClockForLocalChange(this.clientId);
|
||||
|
||||
// 3. Broadcast to other tabs (Part A)
|
||||
this.multiTabCoordinator.broadcastOperation(op);
|
||||
|
|
@ -426,32 +456,33 @@ This ensures:
|
|||
|
||||
## B.3 Sync Download Persistence
|
||||
|
||||
When PFAPI downloads remote data, it dispatches `loadAllData`. The op-log must persist this:
|
||||
When PFAPI downloads remote data, the hydrator persists it to SUP_OPS:
|
||||
|
||||
```typescript
|
||||
// In OperationLogEffects
|
||||
handleLoadAllData$ = createEffect(
|
||||
() =>
|
||||
this.actions$.pipe(
|
||||
ofType(loadAllData),
|
||||
filter((action) => action.meta?.isRemoteSync || action.meta?.isBackupImport),
|
||||
tap(async (action) => {
|
||||
// Create SYNC_IMPORT operation
|
||||
const op: Operation = {
|
||||
id: uuidv7(),
|
||||
opType: 'SYNC_IMPORT',
|
||||
entityType: 'ALL',
|
||||
payload: action.appDataComplete,
|
||||
// ...
|
||||
};
|
||||
await this.opLogStore.appendOperation(op);
|
||||
async hydrateFromRemoteSync(): Promise<void> {
|
||||
// 1. Read synced data from 'pf' database
|
||||
const syncedData = await this.pfapiService.pf.getAllSyncModelDataFromModelCtrls();
|
||||
|
||||
// Force snapshot for crash safety
|
||||
await this.compactionService.forceSnapshot();
|
||||
}),
|
||||
),
|
||||
{ dispatch: false },
|
||||
);
|
||||
// 2. Create SYNC_IMPORT operation
|
||||
const op: Operation = {
|
||||
id: uuidv7(),
|
||||
opType: 'SYNC_IMPORT',
|
||||
entityType: 'ALL',
|
||||
payload: syncedData,
|
||||
// ...
|
||||
};
|
||||
await this.opLogStore.append(op, 'remote');
|
||||
|
||||
// 3. Force snapshot for crash safety
|
||||
await this.opLogStore.saveStateCache({
|
||||
state: syncedData,
|
||||
lastAppliedOpSeq: lastSeq,
|
||||
// ...
|
||||
});
|
||||
|
||||
// 4. Dispatch to NgRx
|
||||
this.store.dispatch(loadAllData({ appDataComplete: syncedData }));
|
||||
}
|
||||
```
|
||||
|
||||
### loadAllData Variants
|
||||
|
|
@ -477,51 +508,42 @@ This service reads ALL sync models from NgRx for PFAPI:
|
|||
```typescript
|
||||
@Injectable({ providedIn: 'root' })
|
||||
export class PfapiStoreDelegateService {
|
||||
async getAllSyncModelDataFromStore(): Promise<AllSyncModels> {
|
||||
// Read NgRx state for models already in store
|
||||
const ngrxData = await firstValueFrom(
|
||||
getAllSyncModelDataFromStore(): Promise<AllSyncModels> {
|
||||
return firstValueFrom(
|
||||
combineLatest([
|
||||
this._store.select(selectTaskFeatureState),
|
||||
this._store.select(selectProjectFeatureState),
|
||||
this._store.select(selectTagFeatureState),
|
||||
// ... all NgRx models
|
||||
]),
|
||||
this._store.select(selectConfigFeatureState),
|
||||
this._store.select(selectNoteFeatureState),
|
||||
this._store.select(selectIssueProviderState),
|
||||
this._store.select(selectPlannerState),
|
||||
this._store.select(selectBoardsState),
|
||||
this._store.select(selectMetricFeatureState),
|
||||
this._store.select(selectSimpleCounterFeatureState),
|
||||
this._store.select(selectTaskRepeatCfgFeatureState),
|
||||
this._store.select(selectMenuTreeState),
|
||||
this._store.select(selectTimeTrackingState),
|
||||
this._store.select(selectImprovementFeatureState),
|
||||
this._store.select(selectObstructionFeatureState),
|
||||
this._store.select(selectPluginUserDataFeatureState),
|
||||
this._store.select(selectPluginMetadataFeatureState),
|
||||
this._store.select(selectReminderFeatureState),
|
||||
this._store.select(selectArchiveYoungFeatureState),
|
||||
this._store.select(selectArchiveOldFeatureState),
|
||||
]).pipe(first(), map(/* combine into AllSyncModels */)),
|
||||
);
|
||||
|
||||
// For models not yet in NgRx, load from pf database
|
||||
// (These need migration - see "Models Requiring Migration")
|
||||
const nonNgrxData = await Promise.all([
|
||||
this._modelCtrls.reminders.load(),
|
||||
this._modelCtrls.archiveYoung.load(),
|
||||
// ...
|
||||
]);
|
||||
|
||||
return { ...ngrxData, ...nonNgrxData };
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## B.5 Models Requiring NgRx Migration
|
||||
|
||||
These models bypass NgRx and must be migrated before the op-log system is complete:
|
||||
|
||||
| Model | Current State | Migration Required |
|
||||
| ---------------- | ------------------------- | ------------------ |
|
||||
| `reminders` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
| `archiveYoung` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
| `archiveOld` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
| `pluginUserData` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
| `pluginMetadata` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
| `improvement` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
| `obstruction` | Direct `ModelCtrl.save()` | Add to NgRx |
|
||||
|
||||
**This is a BLOCKER** - No hybrid persistence modes allowed.
|
||||
All sync models are now in NgRx - no hybrid persistence.
|
||||
|
||||
---
|
||||
|
||||
# Part C: Server Sync (Future)
|
||||
# Part C: Server Sync
|
||||
|
||||
For future server-based sync, the operation log becomes the sync mechanism itself.
|
||||
For server-based sync, the operation log IS the sync mechanism. Individual operations are uploaded/downloaded rather than full state snapshots.
|
||||
|
||||
## C.1 How Server Sync Differs
|
||||
|
||||
|
|
@ -532,82 +554,166 @@ For future server-based sync, the operation log becomes the sync mechanism itsel
|
|||
| Op-log role | Not involved | IS the sync |
|
||||
| `syncedAt` tracking | Not needed | Required |
|
||||
|
||||
## C.2 Additional Infrastructure Needed
|
||||
## C.2 Operation Sync Protocol
|
||||
|
||||
### Per-Op Sync Tracking
|
||||
Providers that support operation sync implement `OperationSyncCapable`:
|
||||
|
||||
```typescript
|
||||
interface OperationLogEntry {
|
||||
// ... existing fields
|
||||
syncedAt?: number; // When uploaded to server (null if pending)
|
||||
interface OperationSyncCapable {
|
||||
supportsOperationSync: true;
|
||||
uploadOps(
|
||||
ops: SyncOperation[],
|
||||
clientId: string,
|
||||
lastKnownSeq: number,
|
||||
): Promise<UploadResponse>;
|
||||
downloadOps(
|
||||
sinceSeq: number,
|
||||
clientId?: string,
|
||||
limit?: number,
|
||||
): Promise<DownloadResponse>;
|
||||
acknowledgeOps(clientId: string, upToSeq: number): Promise<void>;
|
||||
getLastServerSeq(): Promise<number>;
|
||||
setLastServerSeq(seq: number): Promise<void>;
|
||||
}
|
||||
```
|
||||
|
||||
### Sync-Aware Compaction
|
||||
### Upload Flow
|
||||
|
||||
```typescript
|
||||
// Part A (local-only): Delete all ops before snapshot
|
||||
// Part C (server sync): Never delete unsynced ops
|
||||
async compact(): Promise<void> {
|
||||
if (this.isServerSyncEnabled) {
|
||||
// Only delete ops where syncedAt IS NOT NULL
|
||||
await this.opLogStore.deleteOpsWhere({
|
||||
syncedAt: { $ne: null },
|
||||
appliedAt: { $lt: Date.now() - RETENTION_WINDOW }
|
||||
});
|
||||
async uploadPendingOps(syncProvider: OperationSyncCapable): Promise<void> {
|
||||
const pendingOps = await this.opLogStore.getUnsynced();
|
||||
|
||||
// Upload in batches (up to 100 ops per request)
|
||||
for (const chunk of chunkArray(pendingOps, 100)) {
|
||||
const response = await syncProvider.uploadOps(
|
||||
chunk.map(entry => toSyncOperation(entry.op)),
|
||||
clientId,
|
||||
lastKnownServerSeq
|
||||
);
|
||||
|
||||
// Mark accepted ops as synced
|
||||
const acceptedSeqs = response.results
|
||||
.filter(r => r.accepted)
|
||||
.map(r => findEntry(r.opId).seq);
|
||||
await this.opLogStore.markSynced(acceptedSeqs);
|
||||
|
||||
// Process piggybacked new ops from other clients
|
||||
if (response.newOps?.length > 0) {
|
||||
await this.processRemoteOps(response.newOps);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Download Flow
|
||||
|
||||
```typescript
|
||||
async downloadRemoteOps(syncProvider: OperationSyncCapable): Promise<void> {
|
||||
let sinceSeq = await syncProvider.getLastServerSeq();
|
||||
let hasMore = true;
|
||||
|
||||
while (hasMore) {
|
||||
const response = await syncProvider.downloadOps(sinceSeq, undefined, 500);
|
||||
|
||||
// Filter already-applied ops
|
||||
const newOps = response.ops.filter(op => !appliedOpIds.has(op.id));
|
||||
await this.processRemoteOps(newOps);
|
||||
|
||||
sinceSeq = response.ops[response.ops.length - 1].serverSeq;
|
||||
hasMore = response.hasMore;
|
||||
await syncProvider.setLastServerSeq(response.latestSeq);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## C.3 File-Based Sync Fallback
|
||||
|
||||
For providers without API support (WebDAV/Dropbox), operations are synced via files:
|
||||
|
||||
```
|
||||
ops/
|
||||
├── manifest.json
|
||||
├── ops_CLIENT1_1701234567890.json
|
||||
├── ops_CLIENT1_1701234599999.json
|
||||
└── ops_CLIENT2_1701234600000.json
|
||||
```
|
||||
|
||||
The manifest tracks which operation files exist. Each file contains a batch of operations.
|
||||
|
||||
## C.4 Conflict Detection
|
||||
|
||||
Conflicts are detected using vector clocks at the entity level:
|
||||
|
||||
```typescript
|
||||
async detectConflicts(remoteOps: Operation[]): Promise<ConflictResult> {
|
||||
const localPendingByEntity = await this.opLogStore.getUnsyncedByEntity();
|
||||
const appliedFrontierByEntity = await this.opLogStore.getEntityFrontier();
|
||||
|
||||
for (const remoteOp of remoteOps) {
|
||||
const entityKey = `${remoteOp.entityType}:${remoteOp.entityId}`;
|
||||
const localFrontier = mergeClocks(
|
||||
appliedFrontierByEntity.get(entityKey),
|
||||
...localPendingByEntity.get(entityKey)?.map(op => op.vectorClock) || []
|
||||
);
|
||||
|
||||
const comparison = compareVectorClocks(localFrontier, remoteOp.vectorClock);
|
||||
if (comparison === VectorClockComparison.CONCURRENT) {
|
||||
conflicts.push({
|
||||
entityType: remoteOp.entityType,
|
||||
entityId: remoteOp.entityId,
|
||||
localOps: localPendingByEntity.get(entityKey) || [],
|
||||
remoteOps: [remoteOp],
|
||||
suggestedResolution: 'manual'
|
||||
});
|
||||
} else {
|
||||
nonConflicting.push(remoteOp);
|
||||
}
|
||||
}
|
||||
|
||||
return { nonConflicting, conflicts };
|
||||
}
|
||||
```
|
||||
|
||||
## C.5 Conflict Resolution
|
||||
|
||||
Conflicts are presented to the user via `ConflictResolutionService`:
|
||||
|
||||
```typescript
|
||||
async presentConflicts(conflicts: EntityConflict[]): Promise<void> {
|
||||
const dialogRef = this.dialog.open(DialogConflictResolutionComponent, {
|
||||
data: { conflicts },
|
||||
disableClose: true
|
||||
});
|
||||
|
||||
const result = await firstValueFrom(dialogRef.afterClosed());
|
||||
|
||||
if (result.resolution === 'remote') {
|
||||
// Apply remote ops, overwrite local state
|
||||
for (const conflict of conflicts) {
|
||||
await this.operationApplier.applyOperations(conflict.remoteOps);
|
||||
}
|
||||
} else {
|
||||
// Aggressive compaction for local-only
|
||||
await this.opLogStore.deleteOpsBefore(lastSeq - RETENTION_BUFFER);
|
||||
// Keep local ops, ignore remote
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Operation Upload/Download
|
||||
## C.6 Dependency Resolution
|
||||
|
||||
Operations may have dependencies (e.g., subtask requires parent task):
|
||||
|
||||
```typescript
|
||||
// Upload unsynced ops to server
|
||||
async uploadPendingOps(): Promise<void> {
|
||||
const unsyncedOps = await this.opLogStore.getUnsyncedOps();
|
||||
for (const op of unsyncedOps) {
|
||||
await this.serverApi.uploadOperation(op);
|
||||
await this.opLogStore.markSynced(op.id);
|
||||
}
|
||||
interface OperationDependency {
|
||||
entityType: EntityType;
|
||||
entityId: string;
|
||||
mustExist: boolean; // Hard dependency
|
||||
relation: 'parent' | 'reference';
|
||||
}
|
||||
|
||||
// Download remote ops
|
||||
async downloadRemoteOps(sinceSeq: number): Promise<void> {
|
||||
const remoteOps = await this.serverApi.getOperations(sinceSeq);
|
||||
for (const op of remoteOps) {
|
||||
await this.applyOperation(op, { isRemote: true });
|
||||
}
|
||||
}
|
||||
// Operations with missing hard dependencies are queued for retry
|
||||
// After MAX_RETRY_ATTEMPTS (3), they're marked as permanently failed
|
||||
```
|
||||
|
||||
### Entity-Level Conflict Detection
|
||||
|
||||
```typescript
|
||||
// Using per-op vector clocks
|
||||
function detectConflict(localOp: Operation, remoteOp: Operation): boolean {
|
||||
if (localOp.entityId !== remoteOp.entityId) return false;
|
||||
|
||||
const comparison = compareVectorClocks(localOp.vectorClock, remoteOp.vectorClock);
|
||||
return comparison === VectorClockComparison.CONCURRENT;
|
||||
}
|
||||
```
|
||||
|
||||
## C.3 Detailed Architecture
|
||||
|
||||
Server sync is **future work**. For the complete server sync architecture including:
|
||||
|
||||
- REST API and WebSocket protocol design
|
||||
- Conflict detection algorithms (vector clock comparison)
|
||||
- Entity-level merge strategies per model type
|
||||
- Sync-aware compaction rules
|
||||
- Offline handling and recovery
|
||||
- Migration path from legacy sync
|
||||
|
||||
See: **[Server Sync Architecture](./server-sync-architecture.md)**
|
||||
|
||||
---
|
||||
|
||||
# Implementation Status
|
||||
|
|
@ -615,33 +721,33 @@ See: **[Server Sync Architecture](./server-sync-architecture.md)**
|
|||
## Complete ✅
|
||||
|
||||
- SUP_OPS IndexedDB store (ops + state_cache)
|
||||
- NgRx effect capture with vector clock
|
||||
- NgRx effect capture with isPersistent pattern
|
||||
- Snapshot + tail replay hydration
|
||||
- Multi-tab BroadcastChannel coordination
|
||||
- Web Locks + localStorage fallback
|
||||
- Genesis migration from legacy data
|
||||
- `PfapiStoreDelegateService` (reads NgRx for sync)
|
||||
- META_MODEL vector clock update (B.1) - ops now update META_MODEL
|
||||
- Sync download persistence (B.2) - downloads written to SUP_OPS via `hydrateFromRemoteSync()`
|
||||
- Non-NgRx model migration (B.4) - all models (reminders, archives, plugins) now in NgRx
|
||||
- Compaction triggers (A.2) - triggered every 500 operations
|
||||
- Action blacklist audit (A.3) - expanded to 39 UI-only actions
|
||||
- Disaster recovery (A.6) - recovery from legacy 'pf' database on corruption
|
||||
- Schema migration service (A.7) - infrastructure for state migrations
|
||||
- `PfapiStoreDelegateService` (reads all NgRx models for sync)
|
||||
- META_MODEL vector clock update (B.2)
|
||||
- Sync download persistence via `hydrateFromRemoteSync()` (B.3)
|
||||
- All models in NgRx (no hybrid persistence)
|
||||
- Compaction with 7-day retention window
|
||||
- Disaster recovery from legacy 'pf' database
|
||||
- Schema migration service infrastructure
|
||||
- Server sync upload/download (C.2)
|
||||
- File-based sync fallback (C.3)
|
||||
- Entity-level conflict detection (C.4)
|
||||
- Conflict resolution dialog (C.5)
|
||||
- Dependency resolution with retry queue (C.6)
|
||||
- Persistent action metadata on all model actions
|
||||
|
||||
## Needs Implementation 🚧
|
||||
## Future Enhancements 🔮
|
||||
|
||||
| Component | Part | Issue | Priority |
|
||||
| -------------------------- | ---- | ----- | -------- |
|
||||
| (None - Part A/B complete) | | | |
|
||||
|
||||
## Not Started ❌
|
||||
|
||||
| Component | Part | Description |
|
||||
| ---------------------- | ---- | ---------------------------------- |
|
||||
| Server API | C | Backend for op-log sync |
|
||||
| Per-op sync tracking | C | `syncedAt` field usage |
|
||||
| Entity-level conflicts | C | Conflict detection + resolution UI |
|
||||
| Component | Description |
|
||||
| ---------------- | ------------------------------------------ |
|
||||
| Auto-merge | Automatic merge for non-conflicting fields |
|
||||
| Undo/Redo | Leverage op-log for undo history |
|
||||
| Offline queue UI | Show pending sync operations to user |
|
||||
| Op-log analytics | Debug view of operation history |
|
||||
|
||||
---
|
||||
|
||||
|
|
@ -649,20 +755,25 @@ See: **[Server Sync Architecture](./server-sync-architecture.md)**
|
|||
|
||||
```
|
||||
src/app/core/persistence/operation-log/
|
||||
├── operation.types.ts # Type definitions
|
||||
├── operation-log-store.service.ts # SUP_OPS IndexedDB
|
||||
├── operation-log.effects.ts # Action capture + META_MODEL bridge
|
||||
├── operation-log-hydrator.service.ts# Startup hydration
|
||||
├── operation-log-compaction.service.ts
|
||||
├── operation-applier.service.ts # Apply ops to store
|
||||
├── operation-converter.util.ts # Op ↔ Action conversion
|
||||
├── action-blacklist.ts # UI action filtering
|
||||
├── lock.service.ts # Cross-tab locking
|
||||
└── multi-tab-coordinator.service.ts # BroadcastChannel
|
||||
├── operation.types.ts # Type definitions (Operation, OpType, EntityType)
|
||||
├── operation-log-store.service.ts # SUP_OPS IndexedDB wrapper
|
||||
├── operation-log.effects.ts # Action capture + META_MODEL bridge
|
||||
├── operation-log-hydrator.service.ts # Startup hydration
|
||||
├── operation-log-compaction.service.ts # Snapshot + cleanup
|
||||
├── operation-log-migration.service.ts # Genesis migration from legacy
|
||||
├── operation-log-sync.service.ts # Upload/download operations (Part C)
|
||||
├── operation-applier.service.ts # Apply ops to store with dependency handling
|
||||
├── operation-converter.util.ts # Op ↔ Action conversion
|
||||
├── persistent-action.interface.ts # PersistentAction type + isPersistentAction guard
|
||||
├── lock.service.ts # Cross-tab locking (Web Locks + fallback)
|
||||
├── multi-tab-coordinator.service.ts # BroadcastChannel coordination
|
||||
├── schema-migration.service.ts # State schema migrations
|
||||
├── dependency-resolver.service.ts # Extract/check operation dependencies
|
||||
└── conflict-resolution.service.ts # Conflict UI presentation
|
||||
|
||||
src/app/pfapi/
|
||||
├── pfapi-store-delegate.service.ts # Reads NgRx for sync (Part B)
|
||||
└── pfapi.service.ts # Sync orchestration
|
||||
├── pfapi-store-delegate.service.ts # Reads NgRx for sync (Part B)
|
||||
└── pfapi.service.ts # Sync orchestration
|
||||
```
|
||||
|
||||
---
|
||||
|
|
@ -671,4 +782,4 @@ src/app/pfapi/
|
|||
|
||||
- [Execution Plan](./operation-log-execution-plan.md) - Implementation tasks
|
||||
- [PFAPI Architecture](./pfapi-sync-persistence-architecture.md) - Legacy sync system
|
||||
- [Server Sync Architecture](./server-sync-architecture.md) - Future server-based sync (Part C)
|
||||
- [Server Sync Architecture](./server-sync-architecture.md) - Server-based sync details
|
||||
|
|
|
|||
|
|
@ -1,7 +1,8 @@
|
|||
import { inject, Injectable } from '@angular/core';
|
||||
import { LockService } from './lock.service';
|
||||
import { OperationLogStoreService } from './operation-log-store.service';
|
||||
import { PfapiService } from '../../../pfapi/pfapi.service';
|
||||
import { PfapiStoreDelegateService } from '../../../pfapi/pfapi-store-delegate.service';
|
||||
import { CURRENT_SCHEMA_VERSION } from './schema-migration.service';
|
||||
|
||||
/**
|
||||
* Manages the compaction (garbage collection) of the operation log.
|
||||
|
|
@ -15,26 +16,29 @@ import { PfapiService } from '../../../pfapi/pfapi.service';
|
|||
export class OperationLogCompactionService {
|
||||
private opLogStore = inject(OperationLogStoreService);
|
||||
private lockService = inject(LockService);
|
||||
private pfapiService = inject(PfapiService);
|
||||
private storeDelegate = inject(PfapiStoreDelegateService);
|
||||
|
||||
async compact(): Promise<void> {
|
||||
await this.lockService.request('sp_op_log_compact', async () => {
|
||||
// 1. Get current state
|
||||
const currentState = await this.pfapiService.pf.getAllSyncModelData();
|
||||
// 1. Get current state from NgRx store (via delegate for consistency)
|
||||
const currentState = await this.storeDelegate.getAllSyncModelDataFromStore();
|
||||
|
||||
// 2. Get current vector clock (max of all ops)
|
||||
const currentVectorClock = await this.opLogStore.getCurrentVectorClock();
|
||||
|
||||
// 3. Write to state cache
|
||||
// 3. Write to state cache with schema version
|
||||
const lastSeq = await this.opLogStore.getLastSeq();
|
||||
await this.opLogStore.saveStateCache({
|
||||
state: currentState,
|
||||
lastAppliedOpSeq: lastSeq,
|
||||
vectorClock: currentVectorClock,
|
||||
compactedAt: Date.now(),
|
||||
schemaVersion: CURRENT_SCHEMA_VERSION,
|
||||
});
|
||||
|
||||
// 4. Delete old operations (keep recent for conflict resolution window)
|
||||
// Retention: 7 days - keeps enough history for conflict detection
|
||||
// Only delete ops that have been synced to remote
|
||||
const retentionWindowMs = 7 * 24 * 60 * 60 * 1000; // 7 days
|
||||
const cutoff = Date.now() - retentionWindowMs;
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue