feat: update migration functions to support splitting operations into multiple results

This commit is contained in:
Ivan Kalashnikov 2026-01-19 13:58:27 +07:00
parent b3da4e4850
commit 263495b8cd
5 changed files with 199 additions and 90 deletions

View file

@ -98,16 +98,17 @@ export function migrateState(
/**
* Migrate a single operation from its version to targetVersion.
* Returns null if the operation should be dropped (e.g., removed feature).
* Returns an array if the operation should be split into multiple operations.
* Pure function - no side effects.
*
* @param op - The operation to migrate
* @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION)
* @returns Migration result with transformed operation, null, or error
* @returns Migration result with transformed operation(s), null, or error
*/
export function migrateOperation(
op: OperationLike,
targetVersion: number = CURRENT_SCHEMA_VERSION,
): MigrationResult<OperationLike | null> {
): MigrationResult<OperationLike | OperationLike[] | null> {
const sourceVersion = op.schemaVersion ?? 1;
// Validate source version
@ -123,11 +124,12 @@ export function migrateOperation(
return { success: true, data: op };
}
let currentOp: OperationLike | null = { ...op };
// Start with an array containing the single operation
let currentOps: OperationLike[] = [{ ...op }];
let version = sourceVersion;
// Apply migrations sequentially
while (version < targetVersion && currentOp !== null) {
while (version < targetVersion && currentOps.length > 0) {
const migration = findMigration(version);
if (!migration) {
return {
@ -139,18 +141,30 @@ export function migrateOperation(
}
try {
if (migration.migrateOperation) {
currentOp = migration.migrateOperation(currentOp);
if (currentOp !== null) {
// Update version on the migrated operation
currentOp = { ...currentOp, schemaVersion: migration.toVersion };
const nextOps: OperationLike[] = [];
for (const currentOp of currentOps) {
if (migration.migrateOperation) {
const result = migration.migrateOperation(currentOp);
if (result === null) {
// Operation dropped, don't add to nextOps
continue;
} else if (Array.isArray(result)) {
// Operation split into multiple
for (const r of result) {
nextOps.push({ ...r, schemaVersion: migration.toVersion });
}
} else {
// Single operation returned
nextOps.push({ ...result, schemaVersion: migration.toVersion });
}
} else {
// No operation migration defined - just update version
nextOps.push({ ...currentOp, schemaVersion: migration.toVersion });
}
} else {
// No operation migration defined - just update version
currentOp = { ...currentOp, schemaVersion: migration.toVersion };
}
// Track version even if operation was dropped (null)
// This ensures migratedToVersion reflects where we actually stopped
currentOps = nextOps;
version = migration.toVersion;
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err);
@ -163,17 +177,35 @@ export function migrateOperation(
}
}
return {
success: true,
data: currentOp,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
// Return based on the number of resulting operations
if (currentOps.length === 0) {
return {
success: true,
data: null,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
} else if (currentOps.length === 1) {
return {
success: true,
data: currentOps[0],
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
} else {
return {
success: true,
data: currentOps,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
}
}
/**
* Migrate an array of operations.
* Drops operations that return null from migration.
* Handles operations that are split into multiple operations.
*
* @param ops - Array of operations to migrate
* @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION)
@ -195,10 +227,14 @@ export function migrateOperations(
};
}
if (result.data !== null && result.data !== undefined) {
migrated.push(result.data);
} else {
if (result.data === null || result.data === undefined) {
droppedCount++;
} else if (Array.isArray(result.data)) {
// Operation was split into multiple operations
migrated.push(...result.data);
} else {
// Single operation
migrated.push(result.data);
}
}

View file

@ -50,9 +50,11 @@ export interface SchemaMigration {
/**
* Transform an individual operation payload.
* Return null to drop the operation entirely (e.g., for removed features).
* Return an array to split one operation into multiple (e.g., when moving
* settings from one config section to another).
* Only required for non-additive changes (renames, removals, type changes).
*/
migrateOperation?: (op: OperationLike) => OperationLike | null;
migrateOperation?: (op: OperationLike) => OperationLike | OperationLike[] | null;
/**
* Explicit declaration that forces migration authors to think about

View file

@ -537,6 +537,15 @@ export class SnapshotService {
let payload = row.payload;
const opSchemaVersion = row.schemaVersion ?? 1;
// Prepare list of operations to process (may be expanded by migration)
let opsToProcess: Array<{
opType: string;
entityType: string;
entityId: string | null;
payload: unknown;
}> = [{ opType, entityType, entityId, payload }];
if (opSchemaVersion < CURRENT_SCHEMA_VERSION) {
const opLike: OperationLike = {
id: row.id,
@ -554,71 +563,109 @@ export class SnapshotService {
const migratedOp = migrationResult.data;
if (!migratedOp) continue;
opType = migratedOp.opType as Operation['opType'];
entityType = migratedOp.entityType;
entityId = migratedOp.entityId ?? null;
payload = migratedOp.payload as any;
}
// Handle full-state operations BEFORE entity type check
// These operations replace the entire state and don't use a specific entity type
if (opType === 'SYNC_IMPORT' || opType === 'BACKUP_IMPORT' || opType === 'REPAIR') {
if (payload && typeof payload === 'object' && 'appDataComplete' in payload) {
Object.assign(state, (payload as { appDataComplete: unknown }).appDataComplete);
// Handle array result (operation was split into multiple)
if (Array.isArray(migratedOp)) {
opsToProcess = migratedOp.map((op) => ({
opType: op.opType,
entityType: op.entityType,
entityId: op.entityId ?? null,
payload: op.payload,
}));
} else {
Object.assign(state, payload);
opsToProcess = [
{
opType: migratedOp.opType,
entityType: migratedOp.entityType,
entityId: migratedOp.entityId ?? null,
payload: migratedOp.payload,
},
];
}
continue;
}
if (!ALLOWED_ENTITY_TYPES.has(entityType)) continue;
// Process all operations (original or migrated)
for (const opToProcess of opsToProcess) {
const {
opType: processOpType,
entityType: processEntityType,
entityId: processEntityId,
payload: processPayload,
} = opToProcess;
if (!state[entityType]) {
state[entityType] = {};
}
// Handle full-state operations BEFORE entity type check
// These operations replace the entire state and don't use a specific entity type
if (
processOpType === 'SYNC_IMPORT' ||
processOpType === 'BACKUP_IMPORT' ||
processOpType === 'REPAIR'
) {
if (
processPayload &&
typeof processPayload === 'object' &&
'appDataComplete' in processPayload
) {
Object.assign(
state,
(processPayload as { appDataComplete: unknown }).appDataComplete,
);
} else {
Object.assign(state, processPayload);
}
continue;
}
switch (opType) {
case 'CRT':
case 'UPD':
if (entityId) {
state[entityType][entityId] = {
...(state[entityType][entityId] as Record<string, unknown>),
...(payload as Record<string, unknown>),
};
}
break;
case 'DEL':
if (entityId) {
delete state[entityType][entityId];
}
break;
case 'MOV':
if (entityId && payload) {
state[entityType][entityId] = {
...(state[entityType][entityId] as Record<string, unknown>),
...(payload as Record<string, unknown>),
};
}
break;
case 'BATCH':
if (payload && typeof payload === 'object') {
const batchPayload = payload as Record<string, unknown>;
if (batchPayload.entities && typeof batchPayload.entities === 'object') {
const entities = batchPayload.entities as Record<string, unknown>;
for (const [id, entity] of Object.entries(entities)) {
state[entityType][id] = {
...(state[entityType][id] as Record<string, unknown>),
...(entity as Record<string, unknown>),
};
}
} else if (entityId) {
state[entityType][entityId] = {
...(state[entityType][entityId] as Record<string, unknown>),
...batchPayload,
if (!ALLOWED_ENTITY_TYPES.has(processEntityType)) continue;
if (!state[processEntityType]) {
state[processEntityType] = {};
}
switch (processOpType) {
case 'CRT':
case 'UPD':
if (processEntityId) {
state[processEntityType][processEntityId] = {
...(state[processEntityType][processEntityId] as Record<string, unknown>),
...(processPayload as Record<string, unknown>),
};
}
}
break;
break;
case 'DEL':
if (processEntityId) {
delete state[processEntityType][processEntityId];
}
break;
case 'MOV':
if (processEntityId && processPayload) {
state[processEntityType][processEntityId] = {
...(state[processEntityType][processEntityId] as Record<string, unknown>),
...(processPayload as Record<string, unknown>),
};
}
break;
case 'BATCH':
if (processPayload && typeof processPayload === 'object') {
const batchPayload = processPayload as Record<string, unknown>;
if (batchPayload.entities && typeof batchPayload.entities === 'object') {
const entities = batchPayload.entities as Record<string, unknown>;
for (const [id, entity] of Object.entries(entities)) {
state[processEntityType][id] = {
...(state[processEntityType][id] as Record<string, unknown>),
...(entity as Record<string, unknown>),
};
}
} else if (processEntityId) {
state[processEntityType][processEntityId] = {
...(state[processEntityType][processEntityId] as Record<
string,
unknown
>),
...batchPayload,
};
}
}
break;
}
}
}
return state;

View file

@ -127,11 +127,12 @@ export class SchemaMigrationService {
/**
* Migrates a single operation to the current schema version if needed.
* Returns null if the operation should be dropped (e.g., for removed features).
* Returns an array if the operation should be split into multiple operations.
*
* @param op - The operation to migrate
* @returns The migrated operation, or null if it should be dropped
* @returns The migrated operation(s), or null if it should be dropped
*/
migrateOperation(op: Operation): Operation | null {
migrateOperation(op: Operation): Operation | Operation[] | null {
const opVersion = op.schemaVersion ?? 1;
if (opVersion >= CURRENT_SCHEMA_VERSION) {
@ -159,6 +160,19 @@ export class SchemaMigrationService {
return null;
}
// Handle array result (operation was split into multiple)
if (Array.isArray(result.data)) {
return result.data.map((migratedOpLike) => ({
...op,
opType: migratedOpLike.opType as Operation['opType'],
entityType: migratedOpLike.entityType as Operation['entityType'],
entityId: migratedOpLike.entityId,
entityIds: migratedOpLike.entityIds,
payload: migratedOpLike.payload,
schemaVersion: migratedOpLike.schemaVersion,
}));
}
// Merge migrated fields back into the original operation
return {
...op,
@ -172,6 +186,7 @@ export class SchemaMigrationService {
/**
* Migrates an array of operations, filtering out any that should be dropped.
* Handles operations that are split into multiple operations.
*
* @param ops - The operations to migrate
* @returns Array of migrated operations (dropped operations excluded)
@ -180,13 +195,19 @@ export class SchemaMigrationService {
const migrated: Operation[] = [];
for (const op of ops) {
const migratedOp = this.migrateOperation(op);
if (migratedOp !== null) {
migrated.push(migratedOp);
} else {
const migratedResult = this.migrateOperation(op);
if (migratedResult === null) {
OpLog.normal(
`SchemaMigrationService: Dropped operation ${op.id} (${op.actionType}) during migration`,
);
} else if (Array.isArray(migratedResult)) {
// Operation was split into multiple operations
migrated.push(...migratedResult);
OpLog.normal(
`SchemaMigrationService: Split operation ${op.id} into ${migratedResult.length} operations during migration`,
);
} else {
migrated.push(migratedResult);
}
}

View file

@ -130,9 +130,7 @@ export class RemoteOpsProcessingService {
try {
const migrated = this.schemaMigrationService.migrateOperation(op);
if (migrated) {
migratedOps.push(migrated);
} else {
if (migrated === null) {
// Track dropped entity IDs for dependency warning
if (op.entityId) {
droppedEntityIds.add(op.entityId);
@ -143,6 +141,11 @@ export class RemoteOpsProcessingService {
OpLog.verbose(
`RemoteOpsProcessingService: Dropped op ${op.id} (migrated to null)`,
);
} else if (Array.isArray(migrated)) {
// Operation was split into multiple operations
migratedOps.push(...migrated);
} else {
migratedOps.push(migrated);
}
} catch (e) {
OpLog.err(`RemoteOpsProcessingService: Migration failed for op ${op.id}`, e);