openzeppelin_monitor/utils/metrics/
server.rs1use actix_web::middleware::{Compress, DefaultHeaders, NormalizePath};
6use actix_web::{web, App, HttpResponse, HttpServer, Responder};
7use std::sync::Arc;
8use tokio::sync::Mutex;
9use tracing::{error, info};
10
11use crate::{
12 repositories::{
13 MonitorRepository, MonitorService, NetworkRepository, NetworkService, TriggerRepository,
14 TriggerService,
15 },
16 utils::metrics::{gather_metrics, update_monitoring_metrics, update_system_metrics},
17};
18
19pub type MonitorServiceData = web::Data<
22 Arc<
23 Mutex<
24 MonitorService<
25 MonitorRepository<NetworkRepository, TriggerRepository>,
26 NetworkRepository,
27 TriggerRepository,
28 >,
29 >,
30 >,
31>;
32
33pub type NetworkServiceData = web::Data<Arc<Mutex<NetworkService<NetworkRepository>>>>;
35
36pub type TriggerServiceData = web::Data<Arc<Mutex<TriggerService<TriggerRepository>>>>;
38
39pub type MonitorServiceArc = Arc<
41 Mutex<
42 MonitorService<
43 MonitorRepository<NetworkRepository, TriggerRepository>,
44 NetworkRepository,
45 TriggerRepository,
46 >,
47 >,
48>;
49
50pub type NetworkServiceArc = Arc<Mutex<NetworkService<NetworkRepository>>>;
52
53pub type TriggerServiceArc = Arc<Mutex<TriggerService<TriggerRepository>>>;
55
56async fn metrics_handler(
58 monitor_service: MonitorServiceData,
59 network_service: NetworkServiceData,
60 trigger_service: TriggerServiceData,
61) -> impl Responder {
62 update_system_metrics();
64
65 {
67 let monitors = monitor_service.lock().await.get_all();
68 let networks = network_service.lock().await.get_all();
69 let triggers = trigger_service.lock().await.get_all();
70
71 update_monitoring_metrics(&monitors, &triggers, &networks);
72 }
73
74 match gather_metrics() {
76 Ok(buffer) => HttpResponse::Ok()
77 .content_type("text/plain; version=0.0.4; charset=utf-8")
78 .body(buffer),
79 Err(e) => {
80 error!("Error gathering metrics: {}", e);
81 HttpResponse::InternalServerError().finish()
82 }
83 }
84}
85
86pub fn create_metrics_server(
88 bind_address: String,
89 monitor_service: MonitorServiceArc,
90 network_service: NetworkServiceArc,
91 trigger_service: TriggerServiceArc,
92) -> std::io::Result<actix_web::dev::Server> {
93 let actual_bind_address = if std::env::var("IN_DOCKER").unwrap_or_default() == "true" {
94 if let Some(port) = bind_address.split(':').nth(1) {
95 format!("0.0.0.0:{}", port)
96 } else {
97 "0.0.0.0:8081".to_string()
98 }
99 } else {
100 bind_address.clone()
101 };
102
103 info!(
104 "Starting metrics server on {} (actual bind: {})",
105 bind_address, actual_bind_address
106 );
107
108 Ok(HttpServer::new(move || {
109 App::new()
110 .wrap(Compress::default())
111 .wrap(NormalizePath::trim())
112 .wrap(DefaultHeaders::new())
113 .app_data(web::Data::new(monitor_service.clone()))
114 .app_data(web::Data::new(network_service.clone()))
115 .app_data(web::Data::new(trigger_service.clone()))
116 .route("/metrics", web::get().to(metrics_handler))
117 })
118 .workers(2)
119 .bind(actual_bind_address)?
120 .shutdown_timeout(5)
121 .run())
122}
123
124#[cfg(test)]
125mod tests {
126 use super::*;
127 use crate::{
128 models::{BlockChainType, Monitor, Network, Trigger},
129 repositories::{
130 MonitorService, NetworkRepository, NetworkService, TriggerRepository, TriggerService,
131 },
132 utils::tests::{
133 evm::monitor::MonitorBuilder, network::NetworkBuilder, trigger::TriggerBuilder,
134 },
135 };
136 use actix_web::{test, App};
137 use std::{fs, path::PathBuf};
138 use tempfile::TempDir;
139 use tokio::net::TcpListener;
140
141 fn create_test_monitor(
142 name: &str,
143 networks: Vec<&str>,
144 paused: bool,
145 triggers: Vec<&str>,
146 ) -> Monitor {
147 MonitorBuilder::new()
148 .name(name)
149 .networks(networks.into_iter().map(|s| s.to_string()).collect())
150 .paused(paused)
151 .triggers(triggers.into_iter().map(|s| s.to_string()).collect())
152 .build()
153 }
154
155 fn create_test_trigger(name: &str) -> Trigger {
156 TriggerBuilder::new()
157 .name(name)
158 .slack("https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX") .message("Test Title", "Test Body")
160 .build()
161 }
162
163 pub fn create_test_network(name: &str, slug: &str, network_type: BlockChainType) -> Network {
164 NetworkBuilder::new()
165 .name(name)
166 .slug(slug)
167 .network_type(network_type)
168 .chain_id(1)
169 .rpc_url("http://localhost:8545")
170 .block_time_ms(1000)
171 .confirmation_blocks(1)
172 .cron_schedule("*/5 * * * * *")
173 .store_blocks(false)
174 .build()
175 }
176
177 fn create_mock_configs() -> (PathBuf, PathBuf, PathBuf, TempDir) {
178 let temp_dir = TempDir::new().expect("Failed to create temporary directory");
180 let config_path = temp_dir.path().join("config");
181 let monitor_dir = config_path.join("monitors");
182 let trigger_dir = config_path.join("triggers");
183 let network_dir = config_path.join("networks");
184
185 fs::create_dir_all(&monitor_dir).expect("Failed to create monitor directory");
187 fs::create_dir_all(&trigger_dir).expect("Failed to create trigger directory");
188 fs::create_dir_all(&network_dir).expect("Failed to create network directory");
189
190 let monitor_path = monitor_dir.join("test_monitor.json");
191 let trigger_path = trigger_dir.join("test_trigger.json");
192 let network_path = network_dir.join("test_network.json");
193
194 fs::write(
195 &monitor_path,
196 serde_json::to_string(&create_test_monitor(
197 "test_monitor",
198 vec!["ethereum_mainnet"],
199 false,
200 vec!["test_trigger"],
201 ))
202 .unwrap(),
203 )
204 .expect("Failed to create mock monitor");
205
206 fs::write(
207 &trigger_path,
208 serde_json::to_string(&create_test_trigger("test_trigger")).unwrap(),
209 )
210 .expect("Failed to create mock trigger");
211
212 fs::write(
213 &network_path,
214 serde_json::to_string(&create_test_network(
215 "Ethereum Mainnet",
216 "ethereum_mainnet",
217 BlockChainType::EVM,
218 ))
219 .unwrap(),
220 )
221 .expect("Failed to create mock network");
222
223 (monitor_dir, trigger_dir, network_dir, temp_dir)
225 }
226
227 async fn create_test_services() -> (
228 MonitorServiceArc,
229 NetworkServiceArc,
230 TriggerServiceArc,
231 TempDir,
232 ) {
233 let (monitor_path, trigger_path, network_path, temp_dir) = create_mock_configs();
234 let network_service =
235 NetworkService::<NetworkRepository>::new(Some(network_path.parent().unwrap()))
236 .await
237 .unwrap();
238 let trigger_service =
239 TriggerService::<TriggerRepository>::new(Some(trigger_path.parent().unwrap()))
240 .await
241 .unwrap();
242 let monitor_service = MonitorService::new(
243 Some(monitor_path.parent().unwrap()),
244 Some(network_service.clone()),
245 Some(trigger_service.clone()),
246 )
247 .await
248 .unwrap();
249
250 (
251 Arc::new(Mutex::new(monitor_service)),
252 Arc::new(Mutex::new(network_service)),
253 Arc::new(Mutex::new(trigger_service)),
254 temp_dir,
255 )
256 }
257
258 #[actix_web::test]
259 async fn test_metrics_handler() {
260 let (monitor_service, network_service, trigger_service, _temp_dir) =
262 create_test_services().await;
263
264 let app = test::init_service(
266 App::new()
267 .app_data(web::Data::new(monitor_service.clone()))
268 .app_data(web::Data::new(network_service.clone()))
269 .app_data(web::Data::new(trigger_service.clone()))
270 .route("/metrics", web::get().to(metrics_handler)),
271 )
272 .await;
273
274 let req = test::TestRequest::get().uri("/metrics").to_request();
276
277 let resp = test::call_service(&app, req).await;
279
280 assert!(resp.status().is_success());
282
283 let content_type = resp
285 .headers()
286 .get("content-type")
287 .unwrap()
288 .to_str()
289 .unwrap();
290 assert_eq!(content_type, "text/plain; version=0.0.4; charset=utf-8");
291
292 let body = test::read_body(resp).await;
294 let body_str = String::from_utf8(body.to_vec()).unwrap();
295
296 assert!(body_str.contains("# HELP"));
298 }
299
300 #[tokio::test]
301 async fn test_create_metrics_server() {
302 let (monitor_service, network_service, trigger_service, _temp_dir) =
304 create_test_services().await;
305
306 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
308 let port = listener.local_addr().unwrap().port();
309 drop(listener);
310
311 let bind_address = format!("127.0.0.1:{}", port);
312
313 let server = create_metrics_server(
315 bind_address.clone(),
316 monitor_service,
317 network_service,
318 trigger_service,
319 );
320
321 assert!(server.is_ok());
323
324 let server_handle = server.unwrap();
326 let server_task = tokio::spawn(async move {
327 let result = server_handle.await;
329 assert!(result.is_ok(), "Server should shut down gracefully");
330 });
331
332 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
334
335 let client = reqwest::Client::new();
337 let response = client
338 .get(format!("http://{}/metrics", bind_address))
339 .timeout(std::time::Duration::from_secs(1))
340 .send()
341 .await;
342
343 assert!(response.is_ok(), "Server should respond to requests");
345 let response = response.unwrap();
346 assert!(
347 response.status().is_success(),
348 "Server should return 200 OK"
349 );
350
351 server_task.abort();
353 }
354
355 #[tokio::test]
356 async fn test_docker_bind_address_handling() {
357 let original_docker_env = std::env::var("IN_DOCKER").ok();
359
360 std::env::set_var("IN_DOCKER", "true");
362
363 let bind_address = "localhost:8081".to_string();
366 let actual_bind_address = if std::env::var("IN_DOCKER").unwrap_or_default() == "true" {
367 if let Some(port) = bind_address.split(':').nth(1) {
368 format!("0.0.0.0:{}", port)
369 } else {
370 "0.0.0.0:8081".to_string()
371 }
372 } else {
373 bind_address.clone()
374 };
375
376 assert_eq!(actual_bind_address, "0.0.0.0:8081");
378
379 let bind_address = "localhost".to_string();
381 let actual_bind_address = if std::env::var("IN_DOCKER").unwrap_or_default() == "true" {
382 if let Some(port) = bind_address.split(':').nth(1) {
383 format!("0.0.0.0:{}", port)
384 } else {
385 "0.0.0.0:8081".to_string()
386 }
387 } else {
388 bind_address.clone()
389 };
390
391 assert_eq!(actual_bind_address, "0.0.0.0:8081");
393
394 match original_docker_env {
396 Some(val) => std::env::set_var("IN_DOCKER", val),
397 None => std::env::remove_var("IN_DOCKER"),
398 }
399 }
400}