diff --git a/crates/op-rbuilder/src/args/op.rs b/crates/op-rbuilder/src/args/op.rs index 8138d75f1..34d95c881 100644 --- a/crates/op-rbuilder/src/args/op.rs +++ b/crates/op-rbuilder/src/args/op.rs @@ -233,6 +233,16 @@ pub struct FlashblocksArgs { default_value = "256" )] pub ws_subscriber_limit: Option, + + /// Enable continuous building between scheduler triggers. + /// When enabled, the builder runs continuously on a blocking thread and + /// scheduler triggers interrupt the build to seal and publish. + #[arg( + long = "flashblocks.continuous-build", + env = "FLASHBLOCKS_CONTINUOUS_BUILD", + default_value = "false" + )] + pub flashblocks_continuous_build: bool, } impl Default for FlashblocksArgs { diff --git a/crates/op-rbuilder/src/builder/best_txs.rs b/crates/op-rbuilder/src/builder/best_txs.rs index be07a70cd..a7580aeea 100644 --- a/crates/op-rbuilder/src/builder/best_txs.rs +++ b/crates/op-rbuilder/src/builder/best_txs.rs @@ -13,7 +13,7 @@ use crate::tx::MaybeFlashblockFilter; /// - Reverted txs aren't re-simulated (would waste work). /// /// A fresh instance is created per block, so exclusions do not leak between blocks. -#[derive(Debug, Default)] +#[derive(Debug, Default, Clone)] pub(super) struct FlashblockTxTracker { /// Transactions already committed to state. committed: HashSet, diff --git a/crates/op-rbuilder/src/builder/builder_tx.rs b/crates/op-rbuilder/src/builder/builder_tx.rs index e56806d2e..082b4f4a5 100644 --- a/crates/op-rbuilder/src/builder/builder_tx.rs +++ b/crates/op-rbuilder/src/builder/builder_tx.rs @@ -538,7 +538,7 @@ pub fn get_balance( /// Adjust batch gas/DA/uncompressed limits to reserve capacity for bottom-of-block builder txs. /// Returns the adjusted `max_uncompressed_block_size`. -pub(super) fn reserve_builder_tx_budget( +pub(crate) fn reserve_builder_tx_budget( builder_txs: &[BuilderTransactionCtx], target_gas: &mut u64, target_da: &mut Option, diff --git a/crates/op-rbuilder/src/builder/config.rs b/crates/op-rbuilder/src/builder/config.rs index 9c943ce57..249cf5ccb 100644 --- a/crates/op-rbuilder/src/builder/config.rs +++ b/crates/op-rbuilder/src/builder/config.rs @@ -60,6 +60,9 @@ pub struct FlashblocksConfig { /// Maximum number of concurrent WebSocket subscribers pub ws_subscriber_limit: Option, + + /// Enable continuous building between scheduler triggers. + pub continuous_build: bool, } impl Default for FlashblocksConfig { @@ -79,6 +82,7 @@ impl Default for FlashblocksConfig { p2p_known_peers: None, p2p_max_peer_count: 50, ws_subscriber_limit: None, + continuous_build: false, } } } @@ -123,6 +127,7 @@ impl TryFrom for FlashblocksConfig { p2p_known_peers: args.flashblocks.p2p.p2p_known_peers, p2p_max_peer_count: args.flashblocks.p2p.p2p_max_peer_count, ws_subscriber_limit: args.flashblocks.ws_subscriber_limit, + continuous_build: args.flashblocks.flashblocks_continuous_build, }) } } diff --git a/crates/op-rbuilder/src/builder/context.rs b/crates/op-rbuilder/src/builder/context.rs index 887eef0bf..2f289d9b8 100644 --- a/crates/op-rbuilder/src/builder/context.rs +++ b/crates/op-rbuilder/src/builder/context.rs @@ -47,7 +47,7 @@ use crate::{ /// `BuilderConfig`) or is itself an `Arc`/handle whose backing data lives /// outside the per-job context (e.g. metrics registry, global backrun pool, /// per-address gas limiter buckets). -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct OpPayloadBuilderCtx { /// EVM configuration used to build a per-job [`OpBlockEvmFactory`]. pub evm_config: OpEvmConfig, @@ -81,7 +81,7 @@ pub struct OpPayloadBuilderCtx { /// Container type that holds all necessities to build a new payload. /// This struct is constructed once per payload job. -#[derive(derive_more::Constructor, derive_more::Deref)] +#[derive(Clone, derive_more::Constructor, derive_more::Deref)] #[allow(clippy::too_many_arguments)] pub struct OpPayloadJobCtx { /// Builder-lifetime configuration shared with all other in-flight jobs. @@ -101,10 +101,14 @@ pub struct OpPayloadJobCtx { backrun_pool: Option, /// Per-build guard onto the canonical [`AddressLimiter`] held by /// `builder_ctx`. Charges accumulate privately here and auto-commit back - /// into the canonical when this ctx is dropped. Shadows the + /// into the canonical when the last [`Arc`] is dropped. Shadows the /// `address_limiter` field reached via `Deref` to `OpPayloadBuilderCtx`, /// so `self.address_limiter` inside this ctx always hits the guard. - address_limiter: AddressLimiterGuard, + /// + /// Wrapped in [`Arc`] so the continuous build path can clone this ctx + /// across flashblock intervals while every clone still drives a single + /// shared guard. The guard auto-commits when the last clone is dropped. + address_limiter: Arc, } impl OpPayloadJobCtx { @@ -135,10 +139,22 @@ impl OpPayloadJobCtx { } /// Returns the builder attributes. - pub(super) const fn attributes(&self) -> &OpPayloadBuilderAttributes { + pub(crate) const fn attributes(&self) -> &OpPayloadBuilderAttributes { &self.config.attributes } + /// Returns this job's cancellation handle. + pub(crate) fn cancel(&self) -> &FlashblockJobCancellation { + &self.cancel + } + + /// Returns this job's per-build [`AddressLimiterGuard`]. All clones of this + /// ctx share the same guard via [`Arc`]; the canonical commit fires when + /// the last clone drops. + pub(crate) fn address_limiter(&self) -> &AddressLimiterGuard { + &self.address_limiter + } + /// Returns the block gas limit to target. pub fn block_gas_limit(&self) -> u64 { match self.gas_limit_config.gas_limit() { diff --git a/crates/op-rbuilder/src/builder/continuous/candidate_loop.rs b/crates/op-rbuilder/src/builder/continuous/candidate_loop.rs new file mode 100644 index 000000000..bec673900 --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/candidate_loop.rs @@ -0,0 +1,473 @@ +use super::{ + shared_best::SharedBest, + types::{BestCandidate, CandidateLoopResult}, +}; +use crate::{ + builder::{ + best_txs::{FlashblockPoolTxCursor, FlashblockTxTracker}, + builder_tx::{BuilderTransactions, reserve_builder_tx_budget}, + context::OpPayloadJobCtx, + payload::{FlashblocksState, OpPayloadBuilder}, + state_root::StateRootCalculator, + }, + primitives::reth::ExecutionInfo, + traits::{ClientBounds, PoolBounds}, +}; +use alloy_primitives::U256; +use eyre::WrapErr as _; +use op_alloy_rpc_types_engine::OpFlashblockPayload; +use reth_optimism_node::OpBuiltPayload; +use reth_payload_util::BestPayloadTransactions; +use reth_provider::{ + HashedPostStateProvider, ProviderError, StateRootProvider, StorageRootProvider, +}; +use reth_revm::State; +use revm::Database; +use std::{ + sync::atomic::Ordering, + time::{Duration, Instant}, +}; +use tokio_util::sync::CancellationToken; +use tracing::{error, field, info, metadata::Level, span, warn}; + +// === Build / candidate loop internals ======================= +// +// Runs inside the spawned blocking task. The candidate loop repeatedly +// clones state, executes txs, seals a flashblock, and writes the new +// best into [`SharedBest`] until its per-interval cancel fires. + +impl OpPayloadBuilder +where + Pool: PoolBounds + 'static, + Client: ClientBounds + 'static, + BuilderTx: BuilderTransactions + Send + Sync + 'static, +{ + fn build_empty_flashblock_candidate< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &OpPayloadJobCtx, + fb_state: &mut FlashblocksState, + info: &mut ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + state_root_calc: &mut StateRootCalculator, + ) -> eyre::Result<(FlashblocksState, OpBuiltPayload, OpFlashblockPayload)> { + if let Err(e) = self.builder_tx().add_builder_txs( + &state_provider, + info, + &ctx.builder_tx_env(), + state, + false, + fb_state.is_first_flashblock(), + fb_state.is_last_flashblock(), + ) { + error!( + target: "payload_builder", + "Error adding bottom builder txs to empty flashblock candidate: {}", + e + ); + } + + let (new_payload, mut fb_payload) = ctx + .block_assembly_input() + .wrap_err("failed to construct block assembly input")? + .assemble( + state, + Some(fb_state), + info, + state_root_calc, + ctx.metrics.clone(), + ctx.enable_tx_tracking_debug_logs, + ) + .wrap_err("failed to build empty flashblock candidate")?; + + fb_payload.index = fb_state.flashblock_index(); + fb_payload.base = None; + + let next_target_da = fb_state.target_da_for_batch(); + let next_target_da_footprint = fb_state.target_da_footprint_for_batch(); + Ok(( + fb_state.next_after_seal(next_target_da, next_target_da_footprint), + new_payload, + fb_payload, + )) + } + + #[expect(clippy::too_many_arguments)] + pub(super) fn build_continuous_flashblock< + DB: Database + std::fmt::Debug + AsRef

, + P: StateRootProvider + HashedPostStateProvider + StorageRootProvider, + >( + &self, + ctx: &OpPayloadJobCtx, + fb_state: &mut FlashblocksState, + info: &mut ExecutionInfo, + state: &mut State, + state_provider: impl reth::providers::StateProvider + Clone, + tx_tracker: &mut FlashblockTxTracker, + state_root_calc: &mut StateRootCalculator, + block_cancel: &CancellationToken, + shared_best: &SharedBest, + ) -> eyre::Result { + let flashblock_index = fb_state.flashblock_index(); + let mut target_gas_for_batch = fb_state.target_gas_for_batch(); + let mut target_da_for_batch = fb_state.target_da_for_batch(); + let mut target_da_footprint_for_batch = fb_state.target_da_footprint_for_batch(); + + info!( + target: "payload_builder", + block_number = ctx.block_number(), + flashblock_index, + target_gas = target_gas_for_batch, + gas_used = info.cumulative_gas_used, + target_da = target_da_for_batch, + da_used = info.cumulative_da_bytes_used, + block_gas_used = ctx.block_gas_limit(), + target_da_footprint = target_da_footprint_for_batch, + "continuous: starting candidate loop", + ); + + let builder_txs = self + .builder_tx() + .add_builder_txs( + &state_provider, + info, + &ctx.builder_tx_env(), + state, + true, + fb_state.is_first_flashblock(), + fb_state.is_last_flashblock(), + ) + .inspect_err( + |e| error!(target: "payload_builder", "Error simulating builder txs: {}", e), + ) + .unwrap_or_default(); + + let max_uncompressed_block_size = reserve_builder_tx_budget( + &builder_txs, + &mut target_gas_for_batch, + &mut target_da_for_batch, + &mut target_da_footprint_for_batch, + info.da_footprint_scalar, + ctx.max_uncompressed_block_size, + info.cumulative_uncompressed_bytes, + ); + + // Each candidate must start with the same base state. + let base_cache = state.cache.clone(); + let base_transition = state.transition_state.clone(); + let base_info = info.clone(); + let base_fb_state = fb_state.clone(); + let base_tx_tracker = tx_tracker.clone(); + let base_limiter_snapshot = ctx.address_limiter().snapshot_pending(); + let base_state_root_calc = state_root_calc.clone(); + + let mut best: Option = None; + let mut candidates_evaluated: u64 = 0; + let mut candidates_improved: u64 = 0; + // Force one pool-backed candidate after the empty baseline so txs + // already pending at interval start are considered without waiting for + // a fresh pool event. + let mut last_seen_pool_change_epoch = self + .pool_change_epoch() + .load(Ordering::Relaxed) + .wrapping_sub(1); + let mut idle_backoff = Duration::from_millis(1); + let max_idle_backoff = Duration::from_millis(25); + + ctx.address_limiter() + .restore_pending(&base_limiter_snapshot); + let mut empty_candidate_info = base_info.clone(); + let mut empty_candidate_fb_state = base_fb_state.clone(); + let mut empty_candidate_state_root_calc = base_state_root_calc.clone(); + let empty_build_start = Instant::now(); + let empty_candidate = self.build_empty_flashblock_candidate( + ctx, + &mut empty_candidate_fb_state, + &mut empty_candidate_info, + state, + &state_provider, + &mut empty_candidate_state_root_calc, + ); + let empty_build_duration = empty_build_start.elapsed(); + candidates_evaluated += 1; + + // Baseline fees for the fee-improvement metric: whatever the first + // successful candidate produced. Subsequent candidates measure their + // gain against this. + let mut first_candidate_fees: Option = None; + + match empty_candidate { + Ok((next_flashblock_state, new_payload, fb_payload)) => { + let empty_fees = empty_candidate_info.total_fees; + let candidate = BestCandidate { + total_fees: empty_fees, + cache: state.cache.clone(), + transition: state.transition_state.clone(), + info: empty_candidate_info, + fb_state: empty_candidate_fb_state, + tx_tracker: base_tx_tracker.clone(), + result: (next_flashblock_state, new_payload, fb_payload), + build_duration: empty_build_duration, + limiter_snapshot: ctx.address_limiter().snapshot_pending(), + candidates_evaluated, + candidates_improved: candidates_improved + 1, + state_root_calc: empty_candidate_state_root_calc, + }; + shared_best.store(candidate.clone()); + best = Some(candidate); + first_candidate_fees = Some(empty_fees); + candidates_improved += 1; + } + Err(err) => { + ctx.metrics.invalid_built_blocks_count.increment(1); + warn!( + target: "payload_builder", + ?err, + "Empty flashblock candidate seal failed, continuing" + ); + } + } + + loop { + if ctx.cancel().is_cancelled() || block_cancel.is_cancelled() { + break; + } + + // Pool-change gating: if the pool hasn't changed since our last + // candidate AND we already have a best, back off to avoid burning + // CPU cloning and re-simulating identical state. The fresh load + // after the sleep catches any pool activity that arrived during + // the backoff and lets us react on the next iteration. + let current_epoch = self.pool_change_epoch().load(Ordering::Relaxed); + if best.is_some() && current_epoch == last_seen_pool_change_epoch { + std::thread::sleep(idle_backoff); + idle_backoff = (idle_backoff * 2).min(max_idle_backoff); + continue; + } + last_seen_pool_change_epoch = current_epoch; + idle_backoff = Duration::from_millis(1); + + let candidate_span = span!( + Level::INFO, + "candidate", + candidate_index = candidates_evaluated, + gas_used = field::Empty, + total_fees_wei = field::Empty, + is_best = field::Empty, + ); + let _candidate_guard = candidate_span.enter(); + + // Per-candidate build timing so a winning candidate can carry its + // own single-build duration. This is what gets recorded on + // `flashblock_build_duration` at publish time, keeping that + // metric comparable with the non-continuous path. + let candidate_build_start = Instant::now(); + + state.cache = base_cache.clone(); + state.transition_state = base_transition.clone(); + ctx.address_limiter() + .restore_pending(&base_limiter_snapshot); + let mut sim_info = base_info.clone(); + let mut sim_fb_state = base_fb_state.clone(); + let mut sim_tx_tracker = base_tx_tracker.clone(); + let mut sim_state_root_calc = base_state_root_calc.clone(); + + let mut best_txs = FlashblockPoolTxCursor::new(&mut sim_tx_tracker); + let best_txs_start_time = Instant::now(); + best_txs.refresh_iterator( + BestPayloadTransactions::new( + self.pool() + .best_transactions_with_attributes(ctx.best_transaction_attributes()), + ), + flashblock_index, + ); + let transaction_pool_fetch_time = best_txs_start_time.elapsed(); + ctx.metrics + .transaction_pool_fetch_duration + .record(transaction_pool_fetch_time); + ctx.metrics + .transaction_pool_fetch_gauge + .set(transaction_pool_fetch_time); + + let exec_cancelled = ctx + .execute_best_transactions( + &mut sim_info, + state, + &mut best_txs, + target_gas_for_batch.min(ctx.block_gas_limit()), + target_da_for_batch, + target_da_footprint_for_batch, + max_uncompressed_block_size, + sim_fb_state.flashblock_index(), + ) + .wrap_err("failed to execute best transactions")? + .is_some(); + + let new_transactions: Vec<_> = sim_fb_state + .slice_new_transactions(&sim_info.executed_transactions) + .iter() + .map(|tx| tx.tx_hash()) + .collect::>(); + best_txs.mark_committed(new_transactions); + drop(best_txs); + + if exec_cancelled || ctx.cancel().is_cancelled() || block_cancel.is_cancelled() { + break; + } + + if let Err(e) = self.builder_tx().add_builder_txs( + &state_provider, + &mut sim_info, + &ctx.builder_tx_env(), + state, + false, + sim_fb_state.is_first_flashblock(), + sim_fb_state.is_last_flashblock(), + ) { + error!(target: "payload_builder", "Error adding bottom builder txs: {}", e); + } + + if ctx.cancel().is_cancelled() || block_cancel.is_cancelled() { + break; + } + + let total_block_built_start = Instant::now(); + let build_result = ctx + .block_assembly_input() + .map_err(|e| eyre::eyre!("failed to construct block assembly input: {e}")) + .and_then(|input| { + input + .assemble( + state, + Some(&mut sim_fb_state), + &mut sim_info, + &mut sim_state_root_calc, + ctx.metrics.clone(), + ctx.enable_tx_tracking_debug_logs, + ) + .map_err(|e| eyre::eyre!("failed to assemble candidate: {e}")) + }); + let total_block_built_duration = total_block_built_start.elapsed(); + ctx.metrics + .total_block_built_duration + .record(total_block_built_duration); + ctx.metrics + .total_block_built_gauge + .set(total_block_built_duration); + + candidates_evaluated += 1; + + match build_result { + Ok((new_payload, mut fb_payload)) => { + fb_payload.index = flashblock_index; + fb_payload.base = None; + + let best_total_fees = best.as_ref().map_or(U256::ZERO, |b| b.total_fees); + let is_new_best = sim_info.total_fees > best_total_fees || best.is_none(); + candidate_span.record("gas_used", sim_info.cumulative_gas_used); + candidate_span.record("total_fees_wei", sim_info.total_fees.to_string()); + candidate_span.record("is_best", is_new_best); + + // Record the first successfully-built candidate's fees as + // the improvement baseline (in case the empty-candidate + // path failed or didn't run). + if first_candidate_fees.is_none() { + first_candidate_fees = Some(sim_info.total_fees); + } + + if is_new_best { + let next_flashblock_state = sim_fb_state + .next_after_seal(target_da_for_batch, target_da_footprint_for_batch); + let candidate = BestCandidate { + total_fees: sim_info.total_fees, + cache: state.cache.clone(), + transition: state.transition_state.clone(), + info: sim_info, + fb_state: sim_fb_state, + tx_tracker: sim_tx_tracker, + build_duration: candidate_build_start.elapsed(), + result: (next_flashblock_state, new_payload, fb_payload), + limiter_snapshot: ctx.address_limiter().snapshot_pending(), + candidates_evaluated, + candidates_improved: candidates_improved + 1, + state_root_calc: sim_state_root_calc, + }; + // Publish to shared slot so the main loop can take it + // on trigger without awaiting this task. + shared_best.store(candidate.clone()); + best = Some(candidate); + candidates_improved += 1; + } else if let Some(ref mut b) = best { + // Tie or worse: keep the winning candidate state, but + // update the counters exposed to the main loop. + b.candidates_evaluated = candidates_evaluated; + b.candidates_improved = candidates_improved; + shared_best.refresh_metrics(candidates_evaluated, candidates_improved); + } + } + Err(err) => { + ctx.metrics.invalid_built_blocks_count.increment(1); + warn!(target: "payload_builder", ?err, "Candidate seal failed, continuing"); + if best.is_some() { + shared_best.refresh_metrics(candidates_evaluated, candidates_improved); + } + } + } + + drop(_candidate_guard); + + if ctx.cancel().is_cancelled() { + break; + } + + if block_cancel.is_cancelled() { + break; + } + } + + // Record priority-fee improvement over the flashblock interval: how + // much more fee the best candidate captured vs. the first successful + // candidate (empty or otherwise). 0 when no improvement or no + // baseline. + let improvement_wei = match (best.as_ref(), first_candidate_fees) { + (Some(b), Some(first)) => b.total_fees.saturating_sub(first), + _ => U256::ZERO, + }; + let improvement_f64 = u128::try_from(improvement_wei).unwrap_or(u128::MAX) as f64; + ctx.metrics + .continuous_fee_improvement + .record(improvement_f64); + + if let Some(ref b) = best { + state.cache = b.cache.clone(); + state.transition_state = b.transition.clone(); + *info = b.info.clone(); + *fb_state = b.fb_state.clone(); + *tx_tracker = b.tx_tracker.clone(); + *state_root_calc = b.state_root_calc.clone(); + // Re-apply the winning candidate's pending limiter deltas so the + // per-build guard reflects the chosen candidate's gas/compute + // charges. Losing candidates' charges are discarded. + ctx.address_limiter().restore_pending(&b.limiter_snapshot); + } else if block_cancel.is_cancelled() { + state.cache = base_cache; + state.transition_state = base_transition; + ctx.address_limiter() + .restore_pending(&base_limiter_snapshot); + return Ok(CandidateLoopResult { + best: None, + candidates_evaluated, + candidates_improved, + }); + } + + Ok(CandidateLoopResult { + best, + candidates_evaluated, + candidates_improved, + }) + } +} diff --git a/crates/op-rbuilder/src/builder/continuous/interval.rs b/crates/op-rbuilder/src/builder/continuous/interval.rs new file mode 100644 index 000000000..3764a4598 --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/interval.rs @@ -0,0 +1,213 @@ +use super::{ + shared_best::SharedBest, + types::{BuildOutput, BuildReceiver, BuildState, FlashblockInterval, JobDeps}, +}; +use crate::{ + builder::{ + builder_tx::BuilderTransactions, + cancellation::{FlashblockJobCancellation, PayloadJobCancellation}, + payload::OpPayloadBuilder, + }, + traits::{ClientBounds, PoolBounds}, +}; +use alloy_primitives::B256; +use reth_node_api::PayloadBuilderError; +use reth_revm::{State, database::StateProviderDatabase}; +use std::{ops::ControlFlow, time::Instant}; +use tokio::sync::{mpsc, oneshot}; +use tracing::{debug, field, metadata::Level, span}; + +// === Top-level loop ========================================= +// +// One `tokio::select!` over the payload-cancel and the per-interval +// trigger channel. On each trigger it advances one interval; on cancel +// or end-of-block it returns `Ok(())`. + +impl OpPayloadBuilder +where + Pool: PoolBounds + 'static, + Client: ClientBounds + 'static, + BuilderTx: BuilderTransactions + Send + Sync + 'static, +{ + /// Entrypoint to build a single payload in continuous mode, it should be called soon after FCU. + /// It handles the top-level loop: sets up state then iterates flashblock intervals. + /// + /// `base_state` is the seed for the next flashblock build. At entry it is the + /// post-fallback-block state (sealed by `build_fallback_block`, which already + /// applied deposits and the first builder tx); after each published flashblock + /// it advances to the top-of-last-flashblock state. + pub(crate) async fn run_continuous_flashblocks( + &self, + deps: JobDeps<'_>, + target_flashblocks: u64, + parent_hash: B256, + mut fb_trigger_rx: mpsc::Receiver, + base_state: BuildState, + ) -> Result<(), PayloadBuilderError> { + debug!( + target: "payload_builder", + "Continuous build mode enabled" + ); + + let mut interval = + self.start_flashblock_interval(parent_hash, deps.payload_cancel, deps.span, base_state); + + loop { + tokio::select! { + biased; + _ = deps.payload_cancel.wait_for_cancellation() => break, + trigger = fb_trigger_rx.recv() => match trigger { + Some(new_fb_cancel) => { + // Fast path: take the current best from the candidate slot + // without awaiting the task. + // Publish happens immediately; the old task will drop its work on cancel. + let pretaken = interval.candidate_slot.take(); + match self.publish_and_spawn_next( + &deps, interval, pretaken, new_fb_cancel, + target_flashblocks, parent_hash, + ).await? { + ControlFlow::Continue(next) => interval = next, + // Terminal interval: last flashblock, payload cancelled + // mid-flight, or fallback produced no candidate. + // Stop metrics already recorded in publish_and_spawn_next. + ControlFlow::Break(()) => return Ok(()), + } + } + None => { + // Channel closed (scheduler finished or dropped). Cancel + // the orphan build task and drop the receiver; metrics + // are recorded once after the loop from the snapshot + // captured when we spawned the build task. + interval.base_ctx.cancel().cancel_current_flashblock(); + drop(interval.build_rx); + break; + } + }, + } + } + + Self::record_cancellation_reason(self.metrics(), deps.payload_cancel, deps.span); + self.record_flashblocks_metrics( + &interval.base_ctx, + &interval.base_fb_state, + &interval.base_info, + target_flashblocks, + deps.span, + ); + Ok(()) + } + + /// Spawn the build task for a new flashblock interval and bundle the + /// per-interval state (span, build receiver, candidate slot, build start). + /// Used at payload entry and after each publish to advance to the next + /// interval, so both call sites share the same setup. + pub(super) fn start_flashblock_interval( + &self, + parent_hash: B256, + payload_cancel: &PayloadJobCancellation, + parent_span: &tracing::Span, + base_state: BuildState, + ) -> FlashblockInterval { + let fb_span = if parent_span.is_none() { + tracing::Span::none() + } else { + span!( + parent: parent_span, + Level::INFO, + "build_flashblock", + flashblock_index = base_state.fb_state.flashblock_index(), + block_number = base_state.ctx.block_number(), + tx_count = field::Empty, + gas_used = field::Empty, + candidates_evaluated = field::Empty, + candidates_improved = field::Empty, + ) + }; + + let candidate_slot = SharedBest::new(); + let base_ctx = base_state.ctx.clone(); + let base_fb_state = base_state.fb_state.clone(); + let base_info = base_state.info.clone(); + let build_rx = self.spawn_continuous_build_task( + parent_hash, + payload_cancel, + base_state, + fb_span.clone(), + candidate_slot.clone(), + ); + FlashblockInterval { + fb_span, + build_rx, + candidate_slot, + build_start: Instant::now(), + base_ctx, + base_fb_state, + base_info, + } + } + + fn spawn_continuous_build_task( + &self, + parent_hash: B256, + payload_cancel: &PayloadJobCancellation, + base_state: BuildState, + fb_span: tracing::Span, + shared_best: SharedBest, + ) -> BuildReceiver { + let (build_tx, build_rx) = oneshot::channel(); + self.executor().spawn_blocking_task(Box::pin({ + let builder = self.clone(); + let block_cancel = payload_cancel.token(); + async move { + let _ = build_tx.send((|| -> Result<_, PayloadBuilderError> { + let _enter = fb_span.enter(); + let base_state = base_state; + + let state_provider = builder.client().state_by_block_hash(parent_hash)?; + let mut state_db = State::builder() + .with_database(StateProviderDatabase::new(&state_provider)) + .with_cached_prestate(base_state.cache) + .with_bundle_update() + .build(); + state_db.transition_state = base_state.transition; + + let mut tx_tracker = base_state.tx_tracker; + let mut info = base_state.info; + let mut fb_state = base_state.fb_state; + let mut state_root_calc = base_state.state_root_calc; + + let candidate_result = builder + .build_continuous_flashblock( + &base_state.ctx, + &mut fb_state, + &mut info, + &mut state_db, + &state_provider, + &mut tx_tracker, + &mut state_root_calc, + &block_cancel, + &shared_best, + ) + .map_err(|e| PayloadBuilderError::Other(e.into()))?; + + let cache = std::mem::take(&mut state_db.cache); + let transition_state = state_db.transition_state.take(); + + Ok(BuildOutput { + base_state: BuildState { + ctx: base_state.ctx, + info, + cache, + transition: transition_state, + tx_tracker, + fb_state, + state_root_calc, + }, + candidate_result, + }) + })()); + } + })); + build_rx + } +} diff --git a/crates/op-rbuilder/src/builder/continuous/mod.rs b/crates/op-rbuilder/src/builder/continuous/mod.rs new file mode 100644 index 000000000..b30379e35 --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/mod.rs @@ -0,0 +1,22 @@ +//! Continuous flashblock build mode. +//! +//! In continuous mode the builder produces sealed flashblock candidates back-to-back +//! within each flashblock interval, keeping only the highest-fee one in a shared +//! slot. When the scheduler signals end-of-interval, the main loop publishes the +//! pretaken candidate immediately without awaiting the current build task, so +//! trigger -> publish latency is fast (bounded by serialization + WS send), rather +//! than by one full build pass. + +mod candidate_loop; +mod interval; +mod publish; +mod shared_best; +mod transition; +mod types; + +pub(crate) use types::{BuildState, JobDeps}; + +#[cfg(test)] +pub(crate) mod test_hooks { + pub(crate) use super::shared_best::test_hooks::force_next_take_misses; +} diff --git a/crates/op-rbuilder/src/builder/continuous/publish.rs b/crates/op-rbuilder/src/builder/continuous/publish.rs new file mode 100644 index 000000000..d82d4130b --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/publish.rs @@ -0,0 +1,524 @@ +use super::{ + transition::{ + StopMetricsSource, TriggerOutcome, fallback_no_candidate_metrics_source, + plan_with_candidate, + }, + types::{ + BestCandidate, BuildOutput, BuildReceiver, BuildState, CandidateLoopResult, + FlashblockInterval, JobDeps, + }, +}; +use crate::{ + builder::{ + builder_tx::BuilderTransactions, + cancellation::FlashblockJobCancellation, + context::OpPayloadJobCtx, + payload::{FlashblocksState, OpPayloadBuilder}, + timing::compute_slot_offset_ms, + }, + metrics::record_flashblock_publish_timing, + primitives::reth::ExecutionInfo, + traits::{ClientBounds, PoolBounds}, +}; +use alloy_primitives::B256; +use reth_node_api::PayloadBuilderError; +use reth_optimism_node::OpBuiltPayload; +use std::{ops::ControlFlow, time::Instant}; +use tokio::sync::watch; +use tracing::{debug, info, metadata::Level, span, warn}; + +// === Per-interval publishing and advancement ================= +// +// On each scheduler trigger: publish the pretaken candidate without awaiting +// the task, then advance from the published candidate. If no candidate is +// ready, await the task output as a fallback. +// +// Transitions: +// TriggerArrived +// ├─ Ready(candidate) → plan_with_candidate → execute_outcome → Advance | Stop +// └─ AwaitFallback → wait +// ├─ Some(candidate) → plan_with_candidate → execute_outcome → Advance | Stop +// └─ None → record_base_and_stop + +enum TriggerCandidate { + Ready { + candidate: Box, + stale_build_rx: BuildReceiver, + }, + AwaitFallback(BuildReceiver), +} + +impl TriggerCandidate { + fn from_parts(pretaken: Option, build_rx: BuildReceiver) -> Self { + match pretaken { + Some(candidate) => Self::Ready { + candidate: Box::new(candidate), + stale_build_rx: build_rx, + }, + None => Self::AwaitFallback(build_rx), + } + } +} + +/// Where the candidate came from. +#[derive(Clone, Copy)] +enum CandidateOrigin { + Ready, + Fallback, +} + +impl CandidateOrigin { + fn published_log_event(self) -> CandidateLogEvent { + match self { + Self::Ready => CandidateLogEvent::ReadyCandidatePublished, + Self::Fallback => CandidateLogEvent::FallbackCandidatePublished, + } + } +} + +#[derive(Clone, Copy)] +enum CandidateLogEvent { + ReadyCandidatePublished, + FallbackCandidatePublished, + CandidatePublishSuppressed, + FallbackNoCandidate, +} + +impl CandidateLogEvent { + fn as_str(self) -> &'static str { + match self { + Self::ReadyCandidatePublished => "ready_candidate_published", + Self::FallbackCandidatePublished => "fallback_candidate_published", + Self::CandidatePublishSuppressed => "candidate_publish_suppressed", + Self::FallbackNoCandidate => "fallback_no_candidate", + } + } +} + +impl OpPayloadBuilder +where + Pool: PoolBounds + 'static, + Client: ClientBounds + 'static, + BuilderTx: BuilderTransactions + Send + Sync + 'static, +{ + /// Publish a candidate flashblock immediately. The context is only used for + /// publish timing metadata, so this can run before awaiting the build task. + /// Returns the serialized byte size. + fn publish_candidate( + &self, + candidate: &BestCandidate, + best_payload_tx: &watch::Sender>, + fb_span: &tracing::Span, + ctx: &OpPayloadJobCtx, + ) -> Result { + let _publish_span = if fb_span.is_none() { + tracing::Span::none() + } else { + span!(parent: fb_span, Level::INFO, "publish_flashblock") + } + .entered(); + + let (_, ref new_payload, ref fb_payload_delta) = candidate.result; + let flashblock_byte_size = self + .ws_pub() + .publish(fb_payload_delta) + .map_err(PayloadBuilderError::other)?; + let slot_offset_ms = + compute_slot_offset_ms(ctx.attributes().timestamp(), self.config().block_time); + record_flashblock_publish_timing(candidate.fb_state.flashblock_index(), slot_offset_ms); + self.built_fb_payload_tx() + .try_send(new_payload.clone()) + .map_err(PayloadBuilderError::other)?; + if let Err(e) = self.built_payload_tx().try_send(new_payload.clone()) { + warn!( + target: "payload_builder", + error = %e, + "Failed to send updated payload" + ); + } + best_payload_tx.send_replace(Some(new_payload.clone())); + Ok(flashblock_byte_size) + } + + pub(super) async fn publish_and_spawn_next( + &self, + deps: &JobDeps<'_>, + interval: FlashblockInterval, + pretaken: Option, + new_fb_cancel: FlashblockJobCancellation, + target_flashblocks: u64, + parent_hash: B256, + ) -> Result, PayloadBuilderError> { + // The consumed `candidate_slot` Arc clone is dropped here; the build + // task still holds its own clone until it observes cancel. + let FlashblockInterval { + fb_span, + build_rx, + candidate_slot: _, + build_start, + base_ctx, + base_fb_state, + base_info, + } = interval; + + self.metrics() + .continuous_build_duration + .record(build_start.elapsed()); + + match TriggerCandidate::from_parts(pretaken, build_rx) { + // Fast path: publish pretaken BEFORE awaiting the build task, then + // advance directly from the published candidate. The old task has + // a stale per-interval slot and a detached gas limiter, so its + // eventual result is intentionally ignored. + TriggerCandidate::Ready { + candidate, + stale_build_rx, + } => { + self.metrics().continuous_trigger_ready_total.increment(1); + let candidates_evaluated = candidate.candidates_evaluated; + let candidates_improved = candidate.candidates_improved; + let is_last_flashblock = candidate.fb_state.is_last_flashblock(); + let outcome = plan_with_candidate( + deps.payload_cancel.is_cancelled(), + deps.payload_cancel.is_resolved(), + is_last_flashblock, + ); + drop(stale_build_rx); + self.execute_outcome( + deps, + &fb_span, + base_ctx, + None, + &base_fb_state, + &base_info, + *candidate, + outcome, + CandidateOrigin::Ready, + candidates_evaluated, + candidates_improved, + new_fb_cancel, + target_flashblocks, + parent_hash, + ) + } + + // No sealed candidate was available in the slot. Fall back to + // waiting for the task output so we can either publish its best + // candidate or terminate cleanly if it never produced one. + TriggerCandidate::AwaitFallback(build_rx) => { + self.metrics().continuous_trigger_miss_total.increment(1); + let fallback_wait_start = Instant::now(); + let output = build_rx + .await + .map_err(|_| PayloadBuilderError::Other("blocking task dropped".into()))?; + self.metrics() + .continuous_trigger_fallback_wait_duration + .record(fallback_wait_start.elapsed()); + let output = output?; + + let BuildOutput { + base_state, + candidate_result: + CandidateLoopResult { + best, + candidates_evaluated, + candidates_improved, + }, + } = output; + + match best { + Some(candidate) => { + let is_last_flashblock = candidate.fb_state.is_last_flashblock(); + let outcome = plan_with_candidate( + deps.payload_cancel.is_cancelled(), + deps.payload_cancel.is_resolved(), + is_last_flashblock, + ); + self.execute_outcome( + deps, + &fb_span, + base_state.ctx, + Some(&base_ctx), + &base_fb_state, + &base_info, + candidate, + outcome, + CandidateOrigin::Fallback, + candidates_evaluated, + candidates_improved, + new_fb_cancel, + target_flashblocks, + parent_hash, + ) + } + None => { + self.record_continuous_candidate_metrics( + &fb_span, + candidates_evaluated, + candidates_improved, + CandidateLogEvent::FallbackNoCandidate, + ); + debug_assert_eq!( + fallback_no_candidate_metrics_source(), + StopMetricsSource::IntervalBase, + ); + self.record_base_and_stop( + deps, + &base_ctx, + &base_fb_state, + &base_info, + target_flashblocks, + ); + Ok(ControlFlow::Break(())) + } + } + } + } + } + + /// Execute the planned outcome for a trigger that has a candidate. + /// + /// `candidate_ctx` is the context the candidate was built against + /// (`base_ctx` on the Ready path, `base_state.ctx` on the Fallback path). + /// `suppressed_base_ctx` overrides which context's `flashblocks_metrics` + /// receives the suppressed-stop record — only set when distinct from + /// `candidate_ctx` (Fallback path). + #[expect(clippy::too_many_arguments)] + fn execute_outcome( + &self, + deps: &JobDeps<'_>, + fb_span: &tracing::Span, + candidate_ctx: OpPayloadJobCtx, + suppressed_base_ctx: Option<&OpPayloadJobCtx>, + base_fb_state: &FlashblocksState, + base_info: &ExecutionInfo, + candidate: BestCandidate, + outcome: TriggerOutcome, + origin: CandidateOrigin, + candidates_evaluated: u64, + candidates_improved: u64, + new_fb_cancel: FlashblockJobCancellation, + target_flashblocks: u64, + parent_hash: B256, + ) -> Result, PayloadBuilderError> { + match outcome { + TriggerOutcome::PublishAndAdvance | TriggerOutcome::PublishAndStop => { + let byte_size = self.publish_candidate( + &candidate, + deps.best_payload_tx, + fb_span, + &candidate_ctx, + )?; + self.record_continuous_candidate_metrics( + fb_span, + candidates_evaluated, + candidates_improved, + origin.published_log_event(), + ); + let stop = matches!(outcome, TriggerOutcome::PublishAndStop); + self.advance_or_stop_published( + deps, + fb_span, + candidate_ctx, + candidate, + byte_size, + candidates_evaluated, + candidates_improved, + new_fb_cancel, + target_flashblocks, + parent_hash, + stop, + ) + } + TriggerOutcome::SuppressAndStop { + count_suppressed, + metrics_source, + } => { + if count_suppressed { + debug_assert_eq!(metrics_source, StopMetricsSource::IntervalBase); + candidate_ctx + .metrics + .flashblock_publish_suppressed_total + .increment(1); + } + self.record_continuous_candidate_metrics( + fb_span, + candidates_evaluated, + candidates_improved, + CandidateLogEvent::CandidatePublishSuppressed, + ); + self.record_base_and_stop( + deps, + suppressed_base_ctx.unwrap_or(&candidate_ctx), + base_fb_state, + base_info, + target_flashblocks, + ); + Ok(ControlFlow::Break(())) + } + } + } + + fn record_continuous_candidate_metrics( + &self, + fb_span: &tracing::Span, + candidates_evaluated: u64, + candidates_improved: u64, + event: CandidateLogEvent, + ) { + self.metrics() + .continuous_candidates_evaluated + .record(candidates_evaluated as f64); + self.metrics() + .continuous_candidates_improved + .record(candidates_improved as f64); + + fb_span.record("candidates_evaluated", candidates_evaluated); + fb_span.record("candidates_improved", candidates_improved); + + info!( + target: "payload_builder", + parent: fb_span, + event = event.as_str(), + candidates_evaluated, + candidates_improved, + "Continuous trigger handled" + ); + } + + fn record_base_and_stop( + &self, + deps: &JobDeps<'_>, + base_ctx: &OpPayloadJobCtx, + base_fb_state: &FlashblocksState, + base_info: &ExecutionInfo, + target_flashblocks: u64, + ) { + Self::record_cancellation_reason(self.metrics(), deps.payload_cancel, deps.span); + self.record_flashblocks_metrics( + base_ctx, + base_fb_state, + base_info, + target_flashblocks, + deps.span, + ); + } + + /// Record publish-side metrics for the just-published candidate, then + /// either advance to the next flashblock interval or stop, per `stop` + /// (decided up-front by `plan_with_candidate`). + #[expect(clippy::too_many_arguments)] + fn advance_or_stop_published( + &self, + deps: &JobDeps<'_>, + fb_span: &tracing::Span, + ctx: OpPayloadJobCtx, + candidate: BestCandidate, + byte_size: usize, + candidates_evaluated: u64, + candidates_improved: u64, + new_fb_cancel: FlashblockJobCancellation, + target_flashblocks: u64, + parent_hash: B256, + stop: bool, + ) -> Result, PayloadBuilderError> { + let BestCandidate { + result: (next_fb_state, _new_payload, _fb_payload_delta), + cache, + transition, + info, + fb_state, + tx_tracker, + limiter_snapshot, + build_duration, + state_root_calc, + .. + } = candidate; + + let mut base_state = BuildState { + ctx, + info, + cache, + transition, + tx_tracker, + fb_state, + state_root_calc, + }; + base_state + .ctx + .address_limiter() + .restore_pending(&limiter_snapshot); + + if self.config().enable_tx_tracking_debug_logs { + debug!( + target: "tx_trace", + payload_id = %base_state.ctx.payload_id(), + block_number = base_state.ctx.block_number(), + flashblock_index = base_state.fb_state.flashblock_index(), + byte_size, + total_txs = base_state.info.executed_transactions.len(), + stage = "fb_published" + ); + } + + base_state + .ctx + .metrics + .flashblock_byte_size_histogram + .record(byte_size as f64); + base_state + .ctx + .metrics + .flashblock_num_tx_histogram + .record(base_state.info.executed_transactions.len() as f64); + // Record only the winning candidate's single-build time, so this stays + // comparable with the non-continuous path. The full trigger-to-trigger + // interval is in `continuous_build_duration`. + base_state + .ctx + .metrics + .flashblock_build_duration + .record(build_duration); + + fb_span.record( + "tx_count", + base_state.info.executed_transactions.len() as u64, + ); + fb_span.record("gas_used", base_state.info.cumulative_gas_used); + + info!( + target: "payload_builder", + event = "flashblock_built", + id = %base_state.ctx.payload_id(), + flashblock_index = base_state.fb_state.flashblock_index(), + current_gas = base_state.info.cumulative_gas_used, + current_da = base_state.info.cumulative_da_bytes_used, + target_flashblocks = base_state.fb_state.target_flashblock_count(), + candidates_evaluated, + candidates_improved, + "Continuous flashblock built" + ); + + base_state.fb_state = next_fb_state; + + if stop { + Self::record_cancellation_reason(self.metrics(), deps.payload_cancel, deps.span); + self.record_flashblocks_metrics( + &base_state.ctx, + &base_state.fb_state, + &base_state.info, + target_flashblocks, + deps.span, + ); + return Ok(ControlFlow::Break(())); + } + + base_state.ctx = base_state.ctx.with_cancel(new_fb_cancel); + Ok(ControlFlow::Continue(self.start_flashblock_interval( + parent_hash, + deps.payload_cancel, + deps.span, + base_state, + ))) + } +} diff --git a/crates/op-rbuilder/src/builder/continuous/shared_best.rs b/crates/op-rbuilder/src/builder/continuous/shared_best.rs new file mode 100644 index 000000000..f7f6d283d --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/shared_best.rs @@ -0,0 +1,201 @@ +use super::types::BestCandidate; +use std::sync::{Arc, Mutex, MutexGuard}; +use tracing::warn; + +trait CandidateCounters { + fn set_candidate_counters(&mut self, candidates_evaluated: u64, candidates_improved: u64); +} + +impl CandidateCounters for BestCandidate { + fn set_candidate_counters(&mut self, candidates_evaluated: u64, candidates_improved: u64) { + self.candidates_evaluated = candidates_evaluated; + self.candidates_improved = candidates_improved; + } +} + +/// Generic single-slot mailbox: one writer publishes the latest value, one +/// reader takes it on demand. +struct CandidateSlot(Arc>>); + +impl Clone for CandidateSlot { + fn clone(&self) -> Self { + Self(Arc::clone(&self.0)) + } +} + +impl CandidateSlot { + fn new() -> Self { + Self(Arc::new(Mutex::new(None))) + } + + fn guard(&self) -> MutexGuard<'_, Option> { + self.0.lock().unwrap_or_else(|poisoned| { + warn!( + target: "payload_builder", + "Continuous candidate slot was poisoned; recovering the last candidate" + ); + poisoned.into_inner() + }) + } + + fn take(&self) -> Option { + self.guard().take() + } + + fn store(&self, value: T) { + let _ = self.guard().replace(value); + } +} + +impl CandidateSlot { + fn refresh_metrics(&self, candidates_evaluated: u64, candidates_improved: u64) { + if let Some(c) = self.guard().as_mut() { + c.set_candidate_counters(candidates_evaluated, candidates_improved); + } + } +} + +/// Slot that holds the current highest-fee sealed [`BestCandidate`] from the +/// build task. The build task writes on each improvement; the main loop takes +/// on trigger to publish without awaiting task completion. +#[derive(Clone)] +pub(super) struct SharedBest(CandidateSlot); + +impl SharedBest { + pub(super) fn new() -> Self { + Self(CandidateSlot::new()) + } + + /// Take the current candidate (if any). + pub(super) fn take(&self) -> Option { + #[cfg(test)] + if test_hooks::should_force_take_miss() { + return None; + } + self.0.take() + } + + pub(super) fn store(&self, candidate: BestCandidate) { + self.0.store(candidate); + } + + pub(super) fn refresh_metrics(&self, candidates_evaluated: u64, candidates_improved: u64) { + self.0 + .refresh_metrics(candidates_evaluated, candidates_improved); + } +} + +#[cfg(test)] +pub(crate) mod test_hooks { + use std::sync::atomic::{AtomicU64, Ordering}; + + static FORCE_TAKE_MISS_COUNT: AtomicU64 = AtomicU64::new(0); + + pub(crate) struct ForceTakeMissGuard; + + impl Drop for ForceTakeMissGuard { + fn drop(&mut self) { + FORCE_TAKE_MISS_COUNT.store(0, Ordering::Release); + } + } + + pub(crate) fn force_next_take_misses(count: u64) -> ForceTakeMissGuard { + FORCE_TAKE_MISS_COUNT.store(count, Ordering::Release); + ForceTakeMissGuard + } + + pub(super) fn should_force_take_miss() -> bool { + FORCE_TAKE_MISS_COUNT + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |count| { + count.checked_sub(1) + }) + .is_ok() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Debug, PartialEq, Eq)] + struct TestCandidate { + id: u64, + candidates_evaluated: u64, + candidates_improved: u64, + } + + impl TestCandidate { + fn new(id: u64) -> Self { + Self { + id, + candidates_evaluated: 0, + candidates_improved: 0, + } + } + } + + impl CandidateCounters for TestCandidate { + fn set_candidate_counters(&mut self, candidates_evaluated: u64, candidates_improved: u64) { + self.candidates_evaluated = candidates_evaluated; + self.candidates_improved = candidates_improved; + } + } + + #[test] + fn take_empty_slot_returns_none() { + let slot = CandidateSlot::::new(); + + assert_eq!(slot.take(), None); + } + + #[test] + fn store_then_take_returns_candidate_and_clears_slot() { + let slot = CandidateSlot::new(); + + slot.store(TestCandidate::new(7)); + + assert_eq!(slot.take(), Some(TestCandidate::new(7))); + assert_eq!(slot.take(), None); + } + + #[test] + fn refresh_counters_does_not_replace_candidate() { + let slot = CandidateSlot::new(); + + slot.store(TestCandidate::new(11)); + slot.refresh_metrics(5, 3); + + assert_eq!( + slot.take(), + Some(TestCandidate { + id: 11, + candidates_evaluated: 5, + candidates_improved: 3, + }) + ); + } + + #[test] + fn refresh_empty_slot_is_noop() { + let slot = CandidateSlot::::new(); + + slot.refresh_metrics(5, 3); + + assert_eq!(slot.take(), None); + } + + #[test] + fn poisoned_slot_recovers_inner_candidate() { + let slot = CandidateSlot::new(); + let poisoned_slot = slot.clone(); + + let _ = std::thread::spawn(move || { + let mut guard = poisoned_slot.0.lock().expect("lock is not poisoned yet"); + *guard = Some(TestCandidate::new(13)); + panic!("poison candidate slot"); + }) + .join(); + + assert_eq!(slot.take(), Some(TestCandidate::new(13))); + } +} diff --git a/crates/op-rbuilder/src/builder/continuous/transition.rs b/crates/op-rbuilder/src/builder/continuous/transition.rs new file mode 100644 index 000000000..df55ae96e --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/transition.rs @@ -0,0 +1,200 @@ +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum StopMetricsSource { + IntervalBase, + PublishedCandidate, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum PrePublishDecision { + Publish, + SuppressBeforePublish { + count_suppressed: bool, + metrics_source: StopMetricsSource, + }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum PostPublishDecision { + Continue, + Stop { metrics_source: StopMetricsSource }, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(super) enum TriggerOutcome { + /// Publish the candidate and continue to the next flashblock interval. + PublishAndAdvance, + /// Publish the candidate and stop (cancelled after publish or last flashblock). + PublishAndStop, + /// Suppress before publish; the loop records interval-base stop metrics. + SuppressAndStop { + count_suppressed: bool, + metrics_source: StopMetricsSource, + }, +} + +pub(super) fn decide_pre_publish( + payload_cancelled: bool, + payload_resolved: bool, +) -> PrePublishDecision { + if payload_cancelled { + return PrePublishDecision::SuppressBeforePublish { + count_suppressed: payload_resolved, + metrics_source: StopMetricsSource::IntervalBase, + }; + } + + PrePublishDecision::Publish +} + +pub(super) fn decide_after_publish( + payload_cancelled: bool, + is_last_flashblock: bool, +) -> PostPublishDecision { + if payload_cancelled || is_last_flashblock { + return PostPublishDecision::Stop { + metrics_source: StopMetricsSource::PublishedCandidate, + }; + } + + PostPublishDecision::Continue +} + +/// Compose pre- and post-publish decisions into a single trigger outcome. +pub(super) fn plan_with_candidate( + payload_cancelled: bool, + payload_resolved: bool, + is_last_flashblock: bool, +) -> TriggerOutcome { + match decide_pre_publish(payload_cancelled, payload_resolved) { + PrePublishDecision::Publish => { + match decide_after_publish(payload_cancelled, is_last_flashblock) { + PostPublishDecision::Continue => TriggerOutcome::PublishAndAdvance, + PostPublishDecision::Stop { metrics_source: _ } => TriggerOutcome::PublishAndStop, + } + } + PrePublishDecision::SuppressBeforePublish { + count_suppressed, + metrics_source, + } => TriggerOutcome::SuppressAndStop { + count_suppressed, + metrics_source, + }, + } +} + +pub(super) fn fallback_no_candidate_metrics_source() -> StopMetricsSource { + StopMetricsSource::IntervalBase +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn uncancelled_before_publish_decides_publish() { + assert_eq!( + decide_pre_publish(false, false), + PrePublishDecision::Publish + ); + } + + #[test] + fn resolved_before_publish_suppresses_and_counts_suppressed_metric() { + assert_eq!( + decide_pre_publish(true, true), + PrePublishDecision::SuppressBeforePublish { + count_suppressed: true, + metrics_source: StopMetricsSource::IntervalBase, + } + ); + } + + #[test] + fn cancelled_unresolved_before_publish_suppresses_without_counting() { + assert_eq!( + decide_pre_publish(true, false), + PrePublishDecision::SuppressBeforePublish { + count_suppressed: false, + metrics_source: StopMetricsSource::IntervalBase, + } + ); + } + + #[test] + fn published_candidate_not_cancelled_and_not_last_continues() { + assert_eq!( + decide_after_publish(false, false), + PostPublishDecision::Continue + ); + } + + #[test] + fn published_candidate_cancelled_after_publish_stops() { + assert_eq!( + decide_after_publish(true, false), + PostPublishDecision::Stop { + metrics_source: StopMetricsSource::PublishedCandidate, + } + ); + } + + #[test] + fn published_last_flashblock_stops() { + assert_eq!( + decide_after_publish(false, true), + PostPublishDecision::Stop { + metrics_source: StopMetricsSource::PublishedCandidate, + } + ); + } + + #[test] + fn plan_publish_and_advance_when_uncancelled_and_not_last() { + assert_eq!( + plan_with_candidate(false, false, false), + TriggerOutcome::PublishAndAdvance + ); + } + + #[test] + fn plan_publish_and_stop_on_last_flashblock() { + assert_eq!( + plan_with_candidate(false, false, true), + TriggerOutcome::PublishAndStop + ); + } + + #[test] + fn plan_suppress_counts_when_cancelled_and_resolved() { + assert_eq!( + plan_with_candidate(true, true, false), + TriggerOutcome::SuppressAndStop { + count_suppressed: true, + metrics_source: StopMetricsSource::IntervalBase, + } + ); + } + + #[test] + fn plan_suppress_does_not_count_when_cancelled_unresolved() { + assert_eq!( + plan_with_candidate(true, false, false), + TriggerOutcome::SuppressAndStop { + count_suppressed: false, + metrics_source: StopMetricsSource::IntervalBase, + } + ); + } + + #[test] + fn plan_suppress_takes_precedence_over_last_flashblock() { + // Cancellation routes through Suppress regardless of last-flashblock state. + assert_eq!( + plan_with_candidate(true, true, true), + TriggerOutcome::SuppressAndStop { + count_suppressed: true, + metrics_source: StopMetricsSource::IntervalBase, + } + ); + } +} diff --git a/crates/op-rbuilder/src/builder/continuous/types.rs b/crates/op-rbuilder/src/builder/continuous/types.rs new file mode 100644 index 000000000..587d3d5cb --- /dev/null +++ b/crates/op-rbuilder/src/builder/continuous/types.rs @@ -0,0 +1,104 @@ +use super::shared_best::SharedBest; +use crate::{ + builder::{ + best_txs::FlashblockTxTracker, cancellation::PayloadJobCancellation, + context::OpPayloadJobCtx, payload::FlashblocksState, state_root::StateRootCalculator, + }, + limiter::AddressLimiterDeltas, + primitives::reth::ExecutionInfo, +}; +use alloy_primitives::U256; +use op_alloy_rpc_types_engine::OpFlashblockPayload; +use reth_node_api::PayloadBuilderError; +use reth_optimism_node::OpBuiltPayload; +use reth_revm::db::{CacheState, TransitionState}; +use std::time::{Duration, Instant}; +use tokio::sync::{oneshot, watch}; + +pub(crate) struct BuildState { + pub(crate) ctx: OpPayloadJobCtx, + pub(crate) info: ExecutionInfo, + pub(crate) cache: CacheState, + pub(crate) transition: Option, + pub(crate) tx_tracker: FlashblockTxTracker, + pub(crate) fb_state: FlashblocksState, + /// Per-interval state-root calculator. Cloned per candidate inside the + /// build task so each seal sees the same baseline, then replaced with the + /// winner's calculator before the next interval begins. + pub(crate) state_root_calc: StateRootCalculator, +} + +/// References borrowed by the continuous build job for the duration of a single +/// payload. +pub(crate) struct JobDeps<'a> { + pub(crate) span: &'a tracing::Span, + pub(crate) best_payload_tx: &'a watch::Sender>, + pub(crate) payload_cancel: &'a PayloadJobCancellation, +} + +/// Result of a continuous candidate loop within a single flashblock interval. +/// Contains the best sealed candidate and statistics. +pub(super) struct CandidateLoopResult { + pub(super) best: Option, + pub(super) candidates_evaluated: u64, + pub(super) candidates_improved: u64, +} + +/// Output from a continuous build candidate loop. +/// Each iteration clones state, executes txs, seals, and keeps the best candidate. +/// When fb_cancel fires, the pre-sealed candidate is ready for instant publish. +pub(super) struct BuildOutput { + pub(super) base_state: BuildState, + pub(super) candidate_result: CandidateLoopResult, +} + +pub(super) type BuildReceiver = oneshot::Receiver>; + +/// Best sealed candidate found so far within a flashblock interval. +/// Groups all the state that must be updated in lockstep when a new best is found. +#[derive(Clone)] +pub(super) struct BestCandidate { + /// The flashblock state + sealed payload + flashblock delta for next flashblock + pub(super) result: (FlashblocksState, OpBuiltPayload, OpFlashblockPayload), + /// Total priority fees accumulated by this candidate (comparison key). + pub(super) total_fees: U256, + /// EVM cache. + pub(super) cache: CacheState, + /// EVM transition. + pub(super) transition: Option, + /// EVM execution info. + pub(super) info: ExecutionInfo, + /// Flashblock state. + pub(super) fb_state: FlashblocksState, + /// Committed txs from the winning build. + pub(super) tx_tracker: FlashblockTxTracker, + /// Address limiter pending-deltas snapshot taken at the moment this + /// candidate became the interval best. Restored into the per-build guard + /// when the candidate is published so the next interval inherits exactly + /// this candidate's gas/compute charges. + pub(super) limiter_snapshot: AddressLimiterDeltas, + /// Wall time spent building this single candidate. + pub(super) build_duration: Duration, + /// Number of candidates evaluated when this candidate became best. + pub(super) candidates_evaluated: u64, + /// Number of times the interval best improved including this candidate. + pub(super) candidates_improved: u64, + /// State-root calculator state captured at the moment this candidate was + /// sealed. Promoted into the next interval's [`BuildState`] when the + /// candidate is published. + pub(super) state_root_calc: StateRootCalculator, +} + +/// Per-flashblock-interval state owned by the main loop. Replaced as a unit +/// when advancing to the next interval; in particular `candidate_slot` is a +/// fresh `Arc>` per interval so a cancelled-but-not-yet-stopped +/// build task cannot overwrite the new task's slot. +pub(super) struct FlashblockInterval { + pub(super) fb_span: tracing::Span, + pub(super) build_rx: BuildReceiver, + pub(super) candidate_slot: SharedBest, + pub(super) build_start: Instant, + pub(super) base_ctx: OpPayloadJobCtx, + pub(super) base_fb_state: FlashblocksState, + pub(super) base_info: ExecutionInfo, +} diff --git a/crates/op-rbuilder/src/builder/mod.rs b/crates/op-rbuilder/src/builder/mod.rs index dfffe3c0d..d968a666b 100644 --- a/crates/op-rbuilder/src/builder/mod.rs +++ b/crates/op-rbuilder/src/builder/mod.rs @@ -15,6 +15,7 @@ mod builder_tx; pub(crate) mod cancellation; mod config; mod context; +mod continuous; mod flashblocks_builder_tx; mod generator; mod p2p; @@ -35,6 +36,9 @@ pub use context::OpPayloadJobCtx; pub use service::FlashblocksServiceBuilder; pub use state_root::StateRootCalculator; +#[cfg(test)] +pub(crate) use continuous::test_hooks as continuous_test_hooks; + /// Configuration values that are applicable to any type of block builder. #[derive(Debug, Clone)] pub struct BuilderConfig { diff --git a/crates/op-rbuilder/src/builder/payload.rs b/crates/op-rbuilder/src/builder/payload.rs index faa05c730..a5d4f2a8d 100644 --- a/crates/op-rbuilder/src/builder/payload.rs +++ b/crates/op-rbuilder/src/builder/payload.rs @@ -6,6 +6,7 @@ use crate::{ builder_tx::{BuilderTransactions, reserve_builder_tx_budget}, cancellation::{FlashblockJobCancellation, PayloadJobCancellation}, context::{OpPayloadBuilderCtx, OpPayloadJobCtx}, + continuous::{BuildState, JobDeps}, generator::{BuildArguments, PayloadBuilder}, timing::{FlashblockScheduler, compute_slot_offset_ms}, }, @@ -41,7 +42,7 @@ use reth_transaction_pool::TransactionPool; use revm::Database; use std::{ ops::Deref, - sync::Arc, + sync::{Arc, atomic::AtomicU64}, time::{Duration, Instant}, }; use tokio::sync::{mpsc, watch}; @@ -63,7 +64,7 @@ type NextFlashblockPoolTxCursor<'a, Pool> = FlashblockPoolTxCursor< >; #[derive(Debug, Default, Clone)] -pub(super) struct FlashblocksState { +pub(crate) struct FlashblocksState { /// Current flashblock index flashblock_index: u64, /// Target flashblock count per block @@ -150,7 +151,7 @@ impl FlashblocksState { /// incremented by the corresponding per-batch limit. `target_gas` is /// recomputed from `self.target_gas_for_batch` (pre-build) plus /// `gas_per_batch`. - fn next_after_seal( + pub(crate) fn next_after_seal( &self, mut target_da_for_batch: Option, mut target_da_footprint_for_batch: Option, @@ -199,11 +200,11 @@ impl FlashblocksState { self } - pub(super) fn flashblock_index(&self) -> u64 { + pub(crate) fn flashblock_index(&self) -> u64 { self.flashblock_index } - pub(super) fn target_flashblock_count(&self) -> u64 { + pub(crate) fn target_flashblock_count(&self) -> u64 { self.target_flashblock_count } @@ -214,15 +215,23 @@ impl FlashblocksState { } } - fn target_gas_for_batch(&self) -> u64 { + pub(crate) fn is_first_flashblock(&self) -> bool { + self.flashblock_index == 0 + } + + pub(crate) fn is_last_flashblock(&self) -> bool { + self.flashblock_index == self.target_flashblock_count + } + + pub(crate) fn target_gas_for_batch(&self) -> u64 { self.target_gas_for_batch } - fn target_da_for_batch(&self) -> Option { + pub(crate) fn target_da_for_batch(&self) -> Option { self.target_da_for_batch } - fn target_da_footprint_for_batch(&self) -> Option { + pub(crate) fn target_da_footprint_for_batch(&self) -> Option { self.target_da_footprint_for_batch } @@ -231,7 +240,7 @@ impl FlashblocksState { } /// Extracts new transactions since the last flashblock - pub(super) fn slice_new_transactions<'a>( + pub(crate) fn slice_new_transactions<'a>( &self, all_transactions: &'a [OpTransactionSigned], ) -> &'a [OpTransactionSigned] { @@ -264,12 +273,12 @@ impl FlashblockMeta { /// Optimism's payload builder #[derive(Debug)] -pub(super) struct OpPayloadBuilder { +pub(crate) struct OpPayloadBuilder { inner: Arc>, } #[derive(Debug)] -pub(super) struct OpPayloadBuilderInner { +pub(crate) struct OpPayloadBuilderInner { /// Builder context builder_ctx: Arc, /// The transaction pool @@ -291,10 +300,54 @@ pub(super) struct OpPayloadBuilderInner { builder_tx: BuilderTx, /// Tokio task metrics for monitoring spawned tasks task_metrics: Arc, + /// Monotonic epoch that advances on pool mutations. + pool_change_epoch: Arc, /// Task executor used to offload blocking work. executor: Runtime, } +impl OpPayloadBuilderInner { + pub(crate) fn pool(&self) -> &Pool { + &self.pool + } + + pub(crate) fn client(&self) -> &Client { + &self.client + } + + pub(crate) fn built_fb_payload_tx(&self) -> &mpsc::Sender { + &self.built_fb_payload_tx + } + + pub(crate) fn built_payload_tx(&self) -> &mpsc::Sender { + &self.built_payload_tx + } + + pub(crate) fn ws_pub(&self) -> &WebSocketPublisher { + &self.ws_pub + } + + pub(crate) fn config(&self) -> &BuilderConfig { + &self.config + } + + pub(crate) fn metrics(&self) -> &OpRBuilderMetrics { + &self.builder_ctx.metrics + } + + pub(crate) fn builder_tx(&self) -> &BuilderTx { + &self.builder_tx + } + + pub(crate) fn pool_change_epoch(&self) -> &AtomicU64 { + &self.pool_change_epoch + } + + pub(crate) fn executor(&self) -> &Runtime { + &self.executor + } +} + impl Deref for OpPayloadBuilder { type Target = OpPayloadBuilderInner; @@ -316,7 +369,7 @@ where Client: ClientBounds, { #[expect(clippy::too_many_arguments)] - pub(super) fn new( + pub(crate) fn new( evm_config: OpEvmConfig, pool: Pool, client: Client, @@ -327,6 +380,7 @@ where ws_pub: WebSocketPublisher, metrics: Arc, task_metrics: Arc, + pool_change_epoch: Arc, executor: Runtime, ) -> Self { let address_limiter = AddressLimiter::new( @@ -359,6 +413,7 @@ where config, builder_tx, task_metrics, + pool_change_epoch, executor, }), } @@ -428,7 +483,7 @@ where .backrun_bundle_pool() .map(|pool| pool.block_pool(config.parent_header.number + 1)); - let address_limiter = builder_ctx.address_limiter.begin(); + let address_limiter = Arc::new(builder_ctx.address_limiter.begin()); Ok(OpPayloadJobCtx::new( Arc::clone(builder_ctx), @@ -669,6 +724,27 @@ where let mut tx_tracker = FlashblockTxTracker::default(); let parent_hash = ctx.parent_hash(); + // Gate: continuous build mode + if self.config.flashblocks_config.continuous_build { + let deps = JobDeps { + span: &span, + best_payload_tx: &best_payload_tx, + payload_cancel: &payload_cancel, + }; + let base_state = BuildState { + ctx, + info, + cache, + transition, + tx_tracker, + fb_state, + state_root_calc, + }; + return self + .run_continuous_flashblocks(deps, target_flashblocks, parent_hash, rx, base_state) + .await; + } + // State machine: explicit select! at every phase for deterministic cancellation. loop { // Phase 1: Wait for scheduler trigger, or exit on cancellation. @@ -1204,7 +1280,7 @@ where } /// Records cancellation reason for observability. - fn record_cancellation_reason( + pub(crate) fn record_cancellation_reason( metrics: &OpRBuilderMetrics, cancellation: &PayloadJobCancellation, span: &tracing::Span, @@ -1236,7 +1312,7 @@ where } /// Do some logging and metric recording when we stop building flashblocks - fn record_flashblocks_metrics( + pub(crate) fn record_flashblocks_metrics( &self, ctx: &OpPayloadJobCtx, fb_state: &FlashblocksState, @@ -1314,8 +1390,7 @@ where .number .is_multiple_of(self.config.sampling_ratio) { - span!( - Level::INFO, + info_span!( "build_payload", payload_id = tracing::field::Empty, block_number = args.config.parent_header.number + 1, diff --git a/crates/op-rbuilder/src/builder/service.rs b/crates/op-rbuilder/src/builder/service.rs index 983b35ebc..758ad9aa2 100644 --- a/crates/op-rbuilder/src/builder/service.rs +++ b/crates/op-rbuilder/src/builder/service.rs @@ -16,12 +16,20 @@ use crate::{ traits::{NodeBounds, PoolBounds}, }; use eyre::WrapErr as _; +use futures_util::StreamExt; use reth_node_api::NodeTypes; use reth_node_builder::{BuilderContext, components::PayloadServiceBuilder}; use reth_optimism_evm::OpEvmConfig; use reth_payload_builder::{PayloadBuilderHandle, PayloadBuilderService}; use reth_provider::CanonStateSubscriptions; -use std::{sync::Arc, time::Duration}; +use reth_transaction_pool::FullTransactionEvent; +use std::{ + sync::{ + Arc, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; use tracing::{error, info, warn}; #[derive(derive_more::Constructor)] @@ -108,6 +116,24 @@ impl FlashblocksServiceBuilder { let metrics = Arc::new(OpRBuilderMetrics::default()); let task_metrics = Arc::new(FlashblocksTaskMetrics::new()); + let pool_change_epoch = Arc::new(AtomicU64::new(0)); + + if flashblocks_config.continuous_build { + let mut pool_events = pool.all_transactions_event_listener(); + let pool_change_epoch = pool_change_epoch.clone(); + ctx.task_executor().spawn_task(async move { + while let Some(event) = pool_events.next().await { + match event { + // Only events that can improve a candidate trigger rebuilds. + FullTransactionEvent::Pending(_) + | FullTransactionEvent::Replaced { .. } => { + pool_change_epoch.fetch_add(1, Ordering::Relaxed); + } + _ => {} + } + } + }); + } // Channels for built flashblock payloads let (built_fb_payload_tx, built_fb_payload_rx) = tokio::sync::mpsc::channel(16); @@ -131,6 +157,7 @@ impl FlashblocksServiceBuilder { ws_pub, metrics.clone(), task_metrics.clone(), + pool_change_epoch, ctx.task_executor().clone(), ); let payload_generator = BlockPayloadJobGenerator::with_builder( diff --git a/crates/op-rbuilder/src/evm.rs b/crates/op-rbuilder/src/evm.rs index d83a0bf5e..5eca4961b 100644 --- a/crates/op-rbuilder/src/evm.rs +++ b/crates/op-rbuilder/src/evm.rs @@ -10,6 +10,7 @@ pub type OpBlockEvmFactory = BlockEvmFactory; /// /// Instead of threading a `ConfigureEvm` impl + `EvmEnv` separately through /// types that need to create an EVM, pass a single `BlockEvmFactory`. +#[derive(Clone)] pub struct BlockEvmFactory { evm_config: C, evm_env: EvmEnvFor, diff --git a/crates/op-rbuilder/src/limiter/bucket.rs b/crates/op-rbuilder/src/limiter/bucket.rs index bdf3eae8e..1722f4cd3 100644 --- a/crates/op-rbuilder/src/limiter/bucket.rs +++ b/crates/op-rbuilder/src/limiter/bucket.rs @@ -18,7 +18,7 @@ impl + AddAssign + Sub } /// Batch of pending per-address bucket updates produced by a [`BucketLimiterGuard`]. Hands the deltas back to [`BucketLimiter::commit`]. -#[derive(Debug)] +#[derive(Debug, Clone)] pub(super) struct PendingDeltas(HashMap>); impl PendingDeltas { @@ -154,6 +154,22 @@ impl AddressBucketsGuard { fn into_pending(self) -> HashMap> { self.pending.into_inner().unwrap_or_else(|p| p.into_inner()) } + + /// Clone the current pending map without consuming the guard. Used by the + /// continuous candidate loop to checkpoint pending state across candidate + /// iterations. + fn clone_pending(&self) -> HashMap> { + self.pending + .lock() + .unwrap_or_else(|p| p.into_inner()) + .clone() + } + + /// Replace the pending map. Used by the continuous candidate loop to + /// rewind to a saved checkpoint before evaluating the next candidate. + fn set_pending(&self, pending: HashMap>) { + *self.pending.lock().unwrap_or_else(|p| p.into_inner()) = pending; + } } /// Configuration for a [`BucketLimiter`]. @@ -229,6 +245,20 @@ impl BucketLimiterGuard { pub(super) fn into_pending(self) -> PendingDeltas { PendingDeltas(self.buckets.into_pending()) } + + /// Snapshot the current pending deltas without consuming the guard. + /// Continuous candidate evaluation uses this to checkpoint between + /// candidate iterations. + pub(super) fn snapshot_pending(&self) -> PendingDeltas { + PendingDeltas(self.buckets.clone_pending()) + } + + /// Replace pending deltas with a previous snapshot. Continuous candidate + /// evaluation uses this to rewind to a checkpoint before trying the next + /// candidate. + pub(super) fn restore_pending(&self, snapshot: PendingDeltas) { + self.buckets.set_pending(snapshot.0); + } } /// A `TokenBucket` can be used to track various resources. We currently use diff --git a/crates/op-rbuilder/src/limiter/mod.rs b/crates/op-rbuilder/src/limiter/mod.rs index 8100bf5d7..2d23c64ef 100644 --- a/crates/op-rbuilder/src/limiter/mod.rs +++ b/crates/op-rbuilder/src/limiter/mod.rs @@ -159,6 +159,17 @@ pub struct AddressLimiterGuard { metrics: LimiterMetrics, } +/// Opaque snapshot of an [`AddressLimiterGuard`]'s pending deltas. Cheap to +/// clone (just clones the underlying maps). Continuous candidate evaluation +/// uses this to rewind/replay the per-build guard across candidate iterations +/// so each candidate sees the same starting deltas and only the winner's +/// deltas survive into the canonical commit. +#[derive(Debug, Clone, Default)] +pub struct AddressLimiterDeltas { + gas: Option>, + compute: Option>, +} + impl AddressLimiterGuard { /// Returns `true` if the address is debt-free in all enabled limiters. /// Increments the rejection counter for the first failing reason only. @@ -207,6 +218,38 @@ impl AddressLimiterGuard { self.canonical .commit_deltas(self.epoch, gas_deltas, compute_deltas) } + + /// Snapshot the current pending deltas without consuming the guard. + /// Continuous candidate evaluation uses this to checkpoint pending state + /// before each candidate so it can be rewound for the next iteration and + /// the winning candidate's deltas re-loaded before the guard's eventual + /// commit. + pub fn snapshot_pending(&self) -> AddressLimiterDeltas { + AddressLimiterDeltas { + gas: self.gas.as_ref().map(|g| g.snapshot_pending()), + compute: self.compute.as_ref().map(|c| c.snapshot_pending()), + } + } + + /// Restore the pending deltas from a previous [`Self::snapshot_pending`]. + /// Each per-limiter delta map only overwrites pending if the corresponding + /// limiter is enabled on this guard; snapshot/restore is therefore a no-op + /// on disabled limiters. + pub fn restore_pending(&self, snapshot: &AddressLimiterDeltas) { + if let (Some(g), Some(snap)) = (self.gas.as_ref(), snapshot.gas.as_ref()) { + g.restore_pending(snap.clone()); + } + if let (Some(c), Some(snap)) = (self.compute.as_ref(), snapshot.compute.as_ref()) { + c.restore_pending(snap.clone()); + } + } + + /// Disarm the guard so its Drop becomes a no-op (no auto-commit). Used + /// when the caller knows the canonical has already advanced past this + /// guard's epoch, or when the deltas have been deliberately discarded. + pub fn disarm(&mut self) { + self.armed = false; + } } impl Drop for AddressLimiterGuard { diff --git a/crates/op-rbuilder/src/metrics.rs b/crates/op-rbuilder/src/metrics.rs index 8bd9c21ab..d1df13541 100644 --- a/crates/op-rbuilder/src/metrics.rs +++ b/crates/op-rbuilder/src/metrics.rs @@ -181,6 +181,23 @@ pub struct OpRBuilderMetrics { pub backrun_transaction_processing_duration: Histogram, /// Latest backrun transaction processing duration pub backrun_transaction_processing_gauge: Gauge, + /// Duration the continuous builder ran before being interrupted/completing + pub continuous_build_duration: Histogram, + /// Scheduler triggers that found a sealed continuous candidate ready. + pub continuous_trigger_ready_total: Counter, + /// Scheduler triggers that had to wait for the build task because no + /// candidate had been sealed yet. + pub continuous_trigger_miss_total: Counter, + /// Time spent waiting for the build task after a trigger miss. + pub continuous_trigger_fallback_wait_duration: Histogram, + /// Number of candidates evaluated per flashblock interval + pub continuous_candidates_evaluated: Histogram, + /// Number of candidates that improved the latest best candidate + pub continuous_candidates_improved: Histogram, + /// Priority fee (wei) gained per flashblock interval by the continuous + /// candidate loop: best-candidate fees minus first-candidate fees. + /// Recorded once per flashblock interval; 0 if no improvement. + pub continuous_fee_improvement: Histogram, /// Builds completed but not published due to resolved gate pub flashblock_publish_suppressed_total: Counter, /// Payload job ended because getPayload resolved diff --git a/crates/op-rbuilder/src/primitives/reth/execution.rs b/crates/op-rbuilder/src/primitives/reth/execution.rs index 47ed592f6..cae66463b 100644 --- a/crates/op-rbuilder/src/primitives/reth/execution.rs +++ b/crates/op-rbuilder/src/primitives/reth/execution.rs @@ -40,7 +40,7 @@ pub enum TxnExecutionResult { CoinbaseProfitTooLow, } -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct ExecutionInfo { /// All executed transactions (unrecovered). pub executed_transactions: Vec, diff --git a/crates/op-rbuilder/src/tests/flashblocks.rs b/crates/op-rbuilder/src/tests/flashblocks.rs index 98491f61e..98970a5a2 100644 --- a/crates/op-rbuilder/src/tests/flashblocks.rs +++ b/crates/op-rbuilder/src/tests/flashblocks.rs @@ -10,7 +10,7 @@ use crate::{ args::{FlashblocksArgs, OpRbuilderArgs}, tests::{ BlockTransactionsExt, BundleOpts, ChainDriver, FLASHBLOCKS_NUMBER_ADDRESS, - TransactionBuilderExt, flashblocks_number_contract::FlashblocksNumber, + FlashblocksListener, TransactionBuilderExt, flashblocks_number_contract::FlashblocksNumber, }, }; @@ -167,6 +167,254 @@ async fn test_flashblock_min_max_filtering(rbuilder: LocalInstance) -> eyre::Res flashblocks_listener.stop().await } +#[rb_test(args = OpRbuilderArgs { + chain_block_time: 1000, + enable_revert_protection: true, + flashblocks: FlashblocksArgs { + flashblocks_port: 1240, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_continuous_build: true, + ..Default::default() + }, + ..Default::default() +})] +async fn smoke_continuous_flashblocks_publish_and_advance_state( + rbuilder: LocalInstance, +) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); + + let tx1 = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(1)) + .with_max_priority_fee_per_gas(1) + .send() + .await?; + + let tx3 = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(3)) + .with_max_priority_fee_per_gas(10) + .send() + .await?; + + let tx5 = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(5)) + .with_max_priority_fee_per_gas(20) + .send() + .await?; + + let block = driver.build_new_block_with_current_timestamp(None).await?; + let tx_hashes = vec![*tx1.tx_hash(), *tx3.tx_hash(), *tx5.tx_hash()]; + + assert!( + block.includes(&tx_hashes), + "continuous flashblocks block should include all gated transactions" + ); + + for (tx_hash, expected_flashblock) in [ + (tx_hashes[0], 1_u64), + (tx_hashes[1], 3_u64), + (tx_hashes[2], 5_u64), + ] { + assert_eq!( + Some(expected_flashblock), + flashblocks_listener.find_transaction_flashblock(&tx_hash), + "transaction {tx_hash:?} should land in flashblock {expected_flashblock}" + ); + } + + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!( + 6, + flashblocks.len(), + "continuous mode should produce one base flashblock plus five scheduled flashblocks" + ); + + for (expected_index, flashblock) in flashblocks.iter().enumerate() { + assert_eq!( + expected_index as u64, flashblock.index, + "continuous flashblocks should arrive in index order" + ); + } + + flashblocks_listener.stop().await +} + +#[rb_test(args = OpRbuilderArgs { + chain_block_time: 1000, + enable_revert_protection: true, + flashblocks: FlashblocksArgs { + flashblocks_port: 1241, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_continuous_build: true, + ..Default::default() + }, + ..Default::default() +})] +async fn smoke_continuous_resolve_before_publish_suppresses_future_flashblocks( + rbuilder: LocalInstance, +) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); + + let future_tx = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(1)) + .send() + .await?; + + let started = driver.start_payload_with_current_timestamp(None).await?; + let payload = driver.resolve_started_payload(&started).await?; + + wait_for_flashblock_count(&flashblocks_listener, 1, Duration::from_secs(2)).await; + tokio::time::sleep(Duration::from_millis(450)).await; + + let flashblocks = flashblocks_listener.get_flashblocks(); + assert_eq!( + 1, + flashblocks.len(), + "resolving before the first scheduled continuous flashblock should suppress later publishes" + ); + assert!( + flashblocks_listener + .find_transaction_flashblock(future_tx.tx_hash()) + .is_none(), + "future flashblock-gated transaction should not be published after resolve" + ); + + let block = driver.submit_started_payload(&started, payload).await?; + assert!( + !block.includes(future_tx.tx_hash()), + "resolved payload should not advance to an unpublished continuous candidate" + ); + + flashblocks_listener.stop().await +} + +#[rb_test(args = OpRbuilderArgs { + chain_block_time: 1000, + enable_revert_protection: true, + flashblocks: FlashblocksArgs { + flashblocks_port: 1242, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_continuous_build: true, + ..Default::default() + }, + ..Default::default() +})] +async fn smoke_continuous_resolve_after_publish_preserves_published_state( + rbuilder: LocalInstance, +) -> eyre::Result<()> { + let driver = rbuilder.driver().await?; + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); + + let published_tx = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(1)) + .with_max_priority_fee_per_gas(5) + .send() + .await?; + + let future_tx = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(5)) + .with_max_priority_fee_per_gas(10) + .send() + .await?; + + let started = driver.start_payload_with_current_timestamp(None).await?; + let published_tx_flashblock = wait_for_transaction_flashblock( + &flashblocks_listener, + published_tx.tx_hash(), + Duration::from_secs(3), + ) + .await; + assert_eq!( + Some(1), + published_tx_flashblock, + "first continuous flashblock should publish the min=1 transaction" + ); + + let payload = driver.resolve_started_payload(&started).await?; + tokio::time::sleep(Duration::from_millis(550)).await; + + let flashblocks = flashblocks_listener.get_flashblocks(); + assert!( + flashblocks.len() < 6, + "resolving after a published flashblock should stop before the full schedule" + ); + assert!( + flashblocks_listener + .find_transaction_flashblock(future_tx.tx_hash()) + .is_none(), + "future flashblock-gated transaction should not be published after resolve" + ); + + let block = driver.submit_started_payload(&started, payload).await?; + assert!( + block.includes(published_tx.tx_hash()), + "resolved payload should keep already-published continuous state" + ); + assert!( + !block.includes(future_tx.tx_hash()), + "resolved payload should not include future unpublished continuous state" + ); + + flashblocks_listener.stop().await +} + +#[rb_test(args = OpRbuilderArgs { + chain_block_time: 1000, + enable_revert_protection: true, + flashblocks: FlashblocksArgs { + flashblocks_port: 1243, + flashblocks_addr: "127.0.0.1".into(), + flashblocks_block_time: 200, + flashblocks_continuous_build: true, + ..Default::default() + }, + ..Default::default() +})] +async fn smoke_continuous_trigger_miss_fallback_publishes_candidate( + rbuilder: LocalInstance, +) -> eyre::Result<()> { + let _force_miss = crate::builder::continuous_test_hooks::force_next_take_misses(1); + let driver = rbuilder.driver().await?; + let flashblocks_listener = rbuilder.spawn_flashblocks_listener(); + + let tx = driver + .create_transaction() + .random_valid_transfer() + .with_bundle(BundleOpts::default().with_min_flashblock_number(1)) + .with_max_priority_fee_per_gas(10) + .send() + .await?; + + let block = driver.build_new_block_with_current_timestamp(None).await?; + assert!( + block.includes(tx.tx_hash()), + "fallback publish path should still advance to the best candidate" + ); + assert_eq!( + Some(1), + flashblocks_listener.find_transaction_flashblock(tx.tx_hash()), + "forced trigger miss should still publish the min=1 transaction through fallback" + ); + + flashblocks_listener.stop().await +} + #[rb_test(args = OpRbuilderArgs { chain_block_time: 1000, flashblocks: FlashblocksArgs { @@ -388,6 +636,45 @@ async fn create_flashblock_transactions( Ok(txs) } +async fn wait_for_flashblock_count( + flashblocks_listener: &FlashblocksListener, + min_count: usize, + timeout_after: Duration, +) { + tokio::time::timeout(timeout_after, async { + loop { + if flashblocks_listener.get_flashblocks().len() >= min_count { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap_or_else(|_| { + panic!( + "timed out waiting for at least {min_count} flashblocks, got {}", + flashblocks_listener.get_flashblocks().len() + ) + }); +} + +async fn wait_for_transaction_flashblock( + flashblocks_listener: &FlashblocksListener, + tx_hash: &TxHash, + timeout_after: Duration, +) -> Option { + tokio::time::timeout(timeout_after, async { + loop { + if let Some(flashblock) = flashblocks_listener.find_transaction_flashblock(tx_hash) { + return Some(flashblock); + } + tokio::time::sleep(Duration::from_millis(10)).await; + } + }) + .await + .unwrap_or(None) +} + // Helper to verify builder transactions fn verify_builder_txs( block_txs: &[impl Transaction], diff --git a/crates/op-rbuilder/src/tests/framework/driver.rs b/crates/op-rbuilder/src/tests/framework/driver.rs index 1e4d78641..c788802f5 100644 --- a/crates/op-rbuilder/src/tests/framework/driver.rs +++ b/crates/op-rbuilder/src/tests/framework/driver.rs @@ -8,7 +8,9 @@ use alloy_rpc_types_eth::Block; use op_alloy_consensus::{OpTypedTransaction, TxDeposit}; use op_alloy_network::Optimism; use op_alloy_rpc_types::Transaction; +use op_alloy_rpc_types_engine::OpExecutionPayloadV4; use reth_optimism_node::OpPayloadAttributes; +use reth_payload_builder::PayloadId; use super::{EngineApi, Ipc, LocalInstance, TransactionBuilder}; use crate::{ @@ -32,6 +34,13 @@ pub struct ChainDriver { validation_nodes: Vec, } +#[derive(Debug, Clone)] +pub struct StartedPayload { + pub payload_id: PayloadId, + pub parent_hash: B256, + wait_after_fcu: Duration, +} + // instantiation and configuration impl ChainDriver { const MIN_BLOCK_TIME: Duration = Duration::from_secs(1); @@ -123,6 +132,37 @@ impl ChainDriver { timestamp_jitter: Option, min_base_fee: Option, ) -> eyre::Result> { + let started = self + .start_payload_with_txs_timestamp( + txs, + no_tx_pool, + block_timestamp, + timestamp_jitter, + min_base_fee, + ) + .await?; + + tokio::time::sleep(started.wait_after_fcu).await; + self.finish_started_payload(&started).await + } + + pub async fn start_payload_with_current_timestamp( + &self, + timestamp_jitter: Option, + ) -> eyre::Result { + self.start_payload_with_txs_timestamp(vec![], None, None, timestamp_jitter, Some(0)) + .await + } + + pub async fn start_payload_with_txs_timestamp( + &self, + txs: Vec, + no_tx_pool: Option, + block_timestamp: Option, + // Amount of time to lag before sending FCU. This tests late FCU scenarios + timestamp_jitter: Option, + min_base_fee: Option, + ) -> eyre::Result { let latest = self.latest().await?; // Add L1 block info as the first transaction in every L2 block @@ -203,25 +243,44 @@ impl ChainDriver { .payload_id .ok_or_else(|| eyre::eyre!("Forkchoice update did not return a payload ID"))?; - // wait for the block to be built for the specified chain block time - if let Some(timestamp_jitter) = timestamp_jitter { - tokio::time::sleep( - Duration::from_millis(self.args.chain_block_time) - .max(Self::MIN_BLOCK_TIME) - .saturating_sub(timestamp_jitter), - ) - .await; + let wait_after_fcu = if let Some(timestamp_jitter) = timestamp_jitter { + Duration::from_millis(self.args.chain_block_time) + .max(Self::MIN_BLOCK_TIME) + .saturating_sub(timestamp_jitter) } else { - tokio::time::sleep( - Duration::from_millis(self.args.chain_block_time).max(Self::MIN_BLOCK_TIME), - ) - .await; - } + Duration::from_millis(self.args.chain_block_time).max(Self::MIN_BLOCK_TIME) + }; + + Ok(StartedPayload { + payload_id, + parent_hash: latest.header.hash, + wait_after_fcu, + }) + } + pub async fn resolve_started_payload( + &self, + started: &StartedPayload, + ) -> eyre::Result { let payload: op_alloy_rpc_types_engine::OpExecutionPayloadEnvelopeV4 = - self.engine_api.get_payload(payload_id).await?; - let payload = payload.execution_payload; + self.engine_api.get_payload(started.payload_id).await?; + + Ok(payload.execution_payload) + } + pub async fn finish_started_payload( + &self, + started: &StartedPayload, + ) -> eyre::Result> { + let payload = self.resolve_started_payload(started).await?; + self.submit_started_payload(started, payload).await + } + + pub async fn submit_started_payload( + &self, + started: &StartedPayload, + payload: OpExecutionPayloadV4, + ) -> eyre::Result> { if self .engine_api .new_payload(payload.clone(), vec![], B256::ZERO, Requests::default()) @@ -235,7 +294,7 @@ impl ChainDriver { let new_block_hash = payload.payload_inner.payload_inner.payload_inner.block_hash; self.engine_api - .update_forkchoice(latest.header.hash, new_block_hash, None) + .update_forkchoice(started.parent_hash, new_block_hash, None) .await?; let block = self diff --git a/local-testing/grafana/dashboards/op-rbuilder.json b/local-testing/grafana/dashboards/op-rbuilder.json index fc8c64558..096217991 100644 --- a/local-testing/grafana/dashboards/op-rbuilder.json +++ b/local-testing/grafana/dashboards/op-rbuilder.json @@ -426,6 +426,88 @@ "title": "Landing %", "type": "stat" }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "decimals": 1, + "min": 0, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 6, + "w": 24, + "x": 0, + "y": 5 + }, + "id": 407, + "options": { + "legend": { + "calcs": [ + "mean", + "max", + "last" + ], + "displayMode": "table", + "placement": "right", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "sum(rate(chain_monitor_flashblocks_receive_success_count_total{job=\"chain-monitor\"}[$__rate_interval]))", + "legendFormat": "flashblocks/s", + "range": true, + "refId": "A" + } + ], + "title": "Flashblocks per Second (chain-monitor)", + "type": "timeseries" + }, { "datasource": { "type": "loki", @@ -435,7 +517,7 @@ "h": 9, "w": 16, "x": 0, - "y": 5 + "y": 11 }, "id": 301, "options": { @@ -545,7 +627,7 @@ "h": 9, "w": 8, "x": 16, - "y": 5 + "y": 11 }, "id": 302, "options": { @@ -563,37 +645,777 @@ "serviceMapUseNativeHistograms": false, "tableType": "traces" } - ], - "title": "Recent Traces", - "transformations": [ + ], + "title": "Recent Traces", + "transformations": [ + { + "id": "filterFieldsByName", + "options": { + "include": { + "names": [ + "Trace ID", + "Start time", + "Name", + "Duration" + ] + } + } + } + ], + "type": "table" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 20 + }, + "id": 500, + "panels": [], + "title": "Flashblock Delivery", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 0, + "y": 21 + }, + "id": 501, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "reth_op_rbuilder_flashblock_count{quantile=\"0.5\"}", + "legendFormat": "p50", + "range": true, + "refId": "A" + } + ], + "title": "FB per Block (p50)", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "yellow", + "value": 1 + }, + { + "color": "red", + "value": 5 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 6, + "y": 21 + }, + "id": 502, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "inverted", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": true, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "rate(reth_op_rbuilder_missing_flashblocks_count[$__rate_interval]) * 60", + "legendFormat": "missing/min", + "range": true, + "refId": "A" + } + ], + "title": "Missing FB/min", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "decimals": 1, + "max": 100, + "min": 0, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "red", + "value": 0 + }, + { + "color": "yellow", + "value": 90 + }, + { + "color": "green", + "value": 99 + } + ] + }, + "unit": "percent" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 12, + "y": 21 + }, + "id": 503, + "options": { + "colorMode": "value", + "graphMode": "none", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "chain_monitor_flashblocks_landed_count / clamp_min(chain_monitor_flashblocks_landed_count + (chain_monitor_flashblocks_missed_count or (0 * chain_monitor_flashblocks_landed_count)), 1) * 100", + "legendFormat": "landing", + "range": true, + "refId": "A" + } + ], + "title": "FB Landing %", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "yellow", + "value": 1 + } + ] + }, + "unit": "short" + }, + "overrides": [] + }, + "gridPos": { + "h": 4, + "w": 6, + "x": 18, + "y": 21 + }, + "id": 504, + "options": { + "colorMode": "value", + "graphMode": "area", + "justifyMode": "auto", + "orientation": "auto", + "percentChangeColorMode": "standard", + "reduceOptions": { + "calcs": [ + "lastNotNull" + ], + "fields": "", + "values": false + }, + "showPercentChange": false, + "textMode": "auto", + "wideLayout": true + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "rate(reth_op_rbuilder_reduced_flashblocks_number[$__rate_interval]) * 60", + "legendFormat": "reduced/min", + "range": true, + "refId": "A" + } + ], + "title": "Reduced FB/min", + "type": "stat" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "landed/min" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "missed/min" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 0, + "y": 25 + }, + "id": 505, + "options": { + "legend": { + "calcs": [ + "sum", + "last" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "rate(chain_monitor_blocks_landed_count[$__rate_interval]) * 60", + "legendFormat": "landed/min", + "range": true, + "refId": "A" + }, + { + "editorMode": "code", + "expr": "rate(chain_monitor_blocks_missed_count[$__rate_interval]) * 60 or on() vector(0)", + "legendFormat": "missed/min", + "range": true, + "refId": "B" + } + ], + "title": "Blocks Landed vs Missed", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "never", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "min": 0, + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "short" + }, + "overrides": [ + { + "matcher": { + "id": "byName", + "options": "landed/min" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "green", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "missed/min" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "red", + "mode": "fixed" + } + } + ] + }, + { + "matcher": { + "id": "byName", + "options": "builder missing/min" + }, + "properties": [ + { + "id": "color", + "value": { + "fixedColor": "orange", + "mode": "fixed" + } + } + ] + } + ] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 25 + }, + "id": 506, + "options": { + "legend": { + "calcs": [ + "sum", + "last" + ], + "displayMode": "table", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "editorMode": "code", + "expr": "rate(chain_monitor_flashblocks_landed_count[$__rate_interval]) * 60", + "legendFormat": "landed/min", + "range": true, + "refId": "A" + }, + { + "editorMode": "code", + "expr": "rate(chain_monitor_flashblocks_missed_count[$__rate_interval]) * 60 or on() vector(0)", + "legendFormat": "missed/min", + "range": true, + "refId": "B" + }, + { + "editorMode": "code", + "expr": "rate(reth_op_rbuilder_missing_flashblocks_count[$__rate_interval]) * 60", + "legendFormat": "builder missing/min", + "range": true, + "refId": "C" + } + ], + "title": "Flashblocks Landed vs Missed", + "type": "timeseries" + }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 33 + }, + "id": 100, + "panels": [], + "title": "Block Building Performance", + "type": "row" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 0, + "y": 34 + }, + "id": 1, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "12.4.0", + "targets": [ + { + "expr": "reth_op_rbuilder_total_block_built_gauge", + "legendFormat": "latest", + "refId": "A" + } + ], + "title": "Block Build Duration", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "barWidthFactor": 0.6, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "showValues": false, + "spanNulls": true, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": 0 + }, + { + "color": "red", + "value": 80 + } + ] + }, + "unit": "s" + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 6, + "x": 6, + "y": 34 + }, + "id": 2, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "hideZeros": false, + "mode": "multi", + "sort": "desc" + } + }, + "pluginVersion": "12.4.0", + "targets": [ { - "id": "filterFieldsByName", - "options": { - "include": { - "names": [ - "Trace ID", - "Start time", - "Name", - "Duration" - ] - } - } + "expr": "reth_op_rbuilder_state_root_calculation_gauge", + "legendFormat": "latest", + "refId": "A" } ], - "type": "table" - }, - { - "collapsed": false, - "gridPos": { - "h": 1, - "w": 24, - "x": 0, - "y": 14 - }, - "id": 100, - "panels": [], - "title": "Block Building Performance", - "type": "row" + "title": "State Root Duration", + "type": "timeseries" }, { "datasource": { @@ -660,10 +1482,10 @@ "gridPos": { "h": 8, "w": 6, - "x": 0, - "y": 15 + "x": 12, + "y": 34 }, - "id": 1, + "id": 3, "options": { "legend": { "calcs": [], @@ -680,12 +1502,22 @@ "pluginVersion": "12.4.0", "targets": [ { - "expr": "reth_op_rbuilder_total_block_built_gauge", - "legendFormat": "latest", + "expr": "reth_op_rbuilder_flashblock_build_duration{quantile=\"0.5\"}", + "legendFormat": "p50", "refId": "A" + }, + { + "expr": "reth_op_rbuilder_flashblock_build_duration{quantile=\"0.95\"}", + "legendFormat": "p95", + "refId": "B" + }, + { + "expr": "reth_op_rbuilder_flashblock_build_duration{quantile=\"0.99\"}", + "legendFormat": "p99", + "refId": "C" } ], - "title": "Block Build Duration", + "title": "Flashblock Build Duration", "type": "timeseries" }, { @@ -753,10 +1585,10 @@ "gridPos": { "h": 8, "w": 6, - "x": 6, - "y": 15 + "x": 18, + "y": 34 }, - "id": 2, + "id": 7, "options": { "legend": { "calcs": [], @@ -773,14 +1605,42 @@ "pluginVersion": "12.4.0", "targets": [ { - "expr": "reth_op_rbuilder_state_root_calculation_gauge", - "legendFormat": "latest", + "expr": "reth_op_rbuilder_payload_transaction_simulation_gauge", + "legendFormat": "total simulation", "refId": "A" + }, + { + "expr": "reth_op_rbuilder_transaction_pool_fetch_gauge", + "legendFormat": "pool fetch", + "refId": "B" + }, + { + "expr": "reth_op_rbuilder_sequencer_tx_gauge", + "legendFormat": "sequencer tx", + "refId": "C" + }, + { + "expr": "reth_op_rbuilder_state_transition_merge_gauge", + "legendFormat": "state merge", + "refId": "D" } ], - "title": "State Root Duration", + "title": "Tx Simulation Duration", "type": "timeseries" }, + { + "collapsed": false, + "gridPos": { + "h": 1, + "w": 24, + "x": 0, + "y": 42 + }, + "id": 410, + "panels": [], + "title": "Continuous Build", + "type": "row" + }, { "datasource": { "type": "prometheus", @@ -794,11 +1654,9 @@ "custom": { "axisBorderShow": false, "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, - "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 10, "gradientMode": "none", @@ -815,8 +1673,7 @@ "type": "linear" }, "showPoints": "auto", - "showValues": false, - "spanNulls": true, + "spanNulls": false, "stacking": { "group": "A", "mode": "none" @@ -831,58 +1688,150 @@ "steps": [ { "color": "green", - "value": 0 - }, - { - "color": "red", - "value": 80 + "value": null } ] }, - "unit": "s" + "unit": "short" }, "overrides": [] }, "gridPos": { "h": 8, - "w": 6, - "x": 12, - "y": 15 + "w": 8, + "x": 0, + "y": 43 }, - "id": 3, + "id": 411, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "calcs": [ + "mean", + "max" + ], + "displayMode": "table", + "placement": "bottom" }, "tooltip": { - "hideZeros": false, "mode": "multi", "sort": "desc" } }, - "pluginVersion": "12.4.0", + "title": "Candidates per Flashblock Interval", + "type": "timeseries", "targets": [ { - "expr": "reth_op_rbuilder_flashblock_build_duration{quantile=\"0.5\"}", - "legendFormat": "p50", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(reth_op_rbuilder_continuous_candidates_evaluated_sum[1m]) / rate(reth_op_rbuilder_continuous_candidates_evaluated_count[1m])", + "legendFormat": "avg evaluated", "refId": "A" }, { - "expr": "reth_op_rbuilder_flashblock_build_duration{quantile=\"0.95\"}", - "legendFormat": "p95", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(reth_op_rbuilder_continuous_candidates_improved_sum[1m]) / rate(reth_op_rbuilder_continuous_candidates_improved_count[1m])", + "legendFormat": "avg improved", "refId": "B" + } + ] + }, + { + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisBorderShow": false, + "axisCenteredZero": false, + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 10, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + } + ] + }, + "unit": "percentunit", + "min": 0, + "max": 1 + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 8, + "x": 8, + "y": 43 + }, + "id": 412, + "options": { + "legend": { + "calcs": [ + "mean", + "min", + "max" + ], + "displayMode": "table", + "placement": "bottom" }, + "tooltip": { + "mode": "multi", + "sort": "desc" + } + }, + "title": "Candidate Improvement Rate", + "type": "timeseries", + "targets": [ { - "expr": "reth_op_rbuilder_flashblock_build_duration{quantile=\"0.99\"}", - "legendFormat": "p99", - "refId": "C" + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(reth_op_rbuilder_continuous_candidates_improved_sum[1m]) / rate(reth_op_rbuilder_continuous_candidates_evaluated_sum[1m])", + "legendFormat": "improvement rate", + "refId": "A" } - ], - "title": "Flashblock Build Duration", - "type": "timeseries" + ] }, { "datasource": { @@ -897,11 +1846,9 @@ "custom": { "axisBorderShow": false, "axisCenteredZero": false, - "axisColorMode": "text", "axisLabel": "", "axisPlacement": "auto", "barAlignment": 0, - "barWidthFactor": 0.6, "drawStyle": "line", "fillOpacity": 10, "gradientMode": "none", @@ -918,8 +1865,7 @@ "type": "linear" }, "showPoints": "auto", - "showValues": false, - "spanNulls": true, + "spanNulls": false, "stacking": { "group": "A", "mode": "none" @@ -934,63 +1880,68 @@ "steps": [ { "color": "green", - "value": 0 - }, - { - "color": "red", - "value": 80 + "value": null } ] }, - "unit": "s" + "unit": "ops" }, "overrides": [] }, "gridPos": { "h": 8, - "w": 6, - "x": 18, - "y": 15 + "w": 8, + "x": 16, + "y": 43 }, - "id": 7, + "id": 413, "options": { "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true + "calcs": [ + "mean", + "p50", + "p99", + "max" + ], + "displayMode": "table", + "placement": "bottom" }, "tooltip": { - "hideZeros": false, "mode": "multi", "sort": "desc" } }, - "pluginVersion": "12.4.0", + "title": "Continuous Trigger Path Rate", + "type": "timeseries", "targets": [ { - "expr": "reth_op_rbuilder_payload_transaction_simulation_gauge", - "legendFormat": "total simulation", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(reth_op_rbuilder_continuous_trigger_ready_total[1m])", + "legendFormat": "ready fast path", "refId": "A" }, { - "expr": "reth_op_rbuilder_transaction_pool_fetch_gauge", - "legendFormat": "pool fetch", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(reth_op_rbuilder_continuous_trigger_miss_total[1m])", + "legendFormat": "trigger miss", "refId": "B" }, { - "expr": "reth_op_rbuilder_sequencer_tx_gauge", - "legendFormat": "sequencer tx", + "datasource": { + "type": "prometheus", + "uid": "prometheus" + }, + "expr": "rate(reth_op_rbuilder_flashblock_publish_suppressed_total[1m])", + "legendFormat": "suppressed", "refId": "C" - }, - { - "expr": "reth_op_rbuilder_state_transition_merge_gauge", - "legendFormat": "state merge", - "refId": "D" } - ], - "title": "Tx Simulation Duration", - "type": "timeseries" + ] }, { "collapsed": true, @@ -998,7 +1949,7 @@ "h": 1, "w": 24, "x": 0, - "y": 23 + "y": 51 }, "id": 104, "panels": [ @@ -1068,7 +2019,7 @@ "h": 8, "w": 12, "x": 0, - "y": 24 + "y": 43 }, "id": 30, "options": { @@ -1207,7 +2158,7 @@ "h": 8, "w": 12, "x": 12, - "y": 24 + "y": 43 }, "id": 31, "options": { @@ -1254,7 +2205,7 @@ "h": 1, "w": 24, "x": 0, - "y": 24 + "y": 52 }, "id": 103, "panels": [ @@ -1316,7 +2267,7 @@ } ] }, - "unit": "µs" + "unit": "\u00b5s" }, "overrides": [] }, @@ -1324,7 +2275,7 @@ "h": 8, "w": 8, "x": 0, - "y": 25 + "y": 44 }, "id": 13, "options": { @@ -1409,7 +2360,7 @@ } ] }, - "unit": "µs" + "unit": "\u00b5s" }, "overrides": [] }, @@ -1417,7 +2368,7 @@ "h": 8, "w": 8, "x": 8, - "y": 25 + "y": 44 }, "id": 14, "options": { @@ -1510,7 +2461,7 @@ "h": 8, "w": 8, "x": 16, - "y": 25 + "y": 44 }, "id": 15, "options": { diff --git a/local-testing/playground.yaml b/local-testing/playground.yaml index 6f44965f8..43ee68f3a 100755 --- a/local-testing/playground.yaml +++ b/local-testing/playground.yaml @@ -2,6 +2,8 @@ base: opstack base_args: - --external-builder - op-rbuilder + - --block-time + - '1' setup: - cd .. && make op-rbuilder-profiling FEATURES=telemetry,loki @@ -42,14 +44,25 @@ recipe: - 0x2a871d0798f97d79848a013d4936a73bf4cc922c825d33c1cf7073dff6d409c6 - --bootnodes - '{{Service "bootnode" "rpc" "enode" "79be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8"}}' + - --flashblocks.block-time + - '200' + - --flashblocks.end-buffer-ms + - '75' + - --flashblocks.send-offset-ms + - '-30' + - --flashblocks.continuous-build + - --txpool.max-account-slots + - '256' - --log.stdout.filter - - op_rbuilder=debug + - op_rbuilder=debug,tx_trace=debug - --telemetry.otlp-endpoint - http://localhost:4318 - --telemetry.sampling-ratio - '1' - --telemetry.loki-url - http://localhost:3100 + env: + ENABLE_TX_TRACKING_DEBUG_LOGS: "true" ports: authrpc: 4444 http: 2222 @@ -76,7 +89,7 @@ recipe: - --l2-rpc - http://host.docker.internal:2222 - --l2-block-time - - 2s + - 1s - --l2-monitor-builder-address - '0xa0Ee7A142d267C1f36714E4a8F75612F20a79720' - --l2-monitor-wallet