openzeppelin_monitor/services/blockchain/transports/
http.rs1use anyhow::Context;
12use async_trait::async_trait;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15use serde_json::{json, Value};
16use std::{sync::Arc, time::Duration};
17use url::Url;
18
19use crate::{
20 models::Network,
21 services::blockchain::transports::{
22 BlockchainTransport, EndpointManager, RotatingTransport, TransientErrorRetryStrategy,
23 TransportError,
24 },
25 utils::http::{create_retryable_http_client, HttpRetryConfig},
26};
27
28#[derive(Clone, Debug)]
38pub struct HttpTransportClient {
39 pub client: ClientWithMiddleware,
41 endpoint_manager: EndpointManager,
43 test_connection_payload: Option<String>,
45}
46
47impl HttpTransportClient {
48 pub async fn new(
61 network: &Network,
62 test_connection_payload: Option<String>,
63 ) -> Result<Self, anyhow::Error> {
64 let mut rpc_urls: Vec<_> = network
65 .rpc_urls
66 .iter()
67 .filter(|rpc_url| rpc_url.type_ == "rpc" && rpc_url.weight > 0)
68 .collect();
69
70 rpc_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
71
72 let http_retry_config = HttpRetryConfig::default();
75
76 let base_http_client = Arc::new(
78 reqwest::ClientBuilder::new()
79 .pool_idle_timeout(Duration::from_secs(90))
80 .pool_max_idle_per_host(32)
81 .timeout(Duration::from_secs(30))
82 .connect_timeout(Duration::from_secs(20))
83 .build()
84 .context("Failed to create base HTTP client")?,
85 );
86
87 let retryable_client = create_retryable_http_client(
92 &http_retry_config,
93 (*base_http_client).clone(),
94 Some(TransientErrorRetryStrategy),
95 );
96
97 for rpc_url in rpc_urls.iter() {
98 let url = match Url::parse(rpc_url.url.as_ref()) {
99 Ok(url) => url,
100 Err(_) => continue,
101 };
102
103 let test_request = if let Some(test_payload) = &test_connection_payload {
104 serde_json::from_str(test_payload)
105 .context("Failed to parse test payload as JSON")?
106 } else {
107 json!({
108 "jsonrpc": "2.0",
109 "id": 1,
110 "method": "net_version",
111 "params": []
112 })
113 };
114
115 let request_result = retryable_client
117 .post(url.clone())
118 .json(&test_request)
119 .send()
120 .await;
121
122 match request_result {
123 Ok(response) => {
124 if !response.status().is_success() {
126 continue;
128 }
129
130 let fallback_urls: Vec<String> = rpc_urls
132 .iter()
133 .filter(|url| url.url != rpc_url.url)
134 .map(|url| url.url.as_ref().to_string())
135 .collect();
136
137 return Ok(Self {
139 client: retryable_client.clone(),
140 endpoint_manager: EndpointManager::new(
141 retryable_client,
142 rpc_url.url.as_ref(),
143 fallback_urls,
144 ),
145 test_connection_payload,
146 });
147 }
148 Err(_) => {
149 continue;
151 }
152 }
153 }
154
155 Err(anyhow::anyhow!("All RPC URLs failed to connect"))
156 }
157}
158
159#[async_trait]
160impl BlockchainTransport for HttpTransportClient {
161 async fn get_current_url(&self) -> String {
169 self.endpoint_manager.active_url.read().await.clone()
170 }
171
172 async fn send_raw_request<P>(
190 &self,
191 method: &str,
192 params: Option<P>,
193 ) -> Result<Value, TransportError>
194 where
195 P: Into<Value> + Send + Clone + Serialize,
196 {
197 let response = self
198 .endpoint_manager
199 .send_raw_request(self, method, params)
200 .await?;
201
202 Ok(response)
203 }
204
205 fn update_endpoint_manager_client(
210 &mut self,
211 client: ClientWithMiddleware,
212 ) -> Result<(), anyhow::Error> {
213 self.endpoint_manager.update_client(client);
214 Ok(())
215 }
216}
217
218#[async_trait]
219impl RotatingTransport for HttpTransportClient {
220 async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
231 let url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
232
233 let test_request = if let Some(test_payload) = &self.test_connection_payload {
234 serde_json::from_str(test_payload).context("Failed to parse test payload as JSON")?
235 } else {
236 json!({
237 "jsonrpc": "2.0",
238 "id": 1,
239 "method": "net_version",
240 "params": []
241 })
242 };
243
244 let request = self.client.post(url.clone()).json(&test_request);
245
246 match request.send().await {
247 Ok(response) => {
248 let status = response.status();
249 if !status.is_success() {
250 Err(anyhow::anyhow!(
251 "Failed to connect to {}: {}",
252 url,
253 status.as_u16()
254 ))
255 } else {
256 Ok(())
257 }
258 }
259 Err(e) => Err(anyhow::anyhow!("Failed to connect to {}: {}", url, e)),
260 }
261 }
262
263 async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
274 let parsed_url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
275 let normalized_url = parsed_url.as_str().trim_end_matches('/');
277
278 let mut active_url = self.endpoint_manager.active_url.write().await;
281 *active_url = normalized_url.to_string();
282 Ok(())
283 }
284}