use anyhow::Context;
use futures::{channel::mpsc, future::BoxFuture, stream::StreamExt, SinkExt};
use std::{
    collections::{BTreeMap, HashMap},
    sync::Arc,
};
use tokio::sync::RwLock;
use tokio_cron_scheduler::{Job, JobScheduler};
use tracing::instrument;

use crate::{
    models::{BlockType, Network, ProcessedBlock},
    services::{
        blockchain::BlockChainClient,
        blockwatcher::{
            error::BlockWatcherError,
            storage::BlockStorage,
            tracker::{BlockTracker, BlockTrackerTrait},
        },
    },
};

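/// Abstraction over the cron job scheduler that drives periodic block processing,
/// implemented below for `tokio_cron_scheduler::JobScheduler`.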
#[async_trait::async_trait]
pub trait JobSchedulerTrait: Send + Sync + Sized {
    async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>>;
    async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
    async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>>;
}

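/// Forwards each trait method to the inherent method on `tokio_cron_scheduler::JobScheduler`.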
#[async_trait::async_trait]
impl JobSchedulerTrait for JobScheduler {
    async fn new() -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
        Self::new().await.map_err(Into::into)
    }

    async fn add(&self, job: Job) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.add(job).await.map(|_| ()).map_err(Into::into)
    }

    async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.start().await.map(|_| ()).map_err(Into::into)
    }

    async fn shutdown(&mut self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        self.shutdown().await.map(|_| ()).map_err(Into::into)
    }
}

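/// Watches a single network for new blocks: holds the network configuration, the
/// block storage, the block and trigger handlers, the scheduler that drives the cron
/// job, and the block tracker that records each processed block number.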
pub struct NetworkBlockWatcher<S, H, T, J>
where
    J: JobSchedulerTrait,
{
    pub network: Network,
    pub block_storage: Arc<S>,
    pub block_handler: Arc<H>,
    pub trigger_handler: Arc<T>,
    pub scheduler: J,
    pub block_tracker: Arc<BlockTracker<S>>,
}

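/// Maps a network slug to its running `NetworkBlockWatcher`.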
type BlockWatchersMap<S, H, T, J> = HashMap<String, NetworkBlockWatcher<S, H, T, J>>;

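/// Manages block watchers across networks, keeping at most one active watcher per
/// network slug.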
pub struct BlockWatcherService<S, H, T, J>
where
    J: JobSchedulerTrait,
{
    pub block_storage: Arc<S>,
    pub block_handler: Arc<H>,
    pub trigger_handler: Arc<T>,
    pub active_watchers: Arc<RwLock<BlockWatchersMap<S, H, T, J>>>,
    pub block_tracker: Arc<BlockTracker<S>>,
}

impl<S, H, T, J> NetworkBlockWatcher<S, H, T, J>
where
    S: BlockStorage + Send + Sync + 'static,
    H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
    T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
    J: JobSchedulerTrait,
{
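    /// Creates a watcher for one network, initializing its job scheduler.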
    pub async fn new(
        network: Network,
        block_storage: Arc<S>,
        block_handler: Arc<H>,
        trigger_handler: Arc<T>,
        block_tracker: Arc<BlockTracker<S>>,
    ) -> Result<Self, BlockWatcherError> {
        let scheduler = J::new().await.map_err(|e| {
            BlockWatcherError::scheduler_error(
                e.to_string(),
                Some(e),
                Some(HashMap::from([(
                    "network".to_string(),
                    network.slug.clone(),
                )])),
            )
        })?;
        Ok(Self {
            network,
            block_storage,
            block_handler,
            trigger_handler,
            scheduler,
            block_tracker,
        })
    }

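    /// Starts the watcher: schedules a cron job on the network's `cron_schedule` that
    /// runs `process_new_blocks` with the given RPC client on every tick.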
    pub async fn start<C: BlockChainClient + Clone + Send + 'static>(
        &mut self,
        rpc_client: C,
    ) -> Result<(), BlockWatcherError> {
        let network = self.network.clone();
        let block_storage = self.block_storage.clone();
        let block_handler = self.block_handler.clone();
        let trigger_handler = self.trigger_handler.clone();
        let block_tracker = self.block_tracker.clone();

        let job = Job::new_async(self.network.cron_schedule.as_str(), move |_uuid, _l| {
            let network = network.clone();
            let block_storage = block_storage.clone();
            let block_handler = block_handler.clone();
            let block_tracker = block_tracker.clone();
            let rpc_client = rpc_client.clone();
            let trigger_handler = trigger_handler.clone();
            Box::pin(async move {
                let _ = process_new_blocks(
                    &network,
                    &rpc_client,
                    block_storage,
                    block_handler,
                    trigger_handler,
                    block_tracker,
                )
                .await
                .map_err(|e| {
                    BlockWatcherError::processing_error(
                        "Failed to process blocks".to_string(),
                        Some(e.into()),
                        Some(HashMap::from([(
                            "network".to_string(),
                            network.slug.clone(),
                        )])),
                    )
                });
            })
        })
        .with_context(|| "Failed to create job")?;

        self.scheduler.add(job).await.map_err(|e| {
            BlockWatcherError::scheduler_error(
                e.to_string(),
                Some(e),
                Some(HashMap::from([(
                    "network".to_string(),
                    self.network.slug.clone(),
                )])),
            )
        })?;

        self.scheduler.start().await.map_err(|e| {
            BlockWatcherError::scheduler_error(
                e.to_string(),
                Some(e),
                Some(HashMap::from([(
                    "network".to_string(),
                    self.network.slug.clone(),
                )])),
            )
        })?;

        tracing::info!("Started block watcher for network: {}", self.network.slug);
        Ok(())
    }

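    /// Stops the watcher by shutting down its job scheduler.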
    pub async fn stop(&mut self) -> Result<(), BlockWatcherError> {
        self.scheduler.shutdown().await.map_err(|e| {
            BlockWatcherError::scheduler_error(
                e.to_string(),
                Some(e),
                Some(HashMap::from([(
                    "network".to_string(),
                    self.network.slug.clone(),
                )])),
            )
        })?;

        tracing::info!("Stopped block watcher for network: {}", self.network.slug);
        Ok(())
    }
}

impl<S, H, T, J> BlockWatcherService<S, H, T, J>
where
    S: BlockStorage + Send + Sync + 'static,
    H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
    T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
    J: JobSchedulerTrait,
{
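    /// Creates a new service with no active watchers.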
    pub async fn new(
        block_storage: Arc<S>,
        block_handler: Arc<H>,
        trigger_handler: Arc<T>,
        block_tracker: Arc<BlockTracker<S>>,
    ) -> Result<Self, BlockWatcherError> {
        Ok(BlockWatcherService {
            block_storage,
            block_handler,
            trigger_handler,
            active_watchers: Arc::new(RwLock::new(HashMap::new())),
            block_tracker,
        })
    }

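    /// Starts a watcher for the given network; does nothing if one is already running
    /// for that network slug.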
    pub async fn start_network_watcher<C: BlockChainClient + Send + Clone + 'static>(
        &self,
        network: &Network,
        rpc_client: C,
    ) -> Result<(), BlockWatcherError> {
        let mut watchers = self.active_watchers.write().await;

        if watchers.contains_key(&network.slug) {
            tracing::info!(
                "Block watcher already running for network: {}",
                network.slug
            );
            return Ok(());
        }

        let mut watcher = NetworkBlockWatcher::new(
            network.clone(),
            self.block_storage.clone(),
            self.block_handler.clone(),
            self.trigger_handler.clone(),
            self.block_tracker.clone(),
        )
        .await?;

        watcher.start(rpc_client).await?;
        watchers.insert(network.slug.clone(), watcher);

        Ok(())
    }

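    /// Stops and removes the watcher for the given network slug, if one is running.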
    pub async fn stop_network_watcher(&self, network_slug: &str) -> Result<(), BlockWatcherError> {
        let mut watchers = self.active_watchers.write().await;

        if let Some(mut watcher) = watchers.remove(network_slug) {
            watcher.stop().await?;
        }

        Ok(())
    }
}

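/// Fetches all confirmed blocks since the last processed block, runs the block
/// handler over them concurrently, hands the results to the trigger handler in
/// ascending block-number order, and finally persists the new last processed block.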
#[instrument(skip_all, fields(network = network.slug))]
pub async fn process_new_blocks<
    S: BlockStorage,
    C: BlockChainClient + Send + Clone + 'static,
    H: Fn(BlockType, Network) -> BoxFuture<'static, ProcessedBlock> + Send + Sync + 'static,
    T: Fn(&ProcessedBlock) -> tokio::task::JoinHandle<()> + Send + Sync + 'static,
    TR: BlockTrackerTrait<S>,
>(
    network: &Network,
    rpc_client: &C,
    block_storage: Arc<S>,
    block_handler: Arc<H>,
    trigger_handler: Arc<T>,
    block_tracker: Arc<TR>,
) -> Result<(), BlockWatcherError> {
    let start_time = std::time::Instant::now();

    let last_processed_block = block_storage
        .get_last_processed_block(&network.slug)
        .await
        .with_context(|| "Failed to get last processed block")?
        .unwrap_or(0);

    let latest_block = rpc_client
        .get_latest_block_number()
        .await
        .with_context(|| "Failed to get latest block number")?;

    let latest_confirmed_block = latest_block.saturating_sub(network.confirmation_blocks);

    let recommended_past_blocks = network.get_recommended_past_blocks();

    let max_past_blocks = network.max_past_blocks.unwrap_or(recommended_past_blocks);

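    // Begin at the block after the last one processed, but never reach further back
    // than `max_past_blocks` behind the latest confirmed block.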
    let start_block = std::cmp::max(
        last_processed_block + 1,
        latest_confirmed_block.saturating_sub(max_past_blocks),
    );

    tracing::info!(
        "Processing blocks:\n\tLast processed block: {}\n\tLatest confirmed block: {}\n\tStart \
         block: {}{}\n\tConfirmations required: {}\n\tMax past blocks: {}",
        last_processed_block,
        latest_confirmed_block,
        start_block,
        if start_block > last_processed_block + 1 {
            format!(
                " (skipped {} blocks)",
                start_block - (last_processed_block + 1)
            )
        } else {
            String::new()
        },
        network.confirmation_blocks,
        max_past_blocks
    );

    let mut blocks = Vec::new();
    if last_processed_block == 0 {
        blocks = rpc_client
            .get_blocks(latest_confirmed_block, None)
            .await
            .with_context(|| format!("Failed to get block {}", latest_confirmed_block))?;
    } else if last_processed_block < latest_confirmed_block {
        blocks = rpc_client
            .get_blocks(start_block, Some(latest_confirmed_block))
            .await
            .with_context(|| {
                format!(
                    "Failed to get blocks from {} to {}",
                    start_block, latest_confirmed_block
                )
            })?;
    }

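    // Two-stage pipeline: `process_*` carries raw blocks to the block handler,
    // `trigger_*` carries processed blocks on to the trigger handler.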
    let (process_tx, process_rx) = mpsc::channel::<(BlockType, u64)>(blocks.len() * 2);
    let (trigger_tx, trigger_rx) = mpsc::channel::<ProcessedBlock>(blocks.len() * 2);

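    // Stage 1: run the block handler over incoming blocks with bounded concurrency
    // and forward every result to the trigger channel.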
    let process_handle = tokio::spawn({
        let network = network.clone();
        let block_handler = block_handler.clone();
        let mut trigger_tx = trigger_tx.clone();

        async move {
            let mut results = process_rx
                .map(|(block, _)| {
                    let network = network.clone();
                    let block_handler = block_handler.clone();
                    async move { (block_handler)(block, network).await }
                })
                .buffer_unordered(32);

            while let Some(result) = results.next().await {
                trigger_tx
                    .send(result)
                    .await
                    .with_context(|| "Failed to send processed block")?;
            }

            Ok::<(), BlockWatcherError>(())
        }
    });

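    // Stage 2: buffer processed blocks and invoke the trigger handler strictly in
    // ascending block-number order, flushing whatever remains once the channel closes.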
    let trigger_handle = tokio::spawn({
        let trigger_handler = trigger_handler.clone();

        async move {
            let mut trigger_rx = trigger_rx;
            let mut pending_blocks = BTreeMap::new();
            let mut next_block_number = Some(start_block);

            while let Some(processed_block) = trigger_rx.next().await {
                let block_number = processed_block.block_number;
                pending_blocks.insert(block_number, processed_block);

                while let Some(expected) = next_block_number {
                    if let Some(block) = pending_blocks.remove(&expected) {
                        (trigger_handler)(&block);
                        next_block_number = Some(expected + 1);
                    } else {
                        break;
                    }
                }
            }

            while let Some(min_block) = pending_blocks.keys().next().copied() {
                if let Some(block) = pending_blocks.remove(&min_block) {
                    (trigger_handler)(&block);
                }
            }
            Ok::<(), BlockWatcherError>(())
        }
    });

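    // Record each fetched block with the block tracker and feed it into the pipeline.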
    futures::future::join_all(blocks.iter().map(|block| {
        let network = network.clone();
        let block_tracker = block_tracker.clone();
        let mut process_tx = process_tx.clone();
        async move {
            let block_number = block.number().unwrap_or(0);

            block_tracker.record_block(&network, block_number).await?;

            process_tx
                .send((block.clone(), block_number))
                .await
                .with_context(|| "Failed to send block to pipeline")?;

            Ok::<(), BlockWatcherError>(())
        }
    }))
    .await
    .into_iter()
    .collect::<Result<Vec<_>, _>>()
    .with_context(|| format!("Failed to process blocks for network {}", network.slug))?;

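    // Close the channels so both pipeline tasks can run to completion.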
    drop(process_tx);
    drop(trigger_tx);

    let (_process_result, _trigger_result) = tokio::join!(process_handle, trigger_handle);

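    // Optionally replace the stored raw blocks, then record how far we got.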
    if network.store_blocks.unwrap_or(false) {
        block_storage
            .delete_blocks(&network.slug)
            .await
            .with_context(|| "Failed to delete old blocks")?;

        block_storage
            .save_blocks(&network.slug, &blocks)
            .await
            .with_context(|| "Failed to save blocks")?;
    }
    block_storage
        .save_last_processed_block(&network.slug, latest_confirmed_block)
        .await
        .with_context(|| "Failed to save last processed block")?;

    tracing::info!(
        "Processed {} blocks in {}ms",
        blocks.len(),
        start_time.elapsed().as_millis()
    );

    Ok(())
}