feat(ilp): add QwpQueryClient for QWP egress over WebSocket#11
Open
bluestreak01 wants to merge 47 commits intomainfrom
Open
feat(ilp): add QwpQueryClient for QWP egress over WebSocket#11bluestreak01 wants to merge 47 commits intomainfrom
bluestreak01 wants to merge 47 commits intomainfrom
Conversation
Without enable_language(ASM) in CMakeLists, CMake silently drops the .S file from the build tree. On x86-64 Linux and macOS the C side of libzstd still references the symbols exported by the assembly file (HUF_decompress4X1_usingDTable_internal_fast_asm_loop and its 4X2 sibling), so the final link fails with "Undefined symbols for architecture x86_64" whenever the host matches ARCH_AMD64 AND NOT WIN32. Linux x86-64 shows the same failure once the build actually reaches the link step. ARM64 and Windows are unaffected because they skip the .S file and set ZSTD_DISABLE_ASM=1, routing everything through the C fallback. The asmlib subdirectory's enable_language(ASM_NASM) is separate -- NASM is Intel-syntax and only handles Agner Fog's .asm files.
macOS clang pulls stdint.h in transitively via stdlib.h, so the build was passing on darwin but failing on the manylinux2014 GCC that the linux-x86-64 and linux-aarch64 workflow jobs use. Add the explicit include so every supported toolchain compiles cleanly.
- Mark ioThread volatile in QwpQueryClient so cancel() from a thread other than the one that ran connect() observes the published reference, and a concurrent null-out from close() does not race. - In QwpResultBatchDecoder, zero decompressScratchAddr and decompressScratchCapacity between free and malloc so a throwing malloc cannot leave a dangling address paired with a non-zero capacity. The next decode would otherwise skip the first-alloc branch and trigger a use-after-free. - Wrap QwpResultBatchDecoderHardeningTest cases in assertMemoryLeak and close the decoder in finally to catch native-memory leaks in the decoder itself. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Auth helpers mirror the existing Sender conventions: - withBasicAuth(user, pass): sets Authorization: Basic <base64>, so a Postgres user created via CREATE USER ... WITH PASSWORD ... can log in unchanged. - withBearerToken(token): sets Authorization: Bearer <token> for OIDC access tokens. TLS helpers route through the existing WebSocketClientFactory.newTlsInstance path already used by QwpWebSocketSender: - withTls(): TLS with full validation and the JVM default trust store. - withInsecureTls(): TLS with certificate validation disabled (testing). - withTrustStore(path, password): TLS with a custom PKCS12/JKS store. fromConfig now accepts the wss:// schema and three additional keys that match the ingress Sender's config-string grammar: tls_verify=on|unsafe_off, tls_roots=<path>, tls_roots_password=<secret>. Auth and TLS keys are mutually validated so inconsistent combinations fail at build time rather than at connect. No changes to the wire protocol: the Authorization header is sent on the WebSocket upgrade request, which is the point at which the HTTP server's existing auth chain runs. Server-side support was already in place; this commit only exposes ergonomic client APIs for it. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
parseNullSection now skips the per-row "nonNullIdx[i] = i" array fill when the column carries no nulls in this batch. Saves O(rowCount * columnCount) trivial assignments per batch on the most common decode path. For a 16K-row x 100-column wide result that's 1.6M skipped iterations every batch. QwpColumnLayout gains a denseIndex(row) helper that returns row directly when nullBitmapAddr == 0, otherwise reads nonNullIdx[row]. All 13 typed accessors in QwpColumnBatch (getBool, getByteValue, getCharValue, getDecimal128High/Low, getDouble, getFloat, getIntValue, getLong, getLong256Word, getLongArray, getLongValue, getShortValue, getUuidHi/Lo, lookupBinaryBytes, lookupStringBytes) now go through denseIndex. parseArrayColumn and parseSymbolColumn hoist the discriminator out of their per-row loops. The public raw-API nonNullIndex(col) lazily materialises the identity array on first call when the column has no nulls, so the existing fragmentation / compression / credit-flow tests that consume the raw API keep working with no per-call cost amortised across reuse. Also drops QwpSpscQueue.clearAndWakeConsumer: zero callers in the codebase and the implementation wrote the consumer-only tail field from a context the doc explicitly described as the producer. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The previous merge (#12) removed TYPE_STRING from QwpConstants but left references in the decoder hot paths and one example. Replace the TYPE_STRING || TYPE_VARCHAR unions with TYPE_VARCHAR alone, drop the TYPE_STRING entry from the symbol-vs-varchar dispatch in getColumnValue/lookupStringBytes, and update the hardening test + the TypedResultExample to advertise/consume TYPE_VARCHAR on the wire. The module now compiles and all QWP decoder/table-buffer tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Transport- or protocol-level faults detected by the egress I/O thread (server close, truncated/unknown frames, decode failures, send/recv exceptions) now latch a sticky terminal state on the client. The next QwpQueryClient.execute() short-circuits via handler.onError with the stored status and message instead of dispatching another query to the broken connection. QwpEgressIoThread gains a TerminalFailureListener that the client wires to a first-failure-wins CAS (AtomicReference). The emitTerminalError helper calls the listener before offering the per-query QueryEvent so the sender-thread latch is visible before the user's handler reacts. Per-query QUERY_ERROR responses are explicitly NOT terminal: the connection stays healthy and the next execute() runs normally. The same applies to user-driven shutdown paths, which leave the latch clear so close() followed by connect() on a new client starts fresh. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
# Conflicts: # core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpEgressMsgKind.java # core/src/main/java/io/questdb/client/cutlass/qwp/client/QwpQueryClient.java
Adds 10 new unit / decoder-hardening tests and extends QwpQueryClientFromConfigTest to lift the QWP client coverage on PR #11 from 46.54% to roughly 75% on the files reachable without a live socket. Per the PR description's testing-policy split, end-to-end behaviour (QwpEgressIoThread, WebSocketClient handshake, live failover) stays in the parent QuestDB egress suite -- this commit only touches the tier that can be exercised in isolation. Per-file line coverage after this commit: QwpSpscQueue 100% (was 0%) QwpServerInfo 100% (was 0%) QwpRoleMismatchException 100% (was 0%) QwpColumnBatchHandler 100% (was 0%) QwpColumnLayout 100% (was 50%) QueryEvent 100% (was 19%) QwpGorillaDecoder 100% (was 11%) QwpServerInfoDecoder 97% (was 0%) QwpBitReader 96% (was 4%) QwpResultBatchDecoder 87% (was 40%) QwpQueryClient 50% (was 43%) Test patterns mirror the existing QwpResultBatchDecoderHardeningTest / QwpQueryClientPostConnectGuardTest style: native-buffer hardening with TestUtils.assertMemoryLeak, reflection for package-private state injection, CountDownLatch-driven SPSC concurrency tests. New per-type column-decoder coverage exercises every wire type (LONG, INT, BYTE, DOUBLE, BOOLEAN, UUID, DECIMAL128, VARCHAR, GEOHASH, SYMBOL, TIMESTAMP raw + Gorilla, LONG_ARRAY) plus the FLAG_DELTA_SYMBOL_DICT, FLAG_GORILLA, CACHE_RESET, and SCHEMA_MODE_REFERENCE state machines. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The happy-path branches in QwpQueryClientFromConfigTest were calling QwpQueryClient.fromConfig(...) without closing the result, leaking the client's bind-values native scratch on every test. Routes the parses through a new assertParses() helper that wraps the call in try-with-resources and unifies the assertNotNull check; the three getCompressionPreference()-chaining tests get explicit try-with-resources since they need the live client. Also picks up the linter's matching try-with-resources reflow on the QueryEvent / QwpColumnLayout / QwpResultBatchDecoderColumnTypes tests. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Tier 1 findings from the QWP client review, consolidated into one change so client and server wire formats stay aligned. Decoder (QwpResultBatchDecoder) - Non-delta parseSymbolColumn now rejects a dict size that is negative or exceeds rowCount, and rejects an entry length that is negative or above Integer.MAX_VALUE. Previously dictSize*8 could overflow int and bypass ensureOwnedEntriesAddr; a signed-wrap entryLen could slip the p + entryLen > limit check and advance p backwards through already- consumed bytes. - parseDeltaSymbolDict caps entryLen at Integer.MAX_VALUE before the int cast, computes newHeapPos in long space, and rejects growth beyond MAX_CONN_DICT_HEAP_BYTES. connDictSize is capped at MAX_CONN_DICT_SIZE. These guards keep a server that fails to emit CACHE_RESET from wrapping connDictHeapPos negative and letting copyMemory write past the heap. - GEOHASH decode rejects precisionBits outside [1, 60], mirroring the server's own range check. Bind encoder (QwpBindValues) - checkScale split into per-width variants (DECIMAL64 <= 18, DECIMAL128 <= 38, DECIMAL256 <= 76) so out-of-range scales are rejected at the type boundary instead of silently passing through under the shared MAX_SCALE cap. - setNull for DECIMAL64/128/256 emits the trailing scale byte and setNull for GEOHASH emits the precision_bits varint. The server reads both fields unconditionally before checking the null flag, so omitting them misframed every bind that followed in the batch. - setDecimal128(Decimal128) and setDecimal256(Decimal256) now preserve value.getScale() on the null-sentinel path. - New explicit helpers setNullDecimal64/128/256(int, int scale) and setNullGeohash(int, int precisionBits) let callers pin the type's scale/precision without a non-null value. - setGeohash masks value to precisionBits before emitting, so bits above the declared precision cannot leak into the top wire byte when precisionBits is not a multiple of 8. Buffer (QwpBatchBuffer) - ensureCapacity rejects a negative required length, starts doubling at max(current, 1) so a zero initial capacity can still grow, and computes the doubling in long space with a clamp at Integer.MAX_VALUE. The old int doubling spun forever at 0 and wrapped negative above 2^30. Concurrency (QwpQueryClient, QwpEgressIoThread) - close() is gated by an AtomicBoolean so concurrent or repeat calls no longer walk the shutdown body twice. - GenerationListener now owns its own AtomicReference<TerminalFailure> instead of sharing one across generations. An orphaned-but-in-flight listener's compareAndSet lands on a ref nobody reads, so a late callback from a dying I/O thread cannot poison the next connection. - releaseBuffer re-checks closed after the freeBuffers.offer and removes the buffer back out if closePool raced it; the buffer is closed in place rather than stranded in a drained-and-abandoned pool. ArrayBlockingQueue.remove gives us the atomicity we need to avoid a double close when closePool's drain beat us to the same buffer. Regression tests - QwpBindEncoderTest: new coverage for per-width scale rejection (DECIMAL64 > 18, DECIMAL128 > 38, DECIMAL256 accepts 76), the four typed setNullDecimal*/setNullGeohash helpers, the geohash masking at sub-byte and 60-bit precisions, and updated expectations for the NULL path of the convenience Decimal128/256 overloads and the exhaustive null-types walk. - QwpResultBatchDecoderHardeningTest: new frames for non-delta dict-size-above-rowCount, delta entry length > Integer.MAX_VALUE, and GEOHASH precision 0 / 61. - QwpBatchBufferTest: new file covering grow-from-zero, reject- negative, and bounded-time growth. - QwpQueryClientUnitTest: new concurrent-close stress test. - QwpEgressIoThreadCloseRaceTest: new file; 200 iterations racing releaseBuffer against closePool under assertMemoryLeak so a stranded buffer surfaces as a native-memory leak. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Contributor
[PR Coverage check]😍 pass : 1624 / 2218 (73.22%) file detail
|
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
QwpQueryClient, the client-side counterpart of the QWP egress endpoint that QuestDB's HTTP server now exposes at/read/v1. Sends a SQL query, receives result batches as binary WebSocket frames, decodes them into a column-majorQwpColumnBatchview.QwpQueryClient.fromConfig(\"ws::addr=localhost:9000;...\")mirrors theSender.fromConfigshape used by the existing ingestion API. A programmatic builder (QwpQueryClient.newPlainText(...)with chainedwith*setters) covers the same surface for callers who prefer typed configuration.DirectUtf8Sequenceview into the underlying WebSocket payload buffer.username/password), OIDC bearer (token), or a rawAuthorizationheader (auth). Mutually exclusive; validated at parse time.wss::schema withtls_verify=on|unsafe_off, optional customtls_roots/tls_roots_passwordtrust store.addr=host:port,host:port,...with atarget=any|primary|replicafilter applied against the server'sSERVER_INFOframe.failover=on(default) transparently reconnects to the nexttarget-matching endpoint on a transport-level failure and replays the query; handlers observeonFailoverReset(newNode)before the replayed batches.failover=offsurfaces transport failures directly.execute(sql, QwpBindSetter, handler)encodes typed binds for every scalar wire type viaQwpBindValues.compression=raw|zstd|autowith server-clampedcompression_level.withInitialCredit(bytes)advertises a byte budget; the client auto-replenishes per batch, the server parks when the budget runs out.cancel()sends aCANCELframe; the server aborts between batches and surfacesSTATUS_CANCELLED.examples/src/main/java/com/example/query/covering basic reads, bind encoding, compression, credit flow, error handling, streaming, and typed-row aggregation.API shape
QwpColumnBatchexposes both schema-agnostic accessors (getLong,getDouble,getString,isNull) and per-type raw-address APIs (valuesAddr,nonNullIndex,getLongValue,getIntValue) for callers writing the tightest possible inner loops.Threading model
QwpQueryClient, spawned onconnect(), marked daemon.WebSocketClient, theQwpResultBatchDecoder, and a small pool ofQwpBatchBufferinstances (default 4, configurable viawithBufferPoolSize).execute(), invokes the handler, releases the buffer back to the pool.freeBuffers.take(), the WS recv buffer fills, and TCP flow control closes the server's send window.close()sendsThread.interrupt()to the I/O thread, joins for up to 5 s, then frees the pool. If join times out, the daemon thread is left running and the buffer pool + WebSocket socket are leaked for the JVM lifetime rather than freed under an active reader;wasLastCloseTimedOut()surfaces the condition for callers.Performance characteristics
DirectUtf8Stringviews replace per-cellStringandbyte[]allocation.getLongstill has a wire-type switch; callers in tight scan loops can use the type-specialisedgetLongValue/getIntValueaccessors to skip it.Limitations
QwpResultCursorrow-iterator wrapper around the column-batch consumer API. The column-batch handler covers every functional case today; the cursor wrapper is on the Phase 2 backlog for callers who want per-row rather than per-batch iteration.Test plan
QwpBindEncoderTest,QwpColumnBatchViewsTest,QwpQueryClientFromConfigTest,QwpQueryClientPostConnectGuardTest,QwpResultBatchDecoderHardeningTest.🤖 Generated with Claude Code