1use crate::engine::{build_arb_engine_orchestrator, TreeSender};
12use alloy_consensus::BlockHeader;
13use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
14use reth_chainspec::{EthChainSpec, EthereumHardforks};
15use reth_engine_tree::{
16 chain::{ChainEvent, FromOrchestrator},
17 engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
18 tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25 BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_builder::{
28 common::{Attached, LaunchContextWith, WithConfigs},
29 hooks::NodeHooks,
30 rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
31 setup::build_networked_pipeline,
32 AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
33 NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
34};
35use reth_node_core::{
36 dirs::{ChainPath, DataDirPath},
37 exit::NodeExitFuture,
38 primitives::Head,
39};
40use reth_node_events::node;
41use reth_provider::{
42 providers::{BlockchainProvider, NodeTypesForProvider},
43 BlockNumReader, StorageSettingsCache,
44};
45use reth_tasks::TaskExecutor;
46use reth_tokio_util::EventSender;
47use reth_tracing::tracing::{debug, error, info};
48use reth_trie_db::ChangesetCache;
49use std::{
50 future::Future,
51 pin::Pin,
52 sync::{Arc, OnceLock},
53};
54use tokio::sync::{mpsc::unbounded_channel, oneshot};
55use tokio_stream::wrappers::UnboundedReceiverStream;
56
57use arb_payload::ArbEngineTypes;
58use arb_primitives::ArbPrimitives;
59
60static TREE_SENDER: OnceLock<TreeSender<ArbEngineTypes, ArbPrimitives>> = OnceLock::new();
61static ENGINE_HANDLE: OnceLock<ConsensusEngineHandle<ArbEngineTypes>> = OnceLock::new();
62static FLUSH_HANDLE: OnceLock<FlushHandle> = OnceLock::new();
63static PARALLEL_STATE_ROOT_FN: OnceLock<ParallelStateRootFn> = OnceLock::new();
64
65pub enum PersistenceRequest {
67 Flush(FlushRequest),
68 Unwind {
69 target: u64,
70 done: crossbeam_channel::Sender<Result<(), String>>,
71 },
72}
73
74pub struct FlushRequest {
76 pub blocks: Vec<reth_chain_state::ExecutedBlock<ArbPrimitives>>,
77 pub last_num_hash: alloy_eips::BlockNumHash,
78}
79
80pub struct FlushResult {
82 pub last_num_hash: alloy_eips::BlockNumHash,
83 pub count: usize,
84 pub duration: std::time::Duration,
85}
86
87struct FlushHandle {
89 sender: std::sync::mpsc::Sender<PersistenceRequest>,
90 result_rx: crossbeam_channel::Receiver<FlushResult>,
91}
92
93type ParallelStateRootFn = Box<
95 dyn Fn(
96 Arc<reth_trie_common::TrieInputSorted>,
97 reth_trie_common::prefix_set::TriePrefixSets,
98 ) -> Result<(alloy_primitives::B256, reth_trie::updates::TrieUpdates), String>
99 + Send
100 + Sync,
101>;
102
103pub fn tree_sender() -> Option<&'static TreeSender<ArbEngineTypes, ArbPrimitives>> {
104 TREE_SENDER.get()
105}
106
107pub fn engine_handle() -> Option<&'static ConsensusEngineHandle<ArbEngineTypes>> {
108 ENGINE_HANDLE.get()
109}
110
111pub fn start_flush(request: FlushRequest) {
113 if let Some(handle) = FLUSH_HANDLE.get() {
114 if let Err(e) = handle.sender.send(PersistenceRequest::Flush(request)) {
115 error!(target: "reth::cli", "Failed to send flush request: {e}");
116 }
117 }
118}
119
120pub fn start_unwind(target: u64) -> Option<crossbeam_channel::Receiver<Result<(), String>>> {
125 let handle = FLUSH_HANDLE.get()?;
126 let (done_tx, done_rx) = crossbeam_channel::bounded(1);
127 if let Err(e) = handle.sender.send(PersistenceRequest::Unwind {
128 target,
129 done: done_tx,
130 }) {
131 error!(target: "reth::cli", "Failed to send unwind request: {e}");
132 return None;
133 }
134 Some(done_rx)
135}
136
137pub fn try_flush_result() -> Option<FlushResult> {
139 FLUSH_HANDLE
140 .get()
141 .and_then(|handle| handle.result_rx.try_recv().ok())
142}
143
144pub fn compute_parallel_state_root(
145 overlay: Arc<reth_trie_common::TrieInputSorted>,
146 prefix_sets: reth_trie_common::prefix_set::TriePrefixSets,
147) -> Result<(alloy_primitives::B256, reth_trie::updates::TrieUpdates), String> {
148 let f = PARALLEL_STATE_ROOT_FN
149 .get()
150 .ok_or_else(|| "parallel_state_root not initialized".to_string())?;
151 f(overlay, prefix_sets)
152}
153
154#[derive(Debug)]
159pub struct ArbEngineLauncher {
160 pub ctx: LaunchContext,
161 pub engine_tree_config: TreeConfig,
162}
163
164impl ArbEngineLauncher {
165 pub const fn new(
166 task_executor: TaskExecutor,
167 data_dir: ChainPath<DataDirPath>,
168 engine_tree_config: TreeConfig,
169 ) -> Self {
170 Self {
171 ctx: LaunchContext::new(task_executor, data_dir),
172 engine_tree_config,
173 }
174 }
175
176 async fn launch_node<T, CB, AO>(
179 self,
180 target: NodeBuilderWithComponents<T, CB, AO>,
181 ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
182 where
183 T: FullNodeTypes<
184 Types: NodeTypesForProvider<Payload = ArbEngineTypes, Primitives = ArbPrimitives>,
185 Provider = BlockchainProvider<
186 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
187 >,
188 >,
189 CB: NodeComponentsBuilder<T>,
190 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
191 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
192 {
193 let Self {
194 ctx,
195 engine_tree_config,
196 } = self;
197 let NodeBuilderWithComponents {
198 adapter: NodeTypesAdapter { database },
199 components_builder,
200 add_ons:
201 AddOns {
202 hooks,
203 exexs: installed_exex,
204 add_ons,
205 },
206 config,
207 } = target;
208 let NodeHooks {
209 on_component_initialized,
210 on_node_started,
211 ..
212 } = hooks;
213
214 let changeset_cache = ChangesetCache::new();
215
216 let ctx = ctx
217 .with_configured_globals(engine_tree_config.reserved_cpu_cores())
218 .with_loaded_toml_config(config)?
219 .with_resolved_peers()?
220 .attach(database.clone())
221 .with_adjusted_configs()
222 .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(
223 changeset_cache.clone(),
224 )
225 .await?
226 .inspect(|_| {
227 info!(target: "reth::cli", "Database opened");
228 })
229 .with_prometheus_server()
230 .await?
231 .inspect(|this| {
232 debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
233 })
234 .with_genesis()?
235 .inspect(
236 |this: &LaunchContextWith<
237 Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>,
238 >| {
239 info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
240 let settings = this.provider_factory().cached_storage_settings();
241 info!(target: "reth::cli", ?settings, "Loaded storage settings");
242 },
243 )
244 .with_metrics_task()
245 .with_blockchain_db::<T, _>(move |provider_factory| {
246 Ok(BlockchainProvider::new(provider_factory)?)
247 })?
248 .with_components(components_builder, on_component_initialized)
249 .await?;
250
251 let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;
252
253 let network_handle = ctx.components().network().clone();
254 let network_client = network_handle.fetch_client().await?;
255 let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();
256
257 let node_config = ctx.node_config();
258
259 network_handle.update_sync_state(SyncState::Syncing);
260
261 let max_block = ctx.max_block(network_client.clone()).await?;
262
263 let static_file_producer = ctx.static_file_producer();
264 let static_file_producer_events = static_file_producer.lock().events();
265 info!(target: "reth::cli", "StaticFileProducer initialized");
266
267 let consensus = Arc::new(ctx.components().consensus().clone());
268
269 let pipeline = build_networked_pipeline(
270 &ctx.toml_config().stages,
271 network_client.clone(),
272 consensus.clone(),
273 ctx.provider_factory().clone(),
274 ctx.task_executor(),
275 ctx.sync_metrics_tx(),
276 ctx.prune_config(),
277 max_block,
278 static_file_producer,
279 ctx.components().evm_config().clone(),
280 maybe_exex_manager_handle
281 .clone()
282 .unwrap_or_else(ExExManagerHandle::empty),
283 ctx.era_import_source(),
284 )?;
285
286 pipeline.move_to_static_files()?;
287
288 let pipeline_events = pipeline.events();
289
290 let mut pruner_builder = ctx.pruner_builder();
291 if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
292 pruner_builder =
293 pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
294 }
295 let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
296 let pruner_events = pruner.events();
297 info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");
298
299 let event_sender = EventSender::default();
300
301 let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());
302
303 let jwt_secret = ctx.auth_jwt_secret()?;
304
305 let add_ons_ctx = AddOnsContext {
306 node: ctx.node_adapter().clone(),
307 config: ctx.node_config(),
308 beacon_engine_handle: beacon_engine_handle.clone(),
309 jwt_secret,
310 engine_events: event_sender.clone(),
311 };
312 let validator_builder = add_ons.engine_validator_builder();
313
314 let engine_validator = validator_builder
315 .clone()
316 .build_tree_validator(
317 &add_ons_ctx,
318 engine_tree_config.clone(),
319 changeset_cache.clone(),
320 )
321 .await?;
322
323 let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
324 .maybe_skip_fcu(node_config.debug.skip_fcu)
325 .maybe_skip_new_payload(node_config.debug.skip_new_payload)
326 .maybe_reorg(
327 ctx.blockchain_db().clone(),
328 ctx.components().evm_config().clone(),
329 || async {
330 let reorg_cache = ChangesetCache::new();
331 validator_builder
332 .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
333 .await
334 },
335 node_config.debug.reorg_frequency,
336 node_config.debug.reorg_depth,
337 )
338 .await?
339 .maybe_store_messages(node_config.debug.engine_api_store.clone());
340
341 let engine_kind = if ctx.chain_spec().is_optimism() {
342 EngineApiKind::OpStack
343 } else {
344 EngineApiKind::Ethereum
345 };
346
347 {
351 use reth_provider::{DatabaseProviderFactory, SaveBlocksMode};
352 use reth_storage_api::{BlockExecutionWriter, DBProvider};
353
354 let pf = ctx.provider_factory().clone();
355 let (req_tx, req_rx) = std::sync::mpsc::channel::<PersistenceRequest>();
356 let (res_tx, res_rx) = crossbeam_channel::bounded::<FlushResult>(1);
357
358 std::thread::Builder::new()
359 .name("arb-persistence".into())
360 .spawn(move || {
361 while let Ok(req) = req_rx.recv() {
362 match req {
363 PersistenceRequest::Flush(flush) => {
364 let start = std::time::Instant::now();
365 let count = flush.blocks.len();
366 let last = flush.last_num_hash;
367
368 let result = (|| -> Result<(), String> {
369 let provider_rw = pf
370 .database_provider_rw()
371 .map_err(|e| format!("database_provider_rw: {e}"))?;
372 provider_rw
373 .save_blocks(flush.blocks, SaveBlocksMode::Full)
374 .map_err(|e| format!("save_blocks: {e}"))?;
375 provider_rw.commit().map_err(|e| format!("commit: {e}"))?;
376 Ok(())
377 })();
378
379 match result {
380 Ok(()) => {
381 let _ = res_tx.send(FlushResult {
382 last_num_hash: last,
383 count,
384 duration: start.elapsed(),
385 });
386 }
387 Err(e) => {
388 error!(target: "reth::cli", "Background flush failed: {e}");
389 let _ = res_tx.send(FlushResult {
390 last_num_hash: last,
391 count: 0, duration: start.elapsed(),
393 });
394 }
395 }
396 }
397 PersistenceRequest::Unwind { target, done } => {
398 let start = std::time::Instant::now();
399 let result = (|| -> Result<(), String> {
400 let provider_rw = pf
401 .database_provider_rw()
402 .map_err(|e| format!("database_provider_rw: {e}"))?;
403 provider_rw
404 .remove_block_and_execution_above(target)
405 .map_err(|e| {
406 format!("remove_block_and_execution_above: {e}")
407 })?;
408 provider_rw.commit().map_err(|e| format!("commit: {e}"))?;
409 Ok(())
410 })();
411 match &result {
412 Ok(()) => info!(
413 target: "reth::cli",
414 target,
415 duration_ms = start.elapsed().as_millis(),
416 "Persisted unwind complete"
417 ),
418 Err(e) => error!(
419 target: "reth::cli",
420 target,
421 err = %e,
422 "Persisted unwind failed"
423 ),
424 }
425 let _ = done.send(result);
426 }
427 }
428 }
429 })
430 .expect("failed to spawn persistence thread");
431
432 let _ = FLUSH_HANDLE.set(FlushHandle {
433 sender: req_tx,
434 result_rx: res_rx,
435 });
436 }
437
438 {
439 use reth_chain_state::{
440 AnchoredTrieInput, ComputedTrieData, DeferredTrieData, LazyOverlay,
441 };
442 use reth_provider::providers::OverlayStateProviderFactory;
443 use reth_trie_parallel::root::ParallelStateRoot;
444
445 let pf = ctx.provider_factory().clone();
446 let runtime = ctx.task_executor().clone();
447 let changeset_cache_for_root = changeset_cache.clone();
448
449 let state_root_fn: ParallelStateRootFn = Box::new(move |overlay, prefix_sets| {
450 let anchor_hash = alloy_primitives::B256::ZERO;
451 let computed = ComputedTrieData {
452 hashed_state: Arc::clone(&overlay.state),
453 trie_updates: Arc::clone(&overlay.nodes),
454 anchored_trie_input: Some(AnchoredTrieInput {
455 anchor_hash,
456 trie_input: overlay,
457 }),
458 };
459 let lazy = LazyOverlay::new(anchor_hash, vec![DeferredTrieData::ready(computed)]);
460 let factory =
461 OverlayStateProviderFactory::new(pf.clone(), changeset_cache_for_root.clone())
462 .with_lazy_overlay(Some(lazy));
463
464 ParallelStateRoot::new(factory, prefix_sets, runtime.clone())
465 .incremental_root_with_updates()
466 .map_err(|e| format!("parallel state root: {e}"))
467 });
468 let _ = PARALLEL_STATE_ROOT_FN.set(state_root_fn);
469 }
470
471 let (mut orchestrator, arb_tree_sender) = build_arb_engine_orchestrator(
472 engine_kind,
473 consensus.clone(),
474 network_client.clone(),
475 Box::pin(consensus_engine_stream),
476 pipeline,
477 ctx.task_executor().clone(),
478 ctx.provider_factory().clone(),
479 ctx.blockchain_db().clone(),
480 pruner,
481 ctx.components().payload_builder_handle().clone(),
482 engine_validator,
483 engine_tree_config,
484 ctx.sync_metrics_tx(),
485 ctx.components().evm_config().clone(),
486 changeset_cache,
487 );
488
489 let _ = TREE_SENDER.set(arb_tree_sender);
490 let _ = ENGINE_HANDLE.set(beacon_engine_handle.clone());
491 info!(target: "reth::cli", "Arbitrum engine tree sender and handle captured");
492
493 info!(target: "reth::cli", "Consensus engine initialized");
494
495 #[allow(clippy::needless_continue)]
496 let events = stream_select!(
497 event_sender.new_listener().map(Into::into),
498 pipeline_events.map(Into::into),
499 ctx.consensus_layer_events(),
500 pruner_events.map(Into::into),
501 static_file_producer_events.map(Into::into),
502 );
503
504 ctx.task_executor().spawn_critical_task(
505 "events task",
506 Box::pin(node::handle_events(
507 Some(Box::new(ctx.components().network().clone())),
508 Some(ctx.head().number),
509 events,
510 )),
511 );
512
513 let RpcHandle {
514 rpc_server_handles,
515 rpc_registry,
516 engine_events,
517 beacon_engine_handle,
518 engine_shutdown: _,
519 } = add_ons.launch_add_ons(add_ons_ctx).await?;
520
521 let (engine_shutdown, shutdown_rx) = EngineShutdown::new();
522
523 let initial_target = ctx.initial_backfill_target()?;
524 let mut built_payloads = ctx
525 .components()
526 .payload_builder_handle()
527 .subscribe()
528 .await
529 .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
530 .into_built_payload_stream()
531 .fuse();
532
533 let chainspec = ctx.chain_spec();
534 let provider = ctx.blockchain_db().clone();
535 let (exit, rx) = oneshot::channel();
536 let terminate_after_backfill = ctx.terminate_after_initial_backfill();
537 let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;
538
539 info!(target: "reth::cli", "Starting consensus engine");
540 let consensus_engine = async move {
541 if let Some(initial_target) = initial_target {
542 debug!(target: "reth::cli", %initial_target, "start backfill sync");
543 orchestrator.start_backfill_sync(initial_target);
544 } else if startup_sync_state_idle {
545 network_handle.update_sync_state(SyncState::Idle);
546 }
547
548 let mut res = Ok(());
549 let mut shutdown_rx = shutdown_rx.fuse();
550
551 loop {
552 tokio::select! {
553 event = orchestrator.next() => {
554 let Some(event) = event else { break };
555 debug!(target: "reth::cli", "Event: {event}");
556 match event {
557 ChainEvent::BackfillSyncFinished => {
558 if terminate_after_backfill {
559 debug!(target: "reth::cli", "Terminating after initial backfill");
560 break
561 }
562 if startup_sync_state_idle {
563 network_handle.update_sync_state(SyncState::Idle);
564 }
565 }
566 ChainEvent::BackfillSyncStarted => {
567 network_handle.update_sync_state(SyncState::Syncing);
568 }
569 ChainEvent::FatalError => {
570 error!(target: "reth::cli", "Fatal error in consensus engine");
571 res = Err(eyre::eyre!("Fatal error in consensus engine"));
572 break
573 }
574 ChainEvent::Handler(ev) => {
575 if let Some(head) = ev.canonical_header() {
576 network_handle.update_sync_state(SyncState::Idle);
577 let head_block = Head {
578 number: head.number(),
579 hash: head.hash(),
580 difficulty: head.difficulty(),
581 timestamp: head.timestamp(),
582 total_difficulty: chainspec.final_paris_total_difficulty()
583 .filter(|_| chainspec.is_paris_active_at_block(head.number()))
584 .unwrap_or_default(),
585 };
586 network_handle.update_status(head_block);
587
588 let updated = BlockRangeUpdate {
589 earliest: provider.earliest_block_number().unwrap_or_default(),
590 latest: head.number(),
591 latest_hash: head.hash(),
592 };
593 network_handle.update_block_range(updated);
594 }
595 event_sender.notify(ev);
596 }
597 }
598 }
599 payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
600 if let Some(executed_block) = payload.executed_block() {
601 debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
602 orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
603 }
604 }
605 shutdown_req = &mut shutdown_rx => {
606 if let Ok(req) = shutdown_req {
607 debug!(target: "reth::cli", "received engine shutdown request");
608 orchestrator.handler_mut().handler_mut().on_event(
609 FromOrchestrator::Terminate { tx: req.done_tx }.into()
610 );
611 }
612 }
613 }
614 }
615
616 let _ = exit.send(res);
617 };
618 ctx.task_executor()
619 .spawn_critical_task("consensus engine", Box::pin(consensus_engine));
620
621 let engine_events_for_ethstats = engine_events.new_listener();
622
623 let full_node = FullNode {
624 evm_config: ctx.components().evm_config().clone(),
625 pool: ctx.components().pool().clone(),
626 network: ctx.components().network().clone(),
627 provider: ctx.node_adapter().provider.clone(),
628 payload_builder_handle: ctx.components().payload_builder_handle().clone(),
629 task_executor: ctx.task_executor().clone(),
630 config: ctx.node_config().clone(),
631 data_dir: ctx.data_dir().clone(),
632 add_ons_handle: RpcHandle {
633 rpc_server_handles,
634 rpc_registry,
635 engine_events,
636 beacon_engine_handle,
637 engine_shutdown,
638 },
639 };
640 on_node_started.on_event(FullNode::clone(&full_node))?;
641
642 ctx.spawn_ethstats(engine_events_for_ethstats).await?;
643
644 let handle = NodeHandle {
645 node_exit_future: NodeExitFuture::new(
646 async { rx.await? },
647 full_node.config.debug.terminate,
648 ),
649 node: full_node,
650 };
651
652 Ok(handle)
653 }
654}
655
656impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for ArbEngineLauncher
657where
658 T: FullNodeTypes<
659 Types: NodeTypesForProvider<Payload = ArbEngineTypes, Primitives = ArbPrimitives>,
660 Provider = BlockchainProvider<
661 NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
662 >,
663 >,
664 CB: NodeComponentsBuilder<T> + 'static,
665 AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
666 + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
667 + 'static,
668{
669 type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
670 type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;
671
672 fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
673 Box::pin(self.launch_node(target))
674 }
675}