openzeppelin_monitor/bootstrap/
mod.rs

1//! Bootstrap module for initializing services and creating handlers.
2//!
3//! This module provides functions to initialize the necessary services and create handlers for
4//! processing blocks and triggers. It also includes helper functions for filtering and processing
5//! monitors and networks.
6//!
7//! # Services
8//! - `FilterService`: Handles filtering of blockchain data
9//! - `TriggerExecutionService`: Manages trigger execution
10//! - `NotificationService`: Handles notifications
11//!
12//! # Handlers
13//! - `create_block_handler`: Creates a block handler function that processes new blocks from the
14//!   blockchain
15//! - `create_trigger_handler`: Creates a trigger handler function that processes trigger events
16//!   from the block processing pipeline
17
18use futures::future::BoxFuture;
19use std::{collections::HashMap, error::Error, sync::Arc};
20use tokio::sync::{watch, Mutex};
21
22use crate::{
23	models::{
24		BlockChainType, BlockType, ContractSpec, Monitor, MonitorMatch, Network, ProcessedBlock,
25		ScriptLanguage, TriggerConditions,
26	},
27	repositories::{
28		MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
29		TriggerRepositoryTrait, TriggerService,
30	},
31	services::{
32		blockchain::{BlockChainClient, BlockFilterFactory, ClientPoolTrait},
33		filter::{evm_helpers, handle_match, stellar_helpers, FilterService},
34		notification::NotificationService,
35		trigger::{
36			ScriptError, ScriptExecutorFactory, TriggerError, TriggerExecutionService,
37			TriggerExecutionServiceTrait,
38		},
39	},
40	utils::normalize_string,
41};
42
/// Result alias used by this module: any `T` on success, a boxed `dyn Error` on failure.
44pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
45
46type ServiceResult<M, N, T> = Result<(
47	Arc<FilterService>,
48	Arc<TriggerExecutionService<T>>,
49	Vec<Monitor>,
50	HashMap<String, Network>,
51	Arc<Mutex<MonitorService<M, N, T>>>,
52	Arc<Mutex<NetworkService<N>>>,
53	Arc<Mutex<TriggerService<T>>>,
54)>;
55
56/// Initializes all required services for the blockchain monitor.
57///
58/// # Returns
59/// Returns a tuple containing:
60/// - FilterService: Handles filtering of blockchain data
61/// - TriggerExecutionService: Manages trigger execution
62/// - `Vec<Monitor>`: List of active monitors
63/// - `HashMap<String, Network>`: Available networks indexed by slug
64/// - `Arc<Mutex<M>>`: Data access for monitor configs
65/// - `Arc<Mutex<N>>`: Data access for network configs
66/// - `Arc<Mutex<T>>`: Data access for trigger configs
67/// # Errors
68/// Returns an error if any service initialization fails
69pub async fn initialize_services<M, N, T>(
70	monitor_service: Option<MonitorService<M, N, T>>,
71	network_service: Option<NetworkService<N>>,
72	trigger_service: Option<TriggerService<T>>,
73) -> ServiceResult<M, N, T>
74where
75	M: MonitorRepositoryTrait<N, T> + Send + Sync + 'static,
76	N: NetworkRepositoryTrait + Send + Sync + 'static,
77	T: TriggerRepositoryTrait + Send + Sync + 'static,
78{
79	let network_service = match network_service {
80		Some(service) => service,
81		None => {
82			let repository = N::new(None).await?;
83			NetworkService::<N>::new_with_repository(repository)?
84		}
85	};
86
87	let trigger_service = match trigger_service {
88		Some(service) => service,
89		None => {
90			let repository = T::new(None).await?;
91			TriggerService::<T>::new_with_repository(repository)?
92		}
93	};
94
95	let monitor_service = match monitor_service {
96		Some(service) => service,
97		None => {
98			let repository = M::new(
99				None,
100				Some(network_service.clone()),
101				Some(trigger_service.clone()),
102			)
103			.await?;
104			MonitorService::<M, N, T>::new_with_repository(repository)?
105		}
106	};
107
108	let notification_service = NotificationService::new();
109
110	let filter_service = Arc::new(FilterService::new());
111	let trigger_execution_service = Arc::new(TriggerExecutionService::new(
112		trigger_service.clone(),
113		notification_service,
114	));
115
116	let monitors = monitor_service.get_all();
117	let active_monitors = filter_active_monitors(monitors);
118	let networks = network_service.get_all();
119
120	Ok((
121		filter_service,
122		trigger_execution_service,
123		active_monitors,
124		networks,
125		Arc::new(Mutex::new(monitor_service)),
126		Arc::new(Mutex::new(network_service)),
127		Arc::new(Mutex::new(trigger_service)),
128	))
129}
130
131/// Creates a block handler function that processes new blocks from the blockchain.
132///
133/// # Arguments
134/// * `shutdown_tx` - Watch channel for shutdown signals
135/// * `filter_service` - Service for filtering blockchain data
136/// * `active_monitors` - List of active monitors
137/// * `client_pools` - Client pools for accessing blockchain clients
138///
139/// # Returns
140/// Returns a function that handles incoming blocks
141pub fn create_block_handler<P: ClientPoolTrait + 'static>(
142	shutdown_tx: watch::Sender<bool>,
143	filter_service: Arc<FilterService>,
144	active_monitors: Vec<Monitor>,
145	client_pools: Arc<P>,
146	contract_specs: Vec<(String, ContractSpec)>,
147) -> Arc<impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync> {
148	Arc::new(
149		move |block: BlockType, network: Network| -> BoxFuture<'static, ProcessedBlock> {
150			let filter_service = filter_service.clone();
151			let active_monitors = active_monitors.clone();
152			let client_pools = client_pools.clone();
153			let shutdown_tx = shutdown_tx.clone();
154			let contract_specs = contract_specs.clone();
155			Box::pin(async move {
156				let applicable_monitors = filter_network_monitors(&active_monitors, &network.slug);
157
158				let mut processed_block = ProcessedBlock {
159					block_number: block.number().unwrap_or(0),
160					network_slug: network.slug.clone(),
161					processing_results: Vec::new(),
162				};
163
164				if !applicable_monitors.is_empty() {
165					let mut shutdown_rx = shutdown_tx.subscribe();
166
167					let matches = match network.network_type {
168						BlockChainType::EVM => match client_pools.get_evm_client(&network).await {
169							Ok(client) => {
170								process_block(
171									client.as_ref(),
172									&network,
173									&block,
174									&applicable_monitors,
175									Some(&contract_specs),
176									&filter_service,
177									&mut shutdown_rx,
178								)
179								.await
180							}
181							Err(_) => None,
182						},
183						BlockChainType::Stellar => {
184							match client_pools.get_stellar_client(&network).await {
185								Ok(client) => {
186									process_block(
187										client.as_ref(),
188										&network,
189										&block,
190										&applicable_monitors,
191										Some(&contract_specs),
192										&filter_service,
193										&mut shutdown_rx,
194									)
195									.await
196								}
197								Err(_) => None,
198							}
199						}
200						BlockChainType::Midnight => None,
201						BlockChainType::Solana => None,
202					};
203
204					processed_block.processing_results = matches.unwrap_or_default();
205				}
206
207				processed_block
208			})
209		},
210	)
211}
212
213/// Processes a single block for all applicable monitors.
214///
215/// # Arguments
216/// * `client` - The client to use to process the block
217/// * `network` - The network the block belongs to
218/// * `block` - The block to process
219/// * `applicable_monitors` - List of monitors that apply to this network
220/// * `filter_service` - Service for filtering blockchain data
221/// * `shutdown_rx` - Receiver for shutdown signals
222pub async fn process_block<T>(
223	client: &T,
224	network: &Network,
225	block: &BlockType,
226	applicable_monitors: &[Monitor],
227	contract_specs: Option<&[(String, ContractSpec)]>,
228	filter_service: &FilterService,
229	shutdown_rx: &mut watch::Receiver<bool>,
230) -> Option<Vec<MonitorMatch>>
231where
232	T: BlockChainClient + BlockFilterFactory<T>,
233{
234	tokio::select! {
235		result = filter_service.filter_block(client, network, block, applicable_monitors, contract_specs) => {
236			result.ok()
237		}
238		_ = shutdown_rx.changed() => {
239			tracing::info!("Shutting down block processing task");
240			None
241		}
242	}
243}
244
245/// Get contract specs for all applicable monitors
246///
247/// # Arguments
248/// * `client_pool` - The client pool to use to get the contract specs
249/// * `network_monitors` - The monitors to get the contract specs for
250///
251/// # Returns
252/// Returns a vector of contract specs
253pub async fn get_contract_specs<P: ClientPoolTrait + 'static>(
254	client_pool: &Arc<P>,
255	network_monitors: &[(Network, Vec<Monitor>)],
256) -> Vec<(String, ContractSpec)> {
257	let mut all_specs = Vec::new();
258
259	for (network, monitors) in network_monitors {
260		for monitor in monitors {
261			let specs = match network.network_type {
262				BlockChainType::Stellar => {
263					let mut contract_specs = Vec::new();
264					let mut addresses_without_specs = Vec::new();
265					// First collect addresses that have contract specs configured in the monitor
266					for monitored_addr in &monitor.addresses {
267						if let Some(spec) = &monitored_addr.contract_spec {
268							let parsed_spec = match spec {
269								ContractSpec::Stellar(spec) => spec,
270								_ => {
271									tracing::warn!(
272										"Skipping non-Stellar contract spec for address {}",
273										monitored_addr.address
274									);
275									continue;
276								}
277							};
278
279							contract_specs.push((
280								stellar_helpers::normalize_address(&monitored_addr.address),
281								ContractSpec::Stellar(parsed_spec.clone()),
282							))
283						} else {
284							addresses_without_specs.push(monitored_addr.address.clone());
285						}
286					}
287
288					// Fetch remaining specs from chain
289					if !addresses_without_specs.is_empty() {
290						// Get the client once
291						let client: Arc<P::StellarClient> =
292							match client_pool.get_stellar_client(network).await {
293								Ok(client) => client,
294								Err(_) => {
295									tracing::warn!("Failed to get stellar client");
296									continue;
297								}
298							};
299
300						let chain_specs = futures::future::join_all(
301							addresses_without_specs.iter().map(|address| {
302								let client = client.clone();
303								async move {
304									let spec = client.get_contract_spec(address).await;
305									(address.clone(), spec)
306								}
307							}),
308						)
309						.await
310						.into_iter()
311						.filter_map(|(addr, spec)| match spec {
312							Ok(s) => Some((addr, s)),
313							Err(e) => {
314								tracing::warn!(
315									"Failed to fetch contract spec for address {}: {:?}",
316									addr,
317									e
318								);
319								None
320							}
321						})
322						.collect::<Vec<_>>();
323
324						contract_specs.extend(chain_specs);
325					}
326					contract_specs
327				}
328				BlockChainType::EVM => {
329					let mut contract_specs = Vec::new();
330					// First collect addresses that have contract specs configured in the monitor
331					for monitored_addr in &monitor.addresses {
332						if let Some(spec) = &monitored_addr.contract_spec {
333							let parsed_spec = match spec {
334								ContractSpec::EVM(spec) => spec,
335								_ => {
336									tracing::warn!(
337										"Skipping non-EVM contract spec for address {}",
338										monitored_addr.address
339									);
340									continue;
341								}
342							};
343
344							contract_specs.push((
345								format!(
346									"0x{}",
347									evm_helpers::normalize_address(&monitored_addr.address)
348								),
349								ContractSpec::EVM(parsed_spec.clone()),
350							))
351						}
352					}
353					contract_specs
354				}
355				_ => {
356					vec![]
357				}
358			};
359			all_specs.extend(specs);
360		}
361	}
362	all_specs
363}
364
365/// Creates a trigger handler function that processes trigger events from the block processing
366/// pipeline.
367///
368/// # Arguments
369/// * `shutdown_tx` - Watch channel for shutdown signals
370/// * `trigger_service` - Service for executing triggers
371///
372/// # Returns
373/// Returns a function that handles trigger execution for matching monitors
374pub fn create_trigger_handler<S: TriggerExecutionServiceTrait + Send + Sync + 'static>(
375	shutdown_tx: watch::Sender<bool>,
376	trigger_service: Arc<S>,
377	active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
378) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync> {
379	Arc::new(move |block: &ProcessedBlock| {
380		let mut shutdown_rx = shutdown_tx.subscribe();
381		let trigger_service = trigger_service.clone();
382		let trigger_scripts = active_monitors_trigger_scripts.clone();
383		let block = block.clone();
384
385		tokio::spawn(async move {
386			tokio::select! {
387				_ = async {
388					if block.processing_results.is_empty() {
389						return;
390					}
391					let filtered_matches = run_trigger_filters(&block.processing_results, &block.network_slug, &trigger_scripts).await;
392					for monitor_match in &filtered_matches {
393						if let Err(e) = handle_match(monitor_match.clone(), &*trigger_service, &trigger_scripts).await {
394							TriggerError::execution_error(e.to_string(), Some(e.into()), None);
395						}
396					}
397				} => {}
398				_ = shutdown_rx.changed() => {
399					tracing::info!("Shutting down trigger handling task");
400				}
401			}
402		})
403	})
404}
405
406/// Checks if a network has any active monitors.
407///
408/// # Arguments
409/// * `monitors` - List of monitors to check
410/// * `network_slug` - Network identifier to check for
411///
412/// # Returns
413/// Returns true if there are any active monitors for the given network
414pub fn has_active_monitors(monitors: &[Monitor], network_slug: &String) -> bool {
415	monitors
416		.iter()
417		.any(|m| m.networks.contains(network_slug) && !m.paused)
418}
419
420/// Filters out paused monitors from the provided collection.
421///
422/// # Arguments
423/// * `monitors` - HashMap of monitors to filter
424///
425/// # Returns
426/// Returns a vector containing only active (non-paused) monitors
427fn filter_active_monitors(monitors: HashMap<String, Monitor>) -> Vec<Monitor> {
428	monitors
429		.into_values()
430		.filter(|m| !m.paused)
431		.collect::<Vec<_>>()
432}
433
434/// Filters monitors that are applicable to a specific network.
435///
436/// # Arguments
437/// * `monitors` - List of monitors to filter
438/// * `network_slug` - Network identifier to filter by
439///
440/// # Returns
441/// Returns a vector of monitors that are configured for the specified network
442fn filter_network_monitors(monitors: &[Monitor], network_slug: &String) -> Vec<Monitor> {
443	monitors
444		.iter()
445		.filter(|m| m.networks.contains(network_slug))
446		.cloned()
447		.collect()
448}
449
450async fn execute_trigger_condition(
451	trigger_condition: &TriggerConditions,
452	monitor_match: &MonitorMatch,
453	script_content: &(ScriptLanguage, String),
454) -> bool {
455	let executor = ScriptExecutorFactory::create(&script_content.0, &script_content.1);
456
457	let result = executor
458		.execute(
459			monitor_match.clone(),
460			&trigger_condition.timeout_ms,
461			trigger_condition.arguments.as_deref(),
462			false,
463		)
464		.await;
465
466	match result {
467		Ok(true) => true,
468		Err(e) => {
469			ScriptError::execution_error(e.to_string(), None, None);
470			false
471		}
472		_ => false,
473	}
474}
475
476async fn run_trigger_filters(
477	matches: &[MonitorMatch],
478	_network: &str,
479	trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
480) -> Vec<MonitorMatch> {
481	let mut filtered_matches = vec![];
482
483	for monitor_match in matches {
484		let mut is_filtered = false;
485		let trigger_conditions = match monitor_match {
486			MonitorMatch::EVM(evm_match) => &evm_match.monitor.trigger_conditions,
487			MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.trigger_conditions,
488		};
489
490		for trigger_condition in trigger_conditions {
491			let monitor_name = match monitor_match {
492				MonitorMatch::EVM(evm_match) => evm_match.monitor.name.clone(),
493				MonitorMatch::Stellar(stellar_match) => stellar_match.monitor.name.clone(),
494			};
495
496			let script_content = trigger_scripts
497				.get(&format!(
498					"{}|{}",
499					normalize_string(&monitor_name),
500					trigger_condition.script_path
501				))
502				.ok_or_else(|| {
503					ScriptError::execution_error("Script content not found".to_string(), None, None)
504				});
505			if let Ok(script_content) = script_content {
506				if execute_trigger_condition(trigger_condition, monitor_match, script_content).await
507				{
508					is_filtered = true;
509					break;
510				}
511			}
512		}
513		if !is_filtered {
514			filtered_matches.push(monitor_match.clone());
515		}
516	}
517
518	filtered_matches
519}
520
521#[cfg(test)]
522mod tests {
523	use super::*;
524	use crate::{
525		models::{
526			EVMMonitorMatch, EVMReceiptLog, EVMTransaction, EVMTransactionReceipt, MatchConditions,
527			Monitor, MonitorMatch, ScriptLanguage, StellarBlock, StellarMonitorMatch,
528			StellarTransaction, StellarTransactionInfo, TriggerConditions,
529		},
530		utils::tests::{builders::evm::monitor::MonitorBuilder, evm::receipt::ReceiptBuilder},
531	};
532	use alloy::{
533		consensus::{transaction::Recovered, Signed, TxEnvelope},
534		primitives::{Address, Bytes, TxKind, B256, U256},
535	};
536	use std::io::Write;
537	use tempfile::NamedTempFile;
538
539	// Helper function to create a temporary script file
540	fn create_temp_script(content: &str) -> NamedTempFile {
541		let mut file = NamedTempFile::new().unwrap();
542		file.write_all(content.as_bytes()).unwrap();
543		file
544	}
545	fn create_test_monitor(
546		name: &str,
547		networks: Vec<&str>,
548		paused: bool,
549		script_path: Option<&str>,
550	) -> Monitor {
551		let mut builder = MonitorBuilder::new()
552			.name(name)
553			.networks(networks.into_iter().map(|s| s.to_string()).collect())
554			.paused(paused);
555
556		if let Some(path) = script_path {
557			builder = builder.trigger_condition(path, 1000, ScriptLanguage::Python, None);
558		}
559
560		builder.build()
561	}
562
563	fn create_test_evm_transaction_receipt() -> EVMTransactionReceipt {
564		ReceiptBuilder::new().build()
565	}
566
567	fn create_test_evm_logs() -> Vec<EVMReceiptLog> {
568		ReceiptBuilder::new().build().logs.clone()
569	}
570
571	fn create_test_evm_transaction() -> EVMTransaction {
572		let tx = alloy::consensus::TxLegacy {
573			chain_id: None,
574			nonce: 0,
575			gas_price: 0,
576			gas_limit: 0,
577			to: TxKind::Call(Address::ZERO),
578			value: U256::ZERO,
579			input: Bytes::default(),
580		};
581
582		let signature =
583			alloy::signers::Signature::from_scalars_and_parity(B256::ZERO, B256::ZERO, false);
584
585		let hash = B256::ZERO;
586
587		EVMTransaction::from(alloy::rpc::types::Transaction {
588			inner: Recovered::new_unchecked(
589				TxEnvelope::Legacy(Signed::new_unchecked(tx, signature, hash)),
590				Address::ZERO,
591			),
592			block_hash: None,
593			block_number: None,
594			transaction_index: None,
595			effective_gas_price: None,
596		})
597	}
598
599	fn create_test_stellar_transaction() -> StellarTransaction {
600		StellarTransaction::from({
601			StellarTransactionInfo {
602				..Default::default()
603			}
604		})
605	}
606
607	fn create_test_stellar_block() -> StellarBlock {
608		StellarBlock::default()
609	}
610
611	fn create_mock_monitor_match_from_path(
612		blockchain_type: BlockChainType,
613		script_path: Option<&str>,
614	) -> MonitorMatch {
615		match blockchain_type {
616			BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
617				monitor: create_test_monitor("test", vec![], false, script_path),
618				transaction: create_test_evm_transaction(),
619				receipt: Some(create_test_evm_transaction_receipt()),
620				logs: Some(create_test_evm_logs()),
621				network_slug: "ethereum_mainnet".to_string(),
622				matched_on: MatchConditions {
623					functions: vec![],
624					events: vec![],
625					transactions: vec![],
626				},
627				matched_on_args: None,
628			})),
629			BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
630				monitor: create_test_monitor("test", vec![], false, script_path),
631				transaction: create_test_stellar_transaction(),
632				ledger: create_test_stellar_block(),
633				network_slug: "stellar_mainnet".to_string(),
634				matched_on: MatchConditions {
635					functions: vec![],
636					events: vec![],
637					transactions: vec![],
638				},
639				matched_on_args: None,
640			})),
641			BlockChainType::Midnight => unimplemented!(),
642			BlockChainType::Solana => unimplemented!(),
643		}
644	}
645
646	fn create_mock_monitor_match_from_monitor(
647		blockchain_type: BlockChainType,
648		monitor: Monitor,
649	) -> MonitorMatch {
650		match blockchain_type {
651			BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
652				monitor,
653				transaction: create_test_evm_transaction(),
654				receipt: Some(create_test_evm_transaction_receipt()),
655				logs: Some(create_test_evm_logs()),
656				network_slug: "ethereum_mainnet".to_string(),
657				matched_on: MatchConditions {
658					functions: vec![],
659					events: vec![],
660					transactions: vec![],
661				},
662				matched_on_args: None,
663			})),
664			BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
665				monitor,
666				transaction: create_test_stellar_transaction(),
667				ledger: create_test_stellar_block(),
668				network_slug: "stellar_mainnet".to_string(),
669				matched_on: MatchConditions {
670					functions: vec![],
671					events: vec![],
672					transactions: vec![],
673				},
674				matched_on_args: None,
675			})),
676			BlockChainType::Midnight => unimplemented!(),
677			BlockChainType::Solana => unimplemented!(),
678		}
679	}
680
681	fn matches_equal(a: &MonitorMatch, b: &MonitorMatch) -> bool {
682		match (a, b) {
683			(MonitorMatch::EVM(a), MonitorMatch::EVM(b)) => a.monitor.name == b.monitor.name,
684			(MonitorMatch::Stellar(a), MonitorMatch::Stellar(b)) => {
685				a.monitor.name == b.monitor.name
686			}
687			_ => false,
688		}
689	}
690
691	#[test]
692	fn test_has_active_monitors() {
693		let monitors = vec![
694			create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
695			create_test_monitor("2", vec!["ethereum_sepolia"], false, None),
696			create_test_monitor(
697				"3",
698				vec!["ethereum_mainnet", "ethereum_sepolia"],
699				false,
700				None,
701			),
702			create_test_monitor("4", vec!["stellar_mainnet"], true, None),
703		];
704
705		assert!(has_active_monitors(
706			&monitors,
707			&"ethereum_mainnet".to_string()
708		));
709		assert!(has_active_monitors(
710			&monitors,
711			&"ethereum_sepolia".to_string()
712		));
713		assert!(!has_active_monitors(
714			&monitors,
715			&"solana_mainnet".to_string()
716		));
717		assert!(!has_active_monitors(
718			&monitors,
719			&"stellar_mainnet".to_string()
720		));
721	}
722
723	#[test]
724	fn test_filter_active_monitors() {
725		let mut monitors = HashMap::new();
726		monitors.insert(
727			"1".to_string(),
728			create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
729		);
730		monitors.insert(
731			"2".to_string(),
732			create_test_monitor("2", vec!["stellar_mainnet"], true, None),
733		);
734		monitors.insert(
735			"3".to_string(),
736			create_test_monitor("3", vec!["ethereum_mainnet"], false, None),
737		);
738
739		let active_monitors = filter_active_monitors(monitors);
740		assert_eq!(active_monitors.len(), 2);
741		assert!(active_monitors.iter().all(|m| !m.paused));
742	}
743
744	#[test]
745	fn test_filter_network_monitors() {
746		let monitors = vec![
747			create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
748			create_test_monitor("2", vec!["stellar_mainnet"], true, None),
749			create_test_monitor(
750				"3",
751				vec!["ethereum_mainnet", "stellar_mainnet"],
752				false,
753				None,
754			),
755		];
756
757		let eth_monitors = filter_network_monitors(&monitors, &"ethereum_mainnet".to_string());
758		assert_eq!(eth_monitors.len(), 2);
759		assert!(eth_monitors
760			.iter()
761			.all(|m| m.networks.contains(&"ethereum_mainnet".to_string())));
762
763		let stellar_monitors = filter_network_monitors(&monitors, &"stellar_mainnet".to_string());
764		assert_eq!(stellar_monitors.len(), 2);
765		assert!(stellar_monitors
766			.iter()
767			.all(|m| m.networks.contains(&"stellar_mainnet".to_string())));
768
769		let sol_monitors = filter_network_monitors(&monitors, &"solana_mainnet".to_string());
770		assert!(sol_monitors.is_empty());
771	}
772
773	#[tokio::test]
774	async fn test_run_trigger_filters_empty_matches() {
775		// Create empty matches vector
776		let matches: Vec<MonitorMatch> = vec![];
777
778		// Create trigger scripts with a more realistic script path
779		let mut trigger_scripts = HashMap::new();
780		trigger_scripts.insert(
781			"monitor_test-test.py".to_string(), // Using a more realistic key format
782			(
783				ScriptLanguage::Python,
784				r#"
785import sys
786import json
787
788input_data = sys.stdin.read()
789data = json.loads(input_data)
790print(False)
791            "#
792				.to_string(),
793			),
794		);
795
796		// Test the filter function
797		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
798		assert!(filtered.is_empty());
799	}
800
801	#[tokio::test]
802	async fn test_run_trigger_filters_true_condition() {
803		let script_content = r#"
804import sys
805import json
806
807input_json = sys.argv[1]
808data = json.loads(input_json)
809print("debugging...")
810def test():
811	return True
812result = test()
813print(result)
814"#;
815		let temp_file = create_temp_script(script_content);
816		let mut trigger_scripts = HashMap::new();
817		trigger_scripts.insert(
818			format!("test-{}", temp_file.path().to_str().unwrap()),
819			(ScriptLanguage::Python, script_content.to_string()),
820		);
821		let match_item = create_mock_monitor_match_from_path(
822			BlockChainType::EVM,
823			Some(temp_file.path().to_str().unwrap()),
824		);
825		let matches = vec![match_item.clone()];
826
827		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
828		assert_eq!(filtered.len(), 1);
829		assert!(matches_equal(&filtered[0], &match_item));
830	}
831
832	#[tokio::test]
833	async fn test_run_trigger_filters_false_condition() {
834		let script_content = r#"
835import sys
836import json
837
838input_data = sys.stdin.read()
839data = json.loads(input_data)
840print("debugging...")
841def test():
842	return False
843result = test()
844print(result)
845"#;
846		let temp_file = create_temp_script(script_content);
847		let mut trigger_scripts = HashMap::new();
848		trigger_scripts.insert(
849			format!("test-{}", temp_file.path().to_str().unwrap()),
850			(ScriptLanguage::Python, script_content.to_string()),
851		);
852		let match_item = create_mock_monitor_match_from_path(
853			BlockChainType::EVM,
854			Some(temp_file.path().to_str().unwrap()),
855		);
856		let matches = vec![match_item.clone()];
857
858		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
859		assert_eq!(filtered.len(), 1);
860	}
861
862	#[tokio::test]
863	async fn test_execute_trigger_condition_returns_false() {
864		let script_content = r#"print(False)  # Script returns false"#;
865		let temp_file = create_temp_script(script_content);
866		let trigger_condition = TriggerConditions {
867			language: ScriptLanguage::Python,
868			script_path: temp_file.path().to_str().unwrap().to_string(),
869			timeout_ms: 1000,
870			arguments: None,
871		};
872		let match_item = create_mock_monitor_match_from_path(
873			BlockChainType::EVM,
874			Some(temp_file.path().to_str().unwrap()),
875		);
876		let script_content = (ScriptLanguage::Python, script_content.to_string());
877
878		let result =
879			execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
880		assert!(!result); // Should be false when script returns false
881	}
882
883	#[tokio::test]
884	async fn test_execute_trigger_condition_script_error() {
885		let script_content = r#"raise Exception("Test error")  # Raise an error"#;
886		let temp_file = create_temp_script(script_content);
887		let trigger_condition = TriggerConditions {
888			language: ScriptLanguage::Python,
889			script_path: temp_file.path().to_str().unwrap().to_string(),
890			timeout_ms: 1000,
891			arguments: None,
892		};
893		let match_item = create_mock_monitor_match_from_path(
894			BlockChainType::EVM,
895			Some(temp_file.path().to_str().unwrap()),
896		);
897		let script_content = (ScriptLanguage::Python, script_content.to_string());
898
899		let result =
900			execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
901		assert!(!result); // Should be false when script errors
902	}
903
904	#[tokio::test]
905	async fn test_execute_trigger_condition_invalid_script() {
906		let trigger_condition = TriggerConditions {
907			language: ScriptLanguage::Python,
908			script_path: "non_existent_script.py".to_string(),
909			timeout_ms: 1000,
910			arguments: None,
911		};
912		let match_item = create_mock_monitor_match_from_path(
913			BlockChainType::EVM,
914			Some("non_existent_script.py"),
915		);
916		let script_content = (ScriptLanguage::Python, "invalid script content".to_string());
917
918		let result =
919			execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
920		assert!(!result); // Should be false for invalid script
921	}
922
923	#[tokio::test]
924	async fn test_run_trigger_filters_multiple_conditions_keep_match() {
925		// Create a monitor with two trigger conditions
926		let monitor = MonitorBuilder::new()
927			.name("monitor_test")
928			.networks(vec!["ethereum_mainnet".to_string()])
929			.trigger_condition("test1.py", 1000, ScriptLanguage::Python, None)
930			.trigger_condition("test2.py", 1000, ScriptLanguage::Python, None)
931			.build();
932
933		// Create a match with this monitor
934		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
935
936		let mut trigger_scripts = HashMap::new();
937		trigger_scripts.insert(
938			"monitor_test|test1.py".to_string(),
939			(
940				ScriptLanguage::Python,
941				r#"
942import sys
943import json
944
945input_data = sys.stdin.read()
946data = json.loads(input_data)
947print(True)
948"#
949				.to_string(),
950			),
951		);
952		trigger_scripts.insert(
953			"monitor_test|test2.py".to_string(),
954			(
955				ScriptLanguage::Python,
956				r#"
957import sys
958import json
959input_data = sys.stdin.read()
960data = json.loads(input_data)
961print(True)
962"#
963				.to_string(),
964			),
965		);
966
967		// Run the filter with our test data
968		let matches = vec![match_item.clone()];
969		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
970
971		assert_eq!(filtered.len(), 0);
972	}
973
974	#[tokio::test]
975	async fn test_run_trigger_filters_condition_two_combinations_exclude_match() {
976		let monitor = MonitorBuilder::new()
977			.name("monitor_test")
978			.networks(vec!["ethereum_mainnet".to_string()])
979			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
980			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
981			.build();
982
983		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
984
985		// Test case 1: All conditions return true - match should be kept
986		let mut trigger_scripts = HashMap::new();
987		trigger_scripts.insert(
988			"monitor_test|condition1.py".to_string(),
989			(ScriptLanguage::Python, "print(True)".to_string()),
990		);
991		trigger_scripts.insert(
992			"monitor_test|condition2.py".to_string(),
993			(ScriptLanguage::Python, "print(False)".to_string()),
994		);
995
996		let matches = vec![match_item.clone()];
997		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
998		assert_eq!(filtered.len(), 0);
999	}
1000
1001	#[tokio::test]
1002	async fn test_run_trigger_filters_condition_two_combinations_keep_match() {
1003		let monitor = MonitorBuilder::new()
1004			.name("monitor_test")
1005			.networks(vec!["ethereum_mainnet".to_string()])
1006			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1007			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1008			.build();
1009
1010		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1011
1012		let mut trigger_scripts = HashMap::new();
1013		trigger_scripts.insert(
1014			"monitor_test|condition1.py".to_string(),
1015			(ScriptLanguage::Python, "print(False)".to_string()),
1016		);
1017		trigger_scripts.insert(
1018			"monitor_test|condition2.py".to_string(),
1019			(ScriptLanguage::Python, "print(False)".to_string()),
1020		);
1021
1022		let matches = vec![match_item.clone()];
1023		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1024		assert_eq!(filtered.len(), 1);
1025	}
1026
1027	#[tokio::test]
1028	async fn test_run_trigger_filters_condition_two_combinations_exclude_match_last_condition() {
1029		let monitor = MonitorBuilder::new()
1030			.name("monitor_test")
1031			.networks(vec!["ethereum_mainnet".to_string()])
1032			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1033			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1034			.build();
1035
1036		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1037
1038		let mut trigger_scripts = HashMap::new();
1039		trigger_scripts.insert(
1040			"monitor_test|condition1.py".to_string(),
1041			(ScriptLanguage::Python, "print(False)".to_string()),
1042		);
1043		trigger_scripts.insert(
1044			"monitor_test|condition2.py".to_string(),
1045			(ScriptLanguage::Python, "print(True)".to_string()),
1046		);
1047
1048		let matches = vec![match_item.clone()];
1049		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1050		assert_eq!(filtered.len(), 0);
1051	}
1052
1053	#[tokio::test]
1054	async fn test_run_trigger_filters_condition_three_combinations_exclude_match() {
1055		let monitor = MonitorBuilder::new()
1056			.name("monitor_test")
1057			.networks(vec!["ethereum_mainnet".to_string()])
1058			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1059			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1060			.trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1061			.build();
1062
1063		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1064
1065		let mut trigger_scripts = HashMap::new();
1066		trigger_scripts.insert(
1067			"monitor_test|condition1.py".to_string(),
1068			(ScriptLanguage::Python, "print(False)".to_string()),
1069		);
1070		trigger_scripts.insert(
1071			"monitor_test|condition2.py".to_string(),
1072			(ScriptLanguage::Python, "print(False)".to_string()),
1073		);
1074		trigger_scripts.insert(
1075			"monitor_test|condition3.py".to_string(),
1076			(ScriptLanguage::Python, "print(True)".to_string()),
1077		);
1078
1079		let matches = vec![match_item.clone()];
1080		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1081		assert_eq!(filtered.len(), 0);
1082	}
1083
1084	#[tokio::test]
1085	async fn test_run_trigger_filters_condition_three_combinations_keep_match() {
1086		let monitor = MonitorBuilder::new()
1087			.name("monitor_test")
1088			.networks(vec!["ethereum_mainnet".to_string()])
1089			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1090			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1091			.trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1092			.build();
1093
1094		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1095
1096		let mut trigger_scripts = HashMap::new();
1097		trigger_scripts.insert(
1098			"monitor_test|condition1.py".to_string(),
1099			(ScriptLanguage::Python, "print(False)".to_string()),
1100		);
1101		trigger_scripts.insert(
1102			"monitor_test|condition2.py".to_string(),
1103			(ScriptLanguage::Python, "print(False)".to_string()),
1104		);
1105		trigger_scripts.insert(
1106			"monitor_test|condition3.py".to_string(),
1107			(ScriptLanguage::Python, "print(False)".to_string()),
1108		);
1109
1110		let matches = vec![match_item.clone()];
1111		let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1112		assert_eq!(filtered.len(), 1);
1113	}
1114
	// Trigger filter tests against the "stellar_mainnet" network
1116	#[tokio::test]
1117	async fn test_run_trigger_filters_stellar_empty_matches() {
1118		let matches: Vec<MonitorMatch> = vec![];
1119		let mut trigger_scripts = HashMap::new();
1120		trigger_scripts.insert(
1121			"monitor_test|test.py".to_string(),
1122			(
1123				ScriptLanguage::Python,
1124				r#"
1125import sys
1126import json
1127
1128input_data = sys.stdin.read()
1129data = json.loads(input_data)
1130print(False)
1131"#
1132				.to_string(),
1133			),
1134		);
1135
1136		let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1137		assert!(filtered.is_empty());
1138	}
1139
1140	#[tokio::test]
1141	async fn test_run_trigger_filters_stellar_true_condition() {
1142		let script_content = r#"
1143import sys
1144import json
1145
1146input_json = sys.argv[1]
1147data = json.loads(input_json)
1148print("debugging...")
1149def test():
1150	return True
1151result = test()
1152print(result)
1153"#;
1154		let temp_file = create_temp_script(script_content);
1155		let mut trigger_scripts = HashMap::new();
1156		trigger_scripts.insert(
1157			format!("test|{}", temp_file.path().to_str().unwrap()),
1158			(ScriptLanguage::Python, script_content.to_string()),
1159		);
1160		let match_item = create_mock_monitor_match_from_path(
1161			BlockChainType::Stellar,
1162			Some(temp_file.path().to_str().unwrap()),
1163		);
1164		let matches = vec![match_item.clone()];
1165
1166		let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1167		assert_eq!(filtered.len(), 1);
1168		assert!(matches_equal(&filtered[0], &match_item));
1169	}
1170
1171	#[tokio::test]
1172	async fn test_run_trigger_filters_stellar_multiple_conditions() {
1173		let monitor = MonitorBuilder::new()
1174			.name("monitor_test")
1175			.networks(vec!["stellar_mainnet".to_string()])
1176			.trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1177			.trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1178			.build();
1179
1180		let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Stellar, monitor);
1181
1182		let mut trigger_scripts = HashMap::new();
1183		trigger_scripts.insert(
1184			"monitor_test|condition1.py".to_string(),
1185			(ScriptLanguage::Python, "print(False)".to_string()),
1186		);
1187		trigger_scripts.insert(
1188			"monitor_test|condition2.py".to_string(),
1189			(ScriptLanguage::Python, "print(True)".to_string()),
1190		);
1191
1192		let matches = vec![match_item.clone()];
1193		let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1194		assert_eq!(filtered.len(), 0); // Match should be filtered out because condition2 returns true
1195	}
1196}