1use crossbeam_channel::Sender;
9use futures::Stream;
10use reth_consensus::FullConsensus;
11use reth_engine_primitives::BeaconEngineMessage;
12use reth_engine_tree::{
13 backfill::PipelineSync,
14 chain::ChainOrchestrator,
15 download::BasicBlockDownloader,
16 engine::{EngineApiKind, EngineApiRequest, EngineApiRequestHandler, EngineHandler, FromEngine},
17 persistence::PersistenceHandle,
18 tree::{EngineApiTreeHandler, EngineValidator, TreeConfig, WaitForCaches},
19};
20use reth_evm::ConfigureEvm;
21use reth_network_p2p::BlockClient;
22use reth_payload_builder::PayloadBuilderHandle;
23use reth_primitives_traits::NodePrimitives;
24use reth_provider::{
25 providers::{BlockchainProvider, ProviderNodeTypes},
26 ProviderFactory, StorageSettingsCache,
27};
28use reth_prune::PrunerWithFactory;
29use reth_stages_api::{MetricEventsSender, Pipeline};
30use reth_tasks::Runtime;
31use reth_trie_db::ChangesetCache;
32use std::sync::Arc;
33
34pub type TreeSender<T, N> =
36 Sender<FromEngine<EngineApiRequest<T, N>, <N as NodePrimitives>::Block>>;
37
38#[expect(clippy::too_many_arguments, clippy::type_complexity)]
45pub fn build_arb_engine_orchestrator<N, Client, S, V, C>(
46 engine_kind: EngineApiKind,
47 consensus: Arc<dyn FullConsensus<N::Primitives>>,
48 client: Client,
49 incoming_requests: S,
50 pipeline: Pipeline<N>,
51 pipeline_task_spawner: Runtime,
52 provider: ProviderFactory<N>,
53 blockchain_db: BlockchainProvider<N>,
54 pruner: PrunerWithFactory<ProviderFactory<N>>,
55 payload_builder: PayloadBuilderHandle<N::Payload>,
56 payload_validator: V,
57 tree_config: TreeConfig,
58 sync_metrics_tx: MetricEventsSender,
59 evm_config: C,
60 changeset_cache: ChangesetCache,
61) -> (
62 ChainOrchestrator<
63 EngineHandler<
64 EngineApiRequestHandler<EngineApiRequest<N::Payload, N::Primitives>, N::Primitives>,
65 S,
66 BasicBlockDownloader<Client, <N::Primitives as NodePrimitives>::Block>,
67 >,
68 PipelineSync<N>,
69 >,
70 TreeSender<N::Payload, N::Primitives>,
71)
72where
73 N: ProviderNodeTypes,
74 Client: BlockClient<Block = <N::Primitives as NodePrimitives>::Block> + 'static,
75 S: Stream<Item = BeaconEngineMessage<N::Payload>> + Send + Sync + Unpin + 'static,
76 V: EngineValidator<N::Payload> + WaitForCaches,
77 C: ConfigureEvm<Primitives = N::Primitives> + 'static,
78{
79 let downloader = BasicBlockDownloader::new(client, consensus.clone());
80 let use_hashed_state = provider.cached_storage_settings().use_hashed_state();
81
82 let persistence_handle =
83 PersistenceHandle::<N::Primitives>::spawn_service(provider, pruner, sync_metrics_tx);
84
85 let canonical_in_memory_state = blockchain_db.canonical_in_memory_state();
86
87 let (to_tree_tx, from_tree) = EngineApiTreeHandler::spawn_new(
88 blockchain_db,
89 consensus,
90 payload_validator,
91 persistence_handle,
92 payload_builder,
93 canonical_in_memory_state,
94 tree_config,
95 engine_kind,
96 evm_config,
97 changeset_cache,
98 use_hashed_state,
99 );
100
101 let tree_sender = to_tree_tx.clone();
104
105 let engine_handler = EngineApiRequestHandler::new(to_tree_tx, from_tree);
106 let handler = EngineHandler::new(engine_handler, downloader, incoming_requests);
107
108 let backfill_sync = PipelineSync::new(pipeline, pipeline_task_spawner);
109
110 (ChainOrchestrator::new(handler, backfill_sync), tree_sender)
111}