diff --git a/docs/ai/sync/plan.md b/docs/ai/sync/plan.md deleted file mode 100644 index 4f98f53eb..000000000 --- a/docs/ai/sync/plan.md +++ /dev/null @@ -1,73 +0,0 @@ -# Plan: Operation Log & Archive Synchronization - -## Problem Analysis - -1. **Time Tracking Frequency:** `addTimeSpent` fires every second. Persisting this directly would flood the operation log and network. -2. **Time Tracking Divergence:** Currently, time tracking is not synced at all. -3. **Archive Divergence:** `moveToArchive` syncs the task removal but fails to move the associated "Worklog" (Time Tracking) data to the archive on remote clients. -4. **Archive Consistency:** `ArchiveYoung` and `ArchiveOld` are derived states. We need to ensure they evolve deterministically on all clients to avoid massive file syncs. - -## Goals - -1. Sync time tracking efficiently (batching). -2. Ensure `ArchiveYoung` is correctly updated on remote clients during `moveToArchive`. -3. Leverage the rarity of `ArchiveOld` updates. - -## Implementation Steps - -### 1. Efficient Time Tracking Sync (Batching) - -We will implement a "Batch Sync" pattern. The UI continues to update every second via the existing local `addTimeSpent` action. A new persistent action will handle the sync. - -- **New Action:** `TimeTrackingActions.saveTimeSpentBatch` - - - Props: `{ task: Task, date: string, duration: number }` - - Meta: `{ isPersistent: true, entityType: 'TASK', entityId: task.id, opType: 'UPD' }` - -- **Logic Update (`TaskService`):** - - - Maintain a local accumulator `_unsyncedDuration`. - - Subscribe to `tick$`. - - On every tick: - - Dispatch `addTimeSpent` (Local, existing). - - `_unsyncedDuration += tick.duration`. - - Every **1 minute** (and when task stops): - - If `_unsyncedDuration > 0`: - - Dispatch `saveTimeSpentBatch({ duration: _unsyncedDuration, ... })`. - - Reset `_unsyncedDuration = 0`. - -- **Reducer Updates (`TaskReducer` & `TimeTrackingReducer`):** - - Handle `TimeTrackingActions.saveTimeSpentBatch`. - - **Crucial:** Check `action.meta.isRemote`. - - If `false` (Local dispatch): **Ignore** (Return state as is). The local `addTimeSpent` actions have already updated the state second-by-second. - - If `true` (Remote dispatch): **Apply**. Add the duration to `timeSpent` and `worklog`. - -### 2. Fix Remote Archive Handling (`ArchiveYoung`) - -Remote clients receiving `moveToArchive` currently only move Tasks to `ArchiveYoung`. They must also move the corresponding time tracking data. - -- **File:** `src/app/features/time-tracking/archive.service.ts` -- **Function:** `writeTasksToArchiveForRemoteSync(tasks)` -- **Logic Update:** - 1. Load `TimeTrackingState` (Active) and `ArchiveYoung` (Model). - 2. Use `sortTimeTrackingDataToArchiveYoung` (existing util) to partition the time data for the archived tasks. - 3. **Update `ArchiveYoung`:** Save the partitioned "old" time data into the `ArchiveYoung` model. - 4. **Update Active State:** Dispatch `TimeTrackingActions.updateWholeState` with the remaining "active" time data (stripping out the archived days). - -### 3. Smart Handling for ArchiveOld - -Since `ArchiveOld` is written to rarely, we will treat the _Action of Flushing_ as the synced event, rather than the data itself. - -- **Deterministic Flush:** The trigger to flush tasks from `ArchiveYoung` to `ArchiveOld` is currently local (based on time thresholds). -- **New Strategy:** - - When Client A triggers a flush, it emits a new operation: `ArchiveActions.flushYoungToOld`. - - Client B receives this op. - - Client B executes the _exact same logic_: loads its `ArchiveYoung`, moves items to its `ArchiveOld`, saves both. - - **Benefit:** No massive file transfer. Both clients maintain their own `ArchiveOld` files consistent via deterministic replay of the flush operation. - - **Prerequisite:** `ArchiveYoung` must be consistent on both sides (handled by Step 2). - -## Verification Plan - -1. **Time Tracking:** Verify 1-minute batch syncing between clients. -2. **Archive Young:** Verify archiving a task on Client A removes its old worklogs on Client B. -3. **Archive Old:** Trigger a flush on Client A (mock threshold) and verify Client B also moves items to its `ArchiveOld`. diff --git a/src/app/features/tasks/store/task.reducer.ts b/src/app/features/tasks/store/task.reducer.ts index db76a4897..07e42590b 100644 --- a/src/app/features/tasks/store/task.reducer.ts +++ b/src/app/features/tasks/store/task.reducer.ts @@ -48,7 +48,10 @@ import { loadAllData } from '../../../root-store/meta/load-all-data.action'; import { createReducer, on } from '@ngrx/store'; import { PlannerActions } from '../../planner/store/planner.actions'; import { TaskSharedActions } from '../../../root-store/meta/task-shared.actions'; -import { TimeTrackingActions } from '../../time-tracking/store/time-tracking.actions'; +import { + TimeTrackingActions, + syncTimeSpent, +} from '../../time-tracking/store/time-tracking.actions'; import { TaskLog } from '../../../core/log'; import { devError } from '../../../util/dev-error'; @@ -107,6 +110,34 @@ export const taskReducer = createReducer( ); }), + // Sync time spent from remote clients + // Local: no-op (state already updated by addTimeSpent ticks) + // Remote: apply the batched duration + on(syncTimeSpent, (state, action) => { + // Only apply for remote actions - local state is already up-to-date + if (!action.meta.isRemote) { + return state; + } + + const { taskId, date, duration } = action; + const task = state.entities[taskId]; + if (!task) { + TaskLog.warn(`[syncTimeSpent] Task ${taskId} not found, skipping`); + return state; + } + + const currentTimeSpentForDay = + (task.timeSpentOnDay && +task.timeSpentOnDay[date]) || 0; + return updateTimeSpentForTask( + taskId, + { + ...task.timeSpentOnDay, + [date]: currentTimeSpentForDay + duration, + }, + state, + ); + }), + //-------------------------------- // TODO check if working diff --git a/src/app/features/tasks/task.service.ts b/src/app/features/tasks/task.service.ts index 13b2403d4..ba0a0be07 100644 --- a/src/app/features/tasks/task.service.ts +++ b/src/app/features/tasks/task.service.ts @@ -1,6 +1,6 @@ import { nanoid } from 'nanoid'; import typia from 'typia'; -import { first, map, take, withLatestFrom } from 'rxjs/operators'; +import { distinctUntilChanged, first, map, take, withLatestFrom } from 'rxjs/operators'; import { computed, effect, inject, Injectable, untracked } from '@angular/core'; import { toSignal } from '@angular/core/rxjs-interop'; import { Observable } from 'rxjs'; @@ -82,7 +82,10 @@ import { import { Update } from '@ngrx/entity'; import { RootState } from '../../root-store/root-state'; import { DateService } from '../../core/date/date.service'; -import { TimeTrackingActions } from '../time-tracking/store/time-tracking.actions'; +import { + TimeTrackingActions, + syncTimeSpent, +} from '../time-tracking/store/time-tracking.actions'; import { ArchiveService } from '../time-tracking/archive.service'; import { TaskArchiveService } from '../time-tracking/task-archive.service'; import { TODAY_TAG } from '../tag/tag.const'; @@ -181,6 +184,11 @@ export class TaskService { private _lastFocusedTaskEl: HTMLElement | null = null; private _allTasks$: Observable = this._store.pipe(select(selectAllTasks)); + // Batch sync for time tracking: accumulates duration per task, syncs every 5 minutes + private static readonly SYNC_INTERVAL_MS = 5 * 60 * 1000; // 5 minutes + private _unsyncedDuration: Map = new Map(); + private _lastSyncTime = Date.now(); + constructor() { document.addEventListener( 'focus', @@ -196,17 +204,32 @@ export class TaskService { true, ); - // time tracking + // time tracking with batch sync this._timeTrackingService.tick$ .pipe( withLatestFrom(this.currentTask$, this._imexMetaService.isDataImportInProgress$), ) .subscribe(([tick, currentTask, isImportInProgress]) => { if (currentTask && !isImportInProgress) { + // Update local state immediately (existing behavior) this.addTimeSpent(currentTask, tick.duration, tick.date); + + // Accumulate for batch sync + this._accumulateTimeSpent(currentTask.id, tick.duration, tick.date); + + // Check if it's time to sync (every 5 minutes) + if (Date.now() - this._lastSyncTime >= TaskService.SYNC_INTERVAL_MS) { + this._flushAccumulatedTimeSpent(); + } } }); + // Flush accumulated time when task stops (currentTaskId becomes null or changes) + this.currentTaskId$.pipe(distinctUntilChanged()).subscribe((newTaskId) => { + // When task changes or stops, flush any accumulated time + this._flushAccumulatedTimeSpent(); + }); + effect(() => { if (!this.isTimeTrackingEnabled() && untracked(this.currentTaskId) != null) { this.toggleStartTask(); @@ -214,6 +237,37 @@ export class TaskService { }); } + /** + * Accumulates time spent for batch sync to other clients. + */ + private _accumulateTimeSpent(taskId: string, duration: number, date: string): void { + const existing = this._unsyncedDuration.get(taskId); + if (existing && existing.date === date) { + existing.duration += duration; + } else { + // If date changed, flush the old accumulated time first + if (existing) { + this._store.dispatch( + syncTimeSpent({ taskId, date: existing.date, duration: existing.duration }), + ); + } + this._unsyncedDuration.set(taskId, { duration, date }); + } + } + + /** + * Dispatches syncTimeSpent for all accumulated time and resets accumulators. + */ + private _flushAccumulatedTimeSpent(): void { + this._unsyncedDuration.forEach(({ duration, date }, taskId) => { + if (duration > 0) { + this._store.dispatch(syncTimeSpent({ taskId, date, duration })); + } + }); + this._unsyncedDuration.clear(); + this._lastSyncTime = Date.now(); + } + getAllParentWithoutTag$(tagId: string): Observable { return this._store.pipe(select(selectMainTasksWithoutTag, { tagId })); } diff --git a/src/app/features/time-tracking/archive.service.ts b/src/app/features/time-tracking/archive.service.ts index bd21892f2..1d5b54416 100644 --- a/src/app/features/time-tracking/archive.service.ts +++ b/src/app/features/time-tracking/archive.service.ts @@ -4,12 +4,10 @@ import { flattenTasks } from '../tasks/store/task.selectors'; import { createEmptyEntity } from '../../util/create-empty-entity'; import { taskAdapter } from '../tasks/store/task.adapter'; import { PfapiService } from '../../pfapi/pfapi.service'; -import { - sortTimeTrackingAndTasksFromArchiveYoungToOld, - sortTimeTrackingDataToArchiveYoung, -} from './sort-data-to-flush'; +import { sortTimeTrackingDataToArchiveYoung } from './sort-data-to-flush'; import { Store } from '@ngrx/store'; import { TimeTrackingActions } from './store/time-tracking.actions'; +import { flushYoungToOld } from './store/archive.actions'; import { getDbDateStr } from '../../util/get-db-date-str'; import { Log } from '../../core/log'; @@ -116,6 +114,7 @@ export class ArchiveService { ); // ------------------------------------------------ + // Check if it's time to flush archiveYoung to archiveOld const archiveOld = await this._pfapiService.m.archiveOld.load(); const isFlushArchiveOld = now - archiveOld.lastTimeTrackingFlush > ARCHIVE_ALL_YOUNG_TO_OLD_THRESHOLD; @@ -124,42 +123,17 @@ export class ArchiveService { return; } - // ------------------------------------------------ - // Result B: - // Also sort timeTracking and task data from archiveYoung to archiveOld - const newSorted2 = sortTimeTrackingAndTasksFromArchiveYoungToOld({ - archiveYoung: newArchiveYoung, - archiveOld, - threshold: ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD, - now, - }); - await this._pfapiService.m.archiveYoung.save( - { - ...newSorted2.archiveYoung, - lastTimeTrackingFlush: now, - }, - { - isUpdateRevAndLastUpdate: true, - }, - ); - await this._pfapiService.m.archiveOld.save( - { - ...newSorted2.archiveOld, - lastTimeTrackingFlush: now, - }, - { - isUpdateRevAndLastUpdate: true, - }, - ); - Log.log( - '______________________\nFLUSHED ALL FROM ARCHIVE YOUNG TO OLD\n_______________________', - ); + // Dispatch the flush action - this will be persisted and synced to other clients + // The actual flush operation is handled by ArchiveEffects.flushYoungToOld$ + // This ensures other clients receive the operation and replay the same flush, + // maintaining deterministic archive state without syncing large archiveOld files. + this._store.dispatch(flushYoungToOld({ timestamp: now })); } /** * Writes tasks to archiveYoung for remote sync operations. - * This is a simplified version that only adds tasks without triggering - * time tracking flush or archiveOld compaction. + * Also moves historical time tracking data to archiveYoung to keep + * the client's archive consistent with the originating client. * * Used when receiving moveToArchive operations from other clients. */ @@ -199,9 +173,18 @@ export class ArchiveService { taskArchiveState, ); + // Also move historical time tracking data to archiveYoung + // This ensures the remote client's archive matches the originating client + const timeTracking = await this._pfapiService.m.timeTracking.load(); + const sorted = sortTimeTrackingDataToArchiveYoung({ + timeTracking, + archiveYoung, + todayStr: getDbDateStr(now), + }); + await this._pfapiService.m.archiveYoung.save( { - ...archiveYoung, + ...sorted.archiveYoung, task: newTaskArchive, }, { @@ -210,8 +193,18 @@ export class ArchiveService { }, ); - Log.log('[ArchiveService] Remote sync: saved tasks to archiveYoung:', { - archivedTaskCount: Object.keys(newTaskArchive.entities).length, - }); + // Update active time tracking state (remove historical data that was moved to archive) + this._store.dispatch( + TimeTrackingActions.updateWholeState({ + newState: sorted.timeTracking, + }), + ); + + Log.log( + '[ArchiveService] Remote sync: saved tasks and time tracking to archiveYoung:', + { + archivedTaskCount: Object.keys(newTaskArchive.entities).length, + }, + ); } } diff --git a/src/app/features/time-tracking/store/archive.actions.ts b/src/app/features/time-tracking/store/archive.actions.ts new file mode 100644 index 000000000..ae4d3d407 --- /dev/null +++ b/src/app/features/time-tracking/store/archive.actions.ts @@ -0,0 +1,28 @@ +import { createAction } from '@ngrx/store'; +import { PersistentActionMeta } from '../../../core/persistence/operation-log/persistent-action.interface'; +import { OpType } from '../../../core/persistence/operation-log/operation.types'; + +/** + * Persistent action to flush data from archiveYoung to archiveOld. + * This action is dispatched when the local client determines it's time to flush, + * and is synced to other clients to ensure deterministic archive state. + * + * When received remotely, the receiving client executes the same flush logic, + * ensuring both clients have consistent archiveOld content without needing + * to sync the large archiveOld files directly. + * + * Prerequisites: + * - Vector clock ordering ensures this action is applied AFTER all preceding + * moveToArchive operations that populated archiveYoung. + */ +export const flushYoungToOld = createAction( + '[Archive] Flush Young to Old', + (actionProps: { timestamp: number }) => ({ + ...actionProps, + meta: { + isPersistent: true, + entityType: 'ALL', // Affects multiple entity types (tasks, timeTracking) + opType: OpType.Batch, // Bulk operation affecting multiple entities + } as PersistentActionMeta, + }), +); diff --git a/src/app/features/time-tracking/store/archive.effects.ts b/src/app/features/time-tracking/store/archive.effects.ts new file mode 100644 index 000000000..ead583905 --- /dev/null +++ b/src/app/features/time-tracking/store/archive.effects.ts @@ -0,0 +1,69 @@ +import { Injectable, inject } from '@angular/core'; +import { Actions, createEffect, ofType } from '@ngrx/effects'; +import { tap } from 'rxjs/operators'; +import { flushYoungToOld } from './archive.actions'; +import { PfapiService } from '../../../pfapi/pfapi.service'; +import { sortTimeTrackingAndTasksFromArchiveYoungToOld } from '../sort-data-to-flush'; +import { ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD } from '../archive.service'; +import { Log } from '../../../core/log'; + +@Injectable() +export class ArchiveEffects { + private _actions$ = inject(Actions); + private _pfapiService = inject(PfapiService); + + /** + * Handles the flushYoungToOld action by executing the actual flush operation. + * This effect runs for both local and remote dispatches, ensuring deterministic + * archive state across all clients. + * + * The flush operation: + * 1. Loads archiveYoung and archiveOld + * 2. Moves old tasks and time tracking data from Young to Old + * 3. Saves both archives + */ + flushYoungToOld$ = createEffect( + () => + this._actions$.pipe( + ofType(flushYoungToOld), + tap(async ({ timestamp }) => { + const now = timestamp; + + const archiveYoung = await this._pfapiService.m.archiveYoung.load(); + const archiveOld = await this._pfapiService.m.archiveOld.load(); + + const newSorted = sortTimeTrackingAndTasksFromArchiveYoungToOld({ + archiveYoung, + archiveOld, + threshold: ARCHIVE_TASK_YOUNG_TO_OLD_THRESHOLD, + now, + }); + + await this._pfapiService.m.archiveYoung.save( + { + ...newSorted.archiveYoung, + lastTimeTrackingFlush: now, + }, + { + isUpdateRevAndLastUpdate: true, + }, + ); + + await this._pfapiService.m.archiveOld.save( + { + ...newSorted.archiveOld, + lastTimeTrackingFlush: now, + }, + { + isUpdateRevAndLastUpdate: true, + }, + ); + + Log.log( + '______________________\nFLUSHED ALL FROM ARCHIVE YOUNG TO OLD (via action)\n_______________________', + ); + }), + ), + { dispatch: false }, + ); +} diff --git a/src/app/features/time-tracking/store/time-tracking.actions.ts b/src/app/features/time-tracking/store/time-tracking.actions.ts index 7d0f94d5c..4bb7924b0 100644 --- a/src/app/features/time-tracking/store/time-tracking.actions.ts +++ b/src/app/features/time-tracking/store/time-tracking.actions.ts @@ -1,8 +1,10 @@ /* eslint-disable @typescript-eslint/naming-convention */ -import { createActionGroup, props } from '@ngrx/store'; +import { createAction, createActionGroup, props } from '@ngrx/store'; import { WorkContextType } from '../../work-context/work-context.model'; import { TimeTrackingState, TTWorkContextData } from '../time-tracking.model'; import { Task } from '../../tasks/task.model'; +import { PersistentActionMeta } from '../../../core/persistence/operation-log/persistent-action.interface'; +import { OpType } from '../../../core/persistence/operation-log/operation.types'; export const TimeTrackingActions = createActionGroup({ source: 'TimeTracking', @@ -23,3 +25,23 @@ export const TimeTrackingActions = createActionGroup({ }>(), }, }); + +/** + * Persistent action for syncing accumulated time spent to other clients. + * Dispatched every 5 minutes during active tracking and when tracking stops. + * + * Local dispatch: Ignored by reducer (state already updated by addTimeSpent ticks) + * Remote dispatch: Applied to update timeSpentOnDay and timeTracking state + */ +export const syncTimeSpent = createAction( + '[TimeTracking] Sync time spent', + (actionProps: { taskId: string; date: string; duration: number }) => ({ + ...actionProps, + meta: { + isPersistent: true, + entityType: 'TASK', + entityId: actionProps.taskId, + opType: OpType.Update, + } as PersistentActionMeta, + }), +); diff --git a/src/app/root-store/feature-stores.module.ts b/src/app/root-store/feature-stores.module.ts index f3143883a..5b0213caf 100644 --- a/src/app/root-store/feature-stores.module.ts +++ b/src/app/root-store/feature-stores.module.ts @@ -76,6 +76,7 @@ import { ReminderCountdownEffects } from '../features/reminder/store/reminder-co import { SyncEffects } from '../imex/sync/sync.effects'; import { boardsFeature } from '../features/boards/store/boards.reducer'; import { timeTrackingFeature } from '../features/time-tracking/store/time-tracking.reducer'; +import { ArchiveEffects } from '../features/time-tracking/store/archive.effects'; import { plannerFeature } from '../features/planner/store/planner.reducer'; import { PlannerEffects } from '../features/planner/store/planner.effects'; import { AppStateEffects } from './app-state/app-state.effects'; @@ -166,6 +167,7 @@ import { StoreModule.forFeature(boardsFeature), StoreModule.forFeature(timeTrackingFeature), + EffectsModule.forFeature([ArchiveEffects]), StoreModule.forFeature(plannerFeature), EffectsModule.forFeature([PlannerEffects]),