diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs index 56987ae2b4dfc..c9616c83a924c 100644 --- a/src/compute/src/extensions/arrange.rs +++ b/src/compute/src/extensions/arrange.rs @@ -8,7 +8,7 @@ // by the Apache License, Version 2.0. use std::collections::BTreeMap; -use std::rc::Rc; +use std::rc::{Rc, Weak}; use differential_dataflow::difference::Semigroup; use differential_dataflow::lattice::Lattice; @@ -272,14 +272,19 @@ where )); // Weak references to batches, so we can observe batches outside the trace. - let mut batches = BTreeMap::new(); + // Batches are immutable once sealed, so we compute their size exactly + // once (when first observed) and cache it alongside the weak reference. + // Subsequent activations only sum the cached values for live batches, + // avoiding a repeated walk of every batch's backing regions. + let mut batches: BTreeMap<*const B, (Weak, (usize, usize, usize))> = BTreeMap::new(); move |input, output| { input.for_each(|time, data| { - batches.extend( - data.iter() - .map(|batch| (Rc::as_ptr(batch), Rc::downgrade(batch))), - ); + for batch in data.iter() { + batches + .entry(Rc::as_ptr(batch)) + .or_insert_with(|| (Rc::downgrade(batch), logic(batch))); + } output.session(&time).give_container(data); }); let Some(trace) = trace.upgrade() else { @@ -287,13 +292,15 @@ where }; trace.borrow().trace().map_batches(|batch| { - batches.insert(Rc::as_ptr(batch), Rc::downgrade(batch)); + batches + .entry(Rc::as_ptr(batch)) + .or_insert_with(|| (Rc::downgrade(batch), logic(batch))); }); let (mut size, mut capacity, mut allocations) = (0, 0, 0); - batches.retain(|_, weak| { - if let Some(batch) = weak.upgrade() { - let (sz, c, a) = logic(&batch); + batches.retain(|_, (weak, cached)| { + if weak.strong_count() > 0 { + let (sz, c, a) = *cached; (size += sz, capacity += c, allocations += a); true } else {