1use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, ChainSyncConfig, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{stream::FuturesUnordered, StreamExt};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13    db::index::{
14        block::{
15            index_block,
16            roll_forward::{self, PurgeCondition},
17        },
18        queries::sync_status::{
19            get::{get_sync_status, SyncStatus},
20            update::update_sync_status,
21        },
22        session::CassandraSession,
23    },
24    metrics::chain_indexer::metrics_updater,
25    service::utilities::health::{
26        set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27    },
28    settings::{chain_follower, Settings},
29};
30
/// Interval handed to `CassandraSession::wait_until_ready` whenever this module
/// waits for the indexing DB before doing any indexing work.
// NOTE(review): presumably the polling period between readiness checks — confirm
// against `CassandraSession::wait_until_ready`.
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
33
34async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36    let chain = cfg.chain;
37    let dl_config = cfg.dl_config.clone();
38
39    let mut cfg = ChainSyncConfig::default_for(chain);
40    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
41    info!(chain = %chain, "Starting Chain Sync Task");
42
43    if let Err(error) = cfg.run().await {
44        error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
45        Err(error)?;
46    }
47
48    Ok(())
49}
50
/// Parameters for a single chain-indexing task and, once the task has finished,
/// the record of what it achieved.
///
/// The same value is handed to `sync_subchain`, returned from it (via `done`) and,
/// on failure, turned into the parameters of the next attempt (via `retry`).
#[derive(Clone)]
struct SyncParams {
    /// Network the follower is created for.
    chain: Network,
    /// Requested start of the range to index.
    start: Point,
    /// Requested end of the range to index. `Point::TIP` marks the live follower.
    end: Point,
    /// First block indexed by the task, if any has been indexed.
    first_indexed_block: Option<Point>,
    /// Whether `first_indexed_block` was reported as immutable when indexed.
    first_is_immutable: bool,
    /// Most recent block indexed by the task; a retry resumes from here
    /// (see `actual_start`).
    last_indexed_block: Option<Point>,
    /// Whether `last_indexed_block` was reported as immutable when indexed.
    last_is_immutable: bool,
    /// Blocks indexed across all attempts of this task.
    total_blocks_synced: u64,
    /// Blocks indexed by the most recent attempt only.
    last_blocks_synced: u64,
    /// Number of times this task has been retried.
    retries: u64,
    /// Base delay to wait before the task starts; `None` means start immediately.
    backoff_delay: Option<Duration>,
    /// Outcome of the most recent attempt; holds `None` until `done` is called.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Point of the immutable roll-forward that ended a TIP follower, if any.
    follower_roll_forward: Option<Point>,
}
81
82impl Display for SyncParams {
83    fn fmt(
84        &self,
85        f: &mut std::fmt::Formatter<'_>,
86    ) -> std::fmt::Result {
87        if self.result.is_none() {
88            write!(f, "Sync_Params {{ ")?;
89        } else {
90            write!(f, "Sync_Result {{ ")?;
91        }
92
93        write!(f, "start: {}, end: {}", self.start, self.end)?;
94
95        if let Some(first) = self.first_indexed_block.as_ref() {
96            write!(
97                f,
98                ", first_indexed_block: {first}{}",
99                if self.first_is_immutable { ":I" } else { "" }
100            )?;
101        }
102
103        if let Some(last) = self.last_indexed_block.as_ref() {
104            write!(
105                f,
106                ", last_indexed_block: {last}{}",
107                if self.last_is_immutable { ":I" } else { "" }
108            )?;
109        }
110
111        if self.retries > 0 {
112            write!(f, ", retries: {}", self.retries)?;
113        }
114
115        if self.retries > 0 || self.result.is_some() {
116            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
117        }
118
119        if self.result.is_some() {
120            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
121        }
122
123        if let Some(backoff) = self.backoff_delay.as_ref() {
124            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
125        }
126
127        if let Some(result) = self.result.as_ref() {
128            match result {
129                Ok(()) => write!(f, ", Success")?,
130                Err(error) => write!(f, ", {error}")?,
131            }
132        }
133
134        f.write_str(" }")
135    }
136}
137
/// Multiplier applied to a base backoff delay to obtain the exclusive upper bound
/// of the randomized delay actually slept in `SyncParams::backoff`.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
140
141impl SyncParams {
142    fn new(
144        chain: Network,
145        start: Point,
146        end: Point,
147    ) -> Self {
148        Self {
149            chain,
150            start,
151            end,
152            first_indexed_block: None,
153            first_is_immutable: false,
154            last_indexed_block: None,
155            last_is_immutable: false,
156            total_blocks_synced: 0,
157            last_blocks_synced: 0,
158            retries: 0,
159            backoff_delay: None,
160            result: Arc::new(None),
161            follower_roll_forward: None,
162        }
163    }
164
165    fn retry(&self) -> Self {
167        let retry_count = self.retries.saturating_add(1);
168
169        let mut backoff = None;
170
171        if self.last_blocks_synced == 0 {
174            backoff = match retry_count {
176                1 => Some(Duration::from_secs(1)),     2..5 => Some(Duration::from_secs(10)), _ => Some(Duration::from_secs(30)),    };
180        }
181
182        let mut retry = self.clone();
183        retry.last_blocks_synced = 0;
184        retry.retries = retry_count;
185        retry.backoff_delay = backoff;
186        retry.result = Arc::new(None);
187        retry.follower_roll_forward = None;
188
189        retry
190    }
191
192    pub(crate) fn done(
194        &self,
195        first: Option<Point>,
196        first_immutable: bool,
197        last: Option<Point>,
198        last_immutable: bool,
199        synced: u64,
200        result: anyhow::Result<()>,
201    ) -> Self {
202        if result.is_ok() && self.end != Point::TIP {
203            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
207        }
208        let mut done = self.clone();
209        done.first_indexed_block = first;
210        done.first_is_immutable = first_immutable;
211        done.last_indexed_block = last;
212        done.last_is_immutable = last_immutable;
213        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
214        done.last_blocks_synced = synced;
215        done.result = Arc::new(Some(result));
216
217        done
218    }
219
220    fn actual_start(&self) -> Point {
222        self.last_indexed_block
223            .as_ref()
224            .unwrap_or(&self.start)
225            .clone()
226    }
227
228    async fn backoff(&self) {
234        if let Some(backoff) = self.backoff_delay {
235            let mut rng = rand::rngs::StdRng::from_entropy();
236            let actual_backoff =
237                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
238
239            tokio::time::sleep(actual_backoff).await;
240        }
241    }
242}
243
244#[allow(clippy::too_many_lines)]
247fn sync_subchain(
248    params: SyncParams,
249    mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
250) -> tokio::task::JoinHandle<SyncParams> {
251    tokio::spawn(async move {
252        params.backoff().await;
254
255        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
257        info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
258
259        let mut first_indexed_block = params.first_indexed_block.clone();
260        let mut first_immutable = params.first_is_immutable;
261        let mut last_indexed_block = params.last_indexed_block.clone();
262        let mut last_immutable = params.last_is_immutable;
263        let mut blocks_synced = 0u64;
264
265        let mut follower =
266            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
267        while let Some(chain_update) = follower.next().await {
268            let tips = ChainFollower::get_tips(params.chain).await;
269            let immutable_slot = tips.0.slot_or_default();
270            let live_slot = tips.1.slot_or_default();
271            metrics_updater::current_tip_slot(live_slot, immutable_slot);
272            match chain_update.kind {
273                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
274                    if params.end == Point::TIP {
276                        info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
280                        let mut result = params.done(
281                            first_indexed_block,
282                            first_immutable,
283                            last_indexed_block,
284                            last_immutable,
285                            blocks_synced,
286                            Ok(()),
287                        );
288                        result.follower_roll_forward = Some(chain_update.block_data().point());
291                        return result;
292                    }
293                },
294                cardano_chain_follower::Kind::Block => {
295                    let block = chain_update.block_data();
296
297                    if let Err(error) =
298                        index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
299                    {
300                        let error_msg = format!("Failed to index block {}", block.point());
301                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
302                        return params.done(
303                            first_indexed_block,
304                            first_immutable,
305                            last_indexed_block,
306                            last_immutable,
307                            blocks_synced,
308                            Err(error.context(error_msg)),
309                        );
310                    }
311
312                    if chain_update.tip && !set_follower_live_first_reached_tip() {
313                        metrics_updater::reached_live_tip(true);
314                    }
315
316                    update_block_state(
317                        block,
318                        &mut first_indexed_block,
319                        &mut first_immutable,
320                        &mut last_indexed_block,
321                        &mut last_immutable,
322                        &mut blocks_synced,
323                    );
324                },
325
326                cardano_chain_follower::Kind::Rollback => {
327                    let rollback_slot = chain_update.block_data().slot();
329
330                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
331                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
332                        error!(chain=%params.chain, error=%error,
333                            "Chain follower rollback, purging volatile data task failed."
334                        );
335                    } else {
336                        #[allow(clippy::arithmetic_side_effects)]
338                        let purge_slots = params
339                            .last_indexed_block
340                            .as_ref()
341                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
343
344                        metrics_updater::forward_data_purge(purge_slots.into());
345
346                        let block = chain_update.block_data();
348                        if let Err(error) =
349                            index_block(block, &mut pending_blocks, params.end.slot_or_default())
350                                .await
351                        {
352                            let error_msg =
353                                format!("Failed to index block after rollback {}", block.point());
354                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
355                            return params.done(
356                                first_indexed_block,
357                                first_immutable,
358                                last_indexed_block,
359                                last_immutable,
360                                blocks_synced,
361                                Err(error.context(error_msg)),
362                            );
363                        }
364
365                        update_block_state(
366                            block,
367                            &mut first_indexed_block,
368                            &mut first_immutable,
369                            &mut last_indexed_block,
370                            &mut last_immutable,
371                            &mut blocks_synced,
372                        );
373                    }
374                },
375            }
376        }
377
378        let result = params.done(
379            first_indexed_block,
380            first_immutable,
381            last_indexed_block,
382            last_immutable,
383            blocks_synced,
384            Ok(()),
385        );
386
387        info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
388
389        result
390    })
391}
392
393fn update_block_state(
395    block: &MultiEraBlock,
396    first_indexed_block: &mut Option<Point>,
397    first_immutable: &mut bool,
398    last_indexed_block: &mut Option<Point>,
399    last_immutable: &mut bool,
400    blocks_synced: &mut u64,
401) {
402    *last_immutable = block.is_immutable();
403    *last_indexed_block = Some(block.point());
404
405    if first_indexed_block.is_none() {
406        *first_immutable = *last_immutable;
407        *first_indexed_block = Some(block.point());
408    }
409
410    *blocks_synced = blocks_synced.saturating_add(1);
411}
412
/// State of the primary follower task that schedules and supervises all
/// chain-indexing sub-tasks (see `SyncTask::run`).
struct SyncTask {
    /// Chain follower configuration (network, task limit, chunk size, ...).
    cfg: chain_follower::EnvVars,

    /// Join handles of the indexing tasks that are currently running.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// Number of indexing tasks currently running; also reported as a metric.
    current_sync_tasks: u16,

    /// Slot at which the next immutable chunk to schedule begins.
    start_slot: Slot,

    /// Slot of the immutable tip as last recorded by this task.
    immutable_tip_slot: Slot,

    /// Slot of the live tip as recorded when `run` started.
    live_tip_slot: Slot,

    /// Sync status read from the index DB when `run` starts; consulted to skip
    /// ranges that are already synced.
    sync_status: Vec<SyncStatus>,

    /// Set of end slots of the sync tasks that have been added and not yet
    /// completed successfully; each sync task receives a subscription to it.
    // NOTE(review): how `index_block` uses this set is not visible here — confirm.
    pending_blocks: watch::Sender<BTreeSet<Slot>>,
}
445
446impl SyncTask {
447    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
449        Self {
450            cfg,
451            sync_tasks: FuturesUnordered::new(),
452            start_slot: 0.into(),
453            current_sync_tasks: 0,
454            immutable_tip_slot: 0.into(),
455            live_tip_slot: 0.into(),
456            sync_status: Vec::new(),
457            pending_blocks: watch::channel(BTreeSet::new()).0,
458        }
459    }
460
461    fn add_sync_task(
463        &mut self,
464        params: SyncParams,
465    ) {
466        self.pending_blocks.send_modify(|blocks| {
467            blocks.insert(params.end.slot_or_default());
468        });
469        self.sync_tasks
470            .push(sync_subchain(params, self.pending_blocks.subscribe()));
471        self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
472        debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
473        metrics_updater::sync_tasks(self.current_sync_tasks);
474    }
475
476    fn sync_task_finished(&mut self) {
478        self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
479            error!("current_sync_tasks -= 1 overflow");
480            0
481        });
482        debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
483        metrics_updater::sync_tasks(self.current_sync_tasks);
484    }
485
    /// Runs the primary follower: schedules indexing tasks and supervises them.
    ///
    /// 1. Records the current immutable/live tips and waits for the index DB.
    /// 2. Loads the stored sync status, starts the live (TIP) follower from the
    ///    immutable tip, then starts as many immutable chunk followers as allowed.
    /// 3. Loops over completed tasks: the TIP follower is always restarted;
    ///    immutable followers are retried on failure, or followed by further
    ///    chunks on success.
    /// 4. Whenever exactly one task remains, marks the immutable tip as reached,
    ///    clears the asset caches and purges volatile data backwards.
    ///
    /// The loop only ends when no task is left at all, which is logged as a bug.
    #[allow(clippy::too_many_lines)]
    async fn run(&mut self) {
        let tips = ChainFollower::get_tips(self.cfg.chain).await;
        // `tips.0` is used as the immutable tip, `tips.1` as the live tip.
        self.immutable_tip_slot = tips.0.slot_or_default();
        self.live_tip_slot = tips.1.slot_or_default();
        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");

        metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);

        // Nothing can be indexed (or recovered) until the index DB is usable.
        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;

        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
        self.sync_status = get_sync_status().await;
        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);

        // The live follower: from the immutable tip up to (and following) TIP.
        self.add_sync_task(SyncParams::new(
            self.cfg.chain,
            Point::fuzzy(self.immutable_tip_slot),
            Point::TIP,
        ));
        metrics_updater::reached_live_tip(false);

        self.start_immutable_followers();
        // Only the live follower is running: there was nothing immutable left to sync.
        if self.sync_tasks.len() == 1 {
            set_follower_immutable_first_reached_tip();
        }

        metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);

        // Supervise: react to every task as it completes.
        while let Some(completed) = self.sync_tasks.next().await {
            self.sync_task_finished();

            match completed {
                Ok(finished) => {
                    let tips = ChainFollower::get_tips(self.cfg.chain).await;
                    let immutable_tip_slot = tips.0.slot_or_default();
                    let live_tip_slot = tips.1.slot_or_default();
                    info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
                    if finished.end == Point::TIP {
                        // The live follower stopped. A recorded roll-forward point
                        // means the immutable chain advanced, so new immutable
                        // chunks may now be schedulable.
                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
                            self.immutable_tip_slot = roll_forward_point.slot_or_default();

                            metrics_updater::current_tip_slot(
                                self.live_tip_slot,
                                self.immutable_tip_slot,
                            );

                            info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");

                            self.start_immutable_followers();
                            metrics_updater::reached_immutable_tip(false);
                        } else {
                            error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
                        }

                        // Either way the live follower is restarted.
                        self.add_sync_task(finished.retry());
                    } else if let Some(result) = finished.result.as_ref() {
                        match result {
                            Ok(()) => {
                                info!(chain=%self.cfg.chain, report=%finished,
                                    "The Immutable follower completed successfully.");

                                finished.last_indexed_block.as_ref().inspect(|block| {
                                    metrics_updater::highest_complete_indexed_slot(
                                        block.slot_or_default(),
                                    );
                                });

                                // This range is complete: its end slot is no longer pending.
                                self.pending_blocks.send_modify(|blocks| {
                                    if !blocks.remove(&finished.end.slot_or_default()) {
                                        error!(chain=%self.cfg.chain, end=?finished.end,
                                            "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
                                        );
                                    }
                                });

                                // A task slot is free again; schedule the next chunk(s).
                                self.start_immutable_followers();
                            },
                            Err(error) => {
                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
                                        "An Immutable follower failed, restarting it.");
                                self.add_sync_task(finished.retry());
                            },
                        }
                    } else {
                        error!(chain=%self.cfg.chain, report=%finished,
                        "BUG: The Immutable follower completed, but without a proper result.");
                    }
                },
                Err(error) => {
                    // The task itself failed to join, so its parameters are lost.
                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
                },
            }

            // One task left means only the live follower is running, i.e. all
            // immutable followers have finished.
            if self.sync_tasks.len() == 1 {
                set_follower_immutable_first_reached_tip();
                metrics_updater::reached_immutable_tip(true);

                // NOTE(review): the meaning of the `true` argument to
                // `CassandraSession::get` is not visible here — confirm.
                CassandraSession::get(true).inspect(|session| {
                    session.caches().assets_ada().clear_cache();
                    session.caches().assets_native().clear_cache();
                });

                // Purge volatile data up to a configured buffer behind the immutable tip.
                // NOTE(review): the lint is silenced on the assumption that `Slot`
                // subtraction saturates — confirm.
                #[allow(clippy::arithmetic_side_effects)]
                let purge_to_slot =
                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
                info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
                } else {
                    metrics_updater::backward_data_purge();
                }
            }
        }

        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
    }
655
656    fn start_immutable_followers(&mut self) {
658        if self.start_slot < self.immutable_tip_slot {
664            while self.current_sync_tasks < self.cfg.sync_tasks {
666                let end_slot = self.immutable_tip_slot.min(
667                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
668                        .into(),
669                );
670
671                if let Some((first_point, last_point)) =
672                    self.get_syncable_range(self.start_slot, end_slot)
673                {
674                    self.add_sync_task(SyncParams::new(
675                        self.cfg.chain,
676                        first_point,
677                        last_point.clone(),
678                    ));
679                }
680
681                self.start_slot = end_slot;
684
685                if end_slot == self.immutable_tip_slot {
686                    break;
687                }
688            }
689        }
690    }
691
692    fn get_syncable_range(
697        &self,
698        start: Slot,
699        end: Slot,
700    ) -> Option<(Point, Point)> {
701        for sync_block in &self.sync_status {
702            if start >= sync_block.start_slot && start <= sync_block.end_slot {
704                if end <= sync_block.end_slot {
706                    return None;
707                }
708
709                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
716            }
717        }
718
719        let start_slot = if start == 0.into() {
720            Point::ORIGIN
721        } else {
722            Point::fuzzy(start)
723        };
724
725        Some((start_slot, Point::fuzzy(end)))
726    }
727}
728
729pub(crate) async fn start_followers() -> anyhow::Result<()> {
731    let cfg = Settings::follower_cfg();
732
733    cfg.log();
735
736    start_sync_for(&cfg).await?;
738
739    tokio::spawn(async move {
740        let mut sync_task = SyncTask::new(cfg);
741
742        sync_task.run().await;
743    });
744
745    Ok(())
746}