openzeppelin_monitor/services/blockchain/transports/
endpoint_manager.rs1use reqwest_middleware::ClientWithMiddleware;
6use serde::Serialize;
7use serde_json::Value;
8use std::sync::Arc;
9use tokio::sync::RwLock;
10
11use crate::services::blockchain::transports::{
12 RotatingTransport, TransportError, ROTATE_ON_ERROR_CODES,
13};
14
15#[derive(Clone, Debug)]
26pub struct EndpointManager {
27 pub active_url: Arc<RwLock<String>>,
28 pub fallback_urls: Arc<RwLock<Vec<String>>>,
29 client: ClientWithMiddleware,
30 rotation_lock: Arc<tokio::sync::Mutex<()>>,
31}
32
33#[derive(Debug)]
37enum SingleRequestAttemptOutcome {
38 Success(reqwest::Response),
40 NetworkError(reqwest_middleware::Error),
42 SerializationError(TransportError),
44}
45
46impl EndpointManager {
47 pub fn new(client: ClientWithMiddleware, active_url: &str, fallback_urls: Vec<String>) -> Self {
56 Self {
57 active_url: Arc::new(RwLock::new(active_url.to_string())),
58 fallback_urls: Arc::new(RwLock::new(fallback_urls)),
59 rotation_lock: Arc::new(tokio::sync::Mutex::new(())),
60 client,
61 }
62 }
63
64 pub fn update_client(&mut self, client: ClientWithMiddleware) {
71 self.client = client;
72 }
73
74 pub async fn try_rotate_url<T: RotatingTransport>(
82 &self,
83 transport: &T,
84 ) -> Result<String, TransportError> {
85 let _guard = self.rotation_lock.lock().await;
87 let initial_active_url = self.active_url.read().await.clone();
88 let current_fallbacks_snapshot = self.fallback_urls.read().await.clone();
89
90 tracing::debug!(
91 "Trying to rotate URL: Current Active: '{}', Fallbacks: {:?}",
92 initial_active_url,
93 current_fallbacks_snapshot,
94 );
95
96 let new_url = match current_fallbacks_snapshot
98 .iter()
99 .find(|&url| *url != initial_active_url)
100 {
101 Some(url) => url.clone(),
102 None => {
103 let msg = format!(
104 "No fallback URLs available. Current active: '{}', Fallbacks checked: {:?}",
105 initial_active_url, current_fallbacks_snapshot
106 );
107 return Err(TransportError::url_rotation(msg, None, None));
108 }
109 };
110
111 tracing::debug!(
113 "Attempting try_connect to new_url during rotation: '{}'",
114 new_url
115 );
116
117 transport
118 .try_connect(&new_url)
119 .await
120 .map_err(|connect_err| {
121 TransportError::url_rotation(
122 format!("Failed to connect to new URL '{}'", new_url),
123 Some(connect_err.into()),
124 None,
125 )
126 })?;
127
128 tracing::debug!(
129 "Attempting update_client with new_url during rotation: '{}'",
130 new_url
131 );
132
133 transport
134 .update_client(&new_url)
135 .await
136 .map_err(|update_err| {
137 TransportError::url_rotation(
138 format!(
139 "Failed to update transport client with new URL '{}'",
140 new_url
141 ),
142 Some(update_err.into()),
143 None,
144 )
145 })?;
146
147 {
149 let mut active_url_guard = self.active_url.write().await;
150 let mut fallback_urls_guard = self.fallback_urls.write().await;
151
152 let mut next_fallback_urls: Vec<String> = Vec::with_capacity(fallback_urls_guard.len());
155 for url in fallback_urls_guard.iter() {
156 if *url != new_url {
157 next_fallback_urls.push(url.clone());
158 }
159 }
160 next_fallback_urls.push(initial_active_url.clone()); tracing::debug!(
163 "Successful URL rotation - from: '{}', to: '{}'. New Fallbacks: {:?}",
164 initial_active_url,
165 new_url,
166 next_fallback_urls
167 );
168
169 *fallback_urls_guard = next_fallback_urls;
170 *active_url_guard = new_url.clone();
171 }
172 Ok(new_url)
173 }
174
175 async fn try_request_on_url<P>(
185 &self,
186 url: &str,
187 transport: &impl RotatingTransport,
188 method: &str,
189 params: Option<P>,
190 ) -> SingleRequestAttemptOutcome
191 where
192 P: Into<Value> + Send + Clone + Serialize,
193 {
194 let request_body = transport.customize_request(method, params).await;
196
197 let request_body_str = match serde_json::to_string(&request_body) {
199 Ok(body) => body,
200 Err(e) => {
201 tracing::error!("Failed to serialize request body: {}", e);
202 return SingleRequestAttemptOutcome::SerializationError(
203 TransportError::request_serialization(
204 "Failed to serialize request JSON",
205 Some(Box::new(e)),
206 None,
207 ),
208 );
209 }
210 };
211
212 let response_result = self
214 .client
215 .post(url)
216 .header("Content-Type", "application/json")
217 .body(request_body_str)
218 .send()
219 .await;
220
221 match response_result {
223 Ok(response) => SingleRequestAttemptOutcome::Success(response),
224 Err(network_error) => {
225 tracing::warn!("Network error while sending request: {}", network_error);
226 SingleRequestAttemptOutcome::NetworkError(network_error)
227 }
228 }
229 }
230
231 pub async fn send_raw_request<
247 T: RotatingTransport,
248 P: Into<Value> + Send + Clone + Serialize,
249 >(
250 &self,
251 transport: &T,
252 method: &str,
253 params: Option<P>,
254 ) -> Result<Value, TransportError> {
255 loop {
256 let current_url_snapshot = self.active_url.read().await.clone();
257
258 tracing::debug!(
259 "Attempting request on active URL: '{}'",
260 current_url_snapshot
261 );
262
263 let attempt_result = self
265 .try_request_on_url(¤t_url_snapshot, transport, method, params.clone())
266 .await;
267
268 match attempt_result {
269 SingleRequestAttemptOutcome::Success(response) => {
271 let status = response.status();
272 if status.is_success() {
273 return response.json().await.map_err(|e| {
275 TransportError::response_parse(
276 "Failed to parse JSON response".to_string(),
277 Some(Box::new(e)),
278 None,
279 )
280 });
281 } else {
282 let error_body = response.text().await.unwrap_or_default();
284 tracing::warn!(
285 "Request to {} failed with status {}: {}",
286 current_url_snapshot,
287 status,
288 error_body
289 );
290
291 if ROTATE_ON_ERROR_CODES.contains(&status.as_u16()) {
293 tracing::debug!(
294 "send_raw_request: HTTP status {} on '{}' triggers URL rotation attempt",
295 status, current_url_snapshot
296 );
297
298 match self.try_rotate_url(transport).await {
299 Ok(_new_url) => {
300 continue; }
302 Err(rotation_error) => {
303 return Err(TransportError::http(
305 status,
306 current_url_snapshot.clone(),
307 error_body,
308 Some(Box::new(rotation_error)),
309 None,
310 ));
311 }
312 }
313 } else {
314 tracing::warn!(
316 "HTTP error status {} on {} does not trigger rotation. Failing.",
317 status,
318 current_url_snapshot
319 );
320 return Err(TransportError::http(
321 status,
322 current_url_snapshot,
323 error_body,
324 None,
325 None,
326 ));
327 }
328 }
329 }
330 SingleRequestAttemptOutcome::NetworkError(network_error) => {
332 tracing::warn!(
333 "Network error for {}: {}",
334 current_url_snapshot,
335 network_error,
336 );
337
338 match self.try_rotate_url(transport).await {
340 Ok(new_url) => {
341 tracing::debug!("Rotation successful after network error, retrying request on new URL: '{}'", new_url);
342 continue; }
344 Err(rotation_error) => {
345 return Err(TransportError::network(
347 network_error.to_string(),
348 Some(Box::new(rotation_error)),
349 None,
350 ));
351 }
352 }
353 }
354 SingleRequestAttemptOutcome::SerializationError(serialization_error) => {
356 return Err(serialization_error);
357 }
358 }
359 }
360 }
361}