1use std::sync::Arc;
7
8use alloy_consensus::BlockHeader;
9use alloy_primitives::B256;
10use alloy_rpc_types_eth::BlockNumberOrTag;
11use jsonrpsee::core::RpcResult;
12use parking_lot::RwLock;
13use reth_provider::{BlockNumReader, BlockReaderIdExt, HeaderProvider};
14use tracing::{debug, info, warn};
15
16use crate::{
17 block_producer::{BlockProducer, BlockProductionInput},
18 nitro_execution::{
19 NitroExecutionApiServer, RpcConsensusSyncData, RpcFinalityData, RpcMaintenanceStatus,
20 RpcMessageResult, RpcMessageWithMetadata, RpcMessageWithMetadataAndBlockInfo,
21 },
22};
23
/// Sync information most recently reported to the execution handler.
///
/// A freshly created state holds the `Default` values (`false` / `0`); both
/// fields are overwritten by `set_consensus_sync_data`.
#[derive(Default, Debug)]
pub struct NitroExecutionState {
    /// The `synced` flag from the last consensus sync update.
    pub synced: bool,
    /// The `max_message_count` from the last consensus sync update.
    pub max_message_count: u64,
}
32
33pub struct NitroExecutionHandler<Provider, BP> {
38 provider: Provider,
39 block_producer: Arc<BP>,
40 state: Arc<RwLock<NitroExecutionState>>,
41 genesis_block_num: u64,
43}
44
45impl<Provider, BP> NitroExecutionHandler<Provider, BP> {
46 pub fn new(provider: Provider, block_producer: Arc<BP>, genesis_block_num: u64) -> Self {
48 Self {
49 provider,
50 block_producer,
51 state: Arc::new(RwLock::new(NitroExecutionState::default())),
52 genesis_block_num,
53 }
54 }
55
56 fn message_index_to_block_number(&self, msg_idx: u64) -> u64 {
58 self.genesis_block_num + msg_idx
59 }
60
61 fn block_number_to_message_index(&self, block_num: u64) -> Option<u64> {
63 if block_num < self.genesis_block_num {
64 return None;
65 }
66 Some(block_num - self.genesis_block_num)
67 }
68}
69
70impl<Provider, BP> NitroExecutionHandler<Provider, BP>
71where
72 Provider: BlockReaderIdExt + HeaderProvider,
73{
74 fn get_header(
76 &self,
77 block_num: u64,
78 ) -> Result<
79 Option<reth_primitives_traits::SealedHeader<<Provider as HeaderProvider>::Header>>,
80 String,
81 > {
82 self.provider
83 .sealed_header_by_number_or_tag(BlockNumberOrTag::Number(block_num))
84 .map_err(|e| e.to_string())
85 }
86
87 fn send_root_from_header(header: &impl BlockHeader) -> B256 {
89 let extra = header.extra_data();
90 if extra.len() >= 32 {
91 B256::from_slice(&extra[..32])
92 } else {
93 B256::ZERO
94 }
95 }
96}
97
98fn internal_error(msg: impl Into<String>) -> jsonrpsee::types::ErrorObjectOwned {
99 jsonrpsee::types::ErrorObject::owned(
100 jsonrpsee::types::error::INTERNAL_ERROR_CODE,
101 msg.into(),
102 None::<()>,
103 )
104}
105
106fn decode_l2_msg(l2_msg: &Option<String>) -> Result<Vec<u8>, String> {
111 match l2_msg {
112 Some(s) if !s.is_empty() => base64_decode(s).map_err(|e| format!("base64 decode: {e}")),
113 _ => Ok(vec![]),
114 }
115}
116
/// Decodes standard-alphabet base64 (`A-Z`, `a-z`, `0-9`, `+`, `/`).
///
/// Trailing `=` padding is optional and is stripped before decoding, so both
/// padded and unpadded input are accepted.
///
/// # Errors
///
/// * `invalid base64 character: <c>` if a byte outside the alphabet is found.
/// * `invalid base64 length: <n>` if the unpadded input has a length of
///   `1 (mod 4)`. Such a dangling symbol carries only 6 bits and cannot encode
///   a byte, which indicates truncated or corrupted input.
fn base64_decode(input: &str) -> Result<Vec<u8>, String> {
    /// Maps one alphabet byte to its 6-bit value without scanning a table.
    fn sextet(byte: u8) -> Option<u32> {
        match byte {
            b'A'..=b'Z' => Some(u32::from(byte - b'A')),
            b'a'..=b'z' => Some(u32::from(byte - b'a') + 26),
            b'0'..=b'9' => Some(u32::from(byte - b'0') + 52),
            b'+' => Some(62),
            b'/' => Some(63),
            _ => None,
        }
    }

    let input = input.trim_end_matches('=');
    let mut result = Vec::with_capacity(input.len() * 3 / 4);
    // `buf` accumulates pending bits; `bits` counts how many of them are valid.
    let mut buf: u32 = 0;
    let mut bits: u32 = 0;

    for &byte in input.as_bytes() {
        let val = sextet(byte)
            .ok_or_else(|| format!("invalid base64 character: {}", byte as char))?;
        buf = (buf << 6) | val;
        bits += 6;
        if bits >= 8 {
            bits -= 8;
            // `buf` holds exactly `bits + 8` valid bits here, so the shifted
            // value always fits in a byte and the cast never truncates.
            result.push((buf >> bits) as u8);
            buf &= (1 << bits) - 1;
        }
    }

    // Checked after the loop so that invalid-character errors take precedence.
    if input.len() % 4 == 1 {
        return Err(format!("invalid base64 length: {}", input.len()));
    }

    Ok(result)
}
143
144#[async_trait::async_trait]
145impl<Provider, BP> NitroExecutionApiServer for NitroExecutionHandler<Provider, BP>
146where
147 Provider: BlockNumReader + BlockReaderIdExt + HeaderProvider + 'static,
148 BP: BlockProducer,
149{
150 async fn digest_message(
151 &self,
152 msg_idx: u64,
153 message: RpcMessageWithMetadata,
154 _message_for_prefetch: Option<RpcMessageWithMetadata>,
155 ) -> RpcResult<RpcMessageResult> {
156 let block_num = self.message_index_to_block_number(msg_idx);
157 let kind = message.message.header.kind;
158 info!(target: "nitroexecution", msg_idx, block_num, kind, "digestMessage called");
159
160 if kind == 11 {
164 let l2_msg = decode_l2_msg(&message.message.l2_msg).map_err(internal_error)?;
165 self.block_producer
166 .cache_init_message(&l2_msg)
167 .map_err(|e| internal_error(e.to_string()))?;
168
169 let genesis_header = self
171 .get_header(self.genesis_block_num)
172 .map_err(internal_error)?
173 .ok_or_else(|| internal_error("Genesis block not found for Init message"))?;
174 let send_root = Self::send_root_from_header(genesis_header.header());
175 info!(target: "nitroexecution", "Init message cached, returning genesis block");
176 return Ok(RpcMessageResult {
177 block_hash: genesis_header.hash(),
178 send_root,
179 });
180 }
181
182 if let Some(header) = self.get_header(block_num).map_err(internal_error)? {
184 let send_root = Self::send_root_from_header(header.header());
185 debug!(target: "nitroexecution", block_num, ?send_root, "Block already exists");
186 return Ok(RpcMessageResult {
187 block_hash: header.hash(),
188 send_root,
189 });
190 }
191
192 let l2_msg = decode_l2_msg(&message.message.l2_msg).map_err(internal_error)?;
194
195 let batch_data_stats = message
197 .message
198 .batch_data_tokens
199 .as_ref()
200 .map(|s| (s.length, s.nonzeros));
201
202 let input = BlockProductionInput {
204 kind,
205 sender: message.message.header.sender,
206 l1_block_number: message.message.header.block_number,
207 l1_timestamp: message.message.header.timestamp,
208 request_id: message.message.header.request_id,
209 l1_base_fee: message.message.header.base_fee_l1,
210 l2_msg,
211 delayed_messages_read: message.delayed_messages_read,
212 batch_gas_cost: message.message.batch_gas_cost,
213 batch_data_stats,
214 };
215
216 let result = self
218 .block_producer
219 .produce_block(msg_idx, input)
220 .await
221 .map_err(|e| internal_error(e.to_string()))?;
222
223 Ok(RpcMessageResult {
224 block_hash: result.block_hash,
225 send_root: result.send_root,
226 })
227 }
228
229 async fn reorg(
230 &self,
231 msg_idx_of_first_msg_to_add: u64,
232 _new_messages: Vec<RpcMessageWithMetadataAndBlockInfo>,
233 _old_messages: Vec<RpcMessageWithMetadata>,
234 ) -> RpcResult<Vec<RpcMessageResult>> {
235 warn!(target: "nitroexecution", msg_idx_of_first_msg_to_add, "Reorg not yet implemented");
236 Err(internal_error("Reorg not yet implemented"))
237 }
238
239 async fn head_message_index(&self) -> RpcResult<u64> {
240 let best = self
241 .provider
242 .best_block_number()
243 .map_err(|e| internal_error(e.to_string()))?;
244
245 let msg_idx = self.block_number_to_message_index(best).unwrap_or(0);
246 debug!(target: "nitroexecution", best, msg_idx, "headMessageIndex");
247 Ok(msg_idx)
248 }
249
250 async fn result_at_message_index(&self, msg_idx: u64) -> RpcResult<RpcMessageResult> {
251 let block_num = self.message_index_to_block_number(msg_idx);
252
253 let header = self
254 .get_header(block_num)
255 .map_err(internal_error)?
256 .ok_or_else(|| internal_error(format!("Block {block_num} not found")))?;
257
258 let send_root = Self::send_root_from_header(header.header());
259
260 Ok(RpcMessageResult {
261 block_hash: header.hash(),
262 send_root,
263 })
264 }
265
266 fn set_finality_data(
267 &self,
268 safe: Option<RpcFinalityData>,
269 finalized: Option<RpcFinalityData>,
270 validated: Option<RpcFinalityData>,
271 ) -> RpcResult<()> {
272 debug!(target: "nitroexecution", ?safe, ?finalized, ?validated, "setFinalityData");
273 Ok(())
274 }
275
276 fn set_consensus_sync_data(&self, sync_data: RpcConsensusSyncData) -> RpcResult<()> {
277 let mut state = self.state.write();
278 state.synced = sync_data.synced;
279 state.max_message_count = sync_data.max_message_count;
280 debug!(target: "nitroexecution", synced = sync_data.synced, max = sync_data.max_message_count, "setConsensusSyncData");
281 Ok(())
282 }
283
284 fn mark_feed_start(&self, to: u64) -> RpcResult<()> {
285 debug!(target: "nitroexecution", to, "markFeedStart");
286 Ok(())
287 }
288
289 async fn trigger_maintenance(&self) -> RpcResult<()> {
290 Ok(())
291 }
292
293 async fn should_trigger_maintenance(&self) -> RpcResult<bool> {
294 Ok(false)
295 }
296
297 async fn maintenance_status(&self) -> RpcResult<RpcMaintenanceStatus> {
298 Ok(RpcMaintenanceStatus { is_running: false })
299 }
300
301 async fn arbos_version_for_message_index(&self, msg_idx: u64) -> RpcResult<u64> {
302 let block_num = self.message_index_to_block_number(msg_idx);
303
304 let header = self
305 .get_header(block_num)
306 .map_err(internal_error)?
307 .ok_or_else(|| internal_error(format!("Block {block_num} not found")))?;
308
309 let mix = header.header().mix_hash().unwrap_or_default();
310 let arbos_version = u64::from_be_bytes(mix.0[16..24].try_into().unwrap_or_default());
311
312 Ok(arbos_version)
313 }
314}