feat(ilp): add QwpQueryClient for QWP egress over WebSocket #11

Open
bluestreak01 wants to merge 47 commits into main from vi_egress

Conversation


@bluestreak01 bluestreak01 commented Apr 18, 2026

Summary

  • Add QwpQueryClient, the client-side counterpart of the QWP egress endpoint that QuestDB's HTTP server now exposes at /read/v1. Sends a SQL query, receives result batches as binary WebSocket frames, decodes them into a column-major QwpColumnBatch view.
  • Connect-string entry point QwpQueryClient.fromConfig("ws::addr=localhost:9000;...") mirrors the Sender.fromConfig shape used by the existing ingestion API. A programmatic builder (QwpQueryClient.newPlainText(...) with chained with* setters) covers the same surface for callers who prefer typed configuration.
  • Dedicated I/O thread reads + decodes ahead of the user thread; batches arrive via a bounded SPSC queue so decoding of batch N+1 overlaps with the user's processing of batch N.
  • Zero-allocation hot path: per-cell accessors read directly from native memory; STRING / VARCHAR / SYMBOL access returns a reusable DirectUtf8Sequence view into the underlying WebSocket payload buffer.
  • Feature surface:
    • Authentication: HTTP Basic (username / password), OIDC bearer (token), or a raw Authorization header (auth). Mutually exclusive; validated at parse time.
    • TLS: wss:: schema with tls_verify=on|unsafe_off, optional custom tls_roots / tls_roots_password trust store.
    • Multi-endpoint routing: addr=host:port,host:port,... with a target=any|primary|replica filter applied against the server's SERVER_INFO frame.
    • Failover: failover=on (default) transparently reconnects to the next target-matching endpoint on a transport-level failure and replays the query; handlers observe onFailoverReset(newNode) before the replayed batches. failover=off surfaces transport failures directly.
    • Bind parameters: execute(sql, QwpBindSetter, handler) encodes typed binds for every scalar wire type via QwpBindValues.
    • Compression: compression=raw|zstd|auto with server-clamped compression_level.
    • Client-driven flow control: withInitialCredit(bytes) advertises a byte budget; the client auto-replenishes per batch, the server parks when the budget runs out.
    • Query cancellation: thread-safe cancel() sends a CANCEL frame; the server aborts between batches and surfaces STATUS_CANCELLED.
  • 21 user-facing examples under examples/src/main/java/com/example/query/ covering basic reads, bind encoding, compression, credit flow, error handling, streaming, and typed-row aggregation.
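Taken together, the config-string keys above could combine as in the following sketch (host names, credentials, and paths are illustrative, not taken from this PR):

```text
# plain WebSocket, HTTP Basic auth
ws::addr=localhost:9000;username=admin;password=quest;

# TLS with a custom trust store, replica-only routing, zstd compression
wss::addr=node1:9000,node2:9000;target=replica;failover=on;tls_verify=on;tls_roots=/path/to/roots.p12;compression=zstd;
```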

API shape

try (QwpQueryClient client = QwpQueryClient.fromConfig("ws::addr=localhost:9000;")) {
    client.connect();
    client.execute("SELECT ts, sym, price FROM trades LIMIT 1000", new QwpColumnBatchHandler() {
        @Override public void onBatch(QwpColumnBatch batch) {
            for (int r = 0; r < batch.getRowCount(); r++) {
                long ts = batch.getLong(0, r);
                DirectUtf8Sequence sym = batch.getStrA(1, r);
                double px = batch.getDouble(2, r);
                // ... process row
            }
        }
        @Override public void onEnd(long totalRows) { /* ... */ }
        @Override public void onError(byte status, String message) { /* ... */ }
    });
}

QwpColumnBatch exposes both schema-agnostic accessors (getLong, getDouble, getString, isNull) and per-type raw-address APIs (valuesAddr, nonNullIndex, getLongValue, getIntValue) for callers writing the tightest possible inner loops.

Threading model

  • One I/O thread per QwpQueryClient, spawned on connect(), marked daemon.
  • I/O thread owns the WebSocketClient, the QwpResultBatchDecoder, and a small pool of QwpBatchBuffer instances (default 4, configurable via withBufferPoolSize).
  • Per-frame: take a buffer from the free pool, memcpy the WS payload into it, decode in place, push onto the events queue.
  • User thread drains events inside execute(), invokes the handler, releases the buffer back to the pool.
  • Back-pressure: when the buffer pool is exhausted (slow consumer), the I/O thread blocks on freeBuffers.take(), the WS recv buffer fills, and TCP flow control closes the server's send window.
  • close() sends Thread.interrupt() to the I/O thread, joins for up to 5 s, then frees the pool. If join times out, the daemon thread is left running and the buffer pool + WebSocket socket are leaked for the JVM lifetime rather than freed under an active reader; wasLastCloseTimedOut() surfaces the condition for callers.
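The bounded-pool back-pressure described above can be sketched with a plain ArrayBlockingQueue; the class and constants here are hypothetical stand-ins, not the client's actual internals:

```java
import java.util.concurrent.ArrayBlockingQueue;

// Minimal sketch of the free-pool handoff: the I/O thread takes a buffer
// per frame, the user thread returns it after the handler runs. When the
// pool is empty, take() blocks the I/O thread, which in the real client
// lets the WS recv buffer fill and TCP flow control push back on the server.
public class PoolSketch {
    public static void main(String[] args) throws InterruptedException {
        final int poolSize = 4; // mirrors the default withBufferPoolSize value
        ArrayBlockingQueue<byte[]> freeBuffers = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            freeBuffers.add(new byte[64 * 1024]);
        }

        // I/O-thread side: claim a buffer for the incoming frame
        byte[] buf = freeBuffers.take();
        assert freeBuffers.size() == poolSize - 1;

        // user-thread side: release the buffer after handling the batch
        freeBuffers.put(buf);
        assert freeBuffers.size() == poolSize;
        System.out.println("pool size after release: " + freeBuffers.size());
    }
}
```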

Performance characteristics

  • After warmup the decode path allocates nothing on the JVM heap. Per-column native scratches (null bitmaps, dense values, string heaps, symbol-id arrays) grow to the maximum observed size and are reused across batches. Pooled DirectUtf8String views replace per-cell String and byte[] allocation.
  • Per-row dispatch in getLong still has a wire-type switch; callers in tight scan loops can use the type-specialised getLongValue / getIntValue accessors to skip it.
  • The per-batch memcpy from the WS recv buffer into the batch buffer's owned native scratch is the cost of decoupling the I/O thread from the user thread: ~10-100 µs for typical batches. The alternative (zero-copy hand-off of the WS buffer through a ring) is left to a future revision.

Limitations

  • One query at a time per connection; no client-side multiplexing of concurrent queries on a shared socket. Multi-query fair-scheduling is a Phase 2 item.
  • No QwpResultCursor row-iterator wrapper around the column-batch consumer API. The column-batch handler covers every functional case today; the cursor wrapper is on the Phase 2 backlog for callers who want per-row rather than per-batch iteration.

Test plan

  • Client-side unit / decoder-hardening tests pass: QwpBindEncoderTest, QwpColumnBatchViewsTest, QwpQueryClientFromConfigTest, QwpQueryClientPostConnectGuardTest, QwpResultBatchDecoderHardeningTest.
  • End-to-end behaviour is exercised by the parent QuestDB repository's QWP egress suite (bootstrap, fuzz, fragmentation, compression, credit, cancel, delta-dict, cache-reset, metrics, timestamp-Gorilla, symbol-edge-cases, types-exhaustive, bind round-trip, page-frame, DDL-exec) — see the matching parent PR.
  • Examples build cleanly under Java 11 target.

🤖 Generated with Claude Code

bluestreak01 and others added 30 commits April 18, 2026 18:00
Without enable_language(ASM) in CMakeLists, CMake silently drops the
.S file from the build tree. On x86-64 Linux and macOS the C side of
libzstd still references the symbols exported by the assembly file
(HUF_decompress4X1_usingDTable_internal_fast_asm_loop and its 4X2
sibling), so the final link fails with "Undefined symbols for
architecture x86_64" whenever the host matches ARCH_AMD64 AND NOT
WIN32. Linux x86-64 shows the same failure once the build actually
reaches the link step.

ARM64 and Windows are unaffected because they skip the .S file and
set ZSTD_DISABLE_ASM=1, routing everything through the C fallback.
The asmlib subdirectory's enable_language(ASM_NASM) is separate --
NASM is Intel-syntax and only handles Agner Fog's .asm files.

macOS clang pulls stdint.h in transitively via stdlib.h, so the build
was passing on darwin but failing on the manylinux2014 GCC that the
linux-x86-64 and linux-aarch64 workflow jobs use. Add the explicit
include so every supported toolchain compiles cleanly.

- Mark ioThread volatile in QwpQueryClient so cancel() from a thread
  other than the one that ran connect() observes the published reference,
  and a concurrent null-out from close() does not race.
- In QwpResultBatchDecoder, zero decompressScratchAddr and
  decompressScratchCapacity between free and malloc so a throwing malloc
  cannot leave a dangling address paired with a non-zero capacity. The
  next decode would otherwise skip the first-alloc branch and trigger a
  use-after-free.
- Wrap QwpResultBatchDecoderHardeningTest cases in assertMemoryLeak and
  close the decoder in finally to catch native-memory leaks in the
  decoder itself.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Auth helpers mirror the existing Sender conventions:
- withBasicAuth(user, pass): sets Authorization: Basic <base64>, so a
  Postgres user created via CREATE USER ... WITH PASSWORD ... can log
  in unchanged.
- withBearerToken(token): sets Authorization: Bearer <token> for OIDC
  access tokens.

TLS helpers route through the existing WebSocketClientFactory.newTlsInstance
path already used by QwpWebSocketSender:
- withTls(): TLS with full validation and the JVM default trust store.
- withInsecureTls(): TLS with certificate validation disabled (testing).
- withTrustStore(path, password): TLS with a custom PKCS12/JKS store.

fromConfig now accepts the wss:: schema and three additional keys that
match the ingress Sender's config-string grammar: tls_verify=on|unsafe_off,
tls_roots=<path>, tls_roots_password=<secret>. Auth and TLS keys are
mutually validated so inconsistent combinations fail at build time rather
than at connect.

No changes to the wire protocol: the Authorization header is sent on the
WebSocket upgrade request, which is the point at which the HTTP server's
existing auth chain runs. Server-side support was already in place; this
commit only exposes ergonomic client APIs for it.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

parseNullSection now skips the per-row "nonNullIdx[i] = i" array fill
when the column carries no nulls in this batch. Saves O(rowCount *
columnCount) trivial assignments per batch on the most common decode
path. For a 16K-row x 100-column wide result that's 1.6M skipped
iterations every batch.

QwpColumnLayout gains a denseIndex(row) helper that returns row
directly when nullBitmapAddr == 0, otherwise reads nonNullIdx[row].
All 13 typed accessors in QwpColumnBatch (getBool, getByteValue,
getCharValue, getDecimal128High/Low, getDouble, getFloat, getIntValue,
getLong, getLong256Word, getLongArray, getLongValue, getShortValue,
getUuidHi/Lo, lookupBinaryBytes, lookupStringBytes) now go through
denseIndex. parseArrayColumn and parseSymbolColumn hoist the
discriminator out of their per-row loops.
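The denseIndex(row) discriminator described above can be illustrated with a standalone sketch; the field and array here are hypothetical stand-ins for the native state, not the actual QwpColumnLayout code:

```java
// Sketch of denseIndex(row): when the column carries no null bitmap in
// this batch (nullBitmapAddr == 0), the dense index is the row itself and
// no identity-array fill is needed; otherwise the mapping is read from
// the materialised nonNullIdx array.
public class DenseIndexSketch {
    final long nullBitmapAddr; // 0 means "no nulls in this batch"
    final int[] nonNullIdx;    // stands in for the native index array

    DenseIndexSketch(long nullBitmapAddr, int[] nonNullIdx) {
        this.nullBitmapAddr = nullBitmapAddr;
        this.nonNullIdx = nonNullIdx;
    }

    int denseIndex(int row) {
        return nullBitmapAddr == 0 ? row : nonNullIdx[row];
    }

    public static void main(String[] args) {
        // no nulls: identity, skipping the O(rowCount) fill entirely
        DenseIndexSketch noNulls = new DenseIndexSketch(0, null);
        assert noNulls.denseIndex(7) == 7;

        // row 2 is null, so later rows map to shifted dense positions
        DenseIndexSketch withNulls = new DenseIndexSketch(0xDEADL, new int[]{0, 1, 1, 2});
        assert withNulls.denseIndex(3) == 2;
        System.out.println("ok");
    }
}
```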

The public raw-API nonNullIndex(col) lazily materialises the identity
array on first call when the column has no nulls, so the existing
fragmentation / compression / credit-flow tests that consume the raw
API keep working; the one-time fill cost is amortised across repeated calls.

Also drops QwpSpscQueue.clearAndWakeConsumer: zero callers in the
codebase and the implementation wrote the consumer-only tail field
from a context the doc explicitly described as the producer.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

bluestreak01 and others added 10 commits April 21, 2026 14:56
The previous merge (#12) removed TYPE_STRING from QwpConstants but left
references in the decoder hot paths and one example. Replace the
TYPE_STRING || TYPE_VARCHAR unions with TYPE_VARCHAR alone, drop the
TYPE_STRING entry from the symbol-vs-varchar dispatch in
getColumnValue/lookupStringBytes, and update the hardening test + the
TypedResultExample to advertise/consume TYPE_VARCHAR on the wire.

The module now compiles and all QWP decoder/table-buffer tests pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Transport- or protocol-level faults detected by the egress I/O thread
(server close, truncated/unknown frames, decode failures, send/recv
exceptions) now latch a sticky terminal state on the client. The next
QwpQueryClient.execute() short-circuits via handler.onError with the
stored status and message instead of dispatching another query to the
broken connection.

QwpEgressIoThread gains a TerminalFailureListener that the client wires
to a first-failure-wins CAS (AtomicReference). The emitTerminalError
helper calls the listener before offering the per-query QueryEvent so
the sender-thread latch is visible before the user's handler reacts.

Per-query QUERY_ERROR responses are explicitly NOT terminal: the
connection stays healthy and the next execute() runs normally. The
same applies to user-driven shutdown paths, which leave the latch
clear so close() followed by connect() on a new client starts fresh.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

# Conflicts:
#	core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java
#	core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java
@bluestreak01 bluestreak01 changed the title feat(qwp): add QwpQueryClient for QWP egress over WebSocket feat(ilp): add QwpQueryClient for QWP egress over WebSocket Apr 24, 2026
bluestreak01 and others added 7 commits April 24, 2026 02:48
Adds 10 new unit / decoder-hardening tests and extends QwpQueryClientFromConfigTest
to lift the QWP client coverage on PR #11 from 46.54% to roughly 75% on the files
reachable without a live socket. Per the PR description's testing-policy split,
end-to-end behaviour (QwpEgressIoThread, WebSocketClient handshake, live
failover) stays in the parent QuestDB egress suite -- this commit only touches
the tier that can be exercised in isolation.

Per-file line coverage after this commit:

  QwpSpscQueue                   100%  (was   0%)
  QwpServerInfo                  100%  (was   0%)
  QwpRoleMismatchException       100%  (was   0%)
  QwpColumnBatchHandler          100%  (was   0%)
  QwpColumnLayout                100%  (was  50%)
  QueryEvent                     100%  (was  19%)
  QwpGorillaDecoder              100%  (was  11%)
  QwpServerInfoDecoder            97%  (was   0%)
  QwpBitReader                    96%  (was   4%)
  QwpResultBatchDecoder           87%  (was  40%)
  QwpQueryClient                  50%  (was  43%)

Test patterns mirror the existing QwpResultBatchDecoderHardeningTest /
QwpQueryClientPostConnectGuardTest style: native-buffer hardening with
TestUtils.assertMemoryLeak, reflection for package-private state injection,
CountDownLatch-driven SPSC concurrency tests. New per-type column-decoder
coverage exercises every wire type (LONG, INT, BYTE, DOUBLE, BOOLEAN, UUID,
DECIMAL128, VARCHAR, GEOHASH, SYMBOL, TIMESTAMP raw + Gorilla, LONG_ARRAY)
plus the FLAG_DELTA_SYMBOL_DICT, FLAG_GORILLA, CACHE_RESET, and
SCHEMA_MODE_REFERENCE state machines.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

The happy-path branches in QwpQueryClientFromConfigTest were calling
QwpQueryClient.fromConfig(...) without closing the result, leaking the
client's bind-values native scratch on every test. Routes the parses
through a new assertParses() helper that wraps the call in
try-with-resources and unifies the assertNotNull check; the three
getCompressionPreference()-chaining tests get explicit try-with-resources
since they need the live client. Also picks up the linter's matching
try-with-resources reflow on the QueryEvent / QwpColumnLayout /
QwpResultBatchDecoderColumnTypes tests.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>

Tier 1 findings from the QWP client review, consolidated into one
change so client and server wire formats stay aligned.

Decoder (QwpResultBatchDecoder)
- Non-delta parseSymbolColumn now rejects a dict size that is negative
  or exceeds rowCount, and rejects an entry length that is negative or
  above Integer.MAX_VALUE. Previously dictSize*8 could overflow int and
  bypass ensureOwnedEntriesAddr; a signed-wrap entryLen could slip the
  p + entryLen > limit check and advance p backwards through already-
  consumed bytes.
- parseDeltaSymbolDict caps entryLen at Integer.MAX_VALUE before the
  int cast, computes newHeapPos in long space, and rejects growth
  beyond MAX_CONN_DICT_HEAP_BYTES. connDictSize is capped at
  MAX_CONN_DICT_SIZE. These guards keep a server that fails to emit
  CACHE_RESET from wrapping connDictHeapPos negative and letting
  copyMemory write past the heap.
- GEOHASH decode rejects precisionBits outside [1, 60], mirroring the
  server's own range check.
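The signed-overflow hazard the dictionary guard closes can be shown in isolation; the method below is a hypothetical sketch of the check, not the decoder's actual code:

```java
// Sketch of the non-delta dict-size guard: reject negative or
// above-rowCount sizes, then compute the byte count in long space so
// dictSize * 8 cannot wrap a 32-bit multiply and bypass the capacity
// check that follows in the real decoder.
public class DictGuardSketch {
    static long checkedDictBytes(int dictSize, int rowCount) {
        if (dictSize < 0 || dictSize > rowCount) {
            throw new IllegalStateException("bad dict size: " + dictSize);
        }
        return (long) dictSize * 8; // long math: no int overflow
    }

    public static void main(String[] args) {
        assert checkedDictBytes(1000, 16384) == 8000L;
        try {
            checkedDictBytes(-1, 16384);
            throw new AssertionError("negative dict size must be rejected");
        } catch (IllegalStateException expected) {
            // rejected as intended
        }
        // in 32-bit math, 0x2000_0000 * 8 wraps to exactly 0
        assert 0x2000_0000 * 8 == 0;
        // in long space the same product survives intact
        assert checkedDictBytes(0x2000_0000, Integer.MAX_VALUE) == 0x1_0000_0000L;
        System.out.println("ok");
    }
}
```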

Bind encoder (QwpBindValues)
- checkScale split into per-width variants (DECIMAL64 <= 18,
  DECIMAL128 <= 38, DECIMAL256 <= 76) so out-of-range scales are
  rejected at the type boundary instead of silently passing through
  under the shared MAX_SCALE cap.
- setNull for DECIMAL64/128/256 emits the trailing scale byte and
  setNull for GEOHASH emits the precision_bits varint. The server
  reads both fields unconditionally before checking the null flag, so
  omitting them misframed every bind that followed in the batch.
- setDecimal128(Decimal128) and setDecimal256(Decimal256) now preserve
  value.getScale() on the null-sentinel path.
- New explicit helpers setNullDecimal64/128/256(int, int scale) and
  setNullGeohash(int, int precisionBits) let callers pin the type's
  scale/precision without a non-null value.
- setGeohash masks value to precisionBits before emitting, so bits
  above the declared precision cannot leak into the top wire byte
  when precisionBits is not a multiple of 8.
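The geohash masking rule can be sketched on its own; the method here is a hypothetical stand-in for the QwpBindValues logic:

```java
// Sketch of the setGeohash masking: the value is ANDed with a
// precisionBits-wide mask before encoding, so bits above the declared
// precision cannot leak into the top wire byte when precisionBits is
// not a multiple of 8. Precision is range-checked to [1, 60] to mirror
// the decoder-side guard.
public class GeohashMaskSketch {
    static long maskToPrecision(long value, int precisionBits) {
        if (precisionBits < 1 || precisionBits > 60) {
            throw new IllegalArgumentException("precision out of range: " + precisionBits);
        }
        return value & ((1L << precisionBits) - 1);
    }

    public static void main(String[] args) {
        // 5-bit precision: everything above bit 4 is stripped
        assert maskToPrecision(0b1110_1011L, 5) == 0b0_1011L;
        // 60-bit precision keeps a full-width value intact
        long v = (1L << 59) | 1L;
        assert maskToPrecision(v, 60) == v;
        System.out.println("ok");
    }
}
```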

Buffer (QwpBatchBuffer)
- ensureCapacity rejects a negative required length, starts doubling
  at max(current, 1) so a zero initial capacity can still grow, and
  computes the doubling in long space with a clamp at
  Integer.MAX_VALUE. The old int doubling spun forever at 0 and
  wrapped negative above 2^30.

Concurrency (QwpQueryClient, QwpEgressIoThread)
- close() is gated by an AtomicBoolean so concurrent or repeat calls
  no longer walk the shutdown body twice.
- GenerationListener now owns its own AtomicReference<TerminalFailure>
  instead of sharing one across generations. An orphaned-but-in-flight
  listener's compareAndSet lands on a ref nobody reads, so a late
  callback from a dying I/O thread cannot poison the next connection.
- releaseBuffer re-checks closed after the freeBuffers.offer and
  removes the buffer back out if closePool raced it; the buffer is
  closed in place rather than stranded in a drained-and-abandoned
  pool. ArrayBlockingQueue.remove gives us the atomicity we need to
  avoid a double close when closePool's drain beat us to the same
  buffer.

Regression tests
- QwpBindEncoderTest: new coverage for per-width scale rejection
  (DECIMAL64 > 18, DECIMAL128 > 38, DECIMAL256 accepts 76), the four
  typed setNullDecimal*/setNullGeohash helpers, the geohash masking
  at sub-byte and 60-bit precisions, and updated expectations for
  the NULL path of the convenience Decimal128/256 overloads and the
  exhaustive null-types walk.
- QwpResultBatchDecoderHardeningTest: new frames for non-delta
  dict-size-above-rowCount, delta entry length > Integer.MAX_VALUE,
  and GEOHASH precision 0 / 61.
- QwpBatchBufferTest: new file covering grow-from-zero, reject-
  negative, and bounded-time growth.
- QwpQueryClientUnitTest: new concurrent-close stress test.
- QwpEgressIoThreadCloseRaceTest: new file; 200 iterations racing
  releaseBuffer against closePool under assertMemoryLeak so a
  stranded buffer surfaces as a native-memory leak.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@mtopolnik
Contributor

[PR Coverage check]

😍 pass : 1624 / 2218 (73.22%)

file detail

path | covered lines | new lines | coverage
🔵 io/questdb/client/cutlass/http/client/WebSocketClient.java 2 12 16.67%
🔵 io/questdb/client/cutlass/qwp/client/QwpEgressIoThread.java 136 289 47.06%
🔵 io/questdb/client/cutlass/qwp/client/QwpQueryClient.java 318 661 48.11%
🔵 io/questdb/client/cutlass/qwp/client/QwpColumnBatch.java 168 192 87.50%
🔵 io/questdb/client/cutlass/qwp/client/QwpResultBatchDecoder.java 373 426 87.56%
🔵 io/questdb/client/cutlass/qwp/client/QwpBatchBuffer.java 32 35 91.43%
🔵 io/questdb/client/cutlass/qwp/protocol/QwpBitReader.java 52 54 96.30%
🔵 io/questdb/client/cutlass/qwp/client/QwpBindValues.java 195 200 97.50%
🔵 io/questdb/client/cutlass/qwp/client/QwpServerInfoDecoder.java 37 38 97.37%
🔵 io/questdb/client/cutlass/qwp/client/QwpSpscQueue.java 37 37 100.00%
🔵 io/questdb/client/std/Long256Impl.java 10 10 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpDecodeException.java 2 2 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpEgressColumnInfo.java 6 6 100.00%
🔵 io/questdb/client/std/Long256Sink.java 6 6 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpServerInfo.java 22 22 100.00%
🔵 io/questdb/client/cutlass/qwp/protocol/QwpConstants.java 4 4 100.00%
🔵 io/questdb/client/cutlass/qwp/client/ColumnView.java 80 80 100.00%
🔵 io/questdb/client/std/Uuid.java 10 10 100.00%
🔵 io/questdb/client/cutlass/qwp/client/RowView.java 35 35 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpRoleMismatchException.java 6 6 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpWebSocketSender.java 1 1 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpColumnLayout.java 32 32 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QueryEvent.java 31 31 100.00%
🔵 io/questdb/client/cutlass/qwp/protocol/QwpGorillaDecoder.java 27 27 100.00%
🔵 io/questdb/client/cutlass/qwp/client/QwpColumnBatchHandler.java 2 2 100.00%
