openzeppelin_monitor/services/blockchain/transports/
http.rs

1//! HTTP transport implementation for blockchain interactions.
2//!
3//! This module provides a generic HTTP client implementation for interacting with blockchain nodes
4//! via JSON-RPC, supporting:
5//! - Multiple RPC endpoints with automatic failover
6//! - Configurable retry policies
7//! - Authentication via bearer tokens
8//! - Connection health checks
9//! - Endpoint rotation for high availability
10
11use anyhow::Context;
12use async_trait::async_trait;
13use reqwest_middleware::ClientWithMiddleware;
14use serde::Serialize;
15use serde_json::{json, Value};
16use std::{sync::Arc, time::Duration};
17use url::Url;
18
19use crate::{
20	models::Network,
21	services::blockchain::transports::{
22		BlockchainTransport, EndpointManager, RotatingTransport, TransientErrorRetryStrategy,
23		TransportError,
24	},
25	utils::http::{create_retryable_http_client, HttpRetryConfig},
26};
27
28/// Basic HTTP transport client for blockchain interactions
29///
30/// This client provides a foundation for making JSON-RPC requests to blockchain nodes
31/// with built-in support for:
32/// - Connection pooling and reuse
33/// - Automatic endpoint rotation on failure
34/// - Configurable retry policies
35///
36/// The client is thread-safe and can be shared across multiple tasks.
37#[derive(Clone, Debug)]
38pub struct HttpTransportClient {
39	/// Retryable HTTP client for making requests
40	pub client: ClientWithMiddleware,
41	/// Manages RPC endpoint rotation and request handling for high availability
42	endpoint_manager: EndpointManager,
43	/// The stringified JSON RPC payload to use for testing the connection
44	test_connection_payload: Option<String>,
45}
46
47impl HttpTransportClient {
48	/// Creates a new HTTP transport client with automatic endpoint management
49	///
50	/// This constructor attempts to connect to available endpoints in order of their
51	/// weight until a successful connection is established. It configures default
52	/// timeout and retry policies suitable for blockchain interactions.
53	///
54	/// # Arguments
55	/// * `network` - Network configuration containing RPC URLs, weights, and other details
56	/// * `test_connection_payload` - Optional JSON RPC payload to test the connection (default is net_version)
57	///
58	/// # Returns
59	/// * `Result<Self, anyhow::Error>` - New client instance or connection error
60	pub async fn new(
61		network: &Network,
62		test_connection_payload: Option<String>,
63	) -> Result<Self, anyhow::Error> {
64		let mut rpc_urls: Vec<_> = network
65			.rpc_urls
66			.iter()
67			.filter(|rpc_url| rpc_url.type_ == "rpc" && rpc_url.weight > 0)
68			.collect();
69
70		rpc_urls.sort_by(|a, b| b.weight.cmp(&a.weight));
71
72		// Create a retry policy with default settings
73		// Shared config for endpoint manager and test connection
74		let http_retry_config = HttpRetryConfig::default();
75
76		// Create the base HTTP client
77		let base_http_client = Arc::new(
78			reqwest::ClientBuilder::new()
79				.pool_idle_timeout(Duration::from_secs(90))
80				.pool_max_idle_per_host(32)
81				.timeout(Duration::from_secs(30))
82				.connect_timeout(Duration::from_secs(20))
83				.build()
84				.context("Failed to create base HTTP client")?,
85		);
86
87		// Create a retryable HTTP client with the base client and retry policy
88		// Shared across:
89		// - EndpointManager for handling endpoint rotation
90		// - Connection testing for verifying endpoint availability
91		let retryable_client = create_retryable_http_client(
92			&http_retry_config,
93			(*base_http_client).clone(),
94			Some(TransientErrorRetryStrategy),
95		);
96
97		for rpc_url in rpc_urls.iter() {
98			let url = match Url::parse(rpc_url.url.as_ref()) {
99				Ok(url) => url,
100				Err(_) => continue,
101			};
102
103			let test_request = if let Some(test_payload) = &test_connection_payload {
104				serde_json::from_str(test_payload)
105					.context("Failed to parse test payload as JSON")?
106			} else {
107				json!({
108					"jsonrpc": "2.0",
109					"id": 1,
110					"method": "net_version",
111					"params": []
112				})
113			};
114
115			// Attempt to connect to the endpoint
116			let request_result = retryable_client
117				.post(url.clone())
118				.json(&test_request)
119				.send()
120				.await;
121
122			match request_result {
123				Ok(response) => {
124					// Check if the response indicates an error status (4xx or 5xx)
125					if !response.status().is_success() {
126						// Skip this URL if we got an error status
127						continue;
128					}
129
130					// Create list of fallback URLs (all URLs except the current one)
131					let fallback_urls: Vec<String> = rpc_urls
132						.iter()
133						.filter(|url| url.url != rpc_url.url)
134						.map(|url| url.url.as_ref().to_string())
135						.collect();
136
137					// Successfully connected - create and return the client
138					return Ok(Self {
139						client: retryable_client.clone(),
140						endpoint_manager: EndpointManager::new(
141							retryable_client,
142							rpc_url.url.as_ref(),
143							fallback_urls,
144						),
145						test_connection_payload,
146					});
147				}
148				Err(_) => {
149					// Connection failed - try next URL
150					continue;
151				}
152			}
153		}
154
155		Err(anyhow::anyhow!("All RPC URLs failed to connect"))
156	}
157}
158
159#[async_trait]
160impl BlockchainTransport for HttpTransportClient {
161	/// Retrieves the currently active RPC endpoint URL
162	///
163	/// This method is useful for monitoring which endpoint is currently in use,
164	/// especially in scenarios with multiple failover URLs.
165	///
166	/// # Returns
167	/// * `String` - The URL of the currently active endpoint
168	async fn get_current_url(&self) -> String {
169		self.endpoint_manager.active_url.read().await.clone()
170	}
171
172	/// Sends a JSON-RPC request to the blockchain node
173	///
174	/// This method handles the formatting of the JSON-RPC request, including:
175	/// - Adding required JSON-RPC 2.0 fields
176	/// - Generating unique request IDs
177	/// - Converting parameters to the correct format
178	/// - Handling authentication
179	///
180	/// # Arguments
181	/// * `method` - The JSON-RPC method name to call
182	/// * `params` - Optional parameters for the method call
183	///
184	/// # Returns
185	/// * `Result<Value, TransportError>` - JSON response or error with context
186	///
187	/// # Type Parameters
188	/// * `P` - Parameter type that can be serialized to JSON
189	async fn send_raw_request<P>(
190		&self,
191		method: &str,
192		params: Option<P>,
193	) -> Result<Value, TransportError>
194	where
195		P: Into<Value> + Send + Clone + Serialize,
196	{
197		let response = self
198			.endpoint_manager
199			.send_raw_request(self, method, params)
200			.await?;
201
202		Ok(response)
203	}
204
205	/// Update endpoint manager with a new client
206	///
207	/// # Arguments
208	/// * `client` - The new client to use for the endpoint manager
209	fn update_endpoint_manager_client(
210		&mut self,
211		client: ClientWithMiddleware,
212	) -> Result<(), anyhow::Error> {
213		self.endpoint_manager.update_client(client);
214		Ok(())
215	}
216}
217
218#[async_trait]
219impl RotatingTransport for HttpTransportClient {
220	/// Tests connectivity to a specific RPC endpoint
221	///
222	/// Performs a basic JSON-RPC request to verify the endpoint is responsive
223	/// and correctly handling requests.
224	///
225	/// # Arguments
226	/// * `url` - The URL to test
227	///
228	/// # Returns
229	/// * `Result<(), anyhow::Error>` - Success or detailed error message
230	async fn try_connect(&self, url: &str) -> Result<(), anyhow::Error> {
231		let url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
232
233		let test_request = if let Some(test_payload) = &self.test_connection_payload {
234			serde_json::from_str(test_payload).context("Failed to parse test payload as JSON")?
235		} else {
236			json!({
237				"jsonrpc": "2.0",
238				"id": 1,
239				"method": "net_version",
240				"params": []
241			})
242		};
243
244		let request = self.client.post(url.clone()).json(&test_request);
245
246		match request.send().await {
247			Ok(response) => {
248				let status = response.status();
249				if !status.is_success() {
250					Err(anyhow::anyhow!(
251						"Failed to connect to {}: {}",
252						url,
253						status.as_u16()
254					))
255				} else {
256					Ok(())
257				}
258			}
259			Err(e) => Err(anyhow::anyhow!("Failed to connect to {}: {}", url, e)),
260		}
261	}
262
263	/// Updates the active endpoint URL
264	///
265	/// This method is called when rotating to a new endpoint, typically
266	/// after a failure of the current endpoint.
267	///
268	/// # Arguments
269	/// * `url` - The new URL to use for subsequent requests
270	///
271	/// # Returns
272	/// * `Result<(), anyhow::Error>` - Success or error status
273	async fn update_client(&self, url: &str) -> Result<(), anyhow::Error> {
274		let parsed_url = Url::parse(url).map_err(|_| anyhow::anyhow!("Invalid URL: {}", url))?;
275		// Normalize the URL by trimming trailing slash if present
276		let normalized_url = parsed_url.as_str().trim_end_matches('/');
277
278		// For HTTP client, we don't need to update the client itself
279		// We just need to update the endpoint manager's active URL
280		let mut active_url = self.endpoint_manager.active_url.write().await;
281		*active_url = normalized_url.to_string();
282		Ok(())
283	}
284}