arb_node/
engine.rs

1//! Custom engine orchestrator builder that exposes the tree sender.
2//!
3//! This is a thin wrapper around reth's `build_engine_orchestrator` pattern
4//! that also returns a clone of the tree sender channel, allowing our
5//! block producer to send `InsertExecutedBlock` and `ForkchoiceUpdated`
6//! directly to the engine tree for persistence.
7
8use crossbeam_channel::Sender;
9use futures::Stream;
10use reth_consensus::FullConsensus;
11use reth_engine_primitives::BeaconEngineMessage;
12use reth_engine_tree::{
13    backfill::PipelineSync,
14    chain::ChainOrchestrator,
15    download::BasicBlockDownloader,
16    engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler, FromEngine},
17    persistence::PersistenceHandle,
18    tree::{EngineApiTreeHandler, EngineValidator, TreeConfig, WaitForCaches},
19};
20use reth_evm::ConfigureEvm;
21use reth_network_p2p::BlockClient;
22use reth_payload_builder::PayloadBuilderHandle;
23use reth_primitives_traits::NodePrimitives;
24use reth_provider::{
25    providers::{BlockchainProvider, ProviderNodeTypes},
26    ProviderFactory, StorageSettingsCache,
27};
28use reth_prune::PrunerWithFactory;
29use reth_stages_api::{MetricEventsSender, Pipeline};
30use reth_tasks::Runtime;
31use reth_trie_db::ChangesetCache;
32use std::sync::Arc;
33
34/// The sender type for injecting blocks and FCU into the engine tree.
35pub type TreeSender<T, N> =
36    Sender<FromEngine<EngineApiRequest<T, N>, <N as NodePrimitives>::Block>>;
37
38/// Builds the engine orchestrator AND returns a clone of the tree sender.
39///
40/// This is identical to reth's `build_engine_orchestrator` but clones
41/// `to_tree_tx` before passing it to the request handler, allowing
42/// external code (our block producer) to send `InsertExecutedBlock`
43/// and `ForkchoiceUpdated` directly.
44#[expect(clippy::too_many_arguments, clippy::type_complexity)]
45pub fn build_arb_engine_orchestrator<N, Client, S, V, C>(
46    engine_kind: EngineApiKind,
47    consensus: Arc<dyn FullConsensus<N::Primitives>>,
48    client: Client,
49    incoming_requests: S,
50    pipeline: Pipeline<N>,
51    pipeline_task_spawner: Runtime,
52    provider: ProviderFactory<N>,
53    blockchain_db: BlockchainProvider<N>,
54    pruner: PrunerWithFactory<ProviderFactory<N>>,
55    payload_builder: PayloadBuilderHandle<N::Payload>,
56    payload_validator: V,
57    tree_config: TreeConfig,
58    sync_metrics_tx: MetricEventsSender,
59    evm_config: C,
60    changeset_cache: ChangesetCache,
61) -> (
62    ChainOrchestrator<
63        EngineHandler<
64            EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
65            S,
66            BasicBlockDownloader<Client, <N::Primitives as NodePrimitives>::Block>,
67        >,
68        PipelineSync<N>,
69    >,
70    TreeSender<N::Payload, N::Primitives>,
71)
72where
73    N: ProviderNodeTypes,
74    Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
75    S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
76    V: EngineValidator<N::Payload> + WaitForCaches,
77    C: ConfigureEvm<Primitives = N::Primitives> + 'static,
78{
79    let downloader = BasicBlockDownloader::new(client, consensus.clone());
80    let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
81
82    let persistence_handle =
83        PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
84
85    let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
86
87    let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
88        blockchain_db,
89        consensus,
90        payload_validator,
91        persistence_handle,
92        payload_builder,
93        canonical_in_memory_state,
94        tree_config,
95        engine_kind,
96        evm_config,
97        changeset_cache,
98        use_hashed_state,
99    );
100
101    // Clone the tree sender BEFORE it's consumed by the request handler.
102    // This allows our block producer to inject ExecutedBlocks directly.
103    let tree_sender = to_tree_tx.clone();
104
105    let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
106    let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
107
108    let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
109
110    (ChainOrchestrator::new(handler, backfill_sync), tree_sender)
111}