// openzeppelin_monitor/services/notification/mod.rs
use async_trait::async_trait;
7
8use std::{collections::HashMap, sync::Arc};
9
10mod discord;
11mod email;
12mod error;
13mod pool;
14mod script;
15mod slack;
16mod telegram;
17mod webhook;
18
19use crate::{
20 models::{MonitorMatch, ScriptLanguage, Trigger, TriggerType, TriggerTypeConfig},
21 utils::normalize_string,
22};
23
24pub use discord::DiscordNotifier;
25pub use email::{EmailContent, EmailNotifier, SmtpConfig};
26pub use error::NotificationError;
27pub use pool::NotificationClientPool;
28pub use script::ScriptNotifier;
29pub use slack::SlackNotifier;
30pub use telegram::TelegramNotifier;
31pub use webhook::{WebhookConfig, WebhookNotifier};
32
33#[async_trait]
38pub trait Notifier {
39 async fn notify(&self, message: &str) -> Result<(), NotificationError>;
47
48 async fn notify_with_payload(
57 &self,
58 message: &str,
59 _payload_fields: HashMap<String, serde_json::Value>,
60 ) -> Result<(), NotificationError> {
61 self.notify(message).await
63 }
64}
65
66#[async_trait]
71pub trait ScriptExecutor {
72 async fn script_notify(
81 &self,
82 monitor_match: &MonitorMatch,
83 script_content: &(ScriptLanguage, String),
84 ) -> Result<(), NotificationError>;
85}
86
87pub struct NotificationService {
89 client_pool: Arc<NotificationClientPool>,
91}
92
93impl NotificationService {
94 pub fn new() -> Self {
96 NotificationService {
97 client_pool: Arc::new(NotificationClientPool::new()),
98 }
99 }
100
101 pub async fn execute(
113 &self,
114 trigger: &Trigger,
115 variables: &HashMap<String, String>,
116 monitor_match: &MonitorMatch,
117 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
118 ) -> Result<(), NotificationError> {
119 match &trigger.trigger_type {
120 TriggerType::Slack
122 | TriggerType::Discord
123 | TriggerType::Webhook
124 | TriggerType::Telegram => {
125 let retry_policy = trigger.config.get_retry_policy().ok_or_else(|| {
127 NotificationError::config_error(
128 format!("Expected retry policy in trigger config: {}", trigger.name),
129 None,
130 None,
131 )
132 })?;
133
134 let http_client = self
136 .client_pool
137 .get_or_create_http_client(&retry_policy)
138 .await
139 .map_err(|e| {
140 NotificationError::execution_error(
141 "Failed to get or create HTTP client from pool".to_string(),
142 Some(e.into()),
143 None,
144 )
145 })?;
146
147 match &trigger.trigger_type {
148 TriggerType::Webhook => {
149 let notifier = WebhookNotifier::from_config(&trigger.config, http_client)?;
150 let message = notifier.format_message(variables);
151 notifier.notify(&message).await?;
152 }
153 TriggerType::Discord => {
154 let notifier = DiscordNotifier::from_config(&trigger.config, http_client)?;
155 let message = notifier.format_message(variables);
156 notifier.notify(&message).await?;
157 }
158 TriggerType::Telegram => {
159 let notifier = TelegramNotifier::from_config(&trigger.config, http_client)?;
160 let message = notifier.format_message(variables);
161 notifier.notify(&message).await?;
162 }
163 TriggerType::Slack => {
164 let notifier = SlackNotifier::from_config(&trigger.config, http_client)?;
165 let message = notifier.format_message(variables);
166 notifier.notify(&message).await?;
167 }
168 _ => unreachable!(),
169 }
170 }
171 TriggerType::Email => {
172 let smtp_config = match &trigger.config {
174 TriggerTypeConfig::Email {
175 host,
176 port,
177 username,
178 password,
179 ..
180 } => SmtpConfig {
181 host: host.clone(),
182 port: port.unwrap_or(465),
183 username: username.as_ref().to_string(),
184 password: password.as_ref().to_string(),
185 },
186 _ => {
187 return Err(NotificationError::config_error(
188 "Invalid email configuration".to_string(),
189 None,
190 None,
191 ))
192 }
193 };
194
195 let smtp_client = self
197 .client_pool
198 .get_or_create_smtp_client(&smtp_config)
199 .await
200 .map_err(|e| {
201 NotificationError::execution_error(
202 "Failed to get SMTP client from pool".to_string(),
203 Some(e.into()),
204 None,
205 )
206 })?;
207
208 let notifier = EmailNotifier::from_config(&trigger.config, smtp_client)?;
209 let message = notifier.format_message(variables);
210 notifier.notify(&message).await?;
211 }
212 TriggerType::Script => {
213 let notifier = ScriptNotifier::from_config(&trigger.config)?;
214 let monitor_name = match monitor_match {
215 MonitorMatch::EVM(evm_match) => &evm_match.monitor.name,
216 MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.name,
217 };
218 let script_path = match &trigger.config {
219 TriggerTypeConfig::Script { script_path, .. } => script_path,
220 _ => {
221 return Err(NotificationError::config_error(
222 "Invalid script configuration".to_string(),
223 None,
224 None,
225 ))
226 }
227 };
228 let script = trigger_scripts
229 .get(&format!(
230 "{}|{}",
231 normalize_string(monitor_name),
232 script_path
233 ))
234 .ok_or_else(|| {
235 NotificationError::config_error(
236 "Script content not found".to_string(),
237 None,
238 None,
239 )
240 });
241 let script_content = match &script {
242 Ok(content) => content,
243 Err(e) => {
244 return Err(NotificationError::config_error(e.to_string(), None, None))
245 }
246 };
247
248 notifier
249 .script_notify(monitor_match, script_content)
250 .await?;
251 }
252 }
253 Ok(())
254 }
255}
256
257impl Default for NotificationService {
258 fn default() -> Self {
259 Self::new()
260 }
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        models::{
            AddressWithSpec, EVMMonitorMatch, EVMTransactionReceipt, EventCondition,
            FunctionCondition, MatchConditions, Monitor, MonitorMatch, ScriptLanguage,
            TransactionCondition, Trigger, TriggerType,
        },
        utils::tests::{
            builders::{evm::monitor::MonitorBuilder, trigger::TriggerBuilder},
            evm::transaction::TransactionBuilder,
        },
    };
    use std::collections::HashMap;

    /// Builds a monitor named `"test"` on `"evm_mainnet"` carrying the given
    /// conditions and addresses.
    fn create_test_monitor(
        event_conditions: Vec<EventCondition>,
        function_conditions: Vec<FunctionCondition>,
        transaction_conditions: Vec<TransactionCondition>,
        addresses: Vec<AddressWithSpec>,
    ) -> Monitor {
        let builder = MonitorBuilder::new()
            .name("test")
            .networks(vec!["evm_mainnet".to_string()]);

        // Thread the builder through each collection in turn.
        let builder = event_conditions
            .into_iter()
            .fold(builder, |b, event| b.event(&event.signature, event.expression));
        let builder = function_conditions.into_iter().fold(builder, |b, function| {
            b.function(&function.signature, function.expression)
        });
        let builder = transaction_conditions
            .into_iter()
            .fold(builder, |b, transaction| {
                b.transaction(transaction.status, transaction.expression)
            });
        let builder = addresses
            .into_iter()
            .fold(builder, |b, addr| b.address(&addr.address));

        builder.build()
    }

    /// Builds an EVM monitor match with an empty monitor, a default
    /// transaction/receipt and no matched conditions.
    fn create_mock_monitor_match() -> MonitorMatch {
        let evm_match = EVMMonitorMatch {
            monitor: create_test_monitor(vec![], vec![], vec![], vec![]),
            transaction: TransactionBuilder::new().build(),
            receipt: Some(EVMTransactionReceipt::default()),
            logs: Some(vec![]),
            network_slug: "evm_mainnet".to_string(),
            matched_on: MatchConditions {
                functions: vec![],
                events: vec![],
                transactions: vec![],
            },
            matched_on_args: None,
        };
        MonitorMatch::EVM(Box::new(evm_match))
    }

    /// Builds a trigger whose config is a script config while its declared
    /// type is `trigger_type`, i.e. a deliberately mismatched trigger.
    fn script_configured_trigger(name: &str, trigger_type: TriggerType) -> Trigger {
        TriggerBuilder::new()
            .name(name)
            .script("invalid", ScriptLanguage::Python)
            .trigger_type(trigger_type)
            .build()
    }

    /// Executes `trigger` on a fresh service (no variables, no scripts) and
    /// asserts that it fails with a `ConfigError` whose message contains
    /// `expected_fragment`.
    async fn assert_config_error(trigger: Trigger, expected_fragment: &str) {
        let service = NotificationService::new();
        let variables = HashMap::new();

        let result = service
            .execute(
                &trigger,
                &variables,
                &create_mock_monitor_match(),
                &HashMap::new(),
            )
            .await;

        assert!(result.is_err());
        match result {
            Err(NotificationError::ConfigError(ctx)) => {
                assert!(ctx.message.contains(expected_fragment));
            }
            _ => panic!("Expected ConfigError"),
        }
    }

    #[tokio::test]
    async fn test_slack_notification_invalid_config() {
        let trigger = script_configured_trigger("test_slack", TriggerType::Slack);
        assert_config_error(trigger, "Expected retry policy in trigger config").await;
    }

    #[tokio::test]
    async fn test_email_notification_invalid_config() {
        let trigger = script_configured_trigger("test_email", TriggerType::Email);
        assert_config_error(trigger, "Invalid email configuration").await;
    }

    #[tokio::test]
    async fn test_webhook_notification_invalid_config() {
        let trigger = script_configured_trigger("test_webhook", TriggerType::Webhook);
        assert_config_error(trigger, "Expected retry policy in trigger config").await;
    }

    #[tokio::test]
    async fn test_discord_notification_invalid_config() {
        let trigger = script_configured_trigger("test_discord", TriggerType::Discord);
        assert_config_error(trigger, "Expected retry policy in trigger config").await;
    }

    #[tokio::test]
    async fn test_telegram_notification_invalid_config() {
        let trigger = script_configured_trigger("test_telegram", TriggerType::Telegram);
        assert_config_error(trigger, "Expected retry policy in trigger config").await;
    }

    #[tokio::test]
    async fn test_script_notification_invalid_config() {
        // Mismatch in the other direction: Telegram config on a Script trigger.
        let trigger = TriggerBuilder::new()
            .name("test_script")
            .telegram("invalid", "invalid", false)
            .trigger_type(TriggerType::Script)
            .build();
        assert_config_error(trigger, "Invalid script configuration").await;
    }
}