diff --git a/.changeset/cluster-support.md b/.changeset/cluster-support.md new file mode 100644 index 0000000..8c31510 --- /dev/null +++ b/.changeset/cluster-support.md @@ -0,0 +1,6 @@ +--- +"chkit": patch +"@chkit/core": patch +--- + +Add ClickHouse cluster support. Set `clickhouse.cluster` in your config to run all generated DDL `ON CLUSTER ` and store the migration journal in a replicated engine — for self-managed multi-node clusters. Your table engines are passed through unchanged (declare `ReplicatedMergeTree` yourself). Leave `cluster` unset for single-node, ClickHouse Cloud, or ObsessionDB, where replication is automatic and `ON CLUSTER` is unnecessary. diff --git a/apps/docs/src/content/docs/configuration/overview.md b/apps/docs/src/content/docs/configuration/overview.md index a8993c9..0edef5c 100644 --- a/apps/docs/src/content/docs/configuration/overview.md +++ b/apps/docs/src/content/docs/configuration/overview.md @@ -37,6 +37,28 @@ export default defineConfig({ }) ``` +## Cluster mode (`ON CLUSTER`) + +For self-managed multi-node ClickHouse clusters, set `clickhouse.cluster` to the cluster name from your server's `remote_servers` config: + +```ts +clickhouse: { + url: process.env.CLICKHOUSE_URL ?? 'http://localhost:9000', + password: process.env.CLICKHOUSE_PASSWORD ?? '', + database: 'default', + cluster: 'my_cluster', +}, +``` + +When `cluster` is set, chkit: + +- emits every generated DDL statement with an `ON CLUSTER ` clause, so `generate` bakes it into the migration files and `migrate` propagates each change to all nodes via ClickHouse's distributed DDL queue, and +- creates its migration journal (`_chkit_migrations`) as a `ReplicatedReplacingMergeTree`, keeping applied-migration history consistent across every node — so running `migrate` against a load-balanced endpoint never re-applies migrations. + +chkit does **not** rewrite your table engines: declare `ReplicatedMergeTree` (or another `Replicated*`/`Shared*` variant) yourself for tables whose data should replicate. Empty-argument engines (e.g. `ENGINE = ReplicatedMergeTree`) are recommended so the server's `default_replica_path` supplies a collision-free Keeper path, which also keeps drop-and-recreate safe. + +Leave `cluster` unset for single-node servers, ClickHouse Cloud, or ObsessionDB, where replication is automatic (SharedMergeTree) and `ON CLUSTER` is unnecessary. The value accepts an identifier (`my_cluster`) or a macro (`{cluster}`) when your nodes define one. + ## User profile config fallback Project-scoped commands (`generate`, `migrate`, `status`, `drift`, `check`, `codegen`, `pull`) always require a project config in the working directory. diff --git a/package.json b/package.json index 081398e..5131bf2 100644 --- a/package.json +++ b/package.json @@ -26,6 +26,11 @@ "test": "turbo run test", "test:turbo": "turbo run test", "test:env": "doppler run --project chkit --config ci -- bun run test", + "cluster:up": "docker compose -f test/cluster/docker-compose.yml up -d --wait", + "cluster:down": "docker compose -f test/cluster/docker-compose.yml down -v", + "cluster:logs": "docker compose -f test/cluster/docker-compose.yml logs -f", + "cluster:verify": "bash test/cluster/verify.sh", + "test:cluster": "bun test packages/cli/test/cluster.e2e.test.ts", "fallow": "bunx fallow", "fallow:dead-code": "bunx fallow --only dead-code", "fallow:pr": "bunx fallow --changed-since main --summary || true", diff --git a/packages/cli/src/commands/check/command.ts b/packages/cli/src/commands/check/command.ts index 74a7866..2bc7d34 100644 --- a/packages/cli/src/commands/check/command.ts +++ b/packages/cli/src/commands/check/command.ts @@ -39,7 +39,7 @@ export const checkCommand: ChxPluginCommand = { const db = pluginContext.executor const database = config.clickhouse?.database - const journalStore = createJournalStore(db) + const journalStore = createJournalStore(db, config.clickhouse?.cluster) const files = await listMigrations(migrationsDir) const journal = await journalStore.readJournal() const databaseMissing = journalStore.databaseMissing diff --git a/packages/cli/src/commands/generate/command.ts b/packages/cli/src/commands/generate/command.ts index 09f6cfc..e662179 100644 --- a/packages/cli/src/commands/generate/command.ts +++ b/packages/cli/src/commands/generate/command.ts @@ -1,5 +1,5 @@ import { generateArtifacts } from '@chkit/codegen' -import { ChxValidationError, planDiff } from '@chkit/core' +import { applyOnClusterToPlan, ChxValidationError, planDiff } from '@chkit/core' import { defineFlags, typedFlags, type ChxPluginCommand } from '../../plugins.js' import { resolveDirs } from '../../runtime/config.js' @@ -163,6 +163,10 @@ async function cmdGenerate(ctx: import('../../plugins.js').ChxPluginCommandConte }).plan } + // Cluster mode: stamp `ON CLUSTER ` onto every DDL statement as a final + // post-pass, after all plan transforms (renames, plugins, scope filtering). + plan = applyOnClusterToPlan(plan, config.clickhouse?.cluster) + if (planMode) { emitGeneratePlanOutput(plan, jsonMode, resolvedScope) return 0 diff --git a/packages/cli/src/commands/migrate/command.ts b/packages/cli/src/commands/migrate/command.ts index b25611e..0750c5d 100644 --- a/packages/cli/src/commands/migrate/command.ts +++ b/packages/cli/src/commands/migrate/command.ts @@ -59,7 +59,7 @@ async function cmdMigrate( throw new Error('clickhouse config is required for migrate (journal is stored in ClickHouse)') } const db = pluginContext.executor - const journalStore = createJournalStore(db) + const journalStore = createJournalStore(db, config.clickhouse?.cluster) const snapshot = await readSnapshot(metaDir) const tableScope = resolveTableScope(tableSelector, tableKeysFromDefinitions(snapshot?.definitions ?? [])) const mode = executeRequested ? 'execute' : 'plan' diff --git a/packages/cli/src/commands/status.ts b/packages/cli/src/commands/status.ts index 2f58c89..078e14f 100644 --- a/packages/cli/src/commands/status.ts +++ b/packages/cli/src/commands/status.ts @@ -21,7 +21,7 @@ export const statusCommand: ChxPluginCommand = { } const db = pluginContext.executor const database = config.clickhouse?.database - const journalStore = createJournalStore(db) + const journalStore = createJournalStore(db, config.clickhouse?.cluster) await mkdir(migrationsDir, { recursive: true }) const files = await listMigrations(migrationsDir) diff --git a/packages/cli/src/runtime/journal-store.ts b/packages/cli/src/runtime/journal-store.ts index 31d5faf..63e79ff 100644 --- a/packages/cli/src/runtime/journal-store.ts +++ b/packages/cli/src/runtime/journal-store.ts @@ -1,4 +1,5 @@ import { isUnknownDatabaseError, type ClickHouseExecutor } from '@chkit/clickhouse' +import { onClusterClause } from '@chkit/core' import type { MigrationJournal, MigrationJournalEntry } from './migration-store.js' import { CLI_VERSION } from './version.js' @@ -141,25 +142,52 @@ function parseOperations(value: unknown): OperationState[] { return decoded.map((row) => operationFromTuple(row as OperationTupleRow)) } -export function createJournalStore(db: ClickHouseExecutor): JournalStore { +export function createJournalStore(db: ClickHouseExecutor, cluster?: string): JournalStore { const journalTable = resolveJournalTableName() - debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}`) - const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable} ( + debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}${cluster ? ` (ON CLUSTER ${cluster})` : ''}`) + // In cluster mode the journal must be consistent across every node, so it uses + // a replicated engine with a no-`{shard}` Keeper path (one cluster-wide group) + // created `ON CLUSTER`. The read path already uses SYNC REPLICA + FINAL + + // sequential consistency. Single-node/Cloud keeps the plain engine unchanged. + const onCluster = onClusterClause(cluster) + const journalEngine = cluster + ? "ReplicatedReplacingMergeTree('/clickhouse/tables/{database}/{table}', '{replica}', applied_at)" + : 'ReplacingMergeTree(applied_at)' + const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable}${onCluster} ( name String, applied_at DateTime64(3, 'UTC'), checksum String, chkit_version String, migration_completed Bool DEFAULT true, operations ${OPERATIONS_TUPLE_TYPE} DEFAULT [] -) ENGINE = ReplacingMergeTree(applied_at) +) ENGINE = ${journalEngine} ORDER BY (name) SETTINGS index_granularity = 1` let bootstrapped = false let _databaseMissing = false + // Enabling cluster mode on a project that already has a single-node journal + // would otherwise leave migration history non-replicated (and the ON CLUSTER + // schema-upgrade ALTERs could fail on replicas that lack the table). Fail fast + // with actionable guidance instead of silently diverging. + async function assertClusterJournalReplicated(): Promise { + const rows = await db.query<{ engine: string }>( + `SELECT engine FROM system.tables WHERE database = currentDatabase() AND name = '${journalTable}'`, + ) + const engine = rows[0]?.engine + if (engine && !engine.startsWith('Replicated') && !engine.startsWith('Shared')) { + throw new Error( + `Journal table "${journalTable}" already exists with a non-replicated engine (${engine}), but ` + + `clickhouse.cluster is set. A non-replicated journal is unsafe on a cluster. Drop it ` + + `(DROP TABLE ${journalTable} ON CLUSTER '${cluster}' SYNC) or set CHKIT_JOURNAL_TABLE to a new name, then re-run.`, + ) + } + } + async function ensureTable(): Promise { if (bootstrapped) return + if (cluster) await assertClusterJournalReplicated() debug('journal', `probing journal table "${journalTable}"`) try { await db.query(`SELECT name FROM ${journalTable} LIMIT 0`) @@ -205,10 +233,10 @@ SETTINGS index_granularity = 1` // of the new chkit. ALTER ADD COLUMN IF NOT EXISTS is a metadata op, // no data rewrite. await db.command( - `ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`, + `ALTER TABLE ${journalTable}${onCluster} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`, ) await db.command( - `ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`, + `ALTER TABLE ${journalTable}${onCluster} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`, ) } diff --git a/packages/cli/test/cluster.e2e.test.ts b/packages/cli/test/cluster.e2e.test.ts new file mode 100644 index 0000000..8d54b30 --- /dev/null +++ b/packages/cli/test/cluster.e2e.test.ts @@ -0,0 +1,204 @@ +import { describe, expect, test } from 'bun:test' +import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises' +import { tmpdir } from 'node:os' +import { join } from 'node:path' + +import { + CORE_ENTRY, + createJournalTableName, + createLiveExecutor, + createPrefix, + formatTestDiagnostic, + runCli, + waitForColumn, + waitForTable, +} from '../src/test/e2e-testkit.js' + +// Cluster-mode e2e against a local replicated ClickHouse cluster +// (test/cluster/docker-compose.yml: 1 shard, 2 replicas, cluster `test_cluster`). +// +// Defaults match that setup; override via env for a different cluster. This file +// lives OUTSIDE src/, so the default `bun test src` suite (the managed-ObsessionDB +// CI job) never collects it. Run explicitly via `bun run test:cluster` after +// `bun run cluster:up`. It hard-fails (never skips) if the cluster is unreachable. +const NODE1 = process.env.CLICKHOUSE_URL ?? 'http://localhost:8123' +const NODE2 = process.env.CHKIT_CLUSTER_E2E_URL2 ?? 'http://localhost:8124' +const USER = process.env.CLICKHOUSE_USER ?? 'default' +const PASSWORD = process.env.CLICKHOUSE_PASSWORD ?? 'clusterpass' +const CLUSTER = process.env.CHKIT_CLUSTER ?? 'test_cluster' +const DATABASE = process.env.CLICKHOUSE_DB ?? 'default' + +function executorFor(url: string) { + return createLiveExecutor({ + clickhouseUrl: url, + clickhouseUser: USER, + clickhousePassword: PASSWORD, + clickhouseDatabase: DATABASE, + }) +} + +function renderSchema(tableName: string, withLabel: boolean): string { + const columns = [ + " { name: 'id', type: 'UInt64' }", + " { name: 'ts', type: 'DateTime' }", + ...(withLabel ? [" { name: 'label', type: 'String' }"] : []), + ].join(',\n') + return `import { schema, table } from '${CORE_ENTRY}' + +export default schema( + table({ + database: '${DATABASE}', + name: '${tableName}', + engine: 'ReplicatedMergeTree', + columns: [ +${columns}, + ], + primaryKey: ['id'], + orderBy: ['id'], + }), +) +` +} + +async function scaffold(tableName: string): Promise<{ dir: string; configPath: string; schemaPath: string; migrationsDir: string }> { + const dir = await mkdtemp(join(tmpdir(), 'chkit-cluster-e2e-')) + const schemaPath = join(dir, 'schema.ts') + const configPath = join(dir, 'clickhouse.config.ts') + const migrationsDir = join(dir, 'chkit', 'migrations') + await writeFile(schemaPath, renderSchema(tableName, false), 'utf8') + await writeFile( + configPath, + `export default { + schema: '${schemaPath}', + outDir: '${join(dir, 'chkit')}', + migrationsDir: '${migrationsDir}', + metaDir: '${join(dir, 'chkit', 'meta')}', + clickhouse: { + url: '${NODE1}', + username: '${USER}', + password: '${PASSWORD}', + database: '${DATABASE}', + cluster: '${CLUSTER}', + }, +} +`, + 'utf8', + ) + return { dir, configPath, schemaPath, migrationsDir } +} + +async function latestMigrationSql(migrationsDir: string): Promise { + const files = (await readdir(migrationsDir)).filter((f) => f.endsWith('.sql')).sort() + const last = files.at(-1) + expect(last).toBeTruthy() + return readFile(join(migrationsDir, last as string), 'utf8') +} + +describe('chkit cluster mode (ON CLUSTER) e2e', () => { + const node1 = executorFor(NODE1) + const node2 = executorFor(NODE2) + + test('fans DDL across replicas, replicates the journal, and is idempotent', async () => { + const prefix = createPrefix('cluster') + const tableName = `${prefix}events` + const journalTable = createJournalTableName('cluster') + const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable } + const { dir, configPath, schemaPath, migrationsDir } = await scaffold(tableName) + + // generate: ON CLUSTER is baked into the migration file (CREATE TABLE + DB). + const generate = runCli(dir, ['generate', '--config', configPath, '--name', 'init', '--json'], cliEnv) + expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0) + const createSql = await latestMigrationSql(migrationsDir) + expect(createSql).toContain(`CREATE TABLE IF NOT EXISTS ${DATABASE}.${tableName} ON CLUSTER '${CLUSTER}'`) + expect(createSql).toContain(`ENGINE = ReplicatedMergeTree()`) + + // migrate: applies ON CLUSTER DDL. + const migrate = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) + expect(migrate.exitCode, formatTestDiagnostic('migrate', migrate)).toBe(0) + const applied = (JSON.parse(migrate.stdout) as { applied: Array<{ name: string }> }).applied + expect(applied.length).toBe(1) + const migrationName = applied[0]?.name as string + + // table exists on BOTH replicas. + await waitForTable(node1, DATABASE, tableName) + await waitForTable(node2, DATABASE, tableName) + + // journal is replicated on BOTH replicas. + for (const node of [node1, node2]) { + await waitForTable(node, DATABASE, journalTable) + const rows = await node.query<{ engine: string }>( + `SELECT engine FROM system.tables WHERE database = '${DATABASE}' AND name = '${journalTable}'`, + ) + expect(rows[0]?.engine).toBe('ReplicatedReplacingMergeTree') + } + + // the migration row written on node1 is visible on node2 (replicated journal). + await node2.command(`SYSTEM SYNC REPLICA ${DATABASE}.\`${journalTable}\``) + const journalRows = await node2.query<{ name: string }>( + `SELECT name FROM ${DATABASE}.\`${journalTable}\` FINAL WHERE migration_completed = true`, + ) + expect(journalRows.map((r) => r.name)).toContain(migrationName) + + // idempotent: re-running migrate applies nothing. + const rerun = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) + expect(rerun.exitCode, formatTestDiagnostic('migrate rerun', rerun)).toBe(0) + expect((JSON.parse(rerun.stdout) as { applied: unknown[] }).applied.length).toBe(0) + + // ALTER fans out too: add a column, generate + migrate, verify on BOTH replicas. + await writeFile(schemaPath, renderSchema(tableName, true), 'utf8') + const generateAlter = runCli(dir, ['generate', '--config', configPath, '--name', 'add_label', '--json'], cliEnv) + expect(generateAlter.exitCode, formatTestDiagnostic('generate alter', generateAlter)).toBe(0) + const alterSql = await latestMigrationSql(migrationsDir) + expect(alterSql).toContain(`ALTER TABLE ${DATABASE}.${tableName} ON CLUSTER '${CLUSTER}' ADD COLUMN IF NOT EXISTS \`label\``) + + const migrateAlter = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) + expect(migrateAlter.exitCode, formatTestDiagnostic('migrate alter', migrateAlter)).toBe(0) + await waitForColumn(node1, DATABASE, tableName, 'label') + await waitForColumn(node2, DATABASE, tableName, 'label') + + // RENAME fans out, with ON CLUSTER at the END of the statement (ClickHouse + // places it after the `name TO new_name` list, not after the source name). + const renamedTable = `${tableName}_renamed` + await writeFile(schemaPath, renderSchema(renamedTable, true), 'utf8') + const generateRename = runCli( + dir, + ['generate', '--config', configPath, '--name', 'rename', '--rename-table', `${DATABASE}.${tableName}=${DATABASE}.${renamedTable}`, '--json'], + cliEnv, + ) + expect(generateRename.exitCode, formatTestDiagnostic('generate rename', generateRename)).toBe(0) + const renameSql = await latestMigrationSql(migrationsDir) + expect(renameSql).toContain( + `RENAME TABLE IF EXISTS ${DATABASE}.${tableName} TO ${DATABASE}.${renamedTable} ON CLUSTER '${CLUSTER}';`, + ) + const migrateRename = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) + expect(migrateRename.exitCode, formatTestDiagnostic('migrate rename', migrateRename)).toBe(0) + await waitForTable(node1, DATABASE, renamedTable) + await waitForTable(node2, DATABASE, renamedTable) + + // cleanup (also exercises DROP ... ON CLUSTER). + await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.${renamedTable} ON CLUSTER '${CLUSTER}' SYNC`) + await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`) + }, 60_000) + + test('rejects a pre-existing non-replicated journal when cluster mode is on', async () => { + const journalTable = createJournalTableName('cluster_p2') + const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable } + const tableName = `${createPrefix('cluster_p2')}events` + const { dir, configPath } = await scaffold(tableName) + + // Simulate a project that ran chkit single-node before enabling cluster mode: + // a plain (non-replicated) journal already exists. + await node1.command( + `CREATE TABLE ${DATABASE}.\`${journalTable}\` (name String, applied_at DateTime64(3, 'UTC'), checksum String, chkit_version String) ENGINE = ReplacingMergeTree(applied_at) ORDER BY name`, + ) + + const generate = runCli(dir, ['generate', '--config', configPath, '--name', 'init', '--json'], cliEnv) + expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0) + + const status = runCli(dir, ['status', '--config', configPath], cliEnv) + expect(status.exitCode).not.toBe(0) + expect(`${status.stdout}${status.stderr}`).toContain('non-replicated engine') + + await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`) + }, 60_000) +}) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 080f349..c3ae10c 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -11,6 +11,7 @@ export { createSnapshot } from './snapshot.js' export { splitTopLevelComma } from './key-clause.js' export { normalizeEngine, normalizeSQLFragment } from './sql-normalizer.js' export { toCreateSQL } from './sql.js' +export { applyOnClusterToPlan, onClusterClause } from './on-cluster.js' export { canonicalizeCodec, codec, diff --git a/packages/core/src/model-types.ts b/packages/core/src/model-types.ts index f272573..2b3bbf0 100644 --- a/packages/core/src/model-types.ts +++ b/packages/core/src/model-types.ts @@ -181,6 +181,14 @@ export interface ChxUserClickHouseConfig { password?: string database?: string secure?: boolean + /** + * Cluster name for self-managed multi-node clusters. When set, chkit emits + * `ON CLUSTER ` on generated DDL and stores its migration journal in a + * replicated engine. Leave unset for single-node, ClickHouse Cloud, or + * ObsessionDB (SharedMergeTree auto-replicates — `ON CLUSTER` is unnecessary). + * Accepts an identifier (e.g. `"my_cluster"`) or a macro (e.g. `"{cluster}"`). + */ + cluster?: string } export interface ChxResolvedClickHouseConfig { @@ -189,6 +197,7 @@ export interface ChxResolvedClickHouseConfig { password: string database: string secure: boolean + cluster?: string } export interface ChxInlinePluginRegistration< diff --git a/packages/core/src/model.ts b/packages/core/src/model.ts index 1d6490f..548b715 100644 --- a/packages/core/src/model.ts +++ b/packages/core/src/model.ts @@ -32,6 +32,19 @@ export function defineConfig(config: ChxConfigInput) return config } +// A cluster name is interpolated into `ON CLUSTER ''`, so constrain it to +// an identifier or a `{macro}` to keep it injection-safe and fail fast on typos. +const CLUSTER_NAME_PATTERN = /^([A-Za-z_][A-Za-z0-9_]*|\{[A-Za-z_][A-Za-z0-9_]*\})$/ + +function assertValidClusterName(name: string): string { + if (!CLUSTER_NAME_PATTERN.test(name)) { + throw new Error( + `Invalid clickhouse.cluster "${name}". Expected an identifier (e.g. "my_cluster") or a macro (e.g. "{cluster}").`, + ) + } + return name +} + export function resolveConfig(config: ChxUserConfig): ChxResolvedConfig { const outDir = config.outDir ?? './chkit' const migrationsDir = config.migrationsDir ?? join(outDir, 'migrations') @@ -59,6 +72,9 @@ export function resolveConfig(config: ChxUserConfig): ChxResolvedConfig { password: config.clickhouse.password ?? '', database: config.clickhouse.database ?? 'default', secure: config.clickhouse.secure ?? false, + cluster: config.clickhouse.cluster + ? assertValidClusterName(config.clickhouse.cluster) + : undefined, } : undefined, } diff --git a/packages/core/src/on-cluster.test.ts b/packages/core/src/on-cluster.test.ts new file mode 100644 index 0000000..e7c8a90 --- /dev/null +++ b/packages/core/src/on-cluster.test.ts @@ -0,0 +1,117 @@ +import { describe, expect, test } from 'bun:test' + +import { resolveConfig } from './model.js' +import type { MigrationOperation, MigrationPlan } from './model-types.js' +import { applyOnClusterToPlan, onClusterClause } from './on-cluster.js' + +function op(type: MigrationOperation['type'], sql: string): MigrationOperation { + return { type, key: 'k', risk: 'safe', sql } +} + +function planOf(operations: MigrationOperation[]): MigrationPlan { + return { operations, riskSummary: { safe: 0, caution: 0, danger: 0 }, renameSuggestions: [] } +} + +describe('onClusterClause', () => { + test('is empty when no cluster is configured', () => { + expect(onClusterClause(undefined)).toBe('') + }) + + test('renders a single-quoted clause', () => { + expect(onClusterClause('my_cluster')).toBe(" ON CLUSTER 'my_cluster'") + }) + + test('supports the macro form', () => { + expect(onClusterClause('{cluster}')).toBe(" ON CLUSTER '{cluster}'") + }) +}) + +describe('applyOnClusterToPlan', () => { + test('returns the plan unchanged when cluster is undefined', () => { + const plan = planOf([op('drop_table', 'DROP TABLE IF EXISTS db.t;')]) + expect(applyOnClusterToPlan(plan, undefined)).toBe(plan) + }) + + test('injects ON CLUSTER after the object reference for every statement shape', () => { + const plan = planOf([ + op('create_table', 'CREATE TABLE IF NOT EXISTS db.t\n(\n `id` UInt64\n) ENGINE = MergeTree()\nORDER BY (`id`);'), + op('create_view', 'CREATE VIEW IF NOT EXISTS db.v AS\nSELECT 1;'), + op('create_materialized_view', 'CREATE MATERIALIZED VIEW IF NOT EXISTS db.mv TO db.t AS\nSELECT 1;'), + op('create_materialized_view', 'CREATE MATERIALIZED VIEW IF NOT EXISTS db.mv\nREFRESH EVERY 1 HOUR TO db.t AS\nSELECT 1;'), + op('create_database', 'CREATE DATABASE IF NOT EXISTS db;'), + op('alter_table_add_column', 'ALTER TABLE db.t ADD COLUMN IF NOT EXISTS `c` String;'), + op('alter_table_rename_column', 'ALTER TABLE db.t RENAME COLUMN IF EXISTS `a` TO `b`;'), + op('alter_table_rename_table', 'RENAME TABLE IF EXISTS db.a TO db.b;'), + op('drop_table', 'DROP TABLE IF EXISTS db.t;'), + op('drop_materialized_view', 'DROP TABLE IF EXISTS db.mv SYNC;'), + op('drop_view', 'DROP VIEW IF EXISTS db.v;'), + ]) + + const sql = applyOnClusterToPlan(plan, 'c').operations.map((o) => o.sql) + + expect(sql).toEqual([ + "CREATE TABLE IF NOT EXISTS db.t ON CLUSTER 'c'\n(\n `id` UInt64\n) ENGINE = MergeTree()\nORDER BY (`id`);", + "CREATE VIEW IF NOT EXISTS db.v ON CLUSTER 'c' AS\nSELECT 1;", + "CREATE MATERIALIZED VIEW IF NOT EXISTS db.mv ON CLUSTER 'c' TO db.t AS\nSELECT 1;", + "CREATE MATERIALIZED VIEW IF NOT EXISTS db.mv ON CLUSTER 'c'\nREFRESH EVERY 1 HOUR TO db.t AS\nSELECT 1;", + "CREATE DATABASE IF NOT EXISTS db ON CLUSTER 'c';", + "ALTER TABLE db.t ON CLUSTER 'c' ADD COLUMN IF NOT EXISTS `c` String;", + "ALTER TABLE db.t ON CLUSTER 'c' RENAME COLUMN IF EXISTS `a` TO `b`;", + "RENAME TABLE IF EXISTS db.a TO db.b ON CLUSTER 'c';", + "DROP TABLE IF EXISTS db.t ON CLUSTER 'c';", + "DROP TABLE IF EXISTS db.mv ON CLUSTER 'c' SYNC;", + "DROP VIEW IF EXISTS db.v ON CLUSTER 'c';", + ]) + }) + + test('injects into rename-suggestion confirmation SQL', () => { + const plan: MigrationPlan = { + operations: [], + riskSummary: { safe: 0, caution: 0, danger: 0 }, + renameSuggestions: [ + { + kind: 'column', + database: 'db', + table: 't', + from: 'a', + to: 'b', + confidence: 'high', + reason: 'x', + dropOperationKey: 'd', + addOperationKey: 'a', + confirmationSQL: 'ALTER TABLE db.t RENAME COLUMN IF EXISTS `a` TO `b`;', + }, + ], + } + + expect(applyOnClusterToPlan(plan, 'c').renameSuggestions[0]?.confirmationSQL).toBe( + "ALTER TABLE db.t ON CLUSTER 'c' RENAME COLUMN IF EXISTS `a` TO `b`;", + ) + }) + + test('leaves statements without a known anchor untouched', () => { + const plan = planOf([op('drop_table', 'TRUNCATE TABLE db.t;')]) + expect(applyOnClusterToPlan(plan, 'c').operations[0]?.sql).toBe('TRUNCATE TABLE db.t;') + }) +}) + +describe('resolveConfig cluster validation', () => { + test('passes through an identifier and a macro', () => { + expect(resolveConfig({ schema: 's', clickhouse: { url: 'u', cluster: 'my_cluster' } }).clickhouse?.cluster).toBe( + 'my_cluster', + ) + expect(resolveConfig({ schema: 's', clickhouse: { url: 'u', cluster: '{cluster}' } }).clickhouse?.cluster).toBe( + '{cluster}', + ) + }) + + test('defaults to undefined when no cluster is set', () => { + expect(resolveConfig({ schema: 's', clickhouse: { url: 'u' } }).clickhouse?.cluster).toBeUndefined() + }) + + test('rejects an injection-unsafe cluster name', () => { + expect(() => resolveConfig({ schema: 's', clickhouse: { url: 'u', cluster: "x'; DROP" } })).toThrow( + /Invalid clickhouse.cluster/, + ) + }) +}) diff --git a/packages/core/src/on-cluster.ts b/packages/core/src/on-cluster.ts new file mode 100644 index 0000000..ea11383 --- /dev/null +++ b/packages/core/src/on-cluster.ts @@ -0,0 +1,69 @@ +import type { MigrationPlan } from './model-types.js' + +// DDL statement prefixes where `ON CLUSTER` goes immediately after the object +// reference that follows the prefix. This is the set the planner + plan-pipeline +// produce, minus RENAME (handled separately below); anything else is left +// untouched (e.g. plugin-emitted SQL). +const ON_CLUSTER_ANCHORS = [ + 'CREATE TABLE IF NOT EXISTS', + 'CREATE VIEW IF NOT EXISTS', + 'CREATE MATERIALIZED VIEW IF NOT EXISTS', + 'CREATE DATABASE IF NOT EXISTS', + 'ALTER TABLE', + 'DROP TABLE IF EXISTS', + 'DROP VIEW IF EXISTS', +] as const + +/** + * Renders ` ON CLUSTER ''`, or `''` when cluster mode is off. The name is + * validated at config resolution (`assertValidClusterName`), so it is safe to + * interpolate. Single-quoted so the `{cluster}` macro form also works. + */ +export function onClusterClause(cluster: string | undefined): string { + return cluster ? ` ON CLUSTER '${cluster}'` : '' +} + +function injectOnClusterClause(sql: string, clause: string): string { + // RENAME TABLE is the exception: ClickHouse places `ON CLUSTER` after the full + // `name TO new_name` list (at the very end), not after the source name. + if (sql.startsWith('RENAME TABLE ')) { + return sql.endsWith(';') ? `${sql.slice(0, -1)}${clause};` : `${sql}${clause}` + } + for (const anchor of ON_CLUSTER_ANCHORS) { + if (!sql.startsWith(`${anchor} `)) continue + const rest = sql.slice(anchor.length + 1) + // The object reference (`db.name` or `db`) is the run of characters up to + // the next space, `;`, or `(` — `ON CLUSTER` slots in right after it. + const ref = rest.match(/^[^\s;(]+/)?.[0] + if (!ref) return sql + return `${anchor} ${ref}${clause}${rest.slice(ref.length)}` + } + return sql +} + +/** + * Injects `ON CLUSTER ` into every DDL statement of a migration plan (and + * the rename-suggestion confirmations). No-op when `cluster` is undefined. + * + * Done as a post-pass over the already-rendered SQL so the planner and renderers + * stay cluster-agnostic — `ON CLUSTER` is an execution directive, never part of + * the schema model or drift comparison. + */ +export function applyOnClusterToPlan( + plan: MigrationPlan, + cluster: string | undefined, +): MigrationPlan { + if (!cluster) return plan + const clause = onClusterClause(cluster) + return { + ...plan, + operations: plan.operations.map((operation) => ({ + ...operation, + sql: injectOnClusterClause(operation.sql, clause), + })), + renameSuggestions: plan.renameSuggestions.map((suggestion) => ({ + ...suggestion, + confirmationSQL: injectOnClusterClause(suggestion.confirmationSQL, clause), + })), + } +} diff --git a/test/cluster/README.md b/test/cluster/README.md new file mode 100644 index 0000000..aaa1bd1 --- /dev/null +++ b/test/cluster/README.md @@ -0,0 +1,62 @@ +# Local ClickHouse cluster (chkit cluster-mode e2e) + +A self-contained replicated ClickHouse cluster for verifying chkit's `ON CLUSTER` +support locally. **Deliberately not part of normal CI** (it would slow the pipeline); +run it on demand for cluster e2e verification. + +## Topology + +``` + ┌─────────┐ + │ keeper │ ClickHouse Keeper (coordination + distributed-DDL queue) + └────┬────┘ + ┌───────┴───────┐ +┌────▼────┐ ┌────▼────┐ +│ ch1 │ │ ch2 │ 1 shard, 2 replicas +│ replica │◄───►│ replica │ cluster name: test_cluster +└─────────┘ └─────────┘ +``` + +| Node | HTTP (host) | Native (host) | macros | +|------|-------------|---------------|--------| +| ch1 | `http://localhost:8123` | `localhost:9000` | shard=01, replica=ch1 | +| ch2 | `http://localhost:8124` | `localhost:9001` | shard=01, replica=ch2 | + +Auth: user `default`, password `clusterpass`. Cluster name: `test_cluster`. + +## Run + +```bash +# from repo root +bun run cluster:up # docker compose ... up -d (waits for health) +bun run cluster:down # stop + remove volumes (full reset) +bun run cluster:logs # tail logs +bun run cluster:verify # smoke-test: ON CLUSTER create + cross-replica replication +``` + +or directly: + +```bash +docker compose -f test/cluster/docker-compose.yml up -d +docker compose -f test/cluster/docker-compose.yml down -v +``` + +## Point chkit / tests at it + +```ts +clickhouse: { + url: 'http://localhost:8123', + username: 'default', + password: 'clusterpass', + cluster: 'test_cluster', // ← enables ON CLUSTER mode +} +``` + +Cluster e2e tests are gated behind `CHKIT_CLUSTER_E2E=1` and hard-fail (never +skip) if the cluster isn't reachable. + +## Scaling to multiple shards (later) + +To exercise the multi-shard journal edge case, add `ch3`/`ch4` with `shard=02` +and a second `` block in `config.d/cluster.xml`. Keep the cluster name +`test_cluster`. See `thoughts/cluster-support-design.md` §7.2 (Tier 2). diff --git a/test/cluster/ch1/node.xml b/test/cluster/ch1/node.xml new file mode 100644 index 0000000..a61f0e6 --- /dev/null +++ b/test/cluster/ch1/node.xml @@ -0,0 +1,13 @@ + + + + test_cluster + 01 + ch1 + + + + ch1 + diff --git a/test/cluster/ch2/node.xml b/test/cluster/ch2/node.xml new file mode 100644 index 0000000..b563a2e --- /dev/null +++ b/test/cluster/ch2/node.xml @@ -0,0 +1,9 @@ + + + test_cluster + 01 + ch2 + + + ch2 + diff --git a/test/cluster/config.d/cluster.xml b/test/cluster/config.d/cluster.xml new file mode 100644 index 0000000..78c7b1f --- /dev/null +++ b/test/cluster/config.d/cluster.xml @@ -0,0 +1,43 @@ + + 0.0.0.0 + + + + + keeper + 9181 + + + + + + + chkit_cluster_secret + + true + + ch1 + 9000 + + + ch2 + 9000 + + + + + + + + /clickhouse/task_queue/ddl + + + + /clickhouse/tables/{uuid}/{shard} + {replica} + diff --git a/test/cluster/docker-compose.yml b/test/cluster/docker-compose.yml new file mode 100644 index 0000000..2d8abf3 --- /dev/null +++ b/test/cluster/docker-compose.yml @@ -0,0 +1,72 @@ +# Local ClickHouse replicated cluster for chkit cluster-mode e2e verification. +# +# 1 shard x 2 replicas (ch1, ch2) + 1 ClickHouse Keeper. +# Cluster name: test_cluster +# +# NOT wired into normal CI (would slow it down). Run locally: +# docker compose -f test/cluster/docker-compose.yml up -d +# docker compose -f test/cluster/docker-compose.yml down -v # reset +# +# Endpoints (from host): +# ch1 http://localhost:8123 (native 9000) +# ch2 http://localhost:8124 (native 9001) +# user: default password: clusterpass +name: chkit-cluster + +x-ch-common: &ch-common + image: clickhouse/clickhouse-server:25.3 + environment: + CLICKHOUSE_PASSWORD: clusterpass + ulimits: + nofile: { soft: 262144, hard: 262144 } + networks: [chkit] + depends_on: + keeper: + condition: service_healthy + healthcheck: + test: ["CMD-SHELL", "clickhouse-client --password \"$$CLICKHOUSE_PASSWORD\" --query 'SELECT 1' || exit 1"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 20s + +services: + keeper: + image: clickhouse/clickhouse-keeper:25.3 + container_name: chkit-keeper + hostname: keeper + volumes: + - ./keeper/keeper_config.xml:/etc/clickhouse-keeper/keeper_config.xml:ro + networks: [chkit] + healthcheck: + test: ["CMD", "bash", "-c", "exec 3<>/dev/tcp/127.0.0.1/9181"] + interval: 5s + timeout: 3s + retries: 30 + start_period: 5s + + ch1: + <<: *ch-common + container_name: chkit-ch1 + hostname: ch1 + volumes: + - ./config.d/cluster.xml:/etc/clickhouse-server/config.d/cluster.xml:ro + - ./ch1/node.xml:/etc/clickhouse-server/config.d/node.xml:ro + ports: + - "8123:8123" + - "9000:9000" + + ch2: + <<: *ch-common + container_name: chkit-ch2 + hostname: ch2 + volumes: + - ./config.d/cluster.xml:/etc/clickhouse-server/config.d/cluster.xml:ro + - ./ch2/node.xml:/etc/clickhouse-server/config.d/node.xml:ro + ports: + - "8124:8123" + - "9001:9000" + +networks: + chkit: + driver: bridge diff --git a/test/cluster/keeper/keeper_config.xml b/test/cluster/keeper/keeper_config.xml new file mode 100644 index 0000000..4181384 --- /dev/null +++ b/test/cluster/keeper/keeper_config.xml @@ -0,0 +1,29 @@ + + + information + true + + 0.0.0.0 + 4096 + + + 9181 + 1 + /var/lib/clickhouse-keeper/coordination/log + /var/lib/clickhouse-keeper/coordination/snapshots + + + 10000 + 30000 + warning + + + + + 1 + keeper + 9234 + + + + diff --git a/test/cluster/verify.sh b/test/cluster/verify.sh new file mode 100755 index 0000000..98e58df --- /dev/null +++ b/test/cluster/verify.sh @@ -0,0 +1,29 @@ +#!/usr/bin/env bash +# Smoke-test the local cluster: ON CLUSTER create + cross-replica replication. +# Exits non-zero on failure. Run via `bun run cluster:verify`. +set -euo pipefail + +CH1=(docker exec chkit-ch1 clickhouse-client --password clusterpass) +CH2=(docker exec chkit-ch2 clickhouse-client --password clusterpass) +T="default.chkit_cluster_smoke" + +echo "==> cluster topology (test_cluster)" +"${CH1[@]}" -q "SELECT host_name, shard_num, replica_num FROM system.clusters WHERE cluster='test_cluster' ORDER BY replica_num FORMAT PrettyCompactMonoBlock" + +echo "==> ON CLUSTER create + replicate ch1 -> ch2" +"${CH1[@]}" --multiquery -q " +DROP TABLE IF EXISTS $T ON CLUSTER test_cluster SYNC; +CREATE TABLE $T ON CLUSTER test_cluster (id UInt64) ENGINE = ReplicatedMergeTree ORDER BY id; +INSERT INTO $T VALUES (1),(2),(3); +" >/dev/null + +"${CH2[@]}" -q "SYSTEM SYNC REPLICA $T" >/dev/null +COUNT="$("${CH2[@]}" -q "SELECT count() FROM $T")" +"${CH1[@]}" -q "DROP TABLE IF EXISTS $T ON CLUSTER test_cluster SYNC" >/dev/null + +if [ "$COUNT" = "3" ]; then + echo "PASS: 3 rows replicated ch1 -> ch2" +else + echo "FAIL: expected 3 rows on ch2, got '$COUNT'" >&2 + exit 1 +fi