mirror of
https://github.com/johannesjo/super-productivity.git
synced 2026-01-23 02:36:05 +00:00
fix(sync): address operation-log review issues
- Fix server migration race condition by moving check inside upload lock using preUploadCallback pattern to ensure atomicity - Fix batch apply error handling to track partial success, returning ApplyOperationsResult with appliedOps and failedOp info - Add 5-minute timeout to conflict dialog to prevent indefinite blocking if user walks away during sync conflict resolution
This commit is contained in:
parent
174d4aa6b2
commit
262f8f10b1
15 changed files with 672 additions and 108 deletions
|
|
@ -40,6 +40,9 @@ describe('Migration Handling Integration', () => {
|
|||
operationApplierSpy = jasmine.createSpyObj('OperationApplierService', [
|
||||
'applyOperations',
|
||||
]);
|
||||
operationApplierSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
TestBed.configureTestingModule({
|
||||
providers: [
|
||||
|
|
@ -203,13 +206,17 @@ describe('Migration Handling Integration', () => {
|
|||
schemaVersion: 1,
|
||||
});
|
||||
|
||||
it('should mark operations as failed if application throws error', async () => {
|
||||
it('should mark operations as failed if application returns failure result', async () => {
|
||||
const op = createOp('op-fail');
|
||||
|
||||
// Make applier throw error
|
||||
operationApplierSpy.applyOperations.and.rejectWith(
|
||||
new Error('Simulated Apply Error'),
|
||||
);
|
||||
// Make applier return failure result (new behavior with partial success support)
|
||||
operationApplierSpy.applyOperations.and.resolveTo({
|
||||
appliedOps: [],
|
||||
failedOp: {
|
||||
op,
|
||||
error: new Error('Simulated Apply Error'),
|
||||
},
|
||||
});
|
||||
|
||||
// Spy on store to verify markFailed is called
|
||||
spyOn(opLogStore, 'markFailed').and.callThrough();
|
||||
|
|
|
|||
|
|
@ -166,6 +166,7 @@ describe('Service Logic Integration', () => {
|
|||
'presentConflicts',
|
||||
]);
|
||||
applierSpy = jasmine.createSpyObj('OperationApplierService', ['applyOperations']);
|
||||
applierSpy.applyOperations.and.returnValue(Promise.resolve({ appliedOps: [] }));
|
||||
|
||||
// Create spy properly before using in TestBed
|
||||
const waitServiceSpy = jasmine.createSpyObj('UserInputWaitStateService', [
|
||||
|
|
|
|||
|
|
@ -153,3 +153,11 @@ export const SLOW_COMPACTION_THRESHOLD_MS = 3000;
|
|||
* Default: 20MB
|
||||
*/
|
||||
export const STATE_SIZE_WARNING_THRESHOLD_MB = 20;
|
||||
|
||||
/**
|
||||
* Timeout for the conflict resolution dialog (milliseconds).
|
||||
* If user doesn't respond within this time, the dialog auto-cancels
|
||||
* to prevent sync from being blocked indefinitely.
|
||||
* Default: 5 minutes
|
||||
*/
|
||||
export const CONFLICT_DIALOG_TIMEOUT_MS = 5 * 60 * 1000;
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ describe('OperationApplierService', () => {
|
|||
it('should dispatch action for operation with no dependencies', async () => {
|
||||
const op = createMockOperation('op-1', 'TASK', OpType.Update, { title: 'Test' });
|
||||
|
||||
await service.applyOperations([op]);
|
||||
const result = await service.applyOperations([op]);
|
||||
|
||||
expect(mockStore.dispatch).toHaveBeenCalledTimes(1);
|
||||
expect(mockStore.dispatch).toHaveBeenCalledWith(
|
||||
|
|
@ -80,6 +80,8 @@ describe('OperationApplierService', () => {
|
|||
}),
|
||||
}),
|
||||
);
|
||||
expect(result.appliedOps).toEqual([op]);
|
||||
expect(result.failedOp).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should dispatch actions for multiple operations in order', async () => {
|
||||
|
|
@ -89,7 +91,7 @@ describe('OperationApplierService', () => {
|
|||
createMockOperation('op-3', 'TASK', OpType.Update, { title: 'Third' }),
|
||||
];
|
||||
|
||||
await service.applyOperations(ops);
|
||||
const result = await service.applyOperations(ops);
|
||||
|
||||
expect(mockStore.dispatch).toHaveBeenCalledTimes(3);
|
||||
|
||||
|
|
@ -97,12 +99,17 @@ describe('OperationApplierService', () => {
|
|||
expect((calls[0].args[0] as any).title).toBe('First');
|
||||
expect((calls[1].args[0] as any).title).toBe('Second');
|
||||
expect((calls[2].args[0] as any).title).toBe('Third');
|
||||
|
||||
expect(result.appliedOps).toEqual(ops);
|
||||
expect(result.failedOp).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should handle empty operations array', async () => {
|
||||
await service.applyOperations([]);
|
||||
const result = await service.applyOperations([]);
|
||||
|
||||
expect(mockStore.dispatch).not.toHaveBeenCalled();
|
||||
expect(result.appliedOps).toEqual([]);
|
||||
expect(result.failedOp).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should call archiveOperationHandler after dispatching', async () => {
|
||||
|
|
@ -115,7 +122,7 @@ describe('OperationApplierService', () => {
|
|||
});
|
||||
|
||||
describe('dependency handling', () => {
|
||||
it('should throw SyncStateCorruptedError for operation with missing hard dependency', async () => {
|
||||
it('should return failed op for operation with missing hard dependency', async () => {
|
||||
const op = createMockOperation('op-1', 'TASK', OpType.Create, {
|
||||
parentId: 'parent-123',
|
||||
});
|
||||
|
|
@ -132,10 +139,12 @@ describe('OperationApplierService', () => {
|
|||
Promise.resolve({ missing: [parentDep] }),
|
||||
);
|
||||
|
||||
await expectAsync(service.applyOperations([op])).toBeRejectedWithError(
|
||||
SyncStateCorruptedError,
|
||||
);
|
||||
const result = await service.applyOperations([op]);
|
||||
|
||||
expect(result.appliedOps).toEqual([]);
|
||||
expect(result.failedOp).toBeDefined();
|
||||
expect(result.failedOp!.op).toBe(op);
|
||||
expect(result.failedOp!.error).toBeInstanceOf(SyncStateCorruptedError);
|
||||
expect(mockStore.dispatch).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
|
|
@ -156,16 +165,14 @@ describe('OperationApplierService', () => {
|
|||
Promise.resolve({ missing: [parentDep] }),
|
||||
);
|
||||
|
||||
try {
|
||||
await service.applyOperations([op]);
|
||||
fail('Expected SyncStateCorruptedError to be thrown');
|
||||
} catch (e) {
|
||||
expect(e).toBeInstanceOf(SyncStateCorruptedError);
|
||||
const error = e as SyncStateCorruptedError;
|
||||
expect(error.context.opId).toBe('op-1');
|
||||
expect(error.context.actionType).toBe('[Test] Action');
|
||||
expect(error.context.missingDependencies).toContain('TASK:parent-123');
|
||||
}
|
||||
const result = await service.applyOperations([op]);
|
||||
|
||||
expect(result.failedOp).toBeDefined();
|
||||
const error = result.failedOp!.error as SyncStateCorruptedError;
|
||||
expect(error).toBeInstanceOf(SyncStateCorruptedError);
|
||||
expect(error.context.opId).toBe('op-1');
|
||||
expect(error.context.actionType).toBe('[Test] Action');
|
||||
expect(error.context.missingDependencies).toContain('TASK:parent-123');
|
||||
});
|
||||
|
||||
it('should dispatch action for operation with missing soft dependency', async () => {
|
||||
|
|
@ -185,13 +192,15 @@ describe('OperationApplierService', () => {
|
|||
Promise.resolve({ missing: [projectDep] }),
|
||||
);
|
||||
|
||||
await service.applyOperations([op]);
|
||||
const result = await service.applyOperations([op]);
|
||||
|
||||
// Soft dependency doesn't block application
|
||||
expect(mockStore.dispatch).toHaveBeenCalledTimes(1);
|
||||
expect(result.appliedOps).toEqual([op]);
|
||||
expect(result.failedOp).toBeUndefined();
|
||||
});
|
||||
|
||||
it('should throw on first operation with missing hard deps in a batch', async () => {
|
||||
it('should return partial success when operation in batch fails (batch apply fix)', async () => {
|
||||
const op1 = createMockOperation('op-1', 'TASK', OpType.Update, { title: 'OK' });
|
||||
const op2 = createMockOperation('op-2', 'TASK', OpType.Create, {
|
||||
parentId: 'missing',
|
||||
|
|
@ -219,13 +228,55 @@ describe('OperationApplierService', () => {
|
|||
},
|
||||
);
|
||||
|
||||
await expectAsync(service.applyOperations([op1, op2, op3])).toBeRejectedWithError(
|
||||
SyncStateCorruptedError,
|
||||
);
|
||||
const result = await service.applyOperations([op1, op2, op3]);
|
||||
|
||||
// First operation should have been dispatched before we hit the error
|
||||
expect(mockStore.dispatch).toHaveBeenCalledTimes(1);
|
||||
expect((mockStore.dispatch.calls.first().args[0] as any).title).toBe('OK');
|
||||
|
||||
// Result should show partial success
|
||||
expect(result.appliedOps).toEqual([op1]);
|
||||
expect(result.failedOp).toBeDefined();
|
||||
expect(result.failedOp!.op).toBe(op2);
|
||||
expect(result.failedOp!.error).toBeInstanceOf(SyncStateCorruptedError);
|
||||
});
|
||||
|
||||
it('should return partial success with all ops that succeeded before failure', async () => {
|
||||
const ops = [
|
||||
createMockOperation('op-1', 'TASK', OpType.Update, { title: 'First OK' }),
|
||||
createMockOperation('op-2', 'TASK', OpType.Update, { title: 'Second OK' }),
|
||||
createMockOperation('op-3', 'TASK', OpType.Create, { parentId: 'missing' }), // fails
|
||||
createMockOperation('op-4', 'TASK', OpType.Update, { title: 'Never' }),
|
||||
createMockOperation('op-5', 'TASK', OpType.Update, { title: 'Never' }),
|
||||
];
|
||||
|
||||
const parentDep: OperationDependency = {
|
||||
entityType: 'TASK',
|
||||
entityId: 'missing',
|
||||
mustExist: true,
|
||||
relation: 'parent',
|
||||
};
|
||||
|
||||
mockDependencyResolver.extractDependencies.and.callFake((op: Operation) => {
|
||||
if (op.id === 'op-3') return [parentDep];
|
||||
return [];
|
||||
});
|
||||
|
||||
mockDependencyResolver.checkDependencies.and.callFake(
|
||||
async (deps: OperationDependency[]) => {
|
||||
if (deps.some((d) => d.entityId === 'missing')) {
|
||||
return { missing: deps };
|
||||
}
|
||||
return { missing: [] };
|
||||
},
|
||||
);
|
||||
|
||||
const result = await service.applyOperations(ops);
|
||||
|
||||
// First two operations should have been dispatched
|
||||
expect(mockStore.dispatch).toHaveBeenCalledTimes(2);
|
||||
expect(result.appliedOps).toEqual([ops[0], ops[1]]);
|
||||
expect(result.failedOp!.op).toBe(ops[2]);
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -8,6 +8,29 @@ import { ArchiveOperationHandler } from './archive-operation-handler.service';
|
|||
import { SyncStateCorruptedError } from '../sync-state-corrupted.error';
|
||||
import { HydrationStateService } from './hydration-state.service';
|
||||
|
||||
/**
|
||||
* Result of applying operations to the NgRx store.
|
||||
*
|
||||
* This allows callers to handle partial success scenarios where some operations
|
||||
* were applied before an error occurred.
|
||||
*/
|
||||
export interface ApplyOperationsResult {
|
||||
/**
|
||||
* Operations that were successfully applied to the NgRx store.
|
||||
* These ops have already been dispatched and should be marked as applied.
|
||||
*/
|
||||
appliedOps: Operation[];
|
||||
|
||||
/**
|
||||
* If an error occurred, this contains the failed operation and the error.
|
||||
* Operations after this one in the batch were NOT applied.
|
||||
*/
|
||||
failedOp?: {
|
||||
op: Operation;
|
||||
error: Error;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Service responsible for applying operations to the local NgRx store.
|
||||
*
|
||||
|
|
@ -43,14 +66,17 @@ export class OperationApplierService {
|
|||
|
||||
/**
|
||||
* Apply operations to the NgRx store.
|
||||
* Operations are applied in order. If any operation has missing hard dependencies,
|
||||
* a SyncStateCorruptedError is thrown immediately.
|
||||
* Operations are applied in order. If any operation fails, the result includes
|
||||
* information about which operations succeeded and which failed.
|
||||
*
|
||||
* @throws SyncStateCorruptedError if any operation has missing hard dependencies
|
||||
* @returns Result containing applied operations and optionally the failed operation.
|
||||
* Callers should:
|
||||
* - Mark `appliedOps` as applied (they've been dispatched to NgRx)
|
||||
* - Mark the failed op and any remaining ops as failed
|
||||
*/
|
||||
async applyOperations(ops: Operation[]): Promise<void> {
|
||||
async applyOperations(ops: Operation[]): Promise<ApplyOperationsResult> {
|
||||
if (ops.length === 0) {
|
||||
return;
|
||||
return { appliedOps: [] };
|
||||
}
|
||||
|
||||
OpLog.normal(
|
||||
|
|
@ -58,17 +84,39 @@ export class OperationApplierService {
|
|||
ops.map((op) => op.id),
|
||||
);
|
||||
|
||||
const appliedOps: Operation[] = [];
|
||||
|
||||
// Mark that we're applying remote operations to suppress selector-based effects
|
||||
this.hydrationState.startApplyingRemoteOps();
|
||||
try {
|
||||
for (const op of ops) {
|
||||
await this._applyOperation(op);
|
||||
try {
|
||||
await this._applyOperation(op);
|
||||
appliedOps.push(op);
|
||||
} catch (e) {
|
||||
// Log the error
|
||||
OpLog.err(
|
||||
`OperationApplierService: Failed to apply operation ${op.id}. ` +
|
||||
`${appliedOps.length} ops were applied before this failure.`,
|
||||
e,
|
||||
);
|
||||
|
||||
// Return partial success result with the failed op
|
||||
return {
|
||||
appliedOps,
|
||||
failedOp: {
|
||||
op,
|
||||
error: e instanceof Error ? e : new Error(String(e)),
|
||||
},
|
||||
};
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
this.hydrationState.endApplyingRemoteOps();
|
||||
}
|
||||
|
||||
OpLog.normal('OperationApplierService: Finished applying operations.');
|
||||
return { appliedOps };
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -143,7 +143,9 @@ describe('OperationLogHydratorService', () => {
|
|||
mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([]));
|
||||
mockOpLogStore.markApplied.and.returnValue(Promise.resolve());
|
||||
mockOpLogStore.markFailed.and.returnValue(Promise.resolve());
|
||||
mockOperationApplierService.applyOperations.and.returnValue(Promise.resolve());
|
||||
mockOperationApplierService.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
mockMigrationService.checkAndMigrate.and.returnValue(Promise.resolve());
|
||||
mockSchemaMigrationService.needsMigration.and.returnValue(false);
|
||||
mockSchemaMigrationService.operationNeedsMigration.and.returnValue(false);
|
||||
|
|
@ -1010,7 +1012,9 @@ describe('OperationLogHydratorService', () => {
|
|||
};
|
||||
|
||||
mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([failedEntry]));
|
||||
mockOperationApplierService.applyOperations.and.returnValue(Promise.resolve());
|
||||
mockOperationApplierService.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await service.retryFailedRemoteOps();
|
||||
|
||||
|
|
@ -1040,7 +1044,9 @@ describe('OperationLogHydratorService', () => {
|
|||
});
|
||||
|
||||
mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([failedEntry]));
|
||||
mockOperationApplierService.applyOperations.and.returnValue(Promise.resolve());
|
||||
mockOperationApplierService.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await service.retryFailedRemoteOps();
|
||||
|
||||
|
|
@ -1060,8 +1066,14 @@ describe('OperationLogHydratorService', () => {
|
|||
};
|
||||
|
||||
mockOpLogStore.getFailedRemoteOps.and.returnValue(Promise.resolve([failedEntry]));
|
||||
mockOperationApplierService.applyOperations.and.rejectWith(
|
||||
new Error('Still failing'),
|
||||
mockOperationApplierService.applyOperations.and.returnValue(
|
||||
Promise.resolve({
|
||||
appliedOps: [],
|
||||
failedOp: {
|
||||
op: failedOp,
|
||||
error: new Error('Still failing'),
|
||||
},
|
||||
}),
|
||||
);
|
||||
|
||||
await service.retryFailedRemoteOps();
|
||||
|
|
|
|||
|
|
@ -182,7 +182,11 @@ export class OperationLogHydratorService {
|
|||
// Use OperationApplierService to handle dependency resolution during replay.
|
||||
// This ensures operations referencing deleted entities are handled gracefully
|
||||
// rather than throwing errors (e.g., adding subtask to deleted parent).
|
||||
await this.operationApplierService.applyOperations(opsToReplay);
|
||||
const tailReplayResult =
|
||||
await this.operationApplierService.applyOperations(opsToReplay);
|
||||
if (tailReplayResult.failedOp) {
|
||||
throw tailReplayResult.failedOp.error;
|
||||
}
|
||||
|
||||
// CHECKPOINT C: Validate state after replaying tail operations
|
||||
// Must validate BEFORE saving snapshot to avoid persisting corrupted state
|
||||
|
|
@ -256,7 +260,11 @@ export class OperationLogHydratorService {
|
|||
// Use OperationApplierService to handle dependency resolution during replay.
|
||||
// This ensures operations referencing deleted entities are handled gracefully
|
||||
// rather than throwing errors (e.g., adding subtask to deleted parent).
|
||||
await this.operationApplierService.applyOperations(opsToReplay);
|
||||
const fullReplayResult =
|
||||
await this.operationApplierService.applyOperations(opsToReplay);
|
||||
if (fullReplayResult.failedOp) {
|
||||
throw fullReplayResult.failedOp.error;
|
||||
}
|
||||
|
||||
// CHECKPOINT C: Validate state after replaying all operations
|
||||
// Must validate BEFORE saving snapshot to avoid persisting corrupted state
|
||||
|
|
@ -885,14 +893,17 @@ export class OperationLogHydratorService {
|
|||
const stillFailedOpIds: string[] = [];
|
||||
|
||||
for (const entry of failedOps) {
|
||||
try {
|
||||
await this.operationApplierService.applyOperations([entry.op]);
|
||||
// If we get here without throwing, the operation succeeded
|
||||
appliedOpIds.push(entry.op.id);
|
||||
} catch (e) {
|
||||
const result = await this.operationApplierService.applyOperations([entry.op]);
|
||||
if (result.failedOp) {
|
||||
// SyncStateCorruptedError or any other error means the op still can't be applied
|
||||
OpLog.warn(`OperationLogHydratorService: Failed to retry op ${entry.op.id}`, e);
|
||||
OpLog.warn(
|
||||
`OperationLogHydratorService: Failed to retry op ${entry.op.id}`,
|
||||
result.failedOp.error,
|
||||
);
|
||||
stillFailedOpIds.push(entry.op.id);
|
||||
} else {
|
||||
// Operation succeeded
|
||||
appliedOpIds.push(entry.op.id);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -5,9 +5,13 @@ import { OperationApplierService } from '../processing/operation-applier.service
|
|||
import { OperationLogStoreService } from '../store/operation-log-store.service';
|
||||
import { SnackService } from '../../../snack/snack.service';
|
||||
import { ValidateStateService } from '../processing/validate-state.service';
|
||||
import { of } from 'rxjs';
|
||||
import { of, Subject } from 'rxjs';
|
||||
import { EntityConflict, OpType, Operation } from '../operation.types';
|
||||
import { DialogConflictResolutionComponent } from '../../../../imex/sync/dialog-conflict-resolution/dialog-conflict-resolution.component';
|
||||
import {
|
||||
ConflictResolutionResult,
|
||||
DialogConflictResolutionComponent,
|
||||
} from '../../../../imex/sync/dialog-conflict-resolution/dialog-conflict-resolution.component';
|
||||
import { UserInputWaitStateService } from '../../../../imex/sync/user-input-wait-state.service';
|
||||
|
||||
describe('ConflictResolutionService', () => {
|
||||
let service: ConflictResolutionService;
|
||||
|
|
@ -16,6 +20,7 @@ describe('ConflictResolutionService', () => {
|
|||
let mockOpLogStore: jasmine.SpyObj<OperationLogStoreService>;
|
||||
let mockSnackService: jasmine.SpyObj<SnackService>;
|
||||
let mockValidateStateService: jasmine.SpyObj<ValidateStateService>;
|
||||
let mockUserInputWaitState: jasmine.SpyObj<UserInputWaitStateService>;
|
||||
|
||||
const createMockOp = (id: string, clientId: string): Operation => ({
|
||||
id,
|
||||
|
|
@ -47,6 +52,11 @@ describe('ConflictResolutionService', () => {
|
|||
mockValidateStateService = jasmine.createSpyObj('ValidateStateService', [
|
||||
'validateAndRepairCurrentState',
|
||||
]);
|
||||
mockUserInputWaitState = jasmine.createSpyObj('UserInputWaitStateService', [
|
||||
'startWaiting',
|
||||
]);
|
||||
// startWaiting returns a stop function
|
||||
mockUserInputWaitState.startWaiting.and.returnValue(() => {});
|
||||
|
||||
TestBed.configureTestingModule({
|
||||
providers: [
|
||||
|
|
@ -56,12 +66,13 @@ describe('ConflictResolutionService', () => {
|
|||
{ provide: OperationLogStoreService, useValue: mockOpLogStore },
|
||||
{ provide: SnackService, useValue: mockSnackService },
|
||||
{ provide: ValidateStateService, useValue: mockValidateStateService },
|
||||
{ provide: UserInputWaitStateService, useValue: mockUserInputWaitState },
|
||||
],
|
||||
});
|
||||
service = TestBed.inject(ConflictResolutionService);
|
||||
|
||||
// Default mock behaviors
|
||||
mockOperationApplier.applyOperations.and.resolveTo();
|
||||
mockOperationApplier.applyOperations.and.resolveTo({ appliedOps: [] });
|
||||
mockValidateStateService.validateAndRepairCurrentState.and.resolveTo(true);
|
||||
mockOpLogStore.getUnsyncedByEntity.and.resolveTo(new Map());
|
||||
});
|
||||
|
|
@ -131,6 +142,10 @@ describe('ConflictResolutionService', () => {
|
|||
mockDialog.open.and.returnValue(mockDialogRef);
|
||||
mockOpLogStore.hasOp.and.resolveTo(false);
|
||||
mockOpLogStore.append.and.resolveTo(100);
|
||||
// Need to return the applied ops so markApplied can be called with correct seqs
|
||||
mockOperationApplier.applyOperations.and.resolveTo({
|
||||
appliedOps: conflicts[0].remoteOps,
|
||||
});
|
||||
|
||||
await service.presentConflicts(conflicts);
|
||||
|
||||
|
|
@ -305,10 +320,14 @@ describe('ConflictResolutionService', () => {
|
|||
mockOpLogStore.hasOp.and.resolveTo(false);
|
||||
mockOpLogStore.append.and.resolveTo(100);
|
||||
|
||||
// Simulate application failure by throwing an error
|
||||
mockOperationApplier.applyOperations.and.rejectWith(
|
||||
new Error('Simulated dependency failure'),
|
||||
);
|
||||
// Simulate application failure by returning result with failedOp
|
||||
mockOperationApplier.applyOperations.and.resolveTo({
|
||||
appliedOps: [],
|
||||
failedOp: {
|
||||
op: conflicts[0].remoteOps[0],
|
||||
error: new Error('Simulated dependency failure'),
|
||||
},
|
||||
});
|
||||
|
||||
await service.presentConflicts(conflicts);
|
||||
|
||||
|
|
@ -535,4 +554,130 @@ describe('ConflictResolutionService', () => {
|
|||
expect(mockValidateStateService.validateAndRepairCurrentState).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('dialog timeout', () => {
|
||||
/**
|
||||
* Tests for the conflict dialog timeout feature.
|
||||
* The dialog auto-cancels after CONFLICT_DIALOG_TIMEOUT_MS to prevent
|
||||
* sync from being blocked indefinitely if user walks away.
|
||||
*/
|
||||
|
||||
// Use constant to avoid linter complaints about mixed operators
|
||||
const FIVE_MINUTES_MS = 5 * 60 * 1000;
|
||||
|
||||
// Sample conflicts for timeout tests
|
||||
const localOps = [createMockOp('local-1', 'client-1')];
|
||||
const remoteOps = [createMockOp('remote-1', 'client-2')];
|
||||
const conflicts: EntityConflict[] = [
|
||||
{
|
||||
entityType: 'TASK',
|
||||
entityId: 'task-1',
|
||||
localOps,
|
||||
remoteOps,
|
||||
suggestedResolution: 'manual',
|
||||
},
|
||||
];
|
||||
|
||||
beforeEach(() => {
|
||||
jasmine.clock().install();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
jasmine.clock().uninstall();
|
||||
});
|
||||
|
||||
it('should auto-cancel dialog after timeout', async () => {
|
||||
// Use Subject to control when the dialog closes
|
||||
const afterClosed$ = new Subject<ConflictResolutionResult | undefined>();
|
||||
|
||||
const mockDialogRef = {
|
||||
afterClosed: () => afterClosed$.asObservable(),
|
||||
close: jasmine.createSpy('close').and.callFake(() => {
|
||||
// Simulate dialog close by emitting undefined
|
||||
afterClosed$.next(undefined);
|
||||
afterClosed$.complete();
|
||||
}),
|
||||
} as unknown as MatDialogRef<DialogConflictResolutionComponent>;
|
||||
|
||||
mockDialog.open.and.returnValue(mockDialogRef);
|
||||
|
||||
// Start the presentConflicts call
|
||||
const presentPromise = service.presentConflicts(conflicts);
|
||||
|
||||
// Advance time past the timeout
|
||||
jasmine.clock().tick(FIVE_MINUTES_MS + 100);
|
||||
|
||||
// Wait for the method to complete
|
||||
await presentPromise;
|
||||
|
||||
// Verify dialog was closed
|
||||
expect(mockDialogRef.close).toHaveBeenCalledWith(undefined);
|
||||
|
||||
// Verify snack was shown
|
||||
expect(mockSnackService.open).toHaveBeenCalledWith({
|
||||
type: 'ERROR',
|
||||
msg: jasmine.any(String),
|
||||
});
|
||||
|
||||
// Verify no operations were applied (cancelled)
|
||||
expect(mockOpLogStore.append).not.toHaveBeenCalled();
|
||||
expect(mockOperationApplier.applyOperations).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should clear timeout when dialog closes normally', async () => {
|
||||
const mockDialogRef = {
|
||||
afterClosed: () =>
|
||||
of({
|
||||
resolutions: new Map([[0, 'local']]),
|
||||
conflicts,
|
||||
}),
|
||||
close: jasmine.createSpy('close'),
|
||||
} as unknown as MatDialogRef<DialogConflictResolutionComponent>;
|
||||
|
||||
mockDialog.open.and.returnValue(mockDialogRef);
|
||||
mockOpLogStore.hasOp.and.resolveTo(false);
|
||||
mockOpLogStore.append.and.resolveTo(100);
|
||||
|
||||
await service.presentConflicts(conflicts);
|
||||
|
||||
// Advance time past the timeout (after dialog already closed)
|
||||
jasmine.clock().tick(FIVE_MINUTES_MS + 100);
|
||||
|
||||
// Dialog.close should NOT have been called by the timeout (only if it was called at all)
|
||||
// The key test is that no extra snack warning was shown
|
||||
// Actually we check that snack was NOT called with ERROR type for timeout
|
||||
expect(mockSnackService.open).not.toHaveBeenCalledWith({
|
||||
type: 'ERROR',
|
||||
msg: jasmine.stringMatching(/timeout/i),
|
||||
});
|
||||
});
|
||||
|
||||
it('should not apply any operations when dialog times out', async () => {
|
||||
// Use Subject to control when the dialog closes
|
||||
const afterClosed$ = new Subject<ConflictResolutionResult | undefined>();
|
||||
|
||||
const mockDialogRef = {
|
||||
afterClosed: () => afterClosed$.asObservable(),
|
||||
close: jasmine.createSpy('close').and.callFake(() => {
|
||||
afterClosed$.next(undefined);
|
||||
afterClosed$.complete();
|
||||
}),
|
||||
} as unknown as MatDialogRef<DialogConflictResolutionComponent>;
|
||||
|
||||
mockDialog.open.and.returnValue(mockDialogRef);
|
||||
|
||||
const presentPromise = service.presentConflicts(conflicts);
|
||||
|
||||
// Advance time past timeout
|
||||
jasmine.clock().tick(FIVE_MINUTES_MS + 100);
|
||||
|
||||
await presentPromise;
|
||||
|
||||
// No operations should be stored or applied
|
||||
expect(mockOpLogStore.append).not.toHaveBeenCalled();
|
||||
expect(mockOpLogStore.markApplied).not.toHaveBeenCalled();
|
||||
expect(mockOpLogStore.markRejected).not.toHaveBeenCalled();
|
||||
expect(mockOperationApplier.applyOperations).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -13,7 +13,10 @@ import { firstValueFrom } from 'rxjs';
|
|||
import { SnackService } from '../../../snack/snack.service';
|
||||
import { T } from '../../../../t.const';
|
||||
import { ValidateStateService } from '../processing/validate-state.service';
|
||||
import { MAX_CONFLICT_RETRY_ATTEMPTS } from '../operation-log.const';
|
||||
import {
|
||||
MAX_CONFLICT_RETRY_ATTEMPTS,
|
||||
CONFLICT_DIALOG_TIMEOUT_MS,
|
||||
} from '../operation-log.const';
|
||||
import { UserInputWaitStateService } from '../../../../imex/sync/user-input-wait-state.service';
|
||||
|
||||
/**
|
||||
|
|
@ -93,6 +96,7 @@ export class ConflictResolutionService {
|
|||
// Signal that we're waiting for user input to prevent sync timeout
|
||||
const stopWaiting = this.userInputWaitState.startWaiting('oplog-conflict');
|
||||
let result: ConflictResolutionResult | undefined;
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
|
||||
try {
|
||||
this._dialogRef = this.dialog.open(DialogConflictResolutionComponent, {
|
||||
|
|
@ -100,8 +104,28 @@ export class ConflictResolutionService {
|
|||
disableClose: false,
|
||||
});
|
||||
|
||||
// Set up timeout to auto-cancel the dialog
|
||||
// This prevents sync from being blocked indefinitely if user walks away
|
||||
timeoutId = setTimeout(() => {
|
||||
if (this._dialogRef) {
|
||||
OpLog.warn(
|
||||
'ConflictResolutionService: Dialog timeout - auto-cancelling after ' +
|
||||
`${CONFLICT_DIALOG_TIMEOUT_MS / 1000}s`,
|
||||
);
|
||||
this._dialogRef.close(undefined);
|
||||
this.snackService.open({
|
||||
type: 'ERROR',
|
||||
msg: T.F.SYNC.S.CONFLICT_DIALOG_TIMEOUT,
|
||||
});
|
||||
}
|
||||
}, CONFLICT_DIALOG_TIMEOUT_MS);
|
||||
|
||||
result = await firstValueFrom(this._dialogRef.afterClosed());
|
||||
} finally {
|
||||
// Clear timeout on normal close
|
||||
if (timeoutId) {
|
||||
clearTimeout(timeoutId);
|
||||
}
|
||||
stopWaiting();
|
||||
}
|
||||
|
||||
|
|
@ -244,20 +268,40 @@ export class ConflictResolutionService {
|
|||
OpLog.normal(
|
||||
`ConflictResolutionService: Applying ${allOpsToApply.length} ops in single batch`,
|
||||
);
|
||||
try {
|
||||
await this.operationApplier.applyOperations(allOpsToApply);
|
||||
|
||||
// If we get here without throwing, all ops succeeded - mark as applied
|
||||
const successSeqs = allStoredOps.map((o) => o.seq);
|
||||
await this.opLogStore.markApplied(successSeqs);
|
||||
// Map op ID to seq for marking partial success
|
||||
const opIdToSeq = new Map(allStoredOps.map((o) => [o.id, o.seq]));
|
||||
|
||||
const applyResult = await this.operationApplier.applyOperations(allOpsToApply);
|
||||
|
||||
// Mark successfully applied ops
|
||||
const appliedSeqs = applyResult.appliedOps
|
||||
.map((op) => opIdToSeq.get(op.id))
|
||||
.filter((seq): seq is number => seq !== undefined);
|
||||
|
||||
if (appliedSeqs.length > 0) {
|
||||
await this.opLogStore.markApplied(appliedSeqs);
|
||||
OpLog.normal(
|
||||
`ConflictResolutionService: Successfully applied ${allOpsToApply.length} ops`,
|
||||
`ConflictResolutionService: Successfully applied ${appliedSeqs.length} ops`,
|
||||
);
|
||||
} catch (e) {
|
||||
// SyncStateCorruptedError or any other error means ops failed to apply
|
||||
OpLog.err('ConflictResolutionService: Exception applying ops', e);
|
||||
const allOpIds = allStoredOps.map((o) => o.id);
|
||||
await this.opLogStore.markFailed(allOpIds, MAX_CONFLICT_RETRY_ATTEMPTS);
|
||||
}
|
||||
|
||||
// Handle partial failure
|
||||
if (applyResult.failedOp) {
|
||||
// Find all ops that weren't applied (failed op + remaining ops)
|
||||
const failedOpIndex = allOpsToApply.findIndex(
|
||||
(op) => op.id === applyResult.failedOp!.op.id,
|
||||
);
|
||||
const failedOps = allOpsToApply.slice(failedOpIndex);
|
||||
const failedOpIds = failedOps.map((op) => op.id);
|
||||
|
||||
OpLog.err(
|
||||
`ConflictResolutionService: ${applyResult.appliedOps.length} ops applied before failure. ` +
|
||||
`Marking ${failedOpIds.length} ops as failed.`,
|
||||
applyResult.failedOp.error,
|
||||
);
|
||||
await this.opLogStore.markFailed(failedOpIds, MAX_CONFLICT_RETRY_ATTEMPTS);
|
||||
|
||||
this.snackService.open({
|
||||
type: 'ERROR',
|
||||
msg: T.F.SYNC.S.CONFLICT_RESOLUTION_FAILED,
|
||||
|
|
|
|||
|
|
@ -137,6 +137,10 @@ describe('OperationLogSyncService', () => {
|
|||
dependencyResolverSpy.extractDependencies.and.returnValue([]);
|
||||
// Default: no local ops to replay after SYNC_IMPORT
|
||||
opLogStoreSpy.getOpsAfterSeq.and.returnValue(Promise.resolve([]));
|
||||
// Default: successful operation application
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
});
|
||||
|
||||
it('should be created', () => {
|
||||
|
|
@ -1217,7 +1221,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
// Process remote ops with SYNC_IMPORT
|
||||
// Note: _processRemoteOps is private but we can still call it in tests
|
||||
|
|
@ -1240,7 +1246,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await (service as any)._processRemoteOps([backupImportOp]);
|
||||
|
||||
|
|
@ -1286,7 +1294,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
// Spy on detectConflicts to verify it's NOT called
|
||||
spyOn(service, 'detectConflicts').and.callThrough();
|
||||
|
|
@ -1320,7 +1330,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await (service as any)._processRemoteOps([syncImportOp, followUpOp]);
|
||||
|
||||
|
|
@ -1350,7 +1362,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
spyOn(service, 'detectConflicts').and.callThrough();
|
||||
|
||||
|
|
@ -1434,7 +1448,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
// Process the SYNC_IMPORT
|
||||
await (service as any)._processRemoteOps([syncImportOp]);
|
||||
|
|
@ -1488,7 +1504,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await (service as any)._processRemoteOps([syncImportOp]);
|
||||
|
||||
|
|
@ -1531,7 +1549,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await (service as any)._processRemoteOps([syncImportOp]);
|
||||
|
||||
|
|
@ -1577,7 +1597,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await (service as any)._processRemoteOps([syncImportOp]);
|
||||
|
||||
|
|
@ -1603,7 +1625,9 @@ describe('OperationLogSyncService', () => {
|
|||
opLogStoreSpy.hasOp.and.returnValue(Promise.resolve(false));
|
||||
opLogStoreSpy.append.and.returnValue(Promise.resolve(1));
|
||||
opLogStoreSpy.markApplied.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(Promise.resolve());
|
||||
operationApplierServiceSpy.applyOperations.and.returnValue(
|
||||
Promise.resolve({ appliedOps: [] }),
|
||||
);
|
||||
|
||||
await (service as any)._processRemoteOps([syncImportOp]);
|
||||
|
||||
|
|
|
|||
|
|
@ -179,12 +179,12 @@ export class OperationLogSyncService {
|
|||
return;
|
||||
}
|
||||
|
||||
// SERVER MIGRATION CHECK: If this client has history but appears to be connecting
|
||||
// to a new/empty server, create a SYNC_IMPORT with full state first.
|
||||
// This handles the scenario where user changes sync server credentials.
|
||||
await this._checkAndHandleServerMigration(syncProvider);
|
||||
|
||||
const result = await this.uploadService.uploadPendingOps(syncProvider);
|
||||
// SERVER MIGRATION CHECK: Passed as callback to execute INSIDE the upload lock.
|
||||
// This prevents race conditions where multiple tabs could both detect migration
|
||||
// and create duplicate SYNC_IMPORT operations.
|
||||
const result = await this.uploadService.uploadPendingOps(syncProvider, {
|
||||
preUploadCallback: () => this._checkAndHandleServerMigration(syncProvider),
|
||||
});
|
||||
|
||||
// STEP 1: Process piggybacked ops FIRST
|
||||
// This is critical: piggybacked ops may contain the "winning" remote versions
|
||||
|
|
@ -740,20 +740,17 @@ export class OperationLogSyncService {
|
|||
* @throws Re-throws if application fails (ops marked as failed first)
|
||||
*/
|
||||
private async _applyNonConflictingOps(ops: Operation[]): Promise<void> {
|
||||
// Track stored seqs for marking as applied after success
|
||||
const storedSeqs: number[] = [];
|
||||
// Map op ID to seq for marking partial success
|
||||
const opIdToSeq = new Map<string, number>();
|
||||
// Track ops that are NOT duplicates (need to be applied)
|
||||
const opsToApply: Operation[] = [];
|
||||
// Track op IDs for error handling
|
||||
const storedOpIds: string[] = [];
|
||||
|
||||
// Store operations with pending status before applying
|
||||
// If we crash after storing but before applying, these will be retried on startup
|
||||
for (const op of ops) {
|
||||
if (!(await this.opLogStore.hasOp(op.id))) {
|
||||
const seq = await this.opLogStore.append(op, 'remote', { pendingApply: true });
|
||||
storedSeqs.push(seq);
|
||||
storedOpIds.push(op.id);
|
||||
opIdToSeq.set(op.id, seq);
|
||||
opsToApply.push(op);
|
||||
} else {
|
||||
OpLog.verbose(`OperationLogSyncService: Skipping duplicate op: ${op.id}`);
|
||||
|
|
@ -762,26 +759,39 @@ export class OperationLogSyncService {
|
|||
|
||||
// Apply only NON-duplicate ops to NgRx store
|
||||
if (opsToApply.length > 0) {
|
||||
try {
|
||||
await this.operationApplier.applyOperations(opsToApply);
|
||||
} catch (e) {
|
||||
// If application fails catastrophically, mark ops as failed to prevent
|
||||
// them from being uploaded in a potentially inconsistent state
|
||||
OpLog.err(
|
||||
`OperationLogSyncService: Failed to apply ${opsToApply.length} ops. Marking as failed.`,
|
||||
e,
|
||||
);
|
||||
await this.opLogStore.markFailed(storedOpIds);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
const result = await this.operationApplier.applyOperations(opsToApply);
|
||||
|
||||
// Mark ops as successfully applied (crash recovery will skip these)
|
||||
if (storedSeqs.length > 0) {
|
||||
await this.opLogStore.markApplied(storedSeqs);
|
||||
OpLog.normal(
|
||||
`OperationLogSyncService: Applied and marked ${storedSeqs.length} remote ops`,
|
||||
);
|
||||
// Mark successfully applied ops
|
||||
const appliedSeqs = result.appliedOps
|
||||
.map((op) => opIdToSeq.get(op.id))
|
||||
.filter((seq): seq is number => seq !== undefined);
|
||||
|
||||
if (appliedSeqs.length > 0) {
|
||||
await this.opLogStore.markApplied(appliedSeqs);
|
||||
OpLog.normal(
|
||||
`OperationLogSyncService: Applied and marked ${appliedSeqs.length} remote ops`,
|
||||
);
|
||||
}
|
||||
|
||||
// Handle partial failure
|
||||
if (result.failedOp) {
|
||||
// Find all ops that weren't applied (failed op + remaining ops)
|
||||
const failedOpIndex = opsToApply.findIndex(
|
||||
(op) => op.id === result.failedOp!.op.id,
|
||||
);
|
||||
const failedOps = opsToApply.slice(failedOpIndex);
|
||||
const failedOpIds = failedOps.map((op) => op.id);
|
||||
|
||||
OpLog.err(
|
||||
`OperationLogSyncService: ${result.appliedOps.length} ops applied before failure. ` +
|
||||
`Marking ${failedOpIds.length} ops as failed.`,
|
||||
result.failedOp.error,
|
||||
);
|
||||
await this.opLogStore.markFailed(failedOpIds);
|
||||
|
||||
// Re-throw if it's a SyncStateCorruptedError, otherwise wrap it
|
||||
throw result.failedOp.error;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -816,5 +816,181 @@ describe('OperationLogUploadService', () => {
|
|||
expect(mockApiProvider.uploadOps).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
|
||||
describe('preUploadCallback (server migration race condition fix)', () => {
|
||||
/**
|
||||
* These tests verify that preUploadCallback is:
|
||||
* 1. Called INSIDE the upload lock
|
||||
* 2. Called BEFORE checking for pending ops
|
||||
*
|
||||
* This fixes a race condition where multiple tabs could both detect
|
||||
* server migration and create duplicate SYNC_IMPORT operations.
|
||||
*/
|
||||
let mockApiProvider: jasmine.SpyObj<
|
||||
SyncProviderServiceInterface<SyncProviderId> & OperationSyncCapable
|
||||
>;
|
||||
let mockFileProvider: jasmine.SpyObj<SyncProviderServiceInterface<SyncProviderId>>;
|
||||
|
||||
beforeEach(() => {
|
||||
mockApiProvider = jasmine.createSpyObj('ApiSyncProvider', [
|
||||
'getLastServerSeq',
|
||||
'uploadOps',
|
||||
'setLastServerSeq',
|
||||
]);
|
||||
(mockApiProvider as any).supportsOperationSync = true;
|
||||
(mockApiProvider as any).privateCfg = {
|
||||
load: jasmine
|
||||
.createSpy('privateCfg.load')
|
||||
.and.returnValue(Promise.resolve(null)),
|
||||
};
|
||||
mockApiProvider.getLastServerSeq.and.returnValue(Promise.resolve(0));
|
||||
mockApiProvider.uploadOps.and.returnValue(
|
||||
Promise.resolve({ results: [], latestSeq: 0, newOps: [] }),
|
||||
);
|
||||
mockApiProvider.setLastServerSeq.and.returnValue(Promise.resolve());
|
||||
|
||||
mockFileProvider = jasmine.createSpyObj('FileSyncProvider', ['uploadFile']);
|
||||
(mockFileProvider as any).supportsOperationSync = false;
|
||||
mockFileProvider.uploadFile.and.returnValue(Promise.resolve({ rev: 'test-rev' }));
|
||||
});
|
||||
|
||||
it('should call preUploadCallback inside the lock for API-based sync', async () => {
|
||||
const callOrder: string[] = [];
|
||||
|
||||
mockLockService.request.and.callFake(
|
||||
async (_name: string, fn: () => Promise<void>) => {
|
||||
callOrder.push('lock-acquired');
|
||||
await fn();
|
||||
callOrder.push('lock-released');
|
||||
},
|
||||
);
|
||||
|
||||
const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => {
|
||||
callOrder.push('callback-executed');
|
||||
});
|
||||
|
||||
await service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback });
|
||||
|
||||
expect(callback).toHaveBeenCalled();
|
||||
// Verify callback was called INSIDE the lock
|
||||
expect(callOrder).toEqual([
|
||||
'lock-acquired',
|
||||
'callback-executed',
|
||||
'lock-released',
|
||||
]);
|
||||
});
|
||||
|
||||
it('should call preUploadCallback inside the lock for file-based sync', async () => {
|
||||
const callOrder: string[] = [];
|
||||
|
||||
mockLockService.request.and.callFake(
|
||||
async (_name: string, fn: () => Promise<void>) => {
|
||||
callOrder.push('lock-acquired');
|
||||
await fn();
|
||||
callOrder.push('lock-released');
|
||||
},
|
||||
);
|
||||
|
||||
const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => {
|
||||
callOrder.push('callback-executed');
|
||||
});
|
||||
|
||||
await service.uploadPendingOps(mockFileProvider, { preUploadCallback: callback });
|
||||
|
||||
expect(callback).toHaveBeenCalled();
|
||||
// Verify callback was called INSIDE the lock
|
||||
expect(callOrder).toEqual([
|
||||
'lock-acquired',
|
||||
'callback-executed',
|
||||
'lock-released',
|
||||
]);
|
||||
});
|
||||
|
||||
it('should call preUploadCallback BEFORE checking for pending ops (API)', async () => {
|
||||
const callOrder: string[] = [];
|
||||
|
||||
mockOpLogStore.getUnsynced.and.callFake(async () => {
|
||||
callOrder.push('getUnsynced-called');
|
||||
return [];
|
||||
});
|
||||
|
||||
const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => {
|
||||
callOrder.push('callback-executed');
|
||||
});
|
||||
|
||||
await service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback });
|
||||
|
||||
// Callback should be called before getUnsynced
|
||||
expect(callOrder).toEqual(['callback-executed', 'getUnsynced-called']);
|
||||
});
|
||||
|
||||
it('should call preUploadCallback BEFORE checking for pending ops (file-based)', async () => {
|
||||
const callOrder: string[] = [];
|
||||
|
||||
mockOpLogStore.getUnsynced.and.callFake(async () => {
|
||||
callOrder.push('getUnsynced-called');
|
||||
return [];
|
||||
});
|
||||
|
||||
const callback = jasmine.createSpy('preUploadCallback').and.callFake(async () => {
|
||||
callOrder.push('callback-executed');
|
||||
});
|
||||
|
||||
await service.uploadPendingOps(mockFileProvider, { preUploadCallback: callback });
|
||||
|
||||
// Callback should be called before getUnsynced
|
||||
expect(callOrder).toEqual(['callback-executed', 'getUnsynced-called']);
|
||||
});
|
||||
|
||||
it('should not call preUploadCallback if not provided', async () => {
|
||||
await service.uploadPendingOps(mockApiProvider);
|
||||
|
||||
// Should complete without error, verifying optional nature
|
||||
expect(mockLockService.request).toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should propagate errors from preUploadCallback', async () => {
|
||||
const callback = jasmine
|
||||
.createSpy('preUploadCallback')
|
||||
.and.rejectWith(new Error('Migration check failed'));
|
||||
|
||||
await expectAsync(
|
||||
service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback }),
|
||||
).toBeRejectedWithError('Migration check failed');
|
||||
|
||||
// Should not proceed to check for pending ops
|
||||
expect(mockOpLogStore.getUnsynced).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('should allow callback to create new operations that get uploaded', async () => {
|
||||
// First call to getUnsynced returns empty (callback hasn't run yet)
|
||||
// After callback runs, we simulate it creating a new op
|
||||
let callCount = 0;
|
||||
mockOpLogStore.getUnsynced.and.callFake(async () => {
|
||||
callCount++;
|
||||
if (callCount === 1) {
|
||||
// After callback ran, return the new op it created
|
||||
return [createMockEntry(1, 'sync-import-op', 'client-1')];
|
||||
}
|
||||
return [];
|
||||
});
|
||||
|
||||
mockApiProvider.uploadOps.and.returnValue(
|
||||
Promise.resolve({
|
||||
results: [{ opId: 'sync-import-op', accepted: true }],
|
||||
latestSeq: 1,
|
||||
newOps: [],
|
||||
}),
|
||||
);
|
||||
|
||||
const callback = jasmine.createSpy('preUploadCallback').and.resolveTo(undefined);
|
||||
|
||||
await service.uploadPendingOps(mockApiProvider, { preUploadCallback: callback });
|
||||
|
||||
// Callback was called, and the op it created was uploaded
|
||||
expect(callback).toHaveBeenCalled();
|
||||
expect(mockApiProvider.uploadOps).toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
|
|||
|
|
@ -44,6 +44,17 @@ export interface UploadResult {
|
|||
rejectedOps: RejectedOpInfo[];
|
||||
}
|
||||
|
||||
/**
|
||||
* Options for uploadPendingOps.
|
||||
*/
|
||||
export interface UploadOptions {
|
||||
/**
|
||||
* Optional callback executed INSIDE the upload lock, BEFORE checking for pending ops.
|
||||
* Use this for operations that must be atomic with the upload, such as server migration checks.
|
||||
*/
|
||||
preUploadCallback?: () => Promise<void>;
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles uploading local pending operations to remote storage.
|
||||
* Supports both API-based sync (for real-time providers) and
|
||||
|
|
@ -60,6 +71,7 @@ export class OperationLogUploadService {
|
|||
|
||||
async uploadPendingOps(
|
||||
syncProvider: SyncProviderServiceInterface<SyncProviderId>,
|
||||
options?: UploadOptions,
|
||||
): Promise<UploadResult> {
|
||||
if (!syncProvider) {
|
||||
OpLog.warn('OperationLogUploadService: No active sync provider passed for upload.');
|
||||
|
|
@ -68,15 +80,16 @@ export class OperationLogUploadService {
|
|||
|
||||
// Use operation sync if supported
|
||||
if (isOperationSyncCapable(syncProvider)) {
|
||||
return this._uploadPendingOpsViaApi(syncProvider);
|
||||
return this._uploadPendingOpsViaApi(syncProvider, options);
|
||||
}
|
||||
|
||||
// Fall back to file-based sync
|
||||
return this._uploadPendingOpsViaFiles(syncProvider);
|
||||
return this._uploadPendingOpsViaFiles(syncProvider, options);
|
||||
}
|
||||
|
||||
private async _uploadPendingOpsViaApi(
|
||||
syncProvider: SyncProviderServiceInterface<SyncProviderId> & OperationSyncCapable,
|
||||
options?: UploadOptions,
|
||||
): Promise<UploadResult> {
|
||||
OpLog.normal('OperationLogUploadService: Uploading pending operations via API...');
|
||||
|
||||
|
|
@ -86,6 +99,12 @@ export class OperationLogUploadService {
|
|||
let rejectedCount = 0;
|
||||
|
||||
await this.lockService.request('sp_op_log_upload', async () => {
|
||||
// Execute pre-upload callback INSIDE the lock, BEFORE checking for pending ops.
|
||||
// This ensures operations like server migration checks are atomic with the upload.
|
||||
if (options?.preUploadCallback) {
|
||||
await options.preUploadCallback();
|
||||
}
|
||||
|
||||
const pendingOps = await this.opLogStore.getUnsynced();
|
||||
|
||||
if (pendingOps.length === 0) {
|
||||
|
|
@ -250,12 +269,18 @@ export class OperationLogUploadService {
|
|||
|
||||
private async _uploadPendingOpsViaFiles(
|
||||
syncProvider: SyncProviderServiceInterface<SyncProviderId>,
|
||||
options?: UploadOptions,
|
||||
): Promise<UploadResult> {
|
||||
OpLog.normal('OperationLogUploadService: Uploading pending operations via files...');
|
||||
|
||||
let uploadedCount = 0;
|
||||
|
||||
await this.lockService.request('sp_op_log_upload', async () => {
|
||||
// Execute pre-upload callback INSIDE the lock, BEFORE checking for pending ops.
|
||||
if (options?.preUploadCallback) {
|
||||
await options.preUploadCallback();
|
||||
}
|
||||
|
||||
const pendingOps = await this.opLogStore.getUnsynced();
|
||||
|
||||
if (pendingOps.length === 0) {
|
||||
|
|
|
|||
|
|
@ -1217,6 +1217,7 @@ const T = {
|
|||
INTEGRITY_CHECK_FAILED: 'F.SYNC.S.INTEGRITY_CHECK_FAILED',
|
||||
COMPACTION_FAILED: 'F.SYNC.S.COMPACTION_FAILED',
|
||||
CONFLICT_RESOLUTION_FAILED: 'F.SYNC.S.CONFLICT_RESOLUTION_FAILED',
|
||||
CONFLICT_DIALOG_TIMEOUT: 'F.SYNC.S.CONFLICT_DIALOG_TIMEOUT',
|
||||
OPERATION_PERMANENTLY_FAILED: 'F.SYNC.S.OPERATION_PERMANENTLY_FAILED',
|
||||
DATA_REPAIRED: 'F.SYNC.S.DATA_REPAIRED',
|
||||
INVALID_OPERATION_PAYLOAD: 'F.SYNC.S.INVALID_OPERATION_PAYLOAD',
|
||||
|
|
|
|||
|
|
@ -1181,6 +1181,7 @@
|
|||
"CLOCK_DRIFT_WARNING": "Your device clock appears to be off by {{minutes}} minutes. This may cause sync issues.",
|
||||
"COMPACTION_FAILED": "Database cleanup failed. App may slow down.",
|
||||
"CONFLICT_RESOLUTION_FAILED": "Sync conflict resolution failed. Please reload.",
|
||||
"CONFLICT_DIALOG_TIMEOUT": "Sync conflict dialog timed out and was cancelled. Sync will retry on next trigger.",
|
||||
"OPERATION_PERMANENTLY_FAILED": "Some sync operations failed and could not be applied. Data may be incomplete.",
|
||||
"DATA_REPAIRED": "Data automatically repaired ({{count}} issues fixed)",
|
||||
"ERROR_CORS": "WebDAV Sync Error: Network request failed.\n\nThis might be a CORS issue. Please ensure:\n• Your WebDAV server allows Cross-Origin requests\n• The server URL is correct and accessible\n• You have a working internet connection",
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue