1use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, ChainSyncConfig, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{stream::FuturesUnordered, StreamExt};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13 db::index::{
14 block::{
15 index_block,
16 roll_forward::{self, PurgeCondition},
17 },
18 queries::sync_status::{
19 get::{get_sync_status, SyncStatus},
20 update::update_sync_status,
21 },
22 session::CassandraSession,
23 },
24 metrics::chain_indexer::metrics_updater,
25 service::utilities::health::{
26 set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27 },
28 settings::{chain_follower, Settings},
29};
30
/// Interval handed to `CassandraSession::wait_until_ready` while waiting for the
/// indexing DB (presumably the time between readiness checks — confirm against
/// `wait_until_ready`).
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
33
34async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36 let chain = cfg.chain;
37 let dl_config = cfg.dl_config.clone();
38
39 let mut cfg = ChainSyncConfig::default_for(chain);
40 cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
41 info!(chain = %chain, "Starting Chain Sync Task");
42
43 if let Err(error) = cfg.run().await {
44 error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
45 Err(error)?;
46 }
47
48 Ok(())
49}
50
51#[derive(Clone)]
53struct SyncParams {
54 chain: Network,
56 start: Point,
58 end: Point,
60 first_indexed_block: Option<Point>,
62 first_is_immutable: bool,
64 last_indexed_block: Option<Point>,
66 last_is_immutable: bool,
68 total_blocks_synced: u64,
70 last_blocks_synced: u64,
72 retries: u64,
74 backoff_delay: Option<Duration>,
76 result: Arc<Option<anyhow::Result<()>>>,
78 follower_roll_forward: Option<Point>,
80}
81
82impl Display for SyncParams {
83 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84 if self.result.is_none() {
85 write!(f, "Sync_Params {{ ")?;
86 } else {
87 write!(f, "Sync_Result {{ ")?;
88 }
89
90 write!(f, "start: {}, end: {}", self.start, self.end)?;
91
92 if let Some(first) = self.first_indexed_block.as_ref() {
93 write!(
94 f,
95 ", first_indexed_block: {first}{}",
96 if self.first_is_immutable { ":I" } else { "" }
97 )?;
98 }
99
100 if let Some(last) = self.last_indexed_block.as_ref() {
101 write!(
102 f,
103 ", last_indexed_block: {last}{}",
104 if self.last_is_immutable { ":I" } else { "" }
105 )?;
106 }
107
108 if self.retries > 0 {
109 write!(f, ", retries: {}", self.retries)?;
110 }
111
112 if self.retries > 0 || self.result.is_some() {
113 write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
114 }
115
116 if self.result.is_some() {
117 write!(f, ", last_sync: {}", self.last_blocks_synced)?;
118 }
119
120 if let Some(backoff) = self.backoff_delay.as_ref() {
121 write!(f, ", backoff: {}", DurationString::from(*backoff))?;
122 }
123
124 if let Some(result) = self.result.as_ref() {
125 match result {
126 Ok(()) => write!(f, ", Success")?,
127 Err(error) => write!(f, ", {error}")?,
128 };
129 }
130
131 f.write_str(" }")
132 }
133}
134
/// Upper bound of the randomised backoff, as a multiple of the base backoff delay:
/// the actual sleep is drawn from `base..base * BACKOFF_RANGE_MULTIPLIER`.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
137
138impl SyncParams {
139 fn new(chain: Network, start: Point, end: Point) -> Self {
141 Self {
142 chain,
143 start,
144 end,
145 first_indexed_block: None,
146 first_is_immutable: false,
147 last_indexed_block: None,
148 last_is_immutable: false,
149 total_blocks_synced: 0,
150 last_blocks_synced: 0,
151 retries: 0,
152 backoff_delay: None,
153 result: Arc::new(None),
154 follower_roll_forward: None,
155 }
156 }
157
158 fn retry(&self) -> Self {
160 let retry_count = self.retries.saturating_add(1);
161
162 let mut backoff = None;
163
164 if self.last_blocks_synced == 0 {
167 backoff = match retry_count {
169 1 => Some(Duration::from_secs(1)), 2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)), };
173 }
174
175 let mut retry = self.clone();
176 retry.last_blocks_synced = 0;
177 retry.retries = retry_count;
178 retry.backoff_delay = backoff;
179 retry.result = Arc::new(None);
180 retry.follower_roll_forward = None;
181
182 retry
183 }
184
185 pub(crate) fn done(
187 &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
188 last_immutable: bool, synced: u64, result: anyhow::Result<()>,
189 ) -> Self {
190 if result.is_ok() && self.end != Point::TIP {
191 update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
195 }
196 let mut done = self.clone();
197 done.first_indexed_block = first;
198 done.first_is_immutable = first_immutable;
199 done.last_indexed_block = last;
200 done.last_is_immutable = last_immutable;
201 done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
202 done.last_blocks_synced = synced;
203 done.result = Arc::new(Some(result));
204
205 done
206 }
207
208 fn actual_start(&self) -> Point {
210 self.last_indexed_block
211 .as_ref()
212 .unwrap_or(&self.start)
213 .clone()
214 }
215
216 async fn backoff(&self) {
222 if let Some(backoff) = self.backoff_delay {
223 let mut rng = rand::rngs::StdRng::from_entropy();
224 let actual_backoff =
225 rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
226
227 tokio::time::sleep(actual_backoff).await;
228 }
229 }
230}
231
232#[allow(clippy::too_many_lines)]
235fn sync_subchain(
236 params: SyncParams, mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
237) -> tokio::task::JoinHandle<SyncParams> {
238 tokio::spawn(async move {
239 params.backoff().await;
241
242 CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
244 info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
245
246 let mut first_indexed_block = params.first_indexed_block.clone();
247 let mut first_immutable = params.first_is_immutable;
248 let mut last_indexed_block = params.last_indexed_block.clone();
249 let mut last_immutable = params.last_is_immutable;
250 let mut blocks_synced = 0u64;
251
252 let mut follower =
253 ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
254 while let Some(chain_update) = follower.next().await {
255 let tips = ChainFollower::get_tips(params.chain).await;
256 let immutable_slot = tips.0.slot_or_default();
257 let live_slot = tips.1.slot_or_default();
258 metrics_updater::current_tip_slot(live_slot, immutable_slot);
259 match chain_update.kind {
260 cardano_chain_follower::Kind::ImmutableBlockRollForward => {
261 if params.end == Point::TIP {
263 info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
267 let mut result = params.done(
268 first_indexed_block,
269 first_immutable,
270 last_indexed_block,
271 last_immutable,
272 blocks_synced,
273 Ok(()),
274 );
275 result.follower_roll_forward = Some(chain_update.block_data().point());
278 return result;
279 }
280 },
281 cardano_chain_follower::Kind::Block => {
282 let block = chain_update.block_data();
283
284 if let Err(error) =
285 index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
286 {
287 let error_msg = format!("Failed to index block {}", block.point());
288 error!(chain=%params.chain, error=%error, params=%params, error_msg);
289 return params.done(
290 first_indexed_block,
291 first_immutable,
292 last_indexed_block,
293 last_immutable,
294 blocks_synced,
295 Err(error.context(error_msg)),
296 );
297 }
298
299 if chain_update.tip && !set_follower_live_first_reached_tip() {
300 metrics_updater::reached_live_tip(true);
301 }
302
303 update_block_state(
304 block,
305 &mut first_indexed_block,
306 &mut first_immutable,
307 &mut last_indexed_block,
308 &mut last_immutable,
309 &mut blocks_synced,
310 );
311 },
312
313 cardano_chain_follower::Kind::Rollback => {
314 let rollback_slot = chain_update.block_data().slot();
316
317 let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
318 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
319 error!(chain=%params.chain, error=%error,
320 "Chain follower rollback, purging volatile data task failed."
321 );
322 } else {
323 #[allow(clippy::arithmetic_side_effects)]
325 let purge_slots = params
326 .last_indexed_block
327 .as_ref()
328 .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
330
331 metrics_updater::forward_data_purge(purge_slots.into());
332
333 let block = chain_update.block_data();
335 if let Err(error) =
336 index_block(block, &mut pending_blocks, params.end.slot_or_default())
337 .await
338 {
339 let error_msg =
340 format!("Failed to index block after rollback {}", block.point());
341 error!(chain=%params.chain, error=%error, params=%params, error_msg);
342 return params.done(
343 first_indexed_block,
344 first_immutable,
345 last_indexed_block,
346 last_immutable,
347 blocks_synced,
348 Err(error.context(error_msg)),
349 );
350 }
351
352 update_block_state(
353 block,
354 &mut first_indexed_block,
355 &mut first_immutable,
356 &mut last_indexed_block,
357 &mut last_immutable,
358 &mut blocks_synced,
359 );
360 }
361 },
362 }
363 }
364
365 let result = params.done(
366 first_indexed_block,
367 first_immutable,
368 last_indexed_block,
369 last_immutable,
370 blocks_synced,
371 Ok(()),
372 );
373
374 info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
375
376 result
377 })
378}
379
380fn update_block_state(
382 block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
383 last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
384) {
385 *last_immutable = block.is_immutable();
386 *last_indexed_block = Some(block.point());
387
388 if first_indexed_block.is_none() {
389 *first_immutable = *last_immutable;
390 *first_indexed_block = Some(block.point());
391 }
392
393 *blocks_synced = blocks_synced.saturating_add(1);
394}
395
396struct SyncTask {
399 cfg: chain_follower::EnvVars,
401
402 sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,
404
405 current_sync_tasks: u16,
407
408 start_slot: Slot,
410
411 immutable_tip_slot: Slot,
413
414 live_tip_slot: Slot,
416
417 sync_status: Vec<SyncStatus>,
419
420 pending_blocks: watch::Sender<BTreeSet<Slot>>,
427}
428
429impl SyncTask {
430 fn new(cfg: chain_follower::EnvVars) -> SyncTask {
432 Self {
433 cfg,
434 sync_tasks: FuturesUnordered::new(),
435 start_slot: 0.into(),
436 current_sync_tasks: 0,
437 immutable_tip_slot: 0.into(),
438 live_tip_slot: 0.into(),
439 sync_status: Vec::new(),
440 pending_blocks: watch::channel(BTreeSet::new()).0,
441 }
442 }
443
444 fn add_sync_task(&mut self, params: SyncParams) {
446 self.pending_blocks.send_modify(|blocks| {
447 blocks.insert(params.end.slot_or_default());
448 });
449 self.sync_tasks
450 .push(sync_subchain(params, self.pending_blocks.subscribe()));
451 self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
452 debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
453 metrics_updater::sync_tasks(self.current_sync_tasks);
454 }
455
456 fn sync_task_finished(&mut self) {
458 self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
459 error!("current_sync_tasks -= 1 overflow");
460 0
461 });
462 debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
463 metrics_updater::sync_tasks(self.current_sync_tasks);
464 }
465
466 #[allow(clippy::too_many_lines)]
475 async fn run(&mut self) {
476 let tips = ChainFollower::get_tips(self.cfg.chain).await;
479 self.immutable_tip_slot = tips.0.slot_or_default();
480 self.live_tip_slot = tips.1.slot_or_default();
481 info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
482
483 metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);
484
485 CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
491
492 info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
493 self.sync_status = get_sync_status().await;
494 debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
495
496 self.add_sync_task(SyncParams::new(
499 self.cfg.chain,
500 Point::fuzzy(self.immutable_tip_slot),
501 Point::TIP,
502 ));
503 metrics_updater::reached_live_tip(false);
504
505 self.start_immutable_followers();
506 if self.sync_tasks.len() == 1 {
509 set_follower_immutable_first_reached_tip();
510 }
511
512 metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);
513
514 while let Some(completed) = self.sync_tasks.next().await {
520 self.sync_task_finished();
522
523 match completed {
524 Ok(finished) => {
525 let tips = ChainFollower::get_tips(self.cfg.chain).await;
526 let immutable_tip_slot = tips.0.slot_or_default();
527 let live_tip_slot = tips.1.slot_or_default();
528 info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
529 if finished.end == Point::TIP {
537 if let Some(ref roll_forward_point) = finished.follower_roll_forward {
538 self.immutable_tip_slot = roll_forward_point.slot_or_default();
541
542 metrics_updater::current_tip_slot(
543 self.live_tip_slot,
544 self.immutable_tip_slot,
545 );
546
547 info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");
548
549 self.start_immutable_followers();
550 metrics_updater::reached_immutable_tip(false);
551 } else {
552 error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
553 }
554
555 self.add_sync_task(finished.retry());
557 } else if let Some(result) = finished.result.as_ref() {
558 match result {
559 Ok(()) => {
560 info!(chain=%self.cfg.chain, report=%finished,
561 "The Immutable follower completed successfully.");
562
563 finished.last_indexed_block.as_ref().inspect(|block| {
564 metrics_updater::highest_complete_indexed_slot(
565 block.slot_or_default(),
566 );
567 });
568
569 self.pending_blocks.send_modify(|blocks| {
570 if !blocks.remove(&finished.end.slot_or_default()) {
571 error!(chain=%self.cfg.chain, end=?finished.end,
572 "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
573 );
574 }
575 });
576
577 self.start_immutable_followers();
580 },
581 Err(error) => {
582 error!(chain=%self.cfg.chain, report=%finished, error=%error,
583 "An Immutable follower failed, restarting it.");
584 self.add_sync_task(finished.retry());
587 },
588 }
589 } else {
590 error!(chain=%self.cfg.chain, report=%finished,
591 "BUG: The Immutable follower completed, but without a proper result.");
592 }
593 },
594 Err(error) => {
595 error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point.");
596 },
597 }
598
599 if self.sync_tasks.len() == 1 {
608 set_follower_immutable_first_reached_tip();
609 metrics_updater::reached_immutable_tip(true);
610
611 CassandraSession::get(true).inspect(|session| {
614 session.caches().assets_ada().clear_cache();
615 session.caches().assets_native().clear_cache();
616 });
617
618 #[allow(clippy::arithmetic_side_effects)]
621 let purge_to_slot =
622 self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
623 let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
624 info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
625 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
626 error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
627 } else {
628 metrics_updater::backward_data_purge();
629 }
630 }
631 }
632
633 error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped. This is an unexpected error!");
634 }
635
636 fn start_immutable_followers(&mut self) {
638 if self.start_slot < self.immutable_tip_slot {
644 while self.current_sync_tasks < self.cfg.sync_tasks {
646 let end_slot = self.immutable_tip_slot.min(
647 (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
648 .into(),
649 );
650
651 if let Some((first_point, last_point)) =
652 self.get_syncable_range(self.start_slot, end_slot)
653 {
654 self.add_sync_task(SyncParams::new(
655 self.cfg.chain,
656 first_point,
657 last_point.clone(),
658 ));
659 }
660
661 self.start_slot = end_slot;
664
665 if end_slot == self.immutable_tip_slot {
666 break;
667 }
668 }
669 }
670 }
671
672 fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
677 for sync_block in &self.sync_status {
678 if start >= sync_block.start_slot && start <= sync_block.end_slot {
680 if end <= sync_block.end_slot {
682 return None;
683 }
684
685 return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
692 }
693 }
694
695 let start_slot = if start == 0.into() {
696 Point::ORIGIN
697 } else {
698 Point::fuzzy(start)
699 };
700
701 Some((start_slot, Point::fuzzy(end)))
702 }
703}
704
705pub(crate) async fn start_followers() -> anyhow::Result<()> {
707 let cfg = Settings::follower_cfg();
708
709 cfg.log();
711
712 start_sync_for(&cfg).await?;
714
715 tokio::spawn(async move {
716 let mut sync_task = SyncTask::new(cfg);
717
718 sync_task.run().await;
719 });
720
721 Ok(())
722}