arb_node/
producer.rs

1//! Block producer implementation.
2//!
3//! Produces blocks from L1 incoming messages by parsing transactions,
4//! executing them against the current state, and persisting the results.
5
6use std::sync::Arc;
7
8use alloy_consensus::{
9    proofs, transaction::SignerRecoverable, Block, BlockBody, BlockHeader, Header, TxReceipt,
10    EMPTY_OMMER_ROOT_HASH,
11};
12use alloy_eips::eip2718::Decodable2718;
13use alloy_evm::{
14    block::{BlockExecutor, BlockExecutorFactory},
15    EvmFactory,
16};
17use alloy_primitives::{Address, Bytes, B256, B64, U256};
18use alloy_rpc_types_eth::BlockNumberOrTag;
19use parking_lot::Mutex;
20use reth_chainspec::ChainSpec;
21use reth_evm::ConfigureEvm;
22use reth_primitives_traits::{logs_bloom, SealedHeader};
23use reth_provider::{BlockNumReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory};
24use reth_revm::database::StateProviderDatabase;
25use reth_storage_api::StateProvider;
26use reth_trie_common::{updates::TrieUpdates, HashedPostState};
27use revm::database::{BundleState, StateBuilder};
28use revm_database::states::bundle_state::BundleRetention;
29use tracing::{debug, info, warn};
30
31use arb_evm::config::{arbos_version_from_mix_hash, ArbEvmConfig};
32use arb_primitives::{signed_tx::ArbTransactionSigned, tx_types::ArbInternalTx};
33use arb_rpc::block_producer::{
34    BlockProducer, BlockProducerError, BlockProductionInput, ProducedBlock,
35};
36use arbos::{
37    arbos_types::parse_init_message,
38    header::{derive_arb_header_info, ArbHeaderInfo},
39    internal_tx,
40    parse_l2::{parse_l2_transactions, parsed_tx_to_signed, ParsedTransaction},
41};
42
43use crate::genesis;
44
45/// Type-erased block persister.
46///
47/// Wraps the concrete persistence operations so the block producer
48/// does not need to carry `DatabaseProviderFactory` and `CanonChainTracker`
49/// trait bounds, which cannot be threaded through reth's node builder
50/// without modifying upstream traits.
51pub(crate) struct ErasedPersister {
52    /// Persist a sealed block with execution output to the database.
53    persist_fn: Box<
54        dyn Fn(
55                &reth_primitives_traits::SealedBlock<Block<ArbTransactionSigned>>,
56                Vec<arb_primitives::ArbReceipt>,
57                BundleState,
58                HashedPostState,
59                TrieUpdates,
60            ) -> Result<(), BlockProducerError>
61            + Send
62            + Sync,
63    >,
64}
65
66impl ErasedPersister {
67    fn persist(
68        &self,
69        sealed: &reth_primitives_traits::SealedBlock<Block<ArbTransactionSigned>>,
70        receipts: Vec<arb_primitives::ArbReceipt>,
71        bundle_state: BundleState,
72        hashed_state: HashedPostState,
73        trie_updates: TrieUpdates,
74    ) -> Result<(), BlockProducerError> {
75        (self.persist_fn)(sealed, receipts, bundle_state, hashed_state, trie_updates)
76    }
77}
78
/// Concrete block producer backed by reth's database.
///
/// Generic over the `Provider` used to read the chain head, headers, and
/// state. The trait bounds live on the `impl` blocks rather than on the
/// struct itself.
pub struct ArbBlockProducer<Provider> {
    /// Chain data source: queried for the last block number, sealed headers,
    /// and the state at the parent block hash.
    provider: Provider,
    /// Chain specification; supplies the chain id used during execution.
    chain_spec: Arc<ChainSpec>,
    /// EVM configuration used to build the EVM environment and the block executor.
    evm_config: ArbEvmConfig,
    /// Type-erased callback wrapper that writes a produced block to the database.
    persister: ErasedPersister,
    /// Mutex to serialize block production.
    // NOTE(review): not acquired in the visible part of this file — presumably
    // taken by the `BlockProducer` trait impl; confirm.
    produce_lock: Mutex<()>,
    /// Cached Init message params, applied during the first block's execution.
    /// The value is taken (left as `None`) when a block is produced.
    cached_init: Mutex<Option<arbos::arbos_types::ParsedInitMessage>>,
}
90
91impl<Provider> ArbBlockProducer<Provider> {
92    /// Create a new block producer.
93    ///
94    /// The `persist_fn` closure handles writing blocks and state to the database.
95    /// It captures the concrete provider type so the producer itself
96    /// does not need `DatabaseProviderFactory` bounds.
97    pub fn new(
98        provider: Provider,
99        chain_spec: Arc<ChainSpec>,
100        evm_config: ArbEvmConfig,
101        persist_fn: impl Fn(
102                &reth_primitives_traits::SealedBlock<Block<ArbTransactionSigned>>,
103                Vec<arb_primitives::ArbReceipt>,
104                BundleState,
105                HashedPostState,
106                TrieUpdates,
107            ) -> Result<(), BlockProducerError>
108            + Send
109            + Sync
110            + 'static,
111    ) -> Self {
112        Self {
113            provider,
114            chain_spec,
115            evm_config,
116            persister: ErasedPersister {
117                persist_fn: Box::new(persist_fn),
118            },
119            produce_lock: Mutex::new(()),
120            cached_init: Mutex::new(None),
121        }
122    }
123}
124
125impl<Provider> ArbBlockProducer<Provider>
126where
127    Provider: BlockNumReader
128        + BlockReaderIdExt
129        + HeaderProvider<Header = Header>
130        + StateProviderFactory
131        + Send
132        + Sync
133        + 'static,
134{
135    /// Get the current head block number.
136    fn head_block_number(&self) -> Result<u64, BlockProducerError> {
137        self.provider
138            .last_block_number()
139            .map_err(|e| BlockProducerError::StateAccess(e.to_string()))
140    }
141
142    /// Get the parent sealed header for block production.
143    fn parent_header(&self, head_num: u64) -> Result<SealedHeader<Header>, BlockProducerError> {
144        self.provider
145            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(head_num))
146            .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?
147            .ok_or_else(|| {
148                BlockProducerError::StateAccess(format!("Parent block {head_num} not found"))
149            })
150    }
151
    /// Produce a block with full transaction execution.
    ///
    /// Builds the child of the current head block:
    ///
    /// 1. Reads the head number and parent header, and opens state at the
    ///    parent block hash.
    /// 2. Builds a provisional header / EVM environment and a block executor.
    /// 3. Executes a `StartBlock` internal transaction, then every entry of
    ///    `parsed_txs` (batch posting reports become internal transactions;
    ///    other entries are converted to signed transactions), plus any
    ///    scheduled transactions the executor queues after a committed tx.
    /// 4. Collects the resulting bundle state, post-processes it (cache
    ///    augmentation, deletion marking, unchanged-slot filtering, empty
    ///    account removal) and computes the state root.
    /// 5. Assembles and seals the final header/block and hands it to the
    ///    persister.
    ///
    /// # Errors
    /// Returns `BlockProducerError::StateAccess` when the head number, parent
    /// header, or parent state cannot be read, and
    /// `BlockProducerError::Execution` when EVM environment construction,
    /// ArbOS initialization, pre-execution changes, executor `finish`, or the
    /// state root computation fails. Errors from executing the `StartBlock` /
    /// batch posting report internal transactions and from the persister are
    /// propagated. Failures of individual user or scheduled transactions are
    /// logged and skipped rather than returned.
    fn produce_block_with_execution(
        &self,
        input: &BlockProductionInput,
        parsed_txs: Vec<ParsedTransaction>,
    ) -> Result<ProducedBlock, BlockProducerError> {
        let head_num = self.head_block_number()?;
        let parent_header = self.parent_header(head_num)?;
        let l2_block_number = head_num + 1;

        // Clamp to the parent's timestamp so the new block never goes back in time.
        let timestamp = input.l1_timestamp.max(parent_header.timestamp());
        let time_passed = timestamp.saturating_sub(parent_header.timestamp());

        let parent_mix_hash = parent_header.mix_hash().unwrap_or_default();
        let parent_arbos_version = arbos_version_from_mix_hash(&parent_mix_hash);

        // Build the EVM environment for this block.
        let l1_block_number = input.l1_block_number;
        let arbos_version = parent_arbos_version; // May upgrade during StartBlock

        // Construct a provisional mix_hash for the EVM environment.
        // The send count is carried over from the parent: it is read as a
        // big-endian u64 from the first 8 bytes of the parent's mix hash.
        let send_count = {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(&parent_mix_hash.0[0..8]);
            u64::from_be_bytes(buf)
        };
        let provisional_mix_hash = compute_mix_hash(send_count, l1_block_number, arbos_version);

        // Open state at parent block via block hash (matches reth fork pattern).
        let state_provider = self
            .provider
            .state_by_block_hash(parent_header.hash())
            .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?;

        // Read the L2 baseFee from the parent's committed state.
        // This is the value written by the parent block's StartBlock — the correct
        // baseFee for this block's header and EVM execution.
        // Falls back to the parent header's base fee when the state read yields
        // nothing; storage read errors are treated the same as a missing slot.
        let l2_base_fee = {
            let read_slot = |addr: Address, slot: B256| -> Option<U256> {
                state_provider.storage(addr, slot).ok().flatten()
            };
            arbos::header::read_l2_base_fee(&read_slot).or(parent_header.base_fee_per_gas())
        };

        // Build a provisional header for the EVM config.
        // Roots, bloom and gas_used are placeholders here; the final header is
        // assembled after execution further below.
        let provisional_header = Header {
            parent_hash: parent_header.hash(),
            ommers_hash: EMPTY_OMMER_ROOT_HASH,
            beneficiary: input.sender,
            state_root: B256::ZERO, // placeholder
            transactions_root: B256::ZERO,
            receipts_root: B256::ZERO,
            withdrawals_root: None,
            logs_bloom: Default::default(),
            timestamp,
            mix_hash: provisional_mix_hash,
            nonce: B64::from(input.delayed_messages_read.to_be_bytes()),
            base_fee_per_gas: l2_base_fee,
            number: l2_block_number,
            gas_limit: parent_header.gas_limit(),
            difficulty: U256::from(1),
            gas_used: 0,
            extra_data: Default::default(),
            parent_beacon_block_root: None,
            blob_gas_used: None,
            excess_blob_gas: None,
            requests_hash: None,
        };

        let evm_env = self
            .evm_config
            .evm_env(&provisional_header)
            .map_err(|_| BlockProducerError::Execution("evm_env construction failed".into()))?;

        // without_state_clear() disables EIP-161 empty account pruning.
        // Arbitrum needs this for zombie accounts (e.g. retryable escrow
        // accounts that are created and destroyed within a single block).
        let mut db = StateBuilder::new()
            .with_database(StateProviderDatabase::new(state_provider.as_ref()))
            .with_bundle_update()
            .without_state_clear()
            .build();

        let chain_id = self.chain_spec.chain().id();

        // If Init params were cached, apply ArbOS initialization now.
        // This makes the Init state changes part of block 1's delta so the
        // state root correctly includes both Init and execution changes.
        // Skip if genesis alloc already includes ArbOS state.
        // NOTE(review): `take()` empties the cache before initialization and
        // block production have succeeded, so an error later in this function
        // leaves no Init message for a retry — confirm this is intended.
        if let Some(init_msg) = self.cached_init.lock().take() {
            if !genesis::is_arbos_initialized(&mut db) {
                info!(
                    target: "block_producer",
                    "Applying cached ArbOS Init during block {} execution",
                    l2_block_number
                );
                genesis::initialize_arbos_state(
                    &mut db,
                    &init_msg,
                    chain_id,
                    genesis::INITIAL_ARBOS_VERSION,
                    genesis::DEFAULT_CHAIN_OWNER,
                )
                .map_err(BlockProducerError::Execution)?;
            } else {
                debug!(
                    target: "block_producer",
                    "ArbOS already initialized in genesis alloc, skipping Init"
                );
            }
        }

        // Build execution context: extra_data carries send_root + delayed_messages_read.
        let parent_extra = parent_header.extra_data().to_vec();
        let mut exec_extra = parent_extra.clone();
        // Normalize to exactly 32 bytes (zero-padded or truncated), then append
        // the 8 big-endian bytes of delayed_messages_read as bytes 32..=39.
        exec_extra.resize(32, 0);
        exec_extra.extend_from_slice(&input.delayed_messages_read.to_be_bytes());

        let exec_ctx = alloy_evm::eth::EthBlockExecutionCtx {
            tx_count_hint: Some(parsed_txs.len() + 2), // +2 for internal txs
            parent_hash: parent_header.hash(),
            parent_beacon_block_root: None,
            ommers: &[],
            withdrawals: None,
            extra_data: exec_extra.into(),
        };

        // Create the block executor via the factory.
        let evm = self
            .evm_config
            .block_executor_factory()
            .evm_factory()
            .create_evm(&mut db, evm_env.clone());
        let mut executor = self
            .evm_config
            .block_executor_factory()
            .create_arb_executor(evm, exec_ctx, chain_id);
        executor.arb_ctx.l2_block_number = l2_block_number;
        executor.arb_ctx.l1_block_number = l1_block_number;

        // Add the parent hash to the L2 block hash cache for arbBlockHash().
        // The cache persists across blocks (static HashMap), so we only need
        // to add one new entry per block. Old entries remain from previous blocks.
        // First block populates the full 256 entries; subsequent blocks add 1.
        {
            let parent_num = l2_block_number.saturating_sub(1);
            arb_precompiles::set_l2_block_hash(parent_num, parent_header.hash());

            // If cache is mostly empty (first block or after restart), do a full populate.
            // The probe checks only the grandparent entry; a miss triggers the backfill.
            if arb_precompiles::get_l2_block_hash(parent_num.saturating_sub(1)).is_none()
                && parent_num > 1
            {
                // Walk backwards through the chain: `hash` always holds the hash
                // of block `n`, obtained from the parent_hash of block `n + 1`.
                let mut hash = parent_header.parent_hash();
                for i in 2..=256u64 {
                    let n = l2_block_number.checked_sub(i);
                    if let Some(n) = n {
                        arb_precompiles::set_l2_block_hash(n, hash);
                        match self
                            .provider
                            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(n))
                        {
                            Ok(Some(h)) => hash = h.parent_hash(),
                            // Missing header or provider error: stop the backfill early.
                            _ => break,
                        }
                    }
                }
            }
        }

        // Apply pre-execution changes (loads ArbOS state, fee accounts, block hashes).
        executor
            .apply_pre_execution_changes()
            .map_err(|e| BlockProducerError::Execution(format!("pre-exec: {e}")))?;

        // Transactions that end up in the block body, in execution order.
        let mut all_txs: Vec<ArbTransactionSigned> = Vec::new();

        // 1. Generate and execute the StartBlock internal tx (always first).
        let l1_base_fee = input.l1_base_fee.unwrap_or(U256::ZERO);
        let start_block_data = internal_tx::encode_start_block(
            l1_base_fee,
            l1_block_number,
            l2_block_number,
            time_passed,
        );

        let start_block_tx = create_internal_tx(chain_id, &start_block_data);
        execute_and_commit_tx(&mut executor, &start_block_tx, "StartBlock")?;
        all_txs.push(start_block_tx);

        // 2. Execute parsed user transactions.
        for parsed in &parsed_txs {
            match parsed {
                ParsedTransaction::InternalStartBlock { .. } => {
                    // StartBlock is handled above, skip.
                    continue;
                }
                ParsedTransaction::BatchPostingReport {
                    batch_timestamp,
                    batch_poster,
                    batch_number,
                    l1_base_fee_estimate,
                    extra_gas,
                    ..
                } => {
                    // Delayed message kind=13 contains a batch posting report.
                    // Encode as V1 or V2 based on parent ArbOS version.
                    let report_data =
                        if parent_arbos_version >= arb_chainspec::arbos_version::ARBOS_VERSION_50 {
                            // V2: pass raw batch data stats + extra_gas.
                            let (length, non_zeros) = input.batch_data_stats.unwrap_or((0, 0));
                            internal_tx::encode_batch_posting_report_v2(
                                *batch_timestamp,
                                *batch_poster,
                                *batch_number,
                                length,
                                non_zeros,
                                *extra_gas,
                                *l1_base_fee_estimate,
                            )
                        } else {
                            // V1: combine legacy gas cost + extra_gas into single field.
                            let legacy_gas = input.batch_gas_cost.unwrap_or(0);
                            let batch_data_gas = legacy_gas.saturating_add(*extra_gas);
                            internal_tx::encode_batch_posting_report(
                                *batch_timestamp,
                                *batch_poster,
                                *batch_number,
                                batch_data_gas,
                                *l1_base_fee_estimate,
                            )
                        };
                    let report_tx = create_internal_tx(chain_id, &report_data);
                    // Unlike user txs below, a failure here aborts block production.
                    execute_and_commit_tx(&mut executor, &report_tx, "BatchPostingReport")?;
                    all_txs.push(report_tx);
                    continue;
                }
                _ => {}
            }

            let signed_tx = match parsed_tx_to_signed(parsed, chain_id) {
                Some(tx) => tx,
                None => {
                    debug!(
                        target: "block_producer",
                        ?parsed,
                        "Skipping unparseable transaction"
                    );
                    continue;
                }
            };

            // Recover the signer to get a Recovered<ArbTransactionSigned>.
            // The clone keeps `signed_tx` available for the block body.
            let recovered = match signed_tx.clone().try_into_recovered() {
                Ok(r) => r,
                Err(e) => {
                    warn!(
                        target: "block_producer",
                        error = %e,
                        "Failed to recover tx sender, skipping"
                    );
                    continue;
                }
            };

            match executor.execute_transaction_without_commit(recovered) {
                Ok(result) => {
                    match executor.commit_transaction(result) {
                        Ok(_gas_used) => {
                            all_txs.push(signed_tx);

                            // Drain and execute any scheduled txs (auto-redeems).
                            // After a SubmitRetryable or manual Redeem precompile call,
                            // the executor queues retry txs that must execute in the
                            // same block, immediately after the triggering tx.
                            // The outer loop repeats until a drain comes back empty,
                            // so txs scheduled by a scheduled tx are picked up too.
                            loop {
                                let scheduled = executor.drain_scheduled_txs();
                                debug!(
                                    target: "block_producer",
                                    count = scheduled.len(),
                                    "Drained scheduled txs"
                                );
                                if scheduled.is_empty() {
                                    break;
                                }
                                for encoded in scheduled {
                                    // Entries that fail EIP-2718 decoding are dropped
                                    // silently (no log is emitted for them).
                                    let retry_tx: Option<ArbTransactionSigned> =
                                        ArbTransactionSigned::decode_2718(&mut &encoded[..]).ok();
                                    if let Some(retry_tx) = retry_tx {
                                        let retry_signed = retry_tx.clone();
                                        match retry_tx.try_into_recovered() {
                                            Ok(recovered_retry) => {
                                                match executor.execute_transaction_without_commit(
                                                    recovered_retry,
                                                ) {
                                                    Ok(retry_result) => {
                                                        match executor
                                                            .commit_transaction(retry_result)
                                                        {
                                                            Ok(_) => {
                                                                all_txs.push(retry_signed);
                                                            }
                                                            Err(e) => {
                                                                warn!(
                                                                    target: "block_producer",
                                                                    error = %e,
                                                                    "Failed to commit auto-redeem tx"
                                                                );
                                                            }
                                                        }
                                                    }
                                                    Err(e) => {
                                                        warn!(
                                                            target: "block_producer",
                                                            error = %e,
                                                            "Auto-redeem tx execution failed"
                                                        );
                                                    }
                                                }
                                            }
                                            Err(e) => {
                                                warn!(
                                                    target: "block_producer",
                                                    error = %e,
                                                    "Failed to recover auto-redeem tx sender"
                                                );
                                            }
                                        }
                                    }
                                }
                            }
                        }
                        Err(e) => {
                            // The tx is not added to the block body and execution
                            // continues with the next parsed tx.
                            warn!(
                                target: "block_producer",
                                error = %e,
                                "Failed to commit transaction"
                            );
                        }
                    }
                }
                // NOTE(review): this branch is selected by matching the error's
                // display text; the `break` drops all remaining parsed txs.
                Err(ref e) if e.to_string().contains("block gas limit reached") => {
                    debug!(
                        target: "block_producer",
                        "Block gas limit reached, stopping execution"
                    );
                    break;
                }
                Err(e) => {
                    warn!(
                        target: "block_producer",
                        error = %e,
                        "Transaction execution failed, skipping"
                    );
                }
            }
        }

        // Extract zombie accounts before finish() consumes the executor.
        // Zombie accounts are empty accounts preserved by pre-Stylus ArbOS
        // (CreateZombieIfDeleted) and must NOT be deleted during EIP-161 cleanup.
        let zombie_accounts = executor.zombie_accounts().clone();
        let finalise_deleted = executor.finalise_deleted().clone();

        // Finalize execution: finish() consumes the executor and returns
        // the EVM and BlockExecutionResult containing receipts.
        let (_, exec_result) = executor
            .finish()
            .map_err(|e| BlockProducerError::Execution(format!("finish: {e}")))?;

        let receipts: Vec<arb_primitives::ArbReceipt> = exec_result.receipts;

        // After executor is dropped, we can access the db again.
        db.merge_transitions(BundleRetention::Reverts);
        let mut bundle = db.take_bundle();

        // Augment bundle with direct cache modifications (bypass txs,
        // post-commit hooks) that didn't go through revm's commit.
        augment_bundle_from_cache(&mut bundle, &db.cache, &*state_provider);

        // Mark per-tx finalise deletions in the bundle.
        // Accounts deleted by per-tx EIP-161 cleanup are kept in the cache
        // as Destroyed (account=None) so augment_bundle_from_cache handles
        // them. This loop serves as a safety net and handles zombie checks.
        //
        // Skip accounts that were later re-created as zombies — those are
        // valid empty accounts that must persist in the trie.
        let keccak_empty_hash = alloy_primitives::B256::from(alloy_primitives::keccak256([]));
        for addr in &finalise_deleted {
            // Zombie accounts were re-created after Finalise deleted them.
            // They're back in cache and handled by augment_bundle_from_cache.
            if zombie_accounts.contains(addr) {
                continue;
            }
            if bundle.state.contains_key(addr) {
                // Account is in bundle from EVM transitions. Check whether
                // it existed in the trie before this block.
                // A failed lookup is treated the same as "did not exist".
                let existed_before = state_provider.basic_account(addr).ok().flatten().is_some();
                if existed_before {
                    // Account was in the trie. Only mark as deleted if it's
                    // still empty — it may have been re-created with non-zero
                    // state (e.g., nonce=1) by a later tx in this block.
                    // A bundle entry with no info also counts as empty.
                    let still_empty = bundle
                        .state
                        .get(addr)
                        .and_then(|a| a.info.as_ref())
                        .is_none_or(|info| {
                            info.nonce == 0
                                && info.balance.is_zero()
                                && info.code_hash == keccak_empty_hash
                        });
                    if still_empty {
                        if let Some(bundle_acct) = bundle.state.get_mut(addr) {
                            bundle_acct.info = None;
                        }
                    }
                } else {
                    // Account was created within this block. It may have been
                    // emptied and then re-created (e.g., sender emptied after
                    // SubmitRetryable, then nonce incremented during RetryTx).
                    // Only remove if the account is still empty.
                    let still_empty = bundle
                        .state
                        .get(addr)
                        .and_then(|a| a.info.as_ref())
                        .is_none_or(|info| {
                            info.nonce == 0
                                && info.balance.is_zero()
                                && info.code_hash == keccak_empty_hash
                        });
                    if still_empty {
                        bundle.state.remove(addr);
                    }
                }
                continue;
            }
            // Not in the bundle at all: insert an explicit deletion entry, but
            // only for accounts that exist in parent state and were non-empty.
            if let Ok(Some(acct)) = state_provider.basic_account(addr) {
                let was_originally_empty = acct.balance.is_zero()
                    && acct.nonce == 0
                    && acct.bytecode_hash.is_none_or(|h| h == keccak_empty_hash);
                if was_originally_empty {
                    continue;
                }
                bundle.state.insert(
                    *addr,
                    revm_database::BundleAccount {
                        info: None, // signals trie deletion
                        original_info: None,
                        storage: Default::default(),
                        status: revm_database::AccountStatus::Changed,
                    },
                );
            }
        }

        // Filter bundle to only include actually changed storage slots.
        // revm's bundle may include storage slots that were loaded (read) but
        // not modified. Including unchanged slots in the HashedPostState would
        // produce an incorrect state root.
        filter_unchanged_storage(&mut bundle);

        // Delete empty accounts from the bundle (EIP-161).
        // Zombie accounts are preserved.
        delete_empty_accounts(&mut bundle, &zombie_accounts, &*state_provider);

        let hashed_state =
            HashedPostState::from_bundle_state::<reth_trie_common::KeccakKeyHasher>(bundle.state());

        // The clone is needed because `hashed_state` is also logged below and
        // later handed to the persister.
        let (state_root, trie_updates) = state_provider
            .state_root_with_updates(hashed_state.clone())
            .map_err(|e| BlockProducerError::Execution(format!("state root: {e}")))?;

        debug!(
            target: "block_producer",
            changed_accounts = hashed_state.accounts.len(),
            changed_storages = hashed_state.storages.len(),
            total_storage_slots = hashed_state.storages.values().map(|s| s.storage.len()).sum::<usize>(),
            ?state_root,
            "HashedPostState from bundle"
        );

        // Derive header info (send_root, send_count, etc.) from post-execution state.
        let arb_info = derive_header_info_from_state(state_provider.as_ref(), &bundle);

        // When no header info could be derived, fall back to the provisional
        // mix hash and to the parent's extra data / send root.
        let final_mix_hash = arb_info
            .as_ref()
            .map(|info| info.compute_mix_hash())
            .unwrap_or(provisional_mix_hash);

        let extra_data: Bytes = arb_info
            .as_ref()
            .map(|info| {
                let mut data = info.send_root.to_vec();
                data.resize(32, 0);
                data.into()
            })
            .unwrap_or_else(|| {
                let mut data = parent_extra.clone();
                data.resize(32, 0);
                data.into()
            });

        let send_root = arb_info
            .as_ref()
            .map(|info| info.send_root)
            .unwrap_or_else(|| {
                if parent_extra.len() >= 32 {
                    B256::from_slice(&parent_extra[..32])
                } else {
                    B256::ZERO
                }
            });

        // Compute receipt-derived fields.
        let gas_used = exec_result.gas_used;
        let logs_bloom_val = logs_bloom(receipts.iter().flat_map(|r| r.logs()));

        let transactions_root =
            proofs::calculate_transaction_root::<ArbTransactionSigned>(&all_txs);
        let receipts_root = proofs::calculate_receipt_root(
            &receipts
                .iter()
                .map(|r| r.with_bloom_ref())
                .collect::<Vec<_>>(),
        );

        // Final header: same fixed fields as the provisional header, with the
        // post-execution roots, bloom, mix hash, gas used and extra data filled in.
        let header = Header {
            parent_hash: parent_header.hash(),
            ommers_hash: EMPTY_OMMER_ROOT_HASH,
            beneficiary: input.sender,
            state_root,
            transactions_root,
            receipts_root,
            withdrawals_root: None,
            logs_bloom: logs_bloom_val,
            timestamp,
            mix_hash: final_mix_hash,
            nonce: B64::from(input.delayed_messages_read.to_be_bytes()),
            base_fee_per_gas: l2_base_fee,
            number: l2_block_number,
            gas_limit: parent_header.gas_limit(),
            difficulty: U256::from(1),
            gas_used,
            extra_data,
            parent_beacon_block_root: None,
            blob_gas_used: None,
            excess_blob_gas: None,
            requests_hash: None,
        };

        let block = Block::<ArbTransactionSigned> {
            header,
            body: BlockBody {
                transactions: all_txs,
                ommers: Default::default(),
                withdrawals: None,
            },
        };

        let sealed = reth_primitives_traits::SealedBlock::seal_slow(block);
        let block_hash = sealed.hash();

        // Persist block, receipts, state changes, hashed state, and trie updates.
        self.persister
            .persist(&sealed, receipts, bundle, hashed_state, trie_updates)?;

        info!(
            target: "block_producer",
            block_num = l2_block_number,
            ?block_hash,
            ?send_root,
            ?state_root,
            num_txs = sealed.body().transactions.len(),
            gas_used,
            "Produced block"
        );

        Ok(ProducedBlock {
            block_hash,
            send_root,
        })
    }
734
735    /// Produce a minimal block for messages with no transactions.
736    #[allow(dead_code)]
737    fn produce_empty_block(
738        &self,
739        input: &BlockProductionInput,
740    ) -> Result<ProducedBlock, BlockProducerError> {
741        // Even empty blocks need to execute the StartBlock internal tx
742        // so that ArbOS state updates (pricing, retryable reaping) happen.
743        self.produce_block_with_execution(input, vec![])
744    }
745}
746
747#[async_trait::async_trait]
748impl<Provider> BlockProducer for ArbBlockProducer<Provider>
749where
750    Provider: BlockNumReader
751        + BlockReaderIdExt
752        + HeaderProvider<Header = Header>
753        + StateProviderFactory
754        + Send
755        + Sync
756        + 'static,
757{
758    fn cache_init_message(&self, l2_msg: &[u8]) -> Result<(), BlockProducerError> {
759        let init_msg = parse_init_message(l2_msg)
760            .map_err(|e| BlockProducerError::Parse(format!("init message: {e}")))?;
761
762        info!(
763            target: "block_producer",
764            chain_id = %init_msg.chain_id,
765            initial_l1_base_fee = %init_msg.initial_l1_base_fee,
766            "Cached Init message params"
767        );
768
769        *self.cached_init.lock() = Some(init_msg);
770        Ok(())
771    }
772
773    async fn produce_block(
774        &self,
775        msg_idx: u64,
776        input: BlockProductionInput,
777    ) -> Result<ProducedBlock, BlockProducerError> {
778        let _lock = self.produce_lock.lock();
779
780        // Validate that this message is the next expected one.
781        let head_num = self.head_block_number()?;
782        let expected_block = head_num + 1;
783        let actual_block = msg_idx;
784
785        if expected_block != actual_block {
786            return Err(BlockProducerError::Unexpected(format!(
787                "Expected block {expected_block} but got msg_idx {msg_idx} (block {actual_block})"
788            )));
789        }
790
791        // Parse L2 transactions from the message.
792        let chain_id = self.chain_spec.chain().id();
793        let parsed_txs = parse_l2_transactions(
794            input.kind,
795            input.sender,
796            &input.l2_msg,
797            input.request_id,
798            input.l1_base_fee,
799            chain_id,
800        )
801        .unwrap_or_else(|e| {
802            // If ParseL2Transactions returns an error, treat as empty.
803            warn!(target: "block_producer", error=%e, "Error parsing L2 message, treating as empty");
804            vec![]
805        });
806
807        debug!(
808            target: "block_producer",
809            msg_idx,
810            kind = input.kind,
811            num_txs = parsed_txs.len(),
812            "Parsed L1 message"
813        );
814
815        self.produce_block_with_execution(&input, parsed_txs)
816    }
817}
818
819// ---------------------------------------------------------------------------
820// Helper functions
821// ---------------------------------------------------------------------------
822
823/// Create an internal transaction (type 0x6A).
824fn create_internal_tx(chain_id: u64, data: &[u8]) -> ArbTransactionSigned {
825    use arb_primitives::signed_tx::ArbTypedTransaction;
826    let tx = ArbTypedTransaction::Internal(ArbInternalTx {
827        chain_id: U256::from(chain_id),
828        data: Bytes::copy_from_slice(data),
829    });
830    let sig = alloy_primitives::Signature::new(U256::ZERO, U256::ZERO, false);
831    ArbTransactionSigned::new_unhashed(tx, sig)
832}
833
834/// Execute and commit an internal transaction via the block executor.
835fn execute_and_commit_tx<E>(
836    executor: &mut E,
837    tx: &ArbTransactionSigned,
838    label: &str,
839) -> Result<(), BlockProducerError>
840where
841    E: BlockExecutor<Transaction = ArbTransactionSigned>,
842{
843    let recovered = tx
844        .clone()
845        .try_into_recovered()
846        .map_err(|e| BlockProducerError::Execution(format!("{label} recovery: {e}")))?;
847
848    let result = executor
849        .execute_transaction_without_commit(recovered)
850        .map_err(|e| BlockProducerError::Execution(format!("{label} execution: {e}")))?;
851
852    executor
853        .commit_transaction(result)
854        .map_err(|e| BlockProducerError::Execution(format!("{label} commit: {e}")))?;
855
856    Ok(())
857}
858
859/// Construct a mix_hash from send_count, l1_block_number, and arbos_version.
860fn compute_mix_hash(send_count: u64, l1_block_number: u64, arbos_version: u64) -> B256 {
861    let mut bytes = [0u8; 32];
862    bytes[0..8].copy_from_slice(&send_count.to_be_bytes());
863    bytes[8..16].copy_from_slice(&l1_block_number.to_be_bytes());
864    bytes[16..24].copy_from_slice(&arbos_version.to_be_bytes());
865    B256::from(bytes)
866}
867
868/// EIP-161: mark empty non-zombie accounts for trie deletion.
869///
870/// Accounts that existed in the trie before this block are marked as deleted
871/// (info=None). Accounts that were created and emptied within this block are
872/// removed from the bundle entirely (no trie operation needed).
873fn delete_empty_accounts(
874    bundle: &mut BundleState,
875    zombie_accounts: &std::collections::HashSet<Address>,
876    state_provider: &dyn StateProvider,
877) {
878    let keccak_empty = alloy_primitives::B256::from(alloy_primitives::keccak256([]));
879    let mut to_remove = Vec::new();
880    for (addr, account) in bundle.state.iter_mut() {
881        if let Some(ref info) = account.info {
882            let is_empty =
883                info.nonce == 0 && info.balance.is_zero() && info.code_hash == keccak_empty;
884            if is_empty && !zombie_accounts.contains(addr) {
885                let existed_before = state_provider.basic_account(addr).ok().flatten().is_some();
886                if existed_before {
887                    debug!(
888                        target: "block_producer",
889                        addr = ?addr,
890                        "EIP-161: deleting empty account from state"
891                    );
892                    account.info = None;
893                } else {
894                    to_remove.push(*addr);
895                }
896            }
897        }
898    }
899    for addr in to_remove {
900        bundle.state.remove(&addr);
901    }
902}
903
904/// Remove unchanged storage slots from the bundle.
905fn filter_unchanged_storage(bundle: &mut BundleState) {
906    for (_addr, account) in bundle.state.iter_mut() {
907        account
908            .storage
909            .retain(|_key, slot| slot.present_value != slot.previous_or_original_value);
910    }
911}
912
913/// Derive ArbHeaderInfo from post-execution state.
914fn derive_header_info_from_state(
915    state_provider: &dyn StateProvider,
916    bundle_state: &BundleState,
917) -> Option<ArbHeaderInfo> {
918    let read_slot = |addr: Address, slot: B256| -> Option<U256> {
919        if let Some(account) = bundle_state.state.get(&addr) {
920            let slot_u256 = U256::from_be_bytes(slot.0);
921            if let Some(storage_slot) = account.storage.get(&slot_u256) {
922                return Some(storage_slot.present_value);
923            }
924        }
925        state_provider.storage(addr, slot).ok().flatten()
926    };
927
928    derive_arb_header_info(&read_slot)
929}
930
931/// Augment the bundle with cache modifications not captured by transitions.
932/// Diffs cache against state provider and adds missing/changed entries.
933fn augment_bundle_from_cache(
934    bundle: &mut BundleState,
935    cache: &revm_database::CacheState,
936    state_provider: &dyn StateProvider,
937) {
938    use revm_database::states::plain_account::StorageSlot;
939
940    for (addr, cache_acct) in &cache.accounts {
941        let current_info = cache_acct.account.as_ref().map(|a| a.info.clone());
942        let current_storage = cache_acct
943            .account
944            .as_ref()
945            .map(|a| &a.storage)
946            .cloned()
947            .unwrap_or_default();
948
949        if let Some(bundle_acct) = bundle.state.get_mut(addr) {
950            // Account already in bundle — update info and storage from cache
951            // to capture post-commit modifications.
952            bundle_acct.info = current_info;
953
954            for (key, value) in &current_storage {
955                if let Some(slot) = bundle_acct.storage.get_mut(key) {
956                    slot.present_value = *value;
957                } else {
958                    // Storage slot written via direct cache mod, not by EVM.
959                    let original_value = state_provider
960                        .storage(*addr, B256::from(*key))
961                        .ok()
962                        .flatten()
963                        .unwrap_or(U256::ZERO);
964                    if *value != original_value {
965                        bundle_acct.storage.insert(
966                            *key,
967                            StorageSlot {
968                                previous_or_original_value: original_value,
969                                present_value: *value,
970                            },
971                        );
972                    }
973                }
974            }
975        } else {
976            // Account not in bundle — check if it was modified from original.
977            let original = state_provider.basic_account(addr).ok().flatten();
978
979            let info_changed = match (&original, &current_info) {
980                (None, None) => false,
981                (Some(_), None) | (None, Some(_)) => true,
982                (Some(orig), Some(curr)) => {
983                    orig.balance != curr.balance
984                        || orig.nonce != curr.nonce
985                        || orig
986                            .bytecode_hash
987                            .unwrap_or(alloy_primitives::KECCAK256_EMPTY)
988                            != curr.code_hash
989                }
990            };
991
992            let storage_changes: alloy_primitives::map::HashMap<U256, StorageSlot> =
993                current_storage
994                    .iter()
995                    .filter_map(|(key, value)| {
996                        let original_value = state_provider
997                            .storage(*addr, B256::from(*key))
998                            .ok()
999                            .flatten()
1000                            .unwrap_or(U256::ZERO);
1001                        if original_value != *value {
1002                            Some((
1003                                *key,
1004                                StorageSlot {
1005                                    previous_or_original_value: original_value,
1006                                    present_value: *value,
1007                                },
1008                            ))
1009                        } else {
1010                            None
1011                        }
1012                    })
1013                    .collect();
1014
1015            if info_changed || !storage_changes.is_empty() {
1016                // For OriginalValuesKnown::No persistence, original_info
1017                // is not relied upon; set to None for simplicity.
1018                let original_info = None;
1019
1020                let status = if original.is_some() {
1021                    revm_database::AccountStatus::Changed
1022                } else {
1023                    revm_database::AccountStatus::InMemoryChange
1024                };
1025
1026                bundle.state.insert(
1027                    *addr,
1028                    revm_database::BundleAccount {
1029                        info: current_info,
1030                        original_info,
1031                        storage: storage_changes,
1032                        status,
1033                    },
1034                );
1035            }
1036        }
1037    }
1038}