feat(sync): implement time tracking sync and deterministic archive flush

- Add syncTimeSpent persistent action for batched time tracking sync
  - Accumulates duration per task, syncs every 5 minutes or on task stop
  - Local dispatch: no-op (state already updated by second-by-second ticks)
  - Remote dispatch: applies batched duration to task.timeSpentOnDay

- Fix remote archive handling to include time tracking data
  - writeTasksToArchiveForRemoteSync now moves historical time tracking
    data to archiveYoung alongside tasks

- Add flushYoungToOld persistent action for deterministic archive state
  - Replaces inline flush logic with action-based approach
  - Remote clients replay the flush operation to maintain consistent
    archiveOld state without syncing large files
  - Vector clock ordering ensures flush happens after preceding operations
This commit is contained in:
Johannes Millan 2025-12-04 15:41:16 +01:00
parent eba3f3fcf5
commit f661de10de
8 changed files with 244 additions and 118 deletions

View file

@ -1,73 +0,0 @@
# Plan: Operation Log & Archive Synchronization
## Problem Analysis
1. **Time Tracking Frequency:** `addTimeSpent` fires every second. Persisting this directly would flood the operation log and network.
2. **Time Tracking Divergence:** Currently, time tracking is not synced at all.
3. **Archive Divergence:** `moveToArchive` syncs the task removal but fails to move the associated "Worklog" (Time Tracking) data to the archive on remote clients.
4. **Archive Consistency:** `ArchiveYoung` and `ArchiveOld` are derived states. We need to ensure they evolve deterministically on all clients to avoid massive file syncs.
## Goals
1. Sync time tracking efficiently (batching).
2. Ensure `ArchiveYoung` is correctly updated on remote clients during `moveToArchive`.
3. Leverage the rarity of `ArchiveOld` updates.
## Implementation Steps
### 1. Efficient Time Tracking Sync (Batching)
We will implement a "Batch Sync" pattern. The UI continues to update every second via the existing local `addTimeSpent` action. A new persistent action will handle the sync.
- **New Action:** `TimeTrackingActions.saveTimeSpentBatch`
- Props: `{ task: Task, date: string, duration: number }`
- Meta: `{ isPersistent: true, entityType: 'TASK', entityId: task.id, opType: 'UPD' }`
- **Logic Update (`TaskService`):**
- Maintain a local accumulator `_unsyncedDuration`.
- Subscribe to `tick$`.
- On every tick:
- Dispatch `addTimeSpent` (Local, existing).
- `_unsyncedDuration += tick.duration`.
- Every **1 minute** (and when task stops):
- If `_unsyncedDuration > 0`:
- Dispatch `saveTimeSpentBatch({ duration: _unsyncedDuration, ... })`.
- Reset `_unsyncedDuration = 0`.
- **Reducer Updates (`TaskReducer` & `TimeTrackingReducer`):**
- Handle `TimeTrackingActions.saveTimeSpentBatch`.
- **Crucial:** Check `action.meta.isRemote`.
- If `false` (Local dispatch): **Ignore** (Return state as is). The local `addTimeSpent` actions have already updated the state second-by-second.
- If `true` (Remote dispatch): **Apply**. Add the duration to `timeSpent` and `worklog`.
### 2. Fix Remote Archive Handling (`ArchiveYoung`)
Remote clients receiving `moveToArchive` currently only move Tasks to `ArchiveYoung`. They must also move the corresponding time tracking data.
- **File:** `src/app/features/time-tracking/archive.service.ts`
- **Function:** `writeTasksToArchiveForRemoteSync(tasks)`
- **Logic Update:**
1. Load `TimeTrackingState` (Active) and `ArchiveYoung` (Model).
2. Use `sortTimeTrackingDataToArchiveYoung` (existing util) to partition the time data for the archived tasks.
3. **Update `ArchiveYoung`:** Save the partitioned "old" time data into the `ArchiveYoung` model.
4. **Update Active State:** Dispatch `TimeTrackingActions.updateWholeState` with the remaining "active" time data (stripping out the archived days).
### 3. Smart Handling for ArchiveOld
Since `ArchiveOld` is written to rarely, we will treat the _Action of Flushing_ as the synced event, rather than the data itself.
- **Deterministic Flush:** The trigger to flush tasks from `ArchiveYoung` to `ArchiveOld` is currently local (based on time thresholds).
- **New Strategy:**
- When Client A triggers a flush, it emits a new operation: `ArchiveActions.flushYoungToOld`.
- Client B receives this op.
- Client B executes the _exact same logic_: loads its `ArchiveYoung`, moves items to its `ArchiveOld`, saves both.
- **Benefit:** No massive file transfer. Both clients maintain their own `ArchiveOld` files consistent via deterministic replay of the flush operation.
- **Prerequisite:** `ArchiveYoung` must be consistent on both sides (handled by Step 2).
## Verification Plan
1. **Time Tracking:** Verify 1-minute batch syncing between clients.
2. **Archive Young:** Verify archiving a task on Client A removes its old worklogs on Client B.
3. **Archive Old:** Trigger a flush on Client A (mock threshold) and verify Client B also moves items to its `ArchiveOld`.

View file

@ -48,7 +48,10 @@ import { loadAllData } from '../../../root-store/meta/load-all-data.action';
import { createReducer, on } from '@ngrx/store';
import { PlannerActions } from '../../planner/store/planner.actions';
import { TaskSharedActions } from '../../../root-store/meta/task-shared.actions';
import { TimeTrackingActions } from '../../time-tracking/store/time-tracking.actions';
import {
TimeTrackingActions,
syncTimeSpent,
} from '../../time-tracking/store/time-tracking.actions';
import { TaskLog } from '../../../core/log';
import { devError } from '../../../util/dev-error';
@ -107,6 +110,34 @@ export const taskReducer = createReducer<TaskState>(
);
}),
// Sync time spent from remote clients
// Local: no-op (state already updated by addTimeSpent ticks)
// Remote: apply the batched duration
on(syncTimeSpent, (state, action) => {
// Only apply for remote actions - local state is already up-to-date
if (!action.meta.isRemote) {
return state;
}
const { taskId, date, duration } = action;
const task = state.entities[taskId];
if (!task) {
TaskLog.warn(`[syncTimeSpent] Task ${taskId} not found, skipping`);
return state;
}
const currentTimeSpentForDay =
(task.timeSpentOnDay && +task.timeSpentOnDay[date]) || 0;
return updateTimeSpentForTask(
taskId,
{
...task.timeSpentOnDay,
[date]: currentTimeSpentForDay + duration,
},
state,
);
}),
//--------------------------------
// TODO check if working

View file

@ -1,6 +1,6 @@
import { nanoid } from 'nanoid';
import typia from 'typia';
import { first, map, take, withLatestFrom } from 'rxjs/operators';
import { distinctUntilChanged, first, map, take, withLatestFrom } from 'rxjs/operators';
import { computed, effect, inject, Injectable, untracked } from '@angular/core';
import { toSignal } from '@angular/core/rxjs-interop';
import { Observable } from 'rxjs';
@ -82,7 +82,10 @@ import {
import { Update } from '@ngrx/entity';
import { RootState } from '../../root-store/root-state';
import { DateService } from '../../core/date/date.service';
import { TimeTrackingActions } from '../time-tracking/store/time-tracking.actions';
import {
TimeTrackingActions,
syncTimeSpent,
} from '../time-tracking/store/time-tracking.actions';
import { ArchiveService } from '../time-tracking/archive.service';
import { TaskArchiveService } from '../time-tracking/task-archive.service';
import { TODAY_TAG } from '../tag/tag.const';
@ -181,6 +184,11 @@ export class TaskService {
private _lastFocusedTaskEl: HTMLElement | null = null;
private _allTasks$: Observable<Task[]> = this._store.pipe(select(selectAllTasks));
// Batch sync for time tracking: accumulates duration per task, syncs every 5 minutes
private static readonly SYNC_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes
private _unsyncedDuration: Map<string, { duration: number; date: string }> = new Map();
private _lastSyncTime = Date.now();
constructor() {
document.addEventListener(
'focus',
@ -196,17 +204,32 @@ export class TaskService {
true,
);
// time tracking
// time tracking with batch sync
this._timeTrackingService.tick$
.pipe(
withLatestFrom(this.currentTask$, this._imexMetaService.isDataImportInProgress$),
)
.subscribe(([tick, currentTask, isImportInProgress]) => {
if (currentTask && !isImportInProgress) {
// Update local state immediately (existing behavior)
this.addTimeSpent(currentTask, tick.duration, tick.date);
// Accumulate for batch sync
this._accumulateTimeSpent(currentTask.id, tick.duration, tick.date);
// Check if it's time to sync (every 5 minutes)
if (Date.now() - this._lastSyncTime >= TaskService.SYNC_INTERVAL_MS) {
this._flushAccumulatedTimeSpent();
}
}
});
// Flush accumulated time when task stops (currentTaskId becomes null or changes)
this.currentTaskId$.pipe(distinctUntilChanged()).subscribe((newTaskId) => {
// When task changes or stops, flush any accumulated time
this._flushAccumulatedTimeSpent();
});
effect(() => {
if (!this.isTimeTrackingEnabled() && untracked(this.currentTaskId) != null) {
this.toggleStartTask();
@ -214,6 +237,37 @@ export class TaskService {
});
}
/**
* Accumulates time spent for batch sync to other clients.
*/
private _accumulateTimeSpent(taskId: string, duration: number, date: string): void {
const existing = this._unsyncedDuration.get(taskId);
if (existing && existing.date === date) {
existing.duration += duration;
} else {
// If date changed, flush the old accumulated time first
if (existing) {
this._store.dispatch(
syncTimeSpent({ taskId, date: existing.date, duration: existing.duration }),
);
}
this._unsyncedDuration.set(taskId, { duration, date });
}
}
/**
* Dispatches syncTimeSpent for all accumulated time and resets accumulators.
*/
private _flushAccumulatedTimeSpent(): void {
this._unsyncedDuration.forEach(({ duration, date }, taskId) => {
if (duration > 0) {
this._store.dispatch(syncTimeSpent({ taskId, date, duration }));
}
});
this._unsyncedDuration.clear();
this._lastSyncTime = Date.now();
}
getAllParentWithoutTag$(tagId: string): Observable<Task[]> {
return this._store.pipe(select(selectMainTasksWithoutTag, { tagId }));
}

View file

@ -4,12 +4,10 @@ import { flattenTasks } from '../tasks/store/task.selectors';
import { createEmptyEntity } from '../../util/create-empty-entity';
import { taskAdapter } from '../tasks/store/task.adapter';
import { PfapiService } from '../../pfapi/pfapi.service';
import {
sortTimeTrackingAndTasksFromArchiveYoungToOld,
sortTimeTrackingDataToArchiveYoung,
} from './sort-data-to-flush';
import { sortTimeTrackingDataToArchiveYoung } from './sort-data-to-flush';
import { Store } from '@ngrx/store';
import { TimeTrackingActions } from './store/time-tracking.actions';
import { flushYoungToOld } from './store/archive.actions';
import { getDbDateStr } from '../../util/get-db-date-str';
import { Log } from '../../core/log';
@ -116,6 +114,7 @@ export class ArchiveService {
);
// ------------------------------------------------
// Check if it's time to flush archiveYoung to archiveOld
const archiveOld = await this._pfapiService.m.archiveOld.load();
const isFlushArchiveOld =
now - archiveOld.lastTimeTrackingFlush > ARCHIVE_ALL_YOUNG_TO_OLD_THRESHOLD;
@ -124,42 +123,17 @@ export class ArchiveService {
return;
}
// ------------------------------------------------
// Result B:
// Also sort timeTracking and task data from archiveYoung to archiveOld
const newSorted2 = sortTimeTrackingAndTasksFromArchiveYoungToOld({
archiveYoung: newArchiveYoung,
archiveOld,
threshold: ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD,
now,
});
await this._pfapiService.m.archiveYoung.save(
{
...newSorted2.archiveYoung,
lastTimeTrackingFlush: now,
},
{
isUpdateRevAndLastUpdate: true,
},
);
await this._pfapiService.m.archiveOld.save(
{
...newSorted2.archiveOld,
lastTimeTrackingFlush: now,
},
{
isUpdateRevAndLastUpdate: true,
},
);
Log.log(
'______________________\nFLUSHED ALL FROM ARCHIVE YOUNG TO OLD\n_______________________',
);
// Dispatch the flush action - this will be persisted and synced to other clients
// The actual flush operation is handled by ArchiveEffects.flushYoungToOld$
// This ensures other clients receive the operation and replay the same flush,
// maintaining deterministic archive state without syncing large archiveOld files.
this._store.dispatch(flushYoungToOld({ timestamp: now }));
}
/**
* Writes tasks to archiveYoung for remote sync operations.
* This is a simplified version that only adds tasks without triggering
* time tracking flush or archiveOld compaction.
* Also moves historical time tracking data to archiveYoung to keep
* the client's archive consistent with the originating client.
*
* Used when receiving moveToArchive operations from other clients.
*/
@ -199,9 +173,18 @@ export class ArchiveService {
taskArchiveState,
);
// Also move historical time tracking data to archiveYoung
// This ensures the remote client's archive matches the originating client
const timeTracking = await this._pfapiService.m.timeTracking.load();
const sorted = sortTimeTrackingDataToArchiveYoung({
timeTracking,
archiveYoung,
todayStr: getDbDateStr(now),
});
await this._pfapiService.m.archiveYoung.save(
{
...archiveYoung,
...sorted.archiveYoung,
task: newTaskArchive,
},
{
@ -210,8 +193,18 @@ export class ArchiveService {
},
);
Log.log('[ArchiveService] Remote sync: saved tasks to archiveYoung:', {
archivedTaskCount: Object.keys(newTaskArchive.entities).length,
});
// Update active time tracking state (remove historical data that was moved to archive)
this._store.dispatch(
TimeTrackingActions.updateWholeState({
newState: sorted.timeTracking,
}),
);
Log.log(
'[ArchiveService] Remote sync: saved tasks and time tracking to archiveYoung:',
{
archivedTaskCount: Object.keys(newTaskArchive.entities).length,
},
);
}
}

View file

@ -0,0 +1,28 @@
import { createAction } from '@ngrx/store';
import { PersistentActionMeta } from '../../../core/persistence/operation-log/persistent-action.interface';
import { OpType } from '../../../core/persistence/operation-log/operation.types';
/**
* Persistent action to flush data from archiveYoung to archiveOld.
* This action is dispatched when the local client determines it's time to flush,
* and is synced to other clients to ensure deterministic archive state.
*
* When received remotely, the receiving client executes the same flush logic,
* ensuring both clients have consistent archiveOld content without needing
* to sync the large archiveOld files directly.
*
* Prerequisites:
* - Vector clock ordering ensures this action is applied AFTER all preceding
* moveToArchive operations that populated archiveYoung.
*/
export const flushYoungToOld = createAction(
'[Archive] Flush Young to Old',
(actionProps: { timestamp: number }) => ({
...actionProps,
meta: {
isPersistent: true,
entityType: 'ALL', // Affects multiple entity types (tasks, timeTracking)
opType: OpType.Batch, // Bulk operation affecting multiple entities
} as PersistentActionMeta,
}),
);

View file

@ -0,0 +1,69 @@
import { Injectable, inject } from '@angular/core';
import { Actions, createEffect, ofType } from '@ngrx/effects';
import { tap } from 'rxjs/operators';
import { flushYoungToOld } from './archive.actions';
import { PfapiService } from '../../../pfapi/pfapi.service';
import { sortTimeTrackingAndTasksFromArchiveYoungToOld } from '../sort-data-to-flush';
import { ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD } from '../archive.service';
import { Log } from '../../../core/log';
@Injectable()
export class ArchiveEffects {
private _actions$ = inject(Actions);
private _pfapiService = inject(PfapiService);
/**
* Handles the flushYoungToOld action by executing the actual flush operation.
* This effect runs for both local and remote dispatches, ensuring deterministic
* archive state across all clients.
*
* The flush operation:
* 1. Loads archiveYoung and archiveOld
* 2. Moves old tasks and time tracking data from Young to Old
* 3. Saves both archives
*/
flushYoungToOld$ = createEffect(
() =>
this._actions$.pipe(
ofType(flushYoungToOld),
tap(async ({ timestamp }) => {
const now = timestamp;
const archiveYoung = await this._pfapiService.m.archiveYoung.load();
const archiveOld = await this._pfapiService.m.archiveOld.load();
const newSorted = sortTimeTrackingAndTasksFromArchiveYoungToOld({
archiveYoung,
archiveOld,
threshold: ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD,
now,
});
await this._pfapiService.m.archiveYoung.save(
{
...newSorted.archiveYoung,
lastTimeTrackingFlush: now,
},
{
isUpdateRevAndLastUpdate: true,
},
);
await this._pfapiService.m.archiveOld.save(
{
...newSorted.archiveOld,
lastTimeTrackingFlush: now,
},
{
isUpdateRevAndLastUpdate: true,
},
);
Log.log(
'______________________\nFLUSHED ALL FROM ARCHIVE YOUNG TO OLD (via action)\n_______________________',
);
}),
),
{ dispatch: false },
);
}

View file

@ -1,8 +1,10 @@
/* eslint-disable @typescript-eslint/naming-convention */
import { createActionGroup, props } from '@ngrx/store';
import { createAction, createActionGroup, props } from '@ngrx/store';
import { WorkContextType } from '../../work-context/work-context.model';
import { TimeTrackingState, TTWorkContextData } from '../time-tracking.model';
import { Task } from '../../tasks/task.model';
import { PersistentActionMeta } from '../../../core/persistence/operation-log/persistent-action.interface';
import { OpType } from '../../../core/persistence/operation-log/operation.types';
export const TimeTrackingActions = createActionGroup({
source: 'TimeTracking',
@ -23,3 +25,23 @@ export const TimeTrackingActions = createActionGroup({
}>(),
},
});
/**
* Persistent action for syncing accumulated time spent to other clients.
* Dispatched every 5 minutes during active tracking and when tracking stops.
*
* Local dispatch: Ignored by reducer (state already updated by addTimeSpent ticks)
* Remote dispatch: Applied to update timeSpentOnDay and timeTracking state
*/
export const syncTimeSpent = createAction(
'[TimeTracking] Sync time spent',
(actionProps: { taskId: string; date: string; duration: number }) => ({
...actionProps,
meta: {
isPersistent: true,
entityType: 'TASK',
entityId: actionProps.taskId,
opType: OpType.Update,
} as PersistentActionMeta,
}),
);

View file

@ -76,6 +76,7 @@ import { ReminderCountdownEffects } from '../features/reminder/store/reminder-co
import { SyncEffects } from '../imex/sync/sync.effects';
import { boardsFeature } from '../features/boards/store/boards.reducer';
import { timeTrackingFeature } from '../features/time-tracking/store/time-tracking.reducer';
import { ArchiveEffects } from '../features/time-tracking/store/archive.effects';
import { plannerFeature } from '../features/planner/store/planner.reducer';
import { PlannerEffects } from '../features/planner/store/planner.effects';
import { AppStateEffects } from './app-state/app-state.effects';
@ -166,6 +167,7 @@ import {
StoreModule.forFeature(boardsFeature),
StoreModule.forFeature(timeTrackingFeature),
EffectsModule.forFeature([ArchiveEffects]),
StoreModule.forFeature(plannerFeature),
EffectsModule.forFeature([PlannerEffects]),