openzeppelin_monitor/utils/monitor/
execution.rs

1//! Execution monitor module
2//!
3//! This module provides functionality to execute monitors against specific block numbers on blockchain networks.
4use crate::{
5	bootstrap::{get_contract_specs, has_active_monitors},
6	models::{BlockChainType, ScriptLanguage},
7	repositories::{
8		MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
9		TriggerRepositoryTrait,
10	},
11	services::{
12		blockchain::{BlockChainClient, ClientPoolTrait},
13		filter::{handle_match, FilterService},
14		trigger::TriggerExecutionService,
15	},
16	utils::monitor::MonitorExecutionError,
17};
18use std::{collections::HashMap, path::Path, sync::Arc};
19use tokio::sync::Mutex;
20use tracing::{info, instrument};
21
22/// Configuration for executing a monitor
23///
24/// # Arguments
25///
26/// * `path` - The path to the monitor to execute
27/// * `network_slug` - The network slug to execute the monitor against
28/// * `block_number` - The block number to execute the monitor against
29/// * `monitor_service` - The monitor service to use
30/// * `network_service` - The network service to use
31/// * `filter_service` - The filter service to use
32/// * `trigger_execution_service` - The trigger execution service to use
33/// * `active_monitors_trigger_scripts` - The active monitors trigger scripts to use
34/// * `client_pool` - The client pool to use
35pub struct MonitorExecutionConfig<
36	M: MonitorRepositoryTrait<N, TR>,
37	N: NetworkRepositoryTrait + Send + Sync + 'static,
38	TR: TriggerRepositoryTrait + Send + Sync + 'static,
39	CP: ClientPoolTrait + Send + Sync + 'static,
40> {
41	pub path: String,
42	pub network_slug: Option<String>,
43	pub block_number: Option<u64>,
44	pub monitor_service: Arc<Mutex<MonitorService<M, N, TR>>>,
45	pub network_service: Arc<Mutex<NetworkService<N>>>,
46	pub filter_service: Arc<FilterService>,
47	pub trigger_execution_service: Arc<TriggerExecutionService<TR>>,
48	pub active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
49	pub client_pool: Arc<CP>,
50}
51pub type ExecutionResult<T> = std::result::Result<T, MonitorExecutionError>;
52
53/// Executes a monitor against a specific block number on a blockchain network.
54///
55/// This function allows testing monitors by running them against historical blocks.
56/// It supports both EVM and Stellar networks, retrieving the block data and applying
57/// the monitor's filters to check for matches.
58///
59/// # Arguments
60///
61/// * `monitor_name` - The name of the monitor to execute
62/// * `network_slug` - The network identifier to run the monitor against
63/// * `block_number` - The specific block number to analyze
64/// * `active_monitors` - List of currently active monitors
65/// * `network_service` - The network service to use
66/// * `filter_service` - The filter service to use
67/// * `client_pool` - The client pool to use
68///
69/// # Returns
70/// * `Result<String, ExecutionError>` - JSON string containing matches or error
71#[instrument(skip_all)]
72#[allow(clippy::too_many_arguments)]
73pub async fn execute_monitor<
74	M: MonitorRepositoryTrait<N, TR>,
75	N: NetworkRepositoryTrait + Send + Sync + 'static,
76	TR: TriggerRepositoryTrait + Send + Sync + 'static,
77	CP: ClientPoolTrait + Send + Sync + 'static,
78>(
79	config: MonitorExecutionConfig<M, N, TR, CP>,
80) -> ExecutionResult<String> {
81	tracing::debug!("Loading monitor configuration");
82	let monitor = config
83		.monitor_service
84		.lock()
85		.await
86		.load_from_path(Some(Path::new(&config.path)), None, None)
87		.await
88		.map_err(|e| MonitorExecutionError::execution_error(e.to_string(), None, None))?;
89
90	tracing::debug!(monitor_name = %monitor.name, "Monitor loaded successfully");
91
92	let networks_for_monitor = if let Some(network_slug) = config.network_slug {
93		tracing::debug!(network = %network_slug, "Finding specific network");
94		let network = config
95			.network_service
96			.lock()
97			.await
98			.get(network_slug.as_str())
99			.ok_or_else(|| {
100				MonitorExecutionError::not_found(
101					format!("Network '{}' not found", network_slug),
102					None,
103					None,
104				)
105			})?;
106		vec![network.clone()]
107	} else {
108		tracing::debug!("Finding all active networks for monitor");
109		config
110			.network_service
111			.lock()
112			.await
113			.get_all()
114			.values()
115			.filter(|network| has_active_monitors(&[monitor.clone()], &network.slug))
116			.cloned()
117			.collect()
118	};
119
120	tracing::debug!(
121		networks_count = networks_for_monitor.len(),
122		"Networks found for monitor"
123	);
124
125	let mut all_matches = Vec::new();
126	for network in networks_for_monitor {
127		tracing::debug!(
128			network_type = ?network.network_type,
129			network_slug = %network.slug,
130			"Processing network"
131		);
132
133		let contract_specs = get_contract_specs(
134			&config.client_pool,
135			&[(network.clone(), vec![monitor.clone()])],
136		)
137		.await;
138
139		let matches = match network.network_type {
140			BlockChainType::EVM => {
141				let client = config
142					.client_pool
143					.get_evm_client(&network)
144					.await
145					.map_err(|e| {
146						MonitorExecutionError::execution_error(
147							format!("Failed to get EVM client: {}", e),
148							None,
149							None,
150						)
151					})?;
152
153				let block_number = match config.block_number {
154					Some(block_number) => {
155						tracing::debug!(block = %block_number, "Using specified block number");
156						block_number
157					}
158					None => {
159						let latest = client.get_latest_block_number().await.map_err(|e| {
160							MonitorExecutionError::execution_error(e.to_string(), None, None)
161						})?;
162						tracing::debug!(block = %latest, "Using latest block number");
163						latest
164					}
165				};
166
167				tracing::debug!(block = %block_number, "Fetching block");
168				let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
169					MonitorExecutionError::execution_error(
170						format!("Failed to get block {}: {}", block_number, e),
171						None,
172						None,
173					)
174				})?;
175
176				let block = blocks.first().ok_or_else(|| {
177					MonitorExecutionError::not_found(
178						format!("Block {} not found", block_number),
179						None,
180						None,
181					)
182				})?;
183
184				tracing::debug!(block = %block_number, "Filtering block");
185				config
186					.filter_service
187					.filter_block(
188						&*client,
189						&network,
190						block,
191						&[monitor.clone()],
192						Some(&contract_specs),
193					)
194					.await
195					.map_err(|e| {
196						MonitorExecutionError::execution_error(
197							format!("Failed to filter block: {}", e),
198							None,
199							None,
200						)
201					})?
202			}
203			BlockChainType::Stellar => {
204				let client = config
205					.client_pool
206					.get_stellar_client(&network)
207					.await
208					.map_err(|e| {
209						MonitorExecutionError::execution_error(
210							format!("Failed to get Stellar client: {}", e),
211							None,
212							None,
213						)
214					})?;
215
216				// If block number is not provided, get the latest block number
217				let block_number = match config.block_number {
218					Some(block_number) => block_number,
219					None => client.get_latest_block_number().await.map_err(|e| {
220						MonitorExecutionError::execution_error(e.to_string(), None, None)
221					})?,
222				};
223
224				let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
225					MonitorExecutionError::execution_error(
226						format!("Failed to get block {}: {}", block_number, e),
227						None,
228						None,
229					)
230				})?;
231
232				let block = blocks.first().ok_or_else(|| {
233					MonitorExecutionError::not_found(
234						format!("Block {} not found", block_number),
235						None,
236						None,
237					)
238				})?;
239
240				config
241					.filter_service
242					.filter_block(
243						&*client,
244						&network,
245						block,
246						&[monitor.clone()],
247						Some(&contract_specs),
248					)
249					.await
250					.map_err(|e| {
251						MonitorExecutionError::execution_error(
252							format!("Failed to filter block: {}", e),
253							None,
254							None,
255						)
256					})?
257			}
258			BlockChainType::Midnight => {
259				return Err(MonitorExecutionError::execution_error(
260					"Midnight network not supported",
261					None,
262					None,
263				))
264			}
265			BlockChainType::Solana => {
266				return Err(MonitorExecutionError::execution_error(
267					"Solana network not supported",
268					None,
269					None,
270				))
271			}
272		};
273
274		tracing::debug!(matches_count = matches.len(), "Found matches for network");
275		all_matches.extend(matches);
276	}
277
278	// Send notifications for each match
279	for match_result in all_matches.clone() {
280		let result = handle_match(
281			match_result,
282			&*config.trigger_execution_service,
283			&config.active_monitors_trigger_scripts,
284		)
285		.await;
286		match result {
287			Ok(_result) => info!("Successfully sent notifications for match"),
288			Err(e) => {
289				tracing::error!("Error sending notifications: {}", e);
290				continue;
291			}
292		};
293	}
294
295	tracing::debug!(total_matches = all_matches.len(), "Serializing results");
296	let json_matches = serde_json::to_string(&all_matches).map_err(|e| {
297		MonitorExecutionError::execution_error(
298			format!("Failed to serialize matches: {}", e),
299			None,
300			None,
301		)
302	})?;
303
304	tracing::debug!("Monitor execution completed successfully");
305	Ok(json_matches)
306}