From f704dc11797f5abfa82c55a52727b84409888312 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:01:47 +0900 Subject: [PATCH 01/15] Add lazy jq input via wader/gojq and jqresult package. Stream rows through JQValue in lazy mode while eager mode keeps the full ResultSet. Fix metadata priming, stats-then-rows materialization, and cleanup that no longer drains unconsumed rows. Co-authored-by: Cursor --- .github/workflows/go.yml | 2 +- .github/workflows/golangci-lint.yml | 2 +- .github/workflows/ko.yml | 2 +- README.md | 15 +- go.mod | 8 +- go.sum | 12 +- jqresult/compile.go | 14 ++ jqresult/jqresult_test.go | 160 ++++++++++++++++++++ jqresult/lazy.go | 227 ++++++++++++++++++++++++++++ jqresult/mode.go | 33 ++++ jqresult/normalize.go | 79 ++++++++++ jqresult/pipeline.go | 30 ++++ jqresult/print.go | 50 ++++++ jqresult/protojson.go | 90 +++++++++++ jqresult/rowiter.go | 100 ++++++++++++ jqresult/rowjson.go | 38 +++++ main.go | 183 ++++++++++++---------- 17 files changed, 951 insertions(+), 94 deletions(-) create mode 100644 jqresult/compile.go create mode 100644 jqresult/jqresult_test.go create mode 100644 jqresult/lazy.go create mode 100644 jqresult/mode.go create mode 100644 jqresult/normalize.go create mode 100644 jqresult/pipeline.go create mode 100644 jqresult/print.go create mode 100644 jqresult/protojson.go create mode 100644 jqresult/rowiter.go create mode 100644 jqresult/rowjson.go diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 72c3985..b746b50 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -19,7 +19,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v4 with: - go-version: '1.23.2' + go-version: '1.24.0' - name: Build run: go build -v ./... diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 5e1c077..4e0ce4c 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -19,7 +19,7 @@ jobs: - uses: actions/checkout@v4 - uses: actions/setup-go@v5 with: - go-version: '1.23.2' + go-version: '1.24.0' - name: golangci-lint uses: golangci/golangci-lint-action@v6 with: diff --git a/.github/workflows/ko.yml b/.github/workflows/ko.yml index ac97bdc..693ac8e 100644 --- a/.github/workflows/ko.yml +++ b/.github/workflows/ko.yml @@ -12,7 +12,7 @@ jobs: steps: - uses: actions/setup-go@v4 with: - go-version: '1.23.x' + go-version: '1.24.x' - uses: actions/checkout@v3 - uses: ko-build/setup-ko@v0.6 diff --git a/README.md b/README.md index 9418ea9..fef99dd 100644 --- a/README.md +++ b/README.md @@ -54,7 +54,7 @@ Arguments: database: (required) ID of the database. ``` -Local build requires Go 1.23. +Local build requires Go 1.24. ``` $ go install github.com/apstndb/execspansql@latest @@ -114,7 +114,18 @@ $ execspansql ${DATABASE_ID} --query-mode=PROFILE \ ### Embedded jq -execspansql can process output using embedded [gojq](https://github.com/itchyny/gojq) using `--jq-filter` flag. +execspansql can process output using embedded [wader/gojq](https://github.com/wader/gojq) (jq-compatible; includes `JQValue` for lazy inputs) using `--filter` flag. + +`--jq-input-mode` controls how results are passed to jq (json/yaml only): + +| Mode | Input | Typical filter | +|------|--------|----------------| +| `eager` (default) | Full ResultSet object | `.`, `.stats.queryPlan` | +| `lazy` | `JQValue` root (`metadata` / `rows` Iter / `stats`) | `.rows[]`, `.stats.queryPlan` | + +In `lazy` mode, `metadata` is populated after the first row is read from Spanner (or after a zero-row result). Use `.rows[]` to stream rows; `.rows` alone is a jq array only after rows are materialized (for example after `.stats` drains them). Accessing `.stats` before `.rows` still allows later `.rows[]` to emit the drained rows. + +Output expands top-level `gojq.Iter` to one JSON/YAML document per row (JSONL-style). Nested `Iter` values inside objects are expanded to arrays on encode. #### Example: Extract QueryPlan diff --git a/go.mod b/go.mod index 553b3d0..68d1fd7 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/apstndb/execspansql -go 1.23.2 +go 1.24.0 require ( cloud.google.com/go/spanner v1.84.1 @@ -14,9 +14,9 @@ require ( github.com/cloudspannerecosystem/memefish v0.6.2 github.com/google/go-cmp v0.7.0 github.com/grpc-ecosystem/go-grpc-middleware v1.4.0 - github.com/itchyny/gojq v0.12.16 github.com/jessevdk/go-flags v1.6.1 github.com/samber/lo v1.53.0 + github.com/wader/gojq v0.12.1-0.20260315123642-6d8c75fc0e74 go.opentelemetry.io/otel v1.36.0 go.opentelemetry.io/otel/bridge/opencensus v1.27.0 go.opentelemetry.io/otel/sdk v1.36.0 @@ -71,7 +71,7 @@ require ( github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.6 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect - github.com/itchyny/timefmt-go v0.1.6 // indirect + github.com/itchyny/timefmt-go v0.1.7 // indirect github.com/klauspost/compress v1.17.4 // indirect github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect github.com/magiconair/properties v1.8.7 // indirect @@ -112,7 +112,7 @@ require ( golang.org/x/net v0.42.0 // indirect golang.org/x/oauth2 v0.30.0 // indirect golang.org/x/sync v0.16.0 // indirect - golang.org/x/sys v0.34.0 // indirect + golang.org/x/sys v0.38.0 // indirect golang.org/x/text v0.27.0 // indirect golang.org/x/time v0.12.0 // indirect google.golang.org/genproto v0.0.0-20250603155806-513f23925822 // indirect diff --git a/go.sum b/go.sum index 2b0819d..de74a78 100644 --- a/go.sum +++ b/go.sum @@ -918,10 +918,8 @@ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/itchyny/gojq v0.12.16 h1:yLfgLxhIr/6sJNVmYfQjTIv0jGctu6/DgDoivmxTr7g= -github.com/itchyny/gojq v0.12.16/go.mod h1:6abHbdC2uB9ogMS38XsErnfqJ94UlngIJGlRAIj4jTM= -github.com/itchyny/timefmt-go v0.1.6 h1:ia3s54iciXDdzWzwaVKXZPbiXzxxnv1SPGFfM/myJ5Q= -github.com/itchyny/timefmt-go v0.1.6/go.mod h1:RRDZYC5s9ErkjQvTvvU7keJjxUYzIISJGxm9/mAERQg= +github.com/itchyny/timefmt-go v0.1.7 h1:xyftit9Tbw+Dc/huSSPJaEmX1TVL8lw5vxjJLK4GMMA= +github.com/itchyny/timefmt-go v0.1.7/go.mod h1:5E46Q+zj7vbTgWY8o5YkMeYb4I6GeWLFnetPy5oBrAI= github.com/jessevdk/go-flags v1.6.1 h1:Cvu5U8UGrLay1rZfv/zP7iLpSHGUZ/Ou68T0iX1bBK4= github.com/jessevdk/go-flags v1.6.1/go.mod h1:Mk8T1hIAWpOiJiHa9rJASDK2UGWji0EuPGBnNLMooyc= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= @@ -1058,6 +1056,8 @@ github.com/tklauser/go-sysconf v0.3.12 h1:0QaGUFOdQaIVdPgfITYzaTegZvdCjmYO52cSFA github.com/tklauser/go-sysconf v0.3.12/go.mod h1:Ho14jnntGE1fpdOqQEEaiKRpvIavV0hSfmBq8nJbHYI= github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+Fk= github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= +github.com/wader/gojq v0.12.1-0.20260315123642-6d8c75fc0e74 h1:eLFDUQ8b/cmQTT50/+Jm/L8TpMwu6yra+BKz6X9eE/g= +github.com/wader/gojq v0.12.1-0.20260315123642-6d8c75fc0e74/go.mod h1:USEjy5fdczNWD8kQrhsj1EjA0xeCxLtvF1ev8pN2qpE= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -1385,8 +1385,8 @@ golang.org/x/sys v0.7.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.34.0 h1:H5Y5sJ2L2JRdyv7ROF1he/lPdvFsd0mJHFw2ThKHxLA= -golang.org/x/sys v0.34.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.2.0/go.mod h1:TVmDHMZPmdnySmBfhjOoOdhjzdE1h4u1VwSiw2l1Nuc= diff --git a/jqresult/compile.go b/jqresult/compile.go new file mode 100644 index 0000000..26ec1a4 --- /dev/null +++ b/jqresult/compile.go @@ -0,0 +1,14 @@ +package jqresult + +import ( + "github.com/wader/gojq" +) + +// Compile parses filter and returns executable jq code for the given input mode. +func Compile(filter string, mode InputMode) (*gojq.Code, error) { + q, err := gojq.Parse(filter) + if err != nil { + return nil, err + } + return gojq.Compile(q) +} diff --git a/jqresult/jqresult_test.go b/jqresult/jqresult_test.go new file mode 100644 index 0000000..7a31f6e --- /dev/null +++ b/jqresult/jqresult_test.go @@ -0,0 +1,160 @@ +package jqresult + +import ( + "encoding/json" + "testing" + + "github.com/wader/gojq" +) + +type stubIter struct { + vals []int + i int +} + +func (s *stubIter) Next() (any, bool) { + if s.i >= len(s.vals) { + return nil, false + } + v := s.vals[s.i] + s.i++ + return v, true +} + +func TestNormalizeForEncodeIter(t *testing.T) { + t.Parallel() + var it gojq.Iter = &stubIter{vals: []int{1, 2}} + got, err := NormalizeForEncode(it) + if err != nil { + t.Fatal(err) + } + want := []any{1, 2} + if len(got.([]any)) != len(want) { + t.Fatalf("got %v want %v", got, want) + } +} + +func TestNormalizeForEncodeInterfaceMapWithIter(t *testing.T) { + t.Parallel() + it := &stubIter{vals: []int{1, 2}} + got, err := NormalizeForEncode(map[string]interface{}{"rows": it}) + if err != nil { + t.Fatal(err) + } + b, _ := json.Marshal(got) + if string(b) != `{"rows":[1,2]}` { + t.Fatalf("got %s", b) + } +} + +func TestNormalizeForEncodeLeafSlice(t *testing.T) { + t.Parallel() + got, err := NormalizeForEncode([]float64{1.5, 2.5}) + if err != nil { + t.Fatal(err) + } + if _, ok := got.([]float64); !ok { + t.Fatalf("got %T", got) + } +} + +func TestLazyRowsAfterStatsDrain(t *testing.T) { + t.Parallel() + l := &Lazy{ + metadata: map[string]any{"c": 1}, + stats: map[string]any{"n": 2}, + drained: true, + materializedRows: []any{[]any{int64(1)}, []any{int64(2)}}, + } + q, err := gojq.Parse(".rows[]") + if err != nil { + t.Fatal(err) + } + iter := q.Run(l) + var got []any + for { + v, ok := iter.Next() + if !ok { + break + } + if err, isErr := v.(error); isErr { + t.Fatal(err) + } + got = append(got, v) + } + if len(got) != 2 { + t.Fatalf("got %d rows want 2: %v", len(got), got) + } +} + +func TestLazyStopDoesNotDrain(t *testing.T) { + t.Parallel() + l := &Lazy{ + rows: &RowIter{stopped: true}, + } + l.Stop() + if l.drained { + t.Fatal("Stop() must not drain rows") + } +} + +func TestLazyStatsKey(t *testing.T) { + t.Parallel() + l := &Lazy{ + metadata: map[string]any{"c": 1}, + metadataReady: true, + stats: map[string]any{"n": 2}, + drained: true, + } + q, err := gojq.Parse(".stats.n") + if err != nil { + t.Fatal(err) + } + iter := q.Run(l) + v, ok := iter.Next() + if !ok { + t.Fatal("no output") + } + if v != 2 { + t.Fatalf("got %v want 2", v) + } +} + +func TestDefaultFilter(t *testing.T) { + t.Parallel() + if DefaultFilter(InputEager) != "." { + t.Fatal(InputEager) + } +} + +func TestParseInputMode(t *testing.T) { + t.Parallel() + if _, err := ParseInputMode("nope"); err == nil { + t.Fatal("expected error") + } + if _, err := ParseInputMode("stream"); err == nil { + t.Fatal("stream mode is not supported") + } +} + +type enc struct { + vals []any +} + +func (e *enc) Encode(v any) error { + e.vals = append(e.vals, v) + return nil +} + +func TestPrintTopLevelIterJSONL(t *testing.T) { + t.Parallel() + e := &enc{} + // jq emitted a single value that is itself a gojq.Iter (e.g. filter "." with Iter root input). + out := gojq.NewIter[any](&stubIter{vals: []int{1, 2, 3}}) + if err := Print(e, out); err != nil { + t.Fatal(err) + } + if len(e.vals) != 3 { + t.Fatalf("got %d encodes want 3", len(e.vals)) + } +} diff --git a/jqresult/lazy.go b/jqresult/lazy.go new file mode 100644 index 0000000..c5f4c3c --- /dev/null +++ b/jqresult/lazy.go @@ -0,0 +1,227 @@ +package jqresult + +import ( + "sync" + + "cloud.google.com/go/spanner" + "github.com/wader/gojq" +) + +// StatsFunc builds the stats object after all rows have been read from rowIter. +type StatsFunc func(rowIter *spanner.RowIterator) (map[string]any, error) + +// StatsFuncFromProto is a StatsFunc that uses BuildResultSet with nil rows (iterator already drained). +func StatsFuncFromProto() StatsFunc { + return func(rowIter *spanner.RowIterator) (map[string]any, error) { + return StatsMap(rowIter, nil) + } +} + +// Lazy is a JQValue root for a Spanner query result. Accessing stats drains rows; rows streams via gojq.Iter. +type Lazy struct { + mu sync.Mutex + + rowIter *spanner.RowIterator + redact bool + statsFn StatsFunc + + metadata map[string]any + metadataReady bool + metadataErr error + stats map[string]any + rows *RowIter + materializedRows []any + + drained bool + drainErr error +} + +// NewLazy builds a lazy jq input. rowIter must not have been read yet; Lazy takes ownership and Stop()s it. +func NewLazy(rowIter *spanner.RowIterator, redact bool, statsFn StatsFunc) *Lazy { + return &Lazy{ + rowIter: rowIter, + redact: redact, + statsFn: statsFn, + rows: NewRowIter(rowIter, redact, RowToJSON), + } +} + +func (l *Lazy) drain() error { + l.mu.Lock() + defer l.mu.Unlock() + if l.drained { + return l.drainErr + } + l.materializedRows, l.drainErr = l.rows.Drain() + if l.drainErr == nil && l.statsFn != nil { + l.stats, l.drainErr = l.statsFn(l.rowIter) + } + l.drained = true + l.rows.Stop() + return l.drainErr +} + +func (l *Lazy) ensureMetadata() error { + l.mu.Lock() + if l.metadataReady { + err := l.metadataErr + l.mu.Unlock() + return err + } + l.mu.Unlock() + + if err := l.rows.Prime(); err != nil { + l.mu.Lock() + l.metadataErr = err + l.metadataReady = true + l.mu.Unlock() + return err + } + m, err := MetadataMap(l.rowIter) + l.mu.Lock() + l.metadata = m + l.metadataErr = err + l.metadataReady = true + l.mu.Unlock() + return err +} + +func (l *Lazy) statsMap() (map[string]any, error) { + if err := l.drain(); err != nil { + return nil, err + } + l.mu.Lock() + defer l.mu.Unlock() + return l.stats, nil +} + +func (l *Lazy) rowsJQValue() any { + l.mu.Lock() + drained := l.drained + redact := l.redact + materialized := append([]any(nil), l.materializedRows...) + l.mu.Unlock() + + if redact { + return gojq.NewIter[any]() + } + if drained { + return materializedRowsIter(materialized) + } + return l.rows +} + +func (l *Lazy) JQValueType() string { return gojq.JQTypeObject } + +func (l *Lazy) JQValueLength() any { return 3 } + +func (l *Lazy) JQValueSliceLen() any { return 0 } + +func (l *Lazy) JQValueIndex(int) any { return nil } + +func (l *Lazy) JQValueSlice(int, int) any { return nil } + +func (l *Lazy) JQValueKeys() any { + return []any{"metadata", "rows", "stats"} +} + +func (l *Lazy) JQValueHas(key any) any { + k, _ := key.(string) + switch k { + case "metadata", "rows", "stats": + return true + default: + return false + } +} + +func (l *Lazy) JQValueToNumber() any { return nil } + +func (l *Lazy) JQValueToString() any { return "" } + +func (l *Lazy) JQValueToGoJQ() any { + if err := l.ensureMetadata(); err != nil { + return err + } + if err := l.drain(); err != nil { + return err + } + l.mu.Lock() + defer l.mu.Unlock() + return map[string]any{ + "metadata": l.metadata, + "stats": l.stats, + "rows": l.materializedRows, + } +} + +func (l *Lazy) JQValueKey(name string) any { + switch name { + case "metadata": + if err := l.ensureMetadata(); err != nil { + return err + } + l.mu.Lock() + m := l.metadata + l.mu.Unlock() + return m + case "rows": + return l.rowsJQValue() + case "stats": + stats, err := l.statsMap() + if err != nil { + return err + } + return stats + default: + return nil + } +} + +func (l *Lazy) JQValueEach() any { + if err := l.ensureMetadata(); err != nil { + return err + } + l.mu.Lock() + m := l.metadata + drained := l.drained + statsVal := l.stats + l.mu.Unlock() + + rowsVal := l.rowsJQValue() + if statsVal == nil && !drained { + statsVal = nil + } + + return []gojq.PathValue{ + {Path: "metadata", Value: m}, + {Path: "rows", Value: rowsVal}, + {Path: "stats", Value: statsVal}, + } +} + +// Stop releases the row iterator without draining unconsumed rows. +func (l *Lazy) Stop() { + l.rows.Stop() +} + +type materializedIter struct { + rows []any + i int +} + +func (m *materializedIter) Next() (any, bool) { + if m.i >= len(m.rows) { + return nil, false + } + v := m.rows[m.i] + m.i++ + return v, true +} + +func materializedRowsIter(rows []any) gojq.Iter { + if len(rows) == 0 { + return gojq.NewIter[any]() + } + return &materializedIter{rows: rows} +} diff --git a/jqresult/mode.go b/jqresult/mode.go new file mode 100644 index 0000000..db50c05 --- /dev/null +++ b/jqresult/mode.go @@ -0,0 +1,33 @@ +package jqresult + +import "fmt" + +// InputMode selects how Spanner query results are passed to gojq. +type InputMode string + +const ( + InputEager InputMode = "eager" + InputLazy InputMode = "lazy" +) + +func ParseInputMode(s string) (InputMode, error) { + switch InputMode(s) { + case InputEager, InputLazy: + return InputMode(s), nil + default: + return "", fmt.Errorf("jq-input-mode must be eager or lazy") + } +} + +// DefaultFilter returns the filter used when the user passes an empty --filter. +func DefaultFilter(InputMode) string { + return "." +} + +// ValidateFormat returns an error when mode is incompatible with the output format. +func (m InputMode) ValidateFormat(format string) error { + if format == "experimental_csv" && m != InputEager { + return fmt.Errorf("--jq-input-mode=%s is only supported with --format=json or yaml", m) + } + return nil +} diff --git a/jqresult/normalize.go b/jqresult/normalize.go new file mode 100644 index 0000000..4886688 --- /dev/null +++ b/jqresult/normalize.go @@ -0,0 +1,79 @@ +package jqresult + +import ( + "encoding/json" + "math/big" + + "github.com/wader/gojq" +) + +// NormalizeForEncode walks v and expands gojq.Iter and gojq.JQValue so encoding/json or yaml can encode the result. +// jq-compatible containers ([]any, map[string]any) are recursed; other concrete values are left as leaves. +func NormalizeForEncode(v any) (any, error) { + if v == nil { + return nil, nil + } + if err, ok := v.(error); ok { + return nil, err + } + switch v := v.(type) { + case gojq.Iter: + return normalizeIter(v) + case gojq.JQValue: + return NormalizeForEncode(v.JQValueToGoJQ()) + case []any: + out := make([]any, len(v)) + for i, e := range v { + n, err := NormalizeForEncode(e) + if err != nil { + return nil, err + } + out[i] = n + } + return out, nil + case map[string]any: + out := make(map[string]any, len(v)) + for k, e := range v { + n, err := NormalizeForEncode(e) + if err != nil { + return nil, err + } + out[k] = n + } + return out, nil + default: + if isEncodeLeaf(v) { + return v, nil + } + return v, nil + } +} + +func normalizeIter(it gojq.Iter) ([]any, error) { + var out []any + for { + x, ok := it.Next() + if !ok { + break + } + n, err := NormalizeForEncode(x) + if err != nil { + return nil, err + } + out = append(out, n) + } + return out, nil +} + +// isEncodeLeaf reports values that cannot contain gojq.Iter and need no further normalization. +func isEncodeLeaf(v any) bool { + switch v.(type) { + case nil, bool, int, int8, int16, int32, int64, + uint, uint8, uint16, uint32, uint64, + float32, float64, string, + json.Number, *big.Int: + return true + default: + return false + } +} diff --git a/jqresult/pipeline.go b/jqresult/pipeline.go new file mode 100644 index 0000000..bfb5fa0 --- /dev/null +++ b/jqresult/pipeline.go @@ -0,0 +1,30 @@ +package jqresult + +import ( + "fmt" + + "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + "github.com/wader/gojq" +) + +// Execute runs jq. For eager mode, rs must be set and rowIter is ignored. +// For lazy mode, rowIter must be unread; cleanup releases the iterator state. +func Execute(code *gojq.Code, mode InputMode, rowIter *spanner.RowIterator, rs *sppb.ResultSet, redactRows bool) (gojq.Iter, func(), error) { + switch mode { + case InputEager: + if rs == nil { + return nil, func() {}, fmt.Errorf("eager mode requires a materialized ResultSet") + } + m, err := ResultSetMap(rs) + if err != nil { + return nil, func() {}, err + } + return code.Run(m), func() {}, nil + case InputLazy: + lazy := NewLazy(rowIter, redactRows, StatsFuncFromProto()) + return code.Run(lazy), lazy.Stop, nil + default: + return nil, func() {}, fmt.Errorf("unknown jq input mode: %s", mode) + } +} diff --git a/jqresult/print.go b/jqresult/print.go new file mode 100644 index 0000000..60eedca --- /dev/null +++ b/jqresult/print.go @@ -0,0 +1,50 @@ +package jqresult + +import "github.com/wader/gojq" + +// Encoder writes one jq emission (JSON or YAML document). +type Encoder interface { + Encode(v any) error +} + +// Print writes all values from a jq iterator. +// Top-level gojq.Iter values are expanded to one Encode per element (JSONL-friendly). +// Other values are normalized (including nested Iter) then encoded once. +func Print(enc Encoder, iter gojq.Iter) error { + for { + v, ok := iter.Next() + if !ok { + return nil + } + if err, ok := v.(error); ok { + return err + } + if err := encodeOne(enc, v, true); err != nil { + return err + } + } +} + +func encodeOne(enc Encoder, v any, topLevel bool) error { + if topLevel { + if it, ok := v.(gojq.Iter); ok { + for { + x, ok := it.Next() + if !ok { + return nil + } + if err, ok := x.(error); ok { + return err + } + if err := encodeOne(enc, x, false); err != nil { + return err + } + } + } + } + n, err := NormalizeForEncode(v) + if err != nil { + return err + } + return enc.Encode(n) +} diff --git a/jqresult/protojson.go b/jqresult/protojson.go new file mode 100644 index 0000000..f6a6176 --- /dev/null +++ b/jqresult/protojson.go @@ -0,0 +1,90 @@ +package jqresult + +import ( + "bytes" + "encoding/json" + + "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/types/known/structpb" +) + +// ProtoToMap marshals a protobuf message with protojson and decodes to map[string]any. +func ProtoToMap(m proto.Message) (map[string]any, error) { + b, err := protojson.Marshal(m) + if err != nil { + return nil, err + } + dec := json.NewDecoder(bytes.NewReader(b)) + dec.UseNumber() + var object map[string]any + if err := dec.Decode(&object); err != nil { + return nil, err + } + return object, nil +} + +// MetadataMap returns the metadata object for a row iterator (before rows are consumed). +func MetadataMap(rowIter *spanner.RowIterator) (map[string]any, error) { + obj, err := ProtoToMap(&sppb.ResultSet{Metadata: rowIter.Metadata}) + if err != nil { + return nil, err + } + if m, ok := obj["metadata"].(map[string]any); ok { + return m, nil + } + return nil, nil +} + +// StatsMap builds the stats object after rowIter has been fully consumed. +func StatsMap(rowIter *spanner.RowIterator, rows []*structpb.ListValue) (map[string]any, error) { + rs, err := BuildResultSet(rows, rowIter) + if err != nil { + return nil, err + } + obj, err := ProtoToMap(rs) + if err != nil { + return nil, err + } + if s, ok := obj["stats"].(map[string]any); ok { + return s, nil + } + return nil, nil +} + +// ResultSetMap materializes a full ResultSet as a jq input map (eager mode). +func ResultSetMap(rs *sppb.ResultSet) (map[string]any, error) { + return ProtoToMap(rs) +} + +// BuildResultSet constructs ResultSet stats from a consumed row iterator. +func BuildResultSet(rows []*structpb.ListValue, rowIter *spanner.RowIterator) (*sppb.ResultSet, error) { + out := &sppb.ResultSet{ + Rows: rows, + Metadata: rowIter.Metadata, + } + + var queryStats *structpb.Struct + if rowIter.QueryStats != nil { + qs, err := structpb.NewStruct(rowIter.QueryStats) + if err != nil { + return nil, err + } + queryStats = qs + } + + if rowIter.QueryPlan == nil && queryStats == nil && rowIter.RowCount == 0 { + return out, nil + } + + out.Stats = &sppb.ResultSetStats{ + QueryPlan: rowIter.QueryPlan, + QueryStats: queryStats, + } + if rowIter.RowCount != 0 { + out.Stats.RowCount = &sppb.ResultSetStats_RowCountExact{RowCountExact: rowIter.RowCount} + } + return out, nil +} diff --git a/jqresult/rowiter.go b/jqresult/rowiter.go new file mode 100644 index 0000000..763f7de --- /dev/null +++ b/jqresult/rowiter.go @@ -0,0 +1,100 @@ +package jqresult + +import ( + "errors" + + "cloud.google.com/go/spanner" + "google.golang.org/api/iterator" +) + +// RowIter streams query rows as jq-compatible values ([]any per row, matching protojson ResultSet rows). +type RowIter struct { + iter *spanner.RowIterator + redact bool + rowToJSON func(*spanner.Row) (any, error) + stopped bool + + primedRow *spanner.Row + primed bool +} + +func NewRowIter(rowIter *spanner.RowIterator, redact bool, rowToJSON func(*spanner.Row) (any, error)) *RowIter { + if rowToJSON == nil { + rowToJSON = RowToJSON + } + return &RowIter{iter: rowIter, redact: redact, rowToJSON: rowToJSON} +} + +// Prime reads the first row (or iterator.Done) so rowIter.Metadata is populated. +// The first row is buffered for the next Next call when present. +func (r *RowIter) Prime() error { + if r.primed || r.stopped { + return nil + } + r.primed = true + row, err := r.iter.Next() + if errors.Is(err, iterator.Done) { + return nil + } + if err != nil { + return err + } + r.primedRow = row + return nil +} + +func (r *RowIter) Next() (any, bool) { + if r.redact || r.stopped { + return nil, false + } + if r.primedRow != nil { + row := r.primedRow + r.primedRow = nil + v, err := r.rowToJSON(row) + if err != nil { + return err, true + } + return v, true + } + row, err := r.iter.Next() + if errors.Is(err, iterator.Done) { + return nil, false + } + if err != nil { + return err, true + } + v, err := r.rowToJSON(row) + if err != nil { + return err, true + } + return v, true +} + +// Drain exhausts the Spanner row iterator. When redact is false, drained rows are returned. +func (r *RowIter) Drain() ([]any, error) { + if r.stopped { + return nil, nil + } + var rows []any + for { + v, ok := r.Next() + if !ok { + return rows, nil + } + if err, isErr := v.(error); isErr { + return rows, err + } + if r.redact { + continue + } + rows = append(rows, v) + } +} + +func (r *RowIter) Stop() { + if r.stopped { + return + } + r.stopped = true + r.iter.Stop() +} diff --git a/jqresult/rowjson.go b/jqresult/rowjson.go new file mode 100644 index 0000000..e611ab0 --- /dev/null +++ b/jqresult/rowjson.go @@ -0,0 +1,38 @@ +package jqresult + +import ( + "bytes" + "encoding/json" + "slices" + "spheric.cloud/xiter" + + "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/types/known/structpb" +) + +// RowToListValue converts a Spanner row to structpb.ListValue. +func RowToListValue(r *spanner.Row) *structpb.ListValue { + return &structpb.ListValue{Values: slices.Collect(xiter.Map(xiter.Range(0, r.Size()), r.ColumnValue))} +} + +// RowToJSON encodes one row the same way as protojson on a single-row ResultSet (array-shaped row). +func RowToJSON(r *spanner.Row) (any, error) { + rs := &sppb.ResultSet{Rows: []*structpb.ListValue{RowToListValue(r)}} + b, err := protojson.Marshal(rs) + if err != nil { + return nil, err + } + dec := json.NewDecoder(bytes.NewReader(b)) + dec.UseNumber() + var obj map[string]any + if err := dec.Decode(&obj); err != nil { + return nil, err + } + rows, ok := obj["rows"].([]any) + if !ok || len(rows) == 0 { + return nil, nil + } + return rows[0], nil +} diff --git a/main.go b/main.go index 719af53..fbc872e 100644 --- a/main.go +++ b/main.go @@ -26,7 +26,6 @@ import ( sdktrace "go.opentelemetry.io/otel/sdk/trace" - "google.golang.org/protobuf/proto" "gopkg.in/yaml.v3" "google.golang.org/api/option" @@ -39,9 +38,9 @@ import ( sppb "cloud.google.com/go/spanner/apiv1/spannerpb" "github.com/apstndb/spannerotel/interceptor" svwriter "github.com/apstndb/spanvalue/writer" - "github.com/itchyny/gojq" + "github.com/apstndb/execspansql/jqresult" "github.com/jessevdk/go-flags" - "google.golang.org/protobuf/encoding/protojson" + "github.com/wader/gojq" "google.golang.org/protobuf/types/known/structpb" ) @@ -80,6 +79,7 @@ type opts struct { JqFilter string `long:"filter" description:"jq filter"` JqRawOutput bool `long:"raw-output" short:"r" description:"(--raw-output of jq)"` JqFromFile string `long:"filter-file" description:"(--from-file of jq)"` + JqInputMode string `long:"jq-input-mode" description:"How query rows are passed to jq (json/yaml only): eager (full ResultSet), lazy (JQValue root)." default:"eager" choice:"eager" choice:"lazy"` Param map[string]string `long:"param" description:"[name]:[Cloud Spanner type(PLAN only) or literal]"` LogGrpc bool `long:"log-grpc" description:"Show gRPC logs"` TraceProject string `long:"experimental-trace-project"` @@ -128,6 +128,10 @@ func processFlags() (o opts, err error) { if o.JqFilter != "" && o.JqFromFile != "" { return o, errors.New("--jq-filter and --jq-from-file are exclusive") } + + if _, err := jqresult.ParseInputMode(o.JqInputMode); err != nil { + return o, err + } return o, nil } @@ -230,18 +234,23 @@ func _main() error { ctx, cancel := context.WithTimeout(context.Background(), o.Timeout) defer cancel() - // Use jqQuery even if empty filter - jqFilter, err := readFileOrDefault(o.JqFromFile, o.JqFilter) + jqMode, err := jqresult.ParseInputMode(o.JqInputMode) if err != nil { return err } + if err := jqMode.ValidateFormat(o.Format); err != nil { + return err + } - // Overwrite to "." because gojq don't support empty query + jqFilter, err := readFileOrDefault(o.JqFromFile, o.JqFilter) + if err != nil { + return err + } if jqFilter == "" { - jqFilter = "." + jqFilter = jqresult.DefaultFilter(jqMode) } - jqQuery, err := gojq.Parse(jqFilter) + jqCode, err := jqresult.Compile(jqFilter, jqMode) if err != nil { return err } @@ -334,22 +343,7 @@ func _main() error { return runAndWriteCsv(ctx, client, stmt, spanner.QueryOptions{Mode: &mode}, m, o.RedactRows) } - rs, err := runInNewTransaction(ctx, client, stmt, spanner.QueryOptions{Mode: &mode}, m, o.RedactRows) - if err != nil { - return err - } - - object, err := toProtojsonObject(rs) - if err != nil { - return err - } - - enc, err := newEncoder(os.Stdout, o.Format, o.CompactOutput, o.JqRawOutput) - if err != nil { - return err - } - - return printResult(enc, jqQuery.Run(object)) + return runJqOutput(ctx, client, stmt, spanner.QueryOptions{Mode: &mode}, m, o, jqMode, jqCode) } func runAndWriteCsv(ctx context.Context, client *spanner.Client, stmt spanner.Statement, opts spanner.QueryOptions, mode queryMode, redactRows bool) error { @@ -471,31 +465,16 @@ func newClient(ctx context.Context, project, instance, database string, logGrpc }, copts...) } -func toProtojsonObject(m proto.Message) (map[string]interface{}, error) { - b, err := protojson.Marshal(m) - if err != nil { - return nil, err - } - - dec := json.NewDecoder(bytes.NewReader(b)) - dec.UseNumber() - - var object map[string]interface{} - err = dec.Decode(&object) - if err != nil { - return nil, err - } - return object, nil +type encoder interface { + Encode(v any) error } -type encoder interface{ Encode(v interface{}) error } - type stringPassThroughEncoderWrapper struct { Writer io.Writer Enc encoder } -func (enc *stringPassThroughEncoderWrapper) Encode(v interface{}) error { +func (enc *stringPassThroughEncoderWrapper) Encode(v any) error { if s, ok := v.(string); ok { _, err := fmt.Fprintln(enc.Writer, s) return err @@ -503,21 +482,95 @@ func (enc *stringPassThroughEncoderWrapper) Encode(v interface{}) error { return enc.Enc.Encode(v) } -func printResult(enc encoder, iter gojq.Iter) error { - for { - v, ok := iter.Next() - if !ok { - break +func runJqOutput( + ctx context.Context, + client *spanner.Client, + stmt spanner.Statement, + opts spanner.QueryOptions, + mode queryMode, + o opts, + jqMode jqresult.InputMode, + jqCode *gojq.Code, +) error { + if jqMode == jqresult.InputEager { + rs, err := runInNewTransaction(ctx, client, stmt, opts, mode, o.RedactRows) + if err != nil { + return err + } + enc, err := newEncoder(os.Stdout, o.Format, o.CompactOutput, o.JqRawOutput) + if err != nil { + return err + } + iter, cleanup, err := jqresult.Execute(jqCode, jqMode, nil, rs, o.RedactRows) + if err != nil { + return err + } + defer cleanup() + return jqresult.Print(enc, iter) + } + + switch mode := mode.(type) { + case readWrite: + var buf bytes.Buffer + _, err := client.ReadWriteTransaction(ctx, func(ctx context.Context, tx *spanner.ReadWriteTransaction) error { + buf.Reset() + enc, err := newEncoder(&buf, o.Format, o.CompactOutput, o.JqRawOutput) + if err != nil { + return err + } + return runJqOnRowIter(tx.QueryWithOptions(ctx, stmt, opts), o.RedactRows, jqMode, jqCode, enc) + }) + if err != nil { + return err } - if err, ok := v.(error); ok { + _, err = io.Copy(os.Stdout, &buf) + return err + case single: + enc, err := newEncoder(os.Stdout, o.Format, o.CompactOutput, o.JqRawOutput) + if err != nil { return err } - err := enc.Encode(v) + rowIter := client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts) + return runJqOnRowIter(rowIter, o.RedactRows, jqMode, jqCode, enc) + case partitionedDML: + count, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts) if err != nil { return err } + rs := &sppb.ResultSet{ + Metadata: &sppb.ResultSetMetadata{RowType: &sppb.StructType{}}, + Stats: &sppb.ResultSetStats{ + RowCount: &sppb.ResultSetStats_RowCountLowerBound{RowCountLowerBound: count}, + }, + } + enc, err := newEncoder(os.Stdout, o.Format, o.CompactOutput, o.JqRawOutput) + if err != nil { + return err + } + iter, cleanup, err := jqresult.Execute(jqCode, jqresult.InputEager, nil, rs, false) + if err != nil { + return err + } + defer cleanup() + return jqresult.Print(enc, iter) + default: + panic(fmt.Sprintf("unknown mode: %T", mode)) + } +} + +func runJqOnRowIter( + rowIter *spanner.RowIterator, + redactRows bool, + jqMode jqresult.InputMode, + jqCode *gojq.Code, + enc encoder, +) error { + iter, cleanup, err := jqresult.Execute(jqCode, jqMode, rowIter, nil, redactRows) + if err != nil { + return err } - return nil + defer cleanup() + return jqresult.Print(enc, iter) } func newEncoder(writer io.Writer, format string, compactOutput bool, rawOutput bool) (encoder, error) { @@ -564,35 +617,7 @@ func consumeRowIterIntoResultSet(rowIter *spanner.RowIterator, redactRows bool) // convertToResultSet convert rows and rowIter into sppb.ResultSet. // rowIter must be consumed with iterator.Done, and not Stop()-ed. func convertToResultSet(rows []*structpb.ListValue, rowIter *spanner.RowIterator) (*sppb.ResultSet, error) { - // Leave null if fields are not populated - rs := &sppb.ResultSet{ - Rows: rows, - Metadata: rowIter.Metadata, - } - - var queryStats *structpb.Struct - if rowIter.QueryStats != nil { - qs, err := structpb.NewStruct(rowIter.QueryStats) - if err != nil { - return nil, err - } - queryStats = qs - } - - // If there are no stats member, entire Stats must be nil - if rowIter.QueryPlan == nil && queryStats == nil && rowIter.RowCount == 0 { - return rs, nil - } - - rs.Stats = &sppb.ResultSetStats{ - QueryPlan: rowIter.QueryPlan, - QueryStats: queryStats, - } - if rowIter.RowCount != 0 { - rs.Stats.RowCount = &sppb.ResultSetStats_RowCountExact{RowCountExact: rowIter.RowCount} - } - - return rs, nil + return jqresult.BuildResultSet(rows, rowIter) } func rowIterSeq(rowIter *spanner.RowIterator) iter.Seq2[*spanner.Row, error] { From 126be2a95fd6e1f1e433a46e9506608e55f9959b Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:20:20 +0900 Subject: [PATCH 02/15] Address PR review: ioMu, redact drain, RowToJSON, lint CI. Serialize lazy Spanner I/O with ioMu, drain iterators fully when redacting rows, use ListValue.AsSlice for row encoding, and bump golangci-lint to v1.64.8 for Go 1.24. Co-authored-by: Cursor --- .github/workflows/golangci-lint.yml | 2 +- jqresult/lazy.go | 58 ++++++++++++++++++++++----- jqresult/rowiter.go | 61 +++++++++++++++++++++++------ jqresult/rowjson.go | 21 +--------- 4 files changed, 98 insertions(+), 44 deletions(-) diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 4e0ce4c..b792e32 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -23,5 +23,5 @@ jobs: - name: golangci-lint uses: golangci/golangci-lint-action@v6 with: - version: v1.60 + version: v1.64.8 args: --timeout=5m \ No newline at end of file diff --git a/jqresult/lazy.go b/jqresult/lazy.go index c5f4c3c..7bba27b 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -19,7 +19,8 @@ func StatsFuncFromProto() StatsFunc { // Lazy is a JQValue root for a Spanner query result. Accessing stats drains rows; rows streams via gojq.Iter. type Lazy struct { - mu sync.Mutex + mu sync.Mutex + ioMu sync.Mutex rowIter *spanner.RowIterator redact bool @@ -38,27 +39,51 @@ type Lazy struct { // NewLazy builds a lazy jq input. rowIter must not have been read yet; Lazy takes ownership and Stop()s it. func NewLazy(rowIter *spanner.RowIterator, redact bool, statsFn StatsFunc) *Lazy { - return &Lazy{ + l := &Lazy{ rowIter: rowIter, redact: redact, statsFn: statsFn, - rows: NewRowIter(rowIter, redact, RowToJSON), } + l.rows = NewRowIter(rowIter, redact, RowToJSON) + l.rows.ioMu = &l.ioMu + return l } func (l *Lazy) drain() error { l.mu.Lock() - defer l.mu.Unlock() if l.drained { - return l.drainErr + err := l.drainErr + l.mu.Unlock() + return err } - l.materializedRows, l.drainErr = l.rows.Drain() - if l.drainErr == nil && l.statsFn != nil { - l.stats, l.drainErr = l.statsFn(l.rowIter) + l.mu.Unlock() + + l.ioMu.Lock() + l.mu.Lock() + if l.drained { + err := l.drainErr + l.mu.Unlock() + l.ioMu.Unlock() + return err + } + l.mu.Unlock() + + materializedRows, drainErr := l.rows.drainUnlocked() + var stats map[string]any + if drainErr == nil && l.statsFn != nil { + stats, drainErr = l.statsFn(l.rowIter) } + l.ioMu.Unlock() + + l.mu.Lock() + l.materializedRows = materializedRows + l.stats = stats + l.drainErr = drainErr l.drained = true + l.mu.Unlock() + l.rows.Stop() - return l.drainErr + return drainErr } func (l *Lazy) ensureMetadata() error { @@ -70,14 +95,27 @@ func (l *Lazy) ensureMetadata() error { } l.mu.Unlock() - if err := l.rows.Prime(); err != nil { + l.ioMu.Lock() + l.mu.Lock() + if l.metadataReady { + err := l.metadataErr + l.mu.Unlock() + l.ioMu.Unlock() + return err + } + l.mu.Unlock() + + if err := l.rows.primeUnlocked(); err != nil { l.mu.Lock() l.metadataErr = err l.metadataReady = true l.mu.Unlock() + l.ioMu.Unlock() return err } m, err := MetadataMap(l.rowIter) + l.ioMu.Unlock() + l.mu.Lock() l.metadata = m l.metadataErr = err diff --git a/jqresult/rowiter.go b/jqresult/rowiter.go index 763f7de..6b81475 100644 --- a/jqresult/rowiter.go +++ b/jqresult/rowiter.go @@ -2,6 +2,7 @@ package jqresult import ( "errors" + "sync" "cloud.google.com/go/spanner" "google.golang.org/api/iterator" @@ -9,6 +10,7 @@ import ( // RowIter streams query rows as jq-compatible values ([]any per row, matching protojson ResultSet rows). type RowIter struct { + ioMu *sync.Mutex iter *spanner.RowIterator redact bool rowToJSON func(*spanner.Row) (any, error) @@ -25,9 +27,23 @@ func NewRowIter(rowIter *spanner.RowIterator, redact bool, rowToJSON func(*spann return &RowIter{iter: rowIter, redact: redact, rowToJSON: rowToJSON} } +func (r *RowIter) lockIO() func() { + if r.ioMu == nil { + return func() {} + } + r.ioMu.Lock() + return r.ioMu.Unlock +} + // Prime reads the first row (or iterator.Done) so rowIter.Metadata is populated. // The first row is buffered for the next Next call when present. func (r *RowIter) Prime() error { + unlock := r.lockIO() + defer unlock() + return r.primeUnlocked() +} + +func (r *RowIter) primeUnlocked() error { if r.primed || r.stopped { return nil } @@ -43,20 +59,26 @@ func (r *RowIter) Prime() error { return nil } -func (r *RowIter) Next() (any, bool) { - if r.redact || r.stopped { - return nil, false - } +func (r *RowIter) nextRow() (*spanner.Row, error) { if r.primedRow != nil { row := r.primedRow r.primedRow = nil - v, err := r.rowToJSON(row) - if err != nil { - return err, true - } - return v, true + return row, nil } - row, err := r.iter.Next() + return r.iter.Next() +} + +func (r *RowIter) Next() (any, bool) { + unlock := r.lockIO() + defer unlock() + return r.nextUnlocked() +} + +func (r *RowIter) nextUnlocked() (any, bool) { + if r.redact || r.stopped { + return nil, false + } + row, err := r.nextRow() if errors.Is(err, iterator.Done) { return nil, false } @@ -72,26 +94,39 @@ func (r *RowIter) Next() (any, bool) { // Drain exhausts the Spanner row iterator. When redact is false, drained rows are returned. func (r *RowIter) Drain() ([]any, error) { + unlock := r.lockIO() + defer unlock() + return r.drainUnlocked() +} + +func (r *RowIter) drainUnlocked() ([]any, error) { if r.stopped { return nil, nil } var rows []any for { - v, ok := r.Next() - if !ok { + row, err := r.nextRow() + if errors.Is(err, iterator.Done) { return rows, nil } - if err, isErr := v.(error); isErr { + if err != nil { return rows, err } if r.redact { continue } + v, err := r.rowToJSON(row) + if err != nil { + return rows, err + } rows = append(rows, v) } } func (r *RowIter) Stop() { + unlock := r.lockIO() + defer unlock() + if r.stopped { return } diff --git a/jqresult/rowjson.go b/jqresult/rowjson.go index e611ab0..3f6f542 100644 --- a/jqresult/rowjson.go +++ b/jqresult/rowjson.go @@ -1,14 +1,10 @@ package jqresult import ( - "bytes" - "encoding/json" "slices" "spheric.cloud/xiter" "cloud.google.com/go/spanner" - sppb "cloud.google.com/go/spanner/apiv1/spannerpb" - "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" ) @@ -19,20 +15,5 @@ func RowToListValue(r *spanner.Row) *structpb.ListValue { // RowToJSON encodes one row the same way as protojson on a single-row ResultSet (array-shaped row). func RowToJSON(r *spanner.Row) (any, error) { - rs := &sppb.ResultSet{Rows: []*structpb.ListValue{RowToListValue(r)}} - b, err := protojson.Marshal(rs) - if err != nil { - return nil, err - } - dec := json.NewDecoder(bytes.NewReader(b)) - dec.UseNumber() - var obj map[string]any - if err := dec.Decode(&obj); err != nil { - return nil, err - } - rows, ok := obj["rows"].([]any) - if !ok || len(rows) == 0 { - return nil, nil - } - return rows[0], nil + return RowToListValue(r).AsSlice(), nil } From d73025089a0e8628ea54ebd3d9fa1788850b7b37 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:26:39 +0900 Subject: [PATCH 03/15] Validate lazy rowIter input and add emulator jq tests. Reject nil RowIterator in jqresult.Execute lazy mode and cover lazy filters with a real Spanner RowIterator in integration tests. Co-authored-by: Cursor --- integration_test.go | 68 +++++++++++++++++++++++++++++++++++++++ jqresult/pipeline.go | 3 ++ jqresult/pipeline_test.go | 31 ++++++++++++++++++ 3 files changed, 102 insertions(+) create mode 100644 jqresult/pipeline_test.go diff --git a/integration_test.go b/integration_test.go index 1e0265d..e5422f3 100644 --- a/integration_test.go +++ b/integration_test.go @@ -14,6 +14,7 @@ import ( "google.golang.org/protobuf/types/known/structpb" "spheric.cloud/xiter" + "github.com/apstndb/execspansql/jqresult" "github.com/apstndb/execspansql/params" ) @@ -286,4 +287,71 @@ func TestWithCloudSpannerEmulator(t *testing.T) { }) } }) + + t.Run("lazy jq with RowIterator", func(t *testing.T) { + runLazy := func(t *testing.T, filter string, redact bool, mode *sppb.ExecuteSqlRequest_QueryMode) []any { + t.Helper() + var opts spanner.QueryOptions + if mode != nil { + opts.Mode = mode + } + rowIter := client.Single().QueryWithOptions(ctx, + spanner.Statement{SQL: "SELECT SingerId FROM Singers ORDER BY SingerId LIMIT 3"}, + opts, + ) + code, err := jqresult.Compile(filter, jqresult.InputLazy) + if err != nil { + t.Fatal(err) + } + iter, cleanup, err := jqresult.Execute(code, jqresult.InputLazy, rowIter, nil, redact) + if err != nil { + t.Fatal(err) + } + defer cleanup() + var out []any + for { + v, ok := iter.Next() + if !ok { + return out + } + if err, isErr := v.(error); isErr { + t.Fatal(err) + } + out = append(out, v) + } + } + + rows := runLazy(t, ".rows[]", false, sppb.ExecuteSqlRequest_NORMAL.Enum()) + if len(rows) != 3 { + t.Fatalf("got %d row values, want 3", len(rows)) + } + + meta := runLazy(t, ".metadata.rowType.fields[0].name", false, sppb.ExecuteSqlRequest_NORMAL.Enum()) + if len(meta) != 1 || meta[0] != "SingerId" { + t.Fatalf("metadata field name: got %v", meta) + } + + stats := runLazy(t, ".stats.rowCount", false, sppb.ExecuteSqlRequest_PROFILE.Enum()) + if len(stats) != 1 { + t.Fatalf("stats rowCount: got %v", stats) + } + + combined := runLazy(t, "{rc: .stats.rowCount, ids: [.rows[] | .[0]]}", false, sppb.ExecuteSqlRequest_PROFILE.Enum()) + if len(combined) != 1 { + t.Fatalf("combined output: got %v", combined) + } + obj, ok := combined[0].(map[string]any) + if !ok { + t.Fatalf("combined type: %T", combined[0]) + } + ids, ok := obj["ids"].([]any) + if !ok || len(ids) != 3 { + t.Fatalf("combined ids: got %v", obj["ids"]) + } + + redacted := runLazy(t, ".stats.rowCount", true, sppb.ExecuteSqlRequest_PROFILE.Enum()) + if len(redacted) != 1 { + t.Fatalf("redacted stats: got %v", redacted) + } + }) } diff --git a/jqresult/pipeline.go b/jqresult/pipeline.go index bfb5fa0..59b595f 100644 --- a/jqresult/pipeline.go +++ b/jqresult/pipeline.go @@ -22,6 +22,9 @@ func Execute(code *gojq.Code, mode InputMode, rowIter *spanner.RowIterator, rs * } return code.Run(m), func() {}, nil case InputLazy: + if rowIter == nil { + return nil, func() {}, fmt.Errorf("lazy mode requires an unread RowIterator") + } lazy := NewLazy(rowIter, redactRows, StatsFuncFromProto()) return code.Run(lazy), lazy.Stop, nil default: diff --git a/jqresult/pipeline_test.go b/jqresult/pipeline_test.go new file mode 100644 index 0000000..becb613 --- /dev/null +++ b/jqresult/pipeline_test.go @@ -0,0 +1,31 @@ +package jqresult + +import ( + "testing" +) + +func TestExecuteLazyNilRowIter(t *testing.T) { + t.Parallel() + + code, err := Compile(".", InputLazy) + if err != nil { + t.Fatal(err) + } + _, _, err = Execute(code, InputLazy, nil, nil, false) + if err == nil { + t.Fatal("expected error for nil rowIter in lazy mode") + } +} + +func TestExecuteEagerNilResultSet(t *testing.T) { + t.Parallel() + + code, err := Compile(".", InputEager) + if err != nil { + t.Fatal(err) + } + _, _, err = Execute(code, InputEager, nil, nil, false) + if err == nil { + t.Fatal("expected error for nil ResultSet in eager mode") + } +} From 6c96b2d4f1b3b93930056f9cd9512c691e1ec19e Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:32:24 +0900 Subject: [PATCH 04/15] Assert lazy .rows[] emits nothing when rows are redacted. Co-authored-by: Cursor --- integration_test.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/integration_test.go b/integration_test.go index e5422f3..f56e27d 100644 --- a/integration_test.go +++ b/integration_test.go @@ -353,5 +353,10 @@ func TestWithCloudSpannerEmulator(t *testing.T) { if len(redacted) != 1 { t.Fatalf("redacted stats: got %v", redacted) } + + redactedRows := runLazy(t, ".rows[]", true, sppb.ExecuteSqlRequest_NORMAL.Enum()) + if len(redactedRows) != 0 { + t.Fatalf("redacted .rows[]: got %d values, want 0", len(redactedRows)) + } }) } From 02e1f74a54259c2f21645f3549a828a798485c7c Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:38:26 +0900 Subject: [PATCH 05/15] Defer lazy stats in JQValueEach for consistent enumeration. Expose stats via lazyStatsField so to_entries and similar filters observe the same values as direct .stats access. Co-authored-by: Cursor --- jqresult/jqresult_test.go | 35 +++++++++++++++++ jqresult/lazy.go | 83 +++++++++++++++++++++++++++++++++++---- 2 files changed, 111 insertions(+), 7 deletions(-) diff --git a/jqresult/jqresult_test.go b/jqresult/jqresult_test.go index 7a31f6e..e7ab51b 100644 --- a/jqresult/jqresult_test.go +++ b/jqresult/jqresult_test.go @@ -98,6 +98,41 @@ func TestLazyStopDoesNotDrain(t *testing.T) { } } +func TestLazyEachStatsDeferred(t *testing.T) { + t.Parallel() + l := &Lazy{ + metadata: map[string]any{"c": 1}, + metadataReady: true, + stats: map[string]any{"n": 2}, + drained: true, + } + q, err := gojq.Parse(".stats.n") + if err != nil { + t.Fatal(err) + } + iter := q.Run(l) + v, ok := iter.Next() + if !ok { + t.Fatal("no output") + } + if v != 2 { + t.Fatalf("got %v want 2", v) + } + + q2, err := gojq.Parse("to_entries | map(select(.key == \"stats\")) | .[0].value.n") + if err != nil { + t.Fatal(err) + } + iter2 := q2.Run(l) + v2, ok := iter2.Next() + if !ok { + t.Fatal("no output from to_entries") + } + if v2 != 2 { + t.Fatalf("to_entries stats: got %v want 2", v2) + } +} + func TestLazyStatsKey(t *testing.T) { t.Parallel() l := &Lazy{ diff --git a/jqresult/lazy.go b/jqresult/lazy.go index 7bba27b..a41ed34 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -223,21 +223,90 @@ func (l *Lazy) JQValueEach() any { l.mu.Lock() m := l.metadata drained := l.drained - statsVal := l.stats - l.mu.Unlock() - - rowsVal := l.rowsJQValue() - if statsVal == nil && !drained { - statsVal = nil + var statsVal any + if drained { + statsVal = l.stats + } else { + statsVal = &lazyStatsField{l: l} } + l.mu.Unlock() return []gojq.PathValue{ {Path: "metadata", Value: m}, - {Path: "rows", Value: rowsVal}, + {Path: "rows", Value: l.rowsJQValue()}, {Path: "stats", Value: statsVal}, } } +// lazyStatsField defers stats reads until jq accesses the stats field (including via object iteration). +type lazyStatsField struct { + l *Lazy +} + +func (f *lazyStatsField) JQValueType() string { return gojq.JQTypeObject } + +func (f *lazyStatsField) JQValueLength() any { return 0 } + +func (f *lazyStatsField) JQValueSliceLen() any { return 0 } + +func (f *lazyStatsField) JQValueIndex(int) any { return nil } + +func (f *lazyStatsField) JQValueSlice(int, int) any { return nil } + +func (f *lazyStatsField) JQValueKeys() any { + s, err := f.l.statsMap() + if err != nil { + return err + } + keys := make([]any, 0, len(s)) + for k := range s { + keys = append(keys, k) + } + return keys +} + +func (f *lazyStatsField) JQValueHas(key any) any { + s, err := f.l.statsMap() + if err != nil { + return err + } + k, _ := key.(string) + _, ok := s[k] + return ok +} + +func (f *lazyStatsField) JQValueToNumber() any { return nil } + +func (f *lazyStatsField) JQValueToString() any { return "" } + +func (f *lazyStatsField) JQValueToGoJQ() any { + s, err := f.l.statsMap() + if err != nil { + return err + } + return s +} + +func (f *lazyStatsField) JQValueKey(name string) any { + s, err := f.l.statsMap() + if err != nil { + return err + } + return s[name] +} + +func (f *lazyStatsField) JQValueEach() any { + s, err := f.l.statsMap() + if err != nil { + return err + } + pvs := make([]gojq.PathValue, 0, len(s)) + for k, v := range s { + pvs = append(pvs, gojq.PathValue{Path: k, Value: v}) + } + return pvs +} + // Stop releases the row iterator without draining unconsumed rows. func (l *Lazy) Stop() { l.rows.Stop() From af7b6565c76cb98388cddb151505373bdfbac1f1 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:44:14 +0900 Subject: [PATCH 06/15] Fix lazy stats length and reject lazy mode for partitioned DML. lazyStatsField reports the real stats map length after drain, and partitioned DML returns a clear error instead of silently using eager input. Co-authored-by: Cursor --- jqresult/lazy.go | 12 ++++++++++-- main.go | 3 +++ 2 files changed, 13 insertions(+), 2 deletions(-) diff --git a/jqresult/lazy.go b/jqresult/lazy.go index a41ed34..f09558a 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -245,9 +245,17 @@ type lazyStatsField struct { func (f *lazyStatsField) JQValueType() string { return gojq.JQTypeObject } -func (f *lazyStatsField) JQValueLength() any { return 0 } +func (f *lazyStatsField) JQValueLength() any { + s, err := f.l.statsMap() + if err != nil { + return err + } + return len(s) +} -func (f *lazyStatsField) JQValueSliceLen() any { return 0 } +func (f *lazyStatsField) JQValueSliceLen() any { + return f.JQValueLength() +} func (f *lazyStatsField) JQValueIndex(int) any { return nil } diff --git a/main.go b/main.go index fbc872e..8ceee50 100644 --- a/main.go +++ b/main.go @@ -533,6 +533,9 @@ func runJqOutput( rowIter := client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts) return runJqOnRowIter(rowIter, o.RedactRows, jqMode, jqCode, enc) case partitionedDML: + if jqMode != jqresult.InputEager { + return fmt.Errorf("--jq-input-mode=lazy is not supported for partitioned DML") + } count, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts) if err != nil { return err From 3f9b5bd4994729193467953748da369607140d4e Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:50:26 +0900 Subject: [PATCH 07/15] Use lazyRowsField so captured .rows survives stats drain. Object literals that store .rows before .stats now replay materialized rows after drain instead of a stopped RowIter. Co-authored-by: Cursor --- integration_test.go | 17 +++++++++++++++++ jqresult/lazy.go | 29 ++++++++++++++++++++++++++++- 2 files changed, 45 insertions(+), 1 deletion(-) diff --git a/integration_test.go b/integration_test.go index f56e27d..68180da 100644 --- a/integration_test.go +++ b/integration_test.go @@ -336,6 +336,23 @@ func TestWithCloudSpannerEmulator(t *testing.T) { t.Fatalf("stats rowCount: got %v", stats) } + objectRows := runLazy(t, "{rows: .rows, rc: .stats.rowCount}", false, sppb.ExecuteSqlRequest_PROFILE.Enum()) + if len(objectRows) != 1 { + t.Fatalf("object rows output: got %v", objectRows) + } + objRows, ok := objectRows[0].(map[string]any) + if !ok { + t.Fatalf("object rows type: %T", objectRows[0]) + } + normalized, err := jqresult.NormalizeForEncode(objRows["rows"]) + if err != nil { + t.Fatal(err) + } + rowSlice, ok := normalized.([]any) + if !ok || len(rowSlice) != 3 { + t.Fatalf("normalized rows after stats: got %v (%T)", normalized, normalized) + } + combined := runLazy(t, "{rc: .stats.rowCount, ids: [.rows[] | .[0]]}", false, sppb.ExecuteSqlRequest_PROFILE.Enum()) if len(combined) != 1 { t.Fatalf("combined output: got %v", combined) diff --git a/jqresult/lazy.go b/jqresult/lazy.go index f09558a..f635287 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -146,7 +146,7 @@ func (l *Lazy) rowsJQValue() any { if drained { return materializedRowsIter(materialized) } - return l.rows + return &lazyRowsField{l: l} } func (l *Lazy) JQValueType() string { return gojq.JQTypeObject } @@ -320,6 +320,33 @@ func (l *Lazy) Stop() { l.rows.Stop() } +// lazyRowsField streams rows until drain, then replays materializedRows. +type lazyRowsField struct { + l *Lazy + mat *materializedIter +} + +func (f *lazyRowsField) Next() (any, bool) { + f.l.mu.Lock() + drained := f.l.drained + redact := f.l.redact + f.l.mu.Unlock() + + if redact { + return nil, false + } + if drained { + if f.mat == nil { + f.l.mu.Lock() + rows := append([]any(nil), f.l.materializedRows...) + f.l.mu.Unlock() + f.mat = &materializedIter{rows: rows} + } + return f.mat.Next() + } + return f.l.rows.Next() +} + type materializedIter struct { rows []any i int From b273eab69ecdafc38d48b486d9f6416686ab4192 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 02:55:20 +0900 Subject: [PATCH 08/15] Remove unreachable partitioned DML eager path in lazy branch. Partitioned DML jq output is already handled by the eager runJqOutput path via runInNewTransaction. Co-authored-by: Cursor --- main.go | 25 ++----------------------- 1 file changed, 2 insertions(+), 23 deletions(-) diff --git a/main.go b/main.go index 8ceee50..a142329 100644 --- a/main.go +++ b/main.go @@ -533,29 +533,8 @@ func runJqOutput( rowIter := client.Single().WithTimestampBound(mode.TimestampBound).QueryWithOptions(ctx, stmt, opts) return runJqOnRowIter(rowIter, o.RedactRows, jqMode, jqCode, enc) case partitionedDML: - if jqMode != jqresult.InputEager { - return fmt.Errorf("--jq-input-mode=lazy is not supported for partitioned DML") - } - count, err := client.PartitionedUpdateWithOptions(ctx, stmt, opts) - if err != nil { - return err - } - rs := &sppb.ResultSet{ - Metadata: &sppb.ResultSetMetadata{RowType: &sppb.StructType{}}, - Stats: &sppb.ResultSetStats{ - RowCount: &sppb.ResultSetStats_RowCountLowerBound{RowCountLowerBound: count}, - }, - } - enc, err := newEncoder(os.Stdout, o.Format, o.CompactOutput, o.JqRawOutput) - if err != nil { - return err - } - iter, cleanup, err := jqresult.Execute(jqCode, jqresult.InputEager, nil, rs, false) - if err != nil { - return err - } - defer cleanup() - return jqresult.Print(enc, iter) + // Eager mode is handled above via runInNewTransaction. + return fmt.Errorf("--jq-input-mode=lazy is not supported for partitioned DML") default: panic(fmt.Sprintf("unknown mode: %T", mode)) } From 1c9713f4f24537ea0e622a8ea918b61e9082d140 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 03:03:56 +0900 Subject: [PATCH 09/15] Cache lazy rows during streaming and document duplicate .rows usage. Row reads append to shared materializedRows for replay after drain; README recommends [.rows[]] when duplicating rows in one object literal. Co-authored-by: Cursor --- README.md | 2 +- integration_test.go | 27 ++++++++ jqresult/jqresult_test.go | 34 ++++++++++ jqresult/lazy.go | 135 +++++++++++++++++++++++++++++++++++--- jqresult/normalize.go | 4 ++ 5 files changed, 191 insertions(+), 11 deletions(-) diff --git a/README.md b/README.md index fef99dd..0d58580 100644 --- a/README.md +++ b/README.md @@ -123,7 +123,7 @@ execspansql can process output using embedded [wader/gojq](https://github.com/wa | `eager` (default) | Full ResultSet object | `.`, `.stats.queryPlan` | | `lazy` | `JQValue` root (`metadata` / `rows` Iter / `stats`) | `.rows[]`, `.stats.queryPlan` | -In `lazy` mode, `metadata` is populated after the first row is read from Spanner (or after a zero-row result). Use `.rows[]` to stream rows; `.rows` alone is a jq array only after rows are materialized (for example after `.stats` drains them). Accessing `.stats` before `.rows` still allows later `.rows[]` to emit the drained rows. +In `lazy` mode, `metadata` is populated after the first row is read from Spanner (or after a zero-row result). Prefer `.rows[]` to stream rows. Bare `.rows` is a lazy iterator: reuse it in one object literal (for example `{a: .rows, b: .rows}`) may not duplicate rows because jq can evaluate the subexpression once; use `{a: [.rows[]], b: [.rows[]]}` when you need two row arrays. After `.stats` drains the iterator, captured `.rows` values replay from materialized rows. Output expands top-level `gojq.Iter` to one JSON/YAML document per row (JSONL-style). Nested `Iter` values inside objects are expanded to arrays on encode. diff --git a/integration_test.go b/integration_test.go index 68180da..92a2b10 100644 --- a/integration_test.go +++ b/integration_test.go @@ -336,6 +336,33 @@ func TestWithCloudSpannerEmulator(t *testing.T) { t.Fatalf("stats rowCount: got %v", stats) } + lens := runLazy(t, "{alen: (.rows|length), blen: (.rows|length)}", false, sppb.ExecuteSqlRequest_NORMAL.Enum()) + if len(lens) != 1 { + t.Fatalf("rows length output: got %v", lens) + } + lensObj, ok := lens[0].(map[string]any) + if !ok { + t.Fatalf("rows length type: %T", lens[0]) + } + if lensObj["alen"] != 3 || lensObj["blen"] != 3 { + t.Fatalf("rows lengths: got %#v", lensObj) + } + + dupRows := runLazy(t, "{a: [.rows[]], b: [.rows[]]}", false, sppb.ExecuteSqlRequest_NORMAL.Enum()) + if len(dupRows) != 1 { + t.Fatalf("duplicate rows output: got %v", dupRows) + } + dupObj, ok := dupRows[0].(map[string]any) + if !ok { + t.Fatalf("duplicate rows type: %T", dupRows[0]) + } + for _, key := range []string{"a", "b"} { + rowSlice, ok := dupObj[key].([]any) + if !ok || len(rowSlice) != 3 { + t.Fatalf("%s: got %v (%T)", key, dupObj[key], dupObj[key]) + } + } + objectRows := runLazy(t, "{rows: .rows, rc: .stats.rowCount}", false, sppb.ExecuteSqlRequest_PROFILE.Enum()) if len(objectRows) != 1 { t.Fatalf("object rows output: got %v", objectRows) diff --git a/jqresult/jqresult_test.go b/jqresult/jqresult_test.go index e7ab51b..8c8011b 100644 --- a/jqresult/jqresult_test.go +++ b/jqresult/jqresult_test.go @@ -98,6 +98,40 @@ func TestLazyStopDoesNotDrain(t *testing.T) { } } +func TestLazyDuplicateRowsCapture(t *testing.T) { + t.Parallel() + l := &Lazy{ + metadata: map[string]any{"c": 1}, + metadataReady: true, + materializedRows: []any{[]any{int64(1)}, []any{int64(2)}}, + rowsStreamDone: true, + rows: &RowIter{stopped: true}, + } + q, err := gojq.Parse("{a: .rows, b: .rows}") + if err != nil { + t.Fatal(err) + } + iter := q.Run(l) + v, ok := iter.Next() + if !ok { + t.Fatal("no output") + } + obj, ok := v.(map[string]any) + if !ok { + t.Fatalf("got %T", v) + } + for _, key := range []string{"a", "b"} { + n, err := NormalizeForEncode(obj[key]) + if err != nil { + t.Fatal(err) + } + rows, ok := n.([]any) + if !ok || len(rows) != 2 { + t.Fatalf("%s: got %v", key, n) + } + } +} + func TestLazyEachStatsDeferred(t *testing.T) { t.Parallel() l := &Lazy{ diff --git a/jqresult/lazy.go b/jqresult/lazy.go index f635287..29c7064 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -32,6 +32,7 @@ type Lazy struct { stats map[string]any rows *RowIter materializedRows []any + rowsStreamDone bool drained bool drainErr error @@ -56,6 +57,8 @@ func (l *Lazy) drain() error { l.mu.Unlock() return err } + alreadyStreamed := l.rowsStreamDone + cachedRows := append([]any(nil), l.materializedRows...) l.mu.Unlock() l.ioMu.Lock() @@ -68,7 +71,13 @@ func (l *Lazy) drain() error { } l.mu.Unlock() - materializedRows, drainErr := l.rows.drainUnlocked() + var materializedRows []any + var drainErr error + if alreadyStreamed { + materializedRows = cachedRows + } else { + materializedRows, drainErr = l.rows.drainUnlocked() + } var stats map[string]any if drainErr == nil && l.statsFn != nil { stats, drainErr = l.statsFn(l.rowIter) @@ -320,31 +329,137 @@ func (l *Lazy) Stop() { l.rows.Stop() } -// lazyRowsField streams rows until drain, then replays materializedRows. +// lazyRowsField streams rows and caches them so multiple captured .rows values can replay. type lazyRowsField struct { l *Lazy mat *materializedIter } +func (f *lazyRowsField) materializeSlice() (any, error) { + var out []any + for { + v, ok := f.Next() + if !ok { + if err, isErr := v.(error); isErr { + return nil, err + } + return out, nil + } + if err, isErr := v.(error); isErr { + return nil, err + } + out = append(out, v) + } +} + +func (f *lazyRowsField) JQValueType() string { return gojq.JQTypeArray } + +func (f *lazyRowsField) JQValueLength() any { + s, err := f.materializeSlice() + if err != nil { + return err + } + return len(s.([]any)) +} + +func (f *lazyRowsField) JQValueSliceLen() any { return f.JQValueLength() } + +func (f *lazyRowsField) JQValueIndex(i int) any { + s, err := f.materializeSlice() + if err != nil { + return err + } + rows := s.([]any) + if i < 0 || i >= len(rows) { + return nil + } + return rows[i] +} + +func (f *lazyRowsField) JQValueSlice(start, end int) any { + s, err := f.materializeSlice() + if err != nil { + return err + } + rows := s.([]any) + if start < 0 { + start = 0 + } + if end > len(rows) { + end = len(rows) + } + if start > end { + return []any{} + } + return rows[start:end] +} + +func (f *lazyRowsField) JQValueKeys() any { return nil } + +func (f *lazyRowsField) JQValueHas(any) any { return false } + +func (f *lazyRowsField) JQValueToNumber() any { return nil } + +func (f *lazyRowsField) JQValueToString() any { return "" } + +func (f *lazyRowsField) JQValueToGoJQ() any { + s, err := f.materializeSlice() + if err != nil { + return err + } + return s +} + +func (f *lazyRowsField) JQValueKey(string) any { return nil } + +func (f *lazyRowsField) JQValueEach() any { + s, err := f.materializeSlice() + if err != nil { + return err + } + rows := s.([]any) + pvs := make([]gojq.PathValue, len(rows)) + for i, v := range rows { + pvs[i] = gojq.PathValue{Path: i, Value: v} + } + return pvs +} + func (f *lazyRowsField) Next() (any, bool) { f.l.mu.Lock() drained := f.l.drained redact := f.l.redact - f.l.mu.Unlock() - - if redact { - return nil, false - } - if drained { + if redact || drained || f.l.rowsStreamDone { if f.mat == nil { - f.l.mu.Lock() rows := append([]any(nil), f.l.materializedRows...) f.l.mu.Unlock() f.mat = &materializedIter{rows: rows} + return f.mat.Next() } + f.l.mu.Unlock() return f.mat.Next() } - return f.l.rows.Next() + f.l.mu.Unlock() + + f.l.ioMu.Lock() + v, ok := f.l.rows.nextUnlocked() + f.l.ioMu.Unlock() + if !ok { + if err, isErr := v.(error); isErr { + return err, true + } + f.l.mu.Lock() + f.l.rowsStreamDone = true + f.l.mu.Unlock() + return nil, false + } + if err, isErr := v.(error); isErr { + return err, true + } + f.l.mu.Lock() + f.l.materializedRows = append(f.l.materializedRows, v) + f.l.mu.Unlock() + return v, true } type materializedIter struct { diff --git a/jqresult/normalize.go b/jqresult/normalize.go index 4886688..092c902 100644 --- a/jqresult/normalize.go +++ b/jqresult/normalize.go @@ -19,6 +19,10 @@ func NormalizeForEncode(v any) (any, error) { switch v := v.(type) { case gojq.Iter: return normalizeIter(v) + case *lazyRowsField: + return normalizeIter(v) + case *materializedIter: + return normalizeIter(v) case gojq.JQValue: return NormalizeForEncode(v.JQValueToGoJQ()) case []any: From b2c24ff5afe035dde6388890a7573c67587bfc98 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 03:10:18 +0900 Subject: [PATCH 10/15] Append streamed rows when lazy drain completes stats. Preserve rows already read via .rows[] before .stats drains the iterator. Co-authored-by: Cursor --- jqresult/jqresult_test.go | 21 +++++++++++++++++++++ jqresult/lazy.go | 5 ++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/jqresult/jqresult_test.go b/jqresult/jqresult_test.go index 8c8011b..7a5c843 100644 --- a/jqresult/jqresult_test.go +++ b/jqresult/jqresult_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "testing" + "cloud.google.com/go/spanner" "github.com/wader/gojq" ) @@ -98,6 +99,26 @@ func TestLazyStopDoesNotDrain(t *testing.T) { } } +func TestDrainAppendsStreamedRows(t *testing.T) { + t.Parallel() + l := &Lazy{ + materializedRows: []any{[]any{int64(1)}}, + rows: &RowIter{stopped: true}, + statsFn: func(*spanner.RowIterator) (map[string]any, error) { + return map[string]any{"ok": true}, nil + }, + } + if err := l.drain(); err != nil { + t.Fatal(err) + } + if len(l.materializedRows) != 1 { + t.Fatalf("materialized rows: got %d want 1", len(l.materializedRows)) + } + if l.stats["ok"] != true { + t.Fatalf("stats: %v", l.stats) + } +} + func TestLazyDuplicateRowsCapture(t *testing.T) { t.Parallel() l := &Lazy{ diff --git a/jqresult/lazy.go b/jqresult/lazy.go index 29c7064..2f16cf9 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -76,7 +76,9 @@ func (l *Lazy) drain() error { if alreadyStreamed { materializedRows = cachedRows } else { - materializedRows, drainErr = l.rows.drainUnlocked() + var remaining []any + remaining, drainErr = l.rows.drainUnlocked() + materializedRows = append(cachedRows, remaining...) } var stats map[string]any if drainErr == nil && l.statsFn != nil { @@ -89,6 +91,7 @@ func (l *Lazy) drain() error { l.stats = stats l.drainErr = drainErr l.drained = true + l.rowsStreamDone = true l.mu.Unlock() l.rows.Stop() From a4faa6403ace156c4da0103b719988141a0690cb Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 03:15:04 +0900 Subject: [PATCH 11/15] Encode empty lazy row arrays as [] instead of null. Use non-nil empty slices in normalizeIter and after lazy drain so zero-row results match documented array shape. Co-authored-by: Cursor --- jqresult/jqresult_test.go | 15 +++++++++++++++ jqresult/lazy.go | 3 +++ jqresult/normalize.go | 2 +- 3 files changed, 19 insertions(+), 1 deletion(-) diff --git a/jqresult/jqresult_test.go b/jqresult/jqresult_test.go index 7a5c843..beb7144 100644 --- a/jqresult/jqresult_test.go +++ b/jqresult/jqresult_test.go @@ -22,6 +22,21 @@ func (s *stubIter) Next() (any, bool) { return v, true } +func TestNormalizeForEncodeEmptyIter(t *testing.T) { + t.Parallel() + got, err := NormalizeForEncode(gojq.NewIter[any]()) + if err != nil { + t.Fatal(err) + } + rows, ok := got.([]any) + if !ok { + t.Fatalf("got %T", got) + } + if rows == nil || len(rows) != 0 { + t.Fatalf("got %#v want non-nil empty slice", got) + } +} + func TestNormalizeForEncodeIter(t *testing.T) { t.Parallel() var it gojq.Iter = &stubIter{vals: []int{1, 2}} diff --git a/jqresult/lazy.go b/jqresult/lazy.go index 2f16cf9..89a4522 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -87,6 +87,9 @@ func (l *Lazy) drain() error { l.ioMu.Unlock() l.mu.Lock() + if materializedRows == nil { + materializedRows = []any{} + } l.materializedRows = materializedRows l.stats = stats l.drainErr = drainErr diff --git a/jqresult/normalize.go b/jqresult/normalize.go index 092c902..93450c3 100644 --- a/jqresult/normalize.go +++ b/jqresult/normalize.go @@ -54,7 +54,7 @@ func NormalizeForEncode(v any) (any, error) { } func normalizeIter(it gojq.Iter) ([]any, error) { - var out []any + out := make([]any, 0) for { x, ok := it.Next() if !ok { From e680968a9f27b019a5c1d07f3f5cdf3064b49c39 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 03:24:53 +0900 Subject: [PATCH 12/15] Remove unreachable normalize cases caught by staticcheck. Co-authored-by: Cursor --- jqresult/normalize.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/jqresult/normalize.go b/jqresult/normalize.go index 93450c3..9ce11aa 100644 --- a/jqresult/normalize.go +++ b/jqresult/normalize.go @@ -19,10 +19,6 @@ func NormalizeForEncode(v any) (any, error) { switch v := v.(type) { case gojq.Iter: return normalizeIter(v) - case *lazyRowsField: - return normalizeIter(v) - case *materializedIter: - return normalizeIter(v) case gojq.JQValue: return NormalizeForEncode(v.JQValueToGoJQ()) case []any: From c1c7e9c74d68cf5ada0cf3eb40aadb0ba1c3962b Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 03:30:57 +0900 Subject: [PATCH 13/15] Implement lazyRowsField array keys/has and use after drain. Co-authored-by: Cursor --- jqresult/lazy.go | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/jqresult/lazy.go b/jqresult/lazy.go index 89a4522..fd1271b 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -158,8 +158,8 @@ func (l *Lazy) rowsJQValue() any { if redact { return gojq.NewIter[any]() } - if drained { - return materializedRowsIter(materialized) + if drained || l.rowsStreamDone { + return &lazyRowsField{l: l, mat: materializedRowsIter(materialized)} } return &lazyRowsField{l: l} } @@ -338,7 +338,7 @@ func (l *Lazy) Stop() { // lazyRowsField streams rows and caches them so multiple captured .rows values can replay. type lazyRowsField struct { l *Lazy - mat *materializedIter + mat gojq.Iter } func (f *lazyRowsField) materializeSlice() (any, error) { @@ -400,9 +400,31 @@ func (f *lazyRowsField) JQValueSlice(start, end int) any { return rows[start:end] } -func (f *lazyRowsField) JQValueKeys() any { return nil } +func (f *lazyRowsField) JQValueKeys() any { + s, err := f.materializeSlice() + if err != nil { + return err + } + rows := s.([]any) + keys := make([]any, len(rows)) + for i := range rows { + keys[i] = i + } + return keys +} -func (f *lazyRowsField) JQValueHas(any) any { return false } +func (f *lazyRowsField) JQValueHas(key any) any { + s, err := f.materializeSlice() + if err != nil { + return err + } + rows := s.([]any) + i, ok := key.(int) + if !ok { + return false + } + return i >= 0 && i < len(rows) +} func (f *lazyRowsField) JQValueToNumber() any { return nil } @@ -439,7 +461,7 @@ func (f *lazyRowsField) Next() (any, bool) { if f.mat == nil { rows := append([]any(nil), f.l.materializedRows...) f.l.mu.Unlock() - f.mat = &materializedIter{rows: rows} + f.mat = materializedRowsIter(rows) return f.mat.Next() } f.l.mu.Unlock() From cb47a07b8038397b4204270271e42133cf0d4bd8 Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 04:50:25 +0900 Subject: [PATCH 14/15] Fix lazy rows snapshot races and repeatable materialization. Read rowsStreamDone under mu, return lazyRowsField for redacted rows, and serve array ops from cached rows after streaming completes. Co-authored-by: Cursor --- jqresult/jqresult_test.go | 24 ++++++++++++++++++++++++ jqresult/lazy.go | 27 +++++++++++++++++++++++++-- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/jqresult/jqresult_test.go b/jqresult/jqresult_test.go index beb7144..88a4173 100644 --- a/jqresult/jqresult_test.go +++ b/jqresult/jqresult_test.go @@ -134,6 +134,30 @@ func TestDrainAppendsStreamedRows(t *testing.T) { } } +func TestLazyRowsFieldMaterializeTwice(t *testing.T) { + t.Parallel() + l := &Lazy{ + materializedRows: []any{[]any{int64(1)}, []any{int64(2)}}, + rowsStreamDone: true, + rows: &RowIter{stopped: true}, + } + f := &lazyRowsField{ + l: l, + mat: materializedRowsIter([]any{[]any{int64(1)}}), // partially consumed replay iter + } + s1, err := f.materializeSlice() + if err != nil { + t.Fatal(err) + } + s2, err := f.materializeSlice() + if err != nil { + t.Fatal(err) + } + if len(s1.([]any)) != 2 || len(s2.([]any)) != 2 { + t.Fatalf("got %v and %v", s1, s2) + } +} + func TestLazyDuplicateRowsCapture(t *testing.T) { t.Parallel() l := &Lazy{ diff --git a/jqresult/lazy.go b/jqresult/lazy.go index fd1271b..172913c 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -152,13 +152,14 @@ func (l *Lazy) rowsJQValue() any { l.mu.Lock() drained := l.drained redact := l.redact + streamDone := l.rowsStreamDone materialized := append([]any(nil), l.materializedRows...) l.mu.Unlock() if redact { - return gojq.NewIter[any]() + return &lazyRowsField{l: l, mat: materializedRowsIter([]any{})} } - if drained || l.rowsStreamDone { + if drained || streamDone { return &lazyRowsField{l: l, mat: materializedRowsIter(materialized)} } return &lazyRowsField{l: l} @@ -341,7 +342,29 @@ type lazyRowsField struct { mat gojq.Iter } +func (f *lazyRowsField) cachedRows() ([]any, error) { + f.l.mu.Lock() + defer f.l.mu.Unlock() + if f.l.redact { + return []any{}, nil + } + rows := append([]any(nil), f.l.materializedRows...) + if rows == nil { + rows = []any{} + } + return rows, nil +} + func (f *lazyRowsField) materializeSlice() (any, error) { + f.l.mu.Lock() + streamDone := f.l.rowsStreamDone + drained := f.l.drained + f.l.mu.Unlock() + + if streamDone || drained { + return f.cachedRows() + } + var out []any for { v, ok := f.Next() From 9b3639c75319a301a6de2381f115226f2d22d1df Mon Sep 17 00:00:00 2001 From: apstndb <803393+apstndb@users.noreply.github.com> Date: Sun, 31 May 2026 04:56:23 +0900 Subject: [PATCH 15/15] Align lazy row JSON with eager protojson and preserve .rows[] streaming. Restore per-row protojson decoding with UseNumber for numeric parity, and avoid materializing all rows in lazyRowsField.JQValueEach while streaming. Co-authored-by: Cursor --- jqresult/lazy.go | 16 ++++++++++++++-- jqresult/rowjson.go | 21 ++++++++++++++++++++- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/jqresult/lazy.go b/jqresult/lazy.go index 172913c..a4cb6d3 100644 --- a/jqresult/lazy.go +++ b/jqresult/lazy.go @@ -464,11 +464,23 @@ func (f *lazyRowsField) JQValueToGoJQ() any { func (f *lazyRowsField) JQValueKey(string) any { return nil } func (f *lazyRowsField) JQValueEach() any { - s, err := f.materializeSlice() + f.l.mu.Lock() + streamDone := f.l.rowsStreamDone + drained := f.l.drained + redact := f.l.redact + f.l.mu.Unlock() + + if redact { + return []gojq.PathValue{} + } + // While rows are still streaming, let jq iterate via Next() instead of materializing here. + if !streamDone && !drained { + return nil + } + rows, err := f.cachedRows() if err != nil { return err } - rows := s.([]any) pvs := make([]gojq.PathValue, len(rows)) for i, v := range rows { pvs[i] = gojq.PathValue{Path: i, Value: v} diff --git a/jqresult/rowjson.go b/jqresult/rowjson.go index 3f6f542..31d9107 100644 --- a/jqresult/rowjson.go +++ b/jqresult/rowjson.go @@ -1,10 +1,14 @@ package jqresult import ( + "bytes" + "encoding/json" "slices" "spheric.cloud/xiter" "cloud.google.com/go/spanner" + sppb "cloud.google.com/go/spanner/apiv1/spannerpb" + "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/types/known/structpb" ) @@ -15,5 +19,20 @@ func RowToListValue(r *spanner.Row) *structpb.ListValue { // RowToJSON encodes one row the same way as protojson on a single-row ResultSet (array-shaped row). func RowToJSON(r *spanner.Row) (any, error) { - return RowToListValue(r).AsSlice(), nil + rs := &sppb.ResultSet{Rows: []*structpb.ListValue{RowToListValue(r)}} + b, err := protojson.Marshal(rs) + if err != nil { + return nil, err + } + dec := json.NewDecoder(bytes.NewReader(b)) + dec.UseNumber() + var obj map[string]any + if err := dec.Decode(&obj); err != nil { + return nil, err + } + rows, ok := obj["rows"].([]any) + if !ok || len(rows) == 0 { + return []any{}, nil + } + return rows[0], nil }