openzeppelin_monitor/services/blockwatcher/
tracker.rs1use async_trait::async_trait;
14use std::{
15 collections::{HashMap, VecDeque},
16 sync::Arc,
17};
18use tokio::sync::Mutex;
19
20use crate::{
21 models::Network,
22 services::blockwatcher::{error::BlockWatcherError, storage::BlockStorage},
23};
24
25#[async_trait]
29pub trait BlockTrackerTrait<S: BlockStorage> {
30 fn new(history_size: usize, storage: Option<Arc<S>>) -> Self;
31 async fn record_block(&self, network: &Network, block_number: u64)
32 -> Result<(), anyhow::Error>;
33 async fn get_last_block(&self, network_slug: &str) -> Option<u64>;
34}
35
36#[derive(Clone)]
46pub struct BlockTracker<S> {
47 block_history: Arc<Mutex<HashMap<String, VecDeque<u64>>>>,
50 history_size: usize,
52 storage: Option<Arc<S>>,
54}
55
56#[async_trait]
57impl<S: BlockStorage> BlockTrackerTrait<S> for BlockTracker<S> {
58 fn new(history_size: usize, storage: Option<Arc<S>>) -> Self {
69 Self {
70 block_history: Arc::new(Mutex::new(HashMap::new())),
71 history_size,
72 storage,
73 }
74 }
75
76 async fn record_block(
92 &self,
93 network: &Network,
94 block_number: u64,
95 ) -> Result<(), anyhow::Error> {
96 let mut history = self.block_history.lock().await;
97 let network_history = history
98 .entry(network.slug.clone())
99 .or_insert_with(|| VecDeque::with_capacity(self.history_size));
100
101 if let Some(&last_block) = network_history.back() {
103 if block_number > last_block + 1 {
104 for missed in (last_block + 1)..block_number {
106 BlockWatcherError::block_tracker_error(
107 format!("Missed block {}", missed),
108 None,
109 None,
110 );
111
112 if network.store_blocks.unwrap_or(false) {
113 if let Some(storage) = &self.storage {
114 if (storage.save_missed_block(&network.slug, missed).await).is_err() {
116 BlockWatcherError::storage_error(
117 format!("Failed to store missed block {}", missed),
118 None,
119 None,
120 );
121 }
122 }
123 }
124 }
125 } else if block_number <= last_block {
126 BlockWatcherError::block_tracker_error(
127 format!(
128 "Out of order or duplicate block detected: received {} after {}",
129 block_number, last_block
130 ),
131 None,
132 None,
133 );
134 }
135 }
136
137 network_history.push_back(block_number);
139
140 while network_history.len() > self.history_size {
142 network_history.pop_front();
143 }
144 Ok(())
145 }
146
147 async fn get_last_block(&self, network_slug: &str) -> Option<u64> {
158 self.block_history
159 .lock()
160 .await
161 .get(network_slug)
162 .and_then(|history| history.back().copied())
163 }
164}
165
166#[cfg(test)]
167mod tests {
168 use crate::{models::BlockType, utils::tests::network::NetworkBuilder};
169
170 use super::*;
171 use mockall::mock;
172
173 mock! {
175 pub BlockStorage {}
176 #[async_trait::async_trait]
177 impl BlockStorage for BlockStorage {
178 async fn save_missed_block(&self, network_slug: &str, block_number: u64) -> Result<(), anyhow::Error>;
179 async fn save_last_processed_block(&self, network_slug: &str, block_number: u64) -> Result<(), anyhow::Error>;
180 async fn get_last_processed_block(&self, network_slug: &str) -> Result<Option<u64>, anyhow::Error>;
181 async fn save_blocks(&self, network_slug: &str, blocks: &[BlockType]) -> Result<(), anyhow::Error>;
182 async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error>;
183 }
184
185 impl Clone for BlockStorage {
186 fn clone(&self) -> Self {
187 Self::new()
188 }
189 }
190 }
191 fn create_test_network(name: &str, slug: &str, store_blocks: bool) -> Network {
192 NetworkBuilder::new()
193 .name(name)
194 .slug(slug)
195 .store_blocks(store_blocks)
196 .build()
197 }
198
199 #[tokio::test]
200 async fn test_normal_block_sequence() {
201 let mock_storage = MockBlockStorage::new();
202
203 let tracker = BlockTracker::new(5, Some(Arc::new(mock_storage)));
204 let network = create_test_network("test-net", "test_net", true);
205
206 tracker.record_block(&network, 1).await.unwrap();
208 tracker.record_block(&network, 2).await.unwrap();
209 tracker.record_block(&network, 3).await.unwrap();
210
211 assert_eq!(tracker.get_last_block("test_net").await, Some(3));
212 }
213
214 #[tokio::test]
215 async fn test_history_size_limit() {
216 let mock_storage = MockBlockStorage::new();
217
218 let tracker = BlockTracker::new(3, Some(Arc::new(mock_storage)));
219 let network = create_test_network("test-net", "test_net", true);
220
221 for i in 1..=5 {
223 tracker.record_block(&network, i).await.unwrap();
224 }
225
226 let history = tracker.block_history.lock().await;
227 let network_history = history
228 .get(&network.slug)
229 .expect("Network history should exist");
230
231 assert_eq!(network_history.len(), 3);
233 assert_eq!(network_history.front(), Some(&3)); assert_eq!(network_history.back(), Some(&5)); }
236
237 #[tokio::test]
238 async fn test_missed_blocks_with_storage() {
239 let mut mock_storage = MockBlockStorage::new();
240
241 mock_storage
243 .expect_save_missed_block()
244 .with(
245 mockall::predicate::eq("test_net"),
246 mockall::predicate::eq(2),
247 )
248 .times(1)
249 .returning(|_, _| Ok(()));
250
251 let tracker = BlockTracker::new(5, Some(Arc::new(mock_storage)));
252 let network = create_test_network("test-net", "test_net", true);
253
254 tracker.record_block(&network, 1).await.unwrap();
256 tracker.record_block(&network, 3).await.unwrap();
258 }
259
260 #[tokio::test]
261 async fn test_out_of_order_blocks() {
262 let mock_storage = MockBlockStorage::new();
263
264 let tracker = BlockTracker::new(5, Some(Arc::new(mock_storage)));
265 let network = create_test_network("test-net", "test_net", true);
266
267 tracker.record_block(&network, 2).await.unwrap();
269 tracker.record_block(&network, 1).await.unwrap();
270
271 assert_eq!(tracker.get_last_block("test_net").await, Some(1));
272 }
273
274 #[tokio::test]
275 async fn test_multiple_networks() {
276 let mock_storage = MockBlockStorage::new();
277
278 let tracker = BlockTracker::new(5, Some(Arc::new(mock_storage)));
279 let network1 = create_test_network("net-1", "net_1", true);
280 let network2 = create_test_network("net-2", "net_2", true);
281
282 tracker.record_block(&network1, 1).await.unwrap();
284 tracker.record_block(&network2, 100).await.unwrap();
285 tracker.record_block(&network1, 2).await.unwrap();
286 tracker.record_block(&network2, 101).await.unwrap();
287
288 assert_eq!(tracker.get_last_block("net_1").await, Some(2));
289 assert_eq!(tracker.get_last_block("net_2").await, Some(101));
290 }
291
292 #[tokio::test]
293 async fn test_get_last_block_empty_network() {
294 let tracker = BlockTracker::new(5, None::<Arc<MockBlockStorage>>);
295 assert_eq!(tracker.get_last_block("nonexistent").await, None);
296 }
297
298 #[tokio::test]
299 async fn test_save_missed_block_record() {
300 let mut mock_storage = MockBlockStorage::new();
301
302 mock_storage
303 .expect_save_missed_block()
304 .with(
305 mockall::predicate::eq("test_network"),
306 mockall::predicate::eq(2),
307 )
308 .times(1)
309 .returning(|_, _| Ok(()));
310
311 let tracker = BlockTracker::new(5, Some(Arc::new(mock_storage)));
312 let network = create_test_network("test-network", "test_network", true);
313
314 tracker.record_block(&network, 1).await.unwrap();
316 tracker.record_block(&network, 3).await.unwrap();
318 }
319}