arb_rpc/
nitro_execution_handler.rs

1//! Implementation of the `nitroexecution` RPC handler.
2//!
3//! Receives messages from the Nitro consensus layer, produces blocks,
4//! and maintains the mapping between message indices and block numbers.
5
6use std::sync::Arc;
7
8use alloy_consensus::BlockHeader;
9use alloy_primitives::B256;
10use alloy_rpc_types_eth::BlockNumberOrTag;
11use jsonrpsee::core::RpcResult;
12use parking_lot::RwLock;
13use reth_provider::{BlockNumReader, BlockReaderIdExt, HeaderProvider};
14use tracing::{debug, info, warn};
15
16use crate::{
17    block_producer::{BlockProducer, BlockProductionInput},
18    nitro_execution::{
19        NitroExecutionApiServer, RpcConsensusSyncData, RpcFinalityData, RpcMaintenanceStatus,
20        RpcMessageResult, RpcMessageWithMetadata, RpcMessageWithMetadataAndBlockInfo,
21    },
22};
23
/// State shared between the RPC handler and the node.
///
/// A freshly created handler starts from the `Default` value (`synced == false`,
/// `max_message_count == 0`). The type is `Clone` and comparable so a snapshot can
/// be copied out from behind the lock and inspected without holding the guard.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct NitroExecutionState {
    /// Whether the node is synced with consensus.
    pub synced: bool,
    /// Maximum message count from consensus.
    pub max_message_count: u64,
}
32
33/// Handler for the `nitroexecution` RPC namespace.
34///
35/// Receives L1 incoming messages from the consensus layer and produces blocks.
36/// Delegates actual block production to the `BlockProducer` implementation.
37pub struct NitroExecutionHandler<Provider, BP> {
38    provider: Provider,
39    block_producer: Arc<BP>,
40    state: Arc<RwLock<NitroExecutionState>>,
41    /// Genesis block number (0 for Arbitrum Sepolia, 22207817 for Arbitrum One).
42    genesis_block_num: u64,
43}
44
45impl<Provider, BP> NitroExecutionHandler<Provider, BP> {
46    /// Create a new handler with a block producer.
47    pub fn new(provider: Provider, block_producer: Arc<BP>, genesis_block_num: u64) -> Self {
48        Self {
49            provider,
50            block_producer,
51            state: Arc::new(RwLock::new(NitroExecutionState::default())),
52            genesis_block_num,
53        }
54    }
55
56    /// Convert a message index to a block number.
57    fn message_index_to_block_number(&self, msg_idx: u64) -> u64 {
58        self.genesis_block_num + msg_idx
59    }
60
61    /// Convert a block number to a message index.
62    fn block_number_to_message_index(&self, block_num: u64) -> Option<u64> {
63        if block_num < self.genesis_block_num {
64            return None;
65        }
66        Some(block_num - self.genesis_block_num)
67    }
68}
69
70impl<Provider, BP> NitroExecutionHandler<Provider, BP>
71where
72    Provider: BlockReaderIdExt + HeaderProvider,
73{
74    /// Look up a sealed header by block number.
75    fn get_header(
76        &self,
77        block_num: u64,
78    ) -> Result<
79        Option<reth_primitives_traits::SealedHeader<<Provider as HeaderProvider>::Header>>,
80        String,
81    > {
82        self.provider
83            .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(block_num))
84            .map_err(|e| e.to_string())
85    }
86
87    /// Extract send root from a header's extra_data.
88    fn send_root_from_header(header: &impl BlockHeader) -> B256 {
89        let extra = header.extra_data();
90        if extra.len() >= 32 {
91            B256::from_slice(&extra[..32])
92        } else {
93            B256::ZERO
94        }
95    }
96}
97
98fn internal_error(msg: impl Into<String>) -> jsonrpsee::types::ErrorObjectOwned {
99    jsonrpsee::types::ErrorObject::owned(
100        jsonrpsee::types::error::INTERNAL_ERROR_CODE,
101        msg.into(),
102        None::<()>,
103    )
104}
105
106/// Decode the l2_msg field from the RPC message.
107///
108/// JSON encoding always base64-encodes byte fields. The base64 output
109/// can start with "0x" as valid base64 characters, so always decode as base64.
110fn decode_l2_msg(l2_msg: &Option<String>) -> Result<Vec<u8>, String> {
111    match l2_msg {
112        Some(s) if !s.is_empty() => base64_decode(s).map_err(|e| format!("base64 decode: {e}")),
113        _ => Ok(vec![]),
114    }
115}
116
/// Simple base64 decoder (standard alphabet, RFC 4648 §4).
///
/// Any run of trailing `=` is stripped before decoding, so padded and unpadded
/// input are both accepted. Left-over bits in the final character that do not
/// complete a byte are ignored.
///
/// # Errors
///
/// Returns an error message when
/// - a character outside the standard alphabet is encountered (this includes
///   whitespace and a `=` that is not at the very end), or
/// - the payload ends with a single dangling character: one character carries
///   only 6 bits and can never complete a byte, so the input must have been
///   truncated or corrupted.
fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
    /// Map one alphabet character to its 6-bit value, `None` if it is not part
    /// of the standard alphabet.
    fn sextet(ch: char) -> Option<u32> {
        match ch {
            'A'..='Z' => Some(ch as u32 - 'A' as u32),
            'a'..='z' => Some(ch as u32 - 'a' as u32 + 26),
            '0'..='9' => Some(ch as u32 - '0' as u32 + 52),
            '+' => Some(62),
            '/' => Some(63),
            _ => None,
        }
    }

    let payload = input.trim_end_matches('=');
    let mut decoded = Vec::with_capacity(payload.len() * 3 / 4);
    let mut acc: u32 = 0;
    let mut pending_bits: u32 = 0;

    // Iterate over characters (not raw bytes) so that a non-ASCII character is
    // reported as itself in the error message.
    for ch in payload.chars() {
        let value = sextet(ch).ok_or_else(|| format!("invalid base64 character: {ch}"))?;
        acc = (acc << 6) | value;
        pending_bits += 6;
        if pending_bits >= 8 {
            pending_bits -= 8;
            // `acc` holds exactly `pending_bits + 8` significant bits here, so
            // the shifted value always fits in a byte.
            decoded.push((acc >> pending_bits) as u8);
            acc &= (1 << pending_bits) - 1;
        }
    }

    // Six pending bits means the payload length is 1 (mod 4): a lone trailing
    // character whose bits would otherwise be dropped silently.
    if pending_bits == 6 {
        return Err(format!(
            "truncated base64 input: {} characters cannot encode whole bytes",
            payload.len()
        ));
    }

    Ok(decoded)
}
143
144#[async_trait::async_trait]
145impl<Provider, BP> NitroExecutionApiServer for NitroExecutionHandler<Provider, BP>
146where
147    Provider: BlockNumReader + BlockReaderIdExt + HeaderProvider + 'static,
148    BP: BlockProducer,
149{
150    async fn digest_message(
151        &self,
152        msg_idx: u64,
153        message: RpcMessageWithMetadata,
154        _message_for_prefetch: Option<RpcMessageWithMetadata>,
155    ) -> RpcResult<RpcMessageResult> {
156        let block_num = self.message_index_to_block_number(msg_idx);
157        let kind = message.message.header.kind;
158        info!(target: "nitroexecution", msg_idx, block_num, kind, "digestMessage called");
159
160        // Handle init message (Kind=11) — cache params, return genesis block.
161        // The Init message does NOT produce a block. Its params are applied
162        // during the first real block's execution.
163        if kind == 11 {
164            let l2_msg = decode_l2_msg(&message.message.l2_msg).map_err(internal_error)?;
165            self.block_producer
166                .cache_init_message(&l2_msg)
167                .map_err(|e| internal_error(e.to_string()))?;
168
169            // Return the genesis block info.
170            let genesis_header = self
171                .get_header(self.genesis_block_num)
172                .map_err(internal_error)?
173                .ok_or_else(|| internal_error("Genesis block not found for Init message"))?;
174            let send_root = Self::send_root_from_header(genesis_header.header());
175            info!(target: "nitroexecution", "Init message cached, returning genesis block");
176            return Ok(RpcMessageResult {
177                block_hash: genesis_header.hash(),
178                send_root,
179            });
180        }
181
182        // Check if we already have this block (idempotent).
183        if let Some(header) = self.get_header(block_num).map_err(internal_error)? {
184            let send_root = Self::send_root_from_header(header.header());
185            debug!(target: "nitroexecution", block_num, ?send_root, "Block already exists");
186            return Ok(RpcMessageResult {
187                block_hash: header.hash(),
188                send_root,
189            });
190        }
191
192        // Decode the L2 message bytes
193        let l2_msg = decode_l2_msg(&message.message.l2_msg).map_err(internal_error)?;
194
195        // Build batch data stats if present
196        let batch_data_stats = message
197            .message
198            .batch_data_tokens
199            .as_ref()
200            .map(|s| (s.length, s.nonzeros));
201
202        // Build the block production input
203        let input = BlockProductionInput {
204            kind,
205            sender: message.message.header.sender,
206            l1_block_number: message.message.header.block_number,
207            l1_timestamp: message.message.header.timestamp,
208            request_id: message.message.header.request_id,
209            l1_base_fee: message.message.header.base_fee_l1,
210            l2_msg,
211            delayed_messages_read: message.delayed_messages_read,
212            batch_gas_cost: message.message.batch_gas_cost,
213            batch_data_stats,
214        };
215
216        // Delegate to the block producer
217        let result = self
218            .block_producer
219            .produce_block(msg_idx, input)
220            .await
221            .map_err(|e| internal_error(e.to_string()))?;
222
223        Ok(RpcMessageResult {
224            block_hash: result.block_hash,
225            send_root: result.send_root,
226        })
227    }
228
229    async fn reorg(
230        &self,
231        msg_idx_of_first_msg_to_add: u64,
232        _new_messages: Vec<RpcMessageWithMetadataAndBlockInfo>,
233        _old_messages: Vec<RpcMessageWithMetadata>,
234    ) -> RpcResult<Vec<RpcMessageResult>> {
235        warn!(target: "nitroexecution", msg_idx_of_first_msg_to_add, "Reorg not yet implemented");
236        Err(internal_error("Reorg not yet implemented"))
237    }
238
239    async fn head_message_index(&self) -> RpcResult<u64> {
240        let best = self
241            .provider
242            .best_block_number()
243            .map_err(|e| internal_error(e.to_string()))?;
244
245        let msg_idx = self.block_number_to_message_index(best).unwrap_or(0);
246        debug!(target: "nitroexecution", best, msg_idx, "headMessageIndex");
247        Ok(msg_idx)
248    }
249
250    async fn result_at_message_index(&self, msg_idx: u64) -> RpcResult<RpcMessageResult> {
251        let block_num = self.message_index_to_block_number(msg_idx);
252
253        let header = self
254            .get_header(block_num)
255            .map_err(internal_error)?
256            .ok_or_else(|| internal_error(format!("Block {block_num} not found")))?;
257
258        let send_root = Self::send_root_from_header(header.header());
259
260        Ok(RpcMessageResult {
261            block_hash: header.hash(),
262            send_root,
263        })
264    }
265
266    fn set_finality_data(
267        &self,
268        safe: Option<RpcFinalityData>,
269        finalized: Option<RpcFinalityData>,
270        validated: Option<RpcFinalityData>,
271    ) -> RpcResult<()> {
272        debug!(target: "nitroexecution", ?safe, ?finalized, ?validated, "setFinalityData");
273        Ok(())
274    }
275
276    fn set_consensus_sync_data(&self, sync_data: RpcConsensusSyncData) -> RpcResult<()> {
277        let mut state = self.state.write();
278        state.synced = sync_data.synced;
279        state.max_message_count = sync_data.max_message_count;
280        debug!(target: "nitroexecution", synced = sync_data.synced, max = sync_data.max_message_count, "setConsensusSyncData");
281        Ok(())
282    }
283
284    fn mark_feed_start(&self, to: u64) -> RpcResult<()> {
285        debug!(target: "nitroexecution", to, "markFeedStart");
286        Ok(())
287    }
288
289    async fn trigger_maintenance(&self) -> RpcResult<()> {
290        Ok(())
291    }
292
293    async fn should_trigger_maintenance(&self) -> RpcResult<bool> {
294        Ok(false)
295    }
296
297    async fn maintenance_status(&self) -> RpcResult<RpcMaintenanceStatus> {
298        Ok(RpcMaintenanceStatus { is_running: false })
299    }
300
301    async fn arbos_version_for_message_index(&self, msg_idx: u64) -> RpcResult<u64> {
302        let block_num = self.message_index_to_block_number(msg_idx);
303
304        let header = self
305            .get_header(block_num)
306            .map_err(internal_error)?
307            .ok_or_else(|| internal_error(format!("Block {block_num} not found")))?;
308
309        let mix = header.header().mix_hash().unwrap_or_default();
310        let arbos_version = u64::from_be_bytes(mix.0[16..24].try_into().unwrap_or_default());
311
312        Ok(arbos_version)
313    }
314}