feat(connectors): Clickhouse Sink Connector #2886
kriti-sc wants to merge 23 commits into apache:master
Conversation
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #2886 +/- ##
=============================================
- Coverage 74.48% 52.09% -22.39%
Complexity 943 943
=============================================
Files 1188 1192 +4
Lines 106530 94673 -11857
Branches 83560 71721 -11839
=============================================
- Hits 79350 49324 -30026
- Misses 24433 42636 +18203
+ Partials 2747 2713 -34
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 7 days if no further activity occurs. If you need a review, please ensure CI is green and the PR is rebased on the latest master. Don't hesitate to ping the maintainers - either @core on Discord or by mentioning them directly here on the PR. Thank you for your contribution!
hubcio
left a comment
overall good direction, just needs a bit of polishing
Well, the Clickhouse native client situation is kinda sad; we should definitely create a tracking issue for that + TCP support. I don't see a reason to hold this PR anymore; I think we should merge it as it is.
    InsertFormat::StringPassthrough => build_string_body(&messages, self.string_format),
};

if body.is_empty() {
silent data loss path. the body.rs builders skip wrong-type / serialize-failed messages with warn! (lines 46-52, 71-76, 99-104) and then return an empty buffer; here consume() returns Ok(()) on an empty body. the runtime uses AutoCommit::When(AutoCommitWhen::PollingMessages) and the FFI shim throws away the i32 return code anyway -- offsets advance regardless of the skip.
any batch where every message has the wrong payload type is permanently lost without surfacing an error. either count skips and return Err (see the sketch below), route them to a DLQ, or fail the batch loudly.
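a minimal sketch of the count-skips option (the thread below settles on louder logging instead); the `ConsumedMessage` / `Payload::Json` shapes and the counted return type are assumptions for illustration, not this PR's code:

```rust
// hypothetical refactor of build_json_body: count skips instead of
// dropping them silently
pub(crate) fn build_json_body_counted(messages: &[ConsumedMessage]) -> (usize, Vec<u8>) {
    let mut buf = Vec::with_capacity(messages.len() * 64);
    let mut skipped = 0usize;
    for msg in messages {
        match &msg.payload {
            Payload::Json(value) => {
                if serde_json::to_writer(&mut buf, value).is_ok() {
                    buf.push(b'\n');
                } else {
                    skipped += 1; // a real impl would also truncate partial writes
                }
            }
            _ => skipped += 1, // wrong payload type: counted, not silently lost
        }
    }
    (skipped, buf)
}

// then in consume(): fail loud instead of returning Ok(()) on an all-skipped batch
// let (skipped, body) = build_json_body_counted(&messages);
// if body.is_empty() && skipped > 0 {
//     return Err(Error::InvalidRecord); // or route the batch to a DLQ
// }
```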
Err does not make sense here, as that would cause the batch to be retried. We don't want that because if messages are unserializable, a retry will not solve it.
Changed the warn!s to error!s.
lmk @hubcio
    escape_backtick(&self.database),
    escape_backtick(table),
    format,
);
non-idempotent INSERT with no insert_deduplication_token. the retry loop at lines 219-252 resends the same body on transient network/HTTP failures; if the server applied the batch and the response was lost, the retry produces duplicate rows.
pass a deterministic token derived from (partition_id, first_offset, last_offset) as a query setting (sketch below); this works on ReplicatedMergeTree by default and on plain MergeTree with non_replicated_deduplication_window set. otherwise, document the at-least-once semantics loudly.
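a rough sketch of the token wiring; the URL shape and variable names are illustrative assumptions, not this PR's exact structs:

```rust
// deterministic per-batch token: same batch -> same token on retry, so a
// replayed INSERT is deduplicated server-side instead of inserted twice
fn dedup_token(partition_id: u32, first_offset: u64, last_offset: u64) -> String {
    format!("iggy-{partition_id}-{first_offset}-{last_offset}")
}

// attach it as a query setting on the INSERT request:
// let url = format!(
//     "{base}/?query={encoded_query}&insert_deduplication_token={}",
//     dedup_token(partition_id, first_offset, last_offset),
// );
```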
}

fn escape_single_quote(s: &str) -> String {
    s.replace('\'', "\\'")
sql injection vector. escape_single_quote only handles '; backslash passes through unchanged. clickhouse string-literal grammar requires escaping both ' and \ (per https://clickhouse.com/docs/sql-reference/syntax). adversarial config like database = "x\" followed by table = "y' OR 1=1 --" produces a literal that closes early because the trailing \ swallows the next '.
fix: validate identifiers against a strict allowlist (^[A-Za-z_][A-Za-z0-9_]*$) at config parse time, and escape both ' and \ in any literal that gets concatenated into SQL -- see the sketch below.
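a sketch of both halves of the fix; `Error::InvalidConfig` is a hypothetical variant, not this PR's error type:

```rust
// strict allowlist, equivalent to ^[A-Za-z_][A-Za-z0-9_]*$ -- reject at
// config parse so hostile identifiers never reach SQL assembly
fn validate_identifier(s: &str) -> Result<&str, Error> {
    let mut chars = s.chars();
    let head_ok = chars.next().is_some_and(|c| c.is_ascii_alphabetic() || c == '_');
    if head_ok && chars.all(|c| c.is_ascii_alphanumeric() || c == '_') {
        Ok(s)
    } else {
        Err(Error::InvalidConfig) // hypothetical variant
    }
}

// escape backslash first so the backslashes we add are not re-escaped
fn escape_string_literal(s: &str) -> String {
    s.replace('\\', "\\\\").replace('\'', "\\'")
}
```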
}

fn escape_backtick(s: &str) -> String {
    s.replace('`', "\\`")
same backslash issue as escape_single_quote. clickhouse's lexer accepts both the doubled-backtick and the \`-escape inside backtick-quoted identifiers, so the produced SQL parses, but the unescaped backslash means an attacker-controlled identifier with a trailing \ can break out of the identifier just like in the literal case. one-line fix below.
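the fix mirrors the literal case; order matters, backslash first:

```rust
fn escape_backtick(s: &str) -> String {
    // escape '\' before '`' so a trailing backslash cannot swallow the
    // closing backtick of the quoted identifier
    s.replace('\\', "\\\\").replace('`', "\\`")
}
```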
for (k, v) in obj {
    // Map keys must be serialisable as the key type. JSON object
    // keys are always strings, so we wrap them in OwnedValue::String.
    let key_val = OwnedValue::String(k.clone());
OwnedValue::String(k.clone()) allocates per map entry per row, then runs the full serialize_value recursion. when key_type == ChType::String (the common case for Map(String, _)), fast-path to write_string(k.as_bytes(), buf) directly, e.g.:
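roughly (ChType, write_string, and serialize_value are this PR's names; the surrounding loop and error plumbing are assumed context):

```rust
match key_type {
    // Map(String, _): write the JSON key bytes straight into the buffer,
    // skipping the per-entry OwnedValue::String allocation
    ChType::String => {
        write_string(k.as_bytes(), buf);
    }
    _ => {
        let key_val = OwnedValue::String(k.clone());
        serialize_value(&key_val, key_type, buf)?;
    }
}
```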
        error!("Cannot decode UUID hex: {s}");
        Error::InvalidRecord
    })?;
    let bytes = hex::decode(hex_str).map_err(|_| {
the uuid path allocates a Vec<u8> via hex::decode for every uuid cell. a handful of u64::from_str_radix calls on the hyphenated hex segments + to_le_bytes() is zero-alloc and avoids the inline mod hex reimplementation at lines 1215-1238, e.g.:
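a zero-alloc sketch, assuming the canonical 8-4-4-4-12 form and that the layout wanted here is the two little-endian u64 halves ClickHouse RowBinary uses for UUID:

```rust
fn parse_uuid(s: &str) -> Option<[u8; 16]> {
    let b = s.as_bytes();
    // canonical form only: hyphens at fixed positions
    if b.len() != 36 || b[8] != b'-' || b[13] != b'-' || b[18] != b'-' || b[23] != b'-' {
        return None;
    }
    let hi = (u64::from_str_radix(&s[0..8], 16).ok()? << 32)
        | (u64::from_str_radix(&s[9..13], 16).ok()? << 16)
        | u64::from_str_radix(&s[14..18], 16).ok()?;
    let lo = (u64::from_str_radix(&s[19..23], 16).ok()? << 48)
        | u64::from_str_radix(&s[24..36], 16).ok()?;
    let mut out = [0u8; 16];
    out[..8].copy_from_slice(&hi.to_le_bytes()); // high half, little-endian
    out[8..].copy_from_slice(&lo.to_le_bytes()); // low half, little-endian
    Some(out)
}
```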
    (s, "00:00:00")
};

let date_parts: Vec<&str> = date_part.splitn(3, '-').collect();
splitn(...).collect::<Vec<&str>>() allocates per call (also at lines 491, 583, 605). this is a per-row hot path when timestamps arrive as iso strings.
use iterator destructuring instead (let mut it = s.split('-'); let y = it.next(); ...) -- zero alloc, e.g.:
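```rust
// zero-alloc replacement for the splitn+collect pattern;
// Error::InvalidRecord is this PR's error variant
fn split_date(date_part: &str) -> Result<(&str, &str, &str), Error> {
    let mut it = date_part.splitn(3, '-');
    match (it.next(), it.next(), it.next()) {
        (Some(y), Some(m), Some(d)) => Ok((y, m, d)),
        _ => Err(Error::InvalidRecord),
    }
}
```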
    return Ok(());
}

let query = format!(
format! rebuilds the INSERT query and the URL on every batch, but both depend only on (database, table, format), which are fixed after open().
precompute them in ClickHouseClient::new (or via OnceLock keyed on (table, format) if the format is allowed to change per call); that saves two small allocations per batch. sketch:
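illustrative only: the struct layout and the urlencoding crate are assumptions, not this PR's actual code; escape_backtick is the PR's helper:

```rust
struct PreparedInsert {
    url: String, // endpoint with "INSERT INTO `db`.`table` FORMAT <fmt>" pre-encoded
}

impl PreparedInsert {
    fn new(base_url: &str, database: &str, table: &str, format: &str) -> Self {
        let query = format!(
            "INSERT INTO `{}`.`{}` FORMAT {}",
            escape_backtick(database),
            escape_backtick(table),
            format,
        );
        // built once in ClickHouseClient::new, reused for every batch
        Self { url: format!("{base_url}/?query={}", urlencoding::encode(&query)) }
    }
}
```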
/// Build a newline-delimited JSON body for `FORMAT JSONEachRow`.
/// Each `Payload::Json` message becomes one line. Other payload types are skipped.
pub(crate) fn build_json_body(messages: &[ConsumedMessage]) -> Vec<u8> {
    let mut buf = Vec::with_capacity(messages.len() * 64);
a fresh Vec<u8> is allocated and dropped per batch (also at lines 64, 89). at the default batch_length = 1000 and poll_interval = 5ms this is meaningful allocator pressure.
reuse a sink-owned buffer instead. note: consume(&self) may run concurrently for multi-topic configs, so the buffer needs Mutex<BytesMut> or per-task allocation -- check the runtime contract first. sketch:
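a minimal sketch of the lock-guarded variant, assuming the bytes crate; building and sending under the lock serializes batches, which per-task buffers would avoid if that matters:

```rust
use bytes::BytesMut;
use std::sync::Mutex;

// sink-owned, reusable body buffer; safe if consume(&self) runs
// concurrently for multi-topic configs
struct BodyBuffer(Mutex<BytesMut>);

impl BodyBuffer {
    fn with<R>(&self, f: impl FnOnce(&mut BytesMut) -> R) -> R {
        let mut buf = self.0.lock().expect("buffer lock poisoned");
        buf.clear(); // keeps capacity, drops the previous batch's bytes
        f(&mut buf)
    }
}
```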
Which issue does this PR close?
Closes #2539
Rationale
Clickhouse is a real-time data analytics engine and is very popular in modern analytics architectures.
What changed?
This PR introduces a Clickhouse Sink Connector that enables writing data from Iggy to Clickhouse.
The Clickhouse writing logic is heavily inspired by the official Clickhouse Kafka Connector.
Local Execution
Images 1 & 2: Produced 30456 + 29060 rows into Iggy in two batches
Image 3: Verified schema and number of rows in Clickhouse
AI Usage