openzeppelin_monitor/services/blockchain/clients/stellar/
client.rs

1//! Stellar blockchain client implementation.
2//!
3//! This module provides functionality to interact with the Stellar blockchain,
4//! supporting operations like block retrieval, transaction lookup, and event filtering.
5
6use anyhow::Context;
7use async_trait::async_trait;
8use serde_json::json;
9use std::marker::PhantomData;
10use stellar_xdr::curr::{Limits, WriteXdr};
11use tracing::instrument;
12
13use crate::{
14	models::{
15		BlockType, ContractSpec, Network, StellarBlock, StellarContractSpec, StellarEvent,
16		StellarTransaction, StellarTransactionInfo,
17	},
18	services::{
19		blockchain::{
20			client::{BlockChainClient, BlockFilterFactory},
21			transports::StellarTransportClient,
22			BlockchainTransport,
23		},
24		filter::{
25			stellar_helpers::{
26				get_contract_code_ledger_key, get_contract_instance_ledger_key, get_contract_spec,
27				get_wasm_code_from_ledger_entry_data, get_wasm_hash_from_ledger_entry_data,
28			},
29			StellarBlockFilter,
30		},
31	},
32};
33
34use super::error::StellarClientError;
35
36/// Stellar RPC method constants
37const RPC_METHOD_GET_TRANSACTIONS: &str = "getTransactions";
38const RPC_METHOD_GET_EVENTS: &str = "getEvents";
39const RPC_METHOD_GET_LATEST_LEDGER: &str = "getLatestLedger";
40const RPC_METHOD_GET_LEDGERS: &str = "getLedgers";
41const RPC_METHOD_GET_LEDGER_ENTRIES: &str = "getLedgerEntries";
42
43const RETENTION_MESSAGES: [&str; 2] = [
44	"must be within the ledger range",
45	"must be between the oldest ledger",
46];
47
48/// Client implementation for the Stellar blockchain
49///
50/// Provides high-level access to Stellar blockchain data and operations through HTTP transport.
51#[derive(Clone)]
52pub struct StellarClient<T: Send + Sync + Clone> {
53	/// The underlying Stellar transport client for RPC communication
54	http_client: T,
55}
56
57impl<T: Send + Sync + Clone> StellarClient<T> {
58	/// Creates a new Stellar client instance with a specific transport client
59	pub fn new_with_transport(http_client: T) -> Self {
60		Self { http_client }
61	}
62
63	/// Checks a JSON-RPC response for error information and converts it into a `StellarClientError` if present.
64	///
65	/// This function inspects the given JSON response body for an "error" field.
66	/// If a known "outside of retention window" condition is detected (by code/message),
67	/// it returns a specific `StellarClientError::outside_retention_window`.
68	/// Otherwise, it returns a generic `StellarClientError::rpc_error` for any other error found.
69	/// If no error is present, it returns `Ok(())`.
70	///
71	/// # Arguments
72	/// * `response_body` - A reference to the JSON response body to inspect.
73	/// * `start_sequence` - The starting ledger sequence number relevant to the request.
74	/// * `target_sequence` - The target ledger sequence number relevant to the request.
75	/// * `method_name` - The name of the RPC method that was called (for error reporting).
76	///
77	/// # Returns
78	/// * `Ok(())` if no error is present in the response.
79	/// * `Err(StellarClientError)` if an error is detected.
80	fn check_and_handle_rpc_error(
81		&self,
82		response_body: &serde_json::Value,
83		start_sequence: u32,
84		target_sequence: u32,
85		method_name: &'static str,
86	) -> Result<(), StellarClientError> {
87		if let Some(json_rpc_error) = response_body.get("error") {
88			let rpc_code = json_rpc_error
89				.get("code")
90				.and_then(|c| c.as_i64())
91				.unwrap_or(0);
92			let rpc_message = json_rpc_error
93				.get("message")
94				.and_then(|m| m.as_str())
95				.unwrap_or("Unknown RPC error")
96				.to_string();
97
98			if rpc_code == -32600
99				&& RETENTION_MESSAGES
100					.iter()
101					.any(|msg| rpc_message.to_lowercase().contains(msg))
102			{
103				let ledger_info_str = format!(
104					"start_sequence: {}, target_sequence: {}",
105					start_sequence, target_sequence
106				);
107
108				return Err(StellarClientError::outside_retention_window(
109					rpc_code,
110					rpc_message,
111					ledger_info_str,
112					None,
113					None,
114				));
115			} else {
116				// Other JSON-RPC error reported by the server
117				let message = format!(
118					"Stellar RPC request failed for method '{}': {} (code {})",
119					method_name, rpc_message, rpc_code
120				);
121
122				return Err(StellarClientError::rpc_error(message, None, None));
123			}
124		}
125		Ok(())
126	}
127}
128
129impl StellarClient<StellarTransportClient> {
130	/// Creates a new Stellar client instance
131	///
132	/// # Arguments
133	/// * `network` - Network configuration containing RPC endpoints and chain details
134	///
135	/// # Returns
136	/// * `Result<Self, anyhow::Error>` - New client instance or connection error
137	pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
138		let http_client = StellarTransportClient::new(network).await?;
139		Ok(Self::new_with_transport(http_client))
140	}
141}
142
143/// Extended functionality specific to the Stellar blockchain
144#[async_trait]
145pub trait StellarClientTrait {
146	/// Retrieves transactions within a sequence range
147	///
148	/// # Arguments
149	/// * `start_sequence` - Starting sequence number
150	/// * `end_sequence` - Optional ending sequence number. If None, only fetches start_sequence
151	///
152	/// # Returns
153	/// * `Result<Vec<StellarTransaction>, anyhow::Error>` - Collection of transactions or error
154	async fn get_transactions(
155		&self,
156		start_sequence: u32,
157		end_sequence: Option<u32>,
158	) -> Result<Vec<StellarTransaction>, anyhow::Error>;
159
160	/// Retrieves events within a sequence range
161	///
162	/// # Arguments
163	/// * `start_sequence` - Starting sequence number
164	/// * `end_sequence` - Optional ending sequence number. If None, only fetches start_sequence
165	///
166	/// # Returns
167	/// * `Result<Vec<StellarEvent>, anyhow::Error>` - Collection of events or error
168	async fn get_events(
169		&self,
170		start_sequence: u32,
171		end_sequence: Option<u32>,
172	) -> Result<Vec<StellarEvent>, anyhow::Error>;
173}
174
175#[async_trait]
176impl<T: Send + Sync + Clone + BlockchainTransport> StellarClientTrait for StellarClient<T> {
177	/// Retrieves transactions within a sequence range with pagination
178	///
179	/// # Errors
180	/// - Returns `anyhow::Error` if start_sequence > end_sequence
181	/// - Returns `anyhow::Error` if transaction parsing fails
182	#[instrument(skip(self), fields(start_sequence, end_sequence))]
183	async fn get_transactions(
184		&self,
185		start_sequence: u32,
186		end_sequence: Option<u32>,
187	) -> Result<Vec<StellarTransaction>, anyhow::Error> {
188		// Validate input parameters
189		if let Some(end_sequence) = end_sequence {
190			if start_sequence > end_sequence {
191				let message = format!(
192					"start_sequence {} cannot be greater than end_sequence {}",
193					start_sequence, end_sequence
194				);
195				let input_error = StellarClientError::invalid_input(message, None, None);
196				return Err(anyhow::anyhow!(input_error))
197					.context("Invalid input parameters for Stellar RPC");
198			}
199		}
200
201		// max limit for the RPC endpoint is 200
202		const PAGE_LIMIT: u32 = 200;
203		let mut transactions = Vec::new();
204		let target_sequence = end_sequence.unwrap_or(start_sequence);
205		let mut cursor: Option<String> = None;
206		let mut current_iteration = 0;
207
208		while cursor.is_some() || current_iteration <= 0 {
209			let params = if current_iteration == 0 {
210				// First iteration, we need to fetch the transactions from the start sequence without a cursor
211				json!({
212					"startLedger": start_sequence,
213					"pagination": {
214						"limit": PAGE_LIMIT
215					}
216				})
217			} else {
218				// Subsequent iterations, we need to fetch the transactions from the cursor
219				json!({
220					"pagination": {
221						"cursor": cursor,
222						"limit": PAGE_LIMIT
223					}
224				})
225			};
226
227			let http_response = self
228				.http_client
229				.send_raw_request(RPC_METHOD_GET_TRANSACTIONS, Some(params))
230				.await;
231
232			match http_response {
233				Ok(response_body) => {
234					// Check for RPC errors in the response
235					if let Err(rpc_error) = self.check_and_handle_rpc_error(
236						&response_body,
237						start_sequence,
238						target_sequence,
239						RPC_METHOD_GET_TRANSACTIONS,
240					) {
241						// A terminal JSON-RPC error was found, convert and return
242						return Err(anyhow::anyhow!(rpc_error).context(format!(
243							"Soroban RPC reported an error during {}",
244							RPC_METHOD_GET_TRANSACTIONS,
245						)));
246					}
247
248					// Extract the transactions from the response
249					let raw_transactions = response_body
250						.get("result")
251						.and_then(|r| r.get("transactions"))
252						.ok_or_else(|| {
253							let message = format!(
254								"Unexpected response structure for method '{}'",
255								RPC_METHOD_GET_TRANSACTIONS
256							);
257							StellarClientError::unexpected_response_structure(message, None, None)
258						})
259						.map_err(|client_parse_error| {
260							anyhow::anyhow!(client_parse_error)
261								.context("Failed to parse transaction response")
262						})?;
263
264					let ledger_transactions: Vec<StellarTransactionInfo> =
265						serde_json::from_value(raw_transactions.clone()).map_err(|e| {
266							let message = format!(
267								"Failed to parse transactions from response for method '{}': {}",
268								RPC_METHOD_GET_TRANSACTIONS, e
269							);
270							let sce_parse_error = StellarClientError::response_parse_error(
271								message,
272								Some(e.into()),
273								None,
274							);
275							anyhow::anyhow!(sce_parse_error)
276								.context("Failed to parse transaction response")
277						})?;
278
279					if ledger_transactions.is_empty() {
280						break;
281					}
282
283					for transaction in ledger_transactions {
284						if transaction.ledger > target_sequence {
285							return Ok(transactions);
286						}
287						transactions.push(StellarTransaction::from(transaction));
288					}
289
290					// Increment the number of iterations to ensure we break the loop in case there is no cursor
291					current_iteration += 1;
292					cursor = response_body["result"]["cursor"]
293						.as_str()
294						.map(|s| s.to_string());
295					if cursor.is_none() {
296						break;
297					}
298				}
299				Err(transport_err) => {
300					// Ledger info for logging
301					let ledger_info = format!(
302						"start_sequence: {}, end_sequence: {:?}",
303						start_sequence, end_sequence
304					);
305
306					return Err(anyhow::anyhow!(transport_err)).context(format!(
307						"Failed to {} from Stellar RPC for ledger: {}",
308						RPC_METHOD_GET_TRANSACTIONS, ledger_info
309					));
310				}
311			}
312		}
313		Ok(transactions)
314	}
315
316	/// Retrieves events within a sequence range with pagination
317	///
318	/// # Errors
319	/// - Returns `anyhow::Error` if start_sequence > end_sequence
320	/// - Returns `anyhow::Error` if event parsing fails
321	#[instrument(skip(self), fields(start_sequence, end_sequence))]
322	async fn get_events(
323		&self,
324		start_sequence: u32,
325		end_sequence: Option<u32>,
326	) -> Result<Vec<StellarEvent>, anyhow::Error> {
327		// Validate input parameters
328		if let Some(end_sequence) = end_sequence {
329			if start_sequence > end_sequence {
330				let message = format!(
331					"start_sequence {} cannot be greater than end_sequence {}",
332					start_sequence, end_sequence
333				);
334				let input_error = StellarClientError::invalid_input(message, None, None);
335				return Err(anyhow::anyhow!(input_error))
336					.context("Invalid input parameters for Stellar RPC");
337			}
338		}
339
340		// max limit for the RPC endpoint is 200
341		const PAGE_LIMIT: u32 = 200;
342		let mut events = Vec::new();
343		let target_sequence = end_sequence.unwrap_or(start_sequence);
344		let mut cursor: Option<String> = None;
345		let mut current_iteration = 0;
346
347		while cursor.is_some() || current_iteration <= 0 {
348			let params = if current_iteration == 0 {
349				// First iteration, we need to fetch the events from the start sequence without a cursor
350				json!({
351					"startLedger": start_sequence,
352					"filters": [{
353						"type": "contract",
354					}],
355					"pagination": {
356						"limit": PAGE_LIMIT
357					}
358				})
359			} else {
360				// Subsequent iterations, we need to fetch the events from the cursor
361				json!({
362					"filters": [{
363						"type": "contract",
364					}],
365					"pagination": {
366						"cursor": cursor,
367						"limit": PAGE_LIMIT
368					}
369				})
370			};
371
372			let http_response = self
373				.http_client
374				.send_raw_request(RPC_METHOD_GET_EVENTS, Some(params))
375				.await;
376
377			match http_response {
378				Ok(response_body) => {
379					// Check for RPC errors in the response
380					if let Err(rpc_error) = self.check_and_handle_rpc_error(
381						&response_body,
382						start_sequence,
383						target_sequence,
384						RPC_METHOD_GET_EVENTS,
385					) {
386						// A terminal JSON-RPC error was found, convert and return
387						return Err(anyhow::anyhow!(rpc_error).context(format!(
388							"Soroban RPC reported an error during {}",
389							RPC_METHOD_GET_EVENTS
390						)));
391					}
392
393					// Extract the events from the response
394					let raw_events = response_body
395						.get("result")
396						.and_then(|r| r.get("events"))
397						.ok_or_else(|| {
398							let message = format!(
399								"Unexpected response structure for method '{}'",
400								RPC_METHOD_GET_EVENTS
401							);
402							StellarClientError::unexpected_response_structure(message, None, None)
403						})
404						.map_err(|client_parse_error| {
405							anyhow::anyhow!(client_parse_error)
406								.context("Failed to parse event response")
407						})?;
408
409					let ledger_events: Vec<StellarEvent> =
410						serde_json::from_value(raw_events.clone()).map_err(|e| {
411							let message = format!(
412								"Failed to parse events from response for method '{}': {}",
413								RPC_METHOD_GET_EVENTS, e
414							);
415							let sce_parse_error = StellarClientError::response_parse_error(
416								message,
417								Some(e.into()),
418								None,
419							);
420							anyhow::anyhow!(sce_parse_error)
421								.context("Failed to parse event response")
422						})?;
423
424					for event in ledger_events {
425						if event.ledger > target_sequence {
426							return Ok(events);
427						}
428						events.push(event);
429					}
430
431					// Increment the number of iterations to ensure we break the loop in case there is no cursor
432					current_iteration += 1;
433					cursor = response_body["result"]["cursor"]
434						.as_str()
435						.map(|s| s.to_string());
436					if cursor.is_none() {
437						break;
438					}
439				}
440				Err(transport_err) => {
441					// Ledger info for logging
442					let ledger_info = format!(
443						"start_sequence: {}, end_sequence: {:?}",
444						start_sequence, end_sequence
445					);
446
447					return Err(anyhow::anyhow!(transport_err)).context(format!(
448						"Failed to {} from Stellar RPC for ledger: {}",
449						RPC_METHOD_GET_EVENTS, ledger_info,
450					));
451				}
452			}
453		}
454		Ok(events)
455	}
456}
457
458impl<T: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for StellarClient<T> {
459	type Filter = StellarBlockFilter<Self>;
460
461	fn filter() -> Self::Filter {
462		StellarBlockFilter {
463			_client: PhantomData {},
464		}
465	}
466}
467
468#[async_trait]
469impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for StellarClient<T> {
470	/// Retrieves the latest block number with retry functionality
471	#[instrument(skip(self))]
472	async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
473		let response = self
474			.http_client
475			.send_raw_request::<serde_json::Value>(RPC_METHOD_GET_LATEST_LEDGER, None)
476			.await
477			.with_context(|| "Failed to get latest ledger")?;
478
479		let sequence = response["result"]["sequence"]
480			.as_u64()
481			.ok_or_else(|| anyhow::anyhow!("Invalid sequence number"))?;
482
483		Ok(sequence)
484	}
485
486	/// Retrieves blocks within the specified range with retry functionality
487	///
488	/// # Note
489	/// If end_block is None, only the start_block will be retrieved
490	///
491	/// # Errors
492	/// - Returns `anyhow::Error`
493	#[instrument(skip(self), fields(start_block, end_block))]
494	async fn get_blocks(
495		&self,
496		start_block: u64,
497		end_block: Option<u64>,
498	) -> Result<Vec<BlockType>, anyhow::Error> {
499		// max limit for the RPC endpoint is 200
500		const PAGE_LIMIT: u32 = 200;
501
502		// Validate input parameters
503		if let Some(end_block) = end_block {
504			if start_block > end_block {
505				let message = format!(
506					"start_block {} cannot be greater than end_block {}",
507					start_block, end_block
508				);
509				let input_error = StellarClientError::invalid_input(message, None, None);
510				return Err(anyhow::anyhow!(input_error))
511					.context("Invalid input parameters for Stellar RPC");
512			}
513		}
514
515		let mut blocks = Vec::new();
516		let target_block = end_block.unwrap_or(start_block);
517		let mut cursor: Option<String> = None;
518		let mut current_iteration = 0;
519
520		while cursor.is_some() || current_iteration <= 0 {
521			let params = if current_iteration == 0 {
522				// First iteration, we need to fetch the ledgers from the start block without a cursor
523				json!({
524					"startLedger": start_block,
525					"pagination": {
526						"limit": if end_block.is_none() { 1 } else { PAGE_LIMIT }
527					}
528				})
529			} else {
530				// Subsequent iterations, we need to fetch the ledgers from the cursor
531				json!({
532					"pagination": {
533						"cursor": cursor,
534						"limit": PAGE_LIMIT
535					}
536				})
537			};
538
539			let http_response = self
540				.http_client
541				.send_raw_request(RPC_METHOD_GET_LEDGERS, Some(params))
542				.await;
543
544			match http_response {
545				Ok(response_body) => {
546					// Check for RPC errors in the response
547					// Currently this won't catch OutsideRetentionWindow errors because RPC returns generic error message:
548					// "[-32003] request failed to process due to internal issue"
549					// but we can still handle it as generic RPC error
550					// TODO: revisit after this issue is resolved: https://github.com/stellar/stellar-rpc/issues/454
551					if let Err(rpc_error) = self.check_and_handle_rpc_error(
552						&response_body,
553						start_block as u32,
554						target_block as u32,
555						RPC_METHOD_GET_LEDGERS,
556					) {
557						// A terminal JSON-RPC error was found, convert and return
558						return Err(anyhow::anyhow!(rpc_error).context(format!(
559							"Soroban RPC reported an error during {}",
560							RPC_METHOD_GET_LEDGERS,
561						)));
562					}
563
564					// Extract the ledgers from the response
565					let raw_ledgers = response_body
566						.get("result")
567						.and_then(|r| r.get("ledgers"))
568						.ok_or_else(|| {
569							let message = format!(
570								"Unexpected response structure for method '{}'",
571								RPC_METHOD_GET_LEDGERS
572							);
573							let sce_parse_error = StellarClientError::unexpected_response_structure(
574								message, None, None,
575							);
576							anyhow::anyhow!(sce_parse_error)
577								.context("Failed to parse ledger response")
578						})?;
579
580					let ledgers: Vec<StellarBlock> = serde_json::from_value(raw_ledgers.clone())
581						.map_err(|e| {
582							let message = format!(
583								"Failed to parse ledgers from response for method '{}': {}",
584								RPC_METHOD_GET_LEDGERS, e
585							);
586							let sce_parse_error = StellarClientError::response_parse_error(
587								message,
588								Some(e.into()),
589								None,
590							);
591							anyhow::anyhow!(sce_parse_error)
592								.context("Failed to parse ledger response")
593						})?;
594
595					if ledgers.is_empty() {
596						break;
597					}
598
599					for ledger in ledgers {
600						if (ledger.sequence as u64) > target_block {
601							return Ok(blocks);
602						}
603						blocks.push(BlockType::Stellar(Box::new(ledger)));
604					}
605
606					// Increment the number of iterations to ensure we break the loop in case there is no cursor
607					current_iteration += 1;
608					cursor = response_body["result"]["cursor"]
609						.as_str()
610						.map(|s| s.to_string());
611
612					// If the cursor is the same as the start block, we have reached the end of the range
613					if cursor == Some(start_block.to_string()) {
614						break;
615					}
616
617					if cursor.is_none() {
618						break;
619					}
620				}
621				Err(transport_err) => {
622					// Ledger info for logging
623					let ledger_info =
624						format!("start_block: {}, end_block: {:?}", start_block, end_block);
625
626					return Err(anyhow::anyhow!(transport_err)).context(format!(
627						"Failed to {} from Stellar RPC for ledger: {}",
628						RPC_METHOD_GET_LEDGERS, ledger_info,
629					));
630				}
631			}
632		}
633		Ok(blocks)
634	}
635
636	/// Retrieves the contract spec for a given contract ID
637	///
638	/// # Arguments
639	/// * `contract_id` - The ID of the contract to retrieve the spec for
640	///
641	/// # Returns
642	/// * `Result<ContractSpec, anyhow::Error>` - The contract spec or error
643	#[instrument(skip(self), fields(contract_id))]
644	async fn get_contract_spec(&self, contract_id: &str) -> Result<ContractSpec, anyhow::Error> {
645		// Get contract wasm code from contract ID
646		let contract_instance_ledger_key = get_contract_instance_ledger_key(contract_id)
647			.map_err(|e| anyhow::anyhow!("Failed to get contract instance ledger key: {}", e))?;
648
649		let contract_instance_ledger_key_xdr = contract_instance_ledger_key
650			.to_xdr_base64(Limits::none())
651			.map_err(|e| {
652				anyhow::anyhow!(
653					"Failed to convert contract instance ledger key to XDR: {}",
654					e
655				)
656			})?;
657
658		let params = json!({
659			"keys": [contract_instance_ledger_key_xdr],
660			"xdrFormat": "base64"
661		});
662
663		let response = self
664			.http_client
665			.send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
666			.await
667			.with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
668
669		let contract_data_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
670			Some(xdr) => xdr,
671			None => {
672				return Err(anyhow::anyhow!("Failed to get contract data XDR"));
673			}
674		};
675
676		let wasm_hash = get_wasm_hash_from_ledger_entry_data(contract_data_xdr_base64)
677			.map_err(|e| anyhow::anyhow!("Failed to get wasm hash: {}", e))?;
678
679		let contract_code_ledger_key = get_contract_code_ledger_key(wasm_hash.as_str())
680			.map_err(|e| anyhow::anyhow!("Failed to get contract code ledger key: {}", e))?;
681
682		let contract_code_ledger_key_xdr = contract_code_ledger_key
683			.to_xdr_base64(Limits::none())
684			.map_err(|e| {
685			anyhow::anyhow!("Failed to convert contract code ledger key to XDR: {}", e)
686		})?;
687
688		let params = json!({
689			"keys": [contract_code_ledger_key_xdr],
690			"xdrFormat": "base64"
691		});
692
693		let response = self
694			.http_client
695			.send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
696			.await
697			.with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
698
699		let contract_code_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
700			Some(xdr) => xdr,
701			None => {
702				return Err(anyhow::anyhow!("Failed to get contract code XDR"));
703			}
704		};
705
706		let wasm_code = get_wasm_code_from_ledger_entry_data(contract_code_xdr_base64)
707			.map_err(|e| anyhow::anyhow!("Failed to get wasm code: {}", e))?;
708
709		let contract_spec = get_contract_spec(wasm_code.as_str())
710			.map_err(|e| anyhow::anyhow!("Failed to get contract spec: {}", e))?;
711
712		Ok(ContractSpec::Stellar(StellarContractSpec::from(
713			contract_spec,
714		)))
715	}
716}