1use futures_util::{ready, StreamExt};
8use reth_node_builder::{
9 components::PayloadServiceBuilder, BuilderContext, FullNodeTypes, NodeTypes,
10};
11use reth_payload_builder::{PayloadBuilderHandle, PayloadServiceCommand};
12use reth_payload_primitives::{PayloadBuilderAttributes, PayloadTypes};
13use reth_transaction_pool::TransactionPool;
14use std::{
15 future::Future,
16 pin::Pin,
17 task::{Context, Poll},
18};
19use tokio::sync::{broadcast, mpsc};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21use tracing::info;
22
23struct ArbPayloadService<T: PayloadTypes> {
29 command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
30 events_tx: broadcast::Sender<reth_payload_builder_primitives::Events<T>>,
31}
32
33impl<T: PayloadTypes> ArbPayloadService<T> {
34 fn new() -> (Self, PayloadBuilderHandle<T>) {
35 let (service_tx, command_rx) = mpsc::unbounded_channel();
36 let (events_tx, _) = broadcast::channel(16);
37 (
38 Self {
39 command_rx: UnboundedReceiverStream::new(command_rx),
40 events_tx,
41 },
42 PayloadBuilderHandle::new(service_tx),
43 )
44 }
45}
46
47impl<T: PayloadTypes> Future for ArbPayloadService<T> {
48 type Output = ();
49
50 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51 let this = self.get_mut();
52 loop {
53 let Some(cmd) = ready!(this.command_rx.poll_next_unpin(cx)) else {
54 return Poll::Ready(());
55 };
56 match cmd {
57 PayloadServiceCommand::BuildNewPayload(attr, tx) => {
58 let id = attr.payload_id();
59 let _ = tx.send(Ok(id));
60 }
61 PayloadServiceCommand::BestPayload(_, tx) => {
62 let _ = tx.send(None);
63 }
64 PayloadServiceCommand::PayloadTimestamp(_, tx) => {
65 let _ = tx.send(None);
66 }
67 PayloadServiceCommand::Resolve(_, _, tx) => {
68 let _ = tx.send(None);
69 }
70 PayloadServiceCommand::Subscribe(tx) => {
71 let rx = this.events_tx.subscribe();
72 let _ = tx.send(rx);
73 }
74 }
75 }
76 }
77}
78
79#[derive(Debug, Default, Clone, Copy)]
84pub struct ArbPayloadServiceBuilder;
85
86impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for ArbPayloadServiceBuilder
87where
88 Node: FullNodeTypes,
89 Pool: TransactionPool + Unpin + 'static,
90 Evm: Send + 'static,
91{
92 async fn spawn_payload_builder_service(
93 self,
94 ctx: &BuilderContext<Node>,
95 _pool: Pool,
96 _evm_config: Evm,
97 ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
98 let (service, handle) = ArbPayloadService::<<Node::Types as NodeTypes>::Payload>::new();
99 ctx.task_executor()
100 .spawn_critical_task("payload builder service", Box::pin(service));
101 info!(target: "reth::cli", "Payload builder service initialized");
102 Ok(handle)
103 }
104}