diff --git a/packages/shared-schema/src/migrate.ts b/packages/shared-schema/src/migrate.ts index f62eac318..8cb7a9b91 100644 --- a/packages/shared-schema/src/migrate.ts +++ b/packages/shared-schema/src/migrate.ts @@ -98,16 +98,17 @@ export function migrateState( /** * Migrate a single operation from its version to targetVersion. * Returns null if the operation should be dropped (e.g., removed feature). + * Returns an array if the operation should be split into multiple operations. * Pure function - no side effects. * * @param op - The operation to migrate * @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION) - * @returns Migration result with transformed operation, null, or error + * @returns Migration result with transformed operation(s), null, or error */ export function migrateOperation( op: OperationLike, targetVersion: number = CURRENT_SCHEMA_VERSION, -): MigrationResult { +): MigrationResult { const sourceVersion = op.schemaVersion ?? 1; // Validate source version @@ -123,11 +124,12 @@ export function migrateOperation( return { success: true, data: op }; } - let currentOp: OperationLike | null = { ...op }; + // Start with an array containing the single operation + let currentOps: OperationLike[] = [{ ...op }]; let version = sourceVersion; // Apply migrations sequentially - while (version < targetVersion && currentOp !== null) { + while (version < targetVersion && currentOps.length > 0) { const migration = findMigration(version); if (!migration) { return { @@ -139,18 +141,30 @@ export function migrateOperation( } try { - if (migration.migrateOperation) { - currentOp = migration.migrateOperation(currentOp); - if (currentOp !== null) { - // Update version on the migrated operation - currentOp = { ...currentOp, schemaVersion: migration.toVersion }; + const nextOps: OperationLike[] = []; + + for (const currentOp of currentOps) { + if (migration.migrateOperation) { + const result = migration.migrateOperation(currentOp); + if (result === null) { + // Operation dropped, don't add to nextOps + continue; + } else if (Array.isArray(result)) { + // Operation split into multiple + for (const r of result) { + nextOps.push({ ...r, schemaVersion: migration.toVersion }); + } + } else { + // Single operation returned + nextOps.push({ ...result, schemaVersion: migration.toVersion }); + } + } else { + // No operation migration defined - just update version + nextOps.push({ ...currentOp, schemaVersion: migration.toVersion }); } - } else { - // No operation migration defined - just update version - currentOp = { ...currentOp, schemaVersion: migration.toVersion }; } - // Track version even if operation was dropped (null) - // This ensures migratedToVersion reflects where we actually stopped + + currentOps = nextOps; version = migration.toVersion; } catch (err) { const errorMessage = err instanceof Error ? err.message : String(err); @@ -163,17 +177,35 @@ export function migrateOperation( } } - return { - success: true, - data: currentOp, - migratedFromVersion: sourceVersion, - migratedToVersion: version, - }; + // Return based on the number of resulting operations + if (currentOps.length === 0) { + return { + success: true, + data: null, + migratedFromVersion: sourceVersion, + migratedToVersion: version, + }; + } else if (currentOps.length === 1) { + return { + success: true, + data: currentOps[0], + migratedFromVersion: sourceVersion, + migratedToVersion: version, + }; + } else { + return { + success: true, + data: currentOps, + migratedFromVersion: sourceVersion, + migratedToVersion: version, + }; + } } /** * Migrate an array of operations. * Drops operations that return null from migration. + * Handles operations that are split into multiple operations. * * @param ops - Array of operations to migrate * @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION) @@ -195,10 +227,14 @@ export function migrateOperations( }; } - if (result.data !== null && result.data !== undefined) { - migrated.push(result.data); - } else { + if (result.data === null || result.data === undefined) { droppedCount++; + } else if (Array.isArray(result.data)) { + // Operation was split into multiple operations + migrated.push(...result.data); + } else { + // Single operation + migrated.push(result.data); } } diff --git a/packages/shared-schema/src/migration.types.ts b/packages/shared-schema/src/migration.types.ts index 3beebc26c..0c1bf7138 100644 --- a/packages/shared-schema/src/migration.types.ts +++ b/packages/shared-schema/src/migration.types.ts @@ -50,9 +50,11 @@ export interface SchemaMigration { /** * Transform an individual operation payload. * Return null to drop the operation entirely (e.g., for removed features). + * Return an array to split one operation into multiple (e.g., when moving + * settings from one config section to another). * Only required for non-additive changes (renames, removals, type changes). */ - migrateOperation?: (op: OperationLike) => OperationLike | null; + migrateOperation?: (op: OperationLike) => OperationLike | OperationLike[] | null; /** * Explicit declaration that forces migration authors to think about diff --git a/packages/super-sync-server/src/sync/services/snapshot.service.ts b/packages/super-sync-server/src/sync/services/snapshot.service.ts index b4d1ecf1b..5eae8d1ef 100644 --- a/packages/super-sync-server/src/sync/services/snapshot.service.ts +++ b/packages/super-sync-server/src/sync/services/snapshot.service.ts @@ -537,6 +537,15 @@ export class SnapshotService { let payload = row.payload; const opSchemaVersion = row.schemaVersion ?? 1; + + // Prepare list of operations to process (may be expanded by migration) + let opsToProcess: Array<{ + opType: string; + entityType: string; + entityId: string | null; + payload: unknown; + }> = [{ opType, entityType, entityId, payload }]; + if (opSchemaVersion < CURRENT_SCHEMA_VERSION) { const opLike: OperationLike = { id: row.id, @@ -554,71 +563,109 @@ export class SnapshotService { const migratedOp = migrationResult.data; if (!migratedOp) continue; - opType = migratedOp.opType as Operation['opType']; - entityType = migratedOp.entityType; - entityId = migratedOp.entityId ?? null; - payload = migratedOp.payload as any; - } - - // Handle full-state operations BEFORE entity type check - // These operations replace the entire state and don't use a specific entity type - if (opType === 'SYNC_IMPORT' || opType === 'BACKUP_IMPORT' || opType === 'REPAIR') { - if (payload && typeof payload === 'object' && 'appDataComplete' in payload) { - Object.assign(state, (payload as { appDataComplete: unknown }).appDataComplete); + // Handle array result (operation was split into multiple) + if (Array.isArray(migratedOp)) { + opsToProcess = migratedOp.map((op) => ({ + opType: op.opType, + entityType: op.entityType, + entityId: op.entityId ?? null, + payload: op.payload, + })); } else { - Object.assign(state, payload); + opsToProcess = [ + { + opType: migratedOp.opType, + entityType: migratedOp.entityType, + entityId: migratedOp.entityId ?? null, + payload: migratedOp.payload, + }, + ]; } - continue; } - if (!ALLOWED_ENTITY_TYPES.has(entityType)) continue; + // Process all operations (original or migrated) + for (const opToProcess of opsToProcess) { + const { + opType: processOpType, + entityType: processEntityType, + entityId: processEntityId, + payload: processPayload, + } = opToProcess; - if (!state[entityType]) { - state[entityType] = {}; - } + // Handle full-state operations BEFORE entity type check + // These operations replace the entire state and don't use a specific entity type + if ( + processOpType === 'SYNC_IMPORT' || + processOpType === 'BACKUP_IMPORT' || + processOpType === 'REPAIR' + ) { + if ( + processPayload && + typeof processPayload === 'object' && + 'appDataComplete' in processPayload + ) { + Object.assign( + state, + (processPayload as { appDataComplete: unknown }).appDataComplete, + ); + } else { + Object.assign(state, processPayload); + } + continue; + } - switch (opType) { - case 'CRT': - case 'UPD': - if (entityId) { - state[entityType][entityId] = { - ...(state[entityType][entityId] as Record), - ...(payload as Record), - }; - } - break; - case 'DEL': - if (entityId) { - delete state[entityType][entityId]; - } - break; - case 'MOV': - if (entityId && payload) { - state[entityType][entityId] = { - ...(state[entityType][entityId] as Record), - ...(payload as Record), - }; - } - break; - case 'BATCH': - if (payload && typeof payload === 'object') { - const batchPayload = payload as Record; - if (batchPayload.entities && typeof batchPayload.entities === 'object') { - const entities = batchPayload.entities as Record; - for (const [id, entity] of Object.entries(entities)) { - state[entityType][id] = { - ...(state[entityType][id] as Record), - ...(entity as Record), - }; - } - } else if (entityId) { - state[entityType][entityId] = { - ...(state[entityType][entityId] as Record), - ...batchPayload, + if (!ALLOWED_ENTITY_TYPES.has(processEntityType)) continue; + + if (!state[processEntityType]) { + state[processEntityType] = {}; + } + + switch (processOpType) { + case 'CRT': + case 'UPD': + if (processEntityId) { + state[processEntityType][processEntityId] = { + ...(state[processEntityType][processEntityId] as Record), + ...(processPayload as Record), }; } - } - break; + break; + case 'DEL': + if (processEntityId) { + delete state[processEntityType][processEntityId]; + } + break; + case 'MOV': + if (processEntityId && processPayload) { + state[processEntityType][processEntityId] = { + ...(state[processEntityType][processEntityId] as Record), + ...(processPayload as Record), + }; + } + break; + case 'BATCH': + if (processPayload && typeof processPayload === 'object') { + const batchPayload = processPayload as Record; + if (batchPayload.entities && typeof batchPayload.entities === 'object') { + const entities = batchPayload.entities as Record; + for (const [id, entity] of Object.entries(entities)) { + state[processEntityType][id] = { + ...(state[processEntityType][id] as Record), + ...(entity as Record), + }; + } + } else if (processEntityId) { + state[processEntityType][processEntityId] = { + ...(state[processEntityType][processEntityId] as Record< + string, + unknown + >), + ...batchPayload, + }; + } + } + break; + } } } return state; diff --git a/src/app/op-log/persistence/schema-migration.service.ts b/src/app/op-log/persistence/schema-migration.service.ts index dc38d1136..f3a1ebdcb 100644 --- a/src/app/op-log/persistence/schema-migration.service.ts +++ b/src/app/op-log/persistence/schema-migration.service.ts @@ -127,11 +127,12 @@ export class SchemaMigrationService { /** * Migrates a single operation to the current schema version if needed. * Returns null if the operation should be dropped (e.g., for removed features). + * Returns an array if the operation should be split into multiple operations. * * @param op - The operation to migrate - * @returns The migrated operation, or null if it should be dropped + * @returns The migrated operation(s), or null if it should be dropped */ - migrateOperation(op: Operation): Operation | null { + migrateOperation(op: Operation): Operation | Operation[] | null { const opVersion = op.schemaVersion ?? 1; if (opVersion >= CURRENT_SCHEMA_VERSION) { @@ -159,6 +160,19 @@ export class SchemaMigrationService { return null; } + // Handle array result (operation was split into multiple) + if (Array.isArray(result.data)) { + return result.data.map((migratedOpLike) => ({ + ...op, + opType: migratedOpLike.opType as Operation['opType'], + entityType: migratedOpLike.entityType as Operation['entityType'], + entityId: migratedOpLike.entityId, + entityIds: migratedOpLike.entityIds, + payload: migratedOpLike.payload, + schemaVersion: migratedOpLike.schemaVersion, + })); + } + // Merge migrated fields back into the original operation return { ...op, @@ -172,6 +186,7 @@ export class SchemaMigrationService { /** * Migrates an array of operations, filtering out any that should be dropped. + * Handles operations that are split into multiple operations. * * @param ops - The operations to migrate * @returns Array of migrated operations (dropped operations excluded) @@ -180,13 +195,19 @@ export class SchemaMigrationService { const migrated: Operation[] = []; for (const op of ops) { - const migratedOp = this.migrateOperation(op); - if (migratedOp !== null) { - migrated.push(migratedOp); - } else { + const migratedResult = this.migrateOperation(op); + if (migratedResult === null) { OpLog.normal( `SchemaMigrationService: Dropped operation ${op.id} (${op.actionType}) during migration`, ); + } else if (Array.isArray(migratedResult)) { + // Operation was split into multiple operations + migrated.push(...migratedResult); + OpLog.normal( + `SchemaMigrationService: Split operation ${op.id} into ${migratedResult.length} operations during migration`, + ); + } else { + migrated.push(migratedResult); } } diff --git a/src/app/op-log/sync/remote-ops-processing.service.ts b/src/app/op-log/sync/remote-ops-processing.service.ts index 67d2c271c..b580054b9 100644 --- a/src/app/op-log/sync/remote-ops-processing.service.ts +++ b/src/app/op-log/sync/remote-ops-processing.service.ts @@ -130,9 +130,7 @@ export class RemoteOpsProcessingService { try { const migrated = this.schemaMigrationService.migrateOperation(op); - if (migrated) { - migratedOps.push(migrated); - } else { + if (migrated === null) { // Track dropped entity IDs for dependency warning if (op.entityId) { droppedEntityIds.add(op.entityId); @@ -143,6 +141,11 @@ export class RemoteOpsProcessingService { OpLog.verbose( `RemoteOpsProcessingService: Dropped op ${op.id} (migrated to null)`, ); + } else if (Array.isArray(migrated)) { + // Operation was split into multiple operations + migratedOps.push(...migrated); + } else { + migratedOps.push(migrated); } } catch (e) { OpLog.err(`RemoteOpsProcessingService: Migration failed for op ${op.id}`, e);