1#![allow(clippy::result_large_err)]
8
9use std::{collections::HashMap, marker::PhantomData, path::Path};
10
11use async_trait::async_trait;
12
13use crate::{
14 models::{ConfigLoader, Monitor, Network, ScriptLanguage, Trigger},
15 repositories::{
16 error::RepositoryError,
17 network::{NetworkRepository, NetworkRepositoryTrait, NetworkService},
18 trigger::{TriggerRepository, TriggerRepositoryTrait, TriggerService},
19 },
20};
21
22const LANGUAGE_EXTENSIONS: &[(&ScriptLanguage, &str)] = &[
24 (&ScriptLanguage::Python, "py"),
25 (&ScriptLanguage::JavaScript, "js"),
26 (&ScriptLanguage::Bash, "sh"),
27];
28
29#[derive(Clone)]
31pub struct MonitorRepository<
32 N: NetworkRepositoryTrait + Send + 'static,
33 T: TriggerRepositoryTrait + Send + 'static,
34> {
35 pub monitors: HashMap<String, Monitor>,
37 _network_repository: PhantomData<N>,
38 _trigger_repository: PhantomData<T>,
39}
40
41impl<
42 N: NetworkRepositoryTrait + Send + Sync + 'static,
43 T: TriggerRepositoryTrait + Send + Sync + 'static,
44 > MonitorRepository<N, T>
45{
46 pub async fn new(
51 path: Option<&Path>,
52 network_service: Option<NetworkService<N>>,
53 trigger_service: Option<TriggerService<T>>,
54 ) -> Result<Self, RepositoryError> {
55 let monitors = Self::load_all(path, network_service, trigger_service).await?;
56 Ok(MonitorRepository {
57 monitors,
58 _network_repository: PhantomData,
59 _trigger_repository: PhantomData,
60 })
61 }
62
63 pub fn new_with_monitors(monitors: HashMap<String, Monitor>) -> Self {
65 MonitorRepository {
66 monitors,
67 _network_repository: PhantomData,
68 _trigger_repository: PhantomData,
69 }
70 }
71
72 pub fn validate_monitor_references(
74 monitors: &HashMap<String, Monitor>,
75 triggers: &HashMap<String, Trigger>,
76 networks: &HashMap<String, Network>,
77 ) -> Result<(), RepositoryError> {
78 let mut validation_errors = Vec::new();
79 let mut metadata = HashMap::new();
80
81 for (monitor_name, monitor) in monitors {
82 for trigger_id in &monitor.triggers {
84 if !triggers.contains_key(trigger_id) {
85 validation_errors.push(format!(
86 "Monitor '{}' references non-existent trigger '{}'",
87 monitor_name, trigger_id
88 ));
89 metadata.insert(
90 format!("monitor_{}_invalid_trigger", monitor_name),
91 trigger_id.clone(),
92 );
93 }
94 }
95
96 for network_slug in &monitor.networks {
98 if !networks.contains_key(network_slug) {
99 validation_errors.push(format!(
100 "Monitor '{}' references non-existent network '{}'",
101 monitor_name, network_slug
102 ));
103 metadata.insert(
104 format!("monitor_{}_invalid_network", monitor_name),
105 network_slug.clone(),
106 );
107 }
108 }
109
110 for condition in &monitor.trigger_conditions {
112 let script_path = Path::new(&condition.script_path);
113 if !script_path.exists() {
114 validation_errors.push(format!(
115 "Monitor '{}' has a custom filter script that does not exist: {}",
116 monitor_name, condition.script_path
117 ));
118 }
119
120 let expected_extension = match LANGUAGE_EXTENSIONS
122 .iter()
123 .find(|(lang, _)| *lang == &condition.language)
124 .map(|(_, ext)| *ext)
125 {
126 Some(ext) => ext,
127 None => {
128 validation_errors.push(format!(
129 "Monitor '{}' uses unsupported script language {:?}",
130 monitor_name, condition.language
131 ));
132 continue;
133 }
134 };
135
136 match script_path.extension().and_then(|ext| ext.to_str()) {
137 Some(ext) if ext == expected_extension => (), _ => validation_errors.push(format!(
139 "Monitor '{}' has a custom filter script with invalid extension - must be \
140 .{} for {:?} language: {}",
141 monitor_name, expected_extension, condition.language, condition.script_path
142 )),
143 }
144
145 if condition.timeout_ms == 0 {
146 validation_errors.push(format!(
147 "Monitor '{}' should have a custom filter timeout_ms greater than 0",
148 monitor_name
149 ));
150 }
151 }
152 }
153
154 if !validation_errors.is_empty() {
155 return Err(RepositoryError::validation_error(
156 format!(
157 "Configuration validation failed:\n{}",
158 validation_errors.join("\n"),
159 ),
160 None,
161 Some(metadata),
162 ));
163 }
164
165 Ok(())
166 }
167}
168
169#[async_trait]
174pub trait MonitorRepositoryTrait<
175 N: NetworkRepositoryTrait + Send + 'static,
176 T: TriggerRepositoryTrait + Send + 'static,
177>: Clone + Send
178{
179 async fn new(
181 path: Option<&Path>,
182 network_service: Option<NetworkService<N>>,
183 trigger_service: Option<TriggerService<T>>,
184 ) -> Result<Self, RepositoryError>
185 where
186 Self: Sized;
187
188 async fn load_all(
194 path: Option<&Path>,
195 network_service: Option<NetworkService<N>>,
196 trigger_service: Option<TriggerService<T>>,
197 ) -> Result<HashMap<String, Monitor>, RepositoryError>;
198
199 async fn load_from_path(
203 &self,
204 path: Option<&Path>,
205 network_service: Option<NetworkService<N>>,
206 trigger_service: Option<TriggerService<T>>,
207 ) -> Result<Monitor, RepositoryError>;
208
209 fn get(&self, monitor_id: &str) -> Option<Monitor>;
213
214 fn get_all(&self) -> HashMap<String, Monitor>;
218}
219
220#[async_trait]
221impl<
222 N: NetworkRepositoryTrait + Send + Sync + 'static,
223 T: TriggerRepositoryTrait + Send + Sync + 'static,
224 > MonitorRepositoryTrait<N, T> for MonitorRepository<N, T>
225{
226 async fn new(
227 path: Option<&Path>,
228 network_service: Option<NetworkService<N>>,
229 trigger_service: Option<TriggerService<T>>,
230 ) -> Result<Self, RepositoryError> {
231 MonitorRepository::new(path, network_service, trigger_service).await
232 }
233
234 async fn load_all(
235 path: Option<&Path>,
236 network_service: Option<NetworkService<N>>,
237 trigger_service: Option<TriggerService<T>>,
238 ) -> Result<HashMap<String, Monitor>, RepositoryError> {
239 let monitors = Monitor::load_all(path).await.map_err(|e| {
240 RepositoryError::load_error(
241 "Failed to load monitors",
242 Some(Box::new(e)),
243 Some(HashMap::from([(
244 "path".to_string(),
245 path.map_or_else(|| "default".to_string(), |p| p.display().to_string()),
246 )])),
247 )
248 })?;
249
250 let networks = match network_service {
251 Some(service) => service.get_all(),
252 None => {
253 NetworkRepository::new(None)
254 .await
255 .map_err(|e| {
256 RepositoryError::load_error(
257 "Failed to load networks for monitor validation",
258 Some(Box::new(e)),
259 None,
260 )
261 })?
262 .networks
263 }
264 };
265
266 let triggers = match trigger_service {
267 Some(service) => service.get_all(),
268 None => {
269 TriggerRepository::new(None)
270 .await
271 .map_err(|e| {
272 RepositoryError::load_error(
273 "Failed to load triggers for monitor validation",
274 Some(Box::new(e)),
275 None,
276 )
277 })?
278 .triggers
279 }
280 };
281
282 Self::validate_monitor_references(&monitors, &triggers, &networks)?;
283 Ok(monitors)
284 }
285
286 async fn load_from_path(
290 &self,
291 path: Option<&Path>,
292 network_service: Option<NetworkService<N>>,
293 trigger_service: Option<TriggerService<T>>,
294 ) -> Result<Monitor, RepositoryError> {
295 match path {
296 Some(path) => {
297 let monitor = Monitor::load_from_path(path).await.map_err(|e| {
298 RepositoryError::load_error(
299 "Failed to load monitors",
300 Some(Box::new(e)),
301 Some(HashMap::from([(
302 "path".to_string(),
303 path.display().to_string(),
304 )])),
305 )
306 })?;
307
308 let networks = match network_service {
309 Some(service) => service.get_all(),
310 None => NetworkRepository::new(None).await?.networks,
311 };
312
313 let triggers = match trigger_service {
314 Some(service) => service.get_all(),
315 None => TriggerRepository::new(None).await?.triggers,
316 };
317 let monitors = HashMap::from([(monitor.name.clone(), monitor)]);
318 Self::validate_monitor_references(&monitors, &triggers, &networks)?;
319 match monitors.values().next() {
320 Some(monitor) => Ok(monitor.clone()),
321 None => Err(RepositoryError::load_error("No monitors found", None, None)),
322 }
323 }
324 None => Err(RepositoryError::load_error(
325 "Failed to load monitors",
326 None,
327 None,
328 )),
329 }
330 }
331
332 fn get(&self, monitor_id: &str) -> Option<Monitor> {
333 self.monitors.get(monitor_id).cloned()
334 }
335
336 fn get_all(&self) -> HashMap<String, Monitor> {
337 self.monitors.clone()
338 }
339}
340
341#[derive(Clone)]
347pub struct MonitorService<
348 M: MonitorRepositoryTrait<N, T> + Send,
349 N: NetworkRepositoryTrait + Send + Sync + 'static,
350 T: TriggerRepositoryTrait + Send + Sync + 'static,
351> {
352 repository: M,
353 _network_repository: PhantomData<N>,
354 _trigger_repository: PhantomData<T>,
355}
356
357impl<
358 M: MonitorRepositoryTrait<N, T> + Send,
359 N: NetworkRepositoryTrait + Send + Sync + 'static,
360 T: TriggerRepositoryTrait + Send + Sync + 'static,
361 > MonitorService<M, N, T>
362{
363 pub async fn new(
368 path: Option<&Path>,
369 network_service: Option<NetworkService<N>>,
370 trigger_service: Option<TriggerService<T>>,
371 ) -> Result<MonitorService<M, N, T>, RepositoryError> {
372 let repository = M::new(path, network_service, trigger_service).await?;
373 Ok(MonitorService {
374 repository,
375 _network_repository: PhantomData,
376 _trigger_repository: PhantomData,
377 })
378 }
379
380 pub async fn new_with_path(
384 path: Option<&Path>,
385 ) -> Result<MonitorService<M, N, T>, RepositoryError> {
386 let repository = M::new(path, None, None).await?;
387 Ok(MonitorService {
388 repository,
389 _network_repository: PhantomData,
390 _trigger_repository: PhantomData,
391 })
392 }
393
394 pub fn new_with_repository(repository: M) -> Result<Self, RepositoryError> {
398 Ok(MonitorService {
399 repository,
400 _network_repository: PhantomData,
401 _trigger_repository: PhantomData,
402 })
403 }
404
405 pub fn get(&self, monitor_id: &str) -> Option<Monitor> {
409 self.repository.get(monitor_id)
410 }
411
412 pub fn get_all(&self) -> HashMap<String, Monitor> {
416 self.repository.get_all()
417 }
418
419 pub async fn load_from_path(
423 &self,
424 path: Option<&Path>,
425 network_service: Option<NetworkService<N>>,
426 trigger_service: Option<TriggerService<T>>,
427 ) -> Result<Monitor, RepositoryError> {
428 self.repository
429 .load_from_path(path, network_service, trigger_service)
430 .await
431 }
432}
433
434#[cfg(test)]
435mod tests {
436 use super::*;
437 use crate::{models::ScriptLanguage, utils::tests::builders::evm::monitor::MonitorBuilder};
438 use std::fs;
439 use tempfile::TempDir;
440
441 #[test]
442 fn test_validate_custom_trigger_conditions() {
443 let temp_dir = TempDir::new().unwrap();
444 let script_path = temp_dir.path().join("test_script.py");
445 fs::write(&script_path, "print('test')").unwrap();
446
447 let mut monitors = HashMap::new();
448 let triggers = HashMap::new();
449 let networks = HashMap::new();
450
451 let monitor = MonitorBuilder::new()
453 .name("test_monitor")
454 .networks(vec![])
455 .trigger_condition(
456 script_path.to_str().unwrap(),
457 1000,
458 ScriptLanguage::Python,
459 None,
460 )
461 .build();
462 monitors.insert("test_monitor".to_string(), monitor);
463
464 let result =
465 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
466 &monitors, &triggers, &networks,
467 );
468 assert!(result.is_ok());
469
470 let monitor_bad_path = MonitorBuilder::new()
472 .name("test_monitor_bad_path")
473 .trigger_condition("non_existent_script.py", 1000, ScriptLanguage::Python, None)
474 .build();
475 monitors.insert("test_monitor_bad_path".to_string(), monitor_bad_path);
476
477 let err =
478 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
479 &monitors, &triggers, &networks,
480 )
481 .unwrap_err();
482 assert!(err.to_string().contains("does not exist"));
483
484 let wrong_ext_path = temp_dir.path().join("test_script.js");
486 fs::write(&wrong_ext_path, "print('test')").unwrap();
487
488 let monitor_wrong_ext = MonitorBuilder::new()
489 .name("test_monitor_wrong_ext")
490 .trigger_condition(
491 wrong_ext_path.to_str().unwrap(),
492 1000,
493 ScriptLanguage::Python,
494 None,
495 )
496 .build();
497 monitors.clear();
498 monitors.insert("test_monitor_wrong_ext".to_string(), monitor_wrong_ext);
499
500 let err =
501 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
502 &monitors, &triggers, &networks,
503 )
504 .unwrap_err();
505 assert!(err.to_string().contains(
506 "Monitor 'test_monitor_wrong_ext' has a custom filter script with invalid extension - \
507 must be .py for Python language"
508 ));
509
510 let monitor_zero_timeout = MonitorBuilder::new()
512 .name("test_monitor_zero_timeout")
513 .trigger_condition(
514 script_path.to_str().unwrap(),
515 0,
516 ScriptLanguage::Python,
517 None,
518 )
519 .build();
520 monitors.clear();
521 monitors.insert(
522 "test_monitor_zero_timeout".to_string(),
523 monitor_zero_timeout,
524 );
525
526 let err =
527 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
528 &monitors, &triggers, &networks,
529 )
530 .unwrap_err();
531 assert!(err.to_string().contains("timeout_ms greater than 0"));
532 }
533
534 #[tokio::test]
535 async fn test_load_error_messages() {
536 let invalid_path = Path::new("/non/existent/path");
538 let result = MonitorRepository::<NetworkRepository, TriggerRepository>::load_all(
539 Some(invalid_path),
540 None,
541 None,
542 )
543 .await;
544
545 assert!(result.is_err());
546 let err = result.unwrap_err();
547 match err {
548 RepositoryError::LoadError(message) => {
549 assert!(message.to_string().contains("Failed to load monitors"));
550 }
551 _ => panic!("Expected RepositoryError::LoadError"),
552 }
553 }
554
555 #[test]
556 fn test_network_validation_error() {
557 let mut monitors = HashMap::new();
559 let monitor = MonitorBuilder::new()
560 .name("test_monitor")
561 .networks(vec!["non_existent_network".to_string()])
562 .build();
563 monitors.insert("test_monitor".to_string(), monitor);
564
565 let networks = HashMap::new();
567 let triggers = HashMap::new();
568
569 let result =
571 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
572 &monitors, &triggers, &networks,
573 );
574
575 assert!(result.is_err());
576 let err = result.unwrap_err();
577 assert!(err.to_string().contains("references non-existent network"));
578 }
579
580 #[test]
581 fn test_trigger_validation_error() {
582 let mut monitors = HashMap::new();
584 let monitor = MonitorBuilder::new()
585 .name("test_monitor")
586 .triggers(vec!["non_existent_trigger".to_string()])
587 .build();
588 monitors.insert("test_monitor".to_string(), monitor);
589
590 let networks = HashMap::new();
592 let triggers = HashMap::new();
593
594 let result =
596 MonitorRepository::<NetworkRepository, TriggerRepository>::validate_monitor_references(
597 &monitors, &triggers, &networks,
598 );
599
600 assert!(result.is_err());
601 let err = result.unwrap_err();
602 assert!(err.to_string().contains("references non-existent trigger"));
603 }
604
605 #[tokio::test]
606 async fn test_load_from_path_error_handling() {
607 let temp_dir = TempDir::new().unwrap();
609 let invalid_path = temp_dir.path().join("non_existent_monitor.json");
610
611 let repository =
613 MonitorRepository::<NetworkRepository, TriggerRepository>::new_with_monitors(
614 HashMap::new(),
615 );
616
617 let result = repository
619 .load_from_path(Some(&invalid_path), None, None)
620 .await;
621
622 assert!(result.is_err());
624 let err = result.unwrap_err();
625 match err {
626 RepositoryError::LoadError(message) => {
627 assert!(message.to_string().contains("Failed to load monitors"));
628 assert!(message
630 .to_string()
631 .contains(&invalid_path.display().to_string()));
632 }
633 _ => panic!("Expected RepositoryError::LoadError"),
634 }
635 }
636}