From 993c492f8140f96b569e233523d9a71dd8f6302d Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 11 Jun 2026 10:25:53 +0200 Subject: [PATCH] compute: cache per-batch heap size in ArrangementSize operator The arrangement-size logging operator recomputed every live batch's heap size on every activation, walking each batch's backing regions repeatedly even when nothing changed. Batches are immutable once sealed, so this work is redundant. Cache the computed (size, capacity, allocations) tuple alongside the weak reference when a batch is first observed, and only sum the cached values on subsequent activations. This drops per-activation cost from O(batches * columns * regions) to O(new batches * regions) plus an O(batches) sum, eliminating the repeated region walk that made the operator slow for large arrangements. Co-Authored-By: Claude Opus 4.8 --- src/compute/src/extensions/arrange.rs | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 56987ae2b4dfc..c9616c83a924c 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0. use std::collections::BTreeMap; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; @@ -272,14 +272,19 @@ where )); // Weak references to batches, so we can observe batches outside the trace. - let mut batches = BTreeMap::new(); + // Batches are immutable once sealed, so we compute their size exactly + // once (when first observed) and cache it alongside the weak reference. + // Subsequent activations only sum the cached values for live batches, + // avoiding a repeated walk of every batch's backing regions. + let mut batches: BTreeMap<*const B, (Weak, (usize, usize, usize))> = BTreeMap::new(); move |input, output| { input.for_each(|time, data| { - batches.extend( - data.iter() - .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))), - ); + for batch in data.iter() { + batches + .entry(Rc::as_ptr(batch)) + .or_insert_with(|| (Rc::downgrade(batch), logic(batch))); + } output.session(&time).give_container(data); }); let Some(trace) = trace.upgrade() else { @@ -287,13 +292,15 @@ where }; trace.borrow().trace().map_batches(|batch| { - batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch)); + batches + .entry(Rc::as_ptr(batch)) + .or_insert_with(|| (Rc::downgrade(batch), logic(batch))); }); let (mut size, mut capacity, mut allocations) = (0, 0, 0); - batches.retain(|_, weak| { - if let Some(batch) = weak.upgrade() { - let (sz, c, a) = logic(&batch); + batches.retain(|_, (weak, cached)| { + if weak.strong_count() > 0 { + let (sz, c, a) = *cached; (size += sz, capacity += c, allocations += a); true } else {