From 262f8f10b181c99d98da3a7dec860dfa11d3f992 Mon Sep 17 00:00:00 2001 From: Johannes Millan Date: Fri, 12 Dec 2025 12:29:43 +0100 Subject: [PATCH] fix(sync): address operation-log review issues - Fix server migration race condition by moving check inside upload lock using preUploadCallback pattern to ensure atomicity - Fix batch apply error handling to track partial success, returning ApplyOperationsResult with appliedOps and failedOp info - Add 5-minute timeout to conflict dialog to prevent indefinite blocking if user walks away during sync conflict resolution --- .../migration-handling.integration.spec.ts | 17 +- .../service-logic.integration.spec.ts | 1 + .../operation-log/operation-log.const.ts | 8 + .../operation-applier.service.spec.ts | 95 +++++++--- .../processing/operation-applier.service.ts | 60 +++++- .../operation-log-hydrator.service.spec.ts | 22 ++- .../store/operation-log-hydrator.service.ts | 27 ++- .../sync/conflict-resolution.service.spec.ts | 159 +++++++++++++++- .../sync/conflict-resolution.service.ts | 68 +++++-- .../sync/operation-log-sync.service.spec.ts | 44 ++++- .../sync/operation-log-sync.service.ts | 72 ++++--- .../sync/operation-log-upload.service.spec.ts | 176 ++++++++++++++++++ .../sync/operation-log-upload.service.ts | 29 ++- src/app/t.const.ts | 1 + src/assets/i18n/en.json | 1 + 15 files changed, 672 insertions(+), 108 deletions(-) diff --git a/src/app/core/persistence/operation-log/integration/migration-handling.integration.spec.ts b/src/app/core/persistence/operation-log/integration/migration-handling.integration.spec.ts index ce7f83d8c..504c7cf7c 100644 --- a/src/app/core/persistence/operation-log/integration/migration-handling.integration.spec.ts +++ b/src/app/core/persistence/operation-log/integration/migration-handling.integration.spec.ts @@ -40,6 +40,9 @@ describe('Migration Handling Integration', () => { operationApplierSpy = jasmine.createSpyObj('OperationApplierService', [ 'applyOperations', ]); + operationApplierSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); TestBed.configureTestingModule({ providers: [ @@ -203,13 +206,17 @@ describe('Migration Handling Integration', () => { schemaVersion: 1, }); - it('should mark operations as failed if application throws error', async () => { + it('should mark operations as failed if application returns failure result', async () => { const op = createOp('op-fail'); - // Make applier throw error - operationApplierSpy.applyOperations.and.rejectWith( - new Error('Simulated Apply Error'), - ); + // Make applier return failure result (new behavior with partial success support) + operationApplierSpy.applyOperations.and.resolveTo({ + appliedOps: [], + failedOp: { + op, + error: new Error('Simulated Apply Error'), + }, + }); // Spy on store to verify markFailed is called spyOn(opLogStore, 'markFailed').and.callThrough(); diff --git a/src/app/core/persistence/operation-log/integration/service-logic.integration.spec.ts b/src/app/core/persistence/operation-log/integration/service-logic.integration.spec.ts index a4a9f55f3..2a1ddd66c 100644 --- a/src/app/core/persistence/operation-log/integration/service-logic.integration.spec.ts +++ b/src/app/core/persistence/operation-log/integration/service-logic.integration.spec.ts @@ -166,6 +166,7 @@ describe('Service Logic Integration', () => { 'presentConflicts', ]); applierSpy = jasmine.createSpyObj('OperationApplierService', ['applyOperations']); + applierSpy.applyOperations.and.returnValue(Promise.resolve({ appliedOps: [] })); // Create spy properly before using in TestBed const waitServiceSpy = jasmine.createSpyObj('UserInputWaitStateService', [ diff --git a/src/app/core/persistence/operation-log/operation-log.const.ts b/src/app/core/persistence/operation-log/operation-log.const.ts index 8c76dea78..3cb04faee 100644 --- a/src/app/core/persistence/operation-log/operation-log.const.ts +++ b/src/app/core/persistence/operation-log/operation-log.const.ts @@ -153,3 +153,11 @@ export const SLOW_COMPACTION_THRESHOLD_MS = 3000; * Default: 20MB */ export const STATE_SIZE_WARNING_THRESHOLD_MB = 20; + +/** + * Timeout for the conflict resolution dialog (milliseconds). + * If user doesn't respond within this time, the dialog auto-cancels + * to prevent sync from being blocked indefinitely. + * Default: 5 minutes + */ +export const CONFLICT_DIALOG_TIMEOUT_MS = 5 * 60 * 1000; diff --git a/src/app/core/persistence/operation-log/processing/operation-applier.service.spec.ts b/src/app/core/persistence/operation-log/processing/operation-applier.service.spec.ts index ce12a2f5e..810d340ea 100644 --- a/src/app/core/persistence/operation-log/processing/operation-applier.service.spec.ts +++ b/src/app/core/persistence/operation-log/processing/operation-applier.service.spec.ts @@ -67,7 +67,7 @@ describe('OperationApplierService', () => { it('should dispatch action for operation with no dependencies', async () => { const op = createMockOperation('op-1', 'TASK', OpType.Update, { title: 'Test' }); - await service.applyOperations([op]); + const result = await service.applyOperations([op]); expect(mockStore.dispatch).toHaveBeenCalledTimes(1); expect(mockStore.dispatch).toHaveBeenCalledWith( @@ -80,6 +80,8 @@ describe('OperationApplierService', () => { }), }), ); + expect(result.appliedOps).toEqual([op]); + expect(result.failedOp).toBeUndefined(); }); it('should dispatch actions for multiple operations in order', async () => { @@ -89,7 +91,7 @@ describe('OperationApplierService', () => { createMockOperation('op-3', 'TASK', OpType.Update, { title: 'Third' }), ]; - await service.applyOperations(ops); + const result = await service.applyOperations(ops); expect(mockStore.dispatch).toHaveBeenCalledTimes(3); @@ -97,12 +99,17 @@ describe('OperationApplierService', () => { expect((calls[0].args[0] as any).title).toBe('First'); expect((calls[1].args[0] as any).title).toBe('Second'); expect((calls[2].args[0] as any).title).toBe('Third'); + + expect(result.appliedOps).toEqual(ops); + expect(result.failedOp).toBeUndefined(); }); it('should handle empty operations array', async () => { - await service.applyOperations([]); + const result = await service.applyOperations([]); expect(mockStore.dispatch).not.toHaveBeenCalled(); + expect(result.appliedOps).toEqual([]); + expect(result.failedOp).toBeUndefined(); }); it('should call archiveOperationHandler after dispatching', async () => { @@ -115,7 +122,7 @@ describe('OperationApplierService', () => { }); describe('dependency handling', () => { - it('should throw SyncStateCorruptedError for operation with missing hard dependency', async () => { + it('should return failed op for operation with missing hard dependency', async () => { const op = createMockOperation('op-1', 'TASK', OpType.Create, { parentId: 'parent-123', }); @@ -132,10 +139,12 @@ describe('OperationApplierService', () => { Promise.resolve({ missing: [parentDep] }), ); - await expectAsync(service.applyOperations([op])).toBeRejectedWithError( - SyncStateCorruptedError, - ); + const result = await service.applyOperations([op]); + expect(result.appliedOps).toEqual([]); + expect(result.failedOp).toBeDefined(); + expect(result.failedOp!.op).toBe(op); + expect(result.failedOp!.error).toBeInstanceOf(SyncStateCorruptedError); expect(mockStore.dispatch).not.toHaveBeenCalled(); }); @@ -156,16 +165,14 @@ describe('OperationApplierService', () => { Promise.resolve({ missing: [parentDep] }), ); - try { - await service.applyOperations([op]); - fail('Expected SyncStateCorruptedError to be thrown'); - } catch (e) { - expect(e).toBeInstanceOf(SyncStateCorruptedError); - const error = e as SyncStateCorruptedError; - expect(error.context.opId).toBe('op-1'); - expect(error.context.actionType).toBe('[Test] Action'); - expect(error.context.missingDependencies).toContain('TASK:parent-123'); - } + const result = await service.applyOperations([op]); + + expect(result.failedOp).toBeDefined(); + const error = result.failedOp!.error as SyncStateCorruptedError; + expect(error).toBeInstanceOf(SyncStateCorruptedError); + expect(error.context.opId).toBe('op-1'); + expect(error.context.actionType).toBe('[Test] Action'); + expect(error.context.missingDependencies).toContain('TASK:parent-123'); }); it('should dispatch action for operation with missing soft dependency', async () => { @@ -185,13 +192,15 @@ describe('OperationApplierService', () => { Promise.resolve({ missing: [projectDep] }), ); - await service.applyOperations([op]); + const result = await service.applyOperations([op]); // Soft dependency doesn't block application expect(mockStore.dispatch).toHaveBeenCalledTimes(1); + expect(result.appliedOps).toEqual([op]); + expect(result.failedOp).toBeUndefined(); }); - it('should throw on first operation with missing hard deps in a batch', async () => { + it('should return partial success when operation in batch fails (batch apply fix)', async () => { const op1 = createMockOperation('op-1', 'TASK', OpType.Update, { title: 'OK' }); const op2 = createMockOperation('op-2', 'TASK', OpType.Create, { parentId: 'missing', @@ -219,13 +228,55 @@ describe('OperationApplierService', () => { }, ); - await expectAsync(service.applyOperations([op1, op2, op3])).toBeRejectedWithError( - SyncStateCorruptedError, - ); + const result = await service.applyOperations([op1, op2, op3]); // First operation should have been dispatched before we hit the error expect(mockStore.dispatch).toHaveBeenCalledTimes(1); expect((mockStore.dispatch.calls.first().args[0] as any).title).toBe('OK'); + + // Result should show partial success + expect(result.appliedOps).toEqual([op1]); + expect(result.failedOp).toBeDefined(); + expect(result.failedOp!.op).toBe(op2); + expect(result.failedOp!.error).toBeInstanceOf(SyncStateCorruptedError); + }); + + it('should return partial success with all ops that succeeded before failure', async () => { + const ops = [ + createMockOperation('op-1', 'TASK', OpType.Update, { title: 'First OK' }), + createMockOperation('op-2', 'TASK', OpType.Update, { title: 'Second OK' }), + createMockOperation('op-3', 'TASK', OpType.Create, { parentId: 'missing' }), // fails + createMockOperation('op-4', 'TASK', OpType.Update, { title: 'Never' }), + createMockOperation('op-5', 'TASK', OpType.Update, { title: 'Never' }), + ]; + + const parentDep: OperationDependency = { + entityType: 'TASK', + entityId: 'missing', + mustExist: true, + relation: 'parent', + }; + + mockDependencyResolver.extractDependencies.and.callFake((op: Operation) => { + if (op.id === 'op-3') return [parentDep]; + return []; + }); + + mockDependencyResolver.checkDependencies.and.callFake( + async (deps: OperationDependency[]) => { + if (deps.some((d) => d.entityId === 'missing')) { + return { missing: deps }; + } + return { missing: [] }; + }, + ); + + const result = await service.applyOperations(ops); + + // First two operations should have been dispatched + expect(mockStore.dispatch).toHaveBeenCalledTimes(2); + expect(result.appliedOps).toEqual([ops[0], ops[1]]); + expect(result.failedOp!.op).toBe(ops[2]); }); }); }); diff --git a/src/app/core/persistence/operation-log/processing/operation-applier.service.ts b/src/app/core/persistence/operation-log/processing/operation-applier.service.ts index c238a6b2c..014acc20d 100644 --- a/src/app/core/persistence/operation-log/processing/operation-applier.service.ts +++ b/src/app/core/persistence/operation-log/processing/operation-applier.service.ts @@ -8,6 +8,29 @@ import { ArchiveOperationHandler } from './archive-operation-handler.service'; import { SyncStateCorruptedError } from '../sync-state-corrupted.error'; import { HydrationStateService } from './hydration-state.service'; +/** + * Result of applying operations to the NgRx store. + * + * This allows callers to handle partial success scenarios where some operations + * were applied before an error occurred. + */ +export interface ApplyOperationsResult { + /** + * Operations that were successfully applied to the NgRx store. + * These ops have already been dispatched and should be marked as applied. + */ + appliedOps: Operation[]; + + /** + * If an error occurred, this contains the failed operation and the error. + * Operations after this one in the batch were NOT applied. + */ + failedOp?: { + op: Operation; + error: Error; + }; +} + /** * Service responsible for applying operations to the local NgRx store. * @@ -43,14 +66,17 @@ export class OperationApplierService { /** * Apply operations to the NgRx store. - * Operations are applied in order. If any operation has missing hard dependencies, - * a SyncStateCorruptedError is thrown immediately. + * Operations are applied in order. If any operation fails, the result includes + * information about which operations succeeded and which failed. * - * @throws SyncStateCorruptedError if any operation has missing hard dependencies + * @returns Result containing applied operations and optionally the failed operation. + * Callers should: + * - Mark `appliedOps` as applied (they've been dispatched to NgRx) + * - Mark the failed op and any remaining ops as failed */ - async applyOperations(ops: Operation[]): Promise { + async applyOperations(ops: Operation[]): Promise { if (ops.length === 0) { - return; + return { appliedOps: [] }; } OpLog.normal( @@ -58,17 +84,39 @@ export class OperationApplierService { ops.map((op) => op.id), ); + const appliedOps: Operation[] = []; + // Mark that we're applying remote operations to suppress selector-based effects this.hydrationState.startApplyingRemoteOps(); try { for (const op of ops) { - await this._applyOperation(op); + try { + await this._applyOperation(op); + appliedOps.push(op); + } catch (e) { + // Log the error + OpLog.err( + `OperationApplierService: Failed to apply operation ${op.id}. ` + + `${appliedOps.length} ops were applied before this failure.`, + e, + ); + + // Return partial success result with the failed op + return { + appliedOps, + failedOp: { + op, + error: e instanceof Error ? e : new Error(String(e)), + }, + }; + } } } finally { this.hydrationState.endApplyingRemoteOps(); } OpLog.normal('OperationApplierService: Finished applying operations.'); + return { appliedOps }; } /** diff --git a/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.spec.ts b/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.spec.ts index 88d39e5d9..3d1e2bc26 100644 --- a/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.spec.ts +++ b/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.spec.ts @@ -143,7 +143,9 @@ describe('OperationLogHydratorService', () => { mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([])); mockOpLogStore.markApplied.and.returnValue(Promise.resolve()); mockOpLogStore.markFailed.and.returnValue(Promise.resolve()); - mockOperationApplierService.applyOperations.and.returnValue(Promise.resolve()); + mockOperationApplierService.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); mockMigrationService.checkAndMigrate.and.returnValue(Promise.resolve()); mockSchemaMigrationService.needsMigration.and.returnValue(false); mockSchemaMigrationService.operationNeedsMigration.and.returnValue(false); @@ -1010,7 +1012,9 @@ describe('OperationLogHydratorService', () => { }; mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([failedEntry])); - mockOperationApplierService.applyOperations.and.returnValue(Promise.resolve()); + mockOperationApplierService.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await service.retryFailedRemoteOps(); @@ -1040,7 +1044,9 @@ describe('OperationLogHydratorService', () => { }); mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([failedEntry])); - mockOperationApplierService.applyOperations.and.returnValue(Promise.resolve()); + mockOperationApplierService.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await service.retryFailedRemoteOps(); @@ -1060,8 +1066,14 @@ describe('OperationLogHydratorService', () => { }; mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([failedEntry])); - mockOperationApplierService.applyOperations.and.rejectWith( - new Error('Still failing'), + mockOperationApplierService.applyOperations.and.returnValue( + Promise.resolve({ + appliedOps: [], + failedOp: { + op: failedOp, + error: new Error('Still failing'), + }, + }), ); await service.retryFailedRemoteOps(); diff --git a/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.ts b/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.ts index f7cc80d91..1ae33eb22 100644 --- a/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.ts +++ b/src/app/core/persistence/operation-log/store/operation-log-hydrator.service.ts @@ -182,7 +182,11 @@ export class OperationLogHydratorService { // Use OperationApplierService to handle dependency resolution during replay. // This ensures operations referencing deleted entities are handled gracefully // rather than throwing errors (e.g., adding subtask to deleted parent). - await this.operationApplierService.applyOperations(opsToReplay); + const tailReplayResult = + await this.operationApplierService.applyOperations(opsToReplay); + if (tailReplayResult.failedOp) { + throw tailReplayResult.failedOp.error; + } // CHECKPOINT C: Validate state after replaying tail operations // Must validate BEFORE saving snapshot to avoid persisting corrupted state @@ -256,7 +260,11 @@ export class OperationLogHydratorService { // Use OperationApplierService to handle dependency resolution during replay. // This ensures operations referencing deleted entities are handled gracefully // rather than throwing errors (e.g., adding subtask to deleted parent). - await this.operationApplierService.applyOperations(opsToReplay); + const fullReplayResult = + await this.operationApplierService.applyOperations(opsToReplay); + if (fullReplayResult.failedOp) { + throw fullReplayResult.failedOp.error; + } // CHECKPOINT C: Validate state after replaying all operations // Must validate BEFORE saving snapshot to avoid persisting corrupted state @@ -885,14 +893,17 @@ export class OperationLogHydratorService { const stillFailedOpIds: string[] = []; for (const entry of failedOps) { - try { - await this.operationApplierService.applyOperations([entry.op]); - // If we get here without throwing, the operation succeeded - appliedOpIds.push(entry.op.id); - } catch (e) { + const result = await this.operationApplierService.applyOperations([entry.op]); + if (result.failedOp) { // SyncStateCorruptedError or any other error means the op still can't be applied - OpLog.warn(`OperationLogHydratorService: Failed to retry op ${entry.op.id}`, e); + OpLog.warn( + `OperationLogHydratorService: Failed to retry op ${entry.op.id}`, + result.failedOp.error, + ); stillFailedOpIds.push(entry.op.id); + } else { + // Operation succeeded + appliedOpIds.push(entry.op.id); } } diff --git a/src/app/core/persistence/operation-log/sync/conflict-resolution.service.spec.ts b/src/app/core/persistence/operation-log/sync/conflict-resolution.service.spec.ts index e150a89b4..b4952c1ba 100644 --- a/src/app/core/persistence/operation-log/sync/conflict-resolution.service.spec.ts +++ b/src/app/core/persistence/operation-log/sync/conflict-resolution.service.spec.ts @@ -5,9 +5,13 @@ import { OperationApplierService } from '../processing/operation-applier.service import { OperationLogStoreService } from '../store/operation-log-store.service'; import { SnackService } from '../../../snack/snack.service'; import { ValidateStateService } from '../processing/validate-state.service'; -import { of } from 'rxjs'; +import { of, Subject } from 'rxjs'; import { EntityConflict, OpType, Operation } from '../operation.types'; -import { DialogConflictResolutionComponent } from '../../../../imex/sync/dialog-conflict-resolution/dialog-conflict-resolution.component'; +import { + ConflictResolutionResult, + DialogConflictResolutionComponent, +} from '../../../../imex/sync/dialog-conflict-resolution/dialog-conflict-resolution.component'; +import { UserInputWaitStateService } from '../../../../imex/sync/user-input-wait-state.service'; describe('ConflictResolutionService', () => { let service: ConflictResolutionService; @@ -16,6 +20,7 @@ describe('ConflictResolutionService', () => { let mockOpLogStore: jasmine.SpyObj; let mockSnackService: jasmine.SpyObj; let mockValidateStateService: jasmine.SpyObj; + let mockUserInputWaitState: jasmine.SpyObj; const createMockOp = (id: string, clientId: string): Operation => ({ id, @@ -47,6 +52,11 @@ describe('ConflictResolutionService', () => { mockValidateStateService = jasmine.createSpyObj('ValidateStateService', [ 'validateAndRepairCurrentState', ]); + mockUserInputWaitState = jasmine.createSpyObj('UserInputWaitStateService', [ + 'startWaiting', + ]); + // startWaiting returns a stop function + mockUserInputWaitState.startWaiting.and.returnValue(() => {}); TestBed.configureTestingModule({ providers: [ @@ -56,12 +66,13 @@ describe('ConflictResolutionService', () => { { provide: OperationLogStoreService, useValue: mockOpLogStore }, { provide: SnackService, useValue: mockSnackService }, { provide: ValidateStateService, useValue: mockValidateStateService }, + { provide: UserInputWaitStateService, useValue: mockUserInputWaitState }, ], }); service = TestBed.inject(ConflictResolutionService); // Default mock behaviors - mockOperationApplier.applyOperations.and.resolveTo(); + mockOperationApplier.applyOperations.and.resolveTo({ appliedOps: [] }); mockValidateStateService.validateAndRepairCurrentState.and.resolveTo(true); mockOpLogStore.getUnsyncedByEntity.and.resolveTo(new Map()); }); @@ -131,6 +142,10 @@ describe('ConflictResolutionService', () => { mockDialog.open.and.returnValue(mockDialogRef); mockOpLogStore.hasOp.and.resolveTo(false); mockOpLogStore.append.and.resolveTo(100); + // Need to return the applied ops so markApplied can be called with correct seqs + mockOperationApplier.applyOperations.and.resolveTo({ + appliedOps: conflicts[0].remoteOps, + }); await service.presentConflicts(conflicts); @@ -305,10 +320,14 @@ describe('ConflictResolutionService', () => { mockOpLogStore.hasOp.and.resolveTo(false); mockOpLogStore.append.and.resolveTo(100); - // Simulate application failure by throwing an error - mockOperationApplier.applyOperations.and.rejectWith( - new Error('Simulated dependency failure'), - ); + // Simulate application failure by returning result with failedOp + mockOperationApplier.applyOperations.and.resolveTo({ + appliedOps: [], + failedOp: { + op: conflicts[0].remoteOps[0], + error: new Error('Simulated dependency failure'), + }, + }); await service.presentConflicts(conflicts); @@ -535,4 +554,130 @@ describe('ConflictResolutionService', () => { expect(mockValidateStateService.validateAndRepairCurrentState).toHaveBeenCalled(); }); }); + + describe('dialog timeout', () => { + /** + * Tests for the conflict dialog timeout feature. + * The dialog auto-cancels after CONFLICT_DIALOG_TIMEOUT_MS to prevent + * sync from being blocked indefinitely if user walks away. + */ + + // Use constant to avoid linter complaints about mixed operators + const FIVE_MINUTES_MS = 5 * 60 * 1000; + + // Sample conflicts for timeout tests + const localOps = [createMockOp('local-1', 'client-1')]; + const remoteOps = [createMockOp('remote-1', 'client-2')]; + const conflicts: EntityConflict[] = [ + { + entityType: 'TASK', + entityId: 'task-1', + localOps, + remoteOps, + suggestedResolution: 'manual', + }, + ]; + + beforeEach(() => { + jasmine.clock().install(); + }); + + afterEach(() => { + jasmine.clock().uninstall(); + }); + + it('should auto-cancel dialog after timeout', async () => { + // Use Subject to control when the dialog closes + const afterClosed$ = new Subject(); + + const mockDialogRef = { + afterClosed: () => afterClosed$.asObservable(), + close: jasmine.createSpy('close').and.callFake(() => { + // Simulate dialog close by emitting undefined + afterClosed$.next(undefined); + afterClosed$.complete(); + }), + } as unknown as MatDialogRef; + + mockDialog.open.and.returnValue(mockDialogRef); + + // Start the presentConflicts call + const presentPromise = service.presentConflicts(conflicts); + + // Advance time past the timeout + jasmine.clock().tick(FIVE_MINUTES_MS + 100); + + // Wait for the method to complete + await presentPromise; + + // Verify dialog was closed + expect(mockDialogRef.close).toHaveBeenCalledWith(undefined); + + // Verify snack was shown + expect(mockSnackService.open).toHaveBeenCalledWith({ + type: 'ERROR', + msg: jasmine.any(String), + }); + + // Verify no operations were applied (cancelled) + expect(mockOpLogStore.append).not.toHaveBeenCalled(); + expect(mockOperationApplier.applyOperations).not.toHaveBeenCalled(); + }); + + it('should clear timeout when dialog closes normally', async () => { + const mockDialogRef = { + afterClosed: () => + of({ + resolutions: new Map([[0, 'local']]), + conflicts, + }), + close: jasmine.createSpy('close'), + } as unknown as MatDialogRef; + + mockDialog.open.and.returnValue(mockDialogRef); + mockOpLogStore.hasOp.and.resolveTo(false); + mockOpLogStore.append.and.resolveTo(100); + + await service.presentConflicts(conflicts); + + // Advance time past the timeout (after dialog already closed) + jasmine.clock().tick(FIVE_MINUTES_MS + 100); + + // Dialog.close should NOT have been called by the timeout (only if it was called at all) + // The key test is that no extra snack warning was shown + // Actually we check that snack was NOT called with ERROR type for timeout + expect(mockSnackService.open).not.toHaveBeenCalledWith({ + type: 'ERROR', + msg: jasmine.stringMatching(/timeout/i), + }); + }); + + it('should not apply any operations when dialog times out', async () => { + // Use Subject to control when the dialog closes + const afterClosed$ = new Subject(); + + const mockDialogRef = { + afterClosed: () => afterClosed$.asObservable(), + close: jasmine.createSpy('close').and.callFake(() => { + afterClosed$.next(undefined); + afterClosed$.complete(); + }), + } as unknown as MatDialogRef; + + mockDialog.open.and.returnValue(mockDialogRef); + + const presentPromise = service.presentConflicts(conflicts); + + // Advance time past timeout + jasmine.clock().tick(FIVE_MINUTES_MS + 100); + + await presentPromise; + + // No operations should be stored or applied + expect(mockOpLogStore.append).not.toHaveBeenCalled(); + expect(mockOpLogStore.markApplied).not.toHaveBeenCalled(); + expect(mockOpLogStore.markRejected).not.toHaveBeenCalled(); + expect(mockOperationApplier.applyOperations).not.toHaveBeenCalled(); + }); + }); }); diff --git a/src/app/core/persistence/operation-log/sync/conflict-resolution.service.ts b/src/app/core/persistence/operation-log/sync/conflict-resolution.service.ts index d8d99407d..857794e13 100644 --- a/src/app/core/persistence/operation-log/sync/conflict-resolution.service.ts +++ b/src/app/core/persistence/operation-log/sync/conflict-resolution.service.ts @@ -13,7 +13,10 @@ import { firstValueFrom } from 'rxjs'; import { SnackService } from '../../../snack/snack.service'; import { T } from '../../../../t.const'; import { ValidateStateService } from '../processing/validate-state.service'; -import { MAX_CONFLICT_RETRY_ATTEMPTS } from '../operation-log.const'; +import { + MAX_CONFLICT_RETRY_ATTEMPTS, + CONFLICT_DIALOG_TIMEOUT_MS, +} from '../operation-log.const'; import { UserInputWaitStateService } from '../../../../imex/sync/user-input-wait-state.service'; /** @@ -93,6 +96,7 @@ export class ConflictResolutionService { // Signal that we're waiting for user input to prevent sync timeout const stopWaiting = this.userInputWaitState.startWaiting('oplog-conflict'); let result: ConflictResolutionResult | undefined; + let timeoutId: ReturnType | undefined; try { this._dialogRef = this.dialog.open(DialogConflictResolutionComponent, { @@ -100,8 +104,28 @@ export class ConflictResolutionService { disableClose: false, }); + // Set up timeout to auto-cancel the dialog + // This prevents sync from being blocked indefinitely if user walks away + timeoutId = setTimeout(() => { + if (this._dialogRef) { + OpLog.warn( + 'ConflictResolutionService: Dialog timeout - auto-cancelling after ' + + `${CONFLICT_DIALOG_TIMEOUT_MS / 1000}s`, + ); + this._dialogRef.close(undefined); + this.snackService.open({ + type: 'ERROR', + msg: T.F.SYNC.S.CONFLICT_DIALOG_TIMEOUT, + }); + } + }, CONFLICT_DIALOG_TIMEOUT_MS); + result = await firstValueFrom(this._dialogRef.afterClosed()); } finally { + // Clear timeout on normal close + if (timeoutId) { + clearTimeout(timeoutId); + } stopWaiting(); } @@ -244,20 +268,40 @@ export class ConflictResolutionService { OpLog.normal( `ConflictResolutionService: Applying ${allOpsToApply.length} ops in single batch`, ); - try { - await this.operationApplier.applyOperations(allOpsToApply); - // If we get here without throwing, all ops succeeded - mark as applied - const successSeqs = allStoredOps.map((o) => o.seq); - await this.opLogStore.markApplied(successSeqs); + // Map op ID to seq for marking partial success + const opIdToSeq = new Map(allStoredOps.map((o) => [o.id, o.seq])); + + const applyResult = await this.operationApplier.applyOperations(allOpsToApply); + + // Mark successfully applied ops + const appliedSeqs = applyResult.appliedOps + .map((op) => opIdToSeq.get(op.id)) + .filter((seq): seq is number => seq !== undefined); + + if (appliedSeqs.length > 0) { + await this.opLogStore.markApplied(appliedSeqs); OpLog.normal( - `ConflictResolutionService: Successfully applied ${allOpsToApply.length} ops`, + `ConflictResolutionService: Successfully applied ${appliedSeqs.length} ops`, ); - } catch (e) { - // SyncStateCorruptedError or any other error means ops failed to apply - OpLog.err('ConflictResolutionService: Exception applying ops', e); - const allOpIds = allStoredOps.map((o) => o.id); - await this.opLogStore.markFailed(allOpIds, MAX_CONFLICT_RETRY_ATTEMPTS); + } + + // Handle partial failure + if (applyResult.failedOp) { + // Find all ops that weren't applied (failed op + remaining ops) + const failedOpIndex = allOpsToApply.findIndex( + (op) => op.id === applyResult.failedOp!.op.id, + ); + const failedOps = allOpsToApply.slice(failedOpIndex); + const failedOpIds = failedOps.map((op) => op.id); + + OpLog.err( + `ConflictResolutionService: ${applyResult.appliedOps.length} ops applied before failure. ` + + `Marking ${failedOpIds.length} ops as failed.`, + applyResult.failedOp.error, + ); + await this.opLogStore.markFailed(failedOpIds, MAX_CONFLICT_RETRY_ATTEMPTS); + this.snackService.open({ type: 'ERROR', msg: T.F.SYNC.S.CONFLICT_RESOLUTION_FAILED, diff --git a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts index a49cf59c6..d01a6a580 100644 --- a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts +++ b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.spec.ts @@ -137,6 +137,10 @@ describe('OperationLogSyncService', () => { dependencyResolverSpy.extractDependencies.and.returnValue([]); // Default: no local ops to replay after SYNC_IMPORT opLogStoreSpy.getOpsAfterSeq.and.returnValue(Promise.resolve([])); + // Default: successful operation application + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); }); it('should be created', () => { @@ -1217,7 +1221,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); // Process remote ops with SYNC_IMPORT // Note: _processRemoteOps is private but we can still call it in tests @@ -1240,7 +1246,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await (service as any)._processRemoteOps([backupImportOp]); @@ -1286,7 +1294,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); // Spy on detectConflicts to verify it's NOT called spyOn(service, 'detectConflicts').and.callThrough(); @@ -1320,7 +1330,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await (service as any)._processRemoteOps([syncImportOp, followUpOp]); @@ -1350,7 +1362,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); spyOn(service, 'detectConflicts').and.callThrough(); @@ -1434,7 +1448,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); // Process the SYNC_IMPORT await (service as any)._processRemoteOps([syncImportOp]); @@ -1488,7 +1504,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await (service as any)._processRemoteOps([syncImportOp]); @@ -1531,7 +1549,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await (service as any)._processRemoteOps([syncImportOp]); @@ -1577,7 +1597,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await (service as any)._processRemoteOps([syncImportOp]); @@ -1603,7 +1625,9 @@ describe('OperationLogSyncService', () => { opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false)); opLogStoreSpy.append.and.returnValue(Promise.resolve(1)); opLogStoreSpy.markApplied.and.returnValue(Promise.resolve()); - operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve()); + operationApplierServiceSpy.applyOperations.and.returnValue( + Promise.resolve({ appliedOps: [] }), + ); await (service as any)._processRemoteOps([syncImportOp]); diff --git a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts index ee0d9e60f..c2a661672 100644 --- a/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts +++ b/src/app/core/persistence/operation-log/sync/operation-log-sync.service.ts @@ -179,12 +179,12 @@ export class OperationLogSyncService { return; } - // SERVER MIGRATION CHECK: If this client has history but appears to be connecting - // to a new/empty server, create a SYNC_IMPORT with full state first. - // This handles the scenario where user changes sync server credentials. - await this._checkAndHandleServerMigration(syncProvider); - - const result = await this.uploadService.uploadPendingOps(syncProvider); + // SERVER MIGRATION CHECK: Passed as callback to execute INSIDE the upload lock. + // This prevents race conditions where multiple tabs could both detect migration + // and create duplicate SYNC_IMPORT operations. + const result = await this.uploadService.uploadPendingOps(syncProvider, { + preUploadCallback: () => this._checkAndHandleServerMigration(syncProvider), + }); // STEP 1: Process piggybacked ops FIRST // This is critical: piggybacked ops may contain the "winning" remote versions @@ -740,20 +740,17 @@ export class OperationLogSyncService { * @throws Re-throws if application fails (ops marked as failed first) */ private async _applyNonConflictingOps(ops: Operation[]): Promise { - // Track stored seqs for marking as applied after success - const storedSeqs: number[] = []; + // Map op ID to seq for marking partial success + const opIdToSeq = new Map(); // Track ops that are NOT duplicates (need to be applied) const opsToApply: Operation[] = []; - // Track op IDs for error handling - const storedOpIds: string[] = []; // Store operations with pending status before applying // If we crash after storing but before applying, these will be retried on startup for (const op of ops) { if (!(await this.opLogStore.hasOp(op.id))) { const seq = await this.opLogStore.append(op, 'remote', { pendingApply: true }); - storedSeqs.push(seq); - storedOpIds.push(op.id); + opIdToSeq.set(op.id, seq); opsToApply.push(op); } else { OpLog.verbose(`OperationLogSyncService: Skipping duplicate op: ${op.id}`); @@ -762,26 +759,39 @@ export class OperationLogSyncService { // Apply only NON-duplicate ops to NgRx store if (opsToApply.length > 0) { - try { - await this.operationApplier.applyOperations(opsToApply); - } catch (e) { - // If application fails catastrophically, mark ops as failed to prevent - // them from being uploaded in a potentially inconsistent state - OpLog.err( - `OperationLogSyncService: Failed to apply ${opsToApply.length} ops. Marking as failed.`, - e, - ); - await this.opLogStore.markFailed(storedOpIds); - throw e; - } - } + const result = await this.operationApplier.applyOperations(opsToApply); - // Mark ops as successfully applied (crash recovery will skip these) - if (storedSeqs.length > 0) { - await this.opLogStore.markApplied(storedSeqs); - OpLog.normal( - `OperationLogSyncService: Applied and marked ${storedSeqs.length} remote ops`, - ); + // Mark successfully applied ops + const appliedSeqs = result.appliedOps + .map((op) => opIdToSeq.get(op.id)) + .filter((seq): seq is number => seq !== undefined); + + if (appliedSeqs.length > 0) { + await this.opLogStore.markApplied(appliedSeqs); + OpLog.normal( + `OperationLogSyncService: Applied and marked ${appliedSeqs.length} remote ops`, + ); + } + + // Handle partial failure + if (result.failedOp) { + // Find all ops that weren't applied (failed op + remaining ops) + const failedOpIndex = opsToApply.findIndex( + (op) => op.id === result.failedOp!.op.id, + ); + const failedOps = opsToApply.slice(failedOpIndex); + const failedOpIds = failedOps.map((op) => op.id); + + OpLog.err( + `OperationLogSyncService: ${result.appliedOps.length} ops applied before failure. ` + + `Marking ${failedOpIds.length} ops as failed.`, + result.failedOp.error, + ); + await this.opLogStore.markFailed(failedOpIds); + + // Re-throw if it's a SyncStateCorruptedError, otherwise wrap it + throw result.failedOp.error; + } } } diff --git a/src/app/core/persistence/operation-log/sync/operation-log-upload.service.spec.ts b/src/app/core/persistence/operation-log/sync/operation-log-upload.service.spec.ts index 757fa05fd..08e45adea 100644 --- a/src/app/core/persistence/operation-log/sync/operation-log-upload.service.spec.ts +++ b/src/app/core/persistence/operation-log/sync/operation-log-upload.service.spec.ts @@ -816,5 +816,181 @@ describe('OperationLogUploadService', () => { expect(mockApiProvider.uploadOps).not.toHaveBeenCalled(); }); }); + + describe('preUploadCallback (server migration race condition fix)', () => { + /** + * These tests verify that preUploadCallback is: + * 1. Called INSIDE the upload lock + * 2. Called BEFORE checking for pending ops + * + * This fixes a race condition where multiple tabs could both detect + * server migration and create duplicate SYNC_IMPORT operations. + */ + let mockApiProvider: jasmine.SpyObj< + SyncProviderServiceInterface & OperationSyncCapable + >; + let mockFileProvider: jasmine.SpyObj>; + + beforeEach(() => { + mockApiProvider = jasmine.createSpyObj('ApiSyncProvider', [ + 'getLastServerSeq', + 'uploadOps', + 'setLastServerSeq', + ]); + (mockApiProvider as any).supportsOperationSync = true; + (mockApiProvider as any).privateCfg = { + load: jasmine + .createSpy('privateCfg.load') + .and.returnValue(Promise.resolve(null)), + }; + mockApiProvider.getLastServerSeq.and.returnValue(Promise.resolve(0)); + mockApiProvider.uploadOps.and.returnValue( + Promise.resolve({ results: [], latestSeq: 0, newOps: [] }), + ); + mockApiProvider.setLastServerSeq.and.returnValue(Promise.resolve()); + + mockFileProvider = jasmine.createSpyObj('FileSyncProvider', ['uploadFile']); + (mockFileProvider as any).supportsOperationSync = false; + mockFileProvider.uploadFile.and.returnValue(Promise.resolve({ rev: 'test-rev' })); + }); + + it('should call preUploadCallback inside the lock for API-based sync', async () => { + const callOrder: string[] = []; + + mockLockService.request.and.callFake( + async (_name: string, fn: () => Promise) => { + callOrder.push('lock-acquired'); + await fn(); + callOrder.push('lock-released'); + }, + ); + + const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => { + callOrder.push('callback-executed'); + }); + + await service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback }); + + expect(callback).toHaveBeenCalled(); + // Verify callback was called INSIDE the lock + expect(callOrder).toEqual([ + 'lock-acquired', + 'callback-executed', + 'lock-released', + ]); + }); + + it('should call preUploadCallback inside the lock for file-based sync', async () => { + const callOrder: string[] = []; + + mockLockService.request.and.callFake( + async (_name: string, fn: () => Promise) => { + callOrder.push('lock-acquired'); + await fn(); + callOrder.push('lock-released'); + }, + ); + + const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => { + callOrder.push('callback-executed'); + }); + + await service.uploadPendingOps(mockFileProvider, { preUploadCallback: callback }); + + expect(callback).toHaveBeenCalled(); + // Verify callback was called INSIDE the lock + expect(callOrder).toEqual([ + 'lock-acquired', + 'callback-executed', + 'lock-released', + ]); + }); + + it('should call preUploadCallback BEFORE checking for pending ops (API)', async () => { + const callOrder: string[] = []; + + mockOpLogStore.getUnsynced.and.callFake(async () => { + callOrder.push('getUnsynced-called'); + return []; + }); + + const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => { + callOrder.push('callback-executed'); + }); + + await service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback }); + + // Callback should be called before getUnsynced + expect(callOrder).toEqual(['callback-executed', 'getUnsynced-called']); + }); + + it('should call preUploadCallback BEFORE checking for pending ops (file-based)', async () => { + const callOrder: string[] = []; + + mockOpLogStore.getUnsynced.and.callFake(async () => { + callOrder.push('getUnsynced-called'); + return []; + }); + + const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => { + callOrder.push('callback-executed'); + }); + + await service.uploadPendingOps(mockFileProvider, { preUploadCallback: callback }); + + // Callback should be called before getUnsynced + expect(callOrder).toEqual(['callback-executed', 'getUnsynced-called']); + }); + + it('should not call preUploadCallback if not provided', async () => { + await service.uploadPendingOps(mockApiProvider); + + // Should complete without error, verifying optional nature + expect(mockLockService.request).toHaveBeenCalled(); + }); + + it('should propagate errors from preUploadCallback', async () => { + const callback = jasmine + .createSpy('preUploadCallback') + .and.rejectWith(new Error('Migration check failed')); + + await expectAsync( + service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback }), + ).toBeRejectedWithError('Migration check failed'); + + // Should not proceed to check for pending ops + expect(mockOpLogStore.getUnsynced).not.toHaveBeenCalled(); + }); + + it('should allow callback to create new operations that get uploaded', async () => { + // First call to getUnsynced returns empty (callback hasn't run yet) + // After callback runs, we simulate it creating a new op + let callCount = 0; + mockOpLogStore.getUnsynced.and.callFake(async () => { + callCount++; + if (callCount === 1) { + // After callback ran, return the new op it created + return [createMockEntry(1, 'sync-import-op', 'client-1')]; + } + return []; + }); + + mockApiProvider.uploadOps.and.returnValue( + Promise.resolve({ + results: [{ opId: 'sync-import-op', accepted: true }], + latestSeq: 1, + newOps: [], + }), + ); + + const callback = jasmine.createSpy('preUploadCallback').and.resolveTo(undefined); + + await service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback }); + + // Callback was called, and the op it created was uploaded + expect(callback).toHaveBeenCalled(); + expect(mockApiProvider.uploadOps).toHaveBeenCalled(); + }); + }); }); }); diff --git a/src/app/core/persistence/operation-log/sync/operation-log-upload.service.ts b/src/app/core/persistence/operation-log/sync/operation-log-upload.service.ts index 4cd129939..b4f193cf7 100644 --- a/src/app/core/persistence/operation-log/sync/operation-log-upload.service.ts +++ b/src/app/core/persistence/operation-log/sync/operation-log-upload.service.ts @@ -44,6 +44,17 @@ export interface UploadResult { rejectedOps: RejectedOpInfo[]; } +/** + * Options for uploadPendingOps. + */ +export interface UploadOptions { + /** + * Optional callback executed INSIDE the upload lock, BEFORE checking for pending ops. + * Use this for operations that must be atomic with the upload, such as server migration checks. + */ + preUploadCallback?: () => Promise; +} + /** * Handles uploading local pending operations to remote storage. * Supports both API-based sync (for real-time providers) and @@ -60,6 +71,7 @@ export class OperationLogUploadService { async uploadPendingOps( syncProvider: SyncProviderServiceInterface, + options?: UploadOptions, ): Promise { if (!syncProvider) { OpLog.warn('OperationLogUploadService: No active sync provider passed for upload.'); @@ -68,15 +80,16 @@ export class OperationLogUploadService { // Use operation sync if supported if (isOperationSyncCapable(syncProvider)) { - return this._uploadPendingOpsViaApi(syncProvider); + return this._uploadPendingOpsViaApi(syncProvider, options); } // Fall back to file-based sync - return this._uploadPendingOpsViaFiles(syncProvider); + return this._uploadPendingOpsViaFiles(syncProvider, options); } private async _uploadPendingOpsViaApi( syncProvider: SyncProviderServiceInterface & OperationSyncCapable, + options?: UploadOptions, ): Promise { OpLog.normal('OperationLogUploadService: Uploading pending operations via API...'); @@ -86,6 +99,12 @@ export class OperationLogUploadService { let rejectedCount = 0; await this.lockService.request('sp_op_log_upload', async () => { + // Execute pre-upload callback INSIDE the lock, BEFORE checking for pending ops. + // This ensures operations like server migration checks are atomic with the upload. + if (options?.preUploadCallback) { + await options.preUploadCallback(); + } + const pendingOps = await this.opLogStore.getUnsynced(); if (pendingOps.length === 0) { @@ -250,12 +269,18 @@ export class OperationLogUploadService { private async _uploadPendingOpsViaFiles( syncProvider: SyncProviderServiceInterface, + options?: UploadOptions, ): Promise { OpLog.normal('OperationLogUploadService: Uploading pending operations via files...'); let uploadedCount = 0; await this.lockService.request('sp_op_log_upload', async () => { + // Execute pre-upload callback INSIDE the lock, BEFORE checking for pending ops. + if (options?.preUploadCallback) { + await options.preUploadCallback(); + } + const pendingOps = await this.opLogStore.getUnsynced(); if (pendingOps.length === 0) { diff --git a/src/app/t.const.ts b/src/app/t.const.ts index 185a1abd6..b62792995 100644 --- a/src/app/t.const.ts +++ b/src/app/t.const.ts @@ -1217,6 +1217,7 @@ const T = { INTEGRITY_CHECK_FAILED: 'F.SYNC.S.INTEGRITY_CHECK_FAILED', COMPACTION_FAILED: 'F.SYNC.S.COMPACTION_FAILED', CONFLICT_RESOLUTION_FAILED: 'F.SYNC.S.CONFLICT_RESOLUTION_FAILED', + CONFLICT_DIALOG_TIMEOUT: 'F.SYNC.S.CONFLICT_DIALOG_TIMEOUT', OPERATION_PERMANENTLY_FAILED: 'F.SYNC.S.OPERATION_PERMANENTLY_FAILED', DATA_REPAIRED: 'F.SYNC.S.DATA_REPAIRED', INVALID_OPERATION_PAYLOAD: 'F.SYNC.S.INVALID_OPERATION_PAYLOAD', diff --git a/src/assets/i18n/en.json b/src/assets/i18n/en.json index ec5d8f252..49e3357f9 100644 --- a/src/assets/i18n/en.json +++ b/src/assets/i18n/en.json @@ -1181,6 +1181,7 @@ "CLOCK_DRIFT_WARNING": "Your device clock appears to be off by {{minutes}} minutes. This may cause sync issues.", "COMPACTION_FAILED": "Database cleanup failed. App may slow down.", "CONFLICT_RESOLUTION_FAILED": "Sync conflict resolution failed. Please reload.", + "CONFLICT_DIALOG_TIMEOUT": "Sync conflict dialog timed out and was cancelled. Sync will retry on next trigger.", "OPERATION_PERMANENTLY_FAILED": "Some sync operations failed and could not be applied. Data may be incomplete.", "DATA_REPAIRED": "Data automatically repaired ({{count}} issues fixed)", "ERROR_CORS": "WebDAV Sync Error: Network request failed.\n\nThis might be a CORS issue. Please ensure:\n• Your WebDAV server allows Cross-Origin requests\n• The server URL is correct and accessible\n• You have a working internet connection",