arb_node/
producer.rs

1//! Block producer implementation.
2//!
3//! Produces blocks from L1 incoming messages by parsing transactions,
4//! executing them against the current state, and persisting the results.
5
6use std::sync::{
7    atomic::{AtomicBool, AtomicU64, Ordering},
8    Arc,
9};
10
11use alloy_consensus::{
12    proofs,
13    transaction::{SignerRecoverable, TxHashRef},
14    Block, BlockBody, BlockHeader, Header, TxReceipt, EMPTY_OMMER_ROOT_HASH,
15};
16use alloy_eips::eip2718::Decodable2718;
17use alloy_evm::{
18    block::{BlockExecutor, BlockExecutorFactory},
19    EvmFactory,
20};
21use alloy_primitives::{Address, Bytes, B256, B64, U256};
22use alloy_rpc_types_eth::BlockNumberOrTag;
23use parking_lot::Mutex;
24use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain};
25use reth_chainspec::ChainSpec;
26use reth_evm::ConfigureEvm;
27use reth_primitives_traits::{logs_bloom, NodePrimitives, SealedHeader};
28use reth_provider::{BlockNumReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory};
29use reth_revm::database::StateProviderDatabase;
30use reth_storage_api::{StateProvider, StateProviderBox};
31use reth_trie_common::{HashedPostState, TrieInputSorted};
32use revm::database::{BundleState, StateBuilder};
33use revm_database::states::bundle_state::BundleRetention;
34use tracing::{debug, info, warn};
35
36use arb_evm::config::{arbos_version_from_mix_hash, ArbEvmConfig};
37use arb_primitives::{signed_tx::ArbTransactionSigned, tx_types::ArbInternalTx, ArbPrimitives};
38use arb_rpc::block_producer::{
39    BlockProducer, BlockProducerError, BlockProductionInput, ProducedBlock,
40};
41use arbos::{
42    arbos_types::parse_init_message,
43    header::{derive_arb_header_info, ArbHeaderInfo},
44    internal_tx,
45    parse_l2::{parse_l2_transactions, parsed_tx_to_signed, ParsedTransaction},
46};
47
48use crate::genesis;
49
/// Trait to access the in-memory canonical state from a provider.
///
/// `BlockchainProvider` has `canonical_in_memory_state()` as an inherent method
/// but it's not exposed via any reth trait. This trait bridges that gap so
/// the block producer can receive the handle generically.
pub trait InMemoryStateAccess {
    /// Node primitive types carried by the in-memory chain state.
    type Primitives: NodePrimitives;
    /// Returns a handle to the canonical in-memory chain state.
    fn canonical_in_memory_state(&self) -> CanonicalInMemoryState<Self::Primitives>;
}
59
/// Implement `InMemoryStateAccess` for reth's `BlockchainProvider`.
impl<N> InMemoryStateAccess for reth_provider::providers::BlockchainProvider<N>
where
    N: reth_provider::providers::ProviderNodeTypes,
{
    type Primitives = N::Primitives;
    fn canonical_in_memory_state(&self) -> CanonicalInMemoryState<Self::Primitives> {
        // Not infinite recursion: inherent methods take precedence over trait
        // methods during method resolution, so this dispatches to
        // `BlockchainProvider`'s inherent `canonical_in_memory_state()`.
        self.canonical_in_memory_state()
    }
}
70
/// Default number of blocks to buffer before flushing via save_blocks(Full).
///
/// Used as the default for `ArbBlockProducer::new`'s `flush_interval`
/// parameter; produced blocks stay in the canonical in-memory state until
/// this many have accumulated.
pub const DEFAULT_FLUSH_INTERVAL: u64 = 128;
73
/// Block producer using reth's save_blocks(Full) for persistence.
///
/// Produced blocks are buffered in the canonical in-memory state and
/// flushed to disk in batches of `flush_interval` blocks.
pub struct ArbBlockProducer<Provider> {
    /// Storage/state provider used to read headers, block numbers and state.
    provider: Provider,
    /// Chain specification (chain id, hardfork schedule).
    chain_spec: Arc<ChainSpec>,
    /// EVM configuration used to build per-block EVM environments/executors.
    evm_config: ArbEvmConfig,
    /// Handle to reth's canonical in-memory chain state where produced
    /// blocks are buffered before persistence.
    in_memory_state: CanonicalInMemoryState<ArbPrimitives>,
    /// Highest produced block number, including blocks still only in memory
    /// (0 means "fall back to the provider's persisted head").
    head_block_num: AtomicU64,
    /// Number of blocks produced since the last flush.
    blocks_since_flush: AtomicU64,
    /// Flush after this many buffered blocks (see `DEFAULT_FLUSH_INTERVAL`).
    flush_interval: u64,
    /// Trie input accumulated across buffered blocks; presumably handed to
    /// the flush path — the consumer is outside this chunk, TODO confirm.
    accumulated_trie_input: Mutex<Arc<TrieInputSorted>>,
    /// Trie input snapshot belonging to an in-flight background flush, if any.
    flushing_trie_input: Mutex<Option<Arc<TrieInputSorted>>>,
    /// True while a background flush is in flight.
    pending_flush: AtomicBool,
    /// Serializes block production so only one block is produced at a time.
    produce_lock: Mutex<()>,
    /// Parsed ArbOS Init message cached until it is applied during block 1
    /// execution (taken exactly once in `produce_block_with_execution`).
    cached_init: Mutex<Option<arbos::arbos_types::ParsedInitMessage>>,
    /// Finality markers propagated by `nitroexecution_setFinalityData`.
    finality: Mutex<FinalityMarkers>,
    /// External shared slot pushed to on every set_finality update so
    /// the `arb_getValidatedBlock` RPC handler can read it without
    /// holding a strong reference to the producer.
    validated_watcher: Mutex<Option<Arc<parking_lot::RwLock<alloy_primitives::B256>>>>,
    /// Cached coalesced storage overlay for the current in-memory chain.
    /// Extended in place after each block produced; invalidated on flush
    /// or rollback so a stale chain view never feeds an SLOAD.
    cached_overlay: Mutex<Option<CachedOverlay>>,
    /// Cached code-hash -> bytecode map collected from unflushed in-memory
    /// blocks; same lifecycle as `cached_overlay`.
    cached_prestate: Mutex<Option<CachedPrestate>>,
}
100
/// Finality markers (block hashes) propagated by
/// `nitroexecution_setFinalityData`; each is `None` until first set.
#[derive(Debug, Default, Clone)]
struct FinalityMarkers {
    /// Hash of the latest "safe" block, if reported.
    safe: Option<alloy_primitives::B256>,
    /// Hash of the latest finalized block, if reported.
    finalized: Option<alloy_primitives::B256>,
    /// Hash of the latest validated block, if reported.
    validated: Option<alloy_primitives::B256>,
}
107
/// Coalesced storage overlay cached for a specific in-memory chain head.
struct CachedOverlay {
    /// Hash of the block the overlay was built on top of; the cache is
    /// only valid while producing a child of this block.
    parent_hash: B256,
    /// Overlay coalescing the state changes of all unflushed in-memory blocks.
    overlay: Arc<crate::coalesced_state::CoalescedOverlay>,
}
112
/// Code-hash -> bytecode map cached for a specific in-memory chain head,
/// covering contracts deployed in blocks not yet persisted to the DB.
struct CachedPrestate {
    /// Hash of the block this prestate was built on top of; the cache is
    /// only valid while producing a child of this block.
    parent_hash: B256,
    /// Bytecodes keyed by code hash, collected from unflushed blocks.
    contracts: Arc<alloy_primitives::map::HashMap<B256, revm::bytecode::Bytecode>>,
}
117
impl<Provider> ArbBlockProducer<Provider>
where
    Provider: BlockNumReader,
{
    /// Create a new producer.
    ///
    /// The head block counter is seeded from the provider's last persisted
    /// block number (0 if the read fails); all caches start empty.
    pub fn new(
        provider: Provider,
        chain_spec: Arc<ChainSpec>,
        evm_config: ArbEvmConfig,
        in_memory_state: CanonicalInMemoryState<ArbPrimitives>,
        flush_interval: u64,
    ) -> Self {
        let head = provider.last_block_number().unwrap_or(0);
        Self {
            provider,
            chain_spec,
            evm_config,
            in_memory_state,
            head_block_num: AtomicU64::new(head),
            blocks_since_flush: AtomicU64::new(0),
            flush_interval,
            accumulated_trie_input: Mutex::new(Arc::new(TrieInputSorted::default())),
            flushing_trie_input: Mutex::new(None),
            pending_flush: AtomicBool::new(false),
            produce_lock: Mutex::new(()),
            cached_init: Mutex::new(None),
            finality: Mutex::new(FinalityMarkers::default()),
            validated_watcher: Mutex::new(None),
            cached_overlay: Mutex::new(None),
            cached_prestate: Mutex::new(None),
        }
    }

    /// Return the cached coalesced overlay for `parent_hash`, or rebuild it
    /// from the full in-memory chain at `head_state` on a cache miss.
    fn get_or_build_overlay(
        &self,
        parent_hash: B256,
        head_state: &reth_chain_state::BlockState<ArbPrimitives>,
    ) -> Arc<crate::coalesced_state::CoalescedOverlay> {
        let mut cache = self.cached_overlay.lock();
        // Hit: same parent hash means the in-memory chain view is unchanged.
        if let Some(c) = cache.as_ref() {
            if c.parent_hash == parent_hash {
                return c.overlay.clone();
            }
        }
        // Miss: coalesce the whole in-memory chain into a fresh overlay.
        let overlay = Arc::new(crate::coalesced_state::CoalescedOverlay::from_chain(
            head_state,
        ));
        *cache = Some(CachedOverlay {
            parent_hash,
            overlay: overlay.clone(),
        });
        overlay
    }

    /// Fold `bundle` (the just-produced block's state changes) into the
    /// cached overlay and re-key the cache to `new_block_hash`, so the next
    /// block reuses it instead of rebuilding from the whole chain.
    fn extend_cached_overlay(&self, new_block_hash: B256, bundle: &BundleState) {
        let mut cache = self.cached_overlay.lock();
        let mut overlay = match cache.take() {
            // Mutate in place when we hold the only Arc; otherwise a reader
            // still holds a clone, so deep-copy before mutating.
            Some(c) => match Arc::try_unwrap(c.overlay) {
                Ok(o) => o,
                Err(arc) => (*arc).clone(),
            },
            // NOTE(review): starting from an empty overlay here only covers
            // the new block's changes. This assumes the cache is never empty
            // while older unflushed blocks exist (production always calls
            // `get_or_build_overlay` first) — TODO confirm.
            None => crate::coalesced_state::CoalescedOverlay::default(),
        };
        overlay.extend_with_block(bundle);
        *cache = Some(CachedOverlay {
            parent_hash: new_block_hash,
            overlay: Arc::new(overlay),
        });
    }

    /// Drop the cached overlay (used after flush/rollback so a stale chain
    /// view never feeds an SLOAD).
    fn invalidate_cached_overlay(&self) {
        *self.cached_overlay.lock() = None;
    }

    /// Return the cached code-hash -> bytecode map for `parent_hash`, or
    /// rebuild it by walking every block in the in-memory chain (when
    /// present) and collecting its deployed contract bytecodes.
    fn get_or_build_prestate(
        &self,
        parent_hash: B256,
        head_state: Option<&reth_chain_state::BlockState<ArbPrimitives>>,
    ) -> Arc<alloy_primitives::map::HashMap<B256, revm::bytecode::Bytecode>> {
        let mut cache = self.cached_prestate.lock();
        if let Some(c) = cache.as_ref() {
            if c.parent_hash == parent_hash {
                return c.contracts.clone();
            }
        }
        let mut contracts: alloy_primitives::map::HashMap<B256, revm::bytecode::Bytecode> =
            Default::default();
        if let Some(head_state) = head_state {
            for block_state in head_state.chain() {
                let exec_output = &block_state.block().execution_output;
                for (hash, code) in &exec_output.state.contracts {
                    // First insertion wins per hash; entries are keyed by
                    // code hash, so later duplicates should be identical.
                    contracts.entry(*hash).or_insert_with(|| code.clone());
                }
            }
        }
        let arc = Arc::new(contracts);
        *cache = Some(CachedPrestate {
            parent_hash,
            contracts: arc.clone(),
        });
        arc
    }

    /// Fold the contracts deployed by the just-produced block into the
    /// cached prestate map and re-key the cache to `new_block_hash`.
    fn extend_cached_prestate(&self, new_block_hash: B256, bundle: &BundleState) {
        let mut cache = self.cached_prestate.lock();
        let mut contracts = match cache.take() {
            // Reuse the map when uniquely owned, else clone it out of the Arc.
            Some(c) => match Arc::try_unwrap(c.contracts) {
                Ok(map) => map,
                Err(arc) => (*arc).clone(),
            },
            None => Default::default(),
        };
        for (hash, code) in &bundle.contracts {
            contracts.entry(*hash).or_insert_with(|| code.clone());
        }
        *cache = Some(CachedPrestate {
            parent_hash: new_block_hash,
            contracts: Arc::new(contracts),
        });
    }

    /// Drop the cached prestate map (same lifecycle as the overlay cache:
    /// cleared on flush or rollback).
    fn invalidate_cached_prestate(&self) {
        *self.cached_prestate.lock() = None;
    }

    /// Currently-tracked finality markers (for RPC / debugging use).
    ///
    /// Returns `(safe, finalized, validated)` block hashes, each `None`
    /// until the corresponding marker has been reported.
    pub fn finality_markers(
        &self,
    ) -> (
        Option<alloy_primitives::B256>,
        Option<alloy_primitives::B256>,
        Option<alloy_primitives::B256>,
    ) {
        let f = self.finality.lock();
        (f.safe, f.finalized, f.validated)
    }
}
254
255impl<Provider> ArbBlockProducer<Provider>
256where
257    Provider: BlockNumReader
258        + BlockReaderIdExt
259        + HeaderProvider<Header = Header>
260        + StateProviderFactory
261        + Send
262        + Sync
263        + 'static,
264{
265    /// Get the current head block number (includes in-memory buffered blocks).
266    fn head_block_number(&self) -> Result<u64, BlockProducerError> {
267        let head = self.head_block_num.load(Ordering::SeqCst);
268        if head > 0 {
269            Ok(head)
270        } else {
271            self.provider
272                .last_block_number()
273                .map_err(|e| BlockProducerError::StateAccess(e.to_string()))
274        }
275    }
276
277    /// Get the parent sealed header for block production.
278    fn parent_header(&self, head_num: u64) -> Result<SealedHeader<Header>, BlockProducerError> {
279        self.provider
280            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(head_num))
281            .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?
282            .ok_or_else(|| {
283                BlockProducerError::StateAccess(format!("Parent block {head_num} not found"))
284            })
285    }
286
287    /// Produce a block with full transaction execution.
288    fn produce_block_with_execution(
289        &self,
290        input: &BlockProductionInput,
291        parsed_txs: Vec<ParsedTransaction>,
292    ) -> Result<ProducedBlock, BlockProducerError> {
293        // Check if a background flush completed.
294        if self.pending_flush.load(Ordering::SeqCst) {
295            if let Some(result) = crate::launcher::try_flush_result() {
296                self.in_memory_state
297                    .remove_persisted_blocks(result.last_num_hash);
298                *self.flushing_trie_input.lock() = None;
299                self.pending_flush.store(false, Ordering::SeqCst);
300                self.invalidate_cached_overlay();
301                self.invalidate_cached_prestate();
302                info!(
303                    target: "block_producer",
304                    flushed = result.count,
305                    last_block = result.last_num_hash.number,
306                    duration_ms = result.duration.as_millis(),
307                    "Background flush completed"
308                );
309            }
310        }
311
312        let head_num = self.head_block_number()?;
313        let l2_block_number = head_num + 1;
314        let parent_header = self.parent_header(head_num)?;
315
316        let timestamp = input.l1_timestamp.max(parent_header.timestamp());
317        let time_passed = timestamp.saturating_sub(parent_header.timestamp());
318
319        let parent_mix_hash = parent_header.mix_hash().unwrap_or_default();
320        let parent_arbos_version = arbos_version_from_mix_hash(&parent_mix_hash);
321
322        // Build the EVM environment for this block.
323        let l1_block_number = input.l1_block_number;
324        let arbos_version = parent_arbos_version; // May upgrade during StartBlock
325
326        // Construct a provisional mix_hash for the EVM environment.
327        let send_count = {
328            let mut buf = [0u8; 8];
329            buf.copy_from_slice(&parent_mix_hash.0[0..8]);
330            u64::from_be_bytes(buf)
331        };
332        let provisional_mix_hash = compute_mix_hash(send_count, l1_block_number, arbos_version);
333
334        // Open state at parent block via block hash.
335        let raw_state_provider = self
336            .provider
337            .state_by_block_hash(parent_header.hash())
338            .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?;
339
340        let state_provider: StateProviderBox =
341            if let Some(head_state) = self.in_memory_state.state_by_hash(parent_header.hash()) {
342                let overlay = self.get_or_build_overlay(parent_header.hash(), &head_state);
343                if overlay.is_empty() {
344                    raw_state_provider
345                } else {
346                    crate::coalesced_state::CoalescedStateProvider::new(raw_state_provider, overlay)
347                        .boxed()
348                }
349            } else {
350                raw_state_provider
351            };
352
353        // Read the L2 baseFee from the parent's committed state.
354        let l2_base_fee = {
355            let read_slot = |addr: Address, slot: B256| -> Option<U256> {
356                state_provider.storage(addr, slot).ok().flatten()
357            };
358            arbos::header::read_l2_base_fee(&read_slot).or(parent_header.base_fee_per_gas())
359        };
360
361        // Build a provisional header for the EVM config.
362        let provisional_header = Header {
363            parent_hash: parent_header.hash(),
364            ommers_hash: EMPTY_OMMER_ROOT_HASH,
365            beneficiary: input.sender,
366            state_root: B256::ZERO, // placeholder
367            transactions_root: B256::ZERO,
368            receipts_root: B256::ZERO,
369            withdrawals_root: None,
370            logs_bloom: Default::default(),
371            timestamp,
372            mix_hash: provisional_mix_hash,
373            nonce: B64::from(input.delayed_messages_read.to_be_bytes()),
374            base_fee_per_gas: l2_base_fee,
375            number: l2_block_number,
376            gas_limit: parent_header.gas_limit(),
377            difficulty: U256::from(1),
378            gas_used: 0,
379            extra_data: Default::default(),
380            parent_beacon_block_root: None,
381            blob_gas_used: None,
382            excess_blob_gas: None,
383            requests_hash: None,
384        };
385
386        let evm_env = self
387            .evm_config
388            .evm_env(&provisional_header)
389            .map_err(|_| BlockProducerError::Execution("evm_env construction failed".into()))?;
390
391        // Collect bytecodes from in-memory blocks that might not be flushed to DB yet.
392        // When a Stylus contract is deployed in a recent block and the flush hasn't
393        // persisted it yet, the DB's Bytecodes table won't have the code. The
394        // State<DB>'s `code_by_hash` with `use_preloaded_bundle` will check the
395        // bundle_state.contracts before falling back to the DB, ensuring all
396        // bytecodes from recent blocks are available during execution.
397        let prestate = {
398            let head_state_opt = self.in_memory_state.state_by_hash(parent_header.hash());
399            let contracts =
400                self.get_or_build_prestate(parent_header.hash(), head_state_opt.as_deref());
401            BundleState {
402                contracts: (*contracts).clone(),
403                ..Default::default()
404            }
405        };
406
407        let mut db = StateBuilder::new()
408            .with_database(StateProviderDatabase::new(state_provider.as_ref()))
409            .with_bundle_prestate(prestate)
410            .with_bundle_update()
411            .without_state_clear()
412            .build();
413
414        let chain_id = self.chain_spec.chain().id();
415
416        // Apply cached ArbOS Init during block 1.
417        // Two cases:
418        //   - ArbOS not yet initialized (no chainspec alloc): full init from message.
419        //   - ArbOS already initialized (chainspec did it with placeholder L1 base fee): override
420        //     the L1 price_per_unit slot with the value from the init message, since chainspec has
421        //     no way to know the real value.
422        if let Some(init_msg) = self.cached_init.lock().take() {
423            if !genesis::is_arbos_initialized(&mut db) {
424                let initial_version = std::env::var("ARB_INITIAL_ARBOS_VERSION")
425                    .ok()
426                    .and_then(|v| v.parse::<u64>().ok())
427                    .unwrap_or(genesis::INITIAL_ARBOS_VERSION);
428                info!(
429                    target: "block_producer",
430                    initial_version,
431                    "Applying cached ArbOS Init during block {} execution",
432                    l2_block_number
433                );
434                genesis::initialize_arbos_state(
435                    &mut db,
436                    &init_msg,
437                    chain_id,
438                    initial_version,
439                    genesis::DEFAULT_CHAIN_OWNER,
440                    genesis::ArbOSInit::default(),
441                )
442                .map_err(BlockProducerError::Execution)?;
443            } else {
444                use arbos::{arbos_state::ArbosState, burn::SystemBurner};
445                info!(
446                    target: "block_producer",
447                    initial_l1_base_fee = %init_msg.initial_l1_base_fee,
448                    "ArbOS already initialized; overriding L1 price_per_unit from Init message"
449                );
450                let state_ptr = &mut db as *mut _;
451                if let Ok(mut arb_state) =
452                    ArbosState::open(state_ptr, SystemBurner::new(None, false))
453                {
454                    let _ = arb_state
455                        .l1_pricing_state
456                        .set_price_per_unit(init_msg.initial_l1_base_fee);
457                    // Optional ArbOS upgrade hook for benchmarking: lets the
458                    // bench's subprocess boot at any target ArbOS version
459                    // without needing to schedule an on-chain upgrade.
460                    if let Ok(target) = std::env::var("ARB_INITIAL_ARBOS_VERSION") {
461                        if let Ok(target_version) = target.parse::<u64>() {
462                            let current = arb_state.arbos_version();
463                            if target_version > current {
464                                if let Err(e) =
465                                    arb_state.upgrade_arbos_version(target_version, true)
466                                {
467                                    info!(target: "block_producer", err = ?e, target_version, "ArbOS upgrade via env var failed");
468                                } else {
469                                    info!(
470                                        target: "block_producer",
471                                        from = current,
472                                        to = target_version,
473                                        "ArbOS upgraded via ARB_INITIAL_ARBOS_VERSION"
474                                    );
475                                }
476                            }
477                        }
478                    }
479                }
480            }
481        }
482
483        let parent_extra = parent_header.extra_data().to_vec();
484        let mut exec_extra = parent_extra.clone();
485        exec_extra.resize(32, 0);
486        exec_extra.extend_from_slice(&input.delayed_messages_read.to_be_bytes());
487
488        let exec_ctx = alloy_evm::eth::EthBlockExecutionCtx {
489            tx_count_hint: Some(parsed_txs.len() + 2), // +2 for internal txs
490            parent_hash: parent_header.hash(),
491            parent_beacon_block_root: None,
492            ommers: &[],
493            withdrawals: None,
494            extra_data: exec_extra.into(),
495        };
496
497        // Create the block executor via the factory.
498        let evm = self
499            .evm_config
500            .block_executor_factory()
501            .evm_factory()
502            .create_evm(&mut db, evm_env.clone());
503        let mut executor = self
504            .evm_config
505            .block_executor_factory()
506            .create_arb_executor(evm, exec_ctx, chain_id);
507        executor.arb_ctx.l2_block_number = l2_block_number;
508        executor.arb_ctx.l1_block_number = l1_block_number;
509
510        // Populate L2 block hash cache for arbBlockHash().
511        {
512            let parent_num = l2_block_number.saturating_sub(1);
513            arb_precompiles::set_l2_block_hash(parent_num, parent_header.hash());
514
515            // If cache is mostly empty (first block or after restart), do a full populate.
516            if arb_precompiles::get_l2_block_hash(parent_num.saturating_sub(1)).is_none()
517                && parent_num > 1
518            {
519                let mut hash = parent_header.parent_hash();
520                for i in 2..=256u64 {
521                    let n = l2_block_number.checked_sub(i);
522                    if let Some(n) = n {
523                        arb_precompiles::set_l2_block_hash(n, hash);
524                        match self
525                            .provider
526                            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(n))
527                        {
528                            Ok(Some(h)) => hash = h.parent_hash(),
529                            _ => break,
530                        }
531                    }
532                }
533            }
534        }
535
536        // Apply pre-execution changes (loads ArbOS state, fee accounts, block hashes).
537        executor
538            .apply_pre_execution_changes()
539            .map_err(|e| BlockProducerError::Execution(format!("pre-exec: {e}")))?;
540
541        let mut all_txs: Vec<ArbTransactionSigned> = Vec::new();
542
543        // 1. Generate and execute the StartBlock internal tx (always first).
544        let l1_base_fee = input.l1_base_fee.unwrap_or(U256::ZERO);
545        let start_block_data = internal_tx::encode_start_block(
546            l1_base_fee,
547            l1_block_number,
548            l2_block_number,
549            time_passed,
550        );
551
552        let start_block_tx = create_internal_tx(chain_id, &start_block_data);
553        execute_and_commit_tx(&mut executor, &start_block_tx, "StartBlock")?;
554        all_txs.push(start_block_tx);
555
556        // Warm sender caches in parallel; kinds with an embedded `from` are skipped.
557        let pre_recovered: Vec<Option<ArbTransactionSigned>> = {
558            use rayon::prelude::*;
559            parsed_txs
560                .par_iter()
561                .map(|parsed| match parsed {
562                    ParsedTransaction::InternalStartBlock { .. }
563                    | ParsedTransaction::BatchPostingReport { .. } => None,
564                    other => {
565                        let signed = parsed_tx_to_signed(other, chain_id)?;
566                        let _ = signed.recover_signer();
567                        Some(signed)
568                    }
569                })
570                .collect()
571        };
572
573        // 2. Execute parsed user transactions.
574        for (idx, parsed) in parsed_txs.iter().enumerate() {
575            match parsed {
576                ParsedTransaction::InternalStartBlock { .. } => {
577                    // StartBlock is handled above, skip.
578                    continue;
579                }
580                ParsedTransaction::BatchPostingReport {
581                    batch_timestamp,
582                    batch_poster,
583                    batch_number,
584                    l1_base_fee_estimate,
585                    extra_gas,
586                    ..
587                } => {
588                    // Delayed message kind=13 contains a batch posting report.
589                    // Encode as V1 or V2 based on parent ArbOS version.
590                    let report_data =
591                        if parent_arbos_version >= arb_chainspec::arbos_version::ARBOS_VERSION_50 {
592                            // V2: pass raw batch data stats + extra_gas.
593                            let (length, non_zeros) = input.batch_data_stats.unwrap_or((0, 0));
594                            internal_tx::encode_batch_posting_report_v2(
595                                *batch_timestamp,
596                                *batch_poster,
597                                *batch_number,
598                                length,
599                                non_zeros,
600                                *extra_gas,
601                                *l1_base_fee_estimate,
602                            )
603                        } else {
604                            // V1: combine legacy gas cost + extra_gas into single field.
605                            let legacy_gas = input.batch_gas_cost.unwrap_or(0);
606                            let batch_data_gas = legacy_gas.saturating_add(*extra_gas);
607                            internal_tx::encode_batch_posting_report(
608                                *batch_timestamp,
609                                *batch_poster,
610                                *batch_number,
611                                batch_data_gas,
612                                *l1_base_fee_estimate,
613                            )
614                        };
615                    let report_tx = create_internal_tx(chain_id, &report_data);
616                    execute_and_commit_tx(&mut executor, &report_tx, "BatchPostingReport")?;
617                    all_txs.push(report_tx);
618                    continue;
619                }
620                _ => {}
621            }
622
623            let signed_tx = match pre_recovered.get(idx).and_then(|s| s.clone()) {
624                Some(tx) => tx,
625                None => {
626                    debug!(target: "block_producer", ?parsed, "Skipping unparseable transaction");
627                    continue;
628                }
629            };
630
631            let recovered = match signed_tx.clone().try_into_recovered() {
632                Ok(r) => r,
633                Err(e) => {
634                    warn!(target: "block_producer", error = %e, "Failed to recover tx sender, skipping");
635                    continue;
636                }
637            };
638            let tx_hash = *signed_tx.tx_hash();
639            let (exec_outcome, hostio_records) = arb_rpc::stylus_tracer::with_trace_buffer(|| {
640                executor.execute_transaction_without_commit(recovered)
641            });
642            match exec_outcome {
643                Ok(result) => {
644                    match executor.commit_transaction(result) {
645                        Ok(_gas_used) => {
646                            all_txs.push(signed_tx);
647                            if !hostio_records.is_empty() {
648                                arb_rpc::stylus_tracer::cache_trace(tx_hash, hostio_records);
649                            }
650
651                            // Drain and execute any scheduled txs (auto-redeems).
652                            // After a SubmitRetryable or manual Redeem precompile call,
653                            // the executor queues retry txs that must execute in the
654                            // same block, immediately after the triggering tx.
655                            loop {
656                                let scheduled = executor.drain_scheduled_txs();
657                                debug!(
658                                    target: "block_producer",
659                                    count = scheduled.len(),
660                                    "Drained scheduled txs"
661                                );
662                                if scheduled.is_empty() {
663                                    break;
664                                }
665                                for encoded in scheduled {
666                                    let retry_tx: Option<ArbTransactionSigned> =
667                                        ArbTransactionSigned::decode_2718(&mut &encoded[..]).ok();
668                                    if let Some(retry_tx) = retry_tx {
669                                        let retry_signed = retry_tx.clone();
670                                        let retry_hash = *retry_signed.tx_hash();
671                                        match retry_tx.try_into_recovered() {
672                                            Ok(recovered_retry) => {
673                                                let (retry_outcome, retry_records) =
674                                                    arb_rpc::stylus_tracer::with_trace_buffer(
675                                                        || {
676                                                            executor
677                                                                .execute_transaction_without_commit(
678                                                                    recovered_retry,
679                                                                )
680                                                        },
681                                                    );
682                                                match retry_outcome {
683                                                    Ok(retry_result) => {
684                                                        match executor
685                                                            .commit_transaction(retry_result)
686                                                        {
687                                                            Ok(_) => {
688                                                                all_txs.push(retry_signed);
689                                                                if !retry_records.is_empty() {
690                                                                    arb_rpc::stylus_tracer::cache_trace(
691                                                                        retry_hash,
692                                                                        retry_records,
693                                                                    );
694                                                                }
695                                                            }
696                                                            Err(e) => {
697                                                                warn!(
698                                                                    target: "block_producer",
699                                                                    error = %e,
700                                                                    "Failed to commit auto-redeem tx"
701                                                                );
702                                                            }
703                                                        }
704                                                    }
705                                                    Err(e) => {
706                                                        warn!(
707                                                            target: "block_producer",
708                                                            error = %e,
709                                                            "Auto-redeem tx execution failed"
710                                                        );
711                                                    }
712                                                }
713                                            }
714                                            Err(e) => {
715                                                warn!(
716                                                    target: "block_producer",
717                                                    error = %e,
718                                                    "Failed to recover auto-redeem tx sender"
719                                                );
720                                            }
721                                        }
722                                    }
723                                }
724                            }
725                        }
726                        Err(e) => {
727                            warn!(target: "block_producer", error = %e, "Failed to commit transaction");
728                        }
729                    }
730                }
731                Err(ref e) if e.to_string().contains("block gas limit reached") => {
732                    break;
733                }
734                Err(e) => {
735                    warn!(target: "block_producer", error = %e, "Transaction execution failed, skipping");
736                }
737            }
738        }
739
740        let zombie_accounts = executor.zombie_accounts().clone();
741        let finalise_deleted = executor.finalise_deleted().clone();
742
743        let (_, exec_result) = executor
744            .finish()
745            .map_err(|e| BlockProducerError::Execution(format!("finish: {e}")))?;
746
747        let receipts: Vec<arb_primitives::ArbReceipt> = exec_result.receipts;
748
749        db.merge_transitions(BundleRetention::Reverts);
750        let mut bundle = db.take_bundle();
751
752        augment_bundle_from_cache(&mut bundle, &db.cache, &*state_provider);
753
754        // Mark per-tx finalise deletions, skipping zombie accounts.
755        let keccak_empty_hash = alloy_primitives::B256::from(alloy_primitives::keccak256([]));
756        for addr in &finalise_deleted {
757            if zombie_accounts.contains(addr) {
758                continue;
759            }
760            if bundle.state.contains_key(addr) {
761                let existed_before = state_provider.basic_account(addr).ok().flatten().is_some();
762                if existed_before {
763                    // Account was in the trie. Only mark as deleted if it's
764                    // still empty — it may have been re-created with non-zero
765                    // state (e.g., nonce=1) by a later tx in this block.
766                    let still_empty = bundle
767                        .state
768                        .get(addr)
769                        .and_then(|a| a.info.as_ref())
770                        .is_none_or(|info| {
771                            info.nonce == 0
772                                && info.balance.is_zero()
773                                && info.code_hash == keccak_empty_hash
774                        });
775                    if still_empty {
776                        if let Some(bundle_acct) = bundle.state.get_mut(addr) {
777                            bundle_acct.info = None;
778                        }
779                    }
780                } else {
781                    let still_empty = bundle
782                        .state
783                        .get(addr)
784                        .and_then(|a| a.info.as_ref())
785                        .is_none_or(|info| {
786                            info.nonce == 0
787                                && info.balance.is_zero()
788                                && info.code_hash == keccak_empty_hash
789                        });
790                    if still_empty {
791                        bundle.state.remove(addr);
792                    }
793                }
794                continue;
795            }
796            if let Ok(Some(acct)) = state_provider.basic_account(addr) {
797                let was_originally_empty = acct.balance.is_zero()
798                    && acct.nonce == 0
799                    && acct.bytecode_hash.is_none_or(|h| h == keccak_empty_hash);
800                if was_originally_empty {
801                    continue;
802                }
803                bundle.state.insert(
804                    *addr,
805                    revm_database::BundleAccount {
806                        info: None, // signals trie deletion
807                        original_info: None,
808                        storage: Default::default(),
809                        status: revm_database::AccountStatus::Changed,
810                    },
811                );
812            }
813        }
814
815        filter_unchanged_storage(&mut bundle);
816        delete_empty_accounts(&mut bundle, &zombie_accounts, &*state_provider);
817
818        let hashed_state =
819            HashedPostState::from_bundle_state::<reth_trie_common::KeccakKeyHasher>(bundle.state());
820
821        let (state_root, trie_updates) = {
822            let acc_arc = self.accumulated_trie_input.lock().clone();
823            let flushing_arc = self.flushing_trie_input.lock().clone();
824
825            let block_state_sorted = hashed_state.clone().into_sorted();
826            let prefix_sets = block_state_sorted.construct_prefix_sets().freeze();
827
828            let mut new_acc_state = (*acc_arc.state).clone();
829            new_acc_state.extend_ref_and_sort(&block_state_sorted);
830            let new_acc_state_arc = Arc::new(new_acc_state);
831
832            let (overlay_state_arc, overlay_nodes_arc) = if let Some(f) = &flushing_arc {
833                let mut s = (*f.state).clone();
834                s.extend_ref_and_sort(&new_acc_state_arc);
835                let mut n = (*f.nodes).clone();
836                n.extend_ref_and_sort(&acc_arc.nodes);
837                (Arc::new(s), Arc::new(n))
838            } else {
839                (Arc::clone(&new_acc_state_arc), Arc::clone(&acc_arc.nodes))
840            };
841
842            let overlay = Arc::new(TrieInputSorted::new(
843                overlay_nodes_arc,
844                overlay_state_arc,
845                Default::default(),
846            ));
847
848            let (root, updates) =
849                crate::launcher::compute_parallel_state_root(overlay, prefix_sets)
850                    .map_err(|e| BlockProducerError::Execution(format!("state root: {e}")))?;
851
852            let mut new_acc_nodes = (*acc_arc.nodes).clone();
853            new_acc_nodes.extend_ref_and_sort(&updates.clone_into_sorted());
854            *self.accumulated_trie_input.lock() = Arc::new(TrieInputSorted::new(
855                Arc::new(new_acc_nodes),
856                new_acc_state_arc,
857                Default::default(),
858            ));
859
860            (root, updates)
861        };
862
863        // Derive header info (send_root, send_count, etc.) from post-execution state.
864        let arb_info = derive_header_info_from_state(state_provider.as_ref(), &bundle);
865
866        let final_mix_hash = arb_info
867            .as_ref()
868            .map(|info| info.compute_mix_hash())
869            .unwrap_or(provisional_mix_hash);
870
871        let extra_data: Bytes = arb_info
872            .as_ref()
873            .map(|info| {
874                let mut data = info.send_root.to_vec();
875                data.resize(32, 0);
876                data.into()
877            })
878            .unwrap_or_else(|| {
879                let mut data = parent_extra.clone();
880                data.resize(32, 0);
881                data.into()
882            });
883
884        let send_root = arb_info
885            .as_ref()
886            .map(|info| info.send_root)
887            .unwrap_or_else(|| {
888                if parent_extra.len() >= 32 {
889                    B256::from_slice(&parent_extra[..32])
890                } else {
891                    B256::ZERO
892                }
893            });
894
895        // Compute receipt-derived fields.
896        let gas_used = exec_result.gas_used;
897        let logs_bloom_val = logs_bloom(receipts.iter().flat_map(|r| r.logs()));
898
899        let transactions_root =
900            proofs::calculate_transaction_root::<ArbTransactionSigned>(&all_txs);
901        let receipts_root = proofs::calculate_receipt_root(
902            &receipts
903                .iter()
904                .map(|r| r.with_bloom_ref())
905                .collect::<Vec<_>>(),
906        );
907
908        let header = Header {
909            parent_hash: parent_header.hash(),
910            ommers_hash: EMPTY_OMMER_ROOT_HASH,
911            beneficiary: input.sender,
912            state_root,
913            transactions_root,
914            receipts_root,
915            withdrawals_root: None,
916            logs_bloom: logs_bloom_val,
917            timestamp,
918            mix_hash: final_mix_hash,
919            nonce: B64::from(input.delayed_messages_read.to_be_bytes()),
920            base_fee_per_gas: l2_base_fee,
921            number: l2_block_number,
922            gas_limit: parent_header.gas_limit(),
923            difficulty: U256::from(1),
924            gas_used,
925            extra_data,
926            parent_beacon_block_root: None,
927            blob_gas_used: None,
928            excess_blob_gas: None,
929            requests_hash: None,
930        };
931
932        let block = Block::<ArbTransactionSigned> {
933            header,
934            body: BlockBody {
935                transactions: all_txs,
936                ommers: Default::default(),
937                withdrawals: None,
938            },
939        };
940
941        let sealed = reth_primitives_traits::SealedBlock::seal_slow(block);
942        let block_hash = sealed.hash();
943
944        self.extend_cached_overlay(block_hash, &bundle);
945        self.extend_cached_prestate(block_hash, &bundle);
946
947        // Buffer block in memory for batched persistence.
948        {
949            use alloy_evm::block::BlockExecutionResult;
950            use reth_chain_state::ComputedTrieData;
951            use reth_execution_types::BlockExecutionOutput;
952            use reth_primitives_traits::RecoveredBlock;
953
954            let recovered = Arc::new(RecoveredBlock::new_sealed(sealed.clone(), vec![]));
955            let exec_output = Arc::new(BlockExecutionOutput {
956                state: bundle,
957                result: BlockExecutionResult {
958                    receipts,
959                    requests: Default::default(),
960                    gas_used,
961                    blob_gas_used: 0,
962                },
963            });
964            let computed = ComputedTrieData {
965                hashed_state: Arc::new(hashed_state.into_sorted()),
966                trie_updates: Arc::new(trie_updates.into_sorted()),
967                anchored_trie_input: None,
968            };
969            let executed = ExecutedBlock::new(recovered, exec_output, computed);
970
971            self.in_memory_state
972                .update_chain(NewCanonicalChain::Commit {
973                    new: vec![executed],
974                });
975
976            let sealed_header = SealedHeader::new(sealed.header().clone(), sealed.hash());
977            self.in_memory_state.set_canonical_head(sealed_header);
978        }
979
980        self.head_block_num.store(l2_block_number, Ordering::SeqCst);
981
982        // Start async flush when buffer threshold reached (non-blocking).
983        let since_flush = self.blocks_since_flush.fetch_add(1, Ordering::SeqCst) + 1;
984        if since_flush >= self.flush_interval && !self.pending_flush.load(Ordering::SeqCst) {
985            self.start_async_flush();
986        }
987
988        info!(
989            target: "block_producer",
990            block_num = l2_block_number,
991            ?block_hash,
992            ?send_root,
993            ?state_root,
994            num_txs = sealed.body().transactions.len(),
995            gas_used,
996            "Produced block"
997        );
998
999        Ok(ProducedBlock {
1000            block_hash,
1001            send_root,
1002        })
1003    }
1004
1005    /// Start an async (non-blocking) flush to the background persistence thread.
1006    fn start_async_flush(&self) {
1007        let mut blocks: Vec<ExecutedBlock<ArbPrimitives>> = Vec::new();
1008        if let Some(head_state) = self.in_memory_state.head_state() {
1009            for block_state in head_state.chain() {
1010                blocks.push(block_state.block().clone());
1011            }
1012        }
1013        blocks.reverse();
1014
1015        if blocks.is_empty() {
1016            return;
1017        }
1018
1019        let last = blocks.last().unwrap();
1020        let last_num_hash = alloy_eips::BlockNumHash::new(
1021            last.recovered_block().number(),
1022            last.recovered_block().hash(),
1023        );
1024
1025        // Double-buffer: move current accumulator to flushing slot.
1026        let current = std::mem::take(&mut *self.accumulated_trie_input.lock());
1027        *self.flushing_trie_input.lock() = Some(current);
1028
1029        self.blocks_since_flush.store(0, Ordering::SeqCst);
1030        self.pending_flush.store(true, Ordering::SeqCst);
1031
1032        let count = blocks.len();
1033        crate::launcher::start_flush(crate::launcher::FlushRequest {
1034            blocks,
1035            last_num_hash,
1036        });
1037
1038        debug!(
1039            target: "block_producer",
1040            count,
1041            last_block = last_num_hash.number,
1042            "Started async flush"
1043        );
1044    }
1045
1046    /// Produce a minimal block for messages with no transactions.
1047    #[allow(dead_code)]
1048    fn produce_empty_block(
1049        &self,
1050        input: &BlockProductionInput,
1051    ) -> Result<ProducedBlock, BlockProducerError> {
1052        // Empty blocks still need StartBlock execution for ArbOS state updates.
1053        self.produce_block_with_execution(input, vec![])
1054    }
1055}
1056
#[async_trait::async_trait]
impl<Provider> BlockProducer for ArbBlockProducer<Provider>
where
    Provider: BlockNumReader
        + BlockReaderIdExt
        + HeaderProvider<Header = Header>
        + StateProviderFactory
        + Send
        + Sync
        + 'static,
{
    /// Parse the chain's Init message and store its parameters in
    /// `self.cached_init` for later use by the production path.
    ///
    /// Returns `BlockProducerError::Parse` if the message is malformed.
    fn cache_init_message(&self, l2_msg: &[u8]) -> Result<(), BlockProducerError> {
        let init_msg = parse_init_message(l2_msg)
            .map_err(|e| BlockProducerError::Parse(format!("init message: {e}")))?;

        info!(
            target: "block_producer",
            chain_id = %init_msg.chain_id,
            initial_l1_base_fee = %init_msg.initial_l1_base_fee,
            "Cached Init message params"
        );

        *self.cached_init.lock() = Some(init_msg);
        Ok(())
    }

    /// Produce the next L2 block from L1 message `msg_idx`.
    ///
    /// Serializes with other producers/resets via `produce_lock`, rejects
    /// out-of-order messages (`msg_idx` must equal head + 1), parses the
    /// L2 transactions out of the message (an unparseable message is
    /// treated as empty, not fatal), and hands off to
    /// `produce_block_with_execution`.
    async fn produce_block(
        &self,
        msg_idx: u64,
        input: BlockProductionInput,
    ) -> Result<ProducedBlock, BlockProducerError> {
        let _lock = self.produce_lock.lock();

        // Validate that this message is the next expected one.
        let head_num = self.head_block_number()?;
        let expected_block = head_num + 1;
        let actual_block = msg_idx;

        if expected_block != actual_block {
            return Err(BlockProducerError::Unexpected(format!(
                "Expected block {expected_block} but got msg_idx {msg_idx} (block {actual_block})"
            )));
        }

        // Parse L2 transactions from the message.
        let chain_id = self.chain_spec.chain().id();

        let parsed_txs = parse_l2_transactions(
            input.kind,
            input.sender,
            &input.l2_msg,
            input.request_id,
            input.l1_base_fee,
            chain_id,
        )
        // Parse failures degrade to an empty block rather than an error:
        // the message index must still be consumed to keep the stream
        // advancing.
        .unwrap_or_else(|e| {
            warn!(target: "block_producer", error=%e, "Error parsing L2 message, treating as empty");
            vec![]
        });

        debug!(
            target: "block_producer",
            msg_idx,
            kind = input.kind,
            num_txs = parsed_txs.len(),
            "Parsed L1 message"
        );

        self.produce_block_with_execution(&input, parsed_txs)
    }

    /// Roll the chain head back to `target_block_number`.
    ///
    /// Steps, in order: validate the target, look up its header, drain a
    /// completed in-flight flush, reorg the in-memory chain to drop the
    /// stale blocks, invalidate caches, re-anchor the canonical head,
    /// reset the head counter, unwind persisted blocks on disk
    /// (synchronously, via the worker thread), and finally reset the
    /// trie-input accumulator.
    ///
    /// Errors with `Unexpected` if the target is above the current head
    /// or its header is missing, and `Storage` if the disk unwind fails.
    async fn reset_to_block(&self, target_block_number: u64) -> Result<(), BlockProducerError> {
        let _lock = self.produce_lock.lock();
        let current = self.head_block_num.load(Ordering::SeqCst);
        if target_block_number > current {
            return Err(BlockProducerError::Unexpected(format!(
                "reset target {target_block_number} > current head {current}"
            )));
        }
        // Already at the target — nothing to do.
        if target_block_number == current {
            return Ok(());
        }

        let header = self
            .provider
            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(target_block_number))
            .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?
            .ok_or_else(|| {
                BlockProducerError::Unexpected(format!(
                    "reset target block {target_block_number} not found"
                ))
            })?;

        // Drain any in-flight flush before unwinding so disk state is consistent.
        // NOTE(review): `try_flush_result()` looks non-blocking — if the
        // flush has not finished yet this proceeds without draining it;
        // confirm the worker serializes unwind behind the flush (the
        // comment at `start_unwind` below suggests it does).
        if self.pending_flush.load(Ordering::SeqCst) {
            if let Some(result) = crate::launcher::try_flush_result() {
                self.in_memory_state
                    .remove_persisted_blocks(result.last_num_hash);
                *self.flushing_trie_input.lock() = None;
                self.pending_flush.store(false, Ordering::SeqCst);
            }
        }

        // Walk blocks above target in the in-memory state and gather
        // them as "old" for a reorg. Without them, the canonical head
        // points at the truncated block but consumers still see the
        // stale blocks in memory.
        let mut old_blocks: Vec<reth_chain_state::ExecutedBlock<ArbPrimitives>> = Vec::new();
        for bn in (target_block_number + 1)..=current {
            if let Some(state) = self.in_memory_state.state_by_number(bn) {
                old_blocks.push(state.block());
            }
        }

        // Reorg with no new blocks => pure rollback.
        if !old_blocks.is_empty() {
            self.in_memory_state
                .update_chain(reth_chain_state::NewCanonicalChain::Reorg {
                    new: Vec::new(),
                    old: old_blocks,
                });
        }

        // Per-block overlay/prestate caches refer to removed blocks.
        self.invalidate_cached_overlay();
        self.invalidate_cached_prestate();

        // Anchor the canonical head at the rolled-back block so RPC
        // queries like eth_blockNumber return the correct value.
        self.in_memory_state.set_canonical_head(header.clone());

        // Reset the block producer's counter so the next digestMessage
        // extends from the new head.
        self.head_block_num
            .store(target_block_number, Ordering::SeqCst);

        // Also remove persisted blocks above target from disk. The worker
        // thread runs this serially with flushes to avoid races.
        if let Some(rx) = crate::launcher::start_unwind(target_block_number) {
            // Block until the unwind completes — the reset must not
            // return while disk still holds blocks above the target.
            match rx.recv() {
                Ok(Ok(())) => {}
                Ok(Err(e)) => {
                    return Err(BlockProducerError::Storage(format!(
                        "unwind above {target_block_number}: {e}"
                    )));
                }
                Err(e) => {
                    return Err(BlockProducerError::Storage(format!(
                        "unwind channel closed: {e}"
                    )));
                }
            }
        }

        // Invalidate any trie-input carrying the now-removed blocks.
        *self.accumulated_trie_input.lock() = Arc::new(TrieInputSorted::default());

        info!(
            target: "block_producer",
            target = target_block_number,
            hash = %header.hash(),
            old_count = current - target_block_number,
            "reset head"
        );
        Ok(())
    }

    /// Update the safe / finalized / validated markers.
    ///
    /// Each argument is only applied when `Some`, so callers can update
    /// markers independently. `safe` and `finalized` are pushed into
    /// reth's canonical in-memory state (header lookups that fail are
    /// silently skipped); `validated` is Arbitrum-specific and goes to
    /// the external watcher instead.
    fn set_finality(
        &self,
        safe: Option<alloy_primitives::B256>,
        finalized: Option<alloy_primitives::B256>,
        validated: Option<alloy_primitives::B256>,
    ) -> Result<(), BlockProducerError> {
        let mut f = self.finality.lock();
        if safe.is_some() {
            f.safe = safe;
        }
        if finalized.is_some() {
            f.finalized = finalized;
        }
        if validated.is_some() {
            f.validated = validated;
        }
        // Release the lock before touching the in-memory state below.
        drop(f);

        // Propagate to reth's canonical in-memory state so
        // eth_getBlockByNumber("safe" | "finalized") returns the
        // correct header.
        if let Some(h) = safe {
            if let Ok(Some(sealed)) = self.provider.sealed_header_by_hash(h) {
                self.in_memory_state.set_safe(sealed);
            }
        }
        if let Some(h) = finalized {
            if let Ok(Some(sealed)) = self.provider.sealed_header_by_hash(h) {
                self.in_memory_state.set_finalized(sealed);
            }
        }
        // `validated` is Arbitrum-specific — reth's canonical state
        // exposes only safe/finalized. Push to the external watcher
        // so `arb_getValidatedBlock` RPC returns the latest value.
        if let Some(h) = validated {
            if let Some(w) = self.validated_watcher.lock().as_ref() {
                *w.write() = h;
            }
        }
        Ok(())
    }

    /// Register the shared cell that `set_finality` writes validated
    /// block hashes into (consumed by the `arb_getValidatedBlock` RPC).
    fn attach_validated_watcher(&self, watcher: Arc<parking_lot::RwLock<alloy_primitives::B256>>) {
        *self.validated_watcher.lock() = Some(watcher);
    }
}
1269
1270// ---------------------------------------------------------------------------
1271// Helper functions
1272// ---------------------------------------------------------------------------
1273
1274/// Create an internal transaction (type 0x6A).
1275fn create_internal_tx(chain_id: u64, data: &[u8]) -> ArbTransactionSigned {
1276    use arb_primitives::signed_tx::ArbTypedTransaction;
1277    let tx = ArbTypedTransaction::Internal(ArbInternalTx {
1278        chain_id: U256::from(chain_id),
1279        data: Bytes::copy_from_slice(data),
1280    });
1281    let sig = alloy_primitives::Signature::new(U256::ZERO, U256::ZERO, false);
1282    ArbTransactionSigned::new_unhashed(tx, sig)
1283}
1284
1285/// Execute and commit an internal transaction via the block executor.
1286fn execute_and_commit_tx<E>(
1287    executor: &mut E,
1288    tx: &ArbTransactionSigned,
1289    label: &str,
1290) -> Result<(), BlockProducerError>
1291where
1292    E: BlockExecutor<Transaction = ArbTransactionSigned>,
1293{
1294    let recovered = tx
1295        .clone()
1296        .try_into_recovered()
1297        .map_err(|e| BlockProducerError::Execution(format!("{label} recovery: {e}")))?;
1298
1299    let result = executor
1300        .execute_transaction_without_commit(recovered)
1301        .map_err(|e| BlockProducerError::Execution(format!("{label} execution: {e}")))?;
1302
1303    executor
1304        .commit_transaction(result)
1305        .map_err(|e| BlockProducerError::Execution(format!("{label} commit: {e}")))?;
1306
1307    Ok(())
1308}
1309
1310/// Construct a mix_hash from send_count, l1_block_number, and arbos_version.
1311fn compute_mix_hash(send_count: u64, l1_block_number: u64, arbos_version: u64) -> B256 {
1312    let mut bytes = [0u8; 32];
1313    bytes[0..8].copy_from_slice(&send_count.to_be_bytes());
1314    bytes[8..16].copy_from_slice(&l1_block_number.to_be_bytes());
1315    bytes[16..24].copy_from_slice(&arbos_version.to_be_bytes());
1316    B256::from(bytes)
1317}
1318
1319/// EIP-161: mark empty non-zombie accounts for trie deletion.
1320fn delete_empty_accounts(
1321    bundle: &mut BundleState,
1322    zombie_accounts: &rustc_hash::FxHashSet<Address>,
1323    state_provider: &dyn StateProvider,
1324) {
1325    let keccak_empty = alloy_primitives::B256::from(alloy_primitives::keccak256([]));
1326    let mut to_remove = Vec::new();
1327    for (addr, account) in bundle.state.iter_mut() {
1328        if let Some(ref info) = account.info {
1329            let is_empty =
1330                info.nonce == 0 && info.balance.is_zero() && info.code_hash == keccak_empty;
1331            if is_empty && !zombie_accounts.contains(addr) {
1332                let existed_before = state_provider.basic_account(addr).ok().flatten().is_some();
1333                if existed_before {
1334                    debug!(
1335                        target: "block_producer",
1336                        addr = ?addr,
1337                        "EIP-161: deleting empty account from state"
1338                    );
1339                    account.info = None;
1340                } else {
1341                    to_remove.push(*addr);
1342                }
1343            }
1344        }
1345    }
1346    for addr in to_remove {
1347        bundle.state.remove(&addr);
1348    }
1349}
1350
1351/// Remove unchanged storage slots from the bundle.
1352fn filter_unchanged_storage(bundle: &mut BundleState) {
1353    for (_addr, account) in bundle.state.iter_mut() {
1354        account
1355            .storage
1356            .retain(|_key, slot| slot.present_value != slot.previous_or_original_value);
1357    }
1358}
1359
1360/// Derive ArbHeaderInfo from post-execution state.
1361fn derive_header_info_from_state(
1362    state_provider: &dyn StateProvider,
1363    bundle_state: &BundleState,
1364) -> Option<ArbHeaderInfo> {
1365    let read_slot = |addr: Address, slot: B256| -> Option<U256> {
1366        if let Some(account) = bundle_state.state.get(&addr) {
1367            let slot_u256 = U256::from_be_bytes(slot.0);
1368            if let Some(storage_slot) = account.storage.get(&slot_u256) {
1369                return Some(storage_slot.present_value);
1370            }
1371        }
1372        state_provider.storage(addr, slot).ok().flatten()
1373    };
1374
1375    derive_arb_header_info(&read_slot)
1376}
1377
1378/// Augment the bundle with direct cache modifications not captured by EVM transitions.
1379fn augment_bundle_from_cache(
1380    bundle: &mut BundleState,
1381    cache: &revm_database::CacheState,
1382    state_provider: &dyn StateProvider,
1383) {
1384    use revm_database::states::plain_account::StorageSlot;
1385
1386    for (addr, cache_acct) in &cache.accounts {
1387        let current_info = cache_acct.account.as_ref().map(|a| a.info.clone());
1388        let current_storage = cache_acct
1389            .account
1390            .as_ref()
1391            .map(|a| &a.storage)
1392            .cloned()
1393            .unwrap_or_default();
1394
1395        if let Some(bundle_acct) = bundle.state.get_mut(addr) {
1396            // Update existing bundle entry from cache.
1397            bundle_acct.info = current_info;
1398
1399            for (key, value) in &current_storage {
1400                if let Some(slot) = bundle_acct.storage.get_mut(key) {
1401                    slot.present_value = *value;
1402                } else {
1403                    // Slot written via direct cache modification.
1404                    let original_value = state_provider
1405                        .storage(*addr, B256::from(*key))
1406                        .ok()
1407                        .flatten()
1408                        .unwrap_or(U256::ZERO);
1409                    if *value != original_value {
1410                        bundle_acct.storage.insert(
1411                            *key,
1412                            StorageSlot {
1413                                previous_or_original_value: original_value,
1414                                present_value: *value,
1415                            },
1416                        );
1417                    }
1418                }
1419            }
1420        } else {
1421            // Account not in bundle — check if modified from original.
1422            let original = state_provider.basic_account(addr).ok().flatten();
1423
1424            let info_changed = match (&original, &current_info) {
1425                (None, None) => false,
1426                (Some(_), None) | (None, Some(_)) => true,
1427                (Some(orig), Some(curr)) => {
1428                    orig.balance != curr.balance
1429                        || orig.nonce != curr.nonce
1430                        || orig
1431                            .bytecode_hash
1432                            .unwrap_or(alloy_primitives::KECCAK256_EMPTY)
1433                            != curr.code_hash
1434                }
1435            };
1436
1437            let storage_changes: alloy_primitives::map::HashMap<U256, StorageSlot> =
1438                current_storage
1439                    .iter()
1440                    .filter_map(|(key, value)| {
1441                        let original_value = state_provider
1442                            .storage(*addr, B256::from(*key))
1443                            .ok()
1444                            .flatten()
1445                            .unwrap_or(U256::ZERO);
1446                        if original_value != *value {
1447                            Some((
1448                                *key,
1449                                StorageSlot {
1450                                    previous_or_original_value: original_value,
1451                                    present_value: *value,
1452                                },
1453                            ))
1454                        } else {
1455                            None
1456                        }
1457                    })
1458                    .collect();
1459
1460            if info_changed || !storage_changes.is_empty() {
1461                let original_info = original.as_ref().map(|a| revm::state::AccountInfo {
1462                    balance: a.balance,
1463                    nonce: a.nonce,
1464                    code_hash: a.bytecode_hash.unwrap_or(alloy_primitives::KECCAK256_EMPTY),
1465                    code: None,
1466                    account_id: None,
1467                });
1468
1469                let status = if original.is_some() {
1470                    revm_database::AccountStatus::Changed
1471                } else {
1472                    revm_database::AccountStatus::InMemoryChange
1473                };
1474
1475                bundle.state.insert(
1476                    *addr,
1477                    revm_database::BundleAccount {
1478                        info: current_info,
1479                        original_info,
1480                        storage: storage_changes,
1481                        status,
1482                    },
1483                );
1484            }
1485        }
1486    }
1487}