1use anyhow::Context;
7use async_trait::async_trait;
8use serde_json::json;
9use std::marker::PhantomData;
10use stellar_xdr::curr::{Limits, WriteXdr};
11use tracing::instrument;
12
13use crate::{
14 models::{
15 BlockType, ContractSpec, Network, StellarBlock, StellarContractSpec, StellarEvent,
16 StellarTransaction, StellarTransactionInfo,
17 },
18 services::{
19 blockchain::{
20 client::{BlockChainClient, BlockFilterFactory},
21 transports::StellarTransportClient,
22 BlockchainTransport,
23 },
24 filter::{
25 stellar_helpers::{
26 get_contract_code_ledger_key, get_contract_instance_ledger_key, get_contract_spec,
27 get_wasm_code_from_ledger_entry_data, get_wasm_hash_from_ledger_entry_data,
28 },
29 StellarBlockFilter,
30 },
31 },
32};
33
34use super::error::StellarClientError;
35
/// JSON-RPC method name sent when paging through transactions.
const RPC_METHOD_GET_TRANSACTIONS: &str = "getTransactions";
/// JSON-RPC method name sent when paging through contract events.
const RPC_METHOD_GET_EVENTS: &str = "getEvents";
/// JSON-RPC method name sent to read the most recent ledger.
const RPC_METHOD_GET_LATEST_LEDGER: &str = "getLatestLedger";
/// JSON-RPC method name sent when paging through ledgers.
const RPC_METHOD_GET_LEDGERS: &str = "getLedgers";
/// JSON-RPC method name sent to read individual ledger entries.
const RPC_METHOD_GET_LEDGER_ENTRIES: &str = "getLedgerEntries";

/// Lowercase fragments looked for in an RPC error message (after lowercasing it)
/// to classify a `-32600` error as an "outside retention window" failure.
const RETENTION_MESSAGES: [&str; 2] =
    ["must be within the ledger range", "must be between the oldest ledger"];
47
/// Client that issues Stellar RPC requests through a transport of type `T`.
///
/// Cloning the client clones the wrapped transport.
#[derive(Clone)]
pub struct StellarClient<T>
where
    T: Send + Sync + Clone,
{
    /// Transport used for every request made by this client.
    http_client: T,
}
56
57impl<T: Send + Sync + Clone> StellarClient<T> {
58 pub fn new_with_transport(http_client: T) -> Self {
60 Self { http_client }
61 }
62
63 fn check_and_handle_rpc_error(
81 &self,
82 response_body: &serde_json::Value,
83 start_sequence: u32,
84 target_sequence: u32,
85 method_name: &'static str,
86 ) -> Result<(), StellarClientError> {
87 if let Some(json_rpc_error) = response_body.get("error") {
88 let rpc_code = json_rpc_error
89 .get("code")
90 .and_then(|c| c.as_i64())
91 .unwrap_or(0);
92 let rpc_message = json_rpc_error
93 .get("message")
94 .and_then(|m| m.as_str())
95 .unwrap_or("Unknown RPC error")
96 .to_string();
97
98 if rpc_code == -32600
99 && RETENTION_MESSAGES
100 .iter()
101 .any(|msg| rpc_message.to_lowercase().contains(msg))
102 {
103 let ledger_info_str = format!(
104 "start_sequence: {}, target_sequence: {}",
105 start_sequence, target_sequence
106 );
107
108 return Err(StellarClientError::outside_retention_window(
109 rpc_code,
110 rpc_message,
111 ledger_info_str,
112 None,
113 None,
114 ));
115 } else {
116 let message = format!(
118 "Stellar RPC request failed for method '{}': {} (code {})",
119 method_name, rpc_message, rpc_code
120 );
121
122 return Err(StellarClientError::rpc_error(message, None, None));
123 }
124 }
125 Ok(())
126 }
127}
128
129impl StellarClient<StellarTransportClient> {
130 pub async fn new(network: &Network) -> Result<Self, anyhow::Error> {
138 let http_client = StellarTransportClient::new(network).await?;
139 Ok(Self::new_with_transport(http_client))
140 }
141}
142
143#[async_trait]
145pub trait StellarClientTrait {
146 async fn get_transactions(
155 &self,
156 start_sequence: u32,
157 end_sequence: Option<u32>,
158 ) -> Result<Vec<StellarTransaction>, anyhow::Error>;
159
160 async fn get_events(
169 &self,
170 start_sequence: u32,
171 end_sequence: Option<u32>,
172 ) -> Result<Vec<StellarEvent>, anyhow::Error>;
173}
174
175#[async_trait]
176impl<T: Send + Sync + Clone + BlockchainTransport> StellarClientTrait for StellarClient<T> {
177 #[instrument(skip(self), fields(start_sequence, end_sequence))]
183 async fn get_transactions(
184 &self,
185 start_sequence: u32,
186 end_sequence: Option<u32>,
187 ) -> Result<Vec<StellarTransaction>, anyhow::Error> {
188 if let Some(end_sequence) = end_sequence {
190 if start_sequence > end_sequence {
191 let message = format!(
192 "start_sequence {} cannot be greater than end_sequence {}",
193 start_sequence, end_sequence
194 );
195 let input_error = StellarClientError::invalid_input(message, None, None);
196 return Err(anyhow::anyhow!(input_error))
197 .context("Invalid input parameters for Stellar RPC");
198 }
199 }
200
201 const PAGE_LIMIT: u32 = 200;
203 let mut transactions = Vec::new();
204 let target_sequence = end_sequence.unwrap_or(start_sequence);
205 let mut cursor: Option<String> = None;
206 let mut current_iteration = 0;
207
208 while cursor.is_some() || current_iteration <= 0 {
209 let params = if current_iteration == 0 {
210 json!({
212 "startLedger": start_sequence,
213 "pagination": {
214 "limit": PAGE_LIMIT
215 }
216 })
217 } else {
218 json!({
220 "pagination": {
221 "cursor": cursor,
222 "limit": PAGE_LIMIT
223 }
224 })
225 };
226
227 let http_response = self
228 .http_client
229 .send_raw_request(RPC_METHOD_GET_TRANSACTIONS, Some(params))
230 .await;
231
232 match http_response {
233 Ok(response_body) => {
234 if let Err(rpc_error) = self.check_and_handle_rpc_error(
236 &response_body,
237 start_sequence,
238 target_sequence,
239 RPC_METHOD_GET_TRANSACTIONS,
240 ) {
241 return Err(anyhow::anyhow!(rpc_error).context(format!(
243 "Soroban RPC reported an error during {}",
244 RPC_METHOD_GET_TRANSACTIONS,
245 )));
246 }
247
248 let raw_transactions = response_body
250 .get("result")
251 .and_then(|r| r.get("transactions"))
252 .ok_or_else(|| {
253 let message = format!(
254 "Unexpected response structure for method '{}'",
255 RPC_METHOD_GET_TRANSACTIONS
256 );
257 StellarClientError::unexpected_response_structure(message, None, None)
258 })
259 .map_err(|client_parse_error| {
260 anyhow::anyhow!(client_parse_error)
261 .context("Failed to parse transaction response")
262 })?;
263
264 let ledger_transactions: Vec<StellarTransactionInfo> =
265 serde_json::from_value(raw_transactions.clone()).map_err(|e| {
266 let message = format!(
267 "Failed to parse transactions from response for method '{}': {}",
268 RPC_METHOD_GET_TRANSACTIONS, e
269 );
270 let sce_parse_error = StellarClientError::response_parse_error(
271 message,
272 Some(e.into()),
273 None,
274 );
275 anyhow::anyhow!(sce_parse_error)
276 .context("Failed to parse transaction response")
277 })?;
278
279 if ledger_transactions.is_empty() {
280 break;
281 }
282
283 for transaction in ledger_transactions {
284 if transaction.ledger > target_sequence {
285 return Ok(transactions);
286 }
287 transactions.push(StellarTransaction::from(transaction));
288 }
289
290 current_iteration += 1;
292 cursor = response_body["result"]["cursor"]
293 .as_str()
294 .map(|s| s.to_string());
295 if cursor.is_none() {
296 break;
297 }
298 }
299 Err(transport_err) => {
300 let ledger_info = format!(
302 "start_sequence: {}, end_sequence: {:?}",
303 start_sequence, end_sequence
304 );
305
306 return Err(anyhow::anyhow!(transport_err)).context(format!(
307 "Failed to {} from Stellar RPC for ledger: {}",
308 RPC_METHOD_GET_TRANSACTIONS, ledger_info
309 ));
310 }
311 }
312 }
313 Ok(transactions)
314 }
315
316 #[instrument(skip(self), fields(start_sequence, end_sequence))]
322 async fn get_events(
323 &self,
324 start_sequence: u32,
325 end_sequence: Option<u32>,
326 ) -> Result<Vec<StellarEvent>, anyhow::Error> {
327 if let Some(end_sequence) = end_sequence {
329 if start_sequence > end_sequence {
330 let message = format!(
331 "start_sequence {} cannot be greater than end_sequence {}",
332 start_sequence, end_sequence
333 );
334 let input_error = StellarClientError::invalid_input(message, None, None);
335 return Err(anyhow::anyhow!(input_error))
336 .context("Invalid input parameters for Stellar RPC");
337 }
338 }
339
340 const PAGE_LIMIT: u32 = 200;
342 let mut events = Vec::new();
343 let target_sequence = end_sequence.unwrap_or(start_sequence);
344 let mut cursor: Option<String> = None;
345 let mut current_iteration = 0;
346
347 while cursor.is_some() || current_iteration <= 0 {
348 let params = if current_iteration == 0 {
349 json!({
351 "startLedger": start_sequence,
352 "filters": [{
353 "type": "contract",
354 }],
355 "pagination": {
356 "limit": PAGE_LIMIT
357 }
358 })
359 } else {
360 json!({
362 "filters": [{
363 "type": "contract",
364 }],
365 "pagination": {
366 "cursor": cursor,
367 "limit": PAGE_LIMIT
368 }
369 })
370 };
371
372 let http_response = self
373 .http_client
374 .send_raw_request(RPC_METHOD_GET_EVENTS, Some(params))
375 .await;
376
377 match http_response {
378 Ok(response_body) => {
379 if let Err(rpc_error) = self.check_and_handle_rpc_error(
381 &response_body,
382 start_sequence,
383 target_sequence,
384 RPC_METHOD_GET_EVENTS,
385 ) {
386 return Err(anyhow::anyhow!(rpc_error).context(format!(
388 "Soroban RPC reported an error during {}",
389 RPC_METHOD_GET_EVENTS
390 )));
391 }
392
393 let raw_events = response_body
395 .get("result")
396 .and_then(|r| r.get("events"))
397 .ok_or_else(|| {
398 let message = format!(
399 "Unexpected response structure for method '{}'",
400 RPC_METHOD_GET_EVENTS
401 );
402 StellarClientError::unexpected_response_structure(message, None, None)
403 })
404 .map_err(|client_parse_error| {
405 anyhow::anyhow!(client_parse_error)
406 .context("Failed to parse event response")
407 })?;
408
409 let ledger_events: Vec<StellarEvent> =
410 serde_json::from_value(raw_events.clone()).map_err(|e| {
411 let message = format!(
412 "Failed to parse events from response for method '{}': {}",
413 RPC_METHOD_GET_EVENTS, e
414 );
415 let sce_parse_error = StellarClientError::response_parse_error(
416 message,
417 Some(e.into()),
418 None,
419 );
420 anyhow::anyhow!(sce_parse_error)
421 .context("Failed to parse event response")
422 })?;
423
424 for event in ledger_events {
425 if event.ledger > target_sequence {
426 return Ok(events);
427 }
428 events.push(event);
429 }
430
431 current_iteration += 1;
433 cursor = response_body["result"]["cursor"]
434 .as_str()
435 .map(|s| s.to_string());
436 if cursor.is_none() {
437 break;
438 }
439 }
440 Err(transport_err) => {
441 let ledger_info = format!(
443 "start_sequence: {}, end_sequence: {:?}",
444 start_sequence, end_sequence
445 );
446
447 return Err(anyhow::anyhow!(transport_err)).context(format!(
448 "Failed to {} from Stellar RPC for ledger: {}",
449 RPC_METHOD_GET_EVENTS, ledger_info,
450 ));
451 }
452 }
453 }
454 Ok(events)
455 }
456}
457
458impl<T: Send + Sync + Clone + BlockchainTransport> BlockFilterFactory<Self> for StellarClient<T> {
459 type Filter = StellarBlockFilter<Self>;
460
461 fn filter() -> Self::Filter {
462 StellarBlockFilter {
463 _client: PhantomData {},
464 }
465 }
466}
467
468#[async_trait]
469impl<T: Send + Sync + Clone + BlockchainTransport> BlockChainClient for StellarClient<T> {
470 #[instrument(skip(self))]
472 async fn get_latest_block_number(&self) -> Result<u64, anyhow::Error> {
473 let response = self
474 .http_client
475 .send_raw_request::<serde_json::Value>(RPC_METHOD_GET_LATEST_LEDGER, None)
476 .await
477 .with_context(|| "Failed to get latest ledger")?;
478
479 let sequence = response["result"]["sequence"]
480 .as_u64()
481 .ok_or_else(|| anyhow::anyhow!("Invalid sequence number"))?;
482
483 Ok(sequence)
484 }
485
486 #[instrument(skip(self), fields(start_block, end_block))]
494 async fn get_blocks(
495 &self,
496 start_block: u64,
497 end_block: Option<u64>,
498 ) -> Result<Vec<BlockType>, anyhow::Error> {
499 const PAGE_LIMIT: u32 = 200;
501
502 if let Some(end_block) = end_block {
504 if start_block > end_block {
505 let message = format!(
506 "start_block {} cannot be greater than end_block {}",
507 start_block, end_block
508 );
509 let input_error = StellarClientError::invalid_input(message, None, None);
510 return Err(anyhow::anyhow!(input_error))
511 .context("Invalid input parameters for Stellar RPC");
512 }
513 }
514
515 let mut blocks = Vec::new();
516 let target_block = end_block.unwrap_or(start_block);
517 let mut cursor: Option<String> = None;
518 let mut current_iteration = 0;
519
520 while cursor.is_some() || current_iteration <= 0 {
521 let params = if current_iteration == 0 {
522 json!({
524 "startLedger": start_block,
525 "pagination": {
526 "limit": if end_block.is_none() { 1 } else { PAGE_LIMIT }
527 }
528 })
529 } else {
530 json!({
532 "pagination": {
533 "cursor": cursor,
534 "limit": PAGE_LIMIT
535 }
536 })
537 };
538
539 let http_response = self
540 .http_client
541 .send_raw_request(RPC_METHOD_GET_LEDGERS, Some(params))
542 .await;
543
544 match http_response {
545 Ok(response_body) => {
546 if let Err(rpc_error) = self.check_and_handle_rpc_error(
552 &response_body,
553 start_block as u32,
554 target_block as u32,
555 RPC_METHOD_GET_LEDGERS,
556 ) {
557 return Err(anyhow::anyhow!(rpc_error).context(format!(
559 "Soroban RPC reported an error during {}",
560 RPC_METHOD_GET_LEDGERS,
561 )));
562 }
563
564 let raw_ledgers = response_body
566 .get("result")
567 .and_then(|r| r.get("ledgers"))
568 .ok_or_else(|| {
569 let message = format!(
570 "Unexpected response structure for method '{}'",
571 RPC_METHOD_GET_LEDGERS
572 );
573 let sce_parse_error = StellarClientError::unexpected_response_structure(
574 message, None, None,
575 );
576 anyhow::anyhow!(sce_parse_error)
577 .context("Failed to parse ledger response")
578 })?;
579
580 let ledgers: Vec<StellarBlock> = serde_json::from_value(raw_ledgers.clone())
581 .map_err(|e| {
582 let message = format!(
583 "Failed to parse ledgers from response for method '{}': {}",
584 RPC_METHOD_GET_LEDGERS, e
585 );
586 let sce_parse_error = StellarClientError::response_parse_error(
587 message,
588 Some(e.into()),
589 None,
590 );
591 anyhow::anyhow!(sce_parse_error)
592 .context("Failed to parse ledger response")
593 })?;
594
595 if ledgers.is_empty() {
596 break;
597 }
598
599 for ledger in ledgers {
600 if (ledger.sequence as u64) > target_block {
601 return Ok(blocks);
602 }
603 blocks.push(BlockType::Stellar(Box::new(ledger)));
604 }
605
606 current_iteration += 1;
608 cursor = response_body["result"]["cursor"]
609 .as_str()
610 .map(|s| s.to_string());
611
612 if cursor == Some(start_block.to_string()) {
614 break;
615 }
616
617 if cursor.is_none() {
618 break;
619 }
620 }
621 Err(transport_err) => {
622 let ledger_info =
624 format!("start_block: {}, end_block: {:?}", start_block, end_block);
625
626 return Err(anyhow::anyhow!(transport_err)).context(format!(
627 "Failed to {} from Stellar RPC for ledger: {}",
628 RPC_METHOD_GET_LEDGERS, ledger_info,
629 ));
630 }
631 }
632 }
633 Ok(blocks)
634 }
635
636 #[instrument(skip(self), fields(contract_id))]
644 async fn get_contract_spec(&self, contract_id: &str) -> Result<ContractSpec, anyhow::Error> {
645 let contract_instance_ledger_key = get_contract_instance_ledger_key(contract_id)
647 .map_err(|e| anyhow::anyhow!("Failed to get contract instance ledger key: {}", e))?;
648
649 let contract_instance_ledger_key_xdr = contract_instance_ledger_key
650 .to_xdr_base64(Limits::none())
651 .map_err(|e| {
652 anyhow::anyhow!(
653 "Failed to convert contract instance ledger key to XDR: {}",
654 e
655 )
656 })?;
657
658 let params = json!({
659 "keys": [contract_instance_ledger_key_xdr],
660 "xdrFormat": "base64"
661 });
662
663 let response = self
664 .http_client
665 .send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
666 .await
667 .with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
668
669 let contract_data_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
670 Some(xdr) => xdr,
671 None => {
672 return Err(anyhow::anyhow!("Failed to get contract data XDR"));
673 }
674 };
675
676 let wasm_hash = get_wasm_hash_from_ledger_entry_data(contract_data_xdr_base64)
677 .map_err(|e| anyhow::anyhow!("Failed to get wasm hash: {}", e))?;
678
679 let contract_code_ledger_key = get_contract_code_ledger_key(wasm_hash.as_str())
680 .map_err(|e| anyhow::anyhow!("Failed to get contract code ledger key: {}", e))?;
681
682 let contract_code_ledger_key_xdr = contract_code_ledger_key
683 .to_xdr_base64(Limits::none())
684 .map_err(|e| {
685 anyhow::anyhow!("Failed to convert contract code ledger key to XDR: {}", e)
686 })?;
687
688 let params = json!({
689 "keys": [contract_code_ledger_key_xdr],
690 "xdrFormat": "base64"
691 });
692
693 let response = self
694 .http_client
695 .send_raw_request(RPC_METHOD_GET_LEDGER_ENTRIES, Some(params))
696 .await
697 .with_context(|| format!("Failed to get contract wasm code for {}", contract_id))?;
698
699 let contract_code_xdr_base64 = match response["result"]["entries"][0]["xdr"].as_str() {
700 Some(xdr) => xdr,
701 None => {
702 return Err(anyhow::anyhow!("Failed to get contract code XDR"));
703 }
704 };
705
706 let wasm_code = get_wasm_code_from_ledger_entry_data(contract_code_xdr_base64)
707 .map_err(|e| anyhow::anyhow!("Failed to get wasm code: {}", e))?;
708
709 let contract_spec = get_contract_spec(wasm_code.as_str())
710 .map_err(|e| anyhow::anyhow!("Failed to get contract spec: {}", e))?;
711
712 Ok(ContractSpec::Stellar(StellarContractSpec::from(
713 contract_spec,
714 )))
715 }
716}