feat(sync): add shared schema versioning package for frontend/backend

Create @sp/shared-schema package with pure TypeScript migration functions
that work in both Angular frontend and Node.js backend environments.

Package contents:
- schema-version.ts: Version constants (CURRENT=1, MAX_SKIP=3)
- migration.types.ts: OperationLike, SchemaMigration interfaces
- migrate.ts: Pure functions (migrateState, migrateOperation, etc.)
- migrations/index.ts: Empty migrations array (ready for first migration)
- 22 unit tests covering all migration scenarios

Backend changes:
- Add snapshot_schema_version column to user_sync_state table
- Migrate snapshots during generateSnapshot if outdated
- Migrate operations during replayOpsToState if outdated
- Drop operations that return null from migration (removed features)

Frontend changes:
- Refactor SchemaMigrationService to use @sp/shared-schema
- Re-export constants for backwards compatibility
- All 20 existing tests pass

This enables coordinated schema migrations across client and server,
ensuring old snapshots and operations can be upgraded when the state
structure changes.

Rollout strategy: Deploy backend first, then frontend.
This commit is contained in:
Johannes Millan 2025-12-06 11:21:16 +01:00
parent dcdba9e4e5
commit 8dc8207da2
15 changed files with 977 additions and 183 deletions

2
packages/shared-schema/.gitignore vendored Normal file
View file

@ -0,0 +1,2 @@
dist/
node_modules/

View file

@ -0,0 +1,16 @@
{
"name": "@sp/shared-schema",
"version": "1.0.0",
"description": "Shared schema versioning and migrations for Super Productivity sync",
"main": "dist/index.js",
"types": "dist/index.d.ts",
"scripts": {
"build": "tsc",
"test": "vitest run",
"test:watch": "vitest"
},
"devDependencies": {
"typescript": "^5.0.0",
"vitest": "^3.2.4"
}
}

View file

@ -0,0 +1,28 @@
// Schema version constants
export {
CURRENT_SCHEMA_VERSION,
MIN_SUPPORTED_SCHEMA_VERSION,
MAX_VERSION_SKIP,
} from './schema-version';
// Types
export type {
OperationLike,
SchemaMigration,
MigrationResult,
MigratableStateCache,
} from './migration.types';
// Migration functions
export {
migrateState,
migrateOperation,
migrateOperations,
stateNeedsMigration,
operationNeedsMigration,
validateMigrationRegistry,
getCurrentSchemaVersion,
} from './migrate';
// Migration registry (for inspection/debugging)
export { MIGRATIONS } from './migrations';

View file

@ -0,0 +1,266 @@
import { MIGRATIONS } from './migrations';
import { CURRENT_SCHEMA_VERSION, MIN_SUPPORTED_SCHEMA_VERSION } from './schema-version';
import type { MigrationResult, OperationLike, SchemaMigration } from './migration.types';
/**
* Find a migration that transforms from the given version.
*/
function findMigration(fromVersion: number): SchemaMigration | undefined {
return MIGRATIONS.find((m) => m.fromVersion === fromVersion);
}
/**
* Check if a state needs migration.
*/
export function stateNeedsMigration(
schemaVersion: number | undefined,
targetVersion: number = CURRENT_SCHEMA_VERSION,
): boolean {
const version = schemaVersion ?? 1;
return version < targetVersion;
}
/**
* Check if an operation needs migration.
*/
export function operationNeedsMigration(
op: OperationLike,
targetVersion: number = CURRENT_SCHEMA_VERSION,
): boolean {
const version = op.schemaVersion ?? 1;
return version < targetVersion;
}
/**
* Migrate state from sourceVersion to targetVersion.
* Pure function - no side effects.
*
* @param state - The state object to migrate
* @param sourceVersion - Current version of the state
* @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION)
* @returns Migration result with transformed state or error
*/
export function migrateState(
state: unknown,
sourceVersion: number,
targetVersion: number = CURRENT_SCHEMA_VERSION,
): MigrationResult<unknown> {
// Validate source version
if (sourceVersion < MIN_SUPPORTED_SCHEMA_VERSION) {
return {
success: false,
error: `Source version ${sourceVersion} is below minimum supported ${MIN_SUPPORTED_SCHEMA_VERSION}`,
};
}
// Already at or past target version
if (sourceVersion >= targetVersion) {
return { success: true, data: state };
}
let currentState = state;
let version = sourceVersion;
// Apply migrations sequentially
while (version < targetVersion) {
const migration = findMigration(version);
if (!migration) {
return {
success: false,
error: `No migration path from version ${version} to ${version + 1}`,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
}
try {
currentState = migration.migrateState(currentState);
version = migration.toVersion;
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err);
return {
success: false,
error: `Migration v${migration.fromVersion}->v${migration.toVersion} failed: ${errorMessage}`,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
}
}
return {
success: true,
data: currentState,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
}
/**
* Migrate a single operation from its version to targetVersion.
* Returns null if the operation should be dropped (e.g., removed feature).
* Pure function - no side effects.
*
* @param op - The operation to migrate
* @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION)
* @returns Migration result with transformed operation, null, or error
*/
export function migrateOperation(
op: OperationLike,
targetVersion: number = CURRENT_SCHEMA_VERSION,
): MigrationResult<OperationLike | null> {
const sourceVersion = op.schemaVersion ?? 1;
// Validate source version
if (sourceVersion < MIN_SUPPORTED_SCHEMA_VERSION) {
return {
success: false,
error: `Operation schema version ${sourceVersion} is below minimum supported ${MIN_SUPPORTED_SCHEMA_VERSION}`,
};
}
// Already at or past target version
if (sourceVersion >= targetVersion) {
return { success: true, data: op };
}
let currentOp: OperationLike | null = { ...op };
let version = sourceVersion;
// Apply migrations sequentially
while (version < targetVersion && currentOp !== null) {
const migration = findMigration(version);
if (!migration) {
return {
success: false,
error: `No migration path from version ${version} to ${version + 1}`,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
}
try {
if (migration.migrateOperation) {
currentOp = migration.migrateOperation(currentOp);
if (currentOp !== null) {
// Update version on the migrated operation
currentOp = { ...currentOp, schemaVersion: migration.toVersion };
}
} else {
// No operation migration defined - just update version
currentOp = { ...currentOp, schemaVersion: migration.toVersion };
}
} catch (err) {
const errorMessage = err instanceof Error ? err.message : String(err);
return {
success: false,
error: `Operation migration v${migration.fromVersion}->v${migration.toVersion} failed: ${errorMessage}`,
migratedFromVersion: sourceVersion,
migratedToVersion: version,
};
}
version = migration.toVersion;
}
return {
success: true,
data: currentOp,
migratedFromVersion: sourceVersion,
migratedToVersion: currentOp ? version : sourceVersion,
};
}
/**
* Migrate an array of operations.
* Drops operations that return null from migration.
*
* @param ops - Array of operations to migrate
* @param targetVersion - Target version (defaults to CURRENT_SCHEMA_VERSION)
* @returns Array of migrated operations (dropped operations excluded)
*/
export function migrateOperations(
ops: OperationLike[],
targetVersion: number = CURRENT_SCHEMA_VERSION,
): MigrationResult<OperationLike[]> {
const migrated: OperationLike[] = [];
let droppedCount = 0;
for (const op of ops) {
const result = migrateOperation(op, targetVersion);
if (!result.success) {
return {
success: false,
error: `Failed to migrate operation ${op.id}: ${result.error}`,
};
}
if (result.data !== null && result.data !== undefined) {
migrated.push(result.data);
} else {
droppedCount++;
}
}
return {
success: true,
data: migrated,
migratedFromVersion: ops.length > 0 ? (ops[0].schemaVersion ?? 1) : 1,
migratedToVersion: targetVersion,
};
}
/**
* Validate the migration registry at startup.
* Returns an array of error messages (empty if valid).
*/
export function validateMigrationRegistry(): string[] {
const errors: string[] = [];
for (const migration of MIGRATIONS) {
// Check requiresOperationMigration consistency
if (migration.requiresOperationMigration && !migration.migrateOperation) {
errors.push(
`Migration v${migration.fromVersion}->v${migration.toVersion} declares ` +
`requiresOperationMigration=true but migrateOperation is not defined`,
);
}
// Check version ordering
if (migration.toVersion !== migration.fromVersion + 1) {
errors.push(
`Migration v${migration.fromVersion}->v${migration.toVersion} has invalid version jump ` +
`(expected toVersion = ${migration.fromVersion + 1})`,
);
}
}
// Check for gaps in migration chain
const versions = new Set(MIGRATIONS.map((m) => m.fromVersion));
for (let v = MIN_SUPPORTED_SCHEMA_VERSION; v < CURRENT_SCHEMA_VERSION; v++) {
if (!versions.has(v)) {
errors.push(
`Missing migration from version ${v} to ${v + 1}. ` +
`Current version is ${CURRENT_SCHEMA_VERSION}.`,
);
}
}
// Check for duplicate fromVersions
const seenVersions = new Map<number, number>();
for (const migration of MIGRATIONS) {
const count = seenVersions.get(migration.fromVersion) ?? 0;
seenVersions.set(migration.fromVersion, count + 1);
if (count > 0) {
errors.push(`Duplicate migration for version ${migration.fromVersion}`);
}
}
return errors;
}
/**
* Get the current schema version.
*/
export function getCurrentSchemaVersion(): number {
return CURRENT_SCHEMA_VERSION;
}

View file

@ -0,0 +1,69 @@
/**
* Minimal operation interface for migrations.
* Uses only primitives - no framework dependencies.
*/
export interface OperationLike {
id: string;
opType: string;
entityType: string;
entityId?: string;
entityIds?: string[];
payload: unknown;
schemaVersion: number;
}
/**
* Defines a schema migration from one version to the next.
* Migrations must be pure functions with no external dependencies.
*/
export interface SchemaMigration {
/** Source version this migration applies to */
fromVersion: number;
/** Target version after migration */
toVersion: number;
/** Human-readable description of what changed */
description: string;
/**
* Transform a full state snapshot from fromVersion to toVersion.
* Must be a pure function.
* @param state - The state object (shape depends on fromVersion)
* @returns Transformed state for toVersion
*/
migrateState: (state: unknown) => unknown;
/**
* Transform an individual operation payload.
* Return null to drop the operation entirely (e.g., for removed features).
* Only required for non-additive changes (renames, removals, type changes).
*/
migrateOperation?: (op: OperationLike) => OperationLike | null;
/**
* Explicit declaration that forces migration authors to think about
* whether operations need migration. If true but migrateOperation
* is undefined, validation fails at startup.
*/
requiresOperationMigration: boolean;
}
/**
* Result of a migration operation.
*/
export interface MigrationResult<T> {
success: boolean;
data?: T;
error?: string;
migratedFromVersion?: number;
migratedToVersion?: number;
}
/**
* State cache with schema version for migration tracking.
*/
export interface MigratableStateCache {
state: unknown;
schemaVersion?: number;
}

View file

@ -0,0 +1,30 @@
import type { SchemaMigration } from '../migration.types';
/**
* Registry of all schema migrations.
* Migrations are applied sequentially from the source version.
*
* To add a new migration:
* 1. Increment CURRENT_SCHEMA_VERSION in schema-version.ts
* 2. Create migration file (e.g., v1-to-v2.ts)
* 3. Add to this array
*
* Example migration:
* ```typescript
* {
* fromVersion: 1,
* toVersion: 2,
* description: 'Rename task.estimate to task.timeEstimate',
* requiresOperationMigration: true,
* migrateState: (state) => {
* // Transform state structure
* },
* migrateOperation: (op) => {
* // Transform operation payload, or return null to drop
* },
* }
* ```
*/
export const MIGRATIONS: SchemaMigration[] = [
// No migrations yet - schema version 1 is the initial version
];

View file

@ -0,0 +1,18 @@
/**
* Current schema version for all operations and state snapshots.
* Increment this BEFORE adding a new migration.
*/
export const CURRENT_SCHEMA_VERSION = 1;
/**
* Minimum schema version that this codebase can still handle.
* Operations below this version cannot be processed.
*/
export const MIN_SUPPORTED_SCHEMA_VERSION = 1;
/**
* Maximum version difference we tolerate before forcing an app update.
* If remote data is more than MAX_VERSION_SKIP versions ahead,
* the user must update their app.
*/
export const MAX_VERSION_SKIP = 3;

View file

@ -0,0 +1,331 @@
import { describe, it, expect, beforeEach, vi } from 'vitest';
import {
migrateState,
migrateOperation,
migrateOperations,
stateNeedsMigration,
operationNeedsMigration,
validateMigrationRegistry,
getCurrentSchemaVersion,
} from '../src/migrate';
import {
CURRENT_SCHEMA_VERSION,
MIN_SUPPORTED_SCHEMA_VERSION,
} from '../src/schema-version';
import type { OperationLike, SchemaMigration } from '../src/migration.types';
import { MIGRATIONS } from '../src/migrations';
describe('shared-schema migration functions', () => {
describe('getCurrentSchemaVersion', () => {
it('returns the current schema version', () => {
expect(getCurrentSchemaVersion()).toBe(CURRENT_SCHEMA_VERSION);
});
});
describe('stateNeedsMigration', () => {
it('returns false when version equals target', () => {
expect(stateNeedsMigration(CURRENT_SCHEMA_VERSION)).toBe(false);
});
it('returns false when version exceeds target', () => {
expect(stateNeedsMigration(CURRENT_SCHEMA_VERSION + 1)).toBe(false);
});
it('returns true when version is below target', () => {
expect(
stateNeedsMigration(CURRENT_SCHEMA_VERSION - 1, CURRENT_SCHEMA_VERSION),
).toBe(true);
});
it('treats undefined version as 1', () => {
expect(stateNeedsMigration(undefined, 2)).toBe(true);
expect(stateNeedsMigration(undefined, 1)).toBe(false);
});
});
describe('operationNeedsMigration', () => {
it('returns false when operation version equals target', () => {
const op: OperationLike = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
payload: {},
schemaVersion: CURRENT_SCHEMA_VERSION,
};
expect(operationNeedsMigration(op)).toBe(false);
});
it('returns true when operation version is below target', () => {
const op: OperationLike = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
payload: {},
schemaVersion: CURRENT_SCHEMA_VERSION - 1,
};
expect(operationNeedsMigration(op, CURRENT_SCHEMA_VERSION)).toBe(true);
});
it('treats undefined schemaVersion as 1', () => {
const op = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
payload: {},
} as OperationLike;
expect(operationNeedsMigration(op, 2)).toBe(true);
expect(operationNeedsMigration(op, 1)).toBe(false);
});
});
describe('migrateState', () => {
it('returns unchanged state when already at target version', () => {
const state = { task: { entities: { t1: { title: 'Test' } } } };
const result = migrateState(state, CURRENT_SCHEMA_VERSION);
expect(result.success).toBe(true);
expect(result.data).toEqual(state);
});
it('returns unchanged state when source exceeds target', () => {
const state = { task: {} };
const result = migrateState(
state,
CURRENT_SCHEMA_VERSION + 1,
CURRENT_SCHEMA_VERSION,
);
expect(result.success).toBe(true);
expect(result.data).toEqual(state);
});
it('fails for version below minimum supported', () => {
const result = migrateState({}, MIN_SUPPORTED_SCHEMA_VERSION - 1);
expect(result.success).toBe(false);
expect(result.error).toContain('below minimum supported');
});
it('fails when migration path is missing', () => {
// This test only makes sense when CURRENT_SCHEMA_VERSION > 1 and no migrations exist
if (CURRENT_SCHEMA_VERSION > 1 && MIGRATIONS.length === 0) {
const result = migrateState({}, 1, 2);
expect(result.success).toBe(false);
expect(result.error).toContain('No migration path');
}
});
});
describe('migrateOperation', () => {
it('returns unchanged operation when already at target version', () => {
const op: OperationLike = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
payload: { changes: { title: 'New' } },
schemaVersion: CURRENT_SCHEMA_VERSION,
};
const result = migrateOperation(op);
expect(result.success).toBe(true);
expect(result.data).toEqual(op);
});
it('fails for version below minimum supported', () => {
const op: OperationLike = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
payload: {},
schemaVersion: MIN_SUPPORTED_SCHEMA_VERSION - 1,
};
const result = migrateOperation(op);
expect(result.success).toBe(false);
expect(result.error).toContain('below minimum supported');
});
});
describe('migrateOperations', () => {
it('returns empty array for empty input', () => {
const result = migrateOperations([]);
expect(result.success).toBe(true);
expect(result.data).toEqual([]);
});
it('returns unchanged operations when already at target version', () => {
const ops: OperationLike[] = [
{
id: 'op1',
opType: 'CRT',
entityType: 'TASK',
entityId: 't1',
payload: { title: 'Task 1' },
schemaVersion: CURRENT_SCHEMA_VERSION,
},
{
id: 'op2',
opType: 'UPD',
entityType: 'TASK',
entityId: 't1',
payload: { changes: { done: true } },
schemaVersion: CURRENT_SCHEMA_VERSION,
},
];
const result = migrateOperations(ops);
expect(result.success).toBe(true);
expect(result.data).toHaveLength(2);
expect(result.data).toEqual(ops);
});
});
describe('validateMigrationRegistry', () => {
it('returns empty array when no migrations and version is 1', () => {
// Only valid if CURRENT_SCHEMA_VERSION is 1
if (CURRENT_SCHEMA_VERSION === 1) {
const errors = validateMigrationRegistry();
expect(errors).toEqual([]);
}
});
it('returns errors when CURRENT_SCHEMA_VERSION > 1 but no migrations', () => {
// This is a consistency check for when we add migrations
if (CURRENT_SCHEMA_VERSION > 1 && MIGRATIONS.length === 0) {
const errors = validateMigrationRegistry();
expect(errors.length).toBeGreaterThan(0);
expect(errors[0]).toContain('Missing migration');
}
});
});
});
describe('migration with mock migrations', () => {
// These tests use a mock migration to verify the migration logic works correctly
// In real usage, migrations are defined in the MIGRATIONS array
describe('state migration chain', () => {
it('applies migration correctly', () => {
// Create a test migration locally to verify the mechanism works
const testMigration: SchemaMigration = {
fromVersion: 1,
toVersion: 2,
description: 'Test migration',
requiresOperationMigration: false,
migrateState: (state: unknown) => {
const s = state as Record<string, unknown>;
return { ...s, migrated: true };
},
};
// Manually apply to verify the pattern
const state = { data: 'test' };
const migrated = testMigration.migrateState(state);
expect(migrated).toEqual({ data: 'test', migrated: true });
});
});
describe('operation migration', () => {
it('migrateOperation can return null to drop operations', () => {
const testMigration: SchemaMigration = {
fromVersion: 1,
toVersion: 2,
description: 'Drop old feature operations',
requiresOperationMigration: true,
migrateState: (state) => state,
migrateOperation: (op) => {
if (op.entityType === 'OLD_FEATURE') {
return null; // Drop operations for removed feature
}
return op;
},
};
const opToKeep: OperationLike = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
payload: {},
schemaVersion: 1,
};
const opToDrop: OperationLike = {
id: 'op2',
opType: 'UPD',
entityType: 'OLD_FEATURE',
payload: {},
schemaVersion: 1,
};
expect(testMigration.migrateOperation!(opToKeep)).toEqual(opToKeep);
expect(testMigration.migrateOperation!(opToDrop)).toBeNull();
});
it('migrateOperation can transform payloads', () => {
const testMigration: SchemaMigration = {
fromVersion: 1,
toVersion: 2,
description: 'Rename estimate to timeEstimate',
requiresOperationMigration: true,
migrateState: (state) => state,
migrateOperation: (op) => {
if (op.entityType === 'TASK' && op.opType === 'UPD') {
const payload = op.payload as Record<string, unknown>;
if (payload.changes && typeof payload.changes === 'object') {
const changes = payload.changes as Record<string, unknown>;
if ('estimate' in changes) {
const { estimate, ...rest } = changes;
return {
...op,
payload: {
...payload,
changes: { ...rest, timeEstimate: estimate },
},
};
}
}
}
return op;
},
};
const op: OperationLike = {
id: 'op1',
opType: 'UPD',
entityType: 'TASK',
entityId: 't1',
payload: { changes: { estimate: 3600, title: 'Test' } },
schemaVersion: 1,
};
const migrated = testMigration.migrateOperation!(op);
expect(migrated).not.toBeNull();
expect((migrated!.payload as Record<string, unknown>).changes).toEqual({
timeEstimate: 3600,
title: 'Test',
});
});
});
describe('validation', () => {
it('detects requiresOperationMigration without migrateOperation', () => {
const invalidMigration: SchemaMigration = {
fromVersion: 1,
toVersion: 2,
description: 'Invalid migration',
requiresOperationMigration: true, // Says it needs op migration
migrateState: (state) => state,
// But migrateOperation is missing!
};
// Check the validation logic directly
const hasOpMigration = !!invalidMigration.migrateOperation;
const declaresRequired = invalidMigration.requiresOperationMigration;
expect(declaresRequired && !hasOpMigration).toBe(true);
});
});
});

View file

@ -0,0 +1,21 @@
{
"compilerOptions": {
"target": "ES2022",
"module": "ESNext",
"moduleResolution": "bundler",
"lib": ["ES2022"],
"declaration": true,
"declarationMap": true,
"sourceMap": true,
"outDir": "./dist",
"rootDir": "./src",
"strict": true,
"esModuleInterop": true,
"skipLibCheck": true,
"forceConsistentCasingInFileNames": true,
"resolveJsonModule": true,
"isolatedModules": true
},
"include": ["src/**/*"],
"exclude": ["node_modules", "dist", "tests"]
}

View file

@ -0,0 +1,9 @@
import { defineConfig } from 'vitest/config';
export default defineConfig({
test: {
globals: true,
environment: 'node',
include: ['tests/**/*.spec.ts'],
},
});

View file

@ -12,6 +12,7 @@
"clear-data": "ts-node scripts/clear-data.ts"
},
"dependencies": {
"@sp/shared-schema": "*",
"@fastify/cors": "^11.1.0",
"@fastify/helmet": "^13.0.2",
"@fastify/rate-limit": "^10.3.0",

View file

@ -40,6 +40,7 @@ export interface DbUserSyncState {
last_snapshot_seq: number | null;
snapshot_data: Buffer | null;
snapshot_at: number | null;
snapshot_schema_version: number | null;
}
export interface DbSyncDevice {
@ -213,6 +214,18 @@ export const initDb = (dataDir: string, inMemory = false): void => {
db.exec('ALTER TABLE users ADD COLUMN token_version INTEGER DEFAULT 0');
}
// Migration: Add schema version tracking for snapshots
const syncStateColumns = db.pragma('table_info(user_sync_state)') as { name: string }[];
const hasSnapshotSchemaVersion = syncStateColumns.some(
(col) => col.name === 'snapshot_schema_version',
);
if (!hasSnapshotSchemaVersion) {
Logger.info('Migrating database: adding snapshot_schema_version column');
db.exec(
'ALTER TABLE user_sync_state ADD COLUMN snapshot_schema_version INTEGER DEFAULT 1',
);
}
Logger.info(`Database initialized at ${dbPath}`);
};

View file

@ -17,6 +17,14 @@ import {
SyncErrorCode,
} from './sync.types';
import { Logger } from '../logger';
import {
CURRENT_SCHEMA_VERSION,
migrateState,
migrateOperation,
stateNeedsMigration,
validateMigrationRegistry,
type OperationLike,
} from '@sp/shared-schema';
/**
* Valid entity types for operations.
@ -164,13 +172,13 @@ export class SyncService {
`),
getCachedSnapshot: this.db.prepare(`
SELECT snapshot_data, last_snapshot_seq, snapshot_at
SELECT snapshot_data, last_snapshot_seq, snapshot_at, snapshot_schema_version
FROM user_sync_state WHERE user_id = ?
`),
cacheSnapshot: this.db.prepare(`
UPDATE user_sync_state
SET snapshot_data = ?, last_snapshot_seq = ?, snapshot_at = ?
SET snapshot_data = ?, last_snapshot_seq = ?, snapshot_at = ?, snapshot_schema_version = ?
WHERE user_id = ?
`),
@ -668,12 +676,18 @@ export class SyncService {
getCachedSnapshot(
userId: number,
): { state: unknown; serverSeq: number; generatedAt: number } | null {
): {
state: unknown;
serverSeq: number;
generatedAt: number;
schemaVersion: number;
} | null {
const row = this.stmts.getCachedSnapshot.get(userId) as
| {
snapshot_data: Buffer | null;
last_snapshot_seq: number | null;
snapshot_at: number | null;
snapshot_schema_version: number | null;
}
| undefined;
@ -685,6 +699,7 @@ export class SyncService {
state: JSON.parse(decompressed),
serverSeq: row.last_snapshot_seq ?? 0,
generatedAt: row.snapshot_at ?? 0,
schemaVersion: row.snapshot_schema_version ?? 1,
};
}
@ -702,13 +717,21 @@ export class SyncService {
return;
}
this.stmts.cacheSnapshot.run(compressed, serverSeq, now, userId);
// Store with current schema version
this.stmts.cacheSnapshot.run(
compressed,
serverSeq,
now,
CURRENT_SCHEMA_VERSION,
userId,
);
}
generateSnapshot(userId: number): {
state: unknown;
serverSeq: number;
generatedAt: number;
schemaVersion: number;
} {
// Wrap in transaction for snapshot isolation - prevents race conditions
// where new ops arrive between reading latestSeq and processing ops
@ -716,23 +739,50 @@ export class SyncService {
const latestSeq = this.getLatestSeq(userId);
let state: Record<string, unknown> = {};
let startSeq = 0;
let snapshotSchemaVersion = CURRENT_SCHEMA_VERSION;
// Try to get cached snapshot to build upon (Incremental Snapshot)
const cached = this.getCachedSnapshot(userId);
if (cached) {
state = cached.state as Record<string, unknown>;
startSeq = cached.serverSeq;
snapshotSchemaVersion = cached.schemaVersion;
}
// If we are already up to date, return cached
if (startSeq >= latestSeq && cached) {
// If we are already up to date AND at current schema version, return cached
if (
startSeq >= latestSeq &&
cached &&
snapshotSchemaVersion === CURRENT_SCHEMA_VERSION
) {
return {
state: cached.state,
serverSeq: cached.serverSeq,
generatedAt: Date.now(), // Refresh timestamp
schemaVersion: CURRENT_SCHEMA_VERSION,
};
}
// Migrate snapshot if it's from an older schema version
if (stateNeedsMigration(snapshotSchemaVersion, CURRENT_SCHEMA_VERSION)) {
Logger.info(
`[user:${userId}] Migrating snapshot from v${snapshotSchemaVersion} to v${CURRENT_SCHEMA_VERSION}`,
);
const migrationResult = migrateState(
state,
snapshotSchemaVersion,
CURRENT_SCHEMA_VERSION,
);
if (!migrationResult.success) {
Logger.error(
`[user:${userId}] Snapshot migration failed: ${migrationResult.error}`,
);
throw new Error(`Snapshot migration failed: ${migrationResult.error}`);
}
state = migrationResult.data as Record<string, unknown>;
snapshotSchemaVersion = CURRENT_SCHEMA_VERSION;
}
// Safety check: prevent memory exhaustion for excessive operation counts
const totalOpsToProcess = latestSeq - startSeq;
if (totalOpsToProcess > MAX_OPS_FOR_SNAPSHOT) {
@ -760,7 +810,7 @@ export class SyncService {
if (batchOps.length === 0) break;
// Replay this batch
// Replay this batch (operations are migrated during replay)
state = this.replayOpsToState(batchOps, state);
// Update currentSeq to the last processed operation
@ -779,7 +829,12 @@ export class SyncService {
// Cache the new snapshot
this.cacheSnapshot(userId, state, latestSeq);
return { state, serverSeq: latestSeq, generatedAt };
return {
state,
serverSeq: latestSeq,
generatedAt,
schemaVersion: CURRENT_SCHEMA_VERSION,
};
});
return tx();
@ -795,9 +850,44 @@ export class SyncService {
// Apply operations in order
for (const row of ops) {
const opType = row.op_type;
const entityType = row.entity_type;
const entityId = row.entity_id;
let opType = row.op_type;
let entityType = row.entity_type;
let entityId = row.entity_id;
let payload = JSON.parse(row.payload);
// Migrate operation if it's from an older schema version
const opSchemaVersion = row.schema_version ?? 1;
if (opSchemaVersion < CURRENT_SCHEMA_VERSION) {
const opLike: OperationLike = {
id: row.id,
opType,
entityType,
entityId: entityId ?? undefined,
payload,
schemaVersion: opSchemaVersion,
};
const migrationResult = migrateOperation(opLike, CURRENT_SCHEMA_VERSION);
if (!migrationResult.success) {
Logger.warn(
`Operation migration failed for ${row.id}: ${migrationResult.error}. Skipping.`,
);
continue;
}
const migratedOp = migrationResult.data;
if (migratedOp === null || migratedOp === undefined) {
// Operation was dropped during migration (e.g., removed feature)
Logger.info(`Operation ${row.id} dropped during migration (feature removed)`);
continue;
}
// Use migrated values
opType = migratedOp.opType;
entityType = migratedOp.entityType;
entityId = migratedOp.entityId ?? null;
payload = migratedOp.payload;
}
// Skip operations with invalid entity types (defensive)
if (!ALLOWED_ENTITY_TYPES.has(entityType)) {
@ -805,8 +895,6 @@ export class SyncService {
continue;
}
const payload = JSON.parse(row.payload);
// Initialize entity type if needed
if (!state[entityType]) {
state[entityType] = {};

View file

@ -1,13 +1,25 @@
import { Injectable } from '@angular/core';
import { Operation, VectorClock } from '../operation.types';
import { OpLog } from '../../../log';
import {
CURRENT_SCHEMA_VERSION as SHARED_CURRENT_SCHEMA_VERSION,
MAX_VERSION_SKIP as SHARED_MAX_VERSION_SKIP,
MIGRATIONS,
migrateState,
migrateOperation as sharedMigrateOperation,
stateNeedsMigration,
operationNeedsMigration as sharedOperationNeedsMigration,
validateMigrationRegistry,
type SchemaMigration,
type OperationLike,
} from '@sp/shared-schema';
/**
* Current schema version for the operation log state cache.
* Increment this when making breaking changes to the state structure.
*/
export const CURRENT_SCHEMA_VERSION = 1;
export const MAX_VERSION_SKIP = 5;
// Re-export shared constants for backwards compatibility
export const CURRENT_SCHEMA_VERSION = SHARED_CURRENT_SCHEMA_VERSION;
export const MAX_VERSION_SKIP = SHARED_MAX_VERSION_SKIP;
// Re-export types
export type { SchemaMigration };
/**
* Interface for state cache that may need migration.
@ -20,94 +32,13 @@ export interface MigratableStateCache {
schemaVersion?: number; // Optional for backward compatibility with old caches
}
/**
* Interface for schema migrations (A.7.15 Unified State and Operation Migrations).
* Each migration transforms state from one version to the next, and optionally
* transforms individual operations for tail ops replay and conflict detection.
*
* @see docs/ai/sync/operation-log-architecture.md A.7.15
*/
export interface SchemaMigration {
fromVersion: number;
toVersion: number;
description: string;
/** Required: transform full state snapshot */
migrateState: (state: unknown) => unknown;
/**
* Optional: transform individual operation.
* Return null to drop the operation entirely (e.g., for removed features).
* Only needed for non-additive changes (renames, removals, type changes).
*/
migrateOperation?: (op: Operation) => Operation | null;
/**
* Explicit declaration forces author to think about operation migration.
* If true but migrateOperation is undefined, startup validation fails.
*/
requiresOperationMigration: boolean;
}
/**
* Registry of all schema migrations.
* Add new migrations here when the state structure changes.
*
* NOTE: This is for Operation Log schema migrations (Post-v10 / Post-OpLog).
* For legacy migrations (upgrading from older versions of the app), see:
* `src/app/pfapi/migrate/cross-model-migrations.ts`
*
* Example migration (additive change - no operation migration needed):
* ```typescript
* {
* fromVersion: 1,
* toVersion: 2,
* description: 'Add priority field to tasks',
* requiresOperationMigration: false,
* migrateState: (state) => {
* const s = state as any;
* if (!s.task?.entities) return state;
* const entities = Object.fromEntries(
* Object.entries(s.task.entities).map(([id, task]: [string, any]) => [
* id,
* { ...task, priority: task.priority ?? 'NORMAL' },
* ])
* );
* return { ...s, task: { ...s.task, entities } };
* },
* },
* ```
*
* Example migration (field rename - operation migration required):
* ```typescript
* {
* fromVersion: 2,
* toVersion: 3,
* description: 'Rename task.estimate to task.timeEstimate',
* requiresOperationMigration: true,
* migrateState: (state) => { ... },
* migrateOperation: (op) => {
* if (op.entityType !== 'TASK' || op.opType !== 'UPD') return op;
* const changes = (op.payload as any)?.changes;
* if (!changes?.estimate) return op;
* return {
* ...op,
* schemaVersion: 3,
* payload: { ...op.payload, changes: { ...changes, timeEstimate: changes.estimate, estimate: undefined } },
* };
* },
* },
* ```
*/
const MIGRATIONS: SchemaMigration[] = [
// No migrations yet - schema version 1 is the initial version
// Add migrations here as needed
];
/**
* Service responsible for migrating state cache snapshots and operations
* between schema versions.
*
* This is an Angular wrapper around the shared schema migration functions
* from @sp/shared-schema package.
*
* When the application's state structure changes (e.g., new fields, renamed properties),
* migrations ensure old snapshots and operations can be upgraded to work with new code.
*
@ -139,14 +70,11 @@ export class SchemaMigrationService {
* have a migrateOperation function defined.
*/
private _validateMigrationRegistry(): void {
for (const migration of MIGRATIONS) {
if (migration.requiresOperationMigration && !migration.migrateOperation) {
throw new Error(
`SchemaMigrationService: Migration v${migration.fromVersion}→v${migration.toVersion} ` +
`declares requiresOperationMigration=true but migrateOperation is not defined. ` +
`Either implement migrateOperation or set requiresOperationMigration=false.`,
);
}
const errors = validateMigrationRegistry();
if (errors.length > 0) {
throw new Error(
`SchemaMigrationService: Invalid migration registry:\n${errors.join('\n')}`,
);
}
}
@ -170,44 +98,19 @@ export class SchemaMigrationService {
`SchemaMigrationService: Migrating state from v${currentVersion} to v${CURRENT_SCHEMA_VERSION}`,
);
let { state } = cache;
let version = currentVersion;
const result = migrateState(cache.state, currentVersion, CURRENT_SCHEMA_VERSION);
// Run migrations sequentially
while (version < CURRENT_SCHEMA_VERSION) {
const migration = MIGRATIONS.find((m) => m.fromVersion === version);
if (!migration) {
throw new Error(
`SchemaMigrationService: No migration path from version ${version}. ` +
`Current version is ${CURRENT_SCHEMA_VERSION}.`,
);
}
OpLog.normal(
`SchemaMigrationService: Running state migration v${migration.fromVersion} → v${migration.toVersion}: ${migration.description}`,
);
try {
state = migration.migrateState(state);
version = migration.toVersion;
} catch (e) {
OpLog.err(
`SchemaMigrationService: State migration failed at v${migration.fromVersion} → v${migration.toVersion}`,
e,
);
throw new Error(
`Schema state migration failed: ${migration.description}. ` +
`Error: ${e instanceof Error ? e.message : String(e)}`,
);
}
if (!result.success) {
throw new Error(`SchemaMigrationService: ${result.error}`);
}
OpLog.normal(`SchemaMigrationService: State migration complete. Now at v${version}`);
OpLog.normal(
`SchemaMigrationService: State migration complete. Now at v${CURRENT_SCHEMA_VERSION}`,
);
return {
...cache,
state,
state: result.data,
schemaVersion: CURRENT_SCHEMA_VERSION,
};
}
@ -233,45 +136,36 @@ export class SchemaMigrationService {
return op;
}
let migratedOp: Operation | null = { ...op };
let version = opVersion;
// Convert to OperationLike for the shared function
const opLike: OperationLike = {
id: op.id,
opType: op.opType,
entityType: op.entityType,
entityId: op.entityId,
entityIds: op.entityIds,
payload: op.payload,
schemaVersion: opVersion,
};
while (version < CURRENT_SCHEMA_VERSION && migratedOp !== null) {
const migration = MIGRATIONS.find((m) => m.fromVersion === version);
const result = sharedMigrateOperation(opLike, CURRENT_SCHEMA_VERSION);
if (!migration) {
throw new Error(
`SchemaMigrationService: No migration path from version ${version}. ` +
`Current version is ${CURRENT_SCHEMA_VERSION}.`,
);
}
if (migration.migrateOperation) {
try {
migratedOp = migration.migrateOperation(migratedOp);
if (migratedOp !== null) {
// Update schema version on the operation
migratedOp = { ...migratedOp, schemaVersion: migration.toVersion };
}
} catch (e) {
OpLog.err(
`SchemaMigrationService: Operation migration failed at v${migration.fromVersion} → v${migration.toVersion}`,
e,
);
throw new Error(
`Schema operation migration failed: ${migration.description}. ` +
`Error: ${e instanceof Error ? e.message : String(e)}`,
);
}
} else {
// No operation migration defined - just update version
migratedOp = { ...migratedOp, schemaVersion: migration.toVersion };
}
version = migration.toVersion;
if (!result.success) {
throw new Error(`SchemaMigrationService: ${result.error}`);
}
return migratedOp;
if (result.data === null || result.data === undefined) {
return null;
}
// Merge migrated fields back into the original operation
return {
...op,
opType: result.data.opType as Operation['opType'],
entityType: result.data.entityType as Operation['entityType'],
entityId: result.data.entityId,
payload: result.data.payload,
schemaVersion: result.data.schemaVersion,
};
}
/**
@ -301,16 +195,23 @@ export class SchemaMigrationService {
* Returns true if the cache needs migration.
*/
needsMigration(cache: MigratableStateCache): boolean {
const currentVersion = cache.schemaVersion ?? 1;
return currentVersion < CURRENT_SCHEMA_VERSION;
return stateNeedsMigration(cache.schemaVersion, CURRENT_SCHEMA_VERSION);
}
/**
* Returns true if the operation needs migration.
*/
operationNeedsMigration(op: Operation): boolean {
const opVersion = op.schemaVersion ?? 1;
return opVersion < CURRENT_SCHEMA_VERSION;
return sharedOperationNeedsMigration(
{
id: op.id,
opType: op.opType,
entityType: op.entityType,
payload: op.payload,
schemaVersion: op.schemaVersion ?? 1,
},
CURRENT_SCHEMA_VERSION,
);
}
/**

View file

@ -23,7 +23,8 @@
"typeRoots": ["node_modules/@types"],
"lib": ["es2022", "dom"],
"paths": {
"@super-productivity/plugin-api": ["packages/plugin-api/src/index.ts"]
"@super-productivity/plugin-api": ["packages/plugin-api/src/index.ts"],
"@sp/shared-schema": ["packages/shared-schema/src/index.ts"]
},
"plugins": [
{