arb_node/
payload.rs

1//! Arbitrum payload service builder.
2//!
3//! Spawns a minimal payload service that handles Subscribe commands
4//! by returning a valid broadcast receiver. Block building is driven
5//! externally by the sequencer via RPC, not by the payload builder.
6
7use futures_util::{ready, StreamExt};
8use reth_node_builder::{
9    components::PayloadServiceBuilder, BuilderContext, FullNodeTypes, NodeTypes,
10};
11use reth_payload_builder::{PayloadBuilderHandle, PayloadServiceCommand};
12use reth_payload_primitives::{PayloadBuilderAttributes, PayloadTypes};
13use reth_transaction_pool::TransactionPool;
14use std::{
15    future::Future,
16    pin::Pin,
17    task::{Context, Poll},
18};
19use tokio::sync::{broadcast, mpsc};
20use tokio_stream::wrappers::UnboundedReceiverStream;
21use tracing::info;
22
23/// Payload builder service that handles Subscribe commands properly.
24///
25/// The noop service drops Subscribe senders, which causes reth's engine
26/// tree to fail with "ChannelClosed". This service keeps a broadcast
27/// channel alive and responds to Subscribe with a valid receiver.
28struct ArbPayloadService<T: PayloadTypes> {
29    command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
30    events_tx: broadcast::Sender<reth_payload_builder_primitives::Events<T>>,
31}
32
33impl<T: PayloadTypes> ArbPayloadService<T> {
34    fn new() -> (Self, PayloadBuilderHandle<T>) {
35        let (service_tx, command_rx) = mpsc::unbounded_channel();
36        let (events_tx, _) = broadcast::channel(16);
37        (
38            Self {
39                command_rx: UnboundedReceiverStream::new(command_rx),
40                events_tx,
41            },
42            PayloadBuilderHandle::new(service_tx),
43        )
44    }
45}
46
47impl<T: PayloadTypes> Future for ArbPayloadService<T> {
48    type Output = ();
49
50    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
51        let this = self.get_mut();
52        loop {
53            let Some(cmd) = ready!(this.command_rx.poll_next_unpin(cx)) else {
54                return Poll::Ready(());
55            };
56            match cmd {
57                PayloadServiceCommand::BuildNewPayload(attr, tx) => {
58                    let id = attr.payload_id();
59                    let _ = tx.send(Ok(id));
60                }
61                PayloadServiceCommand::BestPayload(_, tx) => {
62                    let _ = tx.send(None);
63                }
64                PayloadServiceCommand::PayloadTimestamp(_, tx) => {
65                    let _ = tx.send(None);
66                }
67                PayloadServiceCommand::Resolve(_, _, tx) => {
68                    let _ = tx.send(None);
69                }
70                PayloadServiceCommand::Subscribe(tx) => {
71                    let rx = this.events_tx.subscribe();
72                    let _ = tx.send(rx);
73                }
74            }
75        }
76    }
77}
78
/// Payload service builder for Arbitrum nodes.
///
/// Spawns only a minimal payload builder service: blocks are built by the
/// sequencer through RPC calls, not through the payload service.
#[derive(Clone, Copy, Debug, Default)]
pub struct ArbPayloadServiceBuilder;
85
86impl<Node, Pool, Evm> PayloadServiceBuilder<Node, Pool, Evm> for ArbPayloadServiceBuilder
87where
88    Node: FullNodeTypes,
89    Pool: TransactionPool + Unpin + 'static,
90    Evm: Send + 'static,
91{
92    async fn spawn_payload_builder_service(
93        self,
94        ctx: &BuilderContext<Node>,
95        _pool: Pool,
96        _evm_config: Evm,
97    ) -> eyre::Result<PayloadBuilderHandle<<Node::Types as NodeTypes>::Payload>> {
98        let (service, handle) = ArbPayloadService::<<Node::Types as NodeTypes>::Payload>::new();
99        ctx.task_executor()
100            .spawn_critical_task("payload builder service", Box::pin(service));
101        info!(target: "reth::cli", "Payload builder service initialized");
102        Ok(handle)
103    }
104}