1use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{StreamExt, stream::FuturesUnordered};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13 db::index::{
14 block::{
15 index_block,
16 roll_forward::{self, PurgeCondition},
17 },
18 queries::sync_status::{
19 get::{SyncStatus, get_sync_status},
20 update::update_sync_status,
21 },
22 session::CassandraSession,
23 },
24 metrics::chain_indexer::metrics_updater,
25 service::utilities::health::{
26 set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27 },
28 settings::{Settings, chain_follower},
29};
30
/// Interval passed to `CassandraSession::wait_until_ready` while waiting for the
/// indexing DB to become ready (1 second).
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
33
34async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36 info!(chain = %cfg.chain(), "Starting Chain Sync Task");
37
38 if let Err(error) = cfg.cfg.clone().run().await {
39 error!(chain=%cfg.chain(), error=%error, "Failed to start Chain Sync Task");
40 Err(error)?;
41 }
42
43 Ok(())
44}
45
/// Parameters of a sync task, which double as the report the task returns when it ends.
#[derive(Clone)]
struct SyncParams {
    /// The network this task syncs.
    chain: Network,
    /// The requested starting point of this sync.
    start: Point,
    /// The requested ending point of this sync (`Point::TIP` for the live follower).
    end: Point,
    /// The first block that was indexed, if any block has been indexed yet.
    first_indexed_block: Option<Point>,
    /// Whether the first indexed block reported itself as immutable.
    first_is_immutable: bool,
    /// The most recent block that was indexed, if any. A retry resumes from here.
    last_indexed_block: Option<Point>,
    /// Whether the most recently indexed block reported itself as immutable.
    last_is_immutable: bool,
    /// Number of blocks indexed over all attempts of this task.
    total_blocks_synced: u64,
    /// Number of blocks indexed by the most recent attempt (reset to 0 by `retry`).
    last_blocks_synced: u64,
    /// Number of times this task has been retried.
    retries: u64,
    /// Base delay to wait before the task starts working, if any (set by `retry`).
    backoff_delay: Option<Duration>,
    /// Outcome of the task: `None` while it is only parameters, `Some` once `done` was called.
    result: Arc<Option<anyhow::Result<()>>>,
    /// The point of the immutable roll forward that ended a TIP follower, if that is why it ended.
    follower_roll_forward: Option<Point>,
}
76
77impl Display for SyncParams {
78 fn fmt(
79 &self,
80 f: &mut std::fmt::Formatter<'_>,
81 ) -> std::fmt::Result {
82 if self.result.is_none() {
83 write!(f, "Sync_Params {{ ")?;
84 } else {
85 write!(f, "Sync_Result {{ ")?;
86 }
87
88 write!(f, "start: {}, end: {}", self.start, self.end)?;
89
90 if let Some(first) = self.first_indexed_block.as_ref() {
91 write!(
92 f,
93 ", first_indexed_block: {first}{}",
94 if self.first_is_immutable { ":I" } else { "" }
95 )?;
96 }
97
98 if let Some(last) = self.last_indexed_block.as_ref() {
99 write!(
100 f,
101 ", last_indexed_block: {last}{}",
102 if self.last_is_immutable { ":I" } else { "" }
103 )?;
104 }
105
106 if self.retries > 0 {
107 write!(f, ", retries: {}", self.retries)?;
108 }
109
110 if self.retries > 0 || self.result.is_some() {
111 write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
112 }
113
114 if self.result.is_some() {
115 write!(f, ", last_sync: {}", self.last_blocks_synced)?;
116 }
117
118 if let Some(backoff) = self.backoff_delay.as_ref() {
119 write!(f, ", backoff: {}", DurationString::from(*backoff))?;
120 }
121
122 if let Some(result) = self.result.as_ref() {
123 match result {
124 Ok(()) => write!(f, ", Success")?,
125 Err(error) => write!(f, ", {error}")?,
126 }
127 }
128
129 f.write_str(" }")
130 }
131}
132
/// Upper bound of the randomized backoff, as a multiple of the base backoff delay.
/// `SyncParams::backoff` sleeps for a random time in `[delay, delay * 3)`.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
135
136impl SyncParams {
137 fn new(
139 chain: Network,
140 start: Point,
141 end: Point,
142 ) -> Self {
143 Self {
144 chain,
145 start,
146 end,
147 first_indexed_block: None,
148 first_is_immutable: false,
149 last_indexed_block: None,
150 last_is_immutable: false,
151 total_blocks_synced: 0,
152 last_blocks_synced: 0,
153 retries: 0,
154 backoff_delay: None,
155 result: Arc::new(None),
156 follower_roll_forward: None,
157 }
158 }
159
160 fn retry(&self) -> Self {
162 let retry_count = self.retries.saturating_add(1);
163
164 let mut backoff = None;
165
166 if self.last_blocks_synced == 0 {
169 backoff = match retry_count {
171 1 => Some(Duration::from_secs(1)), 2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)), };
175 }
176
177 let mut retry = self.clone();
178 retry.last_blocks_synced = 0;
179 retry.retries = retry_count;
180 retry.backoff_delay = backoff;
181 retry.result = Arc::new(None);
182 retry.follower_roll_forward = None;
183
184 retry
185 }
186
187 pub(crate) fn done(
189 &self,
190 first: Option<Point>,
191 first_immutable: bool,
192 last: Option<Point>,
193 last_immutable: bool,
194 synced: u64,
195 result: anyhow::Result<()>,
196 ) -> Self {
197 if result.is_ok() && self.end != Point::TIP {
198 update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
202 }
203 let mut done = self.clone();
204 done.first_indexed_block = first;
205 done.first_is_immutable = first_immutable;
206 done.last_indexed_block = last;
207 done.last_is_immutable = last_immutable;
208 done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
209 done.last_blocks_synced = synced;
210 done.result = Arc::new(Some(result));
211
212 done
213 }
214
215 fn actual_start(&self) -> Point {
217 self.last_indexed_block
218 .as_ref()
219 .unwrap_or(&self.start)
220 .clone()
221 }
222
223 async fn backoff(&self) {
229 if let Some(backoff) = self.backoff_delay {
230 let mut rng = rand::rngs::StdRng::from_entropy();
231 let actual_backoff =
232 rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
233
234 tokio::time::sleep(actual_backoff).await;
235 }
236 }
237}
238
239#[allow(clippy::too_many_lines)]
242fn sync_subchain(
243 params: SyncParams,
244 mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
245) -> tokio::task::JoinHandle<SyncParams> {
246 tokio::spawn(async move {
247 params.backoff().await;
249
250 CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
252 info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
253
254 let mut first_indexed_block = params.first_indexed_block.clone();
255 let mut first_immutable = params.first_is_immutable;
256 let mut last_indexed_block = params.last_indexed_block.clone();
257 let mut last_immutable = params.last_is_immutable;
258 let mut blocks_synced = 0u64;
259
260 let mut follower =
261 ChainFollower::new(¶ms.chain, params.actual_start(), params.end.clone()).await;
262 while let Some(chain_update) = follower.next().await {
263 let tips = ChainFollower::get_tips(¶ms.chain).await;
264 let immutable_slot = tips.0.slot_or_default();
265 let live_slot = tips.1.slot_or_default();
266 metrics_updater::current_tip_slot(live_slot, immutable_slot);
267 match chain_update.kind {
268 cardano_chain_follower::Kind::ImmutableBlockRollForward => {
269 if params.end == Point::TIP {
271 info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
275 let mut result = params.done(
276 first_indexed_block,
277 first_immutable,
278 last_indexed_block,
279 last_immutable,
280 blocks_synced,
281 Ok(()),
282 );
283 result.follower_roll_forward = Some(chain_update.block_data().point());
286 return result;
287 }
288 },
289 cardano_chain_follower::Kind::Block => {
290 let block = chain_update.block_data();
291
292 if let Err(error) =
293 index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
294 {
295 let error_msg = format!("Failed to index block {}", block.point());
296 error!(chain=%params.chain, error=%error, params=%params, error_msg);
297 return params.done(
298 first_indexed_block,
299 first_immutable,
300 last_indexed_block,
301 last_immutable,
302 blocks_synced,
303 Err(error.context(error_msg)),
304 );
305 }
306
307 if chain_update.tip && !set_follower_live_first_reached_tip() {
308 metrics_updater::reached_live_tip(true);
309 }
310
311 update_block_state(
312 block,
313 &mut first_indexed_block,
314 &mut first_immutable,
315 &mut last_indexed_block,
316 &mut last_immutable,
317 &mut blocks_synced,
318 );
319 },
320
321 cardano_chain_follower::Kind::Rollback => {
322 let rollback_slot = chain_update.block_data().slot();
324
325 let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
326 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
327 error!(chain=%params.chain, error=%error,
328 "Chain follower rollback, purging volatile data task failed."
329 );
330 } else {
331 #[allow(clippy::arithmetic_side_effects)]
333 let purge_slots = params
334 .last_indexed_block
335 .as_ref()
336 .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
338
339 metrics_updater::forward_data_purge(purge_slots.into());
340
341 let block = chain_update.block_data();
343 if let Err(error) =
344 index_block(block, &mut pending_blocks, params.end.slot_or_default())
345 .await
346 {
347 let error_msg =
348 format!("Failed to index block after rollback {}", block.point());
349 error!(chain=%params.chain, error=%error, params=%params, error_msg);
350 return params.done(
351 first_indexed_block,
352 first_immutable,
353 last_indexed_block,
354 last_immutable,
355 blocks_synced,
356 Err(error.context(error_msg)),
357 );
358 }
359
360 update_block_state(
361 block,
362 &mut first_indexed_block,
363 &mut first_immutable,
364 &mut last_indexed_block,
365 &mut last_immutable,
366 &mut blocks_synced,
367 );
368 }
369 },
370 }
371 }
372
373 let result = params.done(
374 first_indexed_block,
375 first_immutable,
376 last_indexed_block,
377 last_immutable,
378 blocks_synced,
379 Ok(()),
380 );
381
382 info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
383
384 result
385 })
386}
387
388fn update_block_state(
390 block: &MultiEraBlock,
391 first_indexed_block: &mut Option<Point>,
392 first_immutable: &mut bool,
393 last_indexed_block: &mut Option<Point>,
394 last_immutable: &mut bool,
395 blocks_synced: &mut u64,
396) {
397 *last_immutable = block.is_immutable();
398 *last_indexed_block = Some(block.point());
399
400 if first_indexed_block.is_none() {
401 *first_immutable = *last_immutable;
402 *first_indexed_block = Some(block.point());
403 }
404
405 *blocks_synced = blocks_synced.saturating_add(1);
406}
407
/// State of the coordinator that starts, tracks and restarts all chain sync tasks.
struct SyncTask {
    /// Chain follower configuration (chain, number of sync tasks, chunk size).
    cfg: chain_follower::EnvVars,

    /// Join handles of the sync tasks that are currently running.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// Number of sync tasks currently running (kept in step with `sync_tasks`).
    current_sync_tasks: u16,

    /// Slot at which the next immutable sync chunk will start.
    start_slot: Slot,

    /// Slot of the immutable tip, as last known to this coordinator.
    immutable_tip_slot: Slot,

    /// Slot of the live tip, as read when the coordinator started running.
    live_tip_slot: Slot,

    /// Sync status records loaded from the index DB by `get_sync_status`.
    sync_status: Vec<SyncStatus>,

    /// Sending side of a watch channel holding the end slots of the sync tasks that were
    /// started; an end slot is removed again when its immutable follower completes
    /// successfully. Every sync task receives a subscription to this channel.
    pending_blocks: watch::Sender<BTreeSet<Slot>>,
}
440
441impl SyncTask {
442 fn new(cfg: chain_follower::EnvVars) -> SyncTask {
444 Self {
445 cfg,
446 sync_tasks: FuturesUnordered::new(),
447 start_slot: 0.into(),
448 current_sync_tasks: 0,
449 immutable_tip_slot: 0.into(),
450 live_tip_slot: 0.into(),
451 sync_status: Vec::new(),
452 pending_blocks: watch::channel(BTreeSet::new()).0,
453 }
454 }
455
456 fn add_sync_task(
458 &mut self,
459 params: SyncParams,
460 ) {
461 self.pending_blocks.send_modify(|blocks| {
462 blocks.insert(params.end.slot_or_default());
463 });
464 self.sync_tasks
465 .push(sync_subchain(params, self.pending_blocks.subscribe()));
466 self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
467 debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
468 metrics_updater::sync_tasks(self.current_sync_tasks);
469 }
470
471 fn sync_task_finished(&mut self) {
473 self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
474 error!("current_sync_tasks -= 1 overflow");
475 0
476 });
477 debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
478 metrics_updater::sync_tasks(self.current_sync_tasks);
479 }
480
481 #[allow(clippy::too_many_lines)]
490 async fn run(&mut self) {
491 let tips = ChainFollower::get_tips(self.cfg.chain()).await;
494 self.immutable_tip_slot = tips.0.slot_or_default();
495 self.live_tip_slot = tips.1.slot_or_default();
496 info!(chain=%self.cfg.chain(), immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
497
498 metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);
499
500 CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
506
507 info!(chain=%self.cfg.chain(), "Indexing DB is ready - Getting recovery state for indexing");
508 self.sync_status = get_sync_status().await;
509 debug!(chain=%self.cfg.chain(), "Sync Status: {:?}", self.sync_status);
510
511 self.add_sync_task(SyncParams::new(
514 self.cfg.chain().clone(),
515 Point::fuzzy(self.immutable_tip_slot),
516 Point::TIP,
517 ));
518 metrics_updater::reached_live_tip(false);
519
520 self.start_immutable_followers();
521 if self.sync_tasks.len() == 1 {
524 set_follower_immutable_first_reached_tip();
525 }
526
527 metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);
528
529 while let Some(completed) = self.sync_tasks.next().await {
535 self.sync_task_finished();
537
538 match completed {
539 Ok(finished) => {
540 let tips = ChainFollower::get_tips(self.cfg.chain()).await;
541 let immutable_tip_slot = tips.0.slot_or_default();
542 let live_tip_slot = tips.1.slot_or_default();
543 info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
544 if finished.end == Point::TIP {
552 if let Some(ref roll_forward_point) = finished.follower_roll_forward {
553 self.immutable_tip_slot = roll_forward_point.slot_or_default();
556
557 metrics_updater::current_tip_slot(
558 self.live_tip_slot,
559 self.immutable_tip_slot,
560 );
561
562 info!(chain=%self.cfg.chain(), report=%finished, "Chain Indexer finished reaching TIP.");
563
564 self.start_immutable_followers();
565 metrics_updater::reached_immutable_tip(false);
566 } else {
567 error!(chain=%self.cfg.chain(), report=%finished, "Chain Indexer finished without to reach TIP.");
568 }
569
570 self.add_sync_task(finished.retry());
572 } else if let Some(result) = finished.result.as_ref() {
573 match result {
574 Ok(()) => {
575 info!(chain=%self.cfg.chain(), report=%finished,
576 "The Immutable follower completed successfully.");
577
578 finished.last_indexed_block.as_ref().inspect(|block| {
579 metrics_updater::highest_complete_indexed_slot(
580 block.slot_or_default(),
581 );
582 });
583
584 self.pending_blocks.send_modify(|blocks| {
585 if !blocks.remove(&finished.end.slot_or_default()) {
586 error!(chain=%self.cfg.chain(), end=?finished.end,
587 "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
588 );
589 }
590 });
591
592 self.start_immutable_followers();
595 },
596 Err(error) => {
597 error!(chain=%self.cfg.chain(), report=%finished, error=%error,
598 "An Immutable follower failed, restarting it.");
599 self.add_sync_task(finished.retry());
602 },
603 }
604 } else {
605 error!(chain=%self.cfg.chain(), report=%finished,
606 "BUG: The Immutable follower completed, but without a proper result.");
607 }
608 },
609 Err(error) => {
610 error!(chain=%self.cfg.chain(), error=%error, "BUG: Sync task failed. Can not restart it, not enough information. Sync is probably failed at this point.");
611 },
612 }
613
614 if self.sync_tasks.len() == 1 {
623 set_follower_immutable_first_reached_tip();
624 metrics_updater::reached_immutable_tip(true);
625
626 CassandraSession::get(true).inspect(|session| {
629 session.caches().assets_ada().clear_cache();
630 session.caches().assets_native().clear_cache();
631 });
632
633 #[allow(clippy::arithmetic_side_effects)]
636 let purge_to_slot =
637 self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
638 let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
639 info!(chain=%self.cfg.chain(), purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
640 if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
641 error!(chain=%self.cfg.chain(), error=%error, "BUG: Purging volatile data task failed.");
642 } else {
643 metrics_updater::backward_data_purge();
644 }
645 }
646 }
647
648 error!(chain=%self.cfg.chain(),"BUG: Sync tasks have all stopped. This is an unexpected error!");
649 }
650
651 fn start_immutable_followers(&mut self) {
653 if self.start_slot < self.immutable_tip_slot {
659 while self.current_sync_tasks < self.cfg.sync_tasks {
661 let end_slot = self.immutable_tip_slot.min(
662 (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
663 .into(),
664 );
665
666 if let Some((first_point, last_point)) =
667 self.get_syncable_range(self.start_slot, end_slot)
668 {
669 self.add_sync_task(SyncParams::new(
670 self.cfg.chain().clone(),
671 first_point,
672 last_point.clone(),
673 ));
674 }
675
676 self.start_slot = end_slot;
679
680 if end_slot == self.immutable_tip_slot {
681 break;
682 }
683 }
684 }
685 }
686
687 fn get_syncable_range(
692 &self,
693 start: Slot,
694 end: Slot,
695 ) -> Option<(Point, Point)> {
696 for sync_block in &self.sync_status {
697 if start >= sync_block.start_slot && start <= sync_block.end_slot {
699 if end <= sync_block.end_slot {
701 return None;
702 }
703
704 return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
711 }
712 }
713
714 let start_slot = if start == 0.into() {
715 Point::ORIGIN
716 } else {
717 Point::fuzzy(start)
718 };
719
720 Some((start_slot, Point::fuzzy(end)))
721 }
722}
723
724pub(crate) async fn start_followers() -> anyhow::Result<()> {
726 let cfg = Settings::follower_cfg();
727
728 cfg.log();
730
731 start_sync_for(&cfg).await?;
733
734 tokio::spawn(async move {
735 let mut sync_task = SyncTask::new(cfg);
736
737 sync_task.run().await;
738 });
739
740 Ok(())
741}