openzeppelin_monitor/utils/monitor/
execution.rs1use crate::{
5 bootstrap::{get_contract_specs, has_active_monitors},
6 models::{BlockChainType, ScriptLanguage},
7 repositories::{
8 MonitorRepositoryTrait, MonitorService, NetworkRepositoryTrait, NetworkService,
9 TriggerRepositoryTrait,
10 },
11 services::{
12 blockchain::{BlockChainClient, ClientPoolTrait},
13 filter::{handle_match, FilterService},
14 trigger::TriggerExecutionService,
15 },
16 utils::monitor::MonitorExecutionError,
17};
18use std::{collections::HashMap, path::Path, sync::Arc};
19use tokio::sync::Mutex;
20use tracing::{info, instrument};
21
22pub struct MonitorExecutionConfig<
36 M: MonitorRepositoryTrait<N, TR>,
37 N: NetworkRepositoryTrait + Send + Sync + 'static,
38 TR: TriggerRepositoryTrait + Send + Sync + 'static,
39 CP: ClientPoolTrait + Send + Sync + 'static,
40> {
41 pub path: String,
42 pub network_slug: Option<String>,
43 pub block_number: Option<u64>,
44 pub monitor_service: Arc<Mutex<MonitorService<M, N, TR>>>,
45 pub network_service: Arc<Mutex<NetworkService<N>>>,
46 pub filter_service: Arc<FilterService>,
47 pub trigger_execution_service: Arc<TriggerExecutionService<TR>>,
48 pub active_monitors_trigger_scripts: HashMap<String, (ScriptLanguage, String)>,
49 pub client_pool: Arc<CP>,
50}
51pub type ExecutionResult<T> = std::result::Result<T, MonitorExecutionError>;
52
53#[instrument(skip_all)]
72#[allow(clippy::too_many_arguments)]
73pub async fn execute_monitor<
74 M: MonitorRepositoryTrait<N, TR>,
75 N: NetworkRepositoryTrait + Send + Sync + 'static,
76 TR: TriggerRepositoryTrait + Send + Sync + 'static,
77 CP: ClientPoolTrait + Send + Sync + 'static,
78>(
79 config: MonitorExecutionConfig<M, N, TR, CP>,
80) -> ExecutionResult<String> {
81 tracing::debug!("Loading monitor configuration");
82 let monitor = config
83 .monitor_service
84 .lock()
85 .await
86 .load_from_path(Some(Path::new(&config.path)), None, None)
87 .await
88 .map_err(|e| MonitorExecutionError::execution_error(e.to_string(), None, None))?;
89
90 tracing::debug!(monitor_name = %monitor.name, "Monitor loaded successfully");
91
92 let networks_for_monitor = if let Some(network_slug) = config.network_slug {
93 tracing::debug!(network = %network_slug, "Finding specific network");
94 let network = config
95 .network_service
96 .lock()
97 .await
98 .get(network_slug.as_str())
99 .ok_or_else(|| {
100 MonitorExecutionError::not_found(
101 format!("Network '{}' not found", network_slug),
102 None,
103 None,
104 )
105 })?;
106 vec![network.clone()]
107 } else {
108 tracing::debug!("Finding all active networks for monitor");
109 config
110 .network_service
111 .lock()
112 .await
113 .get_all()
114 .values()
115 .filter(|network| has_active_monitors(&[monitor.clone()], &network.slug))
116 .cloned()
117 .collect()
118 };
119
120 tracing::debug!(
121 networks_count = networks_for_monitor.len(),
122 "Networks found for monitor"
123 );
124
125 let mut all_matches = Vec::new();
126 for network in networks_for_monitor {
127 tracing::debug!(
128 network_type = ?network.network_type,
129 network_slug = %network.slug,
130 "Processing network"
131 );
132
133 let contract_specs = get_contract_specs(
134 &config.client_pool,
135 &[(network.clone(), vec![monitor.clone()])],
136 )
137 .await;
138
139 let matches = match network.network_type {
140 BlockChainType::EVM => {
141 let client = config
142 .client_pool
143 .get_evm_client(&network)
144 .await
145 .map_err(|e| {
146 MonitorExecutionError::execution_error(
147 format!("Failed to get EVM client: {}", e),
148 None,
149 None,
150 )
151 })?;
152
153 let block_number = match config.block_number {
154 Some(block_number) => {
155 tracing::debug!(block = %block_number, "Using specified block number");
156 block_number
157 }
158 None => {
159 let latest = client.get_latest_block_number().await.map_err(|e| {
160 MonitorExecutionError::execution_error(e.to_string(), None, None)
161 })?;
162 tracing::debug!(block = %latest, "Using latest block number");
163 latest
164 }
165 };
166
167 tracing::debug!(block = %block_number, "Fetching block");
168 let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
169 MonitorExecutionError::execution_error(
170 format!("Failed to get block {}: {}", block_number, e),
171 None,
172 None,
173 )
174 })?;
175
176 let block = blocks.first().ok_or_else(|| {
177 MonitorExecutionError::not_found(
178 format!("Block {} not found", block_number),
179 None,
180 None,
181 )
182 })?;
183
184 tracing::debug!(block = %block_number, "Filtering block");
185 config
186 .filter_service
187 .filter_block(
188 &*client,
189 &network,
190 block,
191 &[monitor.clone()],
192 Some(&contract_specs),
193 )
194 .await
195 .map_err(|e| {
196 MonitorExecutionError::execution_error(
197 format!("Failed to filter block: {}", e),
198 None,
199 None,
200 )
201 })?
202 }
203 BlockChainType::Stellar => {
204 let client = config
205 .client_pool
206 .get_stellar_client(&network)
207 .await
208 .map_err(|e| {
209 MonitorExecutionError::execution_error(
210 format!("Failed to get Stellar client: {}", e),
211 None,
212 None,
213 )
214 })?;
215
216 let block_number = match config.block_number {
218 Some(block_number) => block_number,
219 None => client.get_latest_block_number().await.map_err(|e| {
220 MonitorExecutionError::execution_error(e.to_string(), None, None)
221 })?,
222 };
223
224 let blocks = client.get_blocks(block_number, None).await.map_err(|e| {
225 MonitorExecutionError::execution_error(
226 format!("Failed to get block {}: {}", block_number, e),
227 None,
228 None,
229 )
230 })?;
231
232 let block = blocks.first().ok_or_else(|| {
233 MonitorExecutionError::not_found(
234 format!("Block {} not found", block_number),
235 None,
236 None,
237 )
238 })?;
239
240 config
241 .filter_service
242 .filter_block(
243 &*client,
244 &network,
245 block,
246 &[monitor.clone()],
247 Some(&contract_specs),
248 )
249 .await
250 .map_err(|e| {
251 MonitorExecutionError::execution_error(
252 format!("Failed to filter block: {}", e),
253 None,
254 None,
255 )
256 })?
257 }
258 BlockChainType::Midnight => {
259 return Err(MonitorExecutionError::execution_error(
260 "Midnight network not supported",
261 None,
262 None,
263 ))
264 }
265 BlockChainType::Solana => {
266 return Err(MonitorExecutionError::execution_error(
267 "Solana network not supported",
268 None,
269 None,
270 ))
271 }
272 };
273
274 tracing::debug!(matches_count = matches.len(), "Found matches for network");
275 all_matches.extend(matches);
276 }
277
278 for match_result in all_matches.clone() {
280 let result = handle_match(
281 match_result,
282 &*config.trigger_execution_service,
283 &config.active_monitors_trigger_scripts,
284 )
285 .await;
286 match result {
287 Ok(_result) => info!("Successfully sent notifications for match"),
288 Err(e) => {
289 tracing::error!("Error sending notifications: {}", e);
290 continue;
291 }
292 };
293 }
294
295 tracing::debug!(total_matches = all_matches.len(), "Serializing results");
296 let json_matches = serde_json::to_string(&all_matches).map_err(|e| {
297 MonitorExecutionError::execution_error(
298 format!("Failed to serialize matches: {}", e),
299 None,
300 None,
301 )
302 })?;
303
304 tracing::debug!("Monitor execution completed successfully");
305 Ok(json_matches)
306}