1use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, ChainSyncConfig, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{stream::FuturesUnordered, StreamExt};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13 db::index::{
14 block::{
15 index_block,
16 roll_forward::{self, PurgeCondition},
17 },
18 queries::sync_status::{
19 get::{get_sync_status, SyncStatus},
20 update::update_sync_status,
21 },
22 session::CassandraSession,
23 },
24 metrics::chain_indexer::metrics_updater,
25 service::utilities::health::{
26 set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27 },
28 settings::{chain_follower, Settings},
29};
30
/// Interval handed to `CassandraSession::wait_until_ready` while the indexer
/// waits for the indexing DB (one second).
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_millis(1_000);
33
34async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36 let chain = cfg.chain;
37 let dl_config = cfg.dl_config.clone();
38
39 let mut cfg = ChainSyncConfig::default_for(chain);
40 cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
41 info!(chain = %chain, "Starting Chain Sync Task");
42
43 if let Err(error) = cfg.run().await {
44 error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
45 Err(error)?;
46 }
47
48 Ok(())
49}
50
/// Parameters of a single chain-sync task, also reused as the report returned
/// when that task ends.
#[derive(Clone)]
struct SyncParams {
    /// Network the follower runs against.
    chain: Network,
    /// Point the sync range starts at (used when nothing has been indexed yet).
    start: Point,
    /// Point the sync range ends at; `Point::TIP` marks the live follower.
    end: Point,
    /// First block indexed by the task, if any.
    first_indexed_block: Option<Point>,
    /// Whether the first indexed block was immutable.
    first_is_immutable: bool,
    /// Most recently indexed block, if any; a retry resumes from here.
    last_indexed_block: Option<Point>,
    /// Whether the most recently indexed block was immutable.
    last_is_immutable: bool,
    /// Blocks indexed across all runs of this task (accumulated by `done`).
    total_blocks_synced: u64,
    /// Blocks indexed by the most recent run; reset to zero by `retry`.
    last_blocks_synced: u64,
    /// Number of times the task has been retried.
    retries: u64,
    /// Base delay to wait before the task starts; `None` means no delay.
    backoff_delay: Option<Duration>,
    /// Outcome of the most recent run; `None` until `done` records one.
    /// NOTE(review): presumably wrapped in `Arc` so the struct can stay `Clone` - confirm.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Point of the immutable roll-forward that ended a TIP follower, if any.
    follower_roll_forward: Option<Point>,
}
81
82impl Display for SyncParams {
83 fn fmt(
84 &self,
85 f: &mut std::fmt::Formatter<'_>,
86 ) -> std::fmt::Result {
87 if self.result.is_none() {
88 write!(f, "Sync_Params {{ ")?;
89 } else {
90 write!(f, "Sync_Result {{ ")?;
91 }
92
93 write!(f, "start: {}, end: {}", self.start, self.end)?;
94
95 if let Some(first) = self.first_indexed_block.as_ref() {
96 write!(
97 f,
98 ", first_indexed_block: {first}{}",
99 if self.first_is_immutable { ":I" } else { "" }
100 )?;
101 }
102
103 if let Some(last) = self.last_indexed_block.as_ref() {
104 write!(
105 f,
106 ", last_indexed_block: {last}{}",
107 if self.last_is_immutable { ":I" } else { "" }
108 )?;
109 }
110
111 if self.retries > 0 {
112 write!(f, ", retries: {}", self.retries)?;
113 }
114
115 if self.retries > 0 || self.result.is_some() {
116 write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
117 }
118
119 if self.result.is_some() {
120 write!(f, ", last_sync: {}", self.last_blocks_synced)?;
121 }
122
123 if let Some(backoff) = self.backoff_delay.as_ref() {
124 write!(f, ", backoff: {}", DurationString::from(*backoff))?;
125 }
126
127 if let Some(result) = self.result.as_ref() {
128 match result {
129 Ok(()) => write!(f, ", Success")?,
130 Err(error) => write!(f, ", {error}")?,
131 }
132 }
133
134 f.write_str(" }")
135 }
136}
137
138const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
140
141impl SyncParams {
142 fn new(
144 chain: Network,
145 start: Point,
146 end: Point,
147 ) -> Self {
148 Self {
149 chain,
150 start,
151 end,
152 first_indexed_block: None,
153 first_is_immutable: false,
154 last_indexed_block: None,
155 last_is_immutable: false,
156 total_blocks_synced: 0,
157 last_blocks_synced: 0,
158 retries: 0,
159 backoff_delay: None,
160 result: Arc::new(None),
161 follower_roll_forward: None,
162 }
163 }
164
165 fn retry(&self) -> Self {
167 let retry_count = self.retries.saturating_add(1);
168
169 let mut backoff = None;
170
171 if self.last_blocks_synced == 0 {
174 backoff = match retry_count {
176 1 => Some(Duration::from_secs(1)), 2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)), };
180 }
181
182 let mut retry = self.clone();
183 retry.last_blocks_synced = 0;
184 retry.retries = retry_count;
185 retry.backoff_delay = backoff;
186 retry.result = Arc::new(None);
187 retry.follower_roll_forward = None;
188
189 retry
190 }
191
192 pub(crate) fn done(
194 &self,
195 first: Option<Point>,
196 first_immutable: bool,
197 last: Option<Point>,
198 last_immutable: bool,
199 synced: u64,
200 result: anyhow::Result<()>,
201 ) -> Self {
202 if result.is_ok() && self.end != Point::TIP {
203 update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
207 }
208 let mut done = self.clone();
209 done.first_indexed_block = first;
210 done.first_is_immutable = first_immutable;
211 done.last_indexed_block = last;
212 done.last_is_immutable = last_immutable;
213 done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
214 done.last_blocks_synced = synced;
215 done.result = Arc::new(Some(result));
216
217 done
218 }
219
220 fn actual_start(&self) -> Point {
222 self.last_indexed_block
223 .as_ref()
224 .unwrap_or(&self.start)
225 .clone()
226 }
227
228 async fn backoff(&self) {
234 if let Some(backoff) = self.backoff_delay {
235 let mut rng = rand::rngs::StdRng::from_entropy();
236 let actual_backoff =
237 rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
238
239 tokio::time::sleep(actual_backoff).await;
240 }
241 }
242}
243
244#[allow(clippy::too_many_lines)]
247fn sync_subchain(
248 params: SyncParams,
249 mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
250) -> tokio::task::JoinHandle<SyncParams> {
251 tokio::spawn(async move {
252 params.backoff().await;
254
255 CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
257 info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
258
259 let mut first_indexed_block = params.first_indexed_block.clone();
260 let mut first_immutable = params.first_is_immutable;
261 let mut last_indexed_block = params.last_indexed_block.clone();
262 let mut last_immutable = params.last_is_immutable;
263 let mut blocks_synced = 0u64;
264
265 let mut follower =
266 ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
267 while let Some(chain_update) = follower.next().await {
268 let tips = ChainFollower::get_tips(params.chain).await;
269 let immutable_slot = tips.0.slot_or_default();
270 let live_slot = tips.1.slot_or_default();
271 metrics_updater::current_tip_slot(live_slot, immutable_slot);
272 match chain_update.kind {
273 cardano_chain_follower::Kind::ImmutableBlockRollForward => {
274 if params.end == Point::TIP {
276 info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
280 let mut result = params.done(
281 first_indexed_block,
282 first_immutable,
283 last_indexed_block,
284 last_immutable,
285 blocks_synced,
286 Ok(()),
287 );
288 result.follower_roll_forward = Some(chain_update.block_data().point());
291 return result;
292 }
293 },
294 cardano_chain_follower::Kind::Block => {
295 let block = chain_update.block_data();
296
297 if let Err(error) =
298 index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
299 {
300 let error_msg = format!("Failed to index block {}", block.point());
301 error!(chain=%params.chain, error=%error, params=%params, error_msg);
302 return params.done(
303 first_indexed_block,
304 first_immutable,
305 last_indexed_block,
306 last_immutable,
307 blocks_synced,
308 Err(error.context(error_msg)),
309 );
310 }
311
312 if chain_update.tip && !set_follower_live_first_reached_tip() {
313 metrics_updater::reached_live_tip(true);
314 }
315
316 update_block_state(
317 block,
318 &mut first_indexed_block,
319 &mut first_immutable,
320 &mut last_indexed_block,
321 &mut last_immutable,
322 &mut blocks_synced,
323 );
324 },
325
326 cardano_chain_follower::Kind::Rollback => {
327 let rollback_slot = chain_update.block_data().slot();
329
330 let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
331 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
332 error!(chain=%params.chain, error=%error,
333 "Chain follower rollback, purging volatile data task failed."
334 );
335 } else {
336 #[allow(clippy::arithmetic_side_effects)]
338 let purge_slots = params
339 .last_indexed_block
340 .as_ref()
341 .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
343
344 metrics_updater::forward_data_purge(purge_slots.into());
345
346 let block = chain_update.block_data();
348 if let Err(error) =
349 index_block(block, &mut pending_blocks, params.end.slot_or_default())
350 .await
351 {
352 let error_msg =
353 format!("Failed to index block after rollback {}", block.point());
354 error!(chain=%params.chain, error=%error, params=%params, error_msg);
355 return params.done(
356 first_indexed_block,
357 first_immutable,
358 last_indexed_block,
359 last_immutable,
360 blocks_synced,
361 Err(error.context(error_msg)),
362 );
363 }
364
365 update_block_state(
366 block,
367 &mut first_indexed_block,
368 &mut first_immutable,
369 &mut last_indexed_block,
370 &mut last_immutable,
371 &mut blocks_synced,
372 );
373 }
374 },
375 }
376 }
377
378 let result = params.done(
379 first_indexed_block,
380 first_immutable,
381 last_indexed_block,
382 last_immutable,
383 blocks_synced,
384 Ok(()),
385 );
386
387 info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
388
389 result
390 })
391}
392
393fn update_block_state(
395 block: &MultiEraBlock,
396 first_indexed_block: &mut Option<Point>,
397 first_immutable: &mut bool,
398 last_indexed_block: &mut Option<Point>,
399 last_immutable: &mut bool,
400 blocks_synced: &mut u64,
401) {
402 *last_immutable = block.is_immutable();
403 *last_indexed_block = Some(block.point());
404
405 if first_indexed_block.is_none() {
406 *first_immutable = *last_immutable;
407 *first_indexed_block = Some(block.point());
408 }
409
410 *blocks_synced = blocks_synced.saturating_add(1);
411}
412
/// State of the primary follower task that schedules and supervises all
/// chain-sync subtasks.
struct SyncTask {
    /// Follower settings (chain, number of parallel sync tasks, chunk size).
    cfg: chain_follower::EnvVars,

    /// Join handles of the sync tasks currently running.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// Number of sync tasks currently running (also reported to metrics).
    current_sync_tasks: u16,

    /// Slot at which the next immutable chunk to schedule starts.
    start_slot: Slot,

    /// Slot of the immutable tip the immutable followers sync up to.
    immutable_tip_slot: Slot,

    /// Slot of the live tip, as last read from the chain follower.
    live_tip_slot: Slot,

    /// Sync status records loaded from the DB; used to skip ranges already synced.
    sync_status: Vec<SyncStatus>,

    /// Sender side of the watch channel holding the end slots of the sync tasks
    /// that are in flight; each task gets a receiver.
    pending_blocks: watch::Sender<BTreeSet<Slot>>,
}
445
446impl SyncTask {
447 fn new(cfg: chain_follower::EnvVars) -> SyncTask {
449 Self {
450 cfg,
451 sync_tasks: FuturesUnordered::new(),
452 start_slot: 0.into(),
453 current_sync_tasks: 0,
454 immutable_tip_slot: 0.into(),
455 live_tip_slot: 0.into(),
456 sync_status: Vec::new(),
457 pending_blocks: watch::channel(BTreeSet::new()).0,
458 }
459 }
460
461 fn add_sync_task(
463 &mut self,
464 params: SyncParams,
465 ) {
466 self.pending_blocks.send_modify(|blocks| {
467 blocks.insert(params.end.slot_or_default());
468 });
469 self.sync_tasks
470 .push(sync_subchain(params, self.pending_blocks.subscribe()));
471 self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
472 debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
473 metrics_updater::sync_tasks(self.current_sync_tasks);
474 }
475
476 fn sync_task_finished(&mut self) {
478 self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
479 error!("current_sync_tasks -= 1 overflow");
480 0
481 });
482 debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
483 metrics_updater::sync_tasks(self.current_sync_tasks);
484 }
485
486 #[allow(clippy::too_many_lines)]
495 async fn run(&mut self) {
496 let tips = ChainFollower::get_tips(self.cfg.chain).await;
499 self.immutable_tip_slot = tips.0.slot_or_default();
500 self.live_tip_slot = tips.1.slot_or_default();
501 info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
502
503 metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);
504
505 CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
511
512 info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
513 self.sync_status = get_sync_status().await;
514 debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
515
516 self.add_sync_task(SyncParams::new(
519 self.cfg.chain,
520 Point::fuzzy(self.immutable_tip_slot),
521 Point::TIP,
522 ));
523 metrics_updater::reached_live_tip(false);
524
525 self.start_immutable_followers();
526 if self.sync_tasks.len() == 1 {
529 set_follower_immutable_first_reached_tip();
530 }
531
532 metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);
533
534 while let Some(completed) = self.sync_tasks.next().await {
540 self.sync_task_finished();
542
543 match completed {
544 Ok(finished) => {
545 let tips = ChainFollower::get_tips(self.cfg.chain).await;
546 let immutable_tip_slot = tips.0.slot_or_default();
547 let live_tip_slot = tips.1.slot_or_default();
548 info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
549 if finished.end == Point::TIP {
557 if let Some(ref roll_forward_point) = finished.follower_roll_forward {
558 self.immutable_tip_slot = roll_forward_point.slot_or_default();
561
562 metrics_updater::current_tip_slot(
563 self.live_tip_slot,
564 self.immutable_tip_slot,
565 );
566
567 info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");
568
569 self.start_immutable_followers();
570 metrics_updater::reached_immutable_tip(false);
571 } else {
572 error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
573 }
574
575 self.add_sync_task(finished.retry());
577 } else if let Some(result) = finished.result.as_ref() {
578 match result {
579 Ok(()) => {
580 info!(chain=%self.cfg.chain, report=%finished,
581 "The Immutable follower completed successfully.");
582
583 finished.last_indexed_block.as_ref().inspect(|block| {
584 metrics_updater::highest_complete_indexed_slot(
585 block.slot_or_default(),
586 );
587 });
588
589 self.pending_blocks.send_modify(|blocks| {
590 if !blocks.remove(&finished.end.slot_or_default()) {
591 error!(chain=%self.cfg.chain, end=?finished.end,
592 "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
593 );
594 }
595 });
596
597 self.start_immutable_followers();
600 },
601 Err(error) => {
602 error!(chain=%self.cfg.chain, report=%finished, error=%error,
603 "An Immutable follower failed, restarting it.");
604 self.add_sync_task(finished.retry());
607 },
608 }
609 } else {
610 error!(chain=%self.cfg.chain, report=%finished,
611 "BUG: The Immutable follower completed, but without a proper result.");
612 }
613 },
614 Err(error) => {
615 error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point.");
616 },
617 }
618
619 if self.sync_tasks.len() == 1 {
628 set_follower_immutable_first_reached_tip();
629 metrics_updater::reached_immutable_tip(true);
630
631 CassandraSession::get(true).inspect(|session| {
634 session.caches().assets_ada().clear_cache();
635 session.caches().assets_native().clear_cache();
636 });
637
638 #[allow(clippy::arithmetic_side_effects)]
641 let purge_to_slot =
642 self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
643 let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
644 info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
645 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
646 error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
647 } else {
648 metrics_updater::backward_data_purge();
649 }
650 }
651 }
652
653 error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
654 }
655
656 fn start_immutable_followers(&mut self) {
658 if self.start_slot < self.immutable_tip_slot {
664 while self.current_sync_tasks < self.cfg.sync_tasks {
666 let end_slot = self.immutable_tip_slot.min(
667 (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
668 .into(),
669 );
670
671 if let Some((first_point, last_point)) =
672 self.get_syncable_range(self.start_slot, end_slot)
673 {
674 self.add_sync_task(SyncParams::new(
675 self.cfg.chain,
676 first_point,
677 last_point.clone(),
678 ));
679 }
680
681 self.start_slot = end_slot;
684
685 if end_slot == self.immutable_tip_slot {
686 break;
687 }
688 }
689 }
690 }
691
692 fn get_syncable_range(
697 &self,
698 start: Slot,
699 end: Slot,
700 ) -> Option<(Point, Point)> {
701 for sync_block in &self.sync_status {
702 if start >= sync_block.start_slot && start <= sync_block.end_slot {
704 if end <= sync_block.end_slot {
706 return None;
707 }
708
709 return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
716 }
717 }
718
719 let start_slot = if start == 0.into() {
720 Point::ORIGIN
721 } else {
722 Point::fuzzy(start)
723 };
724
725 Some((start_slot, Point::fuzzy(end)))
726 }
727}
728
729pub(crate) async fn start_followers() -> anyhow::Result<()> {
731 let cfg = Settings::follower_cfg();
732
733 cfg.log();
735
736 start_sync_for(&cfg).await?;
738
739 tokio::spawn(async move {
740 let mut sync_task = SyncTask::new(cfg);
741
742 sync_task.run().await;
743 });
744
745 Ok(())
746}