openzeppelin_monitor/services/blockwatcher/
service.rs

1//! Block watcher service implementation.
2//!
3//! Provides functionality to watch and process blockchain blocks across multiple networks,
4//! managing individual watchers for each network and coordinating block processing.
5
6use anyhow::Context;
7use futures::{channel::mpsc, future::BoxFuture, stream::StreamExt, SinkExt};
8use std::{
9	collections::{BTreeMap, HashMap},
10	sync::Arc,
11};
12use tokio::sync::RwLock;
13use tokio_cron_scheduler::{Job, JobScheduler};
14use tracing::instrument;
15
16use crate::{
17	models::{BlockType, Network, ProcessedBlock},
18	services::{
19		blockchain::BlockChainClient,
20		blockwatcher::{
21			error::BlockWatcherError,
22			storage::BlockStorage,
23			tracker::{BlockTracker, BlockTrackerTrait},
24		},
25	},
26};
27
28/// Trait for job scheduler
29///
30/// This trait is used to abstract the job scheduler implementation.
31/// It is used to allow the block watcher service to be used with different job scheduler
32/// implementations.
33#[async_trait::async_trait]
34pub trait JobSchedulerTrait: Send + Sync + Sized {
35	async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>>;
36	async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
37	async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
38	async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
39}
40
41/// Implementation of the job scheduler trait for the JobScheduler struct
42#[async_trait::async_trait]
43impl JobSchedulerTrait for JobScheduler {
44	async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
45		Self::new().await.map_err(Into::into)
46	}
47
48	async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
49		self.add(job).await.map(|_| ()).map_err(Into::into)
50	}
51
52	async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
53		self.start().await.map(|_| ()).map_err(Into::into)
54	}
55
56	async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
57		self.shutdown().await.map(|_| ()).map_err(Into::into)
58	}
59}
60
61/// Watcher implementation for a single network
62///
63/// Manages block watching and processing for a specific blockchain network,
64/// including scheduling and block handling.
65///
66/// # Type Parameters
67/// * `S` - Storage implementation for blocks
68/// * `H` - Handler function for processed blocks
69/// * `T` - Trigger handler function for processed blocks
70/// * `J` - Job scheduler implementation (must implement JobSchedulerTrait)
71pub struct NetworkBlockWatcher<S, H, T, J>
72where
73	J: JobSchedulerTrait,
74{
75	pub network: Network,
76	pub block_storage: Arc<S>,
77	pub block_handler: Arc<H>,
78	pub trigger_handler: Arc<T>,
79	pub scheduler: J,
80	pub block_tracker: Arc<BlockTracker<S>>,
81}
82
83/// Map of active block watchers
84type BlockWatchersMap<S, H, T, J> = HashMap<String, NetworkBlockWatcher<S, H, T, J>>;
85
86/// Service for managing multiple network watchers
87///
88/// Coordinates block watching across multiple networks, managing individual
89/// watchers and their lifecycles.
90///
91/// # Type Parameters
92/// * `S` - Storage implementation for blocks
93/// * `H` - Handler function for processed blocks
94/// * `T` - Trigger handler function for processed blocks
95/// * `J` - Job scheduler implementation (must implement JobSchedulerTrait)
96pub struct BlockWatcherService<S, H, T, J>
97where
98	J: JobSchedulerTrait,
99{
100	pub block_storage: Arc<S>,
101	pub block_handler: Arc<H>,
102	pub trigger_handler: Arc<T>,
103	pub active_watchers: Arc<RwLock<BlockWatchersMap<S, H, T, J>>>,
104	pub block_tracker: Arc<BlockTracker<S>>,
105}
106
107impl<S, H, T, J> NetworkBlockWatcher<S, H, T, J>
108where
109	S: BlockStorage + Send + Sync + 'static,
110	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
111	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
112	J: JobSchedulerTrait,
113{
114	/// Creates a new network watcher instance
115	///
116	/// # Arguments
117	/// * `network` - Network configuration
118	/// * `block_storage` - Storage implementation for blocks
119	/// * `block_handler` - Handler function for processed blocks
120	///
121	/// # Returns
122	/// * `Result<Self, BlockWatcherError>` - New watcher instance or error
123	pub async fn new(
124		network: Network,
125		block_storage: Arc<S>,
126		block_handler: Arc<H>,
127		trigger_handler: Arc<T>,
128		block_tracker: Arc<BlockTracker<S>>,
129	) -> Result<Self, BlockWatcherError> {
130		let scheduler = J::new().await.map_err(|e| {
131			BlockWatcherError::scheduler_error(
132				e.to_string(),
133				Some(e),
134				Some(HashMap::from([(
135					"network".to_string(),
136					network.slug.clone(),
137				)])),
138			)
139		})?;
140		Ok(Self {
141			network,
142			block_storage,
143			block_handler,
144			trigger_handler,
145			scheduler,
146			block_tracker,
147		})
148	}
149
150	/// Starts the network watcher
151	///
152	/// Initializes the scheduler and begins watching for new blocks according
153	/// to the network's cron schedule.
154	pub async fn start<C: BlockChainClient + Clone + Send + 'static>(
155		&mut self,
156		rpc_client: C,
157	) -> Result<(), BlockWatcherError> {
158		let network = self.network.clone();
159		let block_storage = self.block_storage.clone();
160		let block_handler = self.block_handler.clone();
161		let trigger_handler = self.trigger_handler.clone();
162		let block_tracker = self.block_tracker.clone();
163
164		let job = Job::new_async(self.network.cron_schedule.as_str(), move |_uuid, _l| {
165			let network = network.clone();
166			let block_storage = block_storage.clone();
167			let block_handler = block_handler.clone();
168			let block_tracker = block_tracker.clone();
169			let rpc_client = rpc_client.clone();
170			let trigger_handler = trigger_handler.clone();
171			Box::pin(async move {
172				let _ = process_new_blocks(
173					&network,
174					&rpc_client,
175					block_storage,
176					block_handler,
177					trigger_handler,
178					block_tracker,
179				)
180				.await
181				.map_err(|e| {
182					BlockWatcherError::processing_error(
183						"Failed to process blocks".to_string(),
184						Some(e.into()),
185						Some(HashMap::from([(
186							"network".to_string(),
187							network.slug.clone(),
188						)])),
189					)
190				});
191			})
192		})
193		.with_context(|| "Failed to create job")?;
194
195		self.scheduler.add(job).await.map_err(|e| {
196			BlockWatcherError::scheduler_error(
197				e.to_string(),
198				Some(e),
199				Some(HashMap::from([(
200					"network".to_string(),
201					self.network.slug.clone(),
202				)])),
203			)
204		})?;
205
206		self.scheduler.start().await.map_err(|e| {
207			BlockWatcherError::scheduler_error(
208				e.to_string(),
209				Some(e),
210				Some(HashMap::from([(
211					"network".to_string(),
212					self.network.slug.clone(),
213				)])),
214			)
215		})?;
216
217		tracing::info!("Started block watcher for network: {}", self.network.slug);
218		Ok(())
219	}
220
221	/// Stops the network watcher
222	///
223	/// Shuts down the scheduler and stops watching for new blocks.
224	pub async fn stop(&mut self) -> Result<(), BlockWatcherError> {
225		self.scheduler.shutdown().await.map_err(|e| {
226			BlockWatcherError::scheduler_error(
227				e.to_string(),
228				Some(e),
229				Some(HashMap::from([(
230					"network".to_string(),
231					self.network.slug.clone(),
232				)])),
233			)
234		})?;
235
236		tracing::info!("Stopped block watcher for network: {}", self.network.slug);
237		Ok(())
238	}
239}
240
241impl<S, H, T, J> BlockWatcherService<S, H, T, J>
242where
243	S: BlockStorage + Send + Sync + 'static,
244	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
245	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
246	J: JobSchedulerTrait,
247{
248	/// Creates a new block watcher service
249	///
250	/// # Arguments
251	/// * `network_service` - Service for network operations
252	/// * `block_storage` - Storage implementation for blocks
253	/// * `block_handler` - Handler function for processed blocks
254	pub async fn new(
255		block_storage: Arc<S>,
256		block_handler: Arc<H>,
257		trigger_handler: Arc<T>,
258		block_tracker: Arc<BlockTracker<S>>,
259	) -> Result<Self, BlockWatcherError> {
260		Ok(BlockWatcherService {
261			block_storage,
262			block_handler,
263			trigger_handler,
264			active_watchers: Arc::new(RwLock::new(HashMap::new())),
265			block_tracker,
266		})
267	}
268
269	/// Starts a watcher for a specific network
270	///
271	/// # Arguments
272	/// * `network` - Network configuration to start watching
273	pub async fn start_network_watcher<C: BlockChainClient + Send + Clone + 'static>(
274		&self,
275		network: &Network,
276		rpc_client: C,
277	) -> Result<(), BlockWatcherError> {
278		let mut watchers = self.active_watchers.write().await;
279
280		if watchers.contains_key(&network.slug) {
281			tracing::info!(
282				"Block watcher already running for network: {}",
283				network.slug
284			);
285			return Ok(());
286		}
287
288		let mut watcher = NetworkBlockWatcher::new(
289			network.clone(),
290			self.block_storage.clone(),
291			self.block_handler.clone(),
292			self.trigger_handler.clone(),
293			self.block_tracker.clone(),
294		)
295		.await?;
296
297		watcher.start(rpc_client).await?;
298		watchers.insert(network.slug.clone(), watcher);
299
300		Ok(())
301	}
302
303	/// Stops a watcher for a specific network
304	///
305	/// # Arguments
306	/// * `network_slug` - Identifier of the network to stop watching
307	pub async fn stop_network_watcher(&self, network_slug: &str) -> Result<(), BlockWatcherError> {
308		let mut watchers = self.active_watchers.write().await;
309
310		if let Some(mut watcher) = watchers.remove(network_slug) {
311			watcher.stop().await?;
312		}
313
314		Ok(())
315	}
316}
317
318/// Processes new blocks for a network
319///
320/// # Arguments
321/// * `network` - Network configuration
322/// * `rpc_client` - RPC client for the network
323/// * `block_storage` - Storage implementation for blocks
324/// * `block_handler` - Handler function for processed blocks
325/// * `trigger_handler` - Handler function for processed blocks
326/// * `block_tracker` - Tracker implementation for block processing
327///
328/// # Returns
329/// * `Result<(), BlockWatcherError>` - Success or error
330#[instrument(skip_all, fields(network = network.slug))]
331pub async fn process_new_blocks<
332	S: BlockStorage,
333	C: BlockChainClient + Send + Clone + 'static,
334	H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
335	T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
336	TR: BlockTrackerTrait<S>,
337>(
338	network: &Network,
339	rpc_client: &C,
340	block_storage: Arc<S>,
341	block_handler: Arc<H>,
342	trigger_handler: Arc<T>,
343	block_tracker: Arc<TR>,
344) -> Result<(), BlockWatcherError> {
345	let start_time = std::time::Instant::now();
346
347	let last_processed_block = block_storage
348		.get_last_processed_block(&network.slug)
349		.await
350		.with_context(|| "Failed to get last processed block")?
351		.unwrap_or(0);
352
353	let latest_block = rpc_client
354		.get_latest_block_number()
355		.await
356		.with_context(|| "Failed to get latest block number")?;
357
358	let latest_confirmed_block = latest_block.saturating_sub(network.confirmation_blocks);
359
360	let recommended_past_blocks = network.get_recommended_past_blocks();
361
362	let max_past_blocks = network.max_past_blocks.unwrap_or(recommended_past_blocks);
363
364	// Calculate the start block number, using the default if max_past_blocks is not set
365	let start_block = std::cmp::max(
366		last_processed_block + 1,
367		latest_confirmed_block.saturating_sub(max_past_blocks),
368	);
369
370	tracing::info!(
371		"Processing blocks:\n\tLast processed block: {}\n\tLatest confirmed block: {}\n\tStart \
372		 block: {}{}\n\tConfirmations required: {}\n\tMax past blocks: {}",
373		last_processed_block,
374		latest_confirmed_block,
375		start_block,
376		if start_block > last_processed_block + 1 {
377			format!(
378				" (skipped {} blocks)",
379				start_block - (last_processed_block + 1)
380			)
381		} else {
382			String::new()
383		},
384		network.confirmation_blocks,
385		max_past_blocks
386	);
387
388	let mut blocks = Vec::new();
389	if last_processed_block == 0 {
390		blocks = rpc_client
391			.get_blocks(latest_confirmed_block, None)
392			.await
393			.with_context(|| format!("Failed to get block {}", latest_confirmed_block))?;
394	} else if last_processed_block < latest_confirmed_block {
395		blocks = rpc_client
396			.get_blocks(start_block, Some(latest_confirmed_block))
397			.await
398			.with_context(|| {
399				format!(
400					"Failed to get blocks from {} to {}",
401					start_block, latest_confirmed_block
402				)
403			})?;
404	}
405
406	// Create channels for our pipeline
407	let (process_tx, process_rx) = mpsc::channel::<(BlockType, u64)>(blocks.len() * 2);
408	let (trigger_tx, trigger_rx) = mpsc::channel::<ProcessedBlock>(blocks.len() * 2);
409
410	// Stage 1: Block Processing Pipeline
411	let process_handle = tokio::spawn({
412		let network = network.clone();
413		let block_handler = block_handler.clone();
414		let mut trigger_tx = trigger_tx.clone();
415
416		async move {
417			// Process blocks concurrently, up to 32 at a time
418			let mut results = process_rx
419				.map(|(block, _)| {
420					let network = network.clone();
421					let block_handler = block_handler.clone();
422					async move { (block_handler)(block, network).await }
423				})
424				.buffer_unordered(32);
425
426			// Process all results and send them to trigger channel
427			while let Some(result) = results.next().await {
428				trigger_tx
429					.send(result)
430					.await
431					.with_context(|| "Failed to send processed block")?;
432			}
433
434			Ok::<(), BlockWatcherError>(())
435		}
436	});
437
438	// Stage 2: Trigger Pipeline
439	let trigger_handle = tokio::spawn({
440		let trigger_handler = trigger_handler.clone();
441
442		async move {
443			let mut trigger_rx = trigger_rx;
444			let mut pending_blocks = BTreeMap::new();
445			let mut next_block_number = Some(start_block);
446
447			// Process all incoming blocks
448			while let Some(processed_block) = trigger_rx.next().await {
449				let block_number = processed_block.block_number;
450				pending_blocks.insert(block_number, processed_block);
451
452				// Process blocks in order as long as we have the next expected block
453				while let Some(expected) = next_block_number {
454					if let Some(block) = pending_blocks.remove(&expected) {
455						(trigger_handler)(&block);
456						next_block_number = Some(expected + 1);
457					} else {
458						break;
459					}
460				}
461			}
462
463			// Process any remaining blocks in order after the channel is closed
464			while let Some(min_block) = pending_blocks.keys().next().copied() {
465				if let Some(block) = pending_blocks.remove(&min_block) {
466					(trigger_handler)(&block);
467				}
468			}
469			Ok::<(), BlockWatcherError>(())
470		}
471	});
472
473	// Feed blocks into the pipeline
474	futures::future::join_all(blocks.iter().map(|block| {
475		let network = network.clone();
476		let block_tracker = block_tracker.clone();
477		let mut process_tx = process_tx.clone();
478		async move {
479			let block_number = block.number().unwrap_or(0);
480
481			// Record block in tracker
482			block_tracker.record_block(&network, block_number).await?;
483
484			// Send block to processing pipeline
485			process_tx
486				.send((block.clone(), block_number))
487				.await
488				.with_context(|| "Failed to send block to pipeline")?;
489
490			Ok::<(), BlockWatcherError>(())
491		}
492	}))
493	.await
494	.into_iter()
495	.collect::<Result<Vec<_>, _>>()
496	.with_context(|| format!("Failed to process blocks for network {}", network.slug))?;
497
498	// Drop the sender after all blocks are sent
499	drop(process_tx);
500	drop(trigger_tx);
501
502	// Wait for both pipeline stages to complete
503	let (_process_result, _trigger_result) = tokio::join!(process_handle, trigger_handle);
504
505	if network.store_blocks.unwrap_or(false) {
506		// Delete old blocks before saving new ones
507		block_storage
508			.delete_blocks(&network.slug)
509			.await
510			.with_context(|| "Failed to delete old blocks")?;
511
512		block_storage
513			.save_blocks(&network.slug, &blocks)
514			.await
515			.with_context(|| "Failed to save blocks")?;
516	}
517	// Update the last processed block
518	block_storage
519		.save_last_processed_block(&network.slug, latest_confirmed_block)
520		.await
521		.with_context(|| "Failed to save last processed block")?;
522
523	tracing::info!(
524		"Processed {} blocks in {}ms",
525		blocks.len(),
526		start_time.elapsed().as_millis()
527	);
528
529	Ok(())
530}