openzeppelin_monitor/services/blockwatcher/
storage.rs

1//! Block storage implementations for the block watcher service.
2//!
3//! This module provides storage interfaces and implementations for persisting
4//! blockchain blocks and tracking processing state. Currently supports:
5//! - File-based storage with JSON serialization
6//! - Last processed block tracking
7//! - Block deletion for cleanup
8
9use async_trait::async_trait;
10use glob::glob;
11use std::path::PathBuf;
12
13use crate::models::BlockType;
14
15/// Interface for block storage implementations
16///
17/// Defines the required functionality for storing and retrieving blocks
18/// and tracking the last processed block for each network.
19#[async_trait]
20pub trait BlockStorage: Clone + Send + Sync {
21	/// Retrieves the last processed block number for a network
22	///
23	/// # Arguments
24	/// * `network_id` - Unique identifier for the network
25	///
26	/// # Returns
27	/// * `Result<Option<u64>, anyhow::Error>` - Last processed block number or None if not found
28	async fn get_last_processed_block(
29		&self,
30		network_id: &str,
31	) -> Result<Option<u64>, anyhow::Error>;
32
33	/// Saves the last processed block number for a network
34	///
35	/// # Arguments
36	/// * `network_id` - Unique identifier for the network
37	/// * `block` - Block number to save
38	///
39	/// # Returns
40	/// * `Result<(), anyhow::Error>` - Success or error
41	async fn save_last_processed_block(
42		&self,
43		network_id: &str,
44		block: u64,
45	) -> Result<(), anyhow::Error>;
46
47	/// Saves a collection of blocks for a network
48	///
49	/// # Arguments
50	/// * `network_id` - Unique identifier for the network
51	/// * `blocks` - Collection of blocks to save
52	///
53	/// # Returns
54	/// * `Result<(), anyhow::Error>` - Success or error
55	async fn save_blocks(
56		&self,
57		network_id: &str,
58		blocks: &[BlockType],
59	) -> Result<(), anyhow::Error>;
60
61	/// Deletes all stored blocks for a network
62	///
63	/// # Arguments
64	/// * `network_id` - Unique identifier for the network
65	///
66	/// # Returns
67	/// * `Result<(), anyhow::Error>` - Success or error
68	async fn delete_blocks(&self, network_id: &str) -> Result<(), anyhow::Error>;
69
70	/// Saves a missed block for a network
71	///
72	/// # Arguments
73	/// * `network_id` - Unique identifier for the network
74	/// * `block` - Block number to save
75	///
76	/// # Returns
77	/// * `Result<(), anyhow::Error>` - Success or error
78	async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error>;
79}
80
81/// File-based implementation of block storage
82///
83/// Stores blocks and processing state in JSON files within a configured
84/// directory structure.
85#[derive(Clone)]
86pub struct FileBlockStorage {
87	/// Base path for all storage files
88	storage_path: PathBuf,
89}
90
91impl FileBlockStorage {
92	/// Creates a new file-based block storage instance
93	///
94	/// Initializes storage with the provided path
95	pub fn new(storage_path: PathBuf) -> Self {
96		FileBlockStorage { storage_path }
97	}
98}
99
100impl Default for FileBlockStorage {
101	/// Default implementation for FileBlockStorage
102	///
103	/// Initializes storage with the default path "data"
104	fn default() -> Self {
105		FileBlockStorage::new(PathBuf::from("data"))
106	}
107}
108
109#[async_trait]
110impl BlockStorage for FileBlockStorage {
111	/// Retrieves the last processed block from a network-specific file
112	///
113	/// The file is named "{network_id}_last_block.txt"
114	async fn get_last_processed_block(
115		&self,
116		network_id: &str,
117	) -> Result<Option<u64>, anyhow::Error> {
118		let file_path = self
119			.storage_path
120			.join(format!("{}_last_block.txt", network_id));
121
122		if !file_path.exists() {
123			return Ok(None);
124		}
125
126		let content = tokio::fs::read_to_string(file_path)
127			.await
128			.map_err(|e| anyhow::anyhow!("Failed to read last processed block: {}", e))?;
129		let block_number = content
130			.trim()
131			.parse::<u64>()
132			.map_err(|e| anyhow::anyhow!("Failed to parse last processed block: {}", e))?;
133		Ok(Some(block_number))
134	}
135
136	/// Saves the last processed block to a network-specific file
137	///
138	/// # Note
139	/// Overwrites any existing last block file for the network
140	async fn save_last_processed_block(
141		&self,
142		network_id: &str,
143		block: u64,
144	) -> Result<(), anyhow::Error> {
145		let file_path = self
146			.storage_path
147			.join(format!("{}_last_block.txt", network_id));
148		tokio::fs::write(file_path, block.to_string())
149			.await
150			.map_err(|e| anyhow::anyhow!("Failed to save last processed block: {}", e))?;
151		Ok(())
152	}
153
154	/// Saves blocks to a timestamped JSON file
155	///
156	/// # Note
157	/// Creates a new file for each save operation, named:
158	/// "{network_id}_blocks_{timestamp}.json"
159	async fn save_blocks(
160		&self,
161		network_slug: &str,
162		blocks: &[BlockType],
163	) -> Result<(), anyhow::Error> {
164		let file_path = self.storage_path.join(format!(
165			"{}_blocks_{}.json",
166			network_slug,
167			chrono::Utc::now().timestamp()
168		));
169		let json = serde_json::to_string(blocks)
170			.map_err(|e| anyhow::anyhow!("Failed to serialize blocks: {}", e))?;
171		tokio::fs::write(file_path, json)
172			.await
173			.map_err(|e| anyhow::anyhow!("Failed to save blocks: {}", e))?;
174		Ok(())
175	}
176
177	/// Deletes all block files for a network
178	///
179	/// # Note
180	/// Uses glob pattern matching to find and delete all files matching:
181	/// "{network_id}_blocks_*.json"
182	async fn delete_blocks(&self, network_slug: &str) -> Result<(), anyhow::Error> {
183		let pattern = self
184			.storage_path
185			.join(format!("{}_blocks_*.json", network_slug))
186			.to_string_lossy()
187			.to_string();
188
189		for entry in glob(&pattern)
190			.map_err(|e| anyhow::anyhow!("Failed to parse blocks: {}", e))?
191			.flatten()
192		{
193			tokio::fs::remove_file(entry)
194				.await
195				.map_err(|e| anyhow::anyhow!("Failed to delete blocks: {}", e))?;
196		}
197		Ok(())
198	}
199
200	/// Saves a missed block for a network
201	///
202	/// # Arguments
203	/// * `network_id` - Unique identifier for the network
204	/// * `block` - Block number to save
205	///
206	/// # Returns
207	/// * `Result<(), anyhow::Error>` - Success or error
208	async fn save_missed_block(&self, network_id: &str, block: u64) -> Result<(), anyhow::Error> {
209		let file_path = self
210			.storage_path
211			.join(format!("{}_missed_blocks.txt", network_id));
212
213		// Open file in append mode, create if it doesn't exist
214		let mut file = tokio::fs::OpenOptions::new()
215			.create(true)
216			.append(true)
217			.open(file_path)
218			.await
219			.map_err(|e| anyhow::anyhow!("Failed to create missed block file: {}", e))?;
220
221		// Write the block number followed by a newline
222		tokio::io::AsyncWriteExt::write_all(&mut file, format!("{}\n", block).as_bytes())
223			.await
224			.map_err(|e| anyhow::anyhow!("Failed to save missed block: {}", e))?;
225
226		Ok(())
227	}
228}
229
230#[cfg(test)]
231mod tests {
232	use super::*;
233	use tempfile;
234
235	#[tokio::test]
236	async fn test_get_last_processed_block() {
237		let temp_dir = tempfile::tempdir().unwrap();
238		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
239
240		// Test 1: existing file
241		let existing_file = temp_dir.path().join("existing_last_block.txt");
242		tokio::fs::write(&existing_file, "100").await.unwrap();
243		let result = storage.get_last_processed_block("existing").await;
244		assert!(result.is_ok());
245		assert_eq!(result.unwrap(), Some(100));
246
247		// Test 2: Non-existent file
248		let result = storage.get_last_processed_block("non_existent").await;
249		assert!(result.is_ok());
250		assert_eq!(result.unwrap(), None);
251
252		// Test 3: Invalid content (not a number)
253		let invalid_file = temp_dir.path().join("invalid_last_block.txt");
254		tokio::fs::write(&invalid_file, "not a number")
255			.await
256			.unwrap();
257		let result = storage.get_last_processed_block("invalid").await;
258		assert!(result.is_err());
259		let err = result.unwrap_err();
260		assert!(err
261			.to_string()
262			.contains("Failed to parse last processed block"));
263		assert!(err.to_string().contains("invalid"));
264
265		// Test 4: Valid block number
266		let valid_file = temp_dir.path().join("valid_last_block.txt");
267		tokio::fs::write(&valid_file, "123").await.unwrap();
268		let result = storage.get_last_processed_block("valid").await;
269		assert_eq!(result.unwrap(), Some(123));
270	}
271
272	#[tokio::test]
273	async fn test_save_last_processed_block() {
274		let temp_dir = tempfile::tempdir().unwrap();
275		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
276
277		// Test 1: Normal save
278		let result = storage.save_last_processed_block("test", 100).await;
279		assert!(result.is_ok());
280
281		// Verify the content
282		let content = tokio::fs::read_to_string(temp_dir.path().join("test_last_block.txt"))
283			.await
284			.unwrap();
285		assert_eq!(content, "100");
286
287		// Test 2: Save with invalid path (create a readonly directory)
288		#[cfg(unix)]
289		{
290			use std::os::unix::fs::PermissionsExt;
291			let readonly_dir = temp_dir.path().join("readonly");
292			tokio::fs::create_dir(&readonly_dir).await.unwrap();
293			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
294			perms.set_mode(0o444); // Read-only
295			std::fs::set_permissions(&readonly_dir, perms).unwrap();
296
297			let readonly_storage = FileBlockStorage::new(readonly_dir);
298			let result = readonly_storage
299				.save_last_processed_block("test", 100)
300				.await;
301			assert!(result.is_err());
302			let err = result.unwrap_err();
303			assert!(err
304				.to_string()
305				.contains("Failed to save last processed block"));
306			assert!(err.to_string().contains("Permission denied"));
307		}
308	}
309
310	#[tokio::test]
311	async fn test_save_blocks() {
312		let temp_dir = tempfile::tempdir().unwrap();
313		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
314
315		// Test 1: Save empty blocks array
316		let result = storage.save_blocks("test", &[]).await;
317		assert!(result.is_ok());
318
319		// Test 2: Save with invalid path
320		#[cfg(unix)]
321		{
322			use std::os::unix::fs::PermissionsExt;
323			let readonly_dir = temp_dir.path().join("readonly");
324			tokio::fs::create_dir(&readonly_dir).await.unwrap();
325			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
326			perms.set_mode(0o444); // Read-only
327			std::fs::set_permissions(&readonly_dir, perms).unwrap();
328
329			let readonly_storage = FileBlockStorage::new(readonly_dir);
330			let result = readonly_storage.save_blocks("test", &[]).await;
331			assert!(result.is_err());
332			let err = result.unwrap_err();
333			assert!(err.to_string().contains("Failed to save blocks"));
334			assert!(err.to_string().contains("Permission denied"));
335		}
336	}
337
338	#[tokio::test]
339	async fn test_delete_blocks() {
340		let temp_dir = tempfile::tempdir().unwrap();
341		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
342
343		// Create some test block files
344		tokio::fs::write(temp_dir.path().join("test_blocks_1.json"), "[]")
345			.await
346			.unwrap();
347		tokio::fs::write(temp_dir.path().join("test_blocks_2.json"), "[]")
348			.await
349			.unwrap();
350
351		// Test 1: Normal delete
352		let result = storage.delete_blocks("test").await;
353		assert!(result.is_ok());
354
355		// Test 2: Delete with invalid path
356		#[cfg(unix)]
357		{
358			use std::os::unix::fs::PermissionsExt;
359			let readonly_dir = temp_dir.path().join("readonly");
360			tokio::fs::create_dir(&readonly_dir).await.unwrap();
361
362			// Create test files first
363			tokio::fs::write(readonly_dir.join("test_blocks_1.json"), "[]")
364				.await
365				.unwrap();
366
367			// Then make directory readonly
368			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
369			perms.set_mode(0o555); // Read-only directory with execute permission
370			std::fs::set_permissions(&readonly_dir, perms).unwrap();
371
372			let readonly_storage = FileBlockStorage::new(readonly_dir);
373			let result = readonly_storage.delete_blocks("test").await;
374			assert!(result.is_err());
375			let err = result.unwrap_err();
376			assert!(err.to_string().contains("Failed to delete blocks"));
377			assert!(err.to_string().contains("Permission denied"));
378		}
379	}
380
381	#[tokio::test]
382	async fn test_save_missed_block() {
383		let temp_dir = tempfile::tempdir().unwrap();
384		let storage = FileBlockStorage::new(temp_dir.path().to_path_buf());
385
386		// Test 1: Normal save
387		let result = storage.save_missed_block("test", 100).await;
388		assert!(result.is_ok());
389
390		// Verify the content
391		let content = tokio::fs::read_to_string(temp_dir.path().join("test_missed_blocks.txt"))
392			.await
393			.unwrap();
394		assert_eq!(content, "100\n");
395
396		// Test 2: Save with invalid path
397		#[cfg(unix)]
398		{
399			use std::os::unix::fs::PermissionsExt;
400			let readonly_dir = temp_dir.path().join("readonly");
401			tokio::fs::create_dir(&readonly_dir).await.unwrap();
402			let mut perms = std::fs::metadata(&readonly_dir).unwrap().permissions();
403			perms.set_mode(0o444); // Read-only
404			std::fs::set_permissions(&readonly_dir, perms).unwrap();
405
406			let readonly_storage = FileBlockStorage::new(readonly_dir);
407			let result = readonly_storage.save_missed_block("test", 100).await;
408			assert!(result.is_err());
409			let err = result.unwrap_err();
410
411			assert!(err
412				.to_string()
413				.contains("Failed to create missed block file"));
414			assert!(err.to_string().contains("Permission denied"));
415		}
416	}
417}