Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/cluster-support.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
"chkit": patch
"@chkit/core": patch
---

Add ClickHouse cluster support. Set `clickhouse.cluster` in your config to run all generated DDL `ON CLUSTER <name>` and store the migration journal in a replicated engine — for self-managed multi-node clusters. Your table engines are passed through unchanged (declare `ReplicatedMergeTree` yourself). Leave `cluster` unset for single-node, ClickHouse Cloud, or ObsessionDB, where replication is automatic and `ON CLUSTER` is unnecessary.
22 changes: 22 additions & 0 deletions apps/docs/src/content/docs/configuration/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,28 @@ export default defineConfig({
})
```

## Cluster mode (`ON CLUSTER`)

For self-managed multi-node ClickHouse clusters, set `clickhouse.cluster` to the cluster name from your server's `remote_servers` config:

```ts
clickhouse: {
url: process.env.CLICKHOUSE_URL ?? 'http://localhost:9000',
password: process.env.CLICKHOUSE_PASSWORD ?? '',
database: 'default',
cluster: 'my_cluster',
},
```

When `cluster` is set, chkit:

- emits every generated DDL statement with an `ON CLUSTER <name>` clause, so `generate` bakes it into the migration files and `migrate` propagates each change to all nodes via ClickHouse's distributed DDL queue, and
- creates its migration journal (`_chkit_migrations`) as a `ReplicatedReplacingMergeTree`, keeping applied-migration history consistent across every node — so running `migrate` against a load-balanced endpoint never re-applies migrations.

chkit does **not** rewrite your table engines: declare `ReplicatedMergeTree` (or another `Replicated*`/`Shared*` variant) yourself for tables whose data should replicate. Empty-argument engines (e.g. `ENGINE = ReplicatedMergeTree`) are recommended so the server's `default_replica_path` supplies a collision-free Keeper path, which also keeps drop-and-recreate safe.

Leave `cluster` unset for single-node servers, ClickHouse Cloud, or ObsessionDB, where replication is automatic (SharedMergeTree) and `ON CLUSTER` is unnecessary. The value accepts an identifier (`my_cluster`) or a macro (`{cluster}`) when your nodes define one.

## User profile config fallback

Project-scoped commands (`generate`, `migrate`, `status`, `drift`, `check`, `codegen`, `pull`) always require a project config in the working directory.
Expand Down
5 changes: 5 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
"test": "turbo run test",
"test:turbo": "turbo run test",
"test:env": "doppler run --project chkit --config ci -- bun run test",
"cluster:up": "docker compose -f test/cluster/docker-compose.yml up -d --wait",
"cluster:down": "docker compose -f test/cluster/docker-compose.yml down -v",
"cluster:logs": "docker compose -f test/cluster/docker-compose.yml logs -f",
"cluster:verify": "bash test/cluster/verify.sh",
"test:cluster": "bun test packages/cli/test/cluster.e2e.test.ts",
"fallow": "bunx fallow",
"fallow:dead-code": "bunx fallow --only dead-code",
"fallow:pr": "bunx fallow --changed-since main --summary || true",
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/check/command.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
name: 'check',
description: 'Run policy checks for CI and release gates',
flags: [STRICT_FLAG],
async run(context): Promise<undefined | number> {

Check failure on line 26 in packages/cli/src/commands/check/command.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'run' has a CRAP score of 272.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 16 • Cognitive: 13 • CRAP: 272.0 (threshold: 30.0) • Lines: 99 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
const { flags, config, configPath, pluginRuntime, pluginContext } = context
const f = typedFlags(flags, [...GLOBAL_FLAGS, STRICT_FLAG] as const)
const strict = f['--strict'] === true
Expand All @@ -39,7 +39,7 @@
const db = pluginContext.executor
const database = config.clickhouse?.database

const journalStore = createJournalStore(db)
const journalStore = createJournalStore(db, config.clickhouse?.cluster)
const files = await listMigrations(migrationsDir)
const journal = await journalStore.readJournal()
const databaseMissing = journalStore.databaseMissing
Expand Down
6 changes: 5 additions & 1 deletion packages/cli/src/commands/generate/command.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { generateArtifacts } from '@chkit/codegen'
import { ChxValidationError, planDiff } from '@chkit/core'
import { applyOnClusterToPlan, ChxValidationError, planDiff } from '@chkit/core'

import { defineFlags, typedFlags, type ChxPluginCommand } from '../../plugins.js'
import { resolveDirs } from '../../runtime/config.js'
Expand Down Expand Up @@ -50,7 +50,7 @@
run: cmdGenerate,
}

async function cmdGenerate(ctx: import('../../plugins.js').ChxPluginCommandContext): Promise<undefined | number> {

Check failure on line 53 in packages/cli/src/commands/generate/command.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'cmdGenerate' has a CRAP score of 552.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 23 • Cognitive: 28 • CRAP: 552.0 (threshold: 30.0) • Lines: 163 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
const { flags, config, configPath, pluginRuntime } = ctx
const dirs = resolveDirs(config)
const f = typedFlags(flags, [...GLOBAL_FLAGS, ...GENERATE_FLAGS] as const)
Expand Down Expand Up @@ -163,6 +163,10 @@
}).plan
}

// Cluster mode: stamp `ON CLUSTER <name>` onto every DDL statement as a final
// post-pass, after all plan transforms (renames, plugins, scope filtering).
plan = applyOnClusterToPlan(plan, config.clickhouse?.cluster)

if (planMode) {
emitGeneratePlanOutput(plan, jsonMode, resolvedScope)
return 0
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/migrate/command.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
import { mkdir, readFile } from 'node:fs/promises'
import { join } from 'node:path'

Expand Down Expand Up @@ -42,7 +42,7 @@
run: cmdMigrate,
}

async function cmdMigrate(

Check failure on line 45 in packages/cli/src/commands/migrate/command.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'cmdMigrate' has a CRAP score of 2352.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 48 • Cognitive: 71 • CRAP: 2352.0 (threshold: 30.0) • Lines: 207 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
runCtx: import('../../plugins.js').ChxPluginCommandContext,
): Promise<undefined | number> {
const { flags, config, configPath, pluginRuntime, pluginContext } = runCtx
Expand All @@ -59,7 +59,7 @@
throw new Error('clickhouse config is required for migrate (journal is stored in ClickHouse)')
}
const db = pluginContext.executor
const journalStore = createJournalStore(db)
const journalStore = createJournalStore(db, config.clickhouse?.cluster)
const snapshot = await readSnapshot(metaDir)
const tableScope = resolveTableScope(tableSelector, tableKeysFromDefinitions(snapshot?.definitions ?? []))
const mode = executeRequested ? 'execute' : 'plan'
Expand All @@ -70,13 +70,13 @@
configPath,
tableScope,
flags,
})

await mkdir(migrationsDir, { recursive: true })
const files = await listMigrations(migrationsDir)
const journal = await journalStore.readJournal()
const appliedNames = new Set(journal.applied.map((entry) => entry.name))
const pendingAll = files.filter((file) => !appliedNames.has(file))

Check warning on line 79 in packages/cli/src/commands/migrate/command.ts

View workflow job for this annotation

GitHub Actions / verify

Code duplication

11 duplicated lines (63 tokens) 2 instances found. Also in: → src/commands/status.ts:24-34 Extract a shared function to eliminate this duplication.
debug('migrate', `migrations: total=${files.length}, applied=${journal.applied.length}, pending=${pendingAll.length}`)

const checksumMismatches = await findChecksumMismatches(migrationsDir, journal)
Expand Down
2 changes: 1 addition & 1 deletion packages/cli/src/commands/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
name: 'status',
description: 'Show migration status and checksum mismatch information',
flags: [],
async run(context): Promise<undefined | number> {

Check failure on line 14 in packages/cli/src/commands/status.ts

View workflow job for this annotation

GitHub Actions / verify

High CRAP score (critical)

Function 'run' has a CRAP score of 156.0 (threshold: 30.0). • Severity: critical • Cyclomatic: 12 • Cognitive: 11 • CRAP: 156.0 (threshold: 30.0) • Lines: 63 CRAP combines complexity with coverage: high CRAP means changes here carry high risk. Consider adding tests, simplifying the function, or both.
const { flags, config, pluginContext } = context
const jsonMode = flags['--json'] === true
const { migrationsDir } = resolveDirs(config)
Expand All @@ -21,17 +21,17 @@
}
const db = pluginContext.executor
const database = config.clickhouse?.database
const journalStore = createJournalStore(db)
const journalStore = createJournalStore(db, config.clickhouse?.cluster)

await mkdir(migrationsDir, { recursive: true })
const files = await listMigrations(migrationsDir)
const journal = await journalStore.readJournal()
const appliedNames = new Set(journal.applied.map((entry) => entry.name))
// Scope "Applied" to migrations that exist in this project's migrations dir
// (#31). The journal table can be shared across tenants/services on
// ObsessionDB, so journal.applied.length counts other projects' rows too and
// can exceed this project's total. The intersection is always <= total.
const applied = files.filter((f) => appliedNames.has(f))

Check warning on line 34 in packages/cli/src/commands/status.ts

View workflow job for this annotation

GitHub Actions / verify

Code duplication

11 duplicated lines (63 tokens) 2 instances found. Also in: → commands/migrate/command.ts:73-79 Extract a shared function to eliminate this duplication.
const pending = files.filter((f) => !appliedNames.has(f))
const checksumMismatches = await findChecksumMismatches(migrationsDir, journal)

Expand Down
40 changes: 34 additions & 6 deletions packages/cli/src/runtime/journal-store.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { isUnknownDatabaseError, type ClickHouseExecutor } from '@chkit/clickhouse'
import { onClusterClause } from '@chkit/core'

import type { MigrationJournal, MigrationJournalEntry } from './migration-store.js'
import { CLI_VERSION } from './version.js'
Expand Down Expand Up @@ -141,25 +142,52 @@ function parseOperations(value: unknown): OperationState[] {
return decoded.map((row) => operationFromTuple(row as OperationTupleRow))
}

export function createJournalStore(db: ClickHouseExecutor): JournalStore {
export function createJournalStore(db: ClickHouseExecutor, cluster?: string): JournalStore {
const journalTable = resolveJournalTableName()
debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}`)
const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable} (
debug('journal', `journal table: ${journalTable}${process.env.CHKIT_JOURNAL_TABLE ? ' (from CHKIT_JOURNAL_TABLE)' : ''}${cluster ? ` (ON CLUSTER ${cluster})` : ''}`)
// In cluster mode the journal must be consistent across every node, so it uses
// a replicated engine with a no-`{shard}` Keeper path (one cluster-wide group)
// created `ON CLUSTER`. The read path already uses SYNC REPLICA + FINAL +
// sequential consistency. Single-node/Cloud keeps the plain engine unchanged.
const onCluster = onClusterClause(cluster)
const journalEngine = cluster
? "ReplicatedReplacingMergeTree('/clickhouse/tables/{database}/{table}', '{replica}', applied_at)"
: 'ReplacingMergeTree(applied_at)'
const createTableSql = `CREATE TABLE IF NOT EXISTS ${journalTable}${onCluster} (
name String,
applied_at DateTime64(3, 'UTC'),
checksum String,
chkit_version String,
migration_completed Bool DEFAULT true,
operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []
) ENGINE = ReplacingMergeTree(applied_at)
) ENGINE = ${journalEngine}
ORDER BY (name)
SETTINGS index_granularity = 1`

let bootstrapped = false
let _databaseMissing = false

// Enabling cluster mode on a project that already has a single-node journal
// would otherwise leave migration history non-replicated (and the ON CLUSTER
// schema-upgrade ALTERs could fail on replicas that lack the table). Fail fast
// with actionable guidance instead of silently diverging.
async function assertClusterJournalReplicated(): Promise<void> {
const rows = await db.query<{ engine: string }>(
`SELECT engine FROM system.tables WHERE database = currentDatabase() AND name = '${journalTable}'`,
)
const engine = rows[0]?.engine
if (engine && !engine.startsWith('Replicated') && !engine.startsWith('Shared')) {
throw new Error(
`Journal table "${journalTable}" already exists with a non-replicated engine (${engine}), but ` +
`clickhouse.cluster is set. A non-replicated journal is unsafe on a cluster. Drop it ` +
`(DROP TABLE ${journalTable} ON CLUSTER '${cluster}' SYNC) or set CHKIT_JOURNAL_TABLE to a new name, then re-run.`,
)
}
}

async function ensureTable(): Promise<void> {
if (bootstrapped) return
if (cluster) await assertClusterJournalReplicated()
debug('journal', `probing journal table "${journalTable}"`)
try {
await db.query(`SELECT name FROM ${journalTable} LIMIT 0`)
Expand Down Expand Up @@ -205,10 +233,10 @@ SETTINGS index_granularity = 1`
// of the new chkit. ALTER ADD COLUMN IF NOT EXISTS is a metadata op,
// no data rewrite.
await db.command(
`ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`,
`ALTER TABLE ${journalTable}${onCluster} ADD COLUMN IF NOT EXISTS migration_completed Bool DEFAULT true`,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Reject existing local journals in cluster mode

When clickhouse.cluster is added to a project that already has the old local _chkit_migrations table, this upgrade path only runs ALTER ... ON CLUSTER and never creates or converts the table to the new replicated engine. That leaves the journal non-replicated (or the distributed ALTER fails on replicas where the old table is absent), so status/migrate through another cluster node can miss applied migrations and replay them; please detect a pre-existing non-ReplicatedReplacingMergeTree journal and fail with migration instructions or migrate it before marking the store bootstrapped.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 4554f90. In cluster mode, ensureTable now checks the existing journal's engine first and fails fast with migration guidance (drop ON CLUSTER, or set CHKIT_JOURNAL_TABLE) if it isn't a replicated engine — before any ON CLUSTER ALTERs run. Added a cluster e2e case covering a pre-existing non-replicated journal.

)
await db.command(
`ALTER TABLE ${journalTable} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`,
`ALTER TABLE ${journalTable}${onCluster} ADD COLUMN IF NOT EXISTS operations ${OPERATIONS_TUPLE_TYPE} DEFAULT []`,
)
}

Expand Down
204 changes: 204 additions & 0 deletions packages/cli/test/cluster.e2e.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
import { describe, expect, test } from 'bun:test'
import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises'
import { tmpdir } from 'node:os'
import { join } from 'node:path'

import {
CORE_ENTRY,
createJournalTableName,
createLiveExecutor,
createPrefix,
formatTestDiagnostic,
runCli,
waitForColumn,
waitForTable,
} from '../src/test/e2e-testkit.js'

// Cluster-mode e2e against a local replicated ClickHouse cluster
// (test/cluster/docker-compose.yml: 1 shard, 2 replicas, cluster `test_cluster`).
//
// Defaults match that setup; override via env for a different cluster. This file
// lives OUTSIDE src/, so the default `bun test src` suite (the managed-ObsessionDB
// CI job) never collects it. Run explicitly via `bun run test:cluster` after
// `bun run cluster:up`. It hard-fails (never skips) if the cluster is unreachable.
const NODE1 = process.env.CLICKHOUSE_URL ?? 'http://localhost:8123'
const NODE2 = process.env.CHKIT_CLUSTER_E2E_URL2 ?? 'http://localhost:8124'
const USER = process.env.CLICKHOUSE_USER ?? 'default'
const PASSWORD = process.env.CLICKHOUSE_PASSWORD ?? 'clusterpass'
const CLUSTER = process.env.CHKIT_CLUSTER ?? 'test_cluster'
const DATABASE = process.env.CLICKHOUSE_DB ?? 'default'

function executorFor(url: string) {
return createLiveExecutor({
clickhouseUrl: url,
clickhouseUser: USER,
clickhousePassword: PASSWORD,
clickhouseDatabase: DATABASE,
})
}

function renderSchema(tableName: string, withLabel: boolean): string {
const columns = [
" { name: 'id', type: 'UInt64' }",
" { name: 'ts', type: 'DateTime' }",
...(withLabel ? [" { name: 'label', type: 'String' }"] : []),
].join(',\n')
return `import { schema, table } from '${CORE_ENTRY}'

export default schema(
table({
database: '${DATABASE}',
name: '${tableName}',
engine: 'ReplicatedMergeTree',
columns: [
${columns},
],
primaryKey: ['id'],
orderBy: ['id'],
}),
)
`
}

async function scaffold(tableName: string): Promise<{ dir: string; configPath: string; schemaPath: string; migrationsDir: string }> {
const dir = await mkdtemp(join(tmpdir(), 'chkit-cluster-e2e-'))
const schemaPath = join(dir, 'schema.ts')
const configPath = join(dir, 'clickhouse.config.ts')
const migrationsDir = join(dir, 'chkit', 'migrations')
await writeFile(schemaPath, renderSchema(tableName, false), 'utf8')
await writeFile(
configPath,
`export default {
schema: '${schemaPath}',
outDir: '${join(dir, 'chkit')}',
migrationsDir: '${migrationsDir}',
metaDir: '${join(dir, 'chkit', 'meta')}',
clickhouse: {
url: '${NODE1}',
username: '${USER}',
password: '${PASSWORD}',
database: '${DATABASE}',
cluster: '${CLUSTER}',
},
}
`,
'utf8',
)
return { dir, configPath, schemaPath, migrationsDir }
}

async function latestMigrationSql(migrationsDir: string): Promise<string> {
const files = (await readdir(migrationsDir)).filter((f) => f.endsWith('.sql')).sort()
const last = files.at(-1)
expect(last).toBeTruthy()
return readFile(join(migrationsDir, last as string), 'utf8')
}

describe('chkit cluster mode (ON CLUSTER) e2e', () => {
const node1 = executorFor(NODE1)
const node2 = executorFor(NODE2)

test('fans DDL across replicas, replicates the journal, and is idempotent', async () => {
const prefix = createPrefix('cluster')
const tableName = `${prefix}events`
const journalTable = createJournalTableName('cluster')
const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable }
const { dir, configPath, schemaPath, migrationsDir } = await scaffold(tableName)

// generate: ON CLUSTER is baked into the migration file (CREATE TABLE + DB).
const generate = runCli(dir, ['generate', '--config', configPath, '--name', 'init', '--json'], cliEnv)
expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0)
const createSql = await latestMigrationSql(migrationsDir)
expect(createSql).toContain(`CREATE TABLE IF NOT EXISTS ${DATABASE}.${tableName} ON CLUSTER '${CLUSTER}'`)
expect(createSql).toContain(`ENGINE = ReplicatedMergeTree()`)

// migrate: applies ON CLUSTER DDL.
const migrate = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv)
expect(migrate.exitCode, formatTestDiagnostic('migrate', migrate)).toBe(0)
const applied = (JSON.parse(migrate.stdout) as { applied: Array<{ name: string }> }).applied
expect(applied.length).toBe(1)
const migrationName = applied[0]?.name as string

// table exists on BOTH replicas.
await waitForTable(node1, DATABASE, tableName)
await waitForTable(node2, DATABASE, tableName)

// journal is replicated on BOTH replicas.
for (const node of [node1, node2]) {
await waitForTable(node, DATABASE, journalTable)
const rows = await node.query<{ engine: string }>(
`SELECT engine FROM system.tables WHERE database = '${DATABASE}' AND name = '${journalTable}'`,
)
expect(rows[0]?.engine).toBe('ReplicatedReplacingMergeTree')
}

// the migration row written on node1 is visible on node2 (replicated journal).
await node2.command(`SYSTEM SYNC REPLICA ${DATABASE}.\`${journalTable}\``)
const journalRows = await node2.query<{ name: string }>(
`SELECT name FROM ${DATABASE}.\`${journalTable}\` FINAL WHERE migration_completed = true`,
)
expect(journalRows.map((r) => r.name)).toContain(migrationName)

// idempotent: re-running migrate applies nothing.
const rerun = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv)
expect(rerun.exitCode, formatTestDiagnostic('migrate rerun', rerun)).toBe(0)
expect((JSON.parse(rerun.stdout) as { applied: unknown[] }).applied.length).toBe(0)

// ALTER fans out too: add a column, generate + migrate, verify on BOTH replicas.
await writeFile(schemaPath, renderSchema(tableName, true), 'utf8')
const generateAlter = runCli(dir, ['generate', '--config', configPath, '--name', 'add_label', '--json'], cliEnv)
expect(generateAlter.exitCode, formatTestDiagnostic('generate alter', generateAlter)).toBe(0)
const alterSql = await latestMigrationSql(migrationsDir)
expect(alterSql).toContain(`ALTER TABLE ${DATABASE}.${tableName} ON CLUSTER '${CLUSTER}' ADD COLUMN IF NOT EXISTS \`label\``)

const migrateAlter = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv)
expect(migrateAlter.exitCode, formatTestDiagnostic('migrate alter', migrateAlter)).toBe(0)
await waitForColumn(node1, DATABASE, tableName, 'label')
await waitForColumn(node2, DATABASE, tableName, 'label')

// RENAME fans out, with ON CLUSTER at the END of the statement (ClickHouse
// places it after the `name TO new_name` list, not after the source name).
const renamedTable = `${tableName}_renamed`
await writeFile(schemaPath, renderSchema(renamedTable, true), 'utf8')
const generateRename = runCli(
dir,
['generate', '--config', configPath, '--name', 'rename', '--rename-table', `${DATABASE}.${tableName}=${DATABASE}.${renamedTable}`, '--json'],
cliEnv,
)
expect(generateRename.exitCode, formatTestDiagnostic('generate rename', generateRename)).toBe(0)
const renameSql = await latestMigrationSql(migrationsDir)
expect(renameSql).toContain(
`RENAME TABLE IF EXISTS ${DATABASE}.${tableName} TO ${DATABASE}.${renamedTable} ON CLUSTER '${CLUSTER}';`,
)
const migrateRename = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv)
expect(migrateRename.exitCode, formatTestDiagnostic('migrate rename', migrateRename)).toBe(0)
await waitForTable(node1, DATABASE, renamedTable)
await waitForTable(node2, DATABASE, renamedTable)

// cleanup (also exercises DROP ... ON CLUSTER).
await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.${renamedTable} ON CLUSTER '${CLUSTER}' SYNC`)
await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`)
}, 60_000)

test('rejects a pre-existing non-replicated journal when cluster mode is on', async () => {
const journalTable = createJournalTableName('cluster_p2')
const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable }
const tableName = `${createPrefix('cluster_p2')}events`
const { dir, configPath } = await scaffold(tableName)

// Simulate a project that ran chkit single-node before enabling cluster mode:
// a plain (non-replicated) journal already exists.
await node1.command(
`CREATE TABLE ${DATABASE}.\`${journalTable}\` (name String, applied_at DateTime64(3, 'UTC'), checksum String, chkit_version String) ENGINE = ReplacingMergeTree(applied_at) ORDER BY name`,
)

const generate = runCli(dir, ['generate', '--config', configPath, '--name', 'init', '--json'], cliEnv)
expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0)

const status = runCli(dir, ['status', '--config', configPath], cliEnv)
expect(status.exitCode).not.toBe(0)
expect(`${status.stdout}${status.stderr}`).toContain('non-replicated engine')

await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`)
}, 60_000)
})
1 change: 1 addition & 0 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export { createSnapshot } from './snapshot.js'
export { splitTopLevelComma } from './key-clause.js'
export { normalizeEngine, normalizeSQLFragment } from './sql-normalizer.js'
export { toCreateSQL } from './sql.js'
export { applyOnClusterToPlan, onClusterClause } from './on-cluster.js'
export {
canonicalizeCodec,
codec,
Expand Down
Loading
Loading