openzeppelin_monitor/services/trigger/
service.rs

1//! Trigger execution service implementation.
2//!
3//! Provides functionality to execute triggers with variable substitution
4//! and notification delivery. Manages trigger lookup and execution flow.
5
6use std::{collections::HashMap, path::Path};
7
8use anyhow::Context;
9use async_trait::async_trait;
10
11use crate::{
12	models::{Monitor, MonitorMatch, ScriptLanguage, TriggerTypeConfig},
13	repositories::{TriggerRepositoryTrait, TriggerService},
14	services::{notification::NotificationService, trigger::error::TriggerError},
15	utils::normalize_string,
16};
17
18/// Trait for executing triggers
19///
20/// This trait must be implemented by all trigger execution services to provide
21/// a way to execute triggers.
22#[async_trait]
23pub trait TriggerExecutionServiceTrait {
24	async fn execute(
25		&self,
26		trigger_slugs: &[String],
27		variables: HashMap<String, String>,
28		monitor_match: &MonitorMatch,
29		trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
30	) -> Result<(), TriggerError>;
31	async fn load_scripts(
32		&self,
33		monitors: &[Monitor],
34	) -> Result<HashMap<String, (ScriptLanguage, String)>, TriggerError>;
35}
36
37/// Service for executing triggers with notifications
38///
39/// Coordinates trigger lookup, variable substitution, and notification
40/// delivery across different notification channels
41pub struct TriggerExecutionService<T: TriggerRepositoryTrait> {
42	/// Service for trigger management and lookup
43	trigger_service: TriggerService<T>,
44	/// Service for sending notifications
45	notification_service: NotificationService,
46}
47
48impl<T: TriggerRepositoryTrait> TriggerExecutionService<T> {
49	/// Creates a new trigger execution service
50	///
51	/// # Arguments
52	/// * `trigger_service` - Service for trigger operations
53	/// * `notification_service` - Service for notification delivery
54	///
55	/// # Returns
56	/// * `Self` - New trigger execution service instance
57	pub fn new(
58		trigger_service: TriggerService<T>,
59		notification_service: NotificationService,
60	) -> Self {
61		Self {
62			trigger_service,
63			notification_service,
64		}
65	}
66}
67
68#[async_trait]
69impl<T: TriggerRepositoryTrait + Send + Sync> TriggerExecutionServiceTrait
70	for TriggerExecutionService<T>
71{
72	/// Executes multiple triggers with variable substitution
73	///
74	/// # Arguments
75	/// * `trigger_slugs` - List of trigger identifiers to execute
76	/// * `variables` - Variables to substitute in trigger templates
77	///
78	/// # Returns
79	/// * `Result<(), TriggerError>` - Success or error
80	///
81	/// # Errors
82	/// - Returns `TriggerError::NotFound` if a trigger cannot be found
83	/// - Returns `TriggerError::ExecutionError` if notification delivery fails
84	async fn execute(
85		&self,
86		trigger_slugs: &[String],
87		variables: HashMap<String, String>,
88		monitor_match: &MonitorMatch,
89		trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
90	) -> Result<(), TriggerError> {
91		use futures::future::join_all;
92
93		let futures = trigger_slugs.iter().map(|trigger_slug| async {
94			let trigger = self
95				.trigger_service
96				.get(trigger_slug)
97				.ok_or_else(|| TriggerError::not_found(trigger_slug.to_string(), None, None))?;
98
99			self.notification_service
100				.execute(&trigger, &variables, monitor_match, trigger_scripts)
101				.await
102				// We remove logging capability here since we're logging it further down
103				.map_err(|e| TriggerError::execution_error_without_log(e.to_string(), None, None))
104		});
105
106		let results = join_all(futures).await;
107		let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
108
109		if errors.is_empty() {
110			Ok(())
111		} else {
112			Err(TriggerError::execution_error(
113				format!("Some trigger(s) failed ({} failure(s))", errors.len()),
114				// We join all errors into a single string for the source and wrap it as a single
115				// Execution
116				Some(
117					TriggerError::execution_error(
118						format!(
119							"{:#?}",
120							errors
121								.iter()
122								.map(|e| e.to_string())
123								.collect::<Vec<_>>()
124								.join(", ")
125						),
126						None,
127						None,
128					)
129					.into(),
130				),
131				None,
132			))
133		}
134	}
135	/// Loads trigger condition scripts for monitors
136	///
137	/// # Arguments
138	/// * `monitors` - List of monitors containing trigger conditions
139	///
140	/// # Returns
141	/// * `Result<HashMap<String, (ScriptLanguage, String)>, TriggerError>` - Map of monitor names
142	///   and script path to their script language and content
143	///
144	/// # Errors
145	/// - Returns `TriggerError::ConfigurationError` if script files cannot be read
146	async fn load_scripts(
147		&self,
148		monitors: &[Monitor],
149	) -> Result<HashMap<String, (ScriptLanguage, String)>, TriggerError> {
150		let mut scripts = HashMap::new();
151
152		for monitor in monitors {
153			// Skip monitors without trigger conditions
154			if monitor.trigger_conditions.is_empty() && monitor.triggers.is_empty() {
155				continue;
156			}
157
158			// For each monitor, we'll load all its trigger condition scripts
159			for condition in &monitor.trigger_conditions {
160				let script_path = Path::new(&condition.script_path);
161
162				// Read the script content
163				let content = tokio::fs::read_to_string(script_path)
164					.await
165					.with_context(|| {
166						format!("Failed to read script file: {}", condition.script_path)
167					})?;
168				// Store the script content with its language
169				scripts.insert(
170					format!(
171						"{}|{}",
172						normalize_string(&monitor.name),
173						condition.script_path
174					),
175					(condition.language.clone(), content),
176				);
177			}
178
179			// For each trigger, we'll load the script
180			for trigger in &monitor.triggers {
181				let trigger_config =
182					self.trigger_service.get(trigger.as_str()).ok_or_else(|| {
183						TriggerError::configuration_error(
184							format!("Failed to get trigger: {}", trigger),
185							None,
186							None,
187						)
188					})?;
189
190				let TriggerTypeConfig::Script {
191					language,
192					script_path,
193					arguments: _,
194					timeout_ms: _,
195				} = &trigger_config.config
196				else {
197					continue;
198				};
199
200				let script_path = Path::new(script_path);
201				let content = tokio::fs::read_to_string(script_path).await.map_err(|e| {
202					TriggerError::configuration_error(
203						format!(
204							"Failed to read script file {}: {}",
205							script_path.display(),
206							e
207						),
208						None,
209						None,
210					)
211				})?;
212
213				scripts.insert(
214					format!(
215						"{}|{}",
216						normalize_string(&monitor.name),
217						script_path.display()
218					),
219					(language.clone(), content),
220				);
221			}
222		}
223
224		Ok(scripts)
225	}
226}