openzeppelin_monitor/services/blockwatcher/
storage.rs1use async_trait::async_trait;
10use glob::glob;
11use std::path::PathBuf;
12
13use crate::models::BlockType;
14
15#[async_trait]
20pub trait BlockStorage: Clone + Send + Sync {
21 async fn get_last_processed_block(
29 &self,
30 network_id: &str,
31 ) -> Result<Option<u64>, anyhow::Error>;
32
33 async fn save_last_processed_block(
42 &self,
43 network_id: &str,
44 block: u64,
45 ) -> Result<(), anyhow::Error>;
46
47 async fn save_blocks(
56 &self,
57 network_id: &str,
58 blocks: &[BlockType],
59 ) -> Result<(), anyhow::Error>;
60
61 async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error>;
69
70 async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error>;
79}
80
/// Block storage that keeps its data as files under a single directory.
///
/// Derives `Debug` in addition to `Clone` so that the storage can be logged
/// and embedded in other `#[derive(Debug)]` types.
#[derive(Clone, Debug)]
pub struct FileBlockStorage {
    /// Directory that every storage file path is joined onto.
    storage_path: PathBuf,
}
90
91impl FileBlockStorage {
92 pub fn new(storage_path: PathBuf) -> Self {
96 FileBlockStorage { storage_path }
97 }
98}
99
100impl Default for FileBlockStorage {
101 fn default() -> Self {
105 FileBlockStorage::new(PathBuf::from("data"))
106 }
107}
108
109#[async_trait]
110impl BlockStorage for FileBlockStorage {
111 async fn get_last_processed_block(
115 &self,
116 network_id: &str,
117 ) -> Result<Option<u64>, anyhow::Error> {
118 let file_path = self
119 .storage_path
120 .join(format!("{}_last_block.txt", network_id));
121
122 if !file_path.exists() {
123 return Ok(None);
124 }
125
126 let content = tokio::fs::read_to_string(file_path)
127 .await
128 .map_err(|e| anyhow::anyhow!("Failed to read last processed block: {}", e))?;
129 let block_number = content
130 .trim()
131 .parse::<u64>()
132 .map_err(|e| anyhow::anyhow!("Failed to parse last processed block: {}", e))?;
133 Ok(Some(block_number))
134 }
135
136 async fn save_last_processed_block(
141 &self,
142 network_id: &str,
143 block: u64,
144 ) -> Result<(), anyhow::Error> {
145 let file_path = self
146 .storage_path
147 .join(format!("{}_last_block.txt", network_id));
148 tokio::fs::write(file_path, block.to_string())
149 .await
150 .map_err(|e| anyhow::anyhow!("Failed to save last processed block: {}", e))?;
151 Ok(())
152 }
153
154 async fn save_blocks(
160 &self,
161 network_slug: &str,
162 blocks: &[BlockType],
163 ) -> Result<(), anyhow::Error> {
164 let file_path = self.storage_path.join(format!(
165 "{}_blocks_{}.json",
166 network_slug,
167 chrono::Utc::now().timestamp()
168 ));
169 let json = serde_json::to_string(blocks)
170 .map_err(|e| anyhow::anyhow!("Failed to serialize blocks: {}", e))?;
171 tokio::fs::write(file_path, json)
172 .await
173 .map_err(|e| anyhow::anyhow!("Failed to save blocks: {}", e))?;
174 Ok(())
175 }
176
177 async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error> {
183 let pattern = self
184 .storage_path
185 .join(format!("{}_blocks_*.json", network_slug))
186 .to_string_lossy()
187 .to_string();
188
189 for entry in glob(&pattern)
190 .map_err(|e| anyhow::anyhow!("Failed to parse blocks: {}", e))?
191 .flatten()
192 {
193 tokio::fs::remove_file(entry)
194 .await
195 .map_err(|e| anyhow::anyhow!("Failed to delete blocks: {}", e))?;
196 }
197 Ok(())
198 }
199
200 async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error> {
209 let file_path = self
210 .storage_path
211 .join(format!("{}_missed_blocks.txt", network_id));
212
213 let mut file = tokio::fs::OpenOptions::new()
215 .create(true)
216 .append(true)
217 .open(file_path)
218 .await
219 .map_err(|e| anyhow::anyhow!("Failed to create missed block file: {}", e))?;
220
221 tokio::io::AsyncWriteExt::write_all(&mut file, format!("{}\n", block).as_bytes())
223 .await
224 .map_err(|e| anyhow::anyhow!("Failed to save missed block: {}", e))?;
225
226 Ok(())
227 }
228}
229
#[cfg(test)]
mod tests {
    //! Tests for `FileBlockStorage`, each running against a fresh temporary
    //! directory.
    //!
    //! The Unix-only sections rely on directory permission bits to provoke
    //! "Permission denied" errors, so they assume the tests do not run as a
    //! user that bypasses permission checks (e.g. root).

    use super::*;

    /// Sets the Unix permission bits of `dir` to `mode`.
    #[cfg(unix)]
    fn set_dir_mode(dir: &std::path::Path, mode: u32) {
        use std::os::unix::fs::PermissionsExt;

        let mut perms = std::fs::metadata(dir).unwrap().permissions();
        perms.set_mode(mode);
        std::fs::set_permissions(dir, perms).unwrap();
    }

    #[tokio::test]
    async fn test_get_last_processed_block() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        // Existing file with a valid number.
        let existing_file = temp_dir.path().join("existing_last_block.txt");
        tokio::fs::write(&existing_file, "100").await.unwrap();
        let result = storage.get_last_processed_block("existing").await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(100));

        // Missing file yields `None`, not an error.
        let result = storage.get_last_processed_block("non_existent").await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Unparsable content is reported as a parse failure.
        let invalid_file = temp_dir.path().join("invalid_last_block.txt");
        tokio::fs::write(&invalid_file, "not a number")
            .await
            .unwrap();
        let result = storage.get_last_processed_block("invalid").await;
        assert!(result.is_err());
        let err = result.unwrap_err();
        assert!(err
            .to_string()
            .contains("Failed to parse last processed block"));
        assert!(err.to_string().contains("invalid"));

        // A second valid file under a different network id.
        let valid_file = temp_dir.path().join("valid_last_block.txt");
        tokio::fs::write(&valid_file, "123").await.unwrap();
        let result = storage.get_last_processed_block("valid").await;
        assert_eq!(result.unwrap(), Some(123));
    }

    #[tokio::test]
    async fn test_save_last_processed_block() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        let result = storage.save_last_processed_block("test", 100).await;
        assert!(result.is_ok());

        let content = tokio::fs::read_to_string(temp_dir.path().join("test_last_block.txt"))
            .await
            .unwrap();
        assert_eq!(content, "100");

        #[cfg(unix)]
        {
            let readonly_dir = temp_dir.path().join("readonly");
            tokio::fs::create_dir(&readonly_dir).await.unwrap();
            set_dir_mode(&readonly_dir, 0o444);

            let readonly_storage = FileBlockStorage::new(readonly_dir);
            let result = readonly_storage
                .save_last_processed_block("test", 100)
                .await;
            assert!(result.is_err());
            let err = result.unwrap_err();
            assert!(err
                .to_string()
                .contains("Failed to save last processed block"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }

    #[tokio::test]
    async fn test_save_blocks() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        let result = storage.save_blocks("test", &[]).await;
        assert!(result.is_ok());

        #[cfg(unix)]
        {
            let readonly_dir = temp_dir.path().join("readonly");
            tokio::fs::create_dir(&readonly_dir).await.unwrap();
            set_dir_mode(&readonly_dir, 0o444);

            let readonly_storage = FileBlockStorage::new(readonly_dir);
            let result = readonly_storage.save_blocks("test", &[]).await;
            assert!(result.is_err());
            let err = result.unwrap_err();
            assert!(err.to_string().contains("Failed to save blocks"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }

    #[tokio::test]
    async fn test_delete_blocks() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        let first = temp_dir.path().join("test_blocks_1.json");
        let second = temp_dir.path().join("test_blocks_2.json");
        let unrelated = temp_dir.path().join("other_blocks_1.json");
        tokio::fs::write(&first, "[]").await.unwrap();
        tokio::fs::write(&second, "[]").await.unwrap();
        tokio::fs::write(&unrelated, "[]").await.unwrap();

        let result = storage.delete_blocks("test").await;
        assert!(result.is_ok());

        // Matching files are gone; files of other networks are untouched.
        assert!(!first.exists());
        assert!(!second.exists());
        assert!(unrelated.exists());

        #[cfg(unix)]
        {
            let readonly_dir = temp_dir.path().join("readonly");
            tokio::fs::create_dir(&readonly_dir).await.unwrap();

            // The file has to exist before write permission is dropped.
            tokio::fs::write(readonly_dir.join("test_blocks_1.json"), "[]")
                .await
                .unwrap();

            // r-x: entries can be listed but not removed.
            set_dir_mode(&readonly_dir, 0o555);

            let readonly_storage = FileBlockStorage::new(readonly_dir.clone());
            let result = readonly_storage.delete_blocks("test").await;

            // Give write permission back before asserting, so the temporary
            // directory can be cleaned up on drop even if an assertion fails;
            // otherwise the file inside the read-only directory is leaked.
            set_dir_mode(&readonly_dir, 0o755);

            assert!(result.is_err());
            let err = result.unwrap_err();
            assert!(err.to_string().contains("Failed to delete blocks"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }

    #[tokio::test]
    async fn test_save_missed_block() {
        let temp_dir = tempfile::tempdir().unwrap();
        let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());

        let result = storage.save_missed_block("test", 100).await;
        assert!(result.is_ok());

        let content = tokio::fs::read_to_string(temp_dir.path().join("test_missed_blocks.txt"))
            .await
            .unwrap();
        assert_eq!(content, "100\n");

        #[cfg(unix)]
        {
            let readonly_dir = temp_dir.path().join("readonly");
            tokio::fs::create_dir(&readonly_dir).await.unwrap();
            set_dir_mode(&readonly_dir, 0o444);

            let readonly_storage = FileBlockStorage::new(readonly_dir);
            let result = readonly_storage.save_missed_block("test", 100).await;
            assert!(result.is_err());
            let err = result.unwrap_err();

            assert!(err
                .to_string()
                .contains("Failed to create missed block file"));
            assert!(err.to_string().contains("Permission denied"));
        }
    }
}