openzeppelin_monitor/services/trigger/
service.rs1use std::{collections::HashMap, path::Path};
7
8use anyhow::Context;
9use async_trait::async_trait;
10
11use crate::{
12 models::{Monitor, MonitorMatch, ScriptLanguage, TriggerTypeConfig},
13 repositories::{TriggerRepositoryTrait, TriggerService},
14 services::{notification::NotificationService, trigger::error::TriggerError},
15 utils::normalize_string,
16};
17
18#[async_trait]
23pub trait TriggerExecutionServiceTrait {
24 async fn execute(
25 &self,
26 trigger_slugs: &[String],
27 variables: HashMap<String, String>,
28 monitor_match: &MonitorMatch,
29 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
30 ) -> Result<(), TriggerError>;
31 async fn load_scripts(
32 &self,
33 monitors: &[Monitor],
34 ) -> Result<HashMap<String, (ScriptLanguage, String)>, TriggerError>;
35}
36
37pub struct TriggerExecutionService<T: TriggerRepositoryTrait> {
42 trigger_service: TriggerService<T>,
44 notification_service: NotificationService,
46}
47
48impl<T: TriggerRepositoryTrait> TriggerExecutionService<T> {
49 pub fn new(
58 trigger_service: TriggerService<T>,
59 notification_service: NotificationService,
60 ) -> Self {
61 Self {
62 trigger_service,
63 notification_service,
64 }
65 }
66}
67
68#[async_trait]
69impl<T: TriggerRepositoryTrait + Send + Sync> TriggerExecutionServiceTrait
70 for TriggerExecutionService<T>
71{
72 async fn execute(
85 &self,
86 trigger_slugs: &[String],
87 variables: HashMap<String, String>,
88 monitor_match: &MonitorMatch,
89 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
90 ) -> Result<(), TriggerError> {
91 use futures::future::join_all;
92
93 let futures = trigger_slugs.iter().map(|trigger_slug| async {
94 let trigger = self
95 .trigger_service
96 .get(trigger_slug)
97 .ok_or_else(|| TriggerError::not_found(trigger_slug.to_string(), None, None))?;
98
99 self.notification_service
100 .execute(&trigger, &variables, monitor_match, trigger_scripts)
101 .await
102 .map_err(|e| TriggerError::execution_error_without_log(e.to_string(), None, None))
104 });
105
106 let results = join_all(futures).await;
107 let errors: Vec<_> = results.into_iter().filter_map(|r| r.err()).collect();
108
109 if errors.is_empty() {
110 Ok(())
111 } else {
112 Err(TriggerError::execution_error(
113 format!("Some trigger(s) failed ({} failure(s))", errors.len()),
114 Some(
117 TriggerError::execution_error(
118 format!(
119 "{:#?}",
120 errors
121 .iter()
122 .map(|e| e.to_string())
123 .collect::<Vec<_>>()
124 .join(", ")
125 ),
126 None,
127 None,
128 )
129 .into(),
130 ),
131 None,
132 ))
133 }
134 }
135 async fn load_scripts(
147 &self,
148 monitors: &[Monitor],
149 ) -> Result<HashMap<String, (ScriptLanguage, String)>, TriggerError> {
150 let mut scripts = HashMap::new();
151
152 for monitor in monitors {
153 if monitor.trigger_conditions.is_empty() && monitor.triggers.is_empty() {
155 continue;
156 }
157
158 for condition in &monitor.trigger_conditions {
160 let script_path = Path::new(&condition.script_path);
161
162 let content = tokio::fs::read_to_string(script_path)
164 .await
165 .with_context(|| {
166 format!("Failed to read script file: {}", condition.script_path)
167 })?;
168 scripts.insert(
170 format!(
171 "{}|{}",
172 normalize_string(&monitor.name),
173 condition.script_path
174 ),
175 (condition.language.clone(), content),
176 );
177 }
178
179 for trigger in &monitor.triggers {
181 let trigger_config =
182 self.trigger_service.get(trigger.as_str()).ok_or_else(|| {
183 TriggerError::configuration_error(
184 format!("Failed to get trigger: {}", trigger),
185 None,
186 None,
187 )
188 })?;
189
190 let TriggerTypeConfig::Script {
191 language,
192 script_path,
193 arguments: _,
194 timeout_ms: _,
195 } = &trigger_config.config
196 else {
197 continue;
198 };
199
200 let script_path = Path::new(script_path);
201 let content = tokio::fs::read_to_string(script_path).await.map_err(|e| {
202 TriggerError::configuration_error(
203 format!(
204 "Failed to read script file {}: {}",
205 script_path.display(),
206 e
207 ),
208 None,
209 None,
210 )
211 })?;
212
213 scripts.insert(
214 format!(
215 "{}|{}",
216 normalize_string(&monitor.name),
217 script_path.display()
218 ),
219 (language.clone(), content),
220 );
221 }
222 }
223
224 Ok(scripts)
225 }
226}