// arb_node/launcher.rs

1//! Custom engine node launcher for Arbitrum.
2//!
3//! Extends reth's standard `EngineNodeLauncher` by capturing the engine tree
4//! sender during orchestrator construction. This sender allows the block
5//! producer to inject `InsertExecutedBlock` directly into reth's engine tree
6//! for persistence via `PersistenceService::save_blocks(Full)`.
7//!
8//! This is the reth SDK-native approach: implement `LaunchNode` with custom
9//! orchestrator wiring while reusing all other engine infrastructure.
10
11use crate::engine::{build_arb_engine_orchestrator, TreeSender};
12use alloy_consensus::BlockHeader;
13use futures::{stream::FusedStream, stream_select, FutureExt, StreamExt};
14use reth_chainspec::{EthChainSpec, EthereumHardforks};
15use reth_engine_tree::{
16    chain::{ChainEvent, FromOrchestrator},
17    engine::{EngineApiKind, EngineApiRequest, EngineRequestHandler},
18    tree::TreeConfig,
19};
20use reth_engine_util::EngineMessageStreamExt;
21use reth_exex::ExExManagerHandle;
22use reth_network::{types::BlockRangeUpdate, NetworkSyncUpdater, SyncState};
23use reth_network_api::BlockDownloaderProvider;
24use reth_node_api::{
25    BuiltPayload, ConsensusEngineHandle, FullNodeTypes, NodeTypes, NodeTypesWithDBAdapter,
26};
27use reth_node_builder::{
28    common::{Attached, LaunchContextWith, WithConfigs},
29    hooks::NodeHooks,
30    rpc::{EngineShutdown, EngineValidatorAddOn, EngineValidatorBuilder, RethRpcAddOns, RpcHandle},
31    setup::build_networked_pipeline,
32    AddOns, AddOnsContext, FullNode, LaunchContext, LaunchNode, NodeAdapter,
33    NodeBuilderWithComponents, NodeComponents, NodeComponentsBuilder, NodeHandle, NodeTypesAdapter,
34};
35use reth_node_core::{
36    dirs::{ChainPath, DataDirPath},
37    exit::NodeExitFuture,
38    primitives::Head,
39};
40use reth_node_events::node;
41use reth_provider::{
42    providers::{BlockchainProvider, NodeTypesForProvider},
43    BlockNumReader, StorageSettingsCache,
44};
45use reth_tasks::TaskExecutor;
46use reth_tokio_util::EventSender;
47use reth_tracing::tracing::{debug, error, info};
48use reth_trie_db::ChangesetCache;
49use std::{
50    future::Future,
51    pin::Pin,
52    sync::{Arc, OnceLock},
53};
54use tokio::sync::{mpsc::unbounded_channel, oneshot};
55use tokio_stream::wrappers::UnboundedReceiverStream;
56
57use arb_payload::ArbEngineTypes;
58use arb_primitives::ArbPrimitives;
59
/// Sender into reth's engine tree, captured when the orchestrator is built in
/// `launch_node` and exposed to the block producer via [`tree_sender`].
static TREE_SENDER: OnceLock<TreeSender<ArbEngineTypes, ArbPrimitives>> = OnceLock::new();
/// Handle to the consensus engine message channel, set during `launch_node`
/// and exposed via [`engine_handle`].
static ENGINE_HANDLE: OnceLock<ConsensusEngineHandle<ArbEngineTypes>> = OnceLock::new();
/// Handle to the background persistence thread spawned in `launch_node`;
/// used by [`start_flush`], [`start_unwind`] and [`try_flush_result`].
static FLUSH_HANDLE: OnceLock<FlushHandle> = OnceLock::new();
/// Type-erased parallel state root closure installed in `launch_node` and
/// invoked through [`compute_parallel_state_root`].
static PARALLEL_STATE_ROOT_FN: OnceLock<ParallelStateRootFn> = OnceLock::new();
64
/// Request sent to the background persistence thread.
pub enum PersistenceRequest {
    /// Persist a batch of buffered executed blocks to disk.
    Flush(FlushRequest),
    /// Remove blocks above `target` (and their execution state) from disk.
    Unwind {
        /// Highest block number to keep on disk.
        target: u64,
        /// Channel on which the unwind outcome is reported back to the caller.
        done: crossbeam_channel::Sender<Result<(), String>>,
    },
}
73
/// Flush payload: buffered blocks to persist.
pub struct FlushRequest {
    /// Executed blocks to write via `save_blocks(Full)`, in order.
    pub blocks: Vec<reth_chain_state::ExecutedBlock<ArbPrimitives>>,
    /// Number/hash of the last block in `blocks`; echoed back in [`FlushResult`].
    pub last_num_hash: alloy_eips::BlockNumHash,
}
79
/// Result from a completed flush.
pub struct FlushResult {
    /// Number/hash of the last block in the flushed batch.
    pub last_num_hash: alloy_eips::BlockNumHash,
    /// Number of blocks persisted. A value of `0` signals that the flush
    /// failed (see the error branch in the persistence thread).
    pub count: usize,
    /// Wall-clock time the flush (or failed attempt) took.
    pub duration: std::time::Duration,
}
86
/// Handle to the background persistence thread.
struct FlushHandle {
    /// Request channel into the persistence thread (flushes and unwinds).
    sender: std::sync::mpsc::Sender<PersistenceRequest>,
    /// Bounded(1) channel carrying completed flush results; drained
    /// non-blockingly via [`try_flush_result`].
    result_rx: crossbeam_channel::Receiver<FlushResult>,
}
92
/// Type-erased parallel state root function.
///
/// Takes a sorted trie-input overlay plus the prefix sets of touched keys and
/// returns the computed state root together with the trie updates, or an
/// error string. Installed once into [`PARALLEL_STATE_ROOT_FN`] during launch.
type ParallelStateRootFn = Box<
    dyn Fn(
            Arc<reth_trie_common::TrieInputSorted>,
            reth_trie_common::prefix_set::TriePrefixSets,
        ) -> Result<(alloy_primitives::B256, reth_trie::updates::TrieUpdates), String>
        + Send
        + Sync,
>;
102
/// Returns the captured engine tree sender, or `None` if the node has not
/// been launched yet.
pub fn tree_sender() -> Option<&'static TreeSender<ArbEngineTypes, ArbPrimitives>> {
    TREE_SENDER.get()
}
106
/// Returns the captured consensus engine handle, or `None` if the node has
/// not been launched yet.
pub fn engine_handle() -> Option<&'static ConsensusEngineHandle<ArbEngineTypes>> {
    ENGINE_HANDLE.get()
}
110
111/// Send blocks to the background persistence thread (non-blocking).
112pub fn start_flush(request: FlushRequest) {
113    if let Some(handle) = FLUSH_HANDLE.get() {
114        if let Err(e) = handle.sender.send(PersistenceRequest::Flush(request)) {
115            error!(target: "reth::cli", "Failed to send flush request: {e}");
116        }
117    }
118}
119
120/// Send an unwind request to the background persistence thread and return a
121/// receiver for the result. Blocks from `target + 1` onward (and their
122/// execution state + trie state) are removed from disk. This runs on the
123/// same thread as flushes to avoid races with in-flight persistence.
124pub fn start_unwind(target: u64) -> Option<crossbeam_channel::Receiver<Result<(), String>>> {
125    let handle = FLUSH_HANDLE.get()?;
126    let (done_tx, done_rx) = crossbeam_channel::bounded(1);
127    if let Err(e) = handle.sender.send(PersistenceRequest::Unwind {
128        target,
129        done: done_tx,
130    }) {
131        error!(target: "reth::cli", "Failed to send unwind request: {e}");
132        return None;
133    }
134    Some(done_rx)
135}
136
137/// Check if a background flush completed (non-blocking).
138pub fn try_flush_result() -> Option<FlushResult> {
139    FLUSH_HANDLE
140        .get()
141        .and_then(|handle| handle.result_rx.try_recv().ok())
142}
143
144pub fn compute_parallel_state_root(
145    overlay: Arc<reth_trie_common::TrieInputSorted>,
146    prefix_sets: reth_trie_common::prefix_set::TriePrefixSets,
147) -> Result<(alloy_primitives::B256, reth_trie::updates::TrieUpdates), String> {
148    let f = PARALLEL_STATE_ROOT_FN
149        .get()
150        .ok_or_else(|| "parallel_state_root not initialized".to_string())?;
151    f(overlay, prefix_sets)
152}
153
/// Arbitrum engine node launcher.
///
/// Identical to reth's `EngineNodeLauncher` but captures the engine tree sender
/// during orchestrator construction for block injection.
#[derive(Debug)]
pub struct ArbEngineLauncher {
    /// Launch context (task executor + data directory).
    pub ctx: LaunchContext,
    /// Engine tree configuration, forwarded to the validator and orchestrator.
    pub engine_tree_config: TreeConfig,
}
163
164impl ArbEngineLauncher {
165    pub const fn new(
166        task_executor: TaskExecutor,
167        data_dir: ChainPath<DataDirPath>,
168        engine_tree_config: TreeConfig,
169    ) -> Self {
170        Self {
171            ctx: LaunchContext::new(task_executor, data_dir),
172            engine_tree_config,
173        }
174    }
175
    /// Launch the node — mirrors EngineNodeLauncher::launch_node exactly,
    /// except uses build_arb_engine_orchestrator to capture the tree sender.
    ///
    /// Side effects: populates the module-level `FLUSH_HANDLE`,
    /// `PARALLEL_STATE_ROOT_FN`, `TREE_SENDER` and `ENGINE_HANDLE` statics so
    /// the block producer can reach the engine and persistence machinery from
    /// outside the launcher.
    async fn launch_node<T, CB, AO>(
        self,
        target: NodeBuilderWithComponents<T, CB, AO>,
    ) -> eyre::Result<NodeHandle<NodeAdapter<T, CB::Components>, AO>>
    where
        T: FullNodeTypes<
            Types: NodeTypesForProvider<Payload = ArbEngineTypes, Primitives = ArbPrimitives>,
            Provider = BlockchainProvider<
                NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
            >,
        >,
        CB: NodeComponentsBuilder<T>,
        AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
            + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>,
    {
        let Self {
            ctx,
            engine_tree_config,
        } = self;
        // Tear the builder apart into database, components, add-ons and hooks.
        let NodeBuilderWithComponents {
            adapter: NodeTypesAdapter { database },
            components_builder,
            add_ons:
                AddOns {
                    hooks,
                    exexs: installed_exex,
                    add_ons,
                },
            config,
        } = target;
        let NodeHooks {
            on_component_initialized,
            on_node_started,
            ..
        } = hooks;

        // Shared changeset cache: cloned into the provider factory, the tree
        // validator, and the parallel state root closure installed below.
        let changeset_cache = ChangesetCache::new();

        // Standard reth launch pipeline: config -> database -> genesis ->
        // metrics -> blockchain provider -> node components.
        let ctx = ctx
            .with_configured_globals(engine_tree_config.reserved_cpu_cores())
            .with_loaded_toml_config(config)?
            .with_resolved_peers()?
            .attach(database.clone())
            .with_adjusted_configs()
            .with_provider_factory::<_, <CB::Components as NodeComponents<T>>::Evm>(
                changeset_cache.clone(),
            )
            .await?
            .inspect(|_| {
                info!(target: "reth::cli", "Database opened");
            })
            .with_prometheus_server()
            .await?
            .inspect(|this| {
                debug!(target: "reth::cli", chain=%this.chain_id(), genesis=?this.genesis_hash(), "Initializing genesis");
            })
            .with_genesis()?
            .inspect(
                |this: &LaunchContextWith<
                    Attached<WithConfigs<<T::Types as NodeTypes>::ChainSpec>, _>,
                >| {
                    info!(target: "reth::cli", "\n{}", this.chain_spec().display_hardforks());
                    let settings = this.provider_factory().cached_storage_settings();
                    info!(target: "reth::cli", ?settings, "Loaded storage settings");
                },
            )
            .with_metrics_task()
            .with_blockchain_db::<T, _>(move |provider_factory| {
                Ok(BlockchainProvider::new(provider_factory)?)
            })?
            .with_components(components_builder, on_component_initialized)
            .await?;

        // Execution extensions (may be absent).
        let maybe_exex_manager_handle = ctx.launch_exex(installed_exex).await?;

        let network_handle = ctx.components().network().clone();
        let network_client = network_handle.fetch_client().await?;
        // Channel feeding engine-API messages into the orchestrator.
        let (consensus_engine_tx, consensus_engine_rx) = unbounded_channel();

        let node_config = ctx.node_config();

        // Report "syncing" until the engine reaches a canonical head.
        network_handle.update_sync_state(SyncState::Syncing);

        let max_block = ctx.max_block(network_client.clone()).await?;

        let static_file_producer = ctx.static_file_producer();
        let static_file_producer_events = static_file_producer.lock().events();
        info!(target: "reth::cli", "StaticFileProducer initialized");

        let consensus = Arc::new(ctx.components().consensus().clone());

        // Backfill pipeline used for initial/historical sync.
        let pipeline = build_networked_pipeline(
            &ctx.toml_config().stages,
            network_client.clone(),
            consensus.clone(),
            ctx.provider_factory().clone(),
            ctx.task_executor(),
            ctx.sync_metrics_tx(),
            ctx.prune_config(),
            max_block,
            static_file_producer,
            ctx.components().evm_config().clone(),
            maybe_exex_manager_handle
                .clone()
                .unwrap_or_else(ExExManagerHandle::empty),
            ctx.era_import_source(),
        )?;

        pipeline.move_to_static_files()?;

        let pipeline_events = pipeline.events();

        let mut pruner_builder = ctx.pruner_builder();
        if let Some(exex_manager_handle) = &maybe_exex_manager_handle {
            // Don't prune past what ExExes still need.
            pruner_builder =
                pruner_builder.finished_exex_height(exex_manager_handle.finished_height());
        }
        let pruner = pruner_builder.build_with_provider_factory(ctx.provider_factory().clone());
        let pruner_events = pruner.events();
        info!(target: "reth::cli", prune_config=?ctx.prune_config(), "Pruner initialized");

        let event_sender = EventSender::default();

        let beacon_engine_handle = ConsensusEngineHandle::new(consensus_engine_tx.clone());

        let jwt_secret = ctx.auth_jwt_secret()?;

        let add_ons_ctx = AddOnsContext {
            node: ctx.node_adapter().clone(),
            config: ctx.node_config(),
            beacon_engine_handle: beacon_engine_handle.clone(),
            jwt_secret,
            engine_events: event_sender.clone(),
        };
        let validator_builder = add_ons.engine_validator_builder();

        let engine_validator = validator_builder
            .clone()
            .build_tree_validator(
                &add_ons_ctx,
                engine_tree_config.clone(),
                changeset_cache.clone(),
            )
            .await?;

        // Engine message stream with optional debug transformations applied
        // (skip FCU / skip new-payload / synthetic reorgs / message capture).
        let consensus_engine_stream = UnboundedReceiverStream::from(consensus_engine_rx)
            .maybe_skip_fcu(node_config.debug.skip_fcu)
            .maybe_skip_new_payload(node_config.debug.skip_new_payload)
            .maybe_reorg(
                ctx.blockchain_db().clone(),
                ctx.components().evm_config().clone(),
                || async {
                    // The reorg path builds its own validator with a fresh cache.
                    let reorg_cache = ChangesetCache::new();
                    validator_builder
                        .build_tree_validator(&add_ons_ctx, engine_tree_config.clone(), reorg_cache)
                        .await
                },
                node_config.debug.reorg_frequency,
                node_config.debug.reorg_depth,
            )
            .await?
            .maybe_store_messages(node_config.debug.engine_api_store.clone());

        let engine_kind = if ctx.chain_spec().is_optimism() {
            EngineApiKind::OpStack
        } else {
            EngineApiKind::Ethereum
        };

        // Spawn background persistence thread (like reth's PersistenceHandle).
        // Handles flush and unwind requests serially — same thread guarantees
        // no races between saving new blocks and rolling back.
        {
            use reth_provider::{DatabaseProviderFactory, SaveBlocksMode};
            use reth_storage_api::{BlockExecutionWriter, DBProvider};

            let pf = ctx.provider_factory().clone();
            let (req_tx, req_rx) = std::sync::mpsc::channel::<PersistenceRequest>();
            // NOTE(review): bounded(1) — if a FlushResult is never drained via
            // `try_flush_result`, the next `res_tx.send` blocks this thread.
            let (res_tx, res_rx) = crossbeam_channel::bounded::<FlushResult>(1);

            std::thread::Builder::new()
                .name("arb-persistence".into())
                .spawn(move || {
                    // Loop until the request sender (FLUSH_HANDLE) is dropped.
                    while let Ok(req) = req_rx.recv() {
                        match req {
                            PersistenceRequest::Flush(flush) => {
                                let start = std::time::Instant::now();
                                let count = flush.blocks.len();
                                let last = flush.last_num_hash;

                                // Write + commit in one RW transaction.
                                let result = (|| -> Result<(), String> {
                                    let provider_rw = pf
                                        .database_provider_rw()
                                        .map_err(|e| format!("database_provider_rw: {e}"))?;
                                    provider_rw
                                        .save_blocks(flush.blocks, SaveBlocksMode::Full)
                                        .map_err(|e| format!("save_blocks: {e}"))?;
                                    provider_rw.commit().map_err(|e| format!("commit: {e}"))?;
                                    Ok(())
                                })();

                                match result {
                                    Ok(()) => {
                                        let _ = res_tx.send(FlushResult {
                                            last_num_hash: last,
                                            count,
                                            duration: start.elapsed(),
                                        });
                                    }
                                    Err(e) => {
                                        error!(target: "reth::cli", "Background flush failed: {e}");
                                        let _ = res_tx.send(FlushResult {
                                            last_num_hash: last,
                                            count: 0, // signal failure
                                            duration: start.elapsed(),
                                        });
                                    }
                                }
                            }
                            PersistenceRequest::Unwind { target, done } => {
                                let start = std::time::Instant::now();
                                // Remove blocks above `target` + commit.
                                let result = (|| -> Result<(), String> {
                                    let provider_rw = pf
                                        .database_provider_rw()
                                        .map_err(|e| format!("database_provider_rw: {e}"))?;
                                    provider_rw
                                        .remove_block_and_execution_above(target)
                                        .map_err(|e| {
                                            format!("remove_block_and_execution_above: {e}")
                                        })?;
                                    provider_rw.commit().map_err(|e| format!("commit: {e}"))?;
                                    Ok(())
                                })();
                                match &result {
                                    Ok(()) => info!(
                                        target: "reth::cli",
                                        target,
                                        duration_ms = start.elapsed().as_millis(),
                                        "Persisted unwind complete"
                                    ),
                                    Err(e) => error!(
                                        target: "reth::cli",
                                        target,
                                        err = %e,
                                        "Persisted unwind failed"
                                    ),
                                }
                                let _ = done.send(result);
                            }
                        }
                    }
                })
                .expect("failed to spawn persistence thread");

            // First launch wins; `set` is a no-op if already initialized.
            let _ = FLUSH_HANDLE.set(FlushHandle {
                sender: req_tx,
                result_rx: res_rx,
            });
        }

        // Install the type-erased parallel state root closure used by
        // `compute_parallel_state_root`.
        {
            use reth_chain_state::{
                AnchoredTrieInput, ComputedTrieData, DeferredTrieData, LazyOverlay,
            };
            use reth_provider::providers::OverlayStateProviderFactory;
            use reth_trie_parallel::root::ParallelStateRoot;

            let pf = ctx.provider_factory().clone();
            let runtime = ctx.task_executor().clone();
            let changeset_cache_for_root = changeset_cache.clone();

            let state_root_fn: ParallelStateRootFn = Box::new(move |overlay, prefix_sets| {
                // NOTE(review): anchor hash is zeroed here — presumably the
                // caller's overlay is self-contained; confirm against the
                // overlay provider's anchor semantics.
                let anchor_hash = alloy_primitives::B256::ZERO;
                let computed = ComputedTrieData {
                    hashed_state: Arc::clone(&overlay.state),
                    trie_updates: Arc::clone(&overlay.nodes),
                    anchored_trie_input: Some(AnchoredTrieInput {
                        anchor_hash,
                        trie_input: overlay,
                    }),
                };
                let lazy = LazyOverlay::new(anchor_hash, vec![DeferredTrieData::ready(computed)]);
                let factory =
                    OverlayStateProviderFactory::new(pf.clone(), changeset_cache_for_root.clone())
                        .with_lazy_overlay(Some(lazy));

                ParallelStateRoot::new(factory, prefix_sets, runtime.clone())
                    .incremental_root_with_updates()
                    .map_err(|e| format!("parallel state root: {e}"))
            });
            let _ = PARALLEL_STATE_ROOT_FN.set(state_root_fn);
        }

        // Build the orchestrator through the Arbitrum-specific constructor so
        // we also get the engine tree sender back.
        let (mut orchestrator, arb_tree_sender) = build_arb_engine_orchestrator(
            engine_kind,
            consensus.clone(),
            network_client.clone(),
            Box::pin(consensus_engine_stream),
            pipeline,
            ctx.task_executor().clone(),
            ctx.provider_factory().clone(),
            ctx.blockchain_db().clone(),
            pruner,
            ctx.components().payload_builder_handle().clone(),
            engine_validator,
            engine_tree_config,
            ctx.sync_metrics_tx(),
            ctx.components().evm_config().clone(),
            changeset_cache,
        );

        // Publish the sender/handle for the block producer.
        let _ = TREE_SENDER.set(arb_tree_sender);
        let _ = ENGINE_HANDLE.set(beacon_engine_handle.clone());
        info!(target: "reth::cli", "Arbitrum engine tree sender and handle captured");

        info!(target: "reth::cli", "Consensus engine initialized");

        // Merge all node event sources into one stream for the events task.
        #[allow(clippy::needless_continue)]
        let events = stream_select!(
            event_sender.new_listener().map(Into::into),
            pipeline_events.map(Into::into),
            ctx.consensus_layer_events(),
            pruner_events.map(Into::into),
            static_file_producer_events.map(Into::into),
        );

        ctx.task_executor().spawn_critical_task(
            "events task",
            Box::pin(node::handle_events(
                Some(Box::new(ctx.components().network().clone())),
                Some(ctx.head().number),
                events,
            )),
        );

        // Launch RPC/auth servers and other add-ons.
        let RpcHandle {
            rpc_server_handles,
            rpc_registry,
            engine_events,
            beacon_engine_handle,
            engine_shutdown: _,
        } = add_ons.launch_add_ons(add_ons_ctx).await?;

        let (engine_shutdown, shutdown_rx) = EngineShutdown::new();

        let initial_target = ctx.initial_backfill_target()?;
        // Stream of locally-built payloads to inject into the engine tree.
        let mut built_payloads = ctx
            .components()
            .payload_builder_handle()
            .subscribe()
            .await
            .map_err(|e| eyre::eyre!("Failed to subscribe to payload builder events: {:?}", e))?
            .into_built_payload_stream()
            .fuse();

        let chainspec = ctx.chain_spec();
        let provider = ctx.blockchain_db().clone();
        // `exit`/`rx`: one-shot channel carrying the engine's final result.
        let (exit, rx) = oneshot::channel();
        let terminate_after_backfill = ctx.terminate_after_initial_backfill();
        let startup_sync_state_idle = ctx.node_config().debug.startup_sync_state_idle;

        info!(target: "reth::cli", "Starting consensus engine");
        // Main engine driver: multiplexes orchestrator events, built payloads
        // and shutdown requests until completion or fatal error.
        let consensus_engine = async move {
            if let Some(initial_target) = initial_target {
                debug!(target: "reth::cli", %initial_target, "start backfill sync");
                orchestrator.start_backfill_sync(initial_target);
            } else if startup_sync_state_idle {
                network_handle.update_sync_state(SyncState::Idle);
            }

            let mut res = Ok(());
            let mut shutdown_rx = shutdown_rx.fuse();

            loop {
                tokio::select! {
                    event = orchestrator.next() => {
                        // Orchestrator stream ended => engine is done.
                        let Some(event) = event else { break };
                        debug!(target: "reth::cli", "Event: {event}");
                        match event {
                            ChainEvent::BackfillSyncFinished => {
                                if terminate_after_backfill {
                                    debug!(target: "reth::cli", "Terminating after initial backfill");
                                    break
                                }
                                if startup_sync_state_idle {
                                    network_handle.update_sync_state(SyncState::Idle);
                                }
                            }
                            ChainEvent::BackfillSyncStarted => {
                                network_handle.update_sync_state(SyncState::Syncing);
                            }
                            ChainEvent::FatalError => {
                                error!(target: "reth::cli", "Fatal error in consensus engine");
                                res = Err(eyre::eyre!("Fatal error in consensus engine"));
                                break
                            }
                            ChainEvent::Handler(ev) => {
                                // On a new canonical head, refresh the network's
                                // view of our status and served block range.
                                if let Some(head) = ev.canonical_header() {
                                    network_handle.update_sync_state(SyncState::Idle);
                                    let head_block = Head {
                                        number: head.number(),
                                        hash: head.hash(),
                                        difficulty: head.difficulty(),
                                        timestamp: head.timestamp(),
                                        total_difficulty: chainspec.final_paris_total_difficulty()
                                            .filter(|_| chainspec.is_paris_active_at_block(head.number()))
                                            .unwrap_or_default(),
                                    };
                                    network_handle.update_status(head_block);

                                    let updated = BlockRangeUpdate {
                                        earliest: provider.earliest_block_number().unwrap_or_default(),
                                        latest: head.number(),
                                        latest_hash: head.hash(),
                                    };
                                    network_handle.update_block_range(updated);
                                }
                                event_sender.notify(ev);
                            }
                        }
                    }
                    payload = built_payloads.select_next_some(), if !built_payloads.is_terminated() => {
                        // Feed locally built payloads straight into the tree.
                        if let Some(executed_block) = payload.executed_block() {
                            debug!(target: "reth::cli", block=?executed_block.recovered_block.num_hash(), "inserting built payload");
                            orchestrator.handler_mut().handler_mut().on_event(EngineApiRequest::InsertExecutedBlock(executed_block.into_executed_payload()).into());
                        }
                    }
                    shutdown_req = &mut shutdown_rx => {
                        // Graceful shutdown: forward a Terminate to the tree.
                        if let Ok(req) = shutdown_req {
                            debug!(target: "reth::cli", "received engine shutdown request");
                            orchestrator.handler_mut().handler_mut().on_event(
                                FromOrchestrator::Terminate { tx: req.done_tx }.into()
                            );
                        }
                    }
                }
            }

            let _ = exit.send(res);
        };
        ctx.task_executor()
            .spawn_critical_task("consensus engine", Box::pin(consensus_engine));

        let engine_events_for_ethstats = engine_events.new_listener();

        // Assemble the user-facing node handle.
        let full_node = FullNode {
            evm_config: ctx.components().evm_config().clone(),
            pool: ctx.components().pool().clone(),
            network: ctx.components().network().clone(),
            provider: ctx.node_adapter().provider.clone(),
            payload_builder_handle: ctx.components().payload_builder_handle().clone(),
            task_executor: ctx.task_executor().clone(),
            config: ctx.node_config().clone(),
            data_dir: ctx.data_dir().clone(),
            add_ons_handle: RpcHandle {
                rpc_server_handles,
                rpc_registry,
                engine_events,
                beacon_engine_handle,
                engine_shutdown,
            },
        };
        on_node_started.on_event(FullNode::clone(&full_node))?;

        ctx.spawn_ethstats(engine_events_for_ethstats).await?;

        let handle = NodeHandle {
            // Resolves when the engine task sends its final result on `exit`.
            node_exit_future: NodeExitFuture::new(
                async { rx.await? },
                full_node.config.debug.terminate,
            ),
            node: full_node,
        };

        Ok(handle)
    }
654}
655
/// `LaunchNode` implementation: boxes the inherent async `launch_node` so the
/// node builder can drive it through the trait's type-erased future.
impl<T, CB, AO> LaunchNode<NodeBuilderWithComponents<T, CB, AO>> for ArbEngineLauncher
where
    T: FullNodeTypes<
        Types: NodeTypesForProvider<Payload = ArbEngineTypes, Primitives = ArbPrimitives>,
        Provider = BlockchainProvider<
            NodeTypesWithDBAdapter<<T as FullNodeTypes>::Types, <T as FullNodeTypes>::DB>,
        >,
    >,
    CB: NodeComponentsBuilder<T> + 'static,
    AO: RethRpcAddOns<NodeAdapter<T, CB::Components>>
        + EngineValidatorAddOn<NodeAdapter<T, CB::Components>>
        + 'static,
{
    type Node = NodeHandle<NodeAdapter<T, CB::Components>, AO>;
    type Future = Pin<Box<dyn Future<Output = eyre::Result<Self::Node>> + Send>>;

    fn launch_node(self, target: NodeBuilderWithComponents<T, CB, AO>) -> Self::Future {
        // Delegates to the inherent async method defined above.
        Box::pin(self.launch_node(target))
    }
}