openzeppelin_monitor/services/blockwatcher/
tracker.rs

1//! Block tracking functionality for monitoring blockchain networks.
2//!
3//! This module provides tools for tracking processed blocks across different networks
4//! and identifying potential issues such as:
5//! - Missed blocks
6//! - Out-of-order block processing
7//! - Duplicate block processing
8//!
9//! The primary component is the [`BlockTracker`] which maintains a history of
10//! recently processed blocks and can optionally persist information about missed
11//! blocks using a storage implementation.
12
13use async_trait::async_trait;
14use std::{
15	collections::{HashMap, VecDeque},
16	sync::Arc,
17};
18use tokio::sync::Mutex;
19
20use crate::{
21	models::Network,
22	services::blockwatcher::{error::BlockWatcherError, storage::BlockStorage},
23};
24
25/// Trait for the BlockTracker
26///
27/// This trait defines the interface for the BlockTracker.
28#[async_trait]
29pub trait BlockTrackerTrait<S: BlockStorage> {
30	fn new(history_size: usize, storage: Option<Arc<S>>) -> Self;
31	async fn record_block(&self, network: &Network, block_number: u64)
32		-> Result<(), anyhow::Error>;
33	async fn get_last_block(&self, network_slug: &str) -> Option<u64>;
34}
35
36/// BlockTracker is responsible for monitoring the sequence of processed blocks
37/// across different networks and identifying any gaps or irregularities in block processing.
38///
39/// It maintains a history of recently processed blocks for each network and can optionally
40/// persist information about missed blocks using the provided storage implementation.
41///
42/// # Type Parameters
43///
44/// * `S` - A type that implements the `BlockStorage` trait for persisting missed block information
45#[derive(Clone)]
46pub struct BlockTracker<S> {
47	/// Tracks the last N blocks processed for each network
48	/// Key: network_slug, Value: Queue of block numbers
49	block_history: Arc<Mutex<HashMap<String, VecDeque<u64>>>>,
50	/// Maximum number of blocks to keep in history per network
51	history_size: usize,
52	/// Storage interface for persisting missed blocks
53	storage: Option<Arc<S>>,
54}
55
56#[async_trait]
57impl<S: BlockStorage> BlockTrackerTrait<S> for BlockTracker<S> {
58	/// Creates a new BlockTracker instance.
59	///
60	/// # Arguments
61	///
62	/// * `history_size` - The maximum number of recent blocks to track per network
63	/// * `storage` - Optional storage implementation for persisting missed block information
64	///
65	/// # Returns
66	///
67	/// A new `BlockTracker` instance
68	fn new(history_size: usize, storage: Option<Arc<S>>) -> Self {
69		Self {
70			block_history: Arc::new(Mutex::new(HashMap::new())),
71			history_size,
72			storage,
73		}
74	}
75
76	/// Records a processed block and identifies any gaps in block sequence.
77	///
78	/// This method performs several checks:
79	/// - Detects gaps between the last processed block and the current block
80	/// - Identifies out-of-order or duplicate blocks
81	/// - Stores information about missed blocks if storage is configured
82	///
83	/// # Arguments
84	///
85	/// * `network` - The network information for the processed block
86	/// * `block_number` - The block number being recorded
87	///
88	/// # Warning
89	///
90	/// This method will log warnings for out-of-order blocks and errors for missed blocks.
91	async fn record_block(
92		&self,
93		network: &Network,
94		block_number: u64,
95	) -> Result<(), anyhow::Error> {
96		let mut history = self.block_history.lock().await;
97		let network_history = history
98			.entry(network.slug.clone())
99			.or_insert_with(|| VecDeque::with_capacity(self.history_size));
100
101		// Check for gaps if we have previous blocks
102		if let Some(&last_block) = network_history.back() {
103			if block_number > last_block + 1 {
104				// Log each missed block number
105				for missed in (last_block + 1)..block_number {
106					BlockWatcherError::block_tracker_error(
107						format!("Missed block {}", missed),
108						None,
109						None,
110					);
111
112					if network.store_blocks.unwrap_or(false) {
113						if let Some(storage) = &self.storage {
114							// Store the missed block info
115							if (storage.save_missed_block(&network.slug, missed).await).is_err() {
116								BlockWatcherError::storage_error(
117									format!("Failed to store missed block {}", missed),
118									None,
119									None,
120								);
121							}
122						}
123					}
124				}
125			} else if block_number <= last_block {
126				BlockWatcherError::block_tracker_error(
127					format!(
128						"Out of order or duplicate block detected: received {} after {}",
129						block_number, last_block
130					),
131					None,
132					None,
133				);
134			}
135		}
136
137		// Add the new block to history
138		network_history.push_back(block_number);
139
140		// Maintain history size
141		while network_history.len() > self.history_size {
142			network_history.pop_front();
143		}
144		Ok(())
145	}
146
147	/// Retrieves the most recently processed block number for a given network.
148	///
149	/// # Arguments
150	///
151	/// * `network_slug` - The unique identifier for the network
152	///
153	/// # Returns
154	///
155	/// Returns `Some(block_number)` if blocks have been processed for the network,
156	/// otherwise returns `None`.
157	async fn get_last_block(&self, network_slug: &str) -> Option<u64> {
158		self.block_history
159			.lock()
160			.await
161			.get(network_slug)
162			.and_then(|history| history.back().copied())
163	}
164}
165
#[cfg(test)]
mod tests {
	use super::*;
	use crate::{models::BlockType, utils::tests::network::NetworkBuilder};
	use mockall::{mock, predicate::eq};

	// Mock implementation of the storage backend
	mock! {
		pub BlockStorage {}
		#[async_trait::async_trait]
		impl BlockStorage for BlockStorage {
			async fn save_missed_block(&self, network_slug: &str, block_number: u64) -> Result<(), anyhow::Error>;
			async fn save_last_processed_block(&self, network_slug: &str, block_number: u64) -> Result<(), anyhow::Error>;
			async fn get_last_processed_block(&self, network_slug: &str) -> Result<Option<u64>, anyhow::Error>;
			async fn save_blocks(&self, network_slug: &str, blocks: &[BlockType]) -> Result<(), anyhow::Error>;
			async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error>;
		}

		impl Clone for BlockStorage {
			fn clone(&self) -> Self {
				Self::new()
			}
		}
	}

	/// Builds a network with the given name, slug and `store_blocks` flag.
	fn create_test_network(name: &str, slug: &str, store_blocks: bool) -> Network {
		NetworkBuilder::new()
			.name(name)
			.slug(slug)
			.store_blocks(store_blocks)
			.build()
	}

	/// Builds a tracker backed by the given mock storage.
	fn tracker_with_storage(
		history_size: usize,
		storage: MockBlockStorage,
	) -> BlockTracker<MockBlockStorage> {
		BlockTracker::new(history_size, Some(Arc::new(storage)))
	}

	/// Consecutive blocks are recorded and the last one is reported.
	#[tokio::test]
	async fn test_normal_block_sequence() {
		let tracker = tracker_with_storage(5, MockBlockStorage::new());
		let network = create_test_network("test-net", "test_net", true);

		// Process blocks in sequence
		for block in 1..=3 {
			tracker.record_block(&network, block).await.unwrap();
		}

		assert_eq!(tracker.get_last_block("test_net").await, Some(3));
	}

	/// The per-network history never grows beyond the configured size.
	#[tokio::test]
	async fn test_history_size_limit() {
		let tracker = tracker_with_storage(3, MockBlockStorage::new());
		let network = create_test_network("test-net", "test_net", true);

		// Process 5 blocks with a history limit of 3
		for block in 1..=5 {
			tracker.record_block(&network, block).await.unwrap();
		}

		let all_histories = tracker.block_history.lock().await;
		let recorded = all_histories
			.get(&network.slug)
			.expect("Network history should exist");

		// Only the last 3 blocks remain: 3 is the oldest, 5 the newest
		assert_eq!(recorded.len(), 3);
		assert_eq!(recorded.front().copied(), Some(3));
		assert_eq!(recorded.back().copied(), Some(5));
	}

	/// A skipped block is handed to the storage exactly once.
	#[tokio::test]
	async fn test_missed_blocks_with_storage() {
		let mut storage = MockBlockStorage::new();

		// Expect block 2 to be recorded as missed
		storage
			.expect_save_missed_block()
			.with(eq("test_net"), eq(2))
			.times(1)
			.returning(|_, _| Ok(()));

		let tracker = tracker_with_storage(5, storage);
		let network = create_test_network("test-net", "test_net", true);

		// Process block 1, then skip block 2 and process block 3
		tracker.record_block(&network, 1).await.unwrap();
		tracker.record_block(&network, 3).await.unwrap();
	}

	/// An out-of-order block is still appended to the history.
	#[tokio::test]
	async fn test_out_of_order_blocks() {
		let tracker = tracker_with_storage(5, MockBlockStorage::new());
		let network = create_test_network("test-net", "test_net", true);

		// Process blocks out of order
		tracker.record_block(&network, 2).await.unwrap();
		tracker.record_block(&network, 1).await.unwrap();

		assert_eq!(tracker.get_last_block("test_net").await, Some(1));
	}

	/// Histories of different networks are tracked independently.
	#[tokio::test]
	async fn test_multiple_networks() {
		let tracker = tracker_with_storage(5, MockBlockStorage::new());
		let first_network = create_test_network("net-1", "net_1", true);
		let second_network = create_test_network("net-2", "net_2", true);

		// Interleave blocks of both networks
		tracker.record_block(&first_network, 1).await.unwrap();
		tracker.record_block(&second_network, 100).await.unwrap();
		tracker.record_block(&first_network, 2).await.unwrap();
		tracker.record_block(&second_network, 101).await.unwrap();

		assert_eq!(tracker.get_last_block("net_1").await, Some(2));
		assert_eq!(tracker.get_last_block("net_2").await, Some(101));
	}

	/// Looking up a network that was never recorded yields `None`.
	#[tokio::test]
	async fn test_get_last_block_empty_network() {
		let tracker: BlockTracker<MockBlockStorage> = BlockTracker::new(5, None);
		assert_eq!(tracker.get_last_block("nonexistent").await, None);
	}

	/// The missed block is saved under the slug of the network it belongs to.
	#[tokio::test]
	async fn test_save_missed_block_record() {
		let mut storage = MockBlockStorage::new();

		storage
			.expect_save_missed_block()
			.with(eq("test_network"), eq(2))
			.times(1)
			.returning(|_, _| Ok(()));

		let tracker = tracker_with_storage(5, storage);
		let network = create_test_network("test-network", "test_network", true);

		// First block for this network: nothing to compare against, no storage call
		tracker.record_block(&network, 1).await.unwrap();
		// This should trigger save_missed_block for block 2
		tracker.record_block(&network, 3).await.unwrap();
	}
}