Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
7c717c5
pem: direct-query gRPC endpoint — stub + TDD contract (entlein/dx#29)
Jun 4, 2026
325b054
utils/shared/k8s: clean pre-existing lint debt (gci + sets.String)
entlein Jun 4, 2026
b89f885
pem/direct-query: Step 0 — build the stub clean (#29)
entlein Jun 4, 2026
5f839b8
pem/direct-query: Step 1 — HS256 JWT verify + matching test mint (#29)
entlein Jun 4, 2026
00e198c
pem/direct-query: Step 2a — wire LocalGRPCResultSinkServer ctor param…
entlein Jun 4, 2026
5cc9ab1
pem/direct-query: Step 2b — port ExecuteScript exec path (#29)
entlein Jun 4, 2026
14f20eb
pem/direct-query: Step 4 — pem_main.cc flag wiring, default OFF (#29)
entlein Jun 4, 2026
21d536e
ci: point vizier_release.yaml at the fork's runner label (#29 unblock)
entlein Jun 4, 2026
db32d73
pem/direct-query: dx-agent feedback — aud array + per-row column mars…
entlein Jun 4, 2026
b409464
pem/direct-query: Step 9 — manager constructs+starts the gRPC server …
entlein Jun 4, 2026
9ce6fbd
pem: drain exec stats + skip payload-less responses (entlein/dx#29)
entlein Jun 4, 2026
0ce0650
pem: direct-query startup is fail-soft + breadcrumbs (entlein/dx#29)
entlein Jun 4, 2026
50dffb0
agent: refuse to start if PL_JWT_SIGNING_KEY is empty (entlein/dx#29)
entlein Jun 4, 2026
66d92c5
ci: fix PR-checks (genfile + cfmt) on PR #49
entlein Jun 5, 2026
caddbd1
pem: jwt key fallback + signing-key security doc + tampering tests (e…
entlein Jun 5, 2026
2a3d510
pem: compile-time disable + auth/discouraged-practices doc (entlein/d…
entlein Jun 5, 2026
965bb54
pem: serialize sink access per request (CodeRabbit r3364645000)
entlein Jun 5, 2026
847409f
pem: direct-query :50305 uses cluster TLS (entlein/dx#29 — blocker)
entlein Jun 5, 2026
b523ce3
pxapi: WithDirectTLSSkipVerify for node-IP direct dial (entlein/dx#29)
entlein Jun 5, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/vizier_release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ jobs:
image-base-name: "dev_image_with_extras"
build-release:
name: Build Release
runs-on: oracle-16cpu-64gb-x86-64
runs-on: oracle-vm-16cpu-64gb-x86-64
needs: get-dev-image
permissions:
contents: read
Expand Down Expand Up @@ -140,7 +140,7 @@ jobs:
git commit -s -m "Release Helm chart Vizier ${VERSION}"
git push origin "gh-pages"
update-gh-artifacts-manifest:
runs-on: oracle-8cpu-32gb-x86-64
runs-on: oracle-vm-16cpu-64gb-x86-64
needs: [get-dev-image, create-github-release]
container:
image: ${{ needs.get-dev-image.outputs.image-with-tag }}
Expand Down
14 changes: 14 additions & 0 deletions src/api/go/pxapi/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,3 +82,17 @@ func WithDirectCredsInsecure() ClientOption {
c.insecureDirect = true
}
}

// WithDirectTLSSkipVerify is the secure-by-default option for direct (standalone /
// node-local PEM) connections: the transport IS TLS-encrypted, but the server cert
// is not chain/hostname-verified. Use this instead of WithDirectCredsInsecure when
// the direct endpoint serves TLS with a self-signed / service cert whose SAN does
// not match the node IP (e.g. vizier-pem's direct-query port served with
// service-tls-certs, dialed at HOST_IP). Unlike WithDisableTLSVerification it does
// NOT require a "cluster.local" address, so it works for the node-IP direct dial.
// Bearer creds (the minted JWT) therefore ride an encrypted channel, never plaintext.
func WithDirectTLSSkipVerify() ClientOption {
return func(c *Client) {
c.disableTLSVerification = true
}
}
8 changes: 8 additions & 0 deletions src/carnot/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ pl_cc_library(
"carnot_executable.cc",
],
),
# entlein/dx#29 — PEM's direct_query_server.cc needs concrete Carnot +
# EngineState defs to compile and execute PxL on the live engine. Same
# access pattern //src/experimental/standalone_pem already uses.
visibility = [
"//src/carnot:__subpackages__",
"//src/experimental:__subpackages__",
"//src/vizier/services/agent/pem:__pkg__",
],
deps = [
"//src/carnot/exec:cc_library",
"//src/carnot/exec/ml:cc_library",
Expand Down
14 changes: 14 additions & 0 deletions src/carnot/exec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,16 @@ pl_cc_library(
"clickhouse_source_node.h",
"exec_node.h",
"exec_state.h",
# entlein/dx#29 — direct_query_server_test in
# //src/vizier/services/agent/pem needs LocalGRPCResultSinkServer
# for its Carnot fixture (same pattern as //src/carnot:carnot_test).
# Per-target visibility extension below keeps the rest of cc_library
# package-internal.
"local_grpc_result_server.h",
],
visibility = [
"//src/carnot:__subpackages__",
"//src/vizier/services/agent/pem:__pkg__",
],
deps = [
"//src/carnot/carnotpb:carnot_pl_cc_proto",
Expand All @@ -61,6 +71,10 @@ pl_cc_library(
pl_cc_test_library(
name = "test_utils",
hdrs = glob(["*test_utils.h"]),
visibility = [
"//src/carnot:__subpackages__",
"//src/vizier/services/agent/pem:__pkg__", # entlein/dx#29 fixture
],
deps = [
":exec_node_test_helpers",
"@com_github_apache_arrow//:arrow",
Expand Down
3 changes: 3 additions & 0 deletions src/carnot/udf/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@ load("//bazel:pl_build_system.bzl", "pl_cc_binary", "pl_cc_library", "pl_cc_test
package(default_visibility = [
"//src/carnot:__subpackages__",
"//src/vizier/funcs:__subpackages__",
# entlein/dx#29 — PEM's direct_query_server_test builds a CarnotTest-style
# fixture with a udf::Registry, same pattern as //src/carnot:carnot_test.
"//src/vizier/services/agent/pem:__pkg__",
])

pl_cc_library(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,29 +24,29 @@ package(default_visibility = [

# Generate all Go container library permutations for supported Go versions.
go_container_libraries(
container_type = "grpc_server",
bazel_sdk_versions = pl_all_supported_go_sdk_versions,
container_type = "grpc_server",
prebuilt_container_versions = pl_go_test_versions,
)

# Stirling test cases usually test server side tracing. Therefore
# we only need to provide the bazel SDK versions for the client containers.
go_container_libraries(
container_type = "grpc_client",
bazel_sdk_versions = pl_all_supported_go_sdk_versions,
container_type = "grpc_client",
)

go_container_libraries(
container_type = "tls_server",
bazel_sdk_versions = pl_all_supported_go_sdk_versions,
container_type = "tls_server",
prebuilt_container_versions = pl_go_test_versions,
)

# Stirling test cases usually test server side tracing. Therefore
# we only need to provide the bazel SDK versions for the client containers.
go_container_libraries(
container_type = "tls_client",
bazel_sdk_versions = pl_all_supported_go_sdk_versions,
container_type = "tls_client",
)

pl_cc_test_library(
Expand Down
2 changes: 1 addition & 1 deletion src/utils/shared/k8s/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ import (
"strings"

log "github.com/sirupsen/logrus"
"k8s.io/apimachinery/pkg/api/meta"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
Expand Down
6 changes: 3 additions & 3 deletions src/utils/shared/k8s/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,8 +123,8 @@ func (o *ObjectDeleter) DeleteNamespace() error {
// groups are skipped during cluster-wide deletion sweeps because aggregated
// servers frequently advertise the delete verb on read-only virtual resources
// and fail the call with "operation not supported".
func (o *ObjectDeleter) getAggregatedGroupVersions() (sets.String, error) {
out := sets.NewString()
func (o *ObjectDeleter) getAggregatedGroupVersions() (sets.Set[string], error) {
out := sets.New[string]()
list, err := o.dynamicClient.Resource(apiServiceGVR).List(context.TODO(), metav1.ListOptions{})
if err != nil {
if errors.IsNotFound(err) || meta.IsNoMatchError(err) {
Expand Down Expand Up @@ -173,7 +173,7 @@ func (o *ObjectDeleter) getDeletableResourceTypes() ([]string, error) {
if len(resource.Verbs) == 0 {
continue
}
if !sets.NewString(resource.Verbs...).HasAll("delete") {
if !sets.New[string](resource.Verbs...).HasAll("delete") {
continue
}
resources = append(resources, resource.Name)
Expand Down
70 changes: 70 additions & 0 deletions src/vizier/services/agent/pem/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,23 @@ load("//bazel:pl_build_system.bzl", "pl_cc_binary", "pl_cc_library", "pl_cc_test

package(default_visibility = ["//src/vizier:__subpackages__"])

# entlein/dx#29 — compile-time kill switch for the direct-query endpoint.
# Operators who do not want the feature available IN THE BINARY at all build
# with `bazel build … --//src/vizier/services/agent/pem:direct_query=disabled`
# (or equivalently `--define=PX_PEM_DIRECT_QUERY=disabled` if invoking via
# bazel's legacy --define flag). When disabled, this propagates
# `-DPX_PEM_DIRECT_QUERY_DISABLED` into every translation unit in cc_library;
# direct_query_server.cc's body, pem_manager.cc's flag DEFINEs, and
# MaybeStartDirectQueryServer's implementation are all `#ifndef`'d out, and
# the stub class methods return UNIMPLEMENTED for any caller that still
# routes through DirectQueryServer. The runtime flag --direct_query_enabled
# is the soft, per-deploy toggle; this is the hard, per-build toggle. See
# DIRECT_QUERY_SECURITY.md "Disabling the feature".
config_setting(
name = "direct_query_disabled",
define_values = {"PX_PEM_DIRECT_QUERY": "disabled"},
)

pl_cc_library(
name = "cc_library",
srcs = glob(
Expand All @@ -30,7 +47,34 @@ pl_cc_library(
],
),
hdrs = glob(["*.h"]),
defines = select({
":direct_query_disabled": ["PX_PEM_DIRECT_QUERY_DISABLED"],
"//conditions:default": [],
}),
deps = [
# entlein/dx#29 direct-query server deps (direct_query_server.{h,cc}):
"//src/api/proto/vizierpb:vizier_pl_cc_proto",
# Use //src/carnot:carnot (the public header target) for forward decls
# in direct_query_server.h. Step 2 (#29) needs concrete types — pull
# additional carnot sub-targets exposed via the fork's PEM visibility:
"//src/carnot",
"//src/carnot:cc_library", # Carnot, EngineState concrete defs
"//src/carnot/carnotpb:carnot_pl_cc_proto", # TransferResultChunkRequest
"//src/carnot/exec:cc_library", # LocalGRPCResultSinkServer
"//src/carnot/funcs:cc_library", # Step 9: funcs::RegisterFuncsOrDie
"//src/carnot/planner/compiler:cc_library", # Compiler().Compile(...)
"//src/carnot/planpb:plan_pl_cc_proto", # planpb::Plan, OperatorType
"//src/carnot/udf:cc_library", # Step 9: udf::Registry
# Step 1 (#29): HS256 verify uses BoringSSL HMAC directly and parses
# claims via rapidjson. cpp_jwt's own HMAC verify path calls
# BIO_f_base64 which is in boringssl/decrepit/ — not exposed as a
# bazel target on this fork. Native HMAC is ~50 LoC vs. a boringssl
# patch; the mint side (test only, via cpp_jwt) still uses the lib.
"@boringssl//:crypto",
"@com_github_grpc_grpc//:grpc++",
"@com_github_rlyeh_sole//:sole", # sole::uuid4 for query_id
"@com_github_tencent_rapidjson//:rapidjson",
# existing PEM deps:
"//src/carnot/planner/dynamic_tracing/ir/logicalpb:logical_pl_cc_proto",
"//src/integrations/grpc_clocksync:cc_library",
"//src/shared/tracepoint_translation:cc_library",
Expand All @@ -51,6 +95,32 @@ pl_cc_test(
],
)

# entlein/dx#29 — TDD contract for the PEM direct-query endpoint. dx-agent authored
# the spec; pem-agent makes it pass (port the standalone_pem execution path + JWT
# verify + Carnot fixture). See DIRECT_QUERY_CONTRACT.md.
#
# Note: pl_cc_test auto-injects //src/common/testing:cc_library for gtest/gmock,
# so we must NOT list it explicitly here — same shape as tracepoint_manager_test
# above (the dx-agent flagged this as a likely nit in the kickoff comment).
pl_cc_test(
name = "direct_query_server_test",
srcs = ["direct_query_server_test.cc"],
deps = [
":cc_library",
"//src/api/proto/vizierpb:vizier_pl_cc_proto",
# Step 2b (#29) — Carnot exec fixture for ValidToken_TrivialQuery_StreamsRows
"//src/carnot",
"//src/carnot:cc_library",
"//src/carnot/exec:cc_library", # LocalGRPCResultSinkServer
"//src/carnot/funcs:cc_library", # RegisterFuncsOrDie
"//src/carnot/udf:cc_library", # udf::Registry
"//src/table_store:cc_library",
"@com_github_arun11299_cpp_jwt//:cpp_jwt", # Step 1: MakeBearerToken mints HS256
"@com_github_grpc_grpc//:grpc++",
"@com_github_rlyeh_sole//:sole",
],
)

pl_cc_binary(
name = "pem",
srcs = ["pem_main.cc"],
Expand Down
88 changes: 88 additions & 0 deletions src/vizier/services/agent/pem/DIRECT_QUERY_CONTRACT.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
# PEM direct-query gRPC endpoint — contract (entlein/dx#29)

**Status:** stub PR. dx-agent owns this contract + the dx-side integration; the
**pem-agent** (on a bazel-capable build VM) implements the C++ to make the TDD
targets in `direct_query_server_test.cc` pass.

## Why

Today dx reads evidence by querying the in-cluster **vizier-query-broker** directly
(entlein/dx#28, live). That works and is the MVP path. #29 adds the *durable
per-node* path: make the **normal `vizier-pem`** itself serve `ExecuteScript`
directly over gRPC, so dx on each node can query its node-local PEM with no broker
hop and no cloud dependency.

This is the capability the **experimental `standalone_pem`** already proved
(`src/experimental/standalone_pem/vizier_server.h` — `px::vizier::agent::VizierServer`
implementing `api::vizierpb::VizierService::ExecuteScript` against a local Carnot).
The two differences for the real PEM:

1. **Metadata-connected.** The normal PEM has the metadata service, so per-pod PxL
filters (`df[df.ctx['pod'] == ...]`) resolve — the gap that made standalone_pem
return empty per-pod results (old #15). Reuse the PEM's existing Carnot +
table_store + metadata state; do **not** stand up a second Carnot.
2. **Authenticated.** standalone_pem was insecure (`WithDirectCredsInsecure`). The
real PEM is in `pl` and must require a **valid cluster service JWT** (the same
`jwt-signing-key` dx already mints with — see entlein/dx `cmd/dx-daemon/pxbroker.go`).

## The endpoint

- Service: `px.api.vizierpb.VizierService` (the generated gRPC service).
- Method implemented: **`ExecuteScript`** (server-streaming). Mutations/tracepoints
are **out of scope** for #29 — return `UNIMPLEMENTED` for `req.mutation()==true`.
(standalone_pem handles mutations; the dx read path never mutates.)
- Transport: gRPC over TLS. TLS may use the in-cluster self-signed CA (dx sets
`PX_DISABLE_TLS=1` to skip-verify, matching the broker path).

## Config (flags / env) — gated OFF by default

| flag / env | default | meaning |
|-------------------------------------|---------|----------------------------------------------------|
| `--direct_query_enabled` / `PL_PEM_DIRECT_QUERY_ENABLED` | `false` | master switch; when false the port is never opened |
| `--direct_query_port` / `PL_PEM_DIRECT_QUERY_PORT` | `50305` | gRPC listen port for the direct-query service |
| `--direct_query_jwt_signing_key` / `PL_JWT_SIGNING_KEY` | `""` | HMAC key the bearer JWT must verify against |

Default-off so existing PEM deployments are byte-for-byte unchanged until opted in.

## Auth contract

Every `ExecuteScript` call MUST present `authorization: Bearer <jwt>` metadata.
The JWT is verified with `PL_JWT_SIGNING_KEY` and must:
- have a valid signature (HS256) against the signing key,
- be unexpired (`exp` in the future),
- carry a service/audience claim acceptable to vizier (dx mints
`GenerateJWTForService("dx", "vizier")` — see `src/shared/services/utils`).

Missing/invalid/expired token → `grpc::StatusCode::UNAUTHENTICATED`. No token must
ever fall through to query execution.

## Behavioral contract (the executable spec → `direct_query_server_test.cc`)

1. **flag-off → no listener.** With `direct_query_enabled=false`, nothing listens on
the port; the PEM starts exactly as today.
2. **flag-on → serves ExecuteScript.** With it enabled + a signing key set, a gRPC
client with a valid bearer JWT gets a streamed response (status OK) for a trivial
PxL (e.g. `import px; px.display(px.DataFrame('http_events'))`).
3. **auth required.** Same call with (a) no token, (b) a token signed by the wrong
key, (c) an expired token → each `UNAUTHENTICATED`, no rows.
4. **metadata-connected filter.** A PxL with a per-pod filter returns only that pod's
rows (proves the metadata gap #15 is closed on the real PEM). May be an
integration test tagged `requires_metadata` if a unit Carnot fixture can't supply
pod context.
5. **mutation rejected.** `req.mutation()==true` → `UNIMPLEMENTED` (scope guard).
6. **no regression.** The existing PEM agent registration / Carnot / Stirling path is
unchanged when the flag is off (assert via the existing PEM smoke/unit tests).

## dx-side integration (owned by dx-agent — informational)

dx already has the client: `cmd/dx-daemon/pxbroker.go`. Pointing it at a per-PEM
addr is a one-line switch — add `DX_BENCH=pemdirect` selecting
`DX_PEM_DIRECT_ADDR=<HOST_IP>:50305` (HOST_IP via downward API), reusing the exact
JWT mint + `WithBearerAuth` + `WithDisableTLSVerification` path proven against the
broker. dx-agent adds this once #29's endpoint is live; no PEM-side work needed for it.

## Done =

`direct_query_server_test.cc` green under `bazel test`, PEM image builds via the
vizier-release workflow, and a live PG shows dx (`DX_BENCH=pemdirect`) ruling in the
log4shell e2e off the node-local PEM with the same verdict it gets via the broker.
Loading
Loading