[pull] main from MaterializeInc:main#1041

Merged
pull[bot] merged 7 commits into
transparencies:main from
MaterializeInc:main
May 29, 2026

@pull pull Bot commented May 29, 2026

See Commits and Changes for more details.


Created by pull[bot] (v2.0.0-alpha.4)

Can you help keep this open source service alive? 💖 Please sponsor : )

bosconi and others added 7 commits May 29, 2026 13:19
For included items with PR links see:
https://github.com/MaterializeInc/mz-skills/blob/main/data/mz-release-notes/v26.26.0/release_notes.md

For omitted items with PR links see:
https://github.com/MaterializeInc/mz-skills/blob/main/data/mz-release-notes/v26.26.0/omitted.md

**Borderline PRs omitted: 9** — [review borderline
calls](https://github.com/MaterializeInc/mz-skills/blob/main/data/mz-release-notes/v26.26.0/omitted.md#borderline)

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: val-materialize <valerie.chiang@materialize.com>
Co-authored-by: Pranshu Maheshwari <pranshu.maheshwari@materialize.com>
### Motivation

CLU-68: The `SingleTimeMonotonic` physical-monotonicity interpreter
propagated the input's monotonicity through `mfp` and `flat_map` without
consulting the MFP's predicates or the table function's properties. The
MIR monotonicity analysis (`src/transform/src/analysis/monotonic.rs`)
already accounts for both, so the LIR interpreter was inconsistent with
it.

### Severity

The practical correctness impact is low, and this is best understood as
defense-in-depth / consistency rather than a live bug fix:

- This analysis only ever **relaxes** the safe default. Single-time peeks
  set `must_consolidate = true` on hierarchical reductions / TopK by
  default; a positive physical-monotonicity result is what lets that be
  relaxed to `false`. A spuriously monotonic result is therefore the only
  way to cause trouble.
- The only LIR-reachable operator that can actually emit negative diffs
  is `repeat_row` with a negative count, which requires the unsafe
  `enable_repeat_row` flag and is not meaningfully representable in a
  single-time peek (a consolidated `-1` row has no SQL meaning).
- `preserves_monotonicity()` also returns `false` for `AclExplode`,
  `MzAclExplode`, and `GuardSubquerySize`, but those classifications are
  conservative — they do not emit negative diffs.

So no known query produces wrong results today; the change keeps the LIR
interpreter honest and aligned with the MIR analysis.

### Description

Corrects `mfp` and `flat_map` in `SingleTimeMonotonic`:

1. **`flat_map`**: now requires `func.preserves_monotonicity()` (the
   substantive check; e.g. `repeat_row` does not preserve it) in addition
   to propagating the input status. Mirrors `MirRelationExpr::FlatMap`.

2. **Temporal-predicate checks** (`mfp`, and the post-MFP of `flat_map`):
   now reject MFPs containing temporal predicates (`mz_now()`). Note this
   is *conservative*: an MFP cannot itself negate diffs, and temporal
   predicates only introduce retractions *across* timestamps, while a
   one-shot dataflow runs at a single time. We keep the check as
   defense-in-depth and to mirror `MirRelationExpr::Filter`.

The code comments document this severity nuance inline.
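
The two rules above can be sketched in isolation. This is a minimal illustration, not the interpreter's actual code: the `TableFunc` enum, the `Mfp` struct, and both function names are hypothetical stand-ins, and the real analysis works on LIR plan nodes rather than booleans.

```rust
/// Hypothetical stand-in for the table functions named in the description.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum TableFunc {
    GenerateSeriesInt64,
    RepeatRow,
}

impl TableFunc {
    /// `RepeatRow` may emit negative diffs, so it is classified as
    /// not preserving monotonicity; `GenerateSeriesInt64` preserves it.
    pub fn preserves_monotonicity(self) -> bool {
        !matches!(self, TableFunc::RepeatRow)
    }
}

/// Simplified MFP (assumption): it only records whether any predicate
/// is temporal, i.e. refers to `mz_now()`.
#[derive(Clone, Copy, Debug)]
pub struct Mfp {
    pub has_temporal_predicates: bool,
}

/// `mfp`: monotonic only if the input is and no temporal predicate is present.
pub fn mfp_monotonic(input_monotonic: bool, mfp: Mfp) -> bool {
    input_monotonic && !mfp.has_temporal_predicates
}

/// `flat_map`: additionally requires the table function to preserve
/// monotonicity, and applies the same temporal check to the post-MFP.
pub fn flat_map_monotonic(input_monotonic: bool, func: TableFunc, post_mfp: Mfp) -> bool {
    input_monotonic && func.preserves_monotonicity() && mfp_monotonic(true, post_mfp)
}
```

The four cases mirror the unit tests listed under Verification: a preserving function passes, `RepeatRow` fails, and a temporal predicate fails in either position.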

### Verification

Added unit tests covering:
- `flat_map` with a monotonicity-preserving function (`GenerateSeriesInt64`)
- `flat_map` with a non-preserving function (`RepeatRow`)
- `flat_map` with a temporal predicate in the post-MFP
- `mfp` with and without temporal predicates

https://claude.ai/code/session_013EEMmczcmK98tcSa9B9vvQ

---------

Co-authored-by: Claude <noreply@anthropic.com>
### Motivation

Builds on #36391 (now merged), which introduced `mz_ore::pager`.
Layers a typed `Column` API over the pager with an injected policy that
decides, per call, whether to keep resident, page out raw, or page out
lz4-compressed.
This is the seam between in-memory columnar buffers and out-of-core
storage envisioned in `doc/developer/design/20260504_pager.md`.

### Description

Adds `mz_ore::pager::pageout_with` for explicit-backend dispatch,
bypassing the global atomic so layered consumers can route per call
without racing other writers.

New `mz_timely_util::column_pager` module:

* `PagingPolicy` trait with `decide(PageHint) -> PageDecision` (`Skip` /
`Page { backend, codec }`) and `record(PageEvent)` for budget
bookkeeping and metrics.
* `PagedColumn` with three variants: `Resident(Column, ResidentTicket)`,
`Paged { handle, meta }`, and `Compressed { inner, meta }` (memory or
pager-backed framed bytes).
* `ResidentTicket` is a drop guard that fires
`PageEvent::ResidentReleased { bytes }` whether the caller calls
`ColumnPager::take` or drops the column without taking — so policy
budgets don't leak.
* `ColumnPager::page` drains via `ContainerBytes`. The
`Column::Align(Vec)` uncompressed path moves the body `Vec` directly
into the pager handle; the other variants serialize and then
`Column::clear` in place, retaining the caller's `Typed` allocation for
reuse instead of dropping it. The compressed path wraps the target in
`FrameEncoder` so `into_bytes` streams straight through lz4 — no
intermediate uncompressed `Vec`. The compressed File path pads to u64
alignment; the frame trailer self-delimits so no length prefix or unpad
is needed on read.
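
The `ResidentTicket` drop-guard idea from the list above can be illustrated with a small sketch. The types below are simplified assumptions: a shared `AtomicUsize` counter stands in for firing `PageEvent::ResidentReleased { bytes }`, and `Resident<C>` stands in for the `PagedColumn::Resident` variant.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical drop guard: credits `bytes` to a shared counter when dropped.
pub struct ResidentTicket {
    bytes: usize,
    released: Arc<AtomicUsize>,
}

impl Drop for ResidentTicket {
    fn drop(&mut self) {
        // Stand-in for firing `PageEvent::ResidentReleased { bytes }`.
        self.released.fetch_add(self.bytes, Ordering::Relaxed);
    }
}

/// A resident column paired with its ticket (simplified `Resident` variant).
pub struct Resident<C> {
    column: C,
    ticket: ResidentTicket,
}

impl<C> Resident<C> {
    pub fn new(column: C, bytes: usize, released: Arc<AtomicUsize>) -> Self {
        Resident { column, ticket: ResidentTicket { bytes, released } }
    }

    /// Hands the column back; the ticket is dropped here and fires the release.
    pub fn take(self) -> C {
        let Resident { column, ticket } = self;
        drop(ticket);
        column
    }
}
```

Because the release lives in `Drop`, both exits (calling `take`, or dropping the value untouched) credit the same number of bytes exactly once.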

`TieredPolicy` in `column_pager::policy` is a single process-wide byte
budget.
Each `decide` reserves `len_bytes` from one `AtomicUsize` pool — success
keeps the column resident, failure spills it via the configured
`backend` + `codec` — and `record` credits the bytes back on
`ResidentReleased`.
Accounting is a single atomic rather than per-worker `thread_local!`
state because resident columns move freely between Timely workers, so a
release can run on a different thread than the grant; one pool credits
correctly regardless of where the drop lands.
A future thread-aware tiered layout would need a `SendColumn` wrapper
that pins the column to its origin thread, or an origin-keyed
cross-thread credit channel.
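
A minimal sketch of the single-atomic budget described above, assuming a hypothetical `BytePool` type (the real `TieredPolicy` also carries backend and codec state, which is omitted here). Reservation uses the standard library's `AtomicUsize::fetch_update` so that a failed reservation leaves the pool unchanged.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical process-wide byte budget backed by one atomic.
pub struct BytePool {
    remaining: AtomicUsize,
}

impl BytePool {
    pub fn new(budget_bytes: usize) -> Self {
        BytePool { remaining: AtomicUsize::new(budget_bytes) }
    }

    /// Tries to reserve `len_bytes`. `true` means "keep resident";
    /// `false` means the caller should spill the column instead.
    pub fn try_reserve(&self, len_bytes: usize) -> bool {
        self.remaining
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |cur| {
                // `checked_sub` returns `None` when the budget is too small,
                // which aborts the update and leaves the pool untouched.
                cur.checked_sub(len_bytes)
            })
            .is_ok()
    }

    /// Credits bytes back; safe to call from any thread.
    pub fn release(&self, len_bytes: usize) {
        self.remaining.fetch_add(len_bytes, Ordering::AcqRel);
    }

    pub fn remaining(&self) -> usize {
        self.remaining.load(Ordering::Acquire)
    }
}
```

Since the pool is one shared atomic, a grant made on one worker thread and a release made on another still balance, which is the property the paragraph above relies on.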

Criterion bench (`cargo bench -p mz-timely-util --bench column_pager`)
covers 4 KiB / 256 KiB / 4 MiB × Swap / File × raw / lz4 for both
round-trip and operator-loop shapes.
Swap-backend results are labelled `swap-warm` to flag that the bench
never builds enough working set to force kernel eviction; a follow-up
`column_pager_pressure` bench under `systemd-run --user --scope -p
MemoryMax=...` will exercise the cold path.

### Checklist

- [ ] This PR has adequate test coverage / QA is not needed.
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
This PR optimizes the memory strategy for upsert v2 by switching to
`RowRowSpine` for the key-value lookup arrangement, which can swap to
disk much more efficiently when memory usage exceeds the normal memory
limits.

---------

Co-authored-by: Patrick Butler <patrick.butler@materialize.com>
### Motivation

The merge-batcher's transient state (chains of sealed input chunks
awaiting merge / extract) sits between input ingestion and the spine.
Under memory pressure that transient peak is what trips OOMs, not the
spine itself — the spine has already consolidated.

This PR plugs `ColumnPager` (#36552) into the merge-batcher so those
chain entries can live on disk (or compressed in RAM) instead of
resident memory, while leaving the hot-path merge / extract logic and
on-wire chunk shape unchanged.

### Description

A new `ColumnMergeBatcher` in `mz-timely-util` forks the DD
merge-batcher framework so chain entries are `PagedColumn` (`Resident` /
`Paged` / `Compressed`) rather than resident `Column`s. Streaming
drivers `merge_chains` / `extract_chain` walk those chains via a
`FetchIter` that materializes one head per side on demand and hands
finished output back to the pager. The per-chunk merge / extract logic
itself is the same `Column::merge_from` / `Column::extract` already used
by the in-memory `ColumnMerger`.
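
The "one head per side on demand" shape can be sketched as follows. This is an illustration under simplifying assumptions, not the PR's code: records are `(u64, i64)` key/diff pairs, each chain is sorted with unique keys, a "paged" chunk is simulated by a boxed closure, and the output is collected into a plain `Vec` instead of being handed back to a pager. The `Chunk` enum and this `FetchIter` are hypothetical reductions of the real types.

```rust
use std::cmp::Ordering;
use std::collections::VecDeque;

/// A chunk that is either already in memory or must be fetched first.
pub enum Chunk {
    Resident(Vec<(u64, i64)>),
    Paged(Box<dyn FnOnce() -> Vec<(u64, i64)>>),
}

impl Chunk {
    fn materialize(self) -> Vec<(u64, i64)> {
        match self {
            Chunk::Resident(v) => v,
            Chunk::Paged(fetch) => fetch(),
        }
    }
}

/// Walks a chain, materializing only the chunk currently being read.
pub struct FetchIter {
    chain: VecDeque<Chunk>,
    head: std::vec::IntoIter<(u64, i64)>,
}

impl FetchIter {
    pub fn new(chain: Vec<Chunk>) -> Self {
        FetchIter { chain: chain.into(), head: Vec::new().into_iter() }
    }
}

impl Iterator for FetchIter {
    type Item = (u64, i64);
    fn next(&mut self) -> Option<Self::Item> {
        loop {
            if let Some(rec) = self.head.next() {
                return Some(rec);
            }
            // Current head exhausted: fetch the next chunk, if any.
            self.head = self.chain.pop_front()?.materialize().into_iter();
        }
    }
}

/// Merges two sorted chains, summing diffs of equal keys and dropping zeros.
pub fn merge_chains(a: Vec<Chunk>, b: Vec<Chunk>) -> Vec<(u64, i64)> {
    let mut a = FetchIter::new(a).peekable();
    let mut b = FetchIter::new(b).peekable();
    let mut out = Vec::new();
    loop {
        let ord = match (a.peek(), b.peek()) {
            (Some(x), Some(y)) => x.0.cmp(&y.0),
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            (None, None) => break,
        };
        match ord {
            Ordering::Less => out.push(a.next().unwrap()),
            Ordering::Greater => out.push(b.next().unwrap()),
            Ordering::Equal => {
                let (key, da) = a.next().unwrap();
                let (_, db) = b.next().unwrap();
                if da + db != 0 {
                    out.push((key, da + db));
                }
            }
        }
    }
    out
}
```

At any moment at most one chunk per input chain is materialized, which is what keeps the merge's resident footprint bounded regardless of chain length.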

Compute integration:

- New dyncfgs in `compute-types`: `enable_column_paged_batcher` (off by
default — controls whether the installed pager actually evicts) and
`column_paged_batcher_budget_fraction` (default 5% of replica memory;
floored at 128 MiB so the no-pressure case doesn't page per chunk).
Pager backend follows `--scratch-directory` availability the same way
`mz_ore::pager` already does (file when a scratch directory is
configured, swap otherwise).
- `apply_worker_config` derives `(enabled, total, backend)` from those
dyncfgs and routes through `column_pager::apply_tiered_config`, which
holds a process-wide `TieredPolicy` singleton and mutates its atomic
budget / backend / codec in place. In-flight `ResidentTicket`s keep
crediting the same atomic across reconfigures, so operator-driven tunes
(or other workers in the same process reapplying the same config) don't
orphan accounting onto a stale policy.
- Two arrange call sites swapped from `Col2ValBatcher` to
`Col2ValPagedBatcher`: `arrange_collection` in `render/context.rs` and
the `JoinStage` arrange in `render/join/linear_join.rs`. Logging
arranges stay on the legacy `ColumnMerger` path.
- New `Col2ValPagedBatcher` type alias and a `BuilderInput for
Column<((K, V), T, R)>` impl so the DD `OrdValBuilder` can consume the
batcher's `Column` output without a container conversion.
`RowRowColPagedBuilder` + a couple of small `PushInto` / `PartialEq`
impls in `row_spine.rs` get the Row-keyed paths type-checking.
- `Materialized` and `Clusterd` mzcompose services gain `memory_swap`
and `mem_swappiness` so feature-benchmarks can configure container swap
behavior independently of the batcher.
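
As a worked example of the budget rule in the first bullet, here is a sketch of the arithmetic. The function name and the exact rounding are assumptions; only the 5% default and the 128 MiB floor come from the description.

```rust
/// Floor from the description: never budget less than 128 MiB.
const MIN_BUDGET_BYTES: u64 = 128 * 1024 * 1024;

/// Hypothetical helper: scales replica memory by `fraction` (e.g. 0.05)
/// and applies the 128 MiB floor.
pub fn batcher_budget_bytes(replica_memory_bytes: u64, fraction: f64) -> u64 {
    let scaled = (replica_memory_bytes as f64 * fraction) as u64;
    scaled.max(MIN_BUDGET_BYTES)
}
```

Under these assumptions a 1 GiB replica gets the 128 MiB floor (5% would be only about 51 MiB), while a 64 GiB replica gets roughly 3.2 GiB.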

Observability (last commit on the stack): `column_pager::metrics`
registers a `PagerMetrics` struct with the process metrics registry.
Counters cover skip / pageout / pagein decisions and bytes through each
path; computed gauges expose `mz_column_pager_budget_remaining_bytes`
and `mz_column_pager_budget_configured_bytes` against the live
`TieredPolicy` atomics. Compute init wires the registration; the
timely-util module is registry-agnostic, so callers without a registry
(tests, benches, examples) just see no-op observers.

### Verification

- **Unit tests** (`mz-timely-util`, columnar + column_pager modules): 46
tests covering chunker correctness, per-chunk merge / extract, drain,
`ColumnMergeBatcher` end-to-end seal under `ColumnPager::disabled()` and
a `ForcePagePolicy` that forces every chunk through the pager. New
`TieredPolicy::reconfigure` tests cover in-flight ticket preservation
across pool resizes, shrink-saturates-at-zero, and live backend/codec
swap. Proptests cover merge / extract invariants.
- **Criterion microbench**
([`benches/columnar_merge_batcher.rs`](src/timely-util/benches/columnar_merge_batcher.rs)):
compares legacy `ColumnMerger` against the paged path with disabled /
swap / lz4 across mixed / collision / disjoint inputs and four
cache-tier sizes; prints a throughput summary table.
- **End-to-end example**
([`examples/column_paged_spill.rs`](src/timely-util/examples/column_paged_spill.rs)):
drives `arrange_core` over a cancellation workload (positives +
negatives at the same time so the spine stays empty and all pressure
lives in the batcher). Back-to-back baseline + spill modes with an
optional RSS sampler thread.
- **Feature-benchmark scenarios**:
  - `DifferentialJoinColumnPaged` measures steady-state overhead vs.
    `DifferentialJoin`.
  - `DifferentialJoinHydrationBaseline` / `DifferentialJoinHydrationFile`
    measure re-hydration time after `REPLICATION FACTOR 0 → 1` toggling.
    Run under `--this-memory` + `--this-memory-swap` to compare
    user-space spill against OS swap.

### Risk

**The batcher type swap at the two arrange sites is unconditional.**
`enable_column_paged_batcher` only controls whether the pager actually
evicts — with the flag off, chunks stay in `PagedColumn::Resident` and
behave like the legacy in-memory path, but the surrounding code is the
new `ColumnMergeBatcher` (chunker, chain layout, `merge_chains` driver,
`extract_chain` driver, `BatcherEvent` accounting). That code path is
exercised in every dataflow that hits `arrange_collection` or
`JoinStage` from day one of this merging, flag or no flag. If the new
batcher misbehaves, mitigation is a revert, not a flag flip.

**Whole-chunk passthrough is lost on the new path.** Legacy
`ColumnMerger::merge` does a two-probe passthrough that hands
disjoint-key chains directly to output with no per-record work.
`merge_chains` doesn't implement that yet (peeking endpoints on a paged
head would force materialization with no clean way to undo it).
Workloads with disjoint key ranges across chains — monotonic source
ingestion, partitioned upserts, time-windowed batches — regress to the
per-record merge path even with the feature flag off. The criterion
bench compares paged-vs-paged variants and doesn't cover this regression
vs. the legacy passthrough; a future change can restore passthrough by
gating it on `PagedColumn::Resident` heads or by carrying first/last
keys in the pager metadata.
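
To make the lost optimization concrete, here is a sketch of a disjoint-range passthrough over fully resident chains. It is an assumption about the general technique, not a copy of `ColumnMerger::merge`: chunks are plain sorted `Vec<(u64, i64)>`, every chunk is non-empty, and the two probes compare one chain's last key with the other's first key.

```rust
type Chunk = Vec<(u64, i64)>;

fn first_key(chain: &[Chunk]) -> Option<u64> {
    chain.first()?.first().map(|rec| rec.0)
}

fn last_key(chain: &[Chunk]) -> Option<u64> {
    chain.last()?.last().map(|rec| rec.0)
}

/// If one chain lies strictly before the other, returns the chunks
/// concatenated in key order without touching individual records.
/// Otherwise returns both chains so the caller can merge per record.
pub fn try_passthrough(
    a: Vec<Chunk>,
    b: Vec<Chunk>,
) -> Result<Vec<Chunk>, (Vec<Chunk>, Vec<Chunk>)> {
    // Strict `<` so equal boundary keys still go through consolidation.
    let a_before_b =
        matches!((last_key(&a), first_key(&b)), (Some(hi), Some(lo)) if hi < lo);
    let b_before_a =
        matches!((last_key(&b), first_key(&a)), (Some(hi), Some(lo)) if hi < lo);
    if a_before_b {
        let mut out = a;
        out.extend(b);
        Ok(out)
    } else if b_before_a {
        let mut out = b;
        out.extend(a);
        Ok(out)
    } else {
        Err((a, b))
    }
}
```

With paged heads the probes would need the boundary keys without materializing the chunk, which is why carrying first/last keys in the pager metadata is one of the proposed ways to restore this path.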

Logging arranges still use the legacy `ColumnMerger` path. Bytes paged
to swap / file don't enter the `mz_arrangement_batcher_*_raw` RSS-shaped
accounting tables, matching how those tables already treat shipped
chunks.
Our `WillDistinct` transform pushes down permission to do things that
will be masked by `Distinct`. Generalized, this is "feel free to change
record multiplicity *magnitudes*, as long as you do not change the
polarity (pos, neg, zero)". With that generalization, we can move the
information farther along, as well as introduce some more behaviors.

Concretely, the PR also adds `Top1` as a thing that can introduce this
permission.
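
The polarity rule can be illustrated with a small sketch. Everything here is a hypothetical model rather than the transform's code: a collection is a map from record to its consolidated multiplicity, `distinct` keeps records with positive multiplicity, and `clamp_magnitudes` is one example of a rewrite the permission would allow.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Two multiplicities have the same polarity if both are positive,
/// both are negative, or both are zero.
pub fn same_polarity(before: i64, after: i64) -> bool {
    before.signum() == after.signum()
}

/// Simplified `Distinct`: keeps each record whose multiplicity is positive.
pub fn distinct(coll: &BTreeMap<&'static str, i64>) -> BTreeSet<&'static str> {
    coll.iter().filter(|&(_, &d)| d > 0).map(|(k, _)| *k).collect()
}

/// Example of an allowed rewrite: shrink every multiplicity to its sign.
pub fn clamp_magnitudes(coll: &BTreeMap<&'static str, i64>) -> BTreeMap<&'static str, i64> {
    coll.iter().map(|(k, d)| (*k, d.signum())).collect()
}
```

In this model any rewrite that keeps `same_polarity` true for every record leaves the output of `distinct` unchanged, which is why the permission can be pushed through operators that only alter magnitudes.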

Edit from @ggevay: Linking the slack discussion:
https://materializeinc.slack.com/archives/C08ACQNGSQK/p1742482715848509?thread_ts=1742482658.602089&cid=C08ACQNGSQK


### Checklist

- [ ] This PR has adequate test coverage / QA involvement has been duly
considered. ([trigger-ci for additional test/nightly
runs](https://trigger-ci.dev.materialize.com/))
- [ ] This PR has an associated up-to-date [design
doc](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/README.md),
is a design doc
([template](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/design/00000000_template.md)),
or is sufficiently small to not require a design.
  <!-- Reference the design in the description. -->
- [ ] If this PR evolves [an existing `$T ⇔ Proto$T`
mapping](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/command-and-response-binary-encoding.md)
(possibly in a backwards-incompatible way), then it is tagged with a
`T-proto` label.
- [ ] If this PR will require changes to cloud orchestration or tests,
there is a companion cloud PR to account for those changes that is
tagged with the release-blocker label
([example](MaterializeInc/cloud#5021)).
<!-- Ask in #team-cloud on Slack if you need help preparing the cloud
PR. -->
- [ ] If this PR includes major [user-facing behavior
changes](https://github.com/MaterializeInc/materialize/blob/main/doc/developer/guide-changes.md#what-changes-require-a-release-note),
I have pinged the relevant PM to schedule a changelog post.

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
@pull pull Bot locked and limited conversation to collaborators May 29, 2026
@pull pull Bot added the ⤵️ pull label May 29, 2026
@pull pull Bot merged commit b9af1db into transparencies:main May 29, 2026