1use futures::future::BoxFuture;
19use std::{collections::HashMap, error::Error, sync::Arc};
20use tokio::sync::{watch, Mutex};
21
22use crate::{
23 models::{
24 BlockChainType, BlockType, ContractSpec, Monitor, MonitorMatch, Network, ProcessedBlock,
25 ScriptLanguage, TriggerConditions,
26 },
27 repositories::{
28 MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
29 TriggerRepositoryTrait, TriggerService,
30 },
31 services::{
32 blockchain::{BlockChainClient, BlockFilterFactory, ClientPoolTrait},
33 filter::{evm_helpers, handle_match, stellar_helpers, FilterService},
34 notification::NotificationService,
35 trigger::{
36 ScriptError, ScriptExecutorFactory, TriggerError, TriggerExecutionService,
37 TriggerExecutionServiceTrait,
38 },
39 },
40 utils::normalize_string,
41};
42
43pub type Result<T> = std::result::Result<T, Box<dyn Error>>;
45
46type ServiceResult<M, N, T> = Result<(
47 Arc<FilterService>,
48 Arc<TriggerExecutionService<T>>,
49 Vec<Monitor>,
50 HashMap<String, Network>,
51 Arc<Mutex<MonitorService<M, N, T>>>,
52 Arc<Mutex<NetworkService<N>>>,
53 Arc<Mutex<TriggerService<T>>>,
54)>;
55
56pub async fn initialize_services<M, N, T>(
70 monitor_service: Option<MonitorService<M, N, T>>,
71 network_service: Option<NetworkService<N>>,
72 trigger_service: Option<TriggerService<T>>,
73) -> ServiceResult<M, N, T>
74where
75 M: MonitorRepositoryTrait<N, T> + Send + Sync + 'static,
76 N: NetworkRepositoryTrait + Send + Sync + 'static,
77 T: TriggerRepositoryTrait + Send + Sync + 'static,
78{
79 let network_service = match network_service {
80 Some(service) => service,
81 None => {
82 let repository = N::new(None).await?;
83 NetworkService::<N>::new_with_repository(repository)?
84 }
85 };
86
87 let trigger_service = match trigger_service {
88 Some(service) => service,
89 None => {
90 let repository = T::new(None).await?;
91 TriggerService::<T>::new_with_repository(repository)?
92 }
93 };
94
95 let monitor_service = match monitor_service {
96 Some(service) => service,
97 None => {
98 let repository = M::new(
99 None,
100 Some(network_service.clone()),
101 Some(trigger_service.clone()),
102 )
103 .await?;
104 MonitorService::<M, N, T>::new_with_repository(repository)?
105 }
106 };
107
108 let notification_service = NotificationService::new();
109
110 let filter_service = Arc::new(FilterService::new());
111 let trigger_execution_service = Arc::new(TriggerExecutionService::new(
112 trigger_service.clone(),
113 notification_service,
114 ));
115
116 let monitors = monitor_service.get_all();
117 let active_monitors = filter_active_monitors(monitors);
118 let networks = network_service.get_all();
119
120 Ok((
121 filter_service,
122 trigger_execution_service,
123 active_monitors,
124 networks,
125 Arc::new(Mutex::new(monitor_service)),
126 Arc::new(Mutex::new(network_service)),
127 Arc::new(Mutex::new(trigger_service)),
128 ))
129}
130
131pub fn create_block_handler<P: ClientPoolTrait + 'static>(
142 shutdown_tx: watch::Sender<bool>,
143 filter_service: Arc<FilterService>,
144 active_monitors: Vec<Monitor>,
145 client_pools: Arc<P>,
146 contract_specs: Vec<(String, ContractSpec)>,
147) -> Arc<impl Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync> {
148 Arc::new(
149 move |block: BlockType, network: Network| -> BoxFuture<'static, ProcessedBlock> {
150 let filter_service = filter_service.clone();
151 let active_monitors = active_monitors.clone();
152 let client_pools = client_pools.clone();
153 let shutdown_tx = shutdown_tx.clone();
154 let contract_specs = contract_specs.clone();
155 Box::pin(async move {
156 let applicable_monitors = filter_network_monitors(&active_monitors, &network.slug);
157
158 let mut processed_block = ProcessedBlock {
159 block_number: block.number().unwrap_or(0),
160 network_slug: network.slug.clone(),
161 processing_results: Vec::new(),
162 };
163
164 if !applicable_monitors.is_empty() {
165 let mut shutdown_rx = shutdown_tx.subscribe();
166
167 let matches = match network.network_type {
168 BlockChainType::EVM => match client_pools.get_evm_client(&network).await {
169 Ok(client) => {
170 process_block(
171 client.as_ref(),
172 &network,
173 &block,
174 &applicable_monitors,
175 Some(&contract_specs),
176 &filter_service,
177 &mut shutdown_rx,
178 )
179 .await
180 }
181 Err(_) => None,
182 },
183 BlockChainType::Stellar => {
184 match client_pools.get_stellar_client(&network).await {
185 Ok(client) => {
186 process_block(
187 client.as_ref(),
188 &network,
189 &block,
190 &applicable_monitors,
191 Some(&contract_specs),
192 &filter_service,
193 &mut shutdown_rx,
194 )
195 .await
196 }
197 Err(_) => None,
198 }
199 }
200 BlockChainType::Midnight => None,
201 BlockChainType::Solana => None,
202 };
203
204 processed_block.processing_results = matches.unwrap_or_default();
205 }
206
207 processed_block
208 })
209 },
210 )
211}
212
213pub async fn process_block<T>(
223 client: &T,
224 network: &Network,
225 block: &BlockType,
226 applicable_monitors: &[Monitor],
227 contract_specs: Option<&[(String, ContractSpec)]>,
228 filter_service: &FilterService,
229 shutdown_rx: &mut watch::Receiver<bool>,
230) -> Option<Vec<MonitorMatch>>
231where
232 T: BlockChainClient + BlockFilterFactory<T>,
233{
234 tokio::select! {
235 result = filter_service.filter_block(client, network, block, applicable_monitors, contract_specs) => {
236 result.ok()
237 }
238 _ = shutdown_rx.changed() => {
239 tracing::info!("Shutting down block processing task");
240 None
241 }
242 }
243}
244
245pub async fn get_contract_specs<P: ClientPoolTrait + 'static>(
254 client_pool: &Arc<P>,
255 network_monitors: &[(Network, Vec<Monitor>)],
256) -> Vec<(String, ContractSpec)> {
257 let mut all_specs = Vec::new();
258
259 for (network, monitors) in network_monitors {
260 for monitor in monitors {
261 let specs = match network.network_type {
262 BlockChainType::Stellar => {
263 let mut contract_specs = Vec::new();
264 let mut addresses_without_specs = Vec::new();
265 for monitored_addr in &monitor.addresses {
267 if let Some(spec) = &monitored_addr.contract_spec {
268 let parsed_spec = match spec {
269 ContractSpec::Stellar(spec) => spec,
270 _ => {
271 tracing::warn!(
272 "Skipping non-Stellar contract spec for address {}",
273 monitored_addr.address
274 );
275 continue;
276 }
277 };
278
279 contract_specs.push((
280 stellar_helpers::normalize_address(&monitored_addr.address),
281 ContractSpec::Stellar(parsed_spec.clone()),
282 ))
283 } else {
284 addresses_without_specs.push(monitored_addr.address.clone());
285 }
286 }
287
288 if !addresses_without_specs.is_empty() {
290 let client: Arc<P::StellarClient> =
292 match client_pool.get_stellar_client(network).await {
293 Ok(client) => client,
294 Err(_) => {
295 tracing::warn!("Failed to get stellar client");
296 continue;
297 }
298 };
299
300 let chain_specs = futures::future::join_all(
301 addresses_without_specs.iter().map(|address| {
302 let client = client.clone();
303 async move {
304 let spec = client.get_contract_spec(address).await;
305 (address.clone(), spec)
306 }
307 }),
308 )
309 .await
310 .into_iter()
311 .filter_map(|(addr, spec)| match spec {
312 Ok(s) => Some((addr, s)),
313 Err(e) => {
314 tracing::warn!(
315 "Failed to fetch contract spec for address {}: {:?}",
316 addr,
317 e
318 );
319 None
320 }
321 })
322 .collect::<Vec<_>>();
323
324 contract_specs.extend(chain_specs);
325 }
326 contract_specs
327 }
328 BlockChainType::EVM => {
329 let mut contract_specs = Vec::new();
330 for monitored_addr in &monitor.addresses {
332 if let Some(spec) = &monitored_addr.contract_spec {
333 let parsed_spec = match spec {
334 ContractSpec::EVM(spec) => spec,
335 _ => {
336 tracing::warn!(
337 "Skipping non-EVM contract spec for address {}",
338 monitored_addr.address
339 );
340 continue;
341 }
342 };
343
344 contract_specs.push((
345 format!(
346 "0x{}",
347 evm_helpers::normalize_address(&monitored_addr.address)
348 ),
349 ContractSpec::EVM(parsed_spec.clone()),
350 ))
351 }
352 }
353 contract_specs
354 }
355 _ => {
356 vec![]
357 }
358 };
359 all_specs.extend(specs);
360 }
361 }
362 all_specs
363}
364
365pub fn create_trigger_handler<S: TriggerExecutionServiceTrait + Send + Sync + 'static>(
375 shutdown_tx: watch::Sender<bool>,
376 trigger_service: Arc<S>,
377 active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
378) -> Arc<impl Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync> {
379 Arc::new(move |block: &ProcessedBlock| {
380 let mut shutdown_rx = shutdown_tx.subscribe();
381 let trigger_service = trigger_service.clone();
382 let trigger_scripts = active_monitors_trigger_scripts.clone();
383 let block = block.clone();
384
385 tokio::spawn(async move {
386 tokio::select! {
387 _ = async {
388 if block.processing_results.is_empty() {
389 return;
390 }
391 let filtered_matches = run_trigger_filters(&block.processing_results, &block.network_slug, &trigger_scripts).await;
392 for monitor_match in &filtered_matches {
393 if let Err(e) = handle_match(monitor_match.clone(), &*trigger_service, &trigger_scripts).await {
394 TriggerError::execution_error(e.to_string(), Some(e.into()), None);
395 }
396 }
397 } => {}
398 _ = shutdown_rx.changed() => {
399 tracing::info!("Shutting down trigger handling task");
400 }
401 }
402 })
403 })
404}
405
406pub fn has_active_monitors(monitors: &[Monitor], network_slug: &String) -> bool {
415 monitors
416 .iter()
417 .any(|m| m.networks.contains(network_slug) && !m.paused)
418}
419
420fn filter_active_monitors(monitors: HashMap<String, Monitor>) -> Vec<Monitor> {
428 monitors
429 .into_values()
430 .filter(|m| !m.paused)
431 .collect::<Vec<_>>()
432}
433
434fn filter_network_monitors(monitors: &[Monitor], network_slug: &String) -> Vec<Monitor> {
443 monitors
444 .iter()
445 .filter(|m| m.networks.contains(network_slug))
446 .cloned()
447 .collect()
448}
449
450async fn execute_trigger_condition(
451 trigger_condition: &TriggerConditions,
452 monitor_match: &MonitorMatch,
453 script_content: &(ScriptLanguage, String),
454) -> bool {
455 let executor = ScriptExecutorFactory::create(&script_content.0, &script_content.1);
456
457 let result = executor
458 .execute(
459 monitor_match.clone(),
460 &trigger_condition.timeout_ms,
461 trigger_condition.arguments.as_deref(),
462 false,
463 )
464 .await;
465
466 match result {
467 Ok(true) => true,
468 Err(e) => {
469 ScriptError::execution_error(e.to_string(), None, None);
470 false
471 }
472 _ => false,
473 }
474}
475
476async fn run_trigger_filters(
477 matches: &[MonitorMatch],
478 _network: &str,
479 trigger_scripts: &HashMap<String, (ScriptLanguage, String)>,
480) -> Vec<MonitorMatch> {
481 let mut filtered_matches = vec![];
482
483 for monitor_match in matches {
484 let mut is_filtered = false;
485 let trigger_conditions = match monitor_match {
486 MonitorMatch::EVM(evm_match) => &evm_match.monitor.trigger_conditions,
487 MonitorMatch::Stellar(stellar_match) => &stellar_match.monitor.trigger_conditions,
488 };
489
490 for trigger_condition in trigger_conditions {
491 let monitor_name = match monitor_match {
492 MonitorMatch::EVM(evm_match) => evm_match.monitor.name.clone(),
493 MonitorMatch::Stellar(stellar_match) => stellar_match.monitor.name.clone(),
494 };
495
496 let script_content = trigger_scripts
497 .get(&format!(
498 "{}|{}",
499 normalize_string(&monitor_name),
500 trigger_condition.script_path
501 ))
502 .ok_or_else(|| {
503 ScriptError::execution_error("Script content not found".to_string(), None, None)
504 });
505 if let Ok(script_content) = script_content {
506 if execute_trigger_condition(trigger_condition, monitor_match, script_content).await
507 {
508 is_filtered = true;
509 break;
510 }
511 }
512 }
513 if !is_filtered {
514 filtered_matches.push(monitor_match.clone());
515 }
516 }
517
518 filtered_matches
519}
520
521#[cfg(test)]
522mod tests {
523 use super::*;
524 use crate::{
525 models::{
526 EVMMonitorMatch, EVMReceiptLog, EVMTransaction, EVMTransactionReceipt, MatchConditions,
527 Monitor, MonitorMatch, ScriptLanguage, StellarBlock, StellarMonitorMatch,
528 StellarTransaction, StellarTransactionInfo, TriggerConditions,
529 },
530 utils::tests::{builders::evm::monitor::MonitorBuilder, evm::receipt::ReceiptBuilder},
531 };
532 use alloy::{
533 consensus::{transaction::Recovered, Signed, TxEnvelope},
534 primitives::{Address, Bytes, TxKind, B256, U256},
535 };
536 use std::io::Write;
537 use tempfile::NamedTempFile;
538
539 fn create_temp_script(content: &str) -> NamedTempFile {
541 let mut file = NamedTempFile::new().unwrap();
542 file.write_all(content.as_bytes()).unwrap();
543 file
544 }
545 fn create_test_monitor(
546 name: &str,
547 networks: Vec<&str>,
548 paused: bool,
549 script_path: Option<&str>,
550 ) -> Monitor {
551 let mut builder = MonitorBuilder::new()
552 .name(name)
553 .networks(networks.into_iter().map(|s| s.to_string()).collect())
554 .paused(paused);
555
556 if let Some(path) = script_path {
557 builder = builder.trigger_condition(path, 1000, ScriptLanguage::Python, None);
558 }
559
560 builder.build()
561 }
562
563 fn create_test_evm_transaction_receipt() -> EVMTransactionReceipt {
564 ReceiptBuilder::new().build()
565 }
566
567 fn create_test_evm_logs() -> Vec<EVMReceiptLog> {
568 ReceiptBuilder::new().build().logs.clone()
569 }
570
571 fn create_test_evm_transaction() -> EVMTransaction {
572 let tx = alloy::consensus::TxLegacy {
573 chain_id: None,
574 nonce: 0,
575 gas_price: 0,
576 gas_limit: 0,
577 to: TxKind::Call(Address::ZERO),
578 value: U256::ZERO,
579 input: Bytes::default(),
580 };
581
582 let signature =
583 alloy::signers::Signature::from_scalars_and_parity(B256::ZERO, B256::ZERO, false);
584
585 let hash = B256::ZERO;
586
587 EVMTransaction::from(alloy::rpc::types::Transaction {
588 inner: Recovered::new_unchecked(
589 TxEnvelope::Legacy(Signed::new_unchecked(tx, signature, hash)),
590 Address::ZERO,
591 ),
592 block_hash: None,
593 block_number: None,
594 transaction_index: None,
595 effective_gas_price: None,
596 })
597 }
598
599 fn create_test_stellar_transaction() -> StellarTransaction {
600 StellarTransaction::from({
601 StellarTransactionInfo {
602 ..Default::default()
603 }
604 })
605 }
606
607 fn create_test_stellar_block() -> StellarBlock {
608 StellarBlock::default()
609 }
610
611 fn create_mock_monitor_match_from_path(
612 blockchain_type: BlockChainType,
613 script_path: Option<&str>,
614 ) -> MonitorMatch {
615 match blockchain_type {
616 BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
617 monitor: create_test_monitor("test", vec![], false, script_path),
618 transaction: create_test_evm_transaction(),
619 receipt: Some(create_test_evm_transaction_receipt()),
620 logs: Some(create_test_evm_logs()),
621 network_slug: "ethereum_mainnet".to_string(),
622 matched_on: MatchConditions {
623 functions: vec![],
624 events: vec![],
625 transactions: vec![],
626 },
627 matched_on_args: None,
628 })),
629 BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
630 monitor: create_test_monitor("test", vec![], false, script_path),
631 transaction: create_test_stellar_transaction(),
632 ledger: create_test_stellar_block(),
633 network_slug: "stellar_mainnet".to_string(),
634 matched_on: MatchConditions {
635 functions: vec![],
636 events: vec![],
637 transactions: vec![],
638 },
639 matched_on_args: None,
640 })),
641 BlockChainType::Midnight => unimplemented!(),
642 BlockChainType::Solana => unimplemented!(),
643 }
644 }
645
646 fn create_mock_monitor_match_from_monitor(
647 blockchain_type: BlockChainType,
648 monitor: Monitor,
649 ) -> MonitorMatch {
650 match blockchain_type {
651 BlockChainType::EVM => MonitorMatch::EVM(Box::new(EVMMonitorMatch {
652 monitor,
653 transaction: create_test_evm_transaction(),
654 receipt: Some(create_test_evm_transaction_receipt()),
655 logs: Some(create_test_evm_logs()),
656 network_slug: "ethereum_mainnet".to_string(),
657 matched_on: MatchConditions {
658 functions: vec![],
659 events: vec![],
660 transactions: vec![],
661 },
662 matched_on_args: None,
663 })),
664 BlockChainType::Stellar => MonitorMatch::Stellar(Box::new(StellarMonitorMatch {
665 monitor,
666 transaction: create_test_stellar_transaction(),
667 ledger: create_test_stellar_block(),
668 network_slug: "stellar_mainnet".to_string(),
669 matched_on: MatchConditions {
670 functions: vec![],
671 events: vec![],
672 transactions: vec![],
673 },
674 matched_on_args: None,
675 })),
676 BlockChainType::Midnight => unimplemented!(),
677 BlockChainType::Solana => unimplemented!(),
678 }
679 }
680
681 fn matches_equal(a: &MonitorMatch, b: &MonitorMatch) -> bool {
682 match (a, b) {
683 (MonitorMatch::EVM(a), MonitorMatch::EVM(b)) => a.monitor.name == b.monitor.name,
684 (MonitorMatch::Stellar(a), MonitorMatch::Stellar(b)) => {
685 a.monitor.name == b.monitor.name
686 }
687 _ => false,
688 }
689 }
690
691 #[test]
692 fn test_has_active_monitors() {
693 let monitors = vec![
694 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
695 create_test_monitor("2", vec!["ethereum_sepolia"], false, None),
696 create_test_monitor(
697 "3",
698 vec!["ethereum_mainnet", "ethereum_sepolia"],
699 false,
700 None,
701 ),
702 create_test_monitor("4", vec!["stellar_mainnet"], true, None),
703 ];
704
705 assert!(has_active_monitors(
706 &monitors,
707 &"ethereum_mainnet".to_string()
708 ));
709 assert!(has_active_monitors(
710 &monitors,
711 &"ethereum_sepolia".to_string()
712 ));
713 assert!(!has_active_monitors(
714 &monitors,
715 &"solana_mainnet".to_string()
716 ));
717 assert!(!has_active_monitors(
718 &monitors,
719 &"stellar_mainnet".to_string()
720 ));
721 }
722
723 #[test]
724 fn test_filter_active_monitors() {
725 let mut monitors = HashMap::new();
726 monitors.insert(
727 "1".to_string(),
728 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
729 );
730 monitors.insert(
731 "2".to_string(),
732 create_test_monitor("2", vec!["stellar_mainnet"], true, None),
733 );
734 monitors.insert(
735 "3".to_string(),
736 create_test_monitor("3", vec!["ethereum_mainnet"], false, None),
737 );
738
739 let active_monitors = filter_active_monitors(monitors);
740 assert_eq!(active_monitors.len(), 2);
741 assert!(active_monitors.iter().all(|m| !m.paused));
742 }
743
744 #[test]
745 fn test_filter_network_monitors() {
746 let monitors = vec![
747 create_test_monitor("1", vec!["ethereum_mainnet"], false, None),
748 create_test_monitor("2", vec!["stellar_mainnet"], true, None),
749 create_test_monitor(
750 "3",
751 vec!["ethereum_mainnet", "stellar_mainnet"],
752 false,
753 None,
754 ),
755 ];
756
757 let eth_monitors = filter_network_monitors(&monitors, &"ethereum_mainnet".to_string());
758 assert_eq!(eth_monitors.len(), 2);
759 assert!(eth_monitors
760 .iter()
761 .all(|m| m.networks.contains(&"ethereum_mainnet".to_string())));
762
763 let stellar_monitors = filter_network_monitors(&monitors, &"stellar_mainnet".to_string());
764 assert_eq!(stellar_monitors.len(), 2);
765 assert!(stellar_monitors
766 .iter()
767 .all(|m| m.networks.contains(&"stellar_mainnet".to_string())));
768
769 let sol_monitors = filter_network_monitors(&monitors, &"solana_mainnet".to_string());
770 assert!(sol_monitors.is_empty());
771 }
772
773 #[tokio::test]
774 async fn test_run_trigger_filters_empty_matches() {
775 let matches: Vec<MonitorMatch> = vec![];
777
778 let mut trigger_scripts = HashMap::new();
780 trigger_scripts.insert(
781 "monitor_test-test.py".to_string(), (
783 ScriptLanguage::Python,
784 r#"
785import sys
786import json
787
788input_data = sys.stdin.read()
789data = json.loads(input_data)
790print(False)
791 "#
792 .to_string(),
793 ),
794 );
795
796 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
798 assert!(filtered.is_empty());
799 }
800
801 #[tokio::test]
802 async fn test_run_trigger_filters_true_condition() {
803 let script_content = r#"
804import sys
805import json
806
807input_json = sys.argv[1]
808data = json.loads(input_json)
809print("debugging...")
810def test():
811 return True
812result = test()
813print(result)
814"#;
815 let temp_file = create_temp_script(script_content);
816 let mut trigger_scripts = HashMap::new();
817 trigger_scripts.insert(
818 format!("test-{}", temp_file.path().to_str().unwrap()),
819 (ScriptLanguage::Python, script_content.to_string()),
820 );
821 let match_item = create_mock_monitor_match_from_path(
822 BlockChainType::EVM,
823 Some(temp_file.path().to_str().unwrap()),
824 );
825 let matches = vec![match_item.clone()];
826
827 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
828 assert_eq!(filtered.len(), 1);
829 assert!(matches_equal(&filtered[0], &match_item));
830 }
831
832 #[tokio::test]
833 async fn test_run_trigger_filters_false_condition() {
834 let script_content = r#"
835import sys
836import json
837
838input_data = sys.stdin.read()
839data = json.loads(input_data)
840print("debugging...")
841def test():
842 return False
843result = test()
844print(result)
845"#;
846 let temp_file = create_temp_script(script_content);
847 let mut trigger_scripts = HashMap::new();
848 trigger_scripts.insert(
849 format!("test-{}", temp_file.path().to_str().unwrap()),
850 (ScriptLanguage::Python, script_content.to_string()),
851 );
852 let match_item = create_mock_monitor_match_from_path(
853 BlockChainType::EVM,
854 Some(temp_file.path().to_str().unwrap()),
855 );
856 let matches = vec![match_item.clone()];
857
858 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
859 assert_eq!(filtered.len(), 1);
860 }
861
862 #[tokio::test]
863 async fn test_execute_trigger_condition_returns_false() {
864 let script_content = r#"print(False) # Script returns false"#;
865 let temp_file = create_temp_script(script_content);
866 let trigger_condition = TriggerConditions {
867 language: ScriptLanguage::Python,
868 script_path: temp_file.path().to_str().unwrap().to_string(),
869 timeout_ms: 1000,
870 arguments: None,
871 };
872 let match_item = create_mock_monitor_match_from_path(
873 BlockChainType::EVM,
874 Some(temp_file.path().to_str().unwrap()),
875 );
876 let script_content = (ScriptLanguage::Python, script_content.to_string());
877
878 let result =
879 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
880 assert!(!result); }
882
883 #[tokio::test]
884 async fn test_execute_trigger_condition_script_error() {
885 let script_content = r#"raise Exception("Test error") # Raise an error"#;
886 let temp_file = create_temp_script(script_content);
887 let trigger_condition = TriggerConditions {
888 language: ScriptLanguage::Python,
889 script_path: temp_file.path().to_str().unwrap().to_string(),
890 timeout_ms: 1000,
891 arguments: None,
892 };
893 let match_item = create_mock_monitor_match_from_path(
894 BlockChainType::EVM,
895 Some(temp_file.path().to_str().unwrap()),
896 );
897 let script_content = (ScriptLanguage::Python, script_content.to_string());
898
899 let result =
900 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
901 assert!(!result); }
903
904 #[tokio::test]
905 async fn test_execute_trigger_condition_invalid_script() {
906 let trigger_condition = TriggerConditions {
907 language: ScriptLanguage::Python,
908 script_path: "non_existent_script.py".to_string(),
909 timeout_ms: 1000,
910 arguments: None,
911 };
912 let match_item = create_mock_monitor_match_from_path(
913 BlockChainType::EVM,
914 Some("non_existent_script.py"),
915 );
916 let script_content = (ScriptLanguage::Python, "invalid script content".to_string());
917
918 let result =
919 execute_trigger_condition(&trigger_condition, &match_item, &script_content).await;
920 assert!(!result); }
922
923 #[tokio::test]
924 async fn test_run_trigger_filters_multiple_conditions_keep_match() {
925 let monitor = MonitorBuilder::new()
927 .name("monitor_test")
928 .networks(vec!["ethereum_mainnet".to_string()])
929 .trigger_condition("test1.py", 1000, ScriptLanguage::Python, None)
930 .trigger_condition("test2.py", 1000, ScriptLanguage::Python, None)
931 .build();
932
933 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
935
936 let mut trigger_scripts = HashMap::new();
937 trigger_scripts.insert(
938 "monitor_test|test1.py".to_string(),
939 (
940 ScriptLanguage::Python,
941 r#"
942import sys
943import json
944
945input_data = sys.stdin.read()
946data = json.loads(input_data)
947print(True)
948"#
949 .to_string(),
950 ),
951 );
952 trigger_scripts.insert(
953 "monitor_test|test2.py".to_string(),
954 (
955 ScriptLanguage::Python,
956 r#"
957import sys
958import json
959input_data = sys.stdin.read()
960data = json.loads(input_data)
961print(True)
962"#
963 .to_string(),
964 ),
965 );
966
967 let matches = vec![match_item.clone()];
969 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
970
971 assert_eq!(filtered.len(), 0);
972 }
973
974 #[tokio::test]
975 async fn test_run_trigger_filters_condition_two_combinations_exclude_match() {
976 let monitor = MonitorBuilder::new()
977 .name("monitor_test")
978 .networks(vec!["ethereum_mainnet".to_string()])
979 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
980 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
981 .build();
982
983 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
984
985 let mut trigger_scripts = HashMap::new();
987 trigger_scripts.insert(
988 "monitor_test|condition1.py".to_string(),
989 (ScriptLanguage::Python, "print(True)".to_string()),
990 );
991 trigger_scripts.insert(
992 "monitor_test|condition2.py".to_string(),
993 (ScriptLanguage::Python, "print(False)".to_string()),
994 );
995
996 let matches = vec![match_item.clone()];
997 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
998 assert_eq!(filtered.len(), 0);
999 }
1000
1001 #[tokio::test]
1002 async fn test_run_trigger_filters_condition_two_combinations_keep_match() {
1003 let monitor = MonitorBuilder::new()
1004 .name("monitor_test")
1005 .networks(vec!["ethereum_mainnet".to_string()])
1006 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1007 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1008 .build();
1009
1010 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1011
1012 let mut trigger_scripts = HashMap::new();
1013 trigger_scripts.insert(
1014 "monitor_test|condition1.py".to_string(),
1015 (ScriptLanguage::Python, "print(False)".to_string()),
1016 );
1017 trigger_scripts.insert(
1018 "monitor_test|condition2.py".to_string(),
1019 (ScriptLanguage::Python, "print(False)".to_string()),
1020 );
1021
1022 let matches = vec![match_item.clone()];
1023 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1024 assert_eq!(filtered.len(), 1);
1025 }
1026
1027 #[tokio::test]
1028 async fn test_run_trigger_filters_condition_two_combinations_exclude_match_last_condition() {
1029 let monitor = MonitorBuilder::new()
1030 .name("monitor_test")
1031 .networks(vec!["ethereum_mainnet".to_string()])
1032 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1033 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1034 .build();
1035
1036 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1037
1038 let mut trigger_scripts = HashMap::new();
1039 trigger_scripts.insert(
1040 "monitor_test|condition1.py".to_string(),
1041 (ScriptLanguage::Python, "print(False)".to_string()),
1042 );
1043 trigger_scripts.insert(
1044 "monitor_test|condition2.py".to_string(),
1045 (ScriptLanguage::Python, "print(True)".to_string()),
1046 );
1047
1048 let matches = vec![match_item.clone()];
1049 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1050 assert_eq!(filtered.len(), 0);
1051 }
1052
1053 #[tokio::test]
1054 async fn test_run_trigger_filters_condition_three_combinations_exclude_match() {
1055 let monitor = MonitorBuilder::new()
1056 .name("monitor_test")
1057 .networks(vec!["ethereum_mainnet".to_string()])
1058 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1059 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1060 .trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1061 .build();
1062
1063 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1064
1065 let mut trigger_scripts = HashMap::new();
1066 trigger_scripts.insert(
1067 "monitor_test|condition1.py".to_string(),
1068 (ScriptLanguage::Python, "print(False)".to_string()),
1069 );
1070 trigger_scripts.insert(
1071 "monitor_test|condition2.py".to_string(),
1072 (ScriptLanguage::Python, "print(False)".to_string()),
1073 );
1074 trigger_scripts.insert(
1075 "monitor_test|condition3.py".to_string(),
1076 (ScriptLanguage::Python, "print(True)".to_string()),
1077 );
1078
1079 let matches = vec![match_item.clone()];
1080 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1081 assert_eq!(filtered.len(), 0);
1082 }
1083
1084 #[tokio::test]
1085 async fn test_run_trigger_filters_condition_three_combinations_keep_match() {
1086 let monitor = MonitorBuilder::new()
1087 .name("monitor_test")
1088 .networks(vec!["ethereum_mainnet".to_string()])
1089 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1090 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1091 .trigger_condition("condition3.py", 1000, ScriptLanguage::Python, None)
1092 .build();
1093
1094 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::EVM, monitor);
1095
1096 let mut trigger_scripts = HashMap::new();
1097 trigger_scripts.insert(
1098 "monitor_test|condition1.py".to_string(),
1099 (ScriptLanguage::Python, "print(False)".to_string()),
1100 );
1101 trigger_scripts.insert(
1102 "monitor_test|condition2.py".to_string(),
1103 (ScriptLanguage::Python, "print(False)".to_string()),
1104 );
1105 trigger_scripts.insert(
1106 "monitor_test|condition3.py".to_string(),
1107 (ScriptLanguage::Python, "print(False)".to_string()),
1108 );
1109
1110 let matches = vec![match_item.clone()];
1111 let filtered = run_trigger_filters(&matches, "ethereum_mainnet", &trigger_scripts).await;
1112 assert_eq!(filtered.len(), 1);
1113 }
1114
1115 #[tokio::test]
1117 async fn test_run_trigger_filters_stellar_empty_matches() {
1118 let matches: Vec<MonitorMatch> = vec![];
1119 let mut trigger_scripts = HashMap::new();
1120 trigger_scripts.insert(
1121 "monitor_test|test.py".to_string(),
1122 (
1123 ScriptLanguage::Python,
1124 r#"
1125import sys
1126import json
1127
1128input_data = sys.stdin.read()
1129data = json.loads(input_data)
1130print(False)
1131"#
1132 .to_string(),
1133 ),
1134 );
1135
1136 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1137 assert!(filtered.is_empty());
1138 }
1139
1140 #[tokio::test]
1141 async fn test_run_trigger_filters_stellar_true_condition() {
1142 let script_content = r#"
1143import sys
1144import json
1145
1146input_json = sys.argv[1]
1147data = json.loads(input_json)
1148print("debugging...")
1149def test():
1150 return True
1151result = test()
1152print(result)
1153"#;
1154 let temp_file = create_temp_script(script_content);
1155 let mut trigger_scripts = HashMap::new();
1156 trigger_scripts.insert(
1157 format!("test|{}", temp_file.path().to_str().unwrap()),
1158 (ScriptLanguage::Python, script_content.to_string()),
1159 );
1160 let match_item = create_mock_monitor_match_from_path(
1161 BlockChainType::Stellar,
1162 Some(temp_file.path().to_str().unwrap()),
1163 );
1164 let matches = vec![match_item.clone()];
1165
1166 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1167 assert_eq!(filtered.len(), 1);
1168 assert!(matches_equal(&filtered[0], &match_item));
1169 }
1170
1171 #[tokio::test]
1172 async fn test_run_trigger_filters_stellar_multiple_conditions() {
1173 let monitor = MonitorBuilder::new()
1174 .name("monitor_test")
1175 .networks(vec!["stellar_mainnet".to_string()])
1176 .trigger_condition("condition1.py", 1000, ScriptLanguage::Python, None)
1177 .trigger_condition("condition2.py", 1000, ScriptLanguage::Python, None)
1178 .build();
1179
1180 let match_item = create_mock_monitor_match_from_monitor(BlockChainType::Stellar, monitor);
1181
1182 let mut trigger_scripts = HashMap::new();
1183 trigger_scripts.insert(
1184 "monitor_test|condition1.py".to_string(),
1185 (ScriptLanguage::Python, "print(False)".to_string()),
1186 );
1187 trigger_scripts.insert(
1188 "monitor_test|condition2.py".to_string(),
1189 (ScriptLanguage::Python, "print(True)".to_string()),
1190 );
1191
1192 let matches = vec![match_item.clone()];
1193 let filtered = run_trigger_filters(&matches, "stellar_mainnet", &trigger_scripts).await;
1194 assert_eq!(filtered.len(), 0); }
1196}