Skip to content

feat(connectors): Clickhouse Sink Connector#2886

Open
kriti-sc wants to merge 23 commits intoapache:masterfrom
kriti-sc:clickhouse-sink
Open

feat(connectors): Clickhouse Sink Connector#2886
kriti-sc wants to merge 23 commits intoapache:masterfrom
kriti-sc:clickhouse-sink

Conversation

@kriti-sc
Copy link
Copy Markdown
Contributor

@kriti-sc kriti-sc commented Mar 6, 2026

Which issue does this PR close?

Closes #2539

Rationale

Clickhouse is a real-time data analytics engine, and very popular in modern analytics architectures.

What changed?

This PR introduces a Clickhouse Sink Connector that enables writing data from Iggy to Clickhouse.

The Clickhouse writing logic is heavily inspired by the official Clickhouse Kafka Connector.

Local Execution

  • Produced messages 30456 + 29060 rows with schema user_id: String, user_type: u8, email: String, source: String, state: String, created_at: DateTime, message: String using sample data producer.
  • Consumed messages using the Clickhouse sink and into the particular Clickhouse table.
  • Verified schema and number of rows in Clickhouse.
  • Added unit tests and e2e tests, both passing.

Images 1&2: Produced 30456 + 29060 rows into Iggy in two batches
Image 3: Verified schema and number of rows in Clickhouse

image image image

AI Usage

  1. Which tools? (e.g., GitHub Copilot, Claude, ChatGPT) Claude Code
  2. Scope of usage? (e.g., autocomplete, generated functions, entire implementation) generated functions
  3. How did you verify the generated code works correctly? Manual testing by producing data into Iggy and then running the sink and verifying insertion into Clickhouse, unit tests and e2e tests for different Clickhouse insert configurations.
  4. Can you explain every line of the code if asked? Yes

@codecov
Copy link
Copy Markdown

codecov Bot commented Mar 6, 2026

Codecov Report

❌ Patch coverage is 80.57681% with 330 lines in your changes missing coverage. Please review.
✅ Project coverage is 52.09%. Comparing base (140f3c5) to head (3b6f137).

Files with missing lines Patch % Lines
...ore/connectors/sinks/clickhouse_sink/src/binary.rs 80.75% 106 Missing and 52 partials ⚠️
...ore/connectors/sinks/clickhouse_sink/src/schema.rs 76.60% 24 Missing and 71 partials ⚠️
...ore/connectors/sinks/clickhouse_sink/src/client.rs 63.12% 55 Missing and 11 partials ⚠️
core/connectors/sinks/clickhouse_sink/src/lib.rs 96.45% 3 Missing and 2 partials ⚠️
core/connectors/sinks/clickhouse_sink/src/sink.rs 60.00% 4 Missing ⚠️
core/connectors/sinks/clickhouse_sink/src/body.rs 98.59% 1 Missing and 1 partial ⚠️
Additional details and impacted files
@@              Coverage Diff              @@
##             master    #2886       +/-   ##
=============================================
- Coverage     74.48%   52.09%   -22.39%     
  Complexity      943      943               
=============================================
  Files          1188     1192        +4     
  Lines        106530    94673    -11857     
  Branches      83560    71721    -11839     
=============================================
- Hits          79350    49324    -30026     
- Misses        24433    42636    +18203     
+ Partials       2747     2713       -34     
Components Coverage Δ
Rust Core 46.04% <80.57%> (-29.70%) ⬇️
Java SDK 60.14% <ø> (ø)
C# SDK 69.07% <ø> (-0.31%) ⬇️
Python SDK 81.43% <ø> (ø)
Node SDK 91.41% <ø> (-0.13%) ⬇️
Go SDK 39.80% <ø> (ø)
Files with missing lines Coverage Δ
core/connectors/sinks/clickhouse_sink/src/body.rs 98.59% <98.59%> (ø)
core/connectors/sinks/clickhouse_sink/src/sink.rs 60.00% <60.00%> (ø)
core/connectors/sinks/clickhouse_sink/src/lib.rs 96.45% <96.45%> (ø)
...ore/connectors/sinks/clickhouse_sink/src/client.rs 63.12% <63.12%> (ø)
...ore/connectors/sinks/clickhouse_sink/src/schema.rs 76.60% <76.60%> (ø)
...ore/connectors/sinks/clickhouse_sink/src/binary.rs 80.75% <80.75%> (ø)

... and 306 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs
@kriti-sc kriti-sc requested a review from abonander March 7, 2026 10:58
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs
@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added stale Inactive issue or pull request and removed stale Inactive issue or pull request labels Mar 17, 2026
Copy link
Copy Markdown
Contributor

@hubcio hubcio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

overall good direction, just needs a little bit polishing

Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/schema.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs
Comment thread core/connectors/sinks/clickhouse_sink/src/client.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs Outdated
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs Outdated
@github-actions
Copy link
Copy Markdown

github-actions Bot commented Apr 2, 2026

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added stale Inactive issue or pull request and removed stale Inactive issue or pull request labels Apr 2, 2026
@kriti-sc kriti-sc requested a review from hubcio April 3, 2026 19:56
@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added stale Inactive issue or pull request and removed stale Inactive issue or pull request labels Apr 18, 2026
Comment thread core/connectors/sinks/clickhouse_sink/src/binary.rs
numinnex
numinnex previously approved these changes Apr 22, 2026
@numinnex
Copy link
Copy Markdown
Contributor

Well, the Clickhouse native client situation is kinda sad, we should definitely create a tracking issue for that + TCP support.

I don't see a reason to hold this PR anymore, I think we should merge it as it is.

@github-actions
Copy link
Copy Markdown

This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs.

If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR.

Thank you for your contribution!

@github-actions github-actions Bot added stale Inactive issue or pull request and removed stale Inactive issue or pull request labels Apr 30, 2026
Comment thread core/connectors/sinks/clickhouse_sink/src/body.rs Outdated
InsertFormat::StringPassthrough => build_string_body(&messages, self.string_format),
};

if body.is_empty() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

silent data loss path. body.rs builders skip wrong-type / serialize-failed messages with warn! (lines 46-52, 71-76, 99-104), then return an empty buffer. here consume() returns Ok(()) on empty body. runtime uses AutoCommit::When(AutoCommitWhen::PollingMessages) and the FFI shim throws away the i32 return code anyway -- offsets advance regardless of skip.

any batch where every message has the wrong payload type is permanently lost without surfacing an error. either count skips and return Err, route them to a DLQ, or fail the batch loud.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Err does not makes sense here as that would cause the batch to be retried. We don't want that because if messages are unserializable, a retry will not solve that.

Changed the warn!s to error!s.

lmk @hubcio

escape_backtick(&self.database),
escape_backtick(table),
format,
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

non-idempotent INSERT with no insert_deduplication_token. retry loop at lines 219-252 resends the same body on transient network/HTTP failures. if the server applied the batch and the response was lost, the retry produces duplicate rows.

pass a deterministic token derived from (partition_id, first_offset, last_offset) as a query setting; works on ReplicatedMergeTree by default and on plain MergeTree with non_replicated_deduplication_window set. otherwise document at-least-once semantics loudly.

}

fn escape_single_quote(s: &str) -> String {
s.replace('\'', "\\'")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sql injection vector. escape_single_quote only handles '; backslash passes through unchanged. clickhouse string-literal grammar requires escaping both ' and \ (per https://clickhouse.com/docs/sql-reference/syntax). adversarial config like database = "x\" followed by table = "y' OR 1=1 --" produces a literal that closes early because the trailing \ swallows the next '.

fix: validate identifiers with a strict allowlist (^[A-Za-z_][A-Za-z0-9_]*$) at config parse, and escape both ' and \ in any literal that gets concatenated into SQL.

}

fn escape_backtick(s: &str) -> String {
s.replace('`', "\\`")
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same backslash issue as escape_single_quote. clickhouse's lexer accepts both doubled-backtick and \``-escape inside backtick-quoted identifiers, so the produced SQL parses, but the unescaped backslash means an attacker-controlled identifier with a trailing ` can break out of the identifier just like the literal case.

for (k, v) in obj {
// Map keys must be serialisable as the key type. JSON object
// keys are always strings, so we wrap them in OwnedValue::String.
let key_val = OwnedValue::String(k.clone());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OwnedValue::String(k.clone()) per map entry per row, then full serialize_value recursion. when key_type == ChType::String (the common case for Map(String, _)), fast-path to write_string(k.as_bytes(), buf) directly.

error!("Cannot decode UUID hex: {s}");
Error::InvalidRecord
})?;
let bytes = hex::decode(hex_str).map_err(|_| {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

uuid path allocates a Vec<u8> via hex::decode for every uuid cell. four u64::from_str_radix calls on the hex segments + to_le_bytes() is zero-alloc and avoids the inline mod hex reimplementation at lines 1215-1238.

(s, "00:00:00")
};

let date_parts: Vec<&str> = date_part.splitn(3, '-').collect();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

splitn(...).collect::<Vec<&str>>() per call (also at lines 491, 583, 605). per-row hot path when timestamps arrive as iso strings.

use iterator destructuring (let mut it = s.split('-'); let y = it.next()...) -- zero alloc.

return Ok(());
}

let query = format!(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

format! rebuilds the INSERT query and the URL on every batch. both depend only on (database, table, format) which are fixed after open().

precompute in ClickHouseClient::new (or via OnceLock keyed on (table, format) if the format is allowed to change per call). saves two small allocations per batch.

/// Build a newline-delimited JSON body for `FORMAT JSONEachRow`.
/// Each `Payload::Json` message becomes one line. Other payload types are skipped.
pub(crate) fn build_json_body(messages: &[ConsumedMessage]) -> Vec<u8> {
let mut buf = Vec::with_capacity(messages.len() * 64);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fresh Vec<u8> allocated and dropped per batch (also at lines 64, 89). at default batch_length = 1000 and poll_interval = 5ms this is meaningful allocator pressure.

reuse a sink-owned buffer. note: consume(&self) may run concurrently for multi-topic configs, so the buffer needs Mutex<BytesMut> or per-task allocation -- check the runtime contract first.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement ClickHouse Sink Connector

5 participants