1use std::sync::{
7 atomic::{AtomicBool, AtomicU64, Ordering},
8 Arc,
9};
10
11use alloy_consensus::{
12 proofs,
13 transaction::{SignerRecoverable, TxHashRef},
14 Block, BlockBody, BlockHeader, Header, TxReceipt, EMPTY_OMMER_ROOT_HASH,
15};
16use alloy_eips::eip2718::Decodable2718;
17use alloy_evm::{
18 block::{BlockExecutor, BlockExecutorFactory},
19 EvmFactory,
20};
21use alloy_primitives::{Address, Bytes, B256, B64, U256};
22use alloy_rpc_types_eth::BlockNumberOrTag;
23use parking_lot::Mutex;
24use reth_chain_state::{CanonicalInMemoryState, ExecutedBlock, NewCanonicalChain};
25use reth_chainspec::ChainSpec;
26use reth_evm::ConfigureEvm;
27use reth_primitives_traits::{logs_bloom, NodePrimitives, SealedHeader};
28use reth_provider::{BlockNumReader, BlockReaderIdExt, HeaderProvider, StateProviderFactory};
29use reth_revm::database::StateProviderDatabase;
30use reth_storage_api::{StateProvider, StateProviderBox};
31use reth_trie_common::{HashedPostState, TrieInputSorted};
32use revm::database::{BundleState, StateBuilder};
33use revm_database::states::bundle_state::BundleRetention;
34use tracing::{debug, info, warn};
35
36use arb_evm::config::{arbos_version_from_mix_hash, ArbEvmConfig};
37use arb_primitives::{signed_tx::ArbTransactionSigned, tx_types::ArbInternalTx, ArbPrimitives};
38use arb_rpc::block_producer::{
39 BlockProducer, BlockProducerError, BlockProductionInput, ProducedBlock,
40};
41use arbos::{
42 arbos_types::parse_init_message,
43 header::{derive_arb_header_info, ArbHeaderInfo},
44 internal_tx,
45 parse_l2::{parse_l2_transactions, parsed_tx_to_signed, ParsedTransaction},
46};
47
48use crate::genesis;
49
50pub trait InMemoryStateAccess {
56 type Primitives: NodePrimitives;
57 fn canonical_in_memory_state(&self) -> CanonicalInMemoryState<Self::Primitives>;
58}
59
60impl<N> InMemoryStateAccess for reth_provider::providers::BlockchainProvider<N>
62where
63 N: reth_provider::providers::ProviderNodeTypes,
64{
65 type Primitives = N::Primitives;
66 fn canonical_in_memory_state(&self) -> CanonicalInMemoryState<Self::Primitives> {
67 self.canonical_in_memory_state()
68 }
69}
70
71pub const DEFAULT_FLUSH_INTERVAL: u64 = 128;
73
74pub struct ArbBlockProducer<Provider> {
76 provider: Provider,
77 chain_spec: Arc<ChainSpec>,
78 evm_config: ArbEvmConfig,
79 in_memory_state: CanonicalInMemoryState<ArbPrimitives>,
80 head_block_num: AtomicU64,
81 blocks_since_flush: AtomicU64,
82 flush_interval: u64,
83 accumulated_trie_input: Mutex<Arc<TrieInputSorted>>,
84 flushing_trie_input: Mutex<Option<Arc<TrieInputSorted>>>,
85 pending_flush: AtomicBool,
86 produce_lock: Mutex<()>,
87 cached_init: Mutex<Option<arbos::arbos_types::ParsedInitMessage>>,
88 finality: Mutex<FinalityMarkers>,
90 validated_watcher: Mutex<Option<Arc<parking_lot::RwLock<alloy_primitives::B256>>>>,
94 cached_overlay: Mutex<Option<CachedOverlay>>,
98 cached_prestate: Mutex<Option<CachedPrestate>>,
99}
100
101#[derive(Debug, Default, Clone)]
102struct FinalityMarkers {
103 safe: Option<alloy_primitives::B256>,
104 finalized: Option<alloy_primitives::B256>,
105 validated: Option<alloy_primitives::B256>,
106}
107
108struct CachedOverlay {
109 parent_hash: B256,
110 overlay: Arc<crate::coalesced_state::CoalescedOverlay>,
111}
112
113struct CachedPrestate {
114 parent_hash: B256,
115 contracts: Arc<alloy_primitives::map::HashMap<B256, revm::bytecode::Bytecode>>,
116}
117
118impl<Provider> ArbBlockProducer<Provider>
119where
120 Provider: BlockNumReader,
121{
122 pub fn new(
123 provider: Provider,
124 chain_spec: Arc<ChainSpec>,
125 evm_config: ArbEvmConfig,
126 in_memory_state: CanonicalInMemoryState<ArbPrimitives>,
127 flush_interval: u64,
128 ) -> Self {
129 let head = provider.last_block_number().unwrap_or(0);
130 Self {
131 provider,
132 chain_spec,
133 evm_config,
134 in_memory_state,
135 head_block_num: AtomicU64::new(head),
136 blocks_since_flush: AtomicU64::new(0),
137 flush_interval,
138 accumulated_trie_input: Mutex::new(Arc::new(TrieInputSorted::default())),
139 flushing_trie_input: Mutex::new(None),
140 pending_flush: AtomicBool::new(false),
141 produce_lock: Mutex::new(()),
142 cached_init: Mutex::new(None),
143 finality: Mutex::new(FinalityMarkers::default()),
144 validated_watcher: Mutex::new(None),
145 cached_overlay: Mutex::new(None),
146 cached_prestate: Mutex::new(None),
147 }
148 }
149
150 fn get_or_build_overlay(
151 &self,
152 parent_hash: B256,
153 head_state: &reth_chain_state::BlockState<ArbPrimitives>,
154 ) -> Arc<crate::coalesced_state::CoalescedOverlay> {
155 let mut cache = self.cached_overlay.lock();
156 if let Some(c) = cache.as_ref() {
157 if c.parent_hash == parent_hash {
158 return c.overlay.clone();
159 }
160 }
161 let overlay = Arc::new(crate::coalesced_state::CoalescedOverlay::from_chain(
162 head_state,
163 ));
164 *cache = Some(CachedOverlay {
165 parent_hash,
166 overlay: overlay.clone(),
167 });
168 overlay
169 }
170
171 fn extend_cached_overlay(&self, new_block_hash: B256, bundle: &BundleState) {
172 let mut cache = self.cached_overlay.lock();
173 let mut overlay = match cache.take() {
174 Some(c) => match Arc::try_unwrap(c.overlay) {
175 Ok(o) => o,
176 Err(arc) => (*arc).clone(),
177 },
178 None => crate::coalesced_state::CoalescedOverlay::default(),
179 };
180 overlay.extend_with_block(bundle);
181 *cache = Some(CachedOverlay {
182 parent_hash: new_block_hash,
183 overlay: Arc::new(overlay),
184 });
185 }
186
187 fn invalidate_cached_overlay(&self) {
188 *self.cached_overlay.lock() = None;
189 }
190
191 fn get_or_build_prestate(
192 &self,
193 parent_hash: B256,
194 head_state: Option<&reth_chain_state::BlockState<ArbPrimitives>>,
195 ) -> Arc<alloy_primitives::map::HashMap<B256, revm::bytecode::Bytecode>> {
196 let mut cache = self.cached_prestate.lock();
197 if let Some(c) = cache.as_ref() {
198 if c.parent_hash == parent_hash {
199 return c.contracts.clone();
200 }
201 }
202 let mut contracts: alloy_primitives::map::HashMap<B256, revm::bytecode::Bytecode> =
203 Default::default();
204 if let Some(head_state) = head_state {
205 for block_state in head_state.chain() {
206 let exec_output = &block_state.block().execution_output;
207 for (hash, code) in &exec_output.state.contracts {
208 contracts.entry(*hash).or_insert_with(|| code.clone());
209 }
210 }
211 }
212 let arc = Arc::new(contracts);
213 *cache = Some(CachedPrestate {
214 parent_hash,
215 contracts: arc.clone(),
216 });
217 arc
218 }
219
220 fn extend_cached_prestate(&self, new_block_hash: B256, bundle: &BundleState) {
221 let mut cache = self.cached_prestate.lock();
222 let mut contracts = match cache.take() {
223 Some(c) => match Arc::try_unwrap(c.contracts) {
224 Ok(map) => map,
225 Err(arc) => (*arc).clone(),
226 },
227 None => Default::default(),
228 };
229 for (hash, code) in &bundle.contracts {
230 contracts.entry(*hash).or_insert_with(|| code.clone());
231 }
232 *cache = Some(CachedPrestate {
233 parent_hash: new_block_hash,
234 contracts: Arc::new(contracts),
235 });
236 }
237
238 fn invalidate_cached_prestate(&self) {
239 *self.cached_prestate.lock() = None;
240 }
241
242 pub fn finality_markers(
244 &self,
245 ) -> (
246 Option<alloy_primitives::B256>,
247 Option<alloy_primitives::B256>,
248 Option<alloy_primitives::B256>,
249 ) {
250 let f = self.finality.lock();
251 (f.safe, f.finalized, f.validated)
252 }
253}
254
255impl<Provider> ArbBlockProducer<Provider>
256where
257 Provider: BlockNumReader
258 + BlockReaderIdExt
259 + HeaderProvider<Header = Header>
260 + StateProviderFactory
261 + Send
262 + Sync
263 + 'static,
264{
265 fn head_block_number(&self) -> Result<u64, BlockProducerError> {
267 let head = self.head_block_num.load(Ordering::SeqCst);
268 if head > 0 {
269 Ok(head)
270 } else {
271 self.provider
272 .last_block_number()
273 .map_err(|e| BlockProducerError::StateAccess(e.to_string()))
274 }
275 }
276
277 fn parent_header(&self, head_num: u64) -> Result<SealedHeader<Header>, BlockProducerError> {
279 self.provider
280 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(head_num))
281 .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?
282 .ok_or_else(|| {
283 BlockProducerError::StateAccess(format!("Parent block {head_num} not found"))
284 })
285 }
286
287 fn produce_block_with_execution(
289 &self,
290 input: &BlockProductionInput,
291 parsed_txs: Vec<ParsedTransaction>,
292 ) -> Result<ProducedBlock, BlockProducerError> {
293 if self.pending_flush.load(Ordering::SeqCst) {
295 if let Some(result) = crate::launcher::try_flush_result() {
296 self.in_memory_state
297 .remove_persisted_blocks(result.last_num_hash);
298 *self.flushing_trie_input.lock() = None;
299 self.pending_flush.store(false, Ordering::SeqCst);
300 self.invalidate_cached_overlay();
301 self.invalidate_cached_prestate();
302 info!(
303 target: "block_producer",
304 flushed = result.count,
305 last_block = result.last_num_hash.number,
306 duration_ms = result.duration.as_millis(),
307 "Background flush completed"
308 );
309 }
310 }
311
312 let head_num = self.head_block_number()?;
313 let l2_block_number = head_num + 1;
314 let parent_header = self.parent_header(head_num)?;
315
316 let timestamp = input.l1_timestamp.max(parent_header.timestamp());
317 let time_passed = timestamp.saturating_sub(parent_header.timestamp());
318
319 let parent_mix_hash = parent_header.mix_hash().unwrap_or_default();
320 let parent_arbos_version = arbos_version_from_mix_hash(&parent_mix_hash);
321
322 let l1_block_number = input.l1_block_number;
324 let arbos_version = parent_arbos_version; let send_count = {
328 let mut buf = [0u8; 8];
329 buf.copy_from_slice(&parent_mix_hash.0[0..8]);
330 u64::from_be_bytes(buf)
331 };
332 let provisional_mix_hash = compute_mix_hash(send_count, l1_block_number, arbos_version);
333
334 let raw_state_provider = self
336 .provider
337 .state_by_block_hash(parent_header.hash())
338 .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?;
339
340 let state_provider: StateProviderBox =
341 if let Some(head_state) = self.in_memory_state.state_by_hash(parent_header.hash()) {
342 let overlay = self.get_or_build_overlay(parent_header.hash(), &head_state);
343 if overlay.is_empty() {
344 raw_state_provider
345 } else {
346 crate::coalesced_state::CoalescedStateProvider::new(raw_state_provider, overlay)
347 .boxed()
348 }
349 } else {
350 raw_state_provider
351 };
352
353 let l2_base_fee = {
355 let read_slot = |addr: Address, slot: B256| -> Option<U256> {
356 state_provider.storage(addr, slot).ok().flatten()
357 };
358 arbos::header::read_l2_base_fee(&read_slot).or(parent_header.base_fee_per_gas())
359 };
360
361 let provisional_header = Header {
363 parent_hash: parent_header.hash(),
364 ommers_hash: EMPTY_OMMER_ROOT_HASH,
365 beneficiary: input.sender,
366 state_root: B256::ZERO, transactions_root: B256::ZERO,
368 receipts_root: B256::ZERO,
369 withdrawals_root: None,
370 logs_bloom: Default::default(),
371 timestamp,
372 mix_hash: provisional_mix_hash,
373 nonce: B64::from(input.delayed_messages_read.to_be_bytes()),
374 base_fee_per_gas: l2_base_fee,
375 number: l2_block_number,
376 gas_limit: parent_header.gas_limit(),
377 difficulty: U256::from(1),
378 gas_used: 0,
379 extra_data: Default::default(),
380 parent_beacon_block_root: None,
381 blob_gas_used: None,
382 excess_blob_gas: None,
383 requests_hash: None,
384 };
385
386 let evm_env = self
387 .evm_config
388 .evm_env(&provisional_header)
389 .map_err(|_| BlockProducerError::Execution("evm_env construction failed".into()))?;
390
391 let prestate = {
398 let head_state_opt = self.in_memory_state.state_by_hash(parent_header.hash());
399 let contracts =
400 self.get_or_build_prestate(parent_header.hash(), head_state_opt.as_deref());
401 BundleState {
402 contracts: (*contracts).clone(),
403 ..Default::default()
404 }
405 };
406
407 let mut db = StateBuilder::new()
408 .with_database(StateProviderDatabase::new(state_provider.as_ref()))
409 .with_bundle_prestate(prestate)
410 .with_bundle_update()
411 .without_state_clear()
412 .build();
413
414 let chain_id = self.chain_spec.chain().id();
415
416 if let Some(init_msg) = self.cached_init.lock().take() {
423 if !genesis::is_arbos_initialized(&mut db) {
424 let initial_version = std::env::var("ARB_INITIAL_ARBOS_VERSION")
425 .ok()
426 .and_then(|v| v.parse::<u64>().ok())
427 .unwrap_or(genesis::INITIAL_ARBOS_VERSION);
428 info!(
429 target: "block_producer",
430 initial_version,
431 "Applying cached ArbOS Init during block {} execution",
432 l2_block_number
433 );
434 genesis::initialize_arbos_state(
435 &mut db,
436 &init_msg,
437 chain_id,
438 initial_version,
439 genesis::DEFAULT_CHAIN_OWNER,
440 genesis::ArbOSInit::default(),
441 )
442 .map_err(BlockProducerError::Execution)?;
443 } else {
444 use arbos::{arbos_state::ArbosState, burn::SystemBurner};
445 info!(
446 target: "block_producer",
447 initial_l1_base_fee = %init_msg.initial_l1_base_fee,
448 "ArbOS already initialized; overriding L1 price_per_unit from Init message"
449 );
450 let state_ptr = &mut db as *mut _;
451 if let Ok(mut arb_state) =
452 ArbosState::open(state_ptr, SystemBurner::new(None, false))
453 {
454 let _ = arb_state
455 .l1_pricing_state
456 .set_price_per_unit(init_msg.initial_l1_base_fee);
457 if let Ok(target) = std::env::var("ARB_INITIAL_ARBOS_VERSION") {
461 if let Ok(target_version) = target.parse::<u64>() {
462 let current = arb_state.arbos_version();
463 if target_version > current {
464 if let Err(e) =
465 arb_state.upgrade_arbos_version(target_version, true)
466 {
467 info!(target: "block_producer", err = ?e, target_version, "ArbOS upgrade via env var failed");
468 } else {
469 info!(
470 target: "block_producer",
471 from = current,
472 to = target_version,
473 "ArbOS upgraded via ARB_INITIAL_ARBOS_VERSION"
474 );
475 }
476 }
477 }
478 }
479 }
480 }
481 }
482
483 let parent_extra = parent_header.extra_data().to_vec();
484 let mut exec_extra = parent_extra.clone();
485 exec_extra.resize(32, 0);
486 exec_extra.extend_from_slice(&input.delayed_messages_read.to_be_bytes());
487
488 let exec_ctx = alloy_evm::eth::EthBlockExecutionCtx {
489 tx_count_hint: Some(parsed_txs.len() + 2), parent_hash: parent_header.hash(),
491 parent_beacon_block_root: None,
492 ommers: &[],
493 withdrawals: None,
494 extra_data: exec_extra.into(),
495 };
496
497 let evm = self
499 .evm_config
500 .block_executor_factory()
501 .evm_factory()
502 .create_evm(&mut db, evm_env.clone());
503 let mut executor = self
504 .evm_config
505 .block_executor_factory()
506 .create_arb_executor(evm, exec_ctx, chain_id);
507 executor.arb_ctx.l2_block_number = l2_block_number;
508 executor.arb_ctx.l1_block_number = l1_block_number;
509
510 {
512 let parent_num = l2_block_number.saturating_sub(1);
513 arb_precompiles::set_l2_block_hash(parent_num, parent_header.hash());
514
515 if arb_precompiles::get_l2_block_hash(parent_num.saturating_sub(1)).is_none()
517 && parent_num > 1
518 {
519 let mut hash = parent_header.parent_hash();
520 for i in 2..=256u64 {
521 let n = l2_block_number.checked_sub(i);
522 if let Some(n) = n {
523 arb_precompiles::set_l2_block_hash(n, hash);
524 match self
525 .provider
526 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(n))
527 {
528 Ok(Some(h)) => hash = h.parent_hash(),
529 _ => break,
530 }
531 }
532 }
533 }
534 }
535
536 executor
538 .apply_pre_execution_changes()
539 .map_err(|e| BlockProducerError::Execution(format!("pre-exec: {e}")))?;
540
541 let mut all_txs: Vec<ArbTransactionSigned> = Vec::new();
542
543 let l1_base_fee = input.l1_base_fee.unwrap_or(U256::ZERO);
545 let start_block_data = internal_tx::encode_start_block(
546 l1_base_fee,
547 l1_block_number,
548 l2_block_number,
549 time_passed,
550 );
551
552 let start_block_tx = create_internal_tx(chain_id, &start_block_data);
553 execute_and_commit_tx(&mut executor, &start_block_tx, "StartBlock")?;
554 all_txs.push(start_block_tx);
555
556 let pre_recovered: Vec<Option<ArbTransactionSigned>> = {
558 use rayon::prelude::*;
559 parsed_txs
560 .par_iter()
561 .map(|parsed| match parsed {
562 ParsedTransaction::InternalStartBlock { .. }
563 | ParsedTransaction::BatchPostingReport { .. } => None,
564 other => {
565 let signed = parsed_tx_to_signed(other, chain_id)?;
566 let _ = signed.recover_signer();
567 Some(signed)
568 }
569 })
570 .collect()
571 };
572
573 for (idx, parsed) in parsed_txs.iter().enumerate() {
575 match parsed {
576 ParsedTransaction::InternalStartBlock { .. } => {
577 continue;
579 }
580 ParsedTransaction::BatchPostingReport {
581 batch_timestamp,
582 batch_poster,
583 batch_number,
584 l1_base_fee_estimate,
585 extra_gas,
586 ..
587 } => {
588 let report_data =
591 if parent_arbos_version >= arb_chainspec::arbos_version::ARBOS_VERSION_50 {
592 let (length, non_zeros) = input.batch_data_stats.unwrap_or((0, 0));
594 internal_tx::encode_batch_posting_report_v2(
595 *batch_timestamp,
596 *batch_poster,
597 *batch_number,
598 length,
599 non_zeros,
600 *extra_gas,
601 *l1_base_fee_estimate,
602 )
603 } else {
604 let legacy_gas = input.batch_gas_cost.unwrap_or(0);
606 let batch_data_gas = legacy_gas.saturating_add(*extra_gas);
607 internal_tx::encode_batch_posting_report(
608 *batch_timestamp,
609 *batch_poster,
610 *batch_number,
611 batch_data_gas,
612 *l1_base_fee_estimate,
613 )
614 };
615 let report_tx = create_internal_tx(chain_id, &report_data);
616 execute_and_commit_tx(&mut executor, &report_tx, "BatchPostingReport")?;
617 all_txs.push(report_tx);
618 continue;
619 }
620 _ => {}
621 }
622
623 let signed_tx = match pre_recovered.get(idx).and_then(|s| s.clone()) {
624 Some(tx) => tx,
625 None => {
626 debug!(target: "block_producer", ?parsed, "Skipping unparseable transaction");
627 continue;
628 }
629 };
630
631 let recovered = match signed_tx.clone().try_into_recovered() {
632 Ok(r) => r,
633 Err(e) => {
634 warn!(target: "block_producer", error = %e, "Failed to recover tx sender, skipping");
635 continue;
636 }
637 };
638 let tx_hash = *signed_tx.tx_hash();
639 let (exec_outcome, hostio_records) = arb_rpc::stylus_tracer::with_trace_buffer(|| {
640 executor.execute_transaction_without_commit(recovered)
641 });
642 match exec_outcome {
643 Ok(result) => {
644 match executor.commit_transaction(result) {
645 Ok(_gas_used) => {
646 all_txs.push(signed_tx);
647 if !hostio_records.is_empty() {
648 arb_rpc::stylus_tracer::cache_trace(tx_hash, hostio_records);
649 }
650
651 loop {
656 let scheduled = executor.drain_scheduled_txs();
657 debug!(
658 target: "block_producer",
659 count = scheduled.len(),
660 "Drained scheduled txs"
661 );
662 if scheduled.is_empty() {
663 break;
664 }
665 for encoded in scheduled {
666 let retry_tx: Option<ArbTransactionSigned> =
667 ArbTransactionSigned::decode_2718(&mut &encoded[..]).ok();
668 if let Some(retry_tx) = retry_tx {
669 let retry_signed = retry_tx.clone();
670 let retry_hash = *retry_signed.tx_hash();
671 match retry_tx.try_into_recovered() {
672 Ok(recovered_retry) => {
673 let (retry_outcome, retry_records) =
674 arb_rpc::stylus_tracer::with_trace_buffer(
675 || {
676 executor
677 .execute_transaction_without_commit(
678 recovered_retry,
679 )
680 },
681 );
682 match retry_outcome {
683 Ok(retry_result) => {
684 match executor
685 .commit_transaction(retry_result)
686 {
687 Ok(_) => {
688 all_txs.push(retry_signed);
689 if !retry_records.is_empty() {
690 arb_rpc::stylus_tracer::cache_trace(
691 retry_hash,
692 retry_records,
693 );
694 }
695 }
696 Err(e) => {
697 warn!(
698 target: "block_producer",
699 error = %e,
700 "Failed to commit auto-redeem tx"
701 );
702 }
703 }
704 }
705 Err(e) => {
706 warn!(
707 target: "block_producer",
708 error = %e,
709 "Auto-redeem tx execution failed"
710 );
711 }
712 }
713 }
714 Err(e) => {
715 warn!(
716 target: "block_producer",
717 error = %e,
718 "Failed to recover auto-redeem tx sender"
719 );
720 }
721 }
722 }
723 }
724 }
725 }
726 Err(e) => {
727 warn!(target: "block_producer", error = %e, "Failed to commit transaction");
728 }
729 }
730 }
731 Err(ref e) if e.to_string().contains("block gas limit reached") => {
732 break;
733 }
734 Err(e) => {
735 warn!(target: "block_producer", error = %e, "Transaction execution failed, skipping");
736 }
737 }
738 }
739
740 let zombie_accounts = executor.zombie_accounts().clone();
741 let finalise_deleted = executor.finalise_deleted().clone();
742
743 let (_, exec_result) = executor
744 .finish()
745 .map_err(|e| BlockProducerError::Execution(format!("finish: {e}")))?;
746
747 let receipts: Vec<arb_primitives::ArbReceipt> = exec_result.receipts;
748
749 db.merge_transitions(BundleRetention::Reverts);
750 let mut bundle = db.take_bundle();
751
752 augment_bundle_from_cache(&mut bundle, &db.cache, &*state_provider);
753
754 let keccak_empty_hash = alloy_primitives::B256::from(alloy_primitives::keccak256([]));
756 for addr in &finalise_deleted {
757 if zombie_accounts.contains(addr) {
758 continue;
759 }
760 if bundle.state.contains_key(addr) {
761 let existed_before = state_provider.basic_account(addr).ok().flatten().is_some();
762 if existed_before {
763 let still_empty = bundle
767 .state
768 .get(addr)
769 .and_then(|a| a.info.as_ref())
770 .is_none_or(|info| {
771 info.nonce == 0
772 && info.balance.is_zero()
773 && info.code_hash == keccak_empty_hash
774 });
775 if still_empty {
776 if let Some(bundle_acct) = bundle.state.get_mut(addr) {
777 bundle_acct.info = None;
778 }
779 }
780 } else {
781 let still_empty = bundle
782 .state
783 .get(addr)
784 .and_then(|a| a.info.as_ref())
785 .is_none_or(|info| {
786 info.nonce == 0
787 && info.balance.is_zero()
788 && info.code_hash == keccak_empty_hash
789 });
790 if still_empty {
791 bundle.state.remove(addr);
792 }
793 }
794 continue;
795 }
796 if let Ok(Some(acct)) = state_provider.basic_account(addr) {
797 let was_originally_empty = acct.balance.is_zero()
798 && acct.nonce == 0
799 && acct.bytecode_hash.is_none_or(|h| h == keccak_empty_hash);
800 if was_originally_empty {
801 continue;
802 }
803 bundle.state.insert(
804 *addr,
805 revm_database::BundleAccount {
806 info: None, original_info: None,
808 storage: Default::default(),
809 status: revm_database::AccountStatus::Changed,
810 },
811 );
812 }
813 }
814
815 filter_unchanged_storage(&mut bundle);
816 delete_empty_accounts(&mut bundle, &zombie_accounts, &*state_provider);
817
818 let hashed_state =
819 HashedPostState::from_bundle_state::<reth_trie_common::KeccakKeyHasher>(bundle.state());
820
821 let (state_root, trie_updates) = {
822 let acc_arc = self.accumulated_trie_input.lock().clone();
823 let flushing_arc = self.flushing_trie_input.lock().clone();
824
825 let block_state_sorted = hashed_state.clone().into_sorted();
826 let prefix_sets = block_state_sorted.construct_prefix_sets().freeze();
827
828 let mut new_acc_state = (*acc_arc.state).clone();
829 new_acc_state.extend_ref_and_sort(&block_state_sorted);
830 let new_acc_state_arc = Arc::new(new_acc_state);
831
832 let (overlay_state_arc, overlay_nodes_arc) = if let Some(f) = &flushing_arc {
833 let mut s = (*f.state).clone();
834 s.extend_ref_and_sort(&new_acc_state_arc);
835 let mut n = (*f.nodes).clone();
836 n.extend_ref_and_sort(&acc_arc.nodes);
837 (Arc::new(s), Arc::new(n))
838 } else {
839 (Arc::clone(&new_acc_state_arc), Arc::clone(&acc_arc.nodes))
840 };
841
842 let overlay = Arc::new(TrieInputSorted::new(
843 overlay_nodes_arc,
844 overlay_state_arc,
845 Default::default(),
846 ));
847
848 let (root, updates) =
849 crate::launcher::compute_parallel_state_root(overlay, prefix_sets)
850 .map_err(|e| BlockProducerError::Execution(format!("state root: {e}")))?;
851
852 let mut new_acc_nodes = (*acc_arc.nodes).clone();
853 new_acc_nodes.extend_ref_and_sort(&updates.clone_into_sorted());
854 *self.accumulated_trie_input.lock() = Arc::new(TrieInputSorted::new(
855 Arc::new(new_acc_nodes),
856 new_acc_state_arc,
857 Default::default(),
858 ));
859
860 (root, updates)
861 };
862
863 let arb_info = derive_header_info_from_state(state_provider.as_ref(), &bundle);
865
866 let final_mix_hash = arb_info
867 .as_ref()
868 .map(|info| info.compute_mix_hash())
869 .unwrap_or(provisional_mix_hash);
870
871 let extra_data: Bytes = arb_info
872 .as_ref()
873 .map(|info| {
874 let mut data = info.send_root.to_vec();
875 data.resize(32, 0);
876 data.into()
877 })
878 .unwrap_or_else(|| {
879 let mut data = parent_extra.clone();
880 data.resize(32, 0);
881 data.into()
882 });
883
884 let send_root = arb_info
885 .as_ref()
886 .map(|info| info.send_root)
887 .unwrap_or_else(|| {
888 if parent_extra.len() >= 32 {
889 B256::from_slice(&parent_extra[..32])
890 } else {
891 B256::ZERO
892 }
893 });
894
895 let gas_used = exec_result.gas_used;
897 let logs_bloom_val = logs_bloom(receipts.iter().flat_map(|r| r.logs()));
898
899 let transactions_root =
900 proofs::calculate_transaction_root::<ArbTransactionSigned>(&all_txs);
901 let receipts_root = proofs::calculate_receipt_root(
902 &receipts
903 .iter()
904 .map(|r| r.with_bloom_ref())
905 .collect::<Vec<_>>(),
906 );
907
908 let header = Header {
909 parent_hash: parent_header.hash(),
910 ommers_hash: EMPTY_OMMER_ROOT_HASH,
911 beneficiary: input.sender,
912 state_root,
913 transactions_root,
914 receipts_root,
915 withdrawals_root: None,
916 logs_bloom: logs_bloom_val,
917 timestamp,
918 mix_hash: final_mix_hash,
919 nonce: B64::from(input.delayed_messages_read.to_be_bytes()),
920 base_fee_per_gas: l2_base_fee,
921 number: l2_block_number,
922 gas_limit: parent_header.gas_limit(),
923 difficulty: U256::from(1),
924 gas_used,
925 extra_data,
926 parent_beacon_block_root: None,
927 blob_gas_used: None,
928 excess_blob_gas: None,
929 requests_hash: None,
930 };
931
932 let block = Block::<ArbTransactionSigned> {
933 header,
934 body: BlockBody {
935 transactions: all_txs,
936 ommers: Default::default(),
937 withdrawals: None,
938 },
939 };
940
941 let sealed = reth_primitives_traits::SealedBlock::seal_slow(block);
942 let block_hash = sealed.hash();
943
944 self.extend_cached_overlay(block_hash, &bundle);
945 self.extend_cached_prestate(block_hash, &bundle);
946
947 {
949 use alloy_evm::block::BlockExecutionResult;
950 use reth_chain_state::ComputedTrieData;
951 use reth_execution_types::BlockExecutionOutput;
952 use reth_primitives_traits::RecoveredBlock;
953
954 let recovered = Arc::new(RecoveredBlock::new_sealed(sealed.clone(), vec![]));
955 let exec_output = Arc::new(BlockExecutionOutput {
956 state: bundle,
957 result: BlockExecutionResult {
958 receipts,
959 requests: Default::default(),
960 gas_used,
961 blob_gas_used: 0,
962 },
963 });
964 let computed = ComputedTrieData {
965 hashed_state: Arc::new(hashed_state.into_sorted()),
966 trie_updates: Arc::new(trie_updates.into_sorted()),
967 anchored_trie_input: None,
968 };
969 let executed = ExecutedBlock::new(recovered, exec_output, computed);
970
971 self.in_memory_state
972 .update_chain(NewCanonicalChain::Commit {
973 new: vec![executed],
974 });
975
976 let sealed_header = SealedHeader::new(sealed.header().clone(), sealed.hash());
977 self.in_memory_state.set_canonical_head(sealed_header);
978 }
979
980 self.head_block_num.store(l2_block_number, Ordering::SeqCst);
981
982 let since_flush = self.blocks_since_flush.fetch_add(1, Ordering::SeqCst) + 1;
984 if since_flush >= self.flush_interval && !self.pending_flush.load(Ordering::SeqCst) {
985 self.start_async_flush();
986 }
987
988 info!(
989 target: "block_producer",
990 block_num = l2_block_number,
991 ?block_hash,
992 ?send_root,
993 ?state_root,
994 num_txs = sealed.body().transactions.len(),
995 gas_used,
996 "Produced block"
997 );
998
999 Ok(ProducedBlock {
1000 block_hash,
1001 send_root,
1002 })
1003 }
1004
1005 fn start_async_flush(&self) {
1007 let mut blocks: Vec<ExecutedBlock<ArbPrimitives>> = Vec::new();
1008 if let Some(head_state) = self.in_memory_state.head_state() {
1009 for block_state in head_state.chain() {
1010 blocks.push(block_state.block().clone());
1011 }
1012 }
1013 blocks.reverse();
1014
1015 if blocks.is_empty() {
1016 return;
1017 }
1018
1019 let last = blocks.last().unwrap();
1020 let last_num_hash = alloy_eips::BlockNumHash::new(
1021 last.recovered_block().number(),
1022 last.recovered_block().hash(),
1023 );
1024
1025 let current = std::mem::take(&mut *self.accumulated_trie_input.lock());
1027 *self.flushing_trie_input.lock() = Some(current);
1028
1029 self.blocks_since_flush.store(0, Ordering::SeqCst);
1030 self.pending_flush.store(true, Ordering::SeqCst);
1031
1032 let count = blocks.len();
1033 crate::launcher::start_flush(crate::launcher::FlushRequest {
1034 blocks,
1035 last_num_hash,
1036 });
1037
1038 debug!(
1039 target: "block_producer",
1040 count,
1041 last_block = last_num_hash.number,
1042 "Started async flush"
1043 );
1044 }
1045
1046 #[allow(dead_code)]
1048 fn produce_empty_block(
1049 &self,
1050 input: &BlockProductionInput,
1051 ) -> Result<ProducedBlock, BlockProducerError> {
1052 self.produce_block_with_execution(input, vec![])
1054 }
1055}
1056
1057#[async_trait::async_trait]
1058impl<Provider> BlockProducer for ArbBlockProducer<Provider>
1059where
1060 Provider: BlockNumReader
1061 + BlockReaderIdExt
1062 + HeaderProvider<Header = Header>
1063 + StateProviderFactory
1064 + Send
1065 + Sync
1066 + 'static,
1067{
1068 fn cache_init_message(&self, l2_msg: &[u8]) -> Result<(), BlockProducerError> {
1069 let init_msg = parse_init_message(l2_msg)
1070 .map_err(|e| BlockProducerError::Parse(format!("init message: {e}")))?;
1071
1072 info!(
1073 target: "block_producer",
1074 chain_id = %init_msg.chain_id,
1075 initial_l1_base_fee = %init_msg.initial_l1_base_fee,
1076 "Cached Init message params"
1077 );
1078
1079 *self.cached_init.lock() = Some(init_msg);
1080 Ok(())
1081 }
1082
1083 async fn produce_block(
1084 &self,
1085 msg_idx: u64,
1086 input: BlockProductionInput,
1087 ) -> Result<ProducedBlock, BlockProducerError> {
1088 let _lock = self.produce_lock.lock();
1089
1090 let head_num = self.head_block_number()?;
1092 let expected_block = head_num + 1;
1093 let actual_block = msg_idx;
1094
1095 if expected_block != actual_block {
1096 return Err(BlockProducerError::Unexpected(format!(
1097 "Expected block {expected_block} but got msg_idx {msg_idx} (block {actual_block})"
1098 )));
1099 }
1100
1101 let chain_id = self.chain_spec.chain().id();
1103
1104 let parsed_txs = parse_l2_transactions(
1105 input.kind,
1106 input.sender,
1107 &input.l2_msg,
1108 input.request_id,
1109 input.l1_base_fee,
1110 chain_id,
1111 )
1112 .unwrap_or_else(|e| {
1113 warn!(target: "block_producer", error=%e, "Error parsing L2 message, treating as empty");
1114 vec![]
1115 });
1116
1117 debug!(
1118 target: "block_producer",
1119 msg_idx,
1120 kind = input.kind,
1121 num_txs = parsed_txs.len(),
1122 "Parsed L1 message"
1123 );
1124
1125 self.produce_block_with_execution(&input, parsed_txs)
1126 }
1127
1128 async fn reset_to_block(&self, target_block_number: u64) -> Result<(), BlockProducerError> {
1129 let _lock = self.produce_lock.lock();
1130 let current = self.head_block_num.load(Ordering::SeqCst);
1131 if target_block_number > current {
1132 return Err(BlockProducerError::Unexpected(format!(
1133 "reset target {target_block_number} > current head {current}"
1134 )));
1135 }
1136 if target_block_number == current {
1137 return Ok(());
1138 }
1139
1140 let header = self
1141 .provider
1142 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(target_block_number))
1143 .map_err(|e| BlockProducerError::StateAccess(e.to_string()))?
1144 .ok_or_else(|| {
1145 BlockProducerError::Unexpected(format!(
1146 "reset target block {target_block_number} not found"
1147 ))
1148 })?;
1149
1150 if self.pending_flush.load(Ordering::SeqCst) {
1152 if let Some(result) = crate::launcher::try_flush_result() {
1153 self.in_memory_state
1154 .remove_persisted_blocks(result.last_num_hash);
1155 *self.flushing_trie_input.lock() = None;
1156 self.pending_flush.store(false, Ordering::SeqCst);
1157 }
1158 }
1159
1160 let mut old_blocks: Vec<reth_chain_state::ExecutedBlock<ArbPrimitives>> = Vec::new();
1165 for bn in (target_block_number + 1)..=current {
1166 if let Some(state) = self.in_memory_state.state_by_number(bn) {
1167 old_blocks.push(state.block());
1168 }
1169 }
1170
1171 if !old_blocks.is_empty() {
1173 self.in_memory_state
1174 .update_chain(reth_chain_state::NewCanonicalChain::Reorg {
1175 new: Vec::new(),
1176 old: old_blocks,
1177 });
1178 }
1179
1180 self.invalidate_cached_overlay();
1181 self.invalidate_cached_prestate();
1182
1183 self.in_memory_state.set_canonical_head(header.clone());
1186
1187 self.head_block_num
1190 .store(target_block_number, Ordering::SeqCst);
1191
1192 if let Some(rx) = crate::launcher::start_unwind(target_block_number) {
1195 match rx.recv() {
1196 Ok(Ok(())) => {}
1197 Ok(Err(e)) => {
1198 return Err(BlockProducerError::Storage(format!(
1199 "unwind above {target_block_number}: {e}"
1200 )));
1201 }
1202 Err(e) => {
1203 return Err(BlockProducerError::Storage(format!(
1204 "unwind channel closed: {e}"
1205 )));
1206 }
1207 }
1208 }
1209
1210 *self.accumulated_trie_input.lock() = Arc::new(TrieInputSorted::default());
1212
1213 info!(
1214 target: "block_producer",
1215 target = target_block_number,
1216 hash = %header.hash(),
1217 old_count = current - target_block_number,
1218 "reset head"
1219 );
1220 Ok(())
1221 }
1222
1223 fn set_finality(
1224 &self,
1225 safe: Option<alloy_primitives::B256>,
1226 finalized: Option<alloy_primitives::B256>,
1227 validated: Option<alloy_primitives::B256>,
1228 ) -> Result<(), BlockProducerError> {
1229 let mut f = self.finality.lock();
1230 if safe.is_some() {
1231 f.safe = safe;
1232 }
1233 if finalized.is_some() {
1234 f.finalized = finalized;
1235 }
1236 if validated.is_some() {
1237 f.validated = validated;
1238 }
1239 drop(f);
1240
1241 if let Some(h) = safe {
1245 if let Ok(Some(sealed)) = self.provider.sealed_header_by_hash(h) {
1246 self.in_memory_state.set_safe(sealed);
1247 }
1248 }
1249 if let Some(h) = finalized {
1250 if let Ok(Some(sealed)) = self.provider.sealed_header_by_hash(h) {
1251 self.in_memory_state.set_finalized(sealed);
1252 }
1253 }
1254 if let Some(h) = validated {
1258 if let Some(w) = self.validated_watcher.lock().as_ref() {
1259 *w.write() = h;
1260 }
1261 }
1262 Ok(())
1263 }
1264
1265 fn attach_validated_watcher(&self, watcher: Arc<parking_lot::RwLock<alloy_primitives::B256>>) {
1266 *self.validated_watcher.lock() = Some(watcher);
1267 }
1268}
1269
1270fn create_internal_tx(chain_id: u64, data: &[u8]) -> ArbTransactionSigned {
1276 use arb_primitives::signed_tx::ArbTypedTransaction;
1277 let tx = ArbTypedTransaction::Internal(ArbInternalTx {
1278 chain_id: U256::from(chain_id),
1279 data: Bytes::copy_from_slice(data),
1280 });
1281 let sig = alloy_primitives::Signature::new(U256::ZERO, U256::ZERO, false);
1282 ArbTransactionSigned::new_unhashed(tx, sig)
1283}
1284
1285fn execute_and_commit_tx<E>(
1287 executor: &mut E,
1288 tx: &ArbTransactionSigned,
1289 label: &str,
1290) -> Result<(), BlockProducerError>
1291where
1292 E: BlockExecutor<Transaction = ArbTransactionSigned>,
1293{
1294 let recovered = tx
1295 .clone()
1296 .try_into_recovered()
1297 .map_err(|e| BlockProducerError::Execution(format!("{label} recovery: {e}")))?;
1298
1299 let result = executor
1300 .execute_transaction_without_commit(recovered)
1301 .map_err(|e| BlockProducerError::Execution(format!("{label} execution: {e}")))?;
1302
1303 executor
1304 .commit_transaction(result)
1305 .map_err(|e| BlockProducerError::Execution(format!("{label} commit: {e}")))?;
1306
1307 Ok(())
1308}
1309
1310fn compute_mix_hash(send_count: u64, l1_block_number: u64, arbos_version: u64) -> B256 {
1312 let mut bytes = [0u8; 32];
1313 bytes[0..8].copy_from_slice(&send_count.to_be_bytes());
1314 bytes[8..16].copy_from_slice(&l1_block_number.to_be_bytes());
1315 bytes[16..24].copy_from_slice(&arbos_version.to_be_bytes());
1316 B256::from(bytes)
1317}
1318
1319fn delete_empty_accounts(
1321 bundle: &mut BundleState,
1322 zombie_accounts: &rustc_hash::FxHashSet<Address>,
1323 state_provider: &dyn StateProvider,
1324) {
1325 let keccak_empty = alloy_primitives::B256::from(alloy_primitives::keccak256([]));
1326 let mut to_remove = Vec::new();
1327 for (addr, account) in bundle.state.iter_mut() {
1328 if let Some(ref info) = account.info {
1329 let is_empty =
1330 info.nonce == 0 && info.balance.is_zero() && info.code_hash == keccak_empty;
1331 if is_empty && !zombie_accounts.contains(addr) {
1332 let existed_before = state_provider.basic_account(addr).ok().flatten().is_some();
1333 if existed_before {
1334 debug!(
1335 target: "block_producer",
1336 addr = ?addr,
1337 "EIP-161: deleting empty account from state"
1338 );
1339 account.info = None;
1340 } else {
1341 to_remove.push(*addr);
1342 }
1343 }
1344 }
1345 }
1346 for addr in to_remove {
1347 bundle.state.remove(&addr);
1348 }
1349}
1350
1351fn filter_unchanged_storage(bundle: &mut BundleState) {
1353 for (_addr, account) in bundle.state.iter_mut() {
1354 account
1355 .storage
1356 .retain(|_key, slot| slot.present_value != slot.previous_or_original_value);
1357 }
1358}
1359
1360fn derive_header_info_from_state(
1362 state_provider: &dyn StateProvider,
1363 bundle_state: &BundleState,
1364) -> Option<ArbHeaderInfo> {
1365 let read_slot = |addr: Address, slot: B256| -> Option<U256> {
1366 if let Some(account) = bundle_state.state.get(&addr) {
1367 let slot_u256 = U256::from_be_bytes(slot.0);
1368 if let Some(storage_slot) = account.storage.get(&slot_u256) {
1369 return Some(storage_slot.present_value);
1370 }
1371 }
1372 state_provider.storage(addr, slot).ok().flatten()
1373 };
1374
1375 derive_arb_header_info(&read_slot)
1376}
1377
1378fn augment_bundle_from_cache(
1380 bundle: &mut BundleState,
1381 cache: &revm_database::CacheState,
1382 state_provider: &dyn StateProvider,
1383) {
1384 use revm_database::states::plain_account::StorageSlot;
1385
1386 for (addr, cache_acct) in &cache.accounts {
1387 let current_info = cache_acct.account.as_ref().map(|a| a.info.clone());
1388 let current_storage = cache_acct
1389 .account
1390 .as_ref()
1391 .map(|a| &a.storage)
1392 .cloned()
1393 .unwrap_or_default();
1394
1395 if let Some(bundle_acct) = bundle.state.get_mut(addr) {
1396 bundle_acct.info = current_info;
1398
1399 for (key, value) in ¤t_storage {
1400 if let Some(slot) = bundle_acct.storage.get_mut(key) {
1401 slot.present_value = *value;
1402 } else {
1403 let original_value = state_provider
1405 .storage(*addr, B256::from(*key))
1406 .ok()
1407 .flatten()
1408 .unwrap_or(U256::ZERO);
1409 if *value != original_value {
1410 bundle_acct.storage.insert(
1411 *key,
1412 StorageSlot {
1413 previous_or_original_value: original_value,
1414 present_value: *value,
1415 },
1416 );
1417 }
1418 }
1419 }
1420 } else {
1421 let original = state_provider.basic_account(addr).ok().flatten();
1423
1424 let info_changed = match (&original, ¤t_info) {
1425 (None, None) => false,
1426 (Some(_), None) | (None, Some(_)) => true,
1427 (Some(orig), Some(curr)) => {
1428 orig.balance != curr.balance
1429 || orig.nonce != curr.nonce
1430 || orig
1431 .bytecode_hash
1432 .unwrap_or(alloy_primitives::KECCAK256_EMPTY)
1433 != curr.code_hash
1434 }
1435 };
1436
1437 let storage_changes: alloy_primitives::map::HashMap<U256, StorageSlot> =
1438 current_storage
1439 .iter()
1440 .filter_map(|(key, value)| {
1441 let original_value = state_provider
1442 .storage(*addr, B256::from(*key))
1443 .ok()
1444 .flatten()
1445 .unwrap_or(U256::ZERO);
1446 if original_value != *value {
1447 Some((
1448 *key,
1449 StorageSlot {
1450 previous_or_original_value: original_value,
1451 present_value: *value,
1452 },
1453 ))
1454 } else {
1455 None
1456 }
1457 })
1458 .collect();
1459
1460 if info_changed || !storage_changes.is_empty() {
1461 let original_info = original.as_ref().map(|a| revm::state::AccountInfo {
1462 balance: a.balance,
1463 nonce: a.nonce,
1464 code_hash: a.bytecode_hash.unwrap_or(alloy_primitives::KECCAK256_EMPTY),
1465 code: None,
1466 account_id: None,
1467 });
1468
1469 let status = if original.is_some() {
1470 revm_database::AccountStatus::Changed
1471 } else {
1472 revm_database::AccountStatus::InMemoryChange
1473 };
1474
1475 bundle.state.insert(
1476 *addr,
1477 revm_database::BundleAccount {
1478 info: current_info,
1479 original_info,
1480 storage: storage_changes,
1481 status,
1482 },
1483 );
1484 }
1485 }
1486 }
1487}