openzeppelin_monitor/utils/metrics/server.rs

1//! Metrics server module
2//!
3//! This module provides an HTTP server to expose Prometheus metrics for scraping.
4
5use actix_web::middleware::{Compress, DefaultHeaders, NormalizePath};
6use actix_web::{web, App, HttpResponse, HttpServer, Responder};
7use std::sync::Arc;
8use tokio::sync::Mutex;
9use tracing::{error, info};
10
11use crate::{
12	repositories::{
13		MonitorRepository, MonitorService, NetworkRepository, NetworkService, TriggerRepository,
14		TriggerService,
15	},
16	utils::metrics::{gather_metrics, update_monitoring_metrics, update_system_metrics},
17};
18
19// Type aliases to simplify complex types in function signatures
20//  MonitorService
21pub type MonitorServiceData = web::Data<
22	Arc<
23		Mutex<
24			MonitorService<
25				MonitorRepository<NetworkRepository, TriggerRepository>,
26				NetworkRepository,
27				TriggerRepository,
28			>,
29		>,
30	>,
31>;
32
33// NetworkService
34pub type NetworkServiceData = web::Data<Arc<Mutex<NetworkService<NetworkRepository>>>>;
35
36// TriggerService
37pub type TriggerServiceData = web::Data<Arc<Mutex<TriggerService<TriggerRepository>>>>;
38
39// For Arc<Mutex<...>> MonitorService
40pub type MonitorServiceArc = Arc<
41	Mutex<
42		MonitorService<
43			MonitorRepository<NetworkRepository, TriggerRepository>,
44			NetworkRepository,
45			TriggerRepository,
46		>,
47	>,
48>;
49
50// For Arc<Mutex<...>> NetworkService
51pub type NetworkServiceArc = Arc<Mutex<NetworkService<NetworkRepository>>>;
52
53// For Arc<Mutex<...>> TriggerService
54pub type TriggerServiceArc = Arc<Mutex<TriggerService<TriggerRepository>>>;
55
56/// Metrics endpoint handler
57async fn metrics_handler(
58	monitor_service: MonitorServiceData,
59	network_service: NetworkServiceData,
60	trigger_service: TriggerServiceData,
61) -> impl Responder {
62	// Update system metrics
63	update_system_metrics();
64
65	// Get current state and update metrics
66	{
67		let monitors = monitor_service.lock().await.get_all();
68		let networks = network_service.lock().await.get_all();
69		let triggers = trigger_service.lock().await.get_all();
70
71		update_monitoring_metrics(&monitors, &triggers, &networks);
72	}
73
74	// Gather all metrics
75	match gather_metrics() {
76		Ok(buffer) => HttpResponse::Ok()
77			.content_type("text/plain; version=0.0.4; charset=utf-8")
78			.body(buffer),
79		Err(e) => {
80			error!("Error gathering metrics: {}", e);
81			HttpResponse::InternalServerError().finish()
82		}
83	}
84}
85
86// Create metrics server
87pub fn create_metrics_server(
88	bind_address: String,
89	monitor_service: MonitorServiceArc,
90	network_service: NetworkServiceArc,
91	trigger_service: TriggerServiceArc,
92) -> std::io::Result<actix_web::dev::Server> {
93	let actual_bind_address = if std::env::var("IN_DOCKER").unwrap_or_default() == "true" {
94		if let Some(port) = bind_address.split(':').nth(1) {
95			format!("0.0.0.0:{}", port)
96		} else {
97			"0.0.0.0:8081".to_string()
98		}
99	} else {
100		bind_address.clone()
101	};
102
103	info!(
104		"Starting metrics server on {} (actual bind: {})",
105		bind_address, actual_bind_address
106	);
107
108	Ok(HttpServer::new(move || {
109		App::new()
110			.wrap(Compress::default())
111			.wrap(NormalizePath::trim())
112			.wrap(DefaultHeaders::new())
113			.app_data(web::Data::new(monitor_service.clone()))
114			.app_data(web::Data::new(network_service.clone()))
115			.app_data(web::Data::new(trigger_service.clone()))
116			.route("/metrics", web::get().to(metrics_handler))
117	})
118	.workers(2)
119	.bind(actual_bind_address)?
120	.shutdown_timeout(5)
121	.run())
122}
123
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{
		models::{BlockChainType, Monitor, Network, Trigger},
		repositories::{
			MonitorService, NetworkRepository, NetworkService, TriggerRepository, TriggerService,
		},
		utils::tests::{
			evm::monitor::MonitorBuilder, network::NetworkBuilder, trigger::TriggerBuilder,
		},
	};
	use actix_web::{test, App};
	use std::{fs, path::PathBuf};
	use tempfile::TempDir;
	use tokio::net::TcpListener;

	/// Builds a monitor fixture with the given name, network slugs, paused flag and trigger names.
	fn create_test_monitor(
		name: &str,
		networks: Vec<&str>,
		paused: bool,
		triggers: Vec<&str>,
	) -> Monitor {
		MonitorBuilder::new()
			.name(name)
			.networks(networks.into_iter().map(|s| s.to_string()).collect())
			.paused(paused)
			.triggers(triggers.into_iter().map(|s| s.to_string()).collect())
			.build()
	}

	/// Builds a Slack trigger fixture with a fixed placeholder webhook URL and message.
	fn create_test_trigger(name: &str) -> Trigger {
		TriggerBuilder::new()
			.name(name)
			.slack("https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX") //noboost
			.message("Test Title", "Test Body")
			.build()
	}

	/// Builds a network fixture pointing at a local RPC endpoint.
	pub fn create_test_network(name: &str, slug: &str, network_type: BlockChainType) -> Network {
		NetworkBuilder::new()
			.name(name)
			.slug(slug)
			.network_type(network_type)
			.chain_id(1)
			.rpc_url("http://localhost:8545")
			.block_time_ms(1000)
			.confirmation_blocks(1)
			.cron_schedule("*/5 * * * * *")
			.store_blocks(false)
			.build()
	}

	/// Writes one monitor, one trigger and one network JSON fixture into a fresh temp dir.
	///
	/// Returns the paths of the three JSON files plus the `TempDir`, which must be kept alive
	/// for as long as the files are needed.
	fn create_mock_configs() -> (PathBuf, PathBuf, PathBuf, TempDir) {
		// Create a temporary directory
		let temp_dir = TempDir::new().expect("Failed to create temporary directory");
		let config_path = temp_dir.path().join("config");
		let monitor_dir = config_path.join("monitors");
		let trigger_dir = config_path.join("triggers");
		let network_dir = config_path.join("networks");

		// Create directories
		fs::create_dir_all(&monitor_dir).expect("Failed to create monitor directory");
		fs::create_dir_all(&trigger_dir).expect("Failed to create trigger directory");
		fs::create_dir_all(&network_dir).expect("Failed to create network directory");

		let monitor_path = monitor_dir.join("test_monitor.json");
		let trigger_path = trigger_dir.join("test_trigger.json");
		let network_path = network_dir.join("test_network.json");

		fs::write(
			&monitor_path,
			serde_json::to_string(&create_test_monitor(
				"test_monitor",
				vec!["ethereum_mainnet"],
				false,
				vec!["test_trigger"],
			))
			.unwrap(),
		)
		.expect("Failed to create mock monitor");

		fs::write(
			&trigger_path,
			serde_json::to_string(&create_test_trigger("test_trigger")).unwrap(),
		)
		.expect("Failed to create mock trigger");

		fs::write(
			&network_path,
			serde_json::to_string(&create_test_network(
				"Ethereum Mainnet",
				"ethereum_mainnet",
				BlockChainType::EVM,
			))
			.unwrap(),
		)
		.expect("Failed to create mock network");

		// Return the file paths and temp_dir to keep it alive
		(monitor_path, trigger_path, network_path, temp_dir)
	}

	/// Loads the three services from the mock config directories.
	async fn create_test_services() -> (
		MonitorServiceArc,
		NetworkServiceArc,
		TriggerServiceArc,
		TempDir,
	) {
		let (monitor_path, trigger_path, network_path, temp_dir) = create_mock_configs();
		let network_service =
			NetworkService::<NetworkRepository>::new(Some(network_path.parent().unwrap()))
				.await
				.unwrap();
		let trigger_service =
			TriggerService::<TriggerRepository>::new(Some(trigger_path.parent().unwrap()))
				.await
				.unwrap();
		let monitor_service = MonitorService::new(
			Some(monitor_path.parent().unwrap()),
			Some(network_service.clone()),
			Some(trigger_service.clone()),
		)
		.await
		.unwrap();

		(
			Arc::new(Mutex::new(monitor_service)),
			Arc::new(Mutex::new(network_service)),
			Arc::new(Mutex::new(trigger_service)),
			temp_dir,
		)
	}

	#[actix_web::test]
	async fn test_metrics_handler() {
		// Create test services
		let (monitor_service, network_service, trigger_service, _temp_dir) =
			create_test_services().await;

		// Create test app
		let app = test::init_service(
			App::new()
				.app_data(web::Data::new(monitor_service.clone()))
				.app_data(web::Data::new(network_service.clone()))
				.app_data(web::Data::new(trigger_service.clone()))
				.route("/metrics", web::get().to(metrics_handler)),
		)
		.await;

		// Create test request
		let req = test::TestRequest::get().uri("/metrics").to_request();

		// Execute request
		let resp = test::call_service(&app, req).await;

		// Assert response is successful
		assert!(resp.status().is_success());

		// Check content type
		let content_type = resp
			.headers()
			.get("content-type")
			.unwrap()
			.to_str()
			.unwrap();
		assert_eq!(content_type, "text/plain; version=0.0.4; charset=utf-8");

		// Verify response body contains expected metrics
		let body = test::read_body(resp).await;
		let body_str = String::from_utf8(body.to_vec()).unwrap();

		// Basic check that we have some metrics content
		assert!(body_str.contains("# HELP"));
	}

	#[tokio::test]
	async fn test_create_metrics_server() {
		// Create test services
		let (monitor_service, network_service, trigger_service, _temp_dir) =
			create_test_services().await;

		// Reserve an ephemeral port, then release it for the server to bind.
		let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
		let port = listener.local_addr().unwrap().port();
		drop(listener);

		let bind_address = format!("127.0.0.1:{}", port);

		let server = create_metrics_server(
			bind_address.clone(),
			monitor_service,
			network_service,
			trigger_service,
		)
		.expect("Server creation should succeed");

		// Keep a handle so the server can be stopped through actix. The task's result can then
		// be checked after it finishes; aborting the task instead would cancel it before any
		// result is observed.
		let server_handle = server.handle();
		let server_task = tokio::spawn(server);

		// Give the server a moment to start
		tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;

		// Make a request to verify the server is actually running
		let client = reqwest::Client::new();
		let status = client
			.get(format!("http://{}/metrics", bind_address))
			.timeout(std::time::Duration::from_secs(1))
			.send()
			.await
			.map(|response| response.status());

		// Drop the client (and its pooled keep-alive connection) so graceful shutdown does not
		// wait on an idle connection.
		drop(client);

		// Stop the server before asserting, so it is shut down even if the request failed.
		server_handle.stop(true).await;
		let server_result = server_task.await.expect("Server task should not panic");

		let status = status.expect("Server should respond to requests");
		assert!(status.is_success(), "Server should return 200 OK");
		assert!(
			server_result.is_ok(),
			"Server should shut down gracefully"
		);
	}

	#[test]
	fn test_docker_bind_address_handling() {
		// Replicates the Docker address rewrite from `create_metrics_server` with the flag passed
		// explicitly. The process-wide `IN_DOCKER` variable is deliberately not set here: tests
		// run in parallel threads, and `test_create_metrics_server` reads that variable through
		// `create_metrics_server`.
		let resolve = |bind_address: &str, in_docker: bool| -> String {
			if in_docker {
				if let Some(port) = bind_address.split(':').nth(1) {
					format!("0.0.0.0:{}", port)
				} else {
					"0.0.0.0:8081".to_string()
				}
			} else {
				bind_address.to_string()
			}
		};

		// In Docker the host is replaced and the port kept.
		assert_eq!(resolve("localhost:8081", true), "0.0.0.0:8081");

		// No port specified: falls back to 8081.
		assert_eq!(resolve("localhost", true), "0.0.0.0:8081");

		// Outside Docker the configured address is used verbatim.
		assert_eq!(resolve("127.0.0.1:9000", false), "127.0.0.1:9000");
	}
}