openzeppelin_monitor/services/notification/
mod.rs

1//! Notification service implementation.
2//!
3//! This module provides functionality to send notifications through various channels
4//! Supports variable substitution in message templates.
5
6use async_trait::async_trait;
7
8use std::{collections::HashMap, sync::Arc};
9
10mod discord;
11mod email;
12mod error;
13mod pool;
14mod script;
15mod slack;
16mod telegram;
17mod webhook;
18
19use crate::{
20	models::{MonitorMatch, ScriptLanguage, Trigger, TriggerType, TriggerTypeConfig},
21	utils::normalize_string,
22};
23
24pub use discord::DiscordNotifier;
25pub use email::{EmailContent, EmailNotifier, SmtpConfig};
26pub use error::NotificationError;
27pub use pool::NotificationClientPool;
28pub use script::ScriptNotifier;
29pub use slack::SlackNotifier;
30pub use telegram::TelegramNotifier;
31pub use webhook::{WebhookConfig, WebhookNotifier};
32
33/// Interface for notification implementations
34///
35/// All notification types must implement this trait to provide
36/// consistent notification behavior.
37#[async_trait]
38pub trait Notifier {
39	/// Sends a notification with the given message
40	///
41	/// # Arguments
42	/// * `message` - The formatted message to send
43	///
44	/// # Returns
45	/// * `Result<(), NotificationError>` - Success or error
46	async fn notify(&self, message: &str) -> Result<(), NotificationError>;
47
48	/// Sends a notification with custom payload fields
49	///
50	/// # Arguments
51	/// * `message` - The formatted message to send
52	/// * `payload_fields` - Additional fields to include in the payload
53	///
54	/// # Returns
55	/// * `Result<(), NotificationError>` - Success or error
56	async fn notify_with_payload(
57		&self,
58		message: &str,
59		_payload_fields: HashMap<String, serde_json::Value>,
60	) -> Result<(), NotificationError> {
61		// Default implementation just calls notify
62		self.notify(message).await
63	}
64}
65
66/// Interface for executing scripts
67///
68/// This Interface is used to execute scripts for notifications.
69/// It is implemented by the ScriptNotifier struct.
70#[async_trait]
71pub trait ScriptExecutor {
72	/// Executes a script to send a custom notifications
73	///
74	/// # Arguments
75	/// * `monitor_match` - The monitor match to send
76	/// * `script_content` - The script content to execute
77	///
78	/// # Returns
79	/// * `Result<(), NotificationError>` - Success or error
80	async fn script_notify(
81		&self,
82		monitor_match: &MonitorMatch,
83		script_content: &(ScriptLanguage, String),
84	) -> Result<(), NotificationError>;
85}
86
87/// Service for managing notifications across different channels
88pub struct NotificationService {
89	/// Client pool for managing notification clients (HTTP, SMTP)
90	client_pool: Arc<NotificationClientPool>,
91}
92
93impl NotificationService {
94	/// Creates a new notification service instance
95	pub fn new() -> Self {
96		NotificationService {
97			client_pool: Arc::new(NotificationClientPool::new()),
98		}
99	}
100
101	/// Executes a notification based on the trigger configuration
102	///
103	/// # Arguments
104	/// * `trigger` - Trigger containing the notification type and parameters
105	/// * `variables` - Variables to substitute in message templates
106	/// * `monitor_match` - Monitor match to send (needed for custom script trigger)
107	/// * `trigger_scripts` - Contains the script content to execute (needed for custom script
108	///   trigger)
109	///
110	/// # Returns
111	/// * `Result<(), NotificationError>` - Success or error
112	pub async fn execute(
113		&self,
114		trigger: &Trigger,
115		variables: &HashMap<String, String>,
116		monitor_match: &MonitorMatch,
117		trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
118	) -> Result<(), NotificationError> {
119		match &trigger.trigger_type {
120			// Match Webhook-based triggers
121			TriggerType::Slack
122			| TriggerType::Discord
123			| TriggerType::Webhook
124			| TriggerType::Telegram => {
125				// Extract retry policy from the trigger configuration
126				let retry_policy = trigger.config.get_retry_policy().ok_or_else(|| {
127					NotificationError::config_error(
128						format!("Expected retry policy in trigger config: {}", trigger.name),
129						None,
130						None,
131					)
132				})?;
133
134				// Get or create the HTTP client from the pool
135				let http_client = self
136					.client_pool
137					.get_or_create_http_client(&retry_policy)
138					.await
139					.map_err(|e| {
140						NotificationError::execution_error(
141							"Failed to get or create HTTP client from pool".to_string(),
142							Some(e.into()),
143							None,
144						)
145					})?;
146
147				match &trigger.trigger_type {
148					TriggerType::Webhook => {
149						let notifier = WebhookNotifier::from_config(&trigger.config, http_client)?;
150						let message = notifier.format_message(variables);
151						notifier.notify(&message).await?;
152					}
153					TriggerType::Discord => {
154						let notifier = DiscordNotifier::from_config(&trigger.config, http_client)?;
155						let message = notifier.format_message(variables);
156						notifier.notify(&message).await?;
157					}
158					TriggerType::Telegram => {
159						let notifier = TelegramNotifier::from_config(&trigger.config, http_client)?;
160						let message = notifier.format_message(variables);
161						notifier.notify(&message).await?;
162					}
163					TriggerType::Slack => {
164						let notifier = SlackNotifier::from_config(&trigger.config, http_client)?;
165						let message = notifier.format_message(variables);
166						notifier.notify(&message).await?;
167					}
168					_ => unreachable!(),
169				}
170			}
171			TriggerType::Email => {
172				// Extract SMTP configuration from the trigger
173				let smtp_config = match &trigger.config {
174					TriggerTypeConfig::Email {
175						host,
176						port,
177						username,
178						password,
179						..
180					} => SmtpConfig {
181						host: host.clone(),
182						port: port.unwrap_or(465),
183						username: username.as_ref().to_string(),
184						password: password.as_ref().to_string(),
185					},
186					_ => {
187						return Err(NotificationError::config_error(
188							"Invalid email configuration".to_string(),
189							None,
190							None,
191						))
192					}
193				};
194
195				// Get or create the SMTP client from the pool
196				let smtp_client = self
197					.client_pool
198					.get_or_create_smtp_client(&smtp_config)
199					.await
200					.map_err(|e| {
201						NotificationError::execution_error(
202							"Failed to get SMTP client from pool".to_string(),
203							Some(e.into()),
204							None,
205						)
206					})?;
207
208				let notifier = EmailNotifier::from_config(&trigger.config, smtp_client)?;
209				let message = notifier.format_message(variables);
210				notifier.notify(&message).await?;
211			}
212			TriggerType::Script => {
213				let notifier = ScriptNotifier::from_config(&trigger.config)?;
214				let monitor_name = match monitor_match {
215					MonitorMatch::EVM(evm_match) => &evm_match.monitor.name,
216					MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.name,
217				};
218				let script_path = match &trigger.config {
219					TriggerTypeConfig::Script { script_path, .. } => script_path,
220					_ => {
221						return Err(NotificationError::config_error(
222							"Invalid script configuration".to_string(),
223							None,
224							None,
225						))
226					}
227				};
228				let script = trigger_scripts
229					.get(&format!(
230						"{}|{}",
231						normalize_string(monitor_name),
232						script_path
233					))
234					.ok_or_else(|| {
235						NotificationError::config_error(
236							"Script content not found".to_string(),
237							None,
238							None,
239						)
240					});
241				let script_content = match &script {
242					Ok(content) => content,
243					Err(e) => {
244						return Err(NotificationError::config_error(e.to_string(), None, None))
245					}
246				};
247
248				notifier
249					.script_notify(monitor_match, script_content)
250					.await?;
251			}
252		}
253		Ok(())
254	}
255}
256
257impl Default for NotificationService {
258	fn default() -> Self {
259		Self::new()
260	}
261}
262
263#[cfg(test)]
264mod tests {
265	use super::*;
266	use crate::{
267		models::{
268			AddressWithSpec, EVMMonitorMatch, EVMTransactionReceipt, EventCondition,
269			FunctionCondition, MatchConditions, Monitor, MonitorMatch, ScriptLanguage,
270			TransactionCondition, TriggerType,
271		},
272		utils::tests::{
273			builders::{evm::monitor::MonitorBuilder, trigger::TriggerBuilder},
274			evm::transaction::TransactionBuilder,
275		},
276	};
277	use std::collections::HashMap;
278
279	fn create_test_monitor(
280		event_conditions: Vec<EventCondition>,
281		function_conditions: Vec<FunctionCondition>,
282		transaction_conditions: Vec<TransactionCondition>,
283		addresses: Vec<AddressWithSpec>,
284	) -> Monitor {
285		let mut builder = MonitorBuilder::new()
286			.name("test")
287			.networks(vec!["evm_mainnet".to_string()]);
288
289		// Add all conditions
290		for event in event_conditions {
291			builder = builder.event(&event.signature, event.expression);
292		}
293		for function in function_conditions {
294			builder = builder.function(&function.signature, function.expression);
295		}
296		for transaction in transaction_conditions {
297			builder = builder.transaction(transaction.status, transaction.expression);
298		}
299
300		// Add addresses
301		for addr in addresses {
302			builder = builder.address(&addr.address);
303		}
304
305		builder.build()
306	}
307
308	fn create_mock_monitor_match() -> MonitorMatch {
309		MonitorMatch::EVM(Box::new(EVMMonitorMatch {
310			monitor: create_test_monitor(vec![], vec![], vec![], vec![]),
311			transaction: TransactionBuilder::new().build(),
312			receipt: Some(EVMTransactionReceipt::default()),
313			logs: Some(vec![]),
314			network_slug: "evm_mainnet".to_string(),
315			matched_on: MatchConditions {
316				functions: vec![],
317				events: vec![],
318				transactions: vec![],
319			},
320			matched_on_args: None,
321		}))
322	}
323
324	#[tokio::test]
325	async fn test_slack_notification_invalid_config() {
326		let service = NotificationService::new();
327
328		let trigger = TriggerBuilder::new()
329			.name("test_slack")
330			.script("invalid", ScriptLanguage::Python)
331			.trigger_type(TriggerType::Slack) // Intentionally wrong config type
332			.build();
333
334		let variables = HashMap::new();
335		let result = service
336			.execute(
337				&trigger,
338				&variables,
339				&create_mock_monitor_match(),
340				&HashMap::new(),
341			)
342			.await;
343		assert!(result.is_err());
344		match result {
345			Err(NotificationError::ConfigError(ctx)) => {
346				assert!(ctx
347					.message
348					.contains("Expected retry policy in trigger config"));
349			}
350			_ => panic!("Expected ConfigError"),
351		}
352	}
353
354	#[tokio::test]
355	async fn test_email_notification_invalid_config() {
356		let service = NotificationService::new();
357
358		let trigger = TriggerBuilder::new()
359			.name("test_email")
360			.script("invalid", ScriptLanguage::Python)
361			.trigger_type(TriggerType::Email) // Intentionally wrong config type
362			.build();
363
364		let variables = HashMap::new();
365		let result = service
366			.execute(
367				&trigger,
368				&variables,
369				&create_mock_monitor_match(),
370				&HashMap::new(),
371			)
372			.await;
373		assert!(result.is_err());
374		match result {
375			Err(NotificationError::ConfigError(ctx)) => {
376				assert!(ctx.message.contains("Invalid email configuration"));
377			}
378			_ => panic!("Expected ConfigError"),
379		}
380	}
381
382	#[tokio::test]
383	async fn test_webhook_notification_invalid_config() {
384		let service = NotificationService::new();
385
386		let trigger = TriggerBuilder::new()
387			.name("test_webhook")
388			.script("invalid", ScriptLanguage::Python)
389			.trigger_type(TriggerType::Webhook) // Intentionally wrong config type
390			.build();
391
392		let variables = HashMap::new();
393		let result = service
394			.execute(
395				&trigger,
396				&variables,
397				&create_mock_monitor_match(),
398				&HashMap::new(),
399			)
400			.await;
401		assert!(result.is_err());
402		match result {
403			Err(NotificationError::ConfigError(ctx)) => {
404				assert!(ctx
405					.message
406					.contains("Expected retry policy in trigger config"));
407			}
408			_ => panic!("Expected ConfigError"),
409		}
410	}
411
412	#[tokio::test]
413	async fn test_discord_notification_invalid_config() {
414		let service = NotificationService::new();
415
416		let trigger = TriggerBuilder::new()
417			.name("test_discord")
418			.script("invalid", ScriptLanguage::Python)
419			.trigger_type(TriggerType::Discord) // Intentionally wrong config type
420			.build();
421
422		let variables = HashMap::new();
423		let result = service
424			.execute(
425				&trigger,
426				&variables,
427				&create_mock_monitor_match(),
428				&HashMap::new(),
429			)
430			.await;
431		assert!(result.is_err());
432		match result {
433			Err(NotificationError::ConfigError(ctx)) => {
434				assert!(ctx
435					.message
436					.contains("Expected retry policy in trigger config"));
437			}
438			_ => panic!("Expected ConfigError"),
439		}
440	}
441
442	#[tokio::test]
443	async fn test_telegram_notification_invalid_config() {
444		let service = NotificationService::new();
445
446		let trigger = TriggerBuilder::new()
447			.name("test_telegram")
448			.script("invalid", ScriptLanguage::Python)
449			.trigger_type(TriggerType::Telegram) // Intentionally wrong config type
450			.build();
451
452		let variables = HashMap::new();
453		let result = service
454			.execute(
455				&trigger,
456				&variables,
457				&create_mock_monitor_match(),
458				&HashMap::new(),
459			)
460			.await;
461		assert!(result.is_err());
462		match result {
463			Err(NotificationError::ConfigError(ctx)) => {
464				assert!(ctx
465					.message
466					.contains("Expected retry policy in trigger config"));
467			}
468			_ => panic!("Expected ConfigError"),
469		}
470	}
471
472	#[tokio::test]
473	async fn test_script_notification_invalid_config() {
474		let service = NotificationService::new();
475
476		let trigger = TriggerBuilder::new()
477			.name("test_script")
478			.telegram("invalid", "invalid", false)
479			.trigger_type(TriggerType::Script) // Intentionally wrong config type
480			.build();
481
482		let variables = HashMap::new();
483
484		let result = service
485			.execute(
486				&trigger,
487				&variables,
488				&create_mock_monitor_match(),
489				&HashMap::new(),
490			)
491			.await;
492
493		assert!(result.is_err());
494		match result {
495			Err(NotificationError::ConfigError(ctx)) => {
496				assert!(ctx.message.contains("Invalid script configuration"));
497			}
498			_ => panic!("Expected ConfigError"),
499		}
500	}
501}