openzeppelin_monitor/services/blockchain/transports/
endpoint_manager.rs

1//! Manages the rotation of blockchain RPC endpoints
2//!
3//! Provides methods for rotating between multiple URLs and sending requests to the active endpoint
4//! with automatic fallback to other URLs on failure.
5use reqwest_middleware::ClientWithMiddleware;
6use serde::Serialize;
7use serde_json::Value;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11use crate::services::blockchain::transports::{
12	RotatingTransport, TransportError, ROTATE_ON_ERROR_CODES,
13};
14
15/// Manages the rotation of blockchain RPC endpoints
16///
17/// Provides methods for rotating between multiple URLs and sending requests to the active endpoint
18/// with automatic fallback to other URLs on failure.
19///
20/// # Fields
21/// * `active_url` - The current active URL
22/// * `fallback_urls` - A list of fallback URLs to rotate to
23/// * `client` - The client to use for the endpoint manager
24/// * `rotation_lock` - A lock for managing the rotation process
25#[derive(Clone, Debug)]
26pub struct EndpointManager {
27	pub active_url: Arc<RwLock<String>>,
28	pub fallback_urls: Arc<RwLock<Vec<String>>>,
29	client: ClientWithMiddleware,
30	rotation_lock: Arc<tokio::sync::Mutex<()>>,
31}
32
33/// Represents the outcome of a `EndpointManager::attempt_request_on_url` method call
34/// Used within the `EndpointManager::send_raw_request` method to handle different paths of request execution
35/// and response handling.
36#[derive(Debug)]
37enum SingleRequestAttemptOutcome {
38	/// Successfully got a response (status might still be error)
39	Success(reqwest::Response),
40	/// Error during send (e.g., connection, timeout)
41	NetworkError(reqwest_middleware::Error),
42	/// Error serializing the request body
43	SerializationError(TransportError),
44}
45
46impl EndpointManager {
47	/// Creates a new rotating URL client
48	///
49	/// # Arguments
50	/// * `client` - The client to use for the endpoint manager
51	/// * `active_url` - The initial active URL
52	/// * `fallback_urls` - A list of fallback URLs to rotate to
53	///
54	/// # Returns
55	pub fn new(client: ClientWithMiddleware, active_url: &str, fallback_urls: Vec<String>) -> Self {
56		Self {
57			active_url: Arc::new(RwLock::new(active_url.to_string())),
58			fallback_urls: Arc::new(RwLock::new(fallback_urls)),
59			rotation_lock: Arc::new(tokio::sync::Mutex::new(())),
60			client,
61		}
62	}
63
64	/// Updates the client with a new client
65	///
66	/// Useful for updating the client with a new retry policy or strategy
67	///
68	/// # Arguments
69	/// * `client` - The new client to use for the endpoint manager
70	pub fn update_client(&mut self, client: ClientWithMiddleware) {
71		self.client = client;
72	}
73
74	/// Rotates to the next available URL
75	///
76	/// # Arguments
77	/// * `transport` - The transport client implementing the RotatingTransport trait
78	///
79	/// # Returns
80	/// * `Result<String, TransportError>` - The result of the rotation attempt, containing the new active URL or an error
81	pub async fn try_rotate_url<T: RotatingTransport>(
82		&self,
83		transport: &T,
84	) -> Result<String, TransportError> {
85		// Acquire the rotation lock to prevent concurrent rotations
86		let _guard = self.rotation_lock.lock().await;
87		let initial_active_url = self.active_url.read().await.clone();
88		let current_fallbacks_snapshot = self.fallback_urls.read().await.clone();
89
90		tracing::debug!(
91			"Trying to rotate URL: Current Active: '{}', Fallbacks: {:?}",
92			initial_active_url,
93			current_fallbacks_snapshot,
94		);
95
96		// --- Select a new URL ---
97		let new_url = match current_fallbacks_snapshot
98			.iter()
99			.find(|&url| *url != initial_active_url)
100		{
101			Some(url) => url.clone(),
102			None => {
103				let msg = format!(
104					"No fallback URLs available. Current active: '{}', Fallbacks checked: {:?}",
105					initial_active_url, current_fallbacks_snapshot
106				);
107				return Err(TransportError::url_rotation(msg, None, None));
108			}
109		};
110
111		// --- Attempt to connect and update the transport client ---
112		tracing::debug!(
113			"Attempting try_connect to new_url during rotation: '{}'",
114			new_url
115		);
116
117		transport
118			.try_connect(&new_url)
119			.await
120			.map_err(|connect_err| {
121				TransportError::url_rotation(
122					format!("Failed to connect to new URL '{}'", new_url),
123					Some(connect_err.into()),
124					None,
125				)
126			})?;
127
128		tracing::debug!(
129			"Attempting update_client with new_url during rotation: '{}'",
130			new_url
131		);
132
133		transport
134			.update_client(&new_url)
135			.await
136			.map_err(|update_err| {
137				TransportError::url_rotation(
138					format!(
139						"Failed to update transport client with new URL '{}'",
140						new_url
141					),
142					Some(update_err.into()),
143					None,
144				)
145			})?;
146
147		// --- All checks passed, update shared state ---
148		{
149			let mut active_url_guard = self.active_url.write().await;
150			let mut fallback_urls_guard = self.fallback_urls.write().await;
151
152			// Construct the new fallbacks list:
153			// old fallbacks, MINUS the new_url_candidate, PLUS the initial_active_url.
154			let mut next_fallback_urls: Vec<String> = Vec::with_capacity(fallback_urls_guard.len());
155			for url in fallback_urls_guard.iter() {
156				if *url != new_url {
157					next_fallback_urls.push(url.clone());
158				}
159			}
160			next_fallback_urls.push(initial_active_url.clone()); // Add the previously active URL
161
162			tracing::debug!(
163				"Successful URL rotation - from: '{}', to: '{}'. New Fallbacks: {:?}",
164				initial_active_url,
165				new_url,
166				next_fallback_urls
167			);
168
169			*fallback_urls_guard = next_fallback_urls;
170			*active_url_guard = new_url.clone();
171		}
172		Ok(new_url)
173	}
174
175	/// Attempts to send a request to the specified URL
176	/// # Arguments
177	/// * `url` - The URL to send the request to
178	/// * `transport` - The transport client implementing the RotatingTransport trait
179	/// * `method` - The HTTP method to use for the request (e.g., "POST")
180	/// * `params` - Optional parameters for the request, serialized to JSON
181	///
182	/// # Returns
183	/// * `SingleRequestAttemptOutcome` - The outcome of the request attempt
184	async fn try_request_on_url<P>(
185		&self,
186		url: &str,
187		transport: &impl RotatingTransport,
188		method: &str,
189		params: Option<P>,
190	) -> SingleRequestAttemptOutcome
191	where
192		P: Into<Value> + Send + Clone + Serialize,
193	{
194		// Create the request body using the transport's customization method
195		let request_body = transport.customize_request(method, params).await;
196
197		// Serialize the request body to JSON
198		let request_body_str = match serde_json::to_string(&request_body) {
199			Ok(body) => body,
200			Err(e) => {
201				tracing::error!("Failed to serialize request body: {}", e);
202				return SingleRequestAttemptOutcome::SerializationError(
203					TransportError::request_serialization(
204						"Failed to serialize request JSON",
205						Some(Box::new(e)),
206						None,
207					),
208				);
209			}
210		};
211
212		// Send the request to the specified URL
213		let response_result = self
214			.client
215			.post(url)
216			.header("Content-Type", "application/json")
217			.body(request_body_str)
218			.send()
219			.await;
220
221		// Handle the response
222		match response_result {
223			Ok(response) => SingleRequestAttemptOutcome::Success(response),
224			Err(network_error) => {
225				tracing::warn!("Network error while sending request: {}", network_error);
226				SingleRequestAttemptOutcome::NetworkError(network_error)
227			}
228		}
229	}
230
231	/// Sends a raw request to the blockchain RPC endpoint with automatic URL rotation on failure
232	///
233	/// # Arguments
234	/// * `transport` - The transport client implementing the RotatingTransport trait
235	/// * `method` - The RPC method name to call
236	/// * `params` - The parameters for the RPC method call as a JSON Value
237	///
238	/// # Returns
239	/// * `Result<Value, TransportError>` - The JSON response from the RPC endpoint or an error
240	///
241	/// # Behavior
242	/// - Automatically rotates to fallback URLs if the request fails with specific status codes
243	///   (e.g., 429)
244	/// - Retries the request with the new URL after rotation
245	/// - Returns the first successful response or an error if all attempts fail
246	pub async fn send_raw_request<
247		T: RotatingTransport,
248		P: Into<Value> + Send + Clone + Serialize,
249	>(
250		&self,
251		transport: &T,
252		method: &str,
253		params: Option<P>,
254	) -> Result<Value, TransportError> {
255		loop {
256			let current_url_snapshot = self.active_url.read().await.clone();
257
258			tracing::debug!(
259				"Attempting request on active URL: '{}'",
260				current_url_snapshot
261			);
262
263			// Attempt to send the request to the current active URL
264			let attempt_result = self
265				.try_request_on_url(&current_url_snapshot, transport, method, params.clone())
266				.await;
267
268			match attempt_result {
269				// Handle successful response
270				SingleRequestAttemptOutcome::Success(response) => {
271					let status = response.status();
272					if status.is_success() {
273						// Successful response, parse JSON
274						return response.json().await.map_err(|e| {
275							TransportError::response_parse(
276								"Failed to parse JSON response".to_string(),
277								Some(Box::new(e)),
278								None,
279							)
280						});
281					} else {
282						// HTTP error
283						let error_body = response.text().await.unwrap_or_default();
284						tracing::warn!(
285							"Request to {} failed with status {}: {}",
286							current_url_snapshot,
287							status,
288							error_body
289						);
290
291						// Check if we should rotate based on status code
292						if ROTATE_ON_ERROR_CODES.contains(&status.as_u16()) {
293							tracing::debug!(
294								"send_raw_request: HTTP status {} on '{}' triggers URL rotation attempt",
295								status, current_url_snapshot
296							);
297
298							match self.try_rotate_url(transport).await {
299								Ok(_new_url) => {
300									continue; // Retry on the new active URL
301								}
302								Err(rotation_error) => {
303									// Return the original HTTP error with rotation error context
304									return Err(TransportError::http(
305										status,
306										current_url_snapshot.clone(),
307										error_body,
308										Some(Box::new(rotation_error)),
309										None,
310									));
311								}
312							}
313						} else {
314							// HTTP error that doesn't trigger rotation
315							tracing::warn!(
316								"HTTP error status {} on {} does not trigger rotation. Failing.",
317								status,
318								current_url_snapshot
319							);
320							return Err(TransportError::http(
321								status,
322								current_url_snapshot,
323								error_body,
324								None,
325								None,
326							));
327						}
328					}
329				}
330				// Handle network error, try rotation
331				SingleRequestAttemptOutcome::NetworkError(network_error) => {
332					tracing::warn!(
333						"Network error for {}: {}",
334						current_url_snapshot,
335						network_error,
336					);
337
338					// Always attempt rotation on network errors
339					match self.try_rotate_url(transport).await {
340						Ok(new_url) => {
341							tracing::debug!("Rotation successful after network error, retrying request on new URL: '{}'", new_url);
342							continue; // Retry on the new active URL
343						}
344						Err(rotation_error) => {
345							// Return network error with rotation error context
346							return Err(TransportError::network(
347								network_error.to_string(),
348								Some(Box::new(rotation_error)),
349								None,
350							));
351						}
352					}
353				}
354				// Non-retryable serialization error
355				SingleRequestAttemptOutcome::SerializationError(serialization_error) => {
356					return Err(serialization_error);
357				}
358			}
359		}
360	}
361}