-
Notifications
You must be signed in to change notification settings - Fork 0
feat: ClickHouse cluster (ON CLUSTER) support #168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
KeKs0r
wants to merge
2
commits into
main
Choose a base branch
from
cluster-support
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| --- | ||
| "chkit": patch | ||
| "@chkit/core": patch | ||
| --- | ||
|
|
||
| Add ClickHouse cluster support. Set `clickhouse.cluster` in your config to run all generated DDL `ON CLUSTER <name>` and store the migration journal in a replicated engine — for self-managed multi-node clusters. Your table engines are passed through unchanged (declare `ReplicatedMergeTree` yourself). Leave `cluster` unset for single-node, ClickHouse Cloud, or ObsessionDB, where replication is automatic and `ON CLUSTER` is unnecessary. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,204 @@ | ||
| import { describe, expect, test } from 'bun:test' | ||
| import { mkdtemp, readdir, readFile, writeFile } from 'node:fs/promises' | ||
| import { tmpdir } from 'node:os' | ||
| import { join } from 'node:path' | ||
|
|
||
| import { | ||
| CORE_ENTRY, | ||
| createJournalTableName, | ||
| createLiveExecutor, | ||
| createPrefix, | ||
| formatTestDiagnostic, | ||
| runCli, | ||
| waitForColumn, | ||
| waitForTable, | ||
| } from '../src/test/e2e-testkit.js' | ||
|
|
||
| // Cluster-mode e2e against a local replicated ClickHouse cluster | ||
| // (test/cluster/docker-compose.yml: 1 shard, 2 replicas, cluster `test_cluster`). | ||
| // | ||
| // Defaults match that setup; override via env for a different cluster. This file | ||
| // lives OUTSIDE src/, so the default `bun test src` suite (the managed-ObsessionDB | ||
| // CI job) never collects it. Run explicitly via `bun run test:cluster` after | ||
| // `bun run cluster:up`. It hard-fails (never skips) if the cluster is unreachable. | ||
| const NODE1 = process.env.CLICKHOUSE_URL ?? 'http://localhost:8123' | ||
| const NODE2 = process.env.CHKIT_CLUSTER_E2E_URL2 ?? 'http://localhost:8124' | ||
| const USER = process.env.CLICKHOUSE_USER ?? 'default' | ||
| const PASSWORD = process.env.CLICKHOUSE_PASSWORD ?? 'clusterpass' | ||
| const CLUSTER = process.env.CHKIT_CLUSTER ?? 'test_cluster' | ||
| const DATABASE = process.env.CLICKHOUSE_DB ?? 'default' | ||
|
|
||
| function executorFor(url: string) { | ||
| return createLiveExecutor({ | ||
| clickhouseUrl: url, | ||
| clickhouseUser: USER, | ||
| clickhousePassword: PASSWORD, | ||
| clickhouseDatabase: DATABASE, | ||
| }) | ||
| } | ||
|
|
||
| function renderSchema(tableName: string, withLabel: boolean): string { | ||
| const columns = [ | ||
| " { name: 'id', type: 'UInt64' }", | ||
| " { name: 'ts', type: 'DateTime' }", | ||
| ...(withLabel ? [" { name: 'label', type: 'String' }"] : []), | ||
| ].join(',\n') | ||
| return `import { schema, table } from '${CORE_ENTRY}' | ||
|
|
||
| export default schema( | ||
| table({ | ||
| database: '${DATABASE}', | ||
| name: '${tableName}', | ||
| engine: 'ReplicatedMergeTree', | ||
| columns: [ | ||
| ${columns}, | ||
| ], | ||
| primaryKey: ['id'], | ||
| orderBy: ['id'], | ||
| }), | ||
| ) | ||
| ` | ||
| } | ||
|
|
||
| async function scaffold(tableName: string): Promise<{ dir: string; configPath: string; schemaPath: string; migrationsDir: string }> { | ||
| const dir = await mkdtemp(join(tmpdir(), 'chkit-cluster-e2e-')) | ||
| const schemaPath = join(dir, 'schema.ts') | ||
| const configPath = join(dir, 'clickhouse.config.ts') | ||
| const migrationsDir = join(dir, 'chkit', 'migrations') | ||
| await writeFile(schemaPath, renderSchema(tableName, false), 'utf8') | ||
| await writeFile( | ||
| configPath, | ||
| `export default { | ||
| schema: '${schemaPath}', | ||
| outDir: '${join(dir, 'chkit')}', | ||
| migrationsDir: '${migrationsDir}', | ||
| metaDir: '${join(dir, 'chkit', 'meta')}', | ||
| clickhouse: { | ||
| url: '${NODE1}', | ||
| username: '${USER}', | ||
| password: '${PASSWORD}', | ||
| database: '${DATABASE}', | ||
| cluster: '${CLUSTER}', | ||
| }, | ||
| } | ||
| `, | ||
| 'utf8', | ||
| ) | ||
| return { dir, configPath, schemaPath, migrationsDir } | ||
| } | ||
|
|
||
| async function latestMigrationSql(migrationsDir: string): Promise<string> { | ||
| const files = (await readdir(migrationsDir)).filter((f) => f.endsWith('.sql')).sort() | ||
| const last = files.at(-1) | ||
| expect(last).toBeTruthy() | ||
| return readFile(join(migrationsDir, last as string), 'utf8') | ||
| } | ||
|
|
||
| describe('chkit cluster mode (ON CLUSTER) e2e', () => { | ||
| const node1 = executorFor(NODE1) | ||
| const node2 = executorFor(NODE2) | ||
|
|
||
| test('fans DDL across replicas, replicates the journal, and is idempotent', async () => { | ||
| const prefix = createPrefix('cluster') | ||
| const tableName = `${prefix}events` | ||
| const journalTable = createJournalTableName('cluster') | ||
| const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable } | ||
| const { dir, configPath, schemaPath, migrationsDir } = await scaffold(tableName) | ||
|
|
||
| // generate: ON CLUSTER is baked into the migration file (CREATE TABLE + DB). | ||
| const generate = runCli(dir, ['generate', '--config', configPath, '--name', 'init', '--json'], cliEnv) | ||
| expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0) | ||
| const createSql = await latestMigrationSql(migrationsDir) | ||
| expect(createSql).toContain(`CREATE TABLE IF NOT EXISTS ${DATABASE}.${tableName} ON CLUSTER '${CLUSTER}'`) | ||
| expect(createSql).toContain(`ENGINE = ReplicatedMergeTree()`) | ||
|
|
||
| // migrate: applies ON CLUSTER DDL. | ||
| const migrate = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) | ||
| expect(migrate.exitCode, formatTestDiagnostic('migrate', migrate)).toBe(0) | ||
| const applied = (JSON.parse(migrate.stdout) as { applied: Array<{ name: string }> }).applied | ||
| expect(applied.length).toBe(1) | ||
| const migrationName = applied[0]?.name as string | ||
|
|
||
| // table exists on BOTH replicas. | ||
| await waitForTable(node1, DATABASE, tableName) | ||
| await waitForTable(node2, DATABASE, tableName) | ||
|
|
||
| // journal is replicated on BOTH replicas. | ||
| for (const node of [node1, node2]) { | ||
| await waitForTable(node, DATABASE, journalTable) | ||
| const rows = await node.query<{ engine: string }>( | ||
| `SELECT engine FROM system.tables WHERE database = '${DATABASE}' AND name = '${journalTable}'`, | ||
| ) | ||
| expect(rows[0]?.engine).toBe('ReplicatedReplacingMergeTree') | ||
| } | ||
|
|
||
| // the migration row written on node1 is visible on node2 (replicated journal). | ||
| await node2.command(`SYSTEM SYNC REPLICA ${DATABASE}.\`${journalTable}\``) | ||
| const journalRows = await node2.query<{ name: string }>( | ||
| `SELECT name FROM ${DATABASE}.\`${journalTable}\` FINAL WHERE migration_completed = true`, | ||
| ) | ||
| expect(journalRows.map((r) => r.name)).toContain(migrationName) | ||
|
|
||
| // idempotent: re-running migrate applies nothing. | ||
| const rerun = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) | ||
| expect(rerun.exitCode, formatTestDiagnostic('migrate rerun', rerun)).toBe(0) | ||
| expect((JSON.parse(rerun.stdout) as { applied: unknown[] }).applied.length).toBe(0) | ||
|
|
||
| // ALTER fans out too: add a column, generate + migrate, verify on BOTH replicas. | ||
| await writeFile(schemaPath, renderSchema(tableName, true), 'utf8') | ||
| const generateAlter = runCli(dir, ['generate', '--config', configPath, '--name', 'add_label', '--json'], cliEnv) | ||
| expect(generateAlter.exitCode, formatTestDiagnostic('generate alter', generateAlter)).toBe(0) | ||
| const alterSql = await latestMigrationSql(migrationsDir) | ||
| expect(alterSql).toContain(`ALTER TABLE ${DATABASE}.${tableName} ON CLUSTER '${CLUSTER}' ADD COLUMN IF NOT EXISTS \`label\``) | ||
|
|
||
| const migrateAlter = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) | ||
| expect(migrateAlter.exitCode, formatTestDiagnostic('migrate alter', migrateAlter)).toBe(0) | ||
| await waitForColumn(node1, DATABASE, tableName, 'label') | ||
| await waitForColumn(node2, DATABASE, tableName, 'label') | ||
|
|
||
| // RENAME fans out, with ON CLUSTER at the END of the statement (ClickHouse | ||
| // places it after the `name TO new_name` list, not after the source name). | ||
| const renamedTable = `${tableName}_renamed` | ||
| await writeFile(schemaPath, renderSchema(renamedTable, true), 'utf8') | ||
| const generateRename = runCli( | ||
| dir, | ||
| ['generate', '--config', configPath, '--name', 'rename', '--rename-table', `${DATABASE}.${tableName}=${DATABASE}.${renamedTable}`, '--json'], | ||
| cliEnv, | ||
| ) | ||
| expect(generateRename.exitCode, formatTestDiagnostic('generate rename', generateRename)).toBe(0) | ||
| const renameSql = await latestMigrationSql(migrationsDir) | ||
| expect(renameSql).toContain( | ||
| `RENAME TABLE IF EXISTS ${DATABASE}.${tableName} TO ${DATABASE}.${renamedTable} ON CLUSTER '${CLUSTER}';`, | ||
| ) | ||
| const migrateRename = runCli(dir, ['migrate', '--config', configPath, '--execute', '--json'], cliEnv) | ||
| expect(migrateRename.exitCode, formatTestDiagnostic('migrate rename', migrateRename)).toBe(0) | ||
| await waitForTable(node1, DATABASE, renamedTable) | ||
| await waitForTable(node2, DATABASE, renamedTable) | ||
|
|
||
| // cleanup (also exercises DROP ... ON CLUSTER). | ||
| await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.${renamedTable} ON CLUSTER '${CLUSTER}' SYNC`) | ||
| await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`) | ||
| }, 60_000) | ||
|
|
||
| test('rejects a pre-existing non-replicated journal when cluster mode is on', async () => { | ||
| const journalTable = createJournalTableName('cluster_p2') | ||
| const cliEnv = { CHKIT_JOURNAL_TABLE: journalTable } | ||
| const tableName = `${createPrefix('cluster_p2')}events` | ||
| const { dir, configPath } = await scaffold(tableName) | ||
|
|
||
| // Simulate a project that ran chkit single-node before enabling cluster mode: | ||
| // a plain (non-replicated) journal already exists. | ||
| await node1.command( | ||
| `CREATE TABLE ${DATABASE}.\`${journalTable}\` (name String, applied_at DateTime64(3, 'UTC'), checksum String, chkit_version String) ENGINE = ReplacingMergeTree(applied_at) ORDER BY name`, | ||
| ) | ||
|
|
||
| const generate = runCli(dir, ['generate', '--config', configPath, '--name', 'init', '--json'], cliEnv) | ||
| expect(generate.exitCode, formatTestDiagnostic('generate', generate)).toBe(0) | ||
|
|
||
| const status = runCli(dir, ['status', '--config', configPath], cliEnv) | ||
| expect(status.exitCode).not.toBe(0) | ||
| expect(`${status.stdout}${status.stderr}`).toContain('non-replicated engine') | ||
|
|
||
| await node1.command(`DROP TABLE IF EXISTS ${DATABASE}.\`${journalTable}\` ON CLUSTER '${CLUSTER}' SYNC`) | ||
| }, 60_000) | ||
| }) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When
clickhouse.clusteris added to a project that already has the old local_chkit_migrationstable, this upgrade path only runsALTER ... ON CLUSTERand never creates or converts the table to the new replicated engine. That leaves the journal non-replicated (or the distributed ALTER fails on replicas where the old table is absent), sostatus/migratethrough another cluster node can miss applied migrations and replay them; please detect a pre-existing non-ReplicatedReplacingMergeTreejournal and fail with migration instructions or migrate it before marking the store bootstrapped.Useful? React with 👍 / 👎.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed in 4554f90. In cluster mode,
ensureTablenow checks the existing journal's engine first and fails fast with migration guidance (drop ON CLUSTER, or setCHKIT_JOURNAL_TABLE) if it isn't a replicated engine — before any ON CLUSTER ALTERs run. Added a cluster e2e case covering a pre-existing non-replicated journal.