[pull] main from MaterializeInc:main #1041
Merged
For included items with PR links see: https://github.com/MaterializeInc/mz-skills/blob/main/data/mz-release-notes/v26.26.0/release_notes.md

For omitted items with PR links see: https://github.com/MaterializeInc/mz-skills/blob/main/data/mz-release-notes/v26.26.0/omitted.md

**Borderline PRs omitted: 9** ([review borderline calls](https://github.com/MaterializeInc/mz-skills/blob/main/data/mz-release-notes/v26.26.0/omitted.md#borderline))

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: val-materialize <valerie.chiang@materialize.com>
Co-authored-by: Pranshu Maheshwari <pranshu.maheshwari@materialize.com>
### Motivation

CLU-68: The `SingleTimeMonotonic` physical-monotonicity interpreter propagated the input's monotonicity through `mfp` and `flat_map` without consulting the MFP's predicates or the table function's properties. The MIR monotonicity analysis (`src/transform/src/analysis/monotonic.rs`) already accounts for both, so the LIR interpreter was inconsistent with it.

### Severity

The practical correctness impact is low, and this is best understood as defense-in-depth / consistency rather than a live bug fix:

- This analysis only ever **relaxes** the safe default. Single-time peeks set `must_consolidate = true` on hierarchical reductions / TopK by default; a positive physical-monotonicity result is what lets that be relaxed to `false`. A spuriously monotonic result is therefore the only way to cause trouble.
- The only LIR-reachable operator that can actually emit negative diffs is `repeat_row` with a negative count, which requires the unsafe `enable_repeat_row` flag and is not meaningfully representable in a single-time peek (a consolidated `-1` row has no SQL meaning).
- `preserves_monotonicity()` also returns `false` for `AclExplode`, `MzAclExplode`, and `GuardSubquerySize`, but those classifications are conservative: they do not emit negative diffs.

So no known query produces wrong results today; the change keeps the LIR interpreter honest and aligned with the MIR analysis.

### Description

Corrects `mfp` and `flat_map` in `SingleTimeMonotonic`:

1. **`flat_map`**: now requires `func.preserves_monotonicity()` (the substantive check; e.g. `repeat_row` does not preserve it) in addition to propagating the input status. Mirrors `MirRelationExpr::FlatMap`.
2. **Temporal-predicate checks** (`mfp`, and the post-MFP of `flat_map`): now reject MFPs containing temporal predicates (`mz_now()`). Note this is *conservative*: an MFP cannot itself negate diffs, and temporal predicates only introduce retractions *across* timestamps, while a one-shot dataflow runs at a single time. We keep the check as defense-in-depth and to mirror `MirRelationExpr::Filter`. The code comments document this severity nuance inline.

### Verification

Added unit tests covering:

- `flat_map` with a monotonicity-preserving function (`GenerateSeriesInt64`)
- `flat_map` with a non-preserving function (`RepeatRow`)
- `flat_map` with a temporal predicate in the post-MFP
- `mfp` with and without temporal predicates

https://claude.ai/code/session_013EEMmczcmK98tcSa9B9vvQ

---------

Co-authored-by: Claude <noreply@anthropic.com>
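
For readers who want the rule from the CLU-68 description in code form, here is a minimal sketch. The `TableFunc` enum, the `Mfp` struct, and both free functions are hypothetical stand-ins invented for illustration; they are not the real Materialize types or the interpreter's actual signatures. The sketch only shows the shape of the check: the result is monotonic when the input is monotonic, the table function preserves monotonicity, and no temporal predicate is present.

```rust
/// Hypothetical stand-ins for two of the table functions named above.
#[derive(Clone, Copy, Debug, PartialEq)]
enum TableFunc {
    GenerateSeriesInt64,
    RepeatRow,
}

impl TableFunc {
    /// Illustrative version of `preserves_monotonicity()`: `RepeatRow` can
    /// emit negative diffs, so it is the non-preserving case here.
    fn preserves_monotonicity(self) -> bool {
        !matches!(self, TableFunc::RepeatRow)
    }
}

/// Hypothetical summary of an MFP: we only track whether it contains a
/// temporal predicate such as `mz_now()`.
#[derive(Clone, Copy, Debug)]
struct Mfp {
    has_temporal_predicate: bool,
}

/// `mfp`: monotonic only if the input is monotonic and the MFP has no
/// temporal predicate (the conservative check).
fn mfp_monotonic(input_monotonic: bool, mfp: Mfp) -> bool {
    input_monotonic && !mfp.has_temporal_predicate
}

/// `flat_map`: additionally requires the table function to preserve
/// monotonicity, and applies the same temporal check to the post-MFP.
fn flat_map_monotonic(input_monotonic: bool, func: TableFunc, post_mfp: Mfp) -> bool {
    input_monotonic && func.preserves_monotonicity() && mfp_monotonic(true, post_mfp)
}
```

Under these assumptions, `flat_map_monotonic(true, TableFunc::RepeatRow, ..)` is always `false`, which corresponds to the one case the Severity section identifies as able to emit negative diffs.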
### Motivation

Builds on #36391 (now merged), which introduced `mz_ore::pager`. Layers a typed `Column` API over the pager with an injected policy that decides, per call, whether to keep resident, page out raw, or page out lz4-compressed. This is the seam between in-memory columnar buffers and out-of-core storage envisioned in `doc/developer/design/20260504_pager.md`.

### Description

Adds `mz_ore::pager::pageout_with` for explicit-backend dispatch, bypassing the global atomic so layered consumers can route per call without racing other writers.

New `mz_timely_util::column_pager` module:

* `PagingPolicy` trait with `decide(PageHint) -> PageDecision` (`Skip` / `Page { backend, codec }`) and `record(PageEvent)` for budget bookkeeping and metrics.
* `PagedColumn` with three variants: `Resident(Column, ResidentTicket)`, `Paged { handle, meta }`, and `Compressed { inner, meta }` (memory or pager-backed framed bytes).
* `ResidentTicket` is a drop guard that fires `PageEvent::ResidentReleased { bytes }` whether the caller calls `ColumnPager::take` or drops the column without taking, so policy budgets don't leak.
* `ColumnPager::page` drains via `ContainerBytes`. The `Column::Align(Vec)` uncompressed path moves the body `Vec` directly into the pager handle; the other variants serialize and then `Column::clear` in place, retaining the caller's `Typed` allocation for reuse instead of dropping it. The compressed path wraps the target in `FrameEncoder` so `into_bytes` streams straight through lz4 with no intermediate uncompressed `Vec`. The compressed File path pads to u64 alignment; the frame trailer self-delimits so no length prefix or unpad is needed on read.

`TieredPolicy` in `column_pager::policy` is a single process-wide byte budget. Each `decide` reserves `len_bytes` from one `AtomicUsize` pool (success keeps the column resident, failure spills it via the configured `backend` + `codec`) and `record` credits the bytes back on `ResidentReleased`. Accounting is a single atomic rather than per-worker `thread_local!` state because resident columns move freely between Timely workers, so a release can run on a different thread than the grant; one pool credits correctly regardless of where the drop lands. A future thread-aware tiered layout would need a `SendColumn` wrapper that pins the column to its origin thread, or an origin-keyed cross-thread credit channel.

Criterion bench (`cargo bench -p mz-timely-util --bench column_pager`) covers 4 KiB / 256 KiB / 4 MiB × Swap / File × raw / lz4 for both round-trip and operator-loop shapes. Swap-backend results are labelled `swap-warm` to flag that the bench never builds enough working set to force kernel eviction; a follow-up `column_pager_pressure` bench under `systemd-run --user --scope -p MemoryMax=...` will exercise the cold path.

### Checklist

- [ ] This PR has adequate test coverage / QA is not needed.
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
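
The single-atomic budget and the drop-guard ticket described for `TieredPolicy` and `ResidentTicket` can be sketched roughly as follows. This is an illustration under assumptions, not the code from the PR: `Budget`, `Ticket`, `try_reserve`, and `remaining` are hypothetical names, and the real policy also carries a backend, a codec, and event recording that are left out here.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical process-wide byte budget backed by one atomic counter.
struct Budget {
    remaining: AtomicUsize,
}

impl Budget {
    fn new(bytes: usize) -> Arc<Self> {
        Arc::new(Budget { remaining: AtomicUsize::new(bytes) })
    }

    /// Try to reserve `len` bytes. `Some(ticket)` means "keep resident";
    /// `None` means the caller should spill the column instead.
    fn try_reserve(self: &Arc<Self>, len: usize) -> Option<Ticket> {
        self.remaining
            // `checked_sub` returns `None` when the pool is too small, which
            // makes `fetch_update` leave the counter untouched and fail.
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| cur.checked_sub(len))
            .ok()
            .map(|_| Ticket { budget: Arc::clone(self), bytes: len })
    }

    fn remaining(&self) -> usize {
        self.remaining.load(Ordering::Acquire)
    }
}

/// Hypothetical drop guard: credits the bytes back whenever it is dropped,
/// on whichever thread that happens.
struct Ticket {
    budget: Arc<Budget>,
    bytes: usize,
}

impl Drop for Ticket {
    fn drop(&mut self) {
        self.budget.remaining.fetch_add(self.bytes, Ordering::AcqRel);
    }
}
```

Because the ticket holds an `Arc` to the shared counter, dropping it on a different thread than the one that reserved the bytes still credits the same pool, which is the property the description gives as the reason for avoiding per-worker `thread_local!` accounting.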
This PR optimizes the memory strategy for upsert v2 by switching the key-value lookup arrangement to `RowRowSpine`, which can swap to disk much more efficiently when memory usage exceeds the normal memory limits. --------- Co-authored-by: Patrick Butler <patrick.butler@materialize.com>
### Motivation

The merge-batcher's transient state (chains of sealed input chunks awaiting merge / extract) sits between input ingestion and the spine. Under memory pressure that transient peak is what trips OOMs, not the spine itself: the spine has already consolidated. This PR plugs `ColumnPager` (#36552) into the merge-batcher so those chain entries can live on disk (or compressed in RAM) instead of resident memory, while leaving the hot-path merge / extract logic and on-wire chunk shape unchanged.

### Description

A new `ColumnMergeBatcher` in `mz-timely-util` forks the DD merge-batcher framework so chain entries are `PagedColumn` (`Resident` / `Paged` / `Compressed`) rather than resident `Column`s. Streaming drivers `merge_chains` / `extract_chain` walk those chains via a `FetchIter` that materializes one head per side on demand and hands finished output back to the pager. The per-chunk merge / extract logic itself is the same `Column::merge_from` / `Column::extract` already used by the in-memory `ColumnMerger`.

Compute integration:

- New dyncfgs in `compute-types`: `enable_column_paged_batcher` (off by default; controls whether the installed pager actually evicts) and `column_paged_batcher_budget_fraction` (default 5% of replica memory; floored at 128 MiB so the no-pressure case doesn't page per chunk). Pager backend follows `--scratch-directory` availability the same way `mz_ore::pager` already does (file when a scratch directory is configured, swap otherwise).
- `apply_worker_config` derives `(enabled, total, backend)` from those dyncfgs and routes through `column_pager::apply_tiered_config`, which holds a process-wide `TieredPolicy` singleton and mutates its atomic budget / backend / codec in place. In-flight `ResidentTicket`s keep crediting the same atomic across reconfigures, so operator-driven tunes (or other workers in the same process reapplying the same config) don't orphan accounting onto a stale policy.
- Two arrange call sites swapped from `Col2ValBatcher` to `Col2ValPagedBatcher`: `arrange_collection` in `render/context.rs` and the `JoinStage` arrange in `render/join/linear_join.rs`. Logging arranges stay on the legacy `ColumnMerger` path.
- New `Col2ValPagedBatcher` type alias and a `BuilderInput for Column<((K, V), T, R)>` impl so the DD `OrdValBuilder` can consume the batcher's `Column` output without a container conversion. `RowRowColPagedBuilder` + a couple of small `PushInto` / `PartialEq` impls in `row_spine.rs` get the Row-keyed paths type-checking.
- `Materialized` and `Clusterd` mzcompose services gain `memory_swap` and `mem_swappiness` so feature-benchmarks can configure container swap behavior independently of the batcher.

Observability (last commit on the stack): `column_pager::metrics` registers a `PagerMetrics` struct with the process metrics registry. Counters cover skip / pageout / pagein decisions and bytes through each path; computed gauges expose `mz_column_pager_budget_remaining_bytes` and `mz_column_pager_budget_configured_bytes` against the live `TieredPolicy` atomics. Compute init wires the registration; the timely-util module is registry-agnostic, so callers without a registry (tests, benches, examples) just see no-op observers.

### Verification

- **Unit tests** (`mz-timely-util`, columnar + column_pager modules): 46 tests covering chunker correctness, per-chunk merge / extract, drain, `ColumnMergeBatcher` end-to-end seal under `ColumnPager::disabled()` and a `ForcePagePolicy` that forces every chunk through the pager. New `TieredPolicy::reconfigure` tests cover in-flight ticket preservation across pool resizes, shrink-saturates-at-zero, and live backend/codec swap. Proptests cover merge / extract invariants.
- **Criterion microbench** ([`benches/columnar_merge_batcher.rs`](src/timely-util/benches/columnar_merge_batcher.rs)): compares legacy `ColumnMerger` against the paged path with disabled / swap / lz4 across mixed / collision / disjoint inputs and four cache-tier sizes; prints a throughput summary table.
- **End-to-end example** ([`examples/column_paged_spill.rs`](src/timely-util/examples/column_paged_spill.rs)): drives `arrange_core` over a cancellation workload (positives + negatives at the same time so the spine stays empty and all pressure lives in the batcher). Back-to-back baseline + spill modes with an optional RSS sampler thread.
- **Feature-benchmark scenarios**:
  - `DifferentialJoinColumnPaged` measures steady-state overhead vs. `DifferentialJoin`.
  - `DifferentialJoinHydrationBaseline` / `DifferentialJoinHydrationFile` measure re-hydration time after `REPLICATION FACTOR 0 → 1` toggling. Run under `--this-memory` + `--this-memory-swap` to compare user-space spill against OS swap.

### Risk

**The batcher type swap at the two arrange sites is unconditional.** `enable_column_paged_batcher` only controls whether the pager actually evicts: with the flag off, chunks stay in `PagedColumn::Resident` and behave like the legacy in-memory path, but the surrounding code is the new `ColumnMergeBatcher` (chunker, chain layout, `merge_chains` driver, `extract_chain` driver, `BatcherEvent` accounting). That code path is exercised in every dataflow that hits `arrange_collection` or `JoinStage` from day one of this merging, flag or no flag. If the new batcher misbehaves, mitigation is a revert, not a flag flip.

**Whole-chunk passthrough is lost on the new path.** Legacy `ColumnMerger::merge` does a two-probe passthrough that hands disjoint-key chains directly to output with no per-record work. `merge_chains` doesn't implement that yet (peeking endpoints on a paged head would force materialization with no clean way to undo it). Workloads with disjoint key ranges across chains (monotonic source ingestion, partitioned upserts, time-windowed batches) regress to the per-record merge path even with the feature flag off. The criterion bench compares paged-vs-paged variants and doesn't cover this regression vs. the legacy passthrough; a future change can restore passthrough by gating it on `PagedColumn::Resident` heads or by carrying first/last keys in the pager metadata.

Logging arranges still use the legacy `ColumnMerger` path.

Bytes paged to swap / file don't enter the `mz_arrangement_batcher_*_raw` RSS-shaped accounting tables, matching how those tables already treat shipped chunks.
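
As a rough illustration of the budget dyncfg described under Compute integration (a fraction of replica memory, floored at 128 MiB), the derivation could look like the sketch below. The function name, the constant name, and the use of `f64` for the fraction are assumptions made for this example; the PR text does not show how `apply_worker_config` actually computes the value.

```rust
/// Floor from the description: never budget less than 128 MiB, so the
/// no-pressure case does not page on every chunk.
const MIN_BUDGET_BYTES: u64 = 128 * 1024 * 1024;

/// Hypothetical helper: scale replica memory by the configured fraction
/// (the description gives 5% as the default), then apply the floor.
fn pager_budget_bytes(replica_memory_bytes: u64, fraction: f64) -> u64 {
    // The float-to-integer cast truncates toward zero and saturates.
    let scaled = (replica_memory_bytes as f64 * fraction) as u64;
    scaled.max(MIN_BUDGET_BYTES)
}
```

With the default fraction, a replica with 1 GiB of memory would land on the 128 MiB floor (5% of 1 GiB is about 51 MiB), while a 64 GiB replica would get roughly 3.2 GiB.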
Our `WillDistinct` transform pushes down permission to do things that will be masked by `Distinct`. Generalized, this is "feel free to change record multiplicity *magnitudes*, as long as you do not change the polarity (pos, neg, zero)". With that generalization, we can move the information farther along, as well as introduce some more behaviors. Concretely, the PR also adds `Top1` as an operator that can introduce this permission.

Edit from @ggevay: Linking the Slack discussion: https://materializeinc.slack.com/archives/C08ACQNGSQK/p1742482715848509?thread_ts=1742482658.602089&cid=C08ACQNGSQK

### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly considered. ([trigger-ci for additional test/nightly runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md), is a design doc ([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)), or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T` mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md) (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label ([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note), I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
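
To make the "magnitudes may change, polarity may not" permission concrete, here is a small sketch. It models a collection as a map from record to multiplicity and uses a simplified `distinct` that emits each record with positive multiplicity once; both are assumptions for illustration and not how the transform or the operator is implemented.

```rust
use std::collections::BTreeMap;

/// Polarity of a multiplicity: -1, 0, or 1.
fn polarity(diff: i64) -> i64 {
    diff.signum()
}

/// True when every record has the same polarity in both collections.
/// A record missing from a map is treated as multiplicity zero.
fn same_polarity(a: &BTreeMap<String, i64>, b: &BTreeMap<String, i64>) -> bool {
    a.keys().chain(b.keys()).all(|k| {
        polarity(*a.get(k).unwrap_or(&0)) == polarity(*b.get(k).unwrap_or(&0))
    })
}

/// Simplified `Distinct`: each record with positive multiplicity appears once.
fn distinct(c: &BTreeMap<String, i64>) -> Vec<String> {
    c.iter()
        .filter(|(_, d)| **d > 0)
        .map(|(k, _)| k.clone())
        .collect()
}
```

In this model, two collections with the same polarity for every record always produce the same `distinct` output, so an upstream operator that only rescales magnitudes is masked by the downstream `Distinct`.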
See Commits and Changes for more details.
Created by
pull[bot] (v2.0.0-alpha.4)