cat_gateway/cardano/
mod.rs

1//! Logic for orchestrating followers
2
3use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, ChainSyncConfig, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{stream::FuturesUnordered, StreamExt};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13    db::index::{
14        block::{
15            index_block,
16            roll_forward::{self, PurgeCondition},
17        },
18        queries::sync_status::{
19            get::{get_sync_status, SyncStatus},
20            update::update_sync_status,
21        },
22        session::CassandraSession,
23    },
24    metrics::chain_indexer::metrics_updater,
25    service::utilities::health::{
26        set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27    },
28    settings::{chain_follower, Settings},
29};
30
31/// How long we wait between checks for connection to the indexing DB to be ready.
32pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
33
34/// Start syncing a particular network
35async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36    let chain = cfg.chain;
37    let dl_config = cfg.dl_config.clone();
38
39    let mut cfg = ChainSyncConfig::default_for(chain);
40    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
41    info!(chain = %chain, "Starting Chain Sync Task");
42
43    if let Err(error) = cfg.run().await {
44        error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
45        Err(error)?;
46    }
47
48    Ok(())
49}
50
51/// Data we return from a sync task.
52#[derive(Clone)]
53struct SyncParams {
54    /// What blockchain are we syncing.
55    chain: Network,
56    /// The starting point of this sync.
57    start: Point,
58    /// The ending point of this sync.
59    end: Point,
60    /// The first block we successfully synced.
61    first_indexed_block: Option<Point>,
62    /// Is the starting point immutable? (True = immutable, false = don't know.)
63    first_is_immutable: bool,
64    /// The last block we successfully synced.
65    last_indexed_block: Option<Point>,
66    /// Is the ending point immutable? (True = immutable, false = don't know.)
67    last_is_immutable: bool,
68    /// The number of blocks we successfully synced overall.
69    total_blocks_synced: u64,
70    /// The number of blocks we successfully synced, in the last attempt.
71    last_blocks_synced: u64,
72    /// The number of retries so far on this sync task.
73    retries: u64,
74    /// The number of retries so far on this sync task.
75    backoff_delay: Option<Duration>,
76    /// If the sync completed without error or not.
77    result: Arc<Option<anyhow::Result<()>>>,
78    /// Chain follower roll forward.
79    follower_roll_forward: Option<Point>,
80}
81
82impl Display for SyncParams {
83    fn fmt(
84        &self,
85        f: &mut std::fmt::Formatter<'_>,
86    ) -> std::fmt::Result {
87        if self.result.is_none() {
88            write!(f, "Sync_Params {{ ")?;
89        } else {
90            write!(f, "Sync_Result {{ ")?;
91        }
92
93        write!(f, "start: {}, end: {}", self.start, self.end)?;
94
95        if let Some(first) = self.first_indexed_block.as_ref() {
96            write!(
97                f,
98                ", first_indexed_block: {first}{}",
99                if self.first_is_immutable { ":I" } else { "" }
100            )?;
101        }
102
103        if let Some(last) = self.last_indexed_block.as_ref() {
104            write!(
105                f,
106                ", last_indexed_block: {last}{}",
107                if self.last_is_immutable { ":I" } else { "" }
108            )?;
109        }
110
111        if self.retries > 0 {
112            write!(f, ", retries: {}", self.retries)?;
113        }
114
115        if self.retries > 0 || self.result.is_some() {
116            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
117        }
118
119        if self.result.is_some() {
120            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
121        }
122
123        if let Some(backoff) = self.backoff_delay.as_ref() {
124            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
125        }
126
127        if let Some(result) = self.result.as_ref() {
128            match result {
129                Ok(()) => write!(f, ", Success")?,
130                Err(error) => write!(f, ", {error}")?,
131            }
132        }
133
134        f.write_str(" }")
135    }
136}
137
138/// The range we generate random backoffs within given a base backoff value.
139const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
140
141impl SyncParams {
142    /// Create a new `SyncParams`.
143    fn new(
144        chain: Network,
145        start: Point,
146        end: Point,
147    ) -> Self {
148        Self {
149            chain,
150            start,
151            end,
152            first_indexed_block: None,
153            first_is_immutable: false,
154            last_indexed_block: None,
155            last_is_immutable: false,
156            total_blocks_synced: 0,
157            last_blocks_synced: 0,
158            retries: 0,
159            backoff_delay: None,
160            result: Arc::new(None),
161            follower_roll_forward: None,
162        }
163    }
164
165    /// Convert a result back into parameters for a retry.
166    fn retry(&self) -> Self {
167        let retry_count = self.retries.saturating_add(1);
168
169        let mut backoff = None;
170
171        // If we did sync any blocks last time, first retry is immediate.
172        // Otherwise we backoff progressively more as we do more retries.
173        if self.last_blocks_synced == 0 {
174            // Calculate backoff based on number of retries so far.
175            backoff = match retry_count {
176                1 => Some(Duration::from_secs(1)),     // 1-3 seconds
177                2..5 => Some(Duration::from_secs(10)), // 10-30 seconds
178                _ => Some(Duration::from_secs(30)),    // 30-90 seconds.
179            };
180        }
181
182        let mut retry = self.clone();
183        retry.last_blocks_synced = 0;
184        retry.retries = retry_count;
185        retry.backoff_delay = backoff;
186        retry.result = Arc::new(None);
187        retry.follower_roll_forward = None;
188
189        retry
190    }
191
192    /// Convert Params into the result of the sync.
193    pub(crate) fn done(
194        &self,
195        first: Option<Point>,
196        first_immutable: bool,
197        last: Option<Point>,
198        last_immutable: bool,
199        synced: u64,
200        result: anyhow::Result<()>,
201    ) -> Self {
202        if result.is_ok() && self.end != Point::TIP {
203            // Update sync status in the Immutable DB.
204            // Can fire and forget, because failure to update DB will simply cause the chunk to be
205            // re-indexed, on recovery.
206            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
207        }
208        let mut done = self.clone();
209        done.first_indexed_block = first;
210        done.first_is_immutable = first_immutable;
211        done.last_indexed_block = last;
212        done.last_is_immutable = last_immutable;
213        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
214        done.last_blocks_synced = synced;
215        done.result = Arc::new(Some(result));
216
217        done
218    }
219
220    /// Get where this sync run actually needs to start from.
221    fn actual_start(&self) -> Point {
222        self.last_indexed_block
223            .as_ref()
224            .unwrap_or(&self.start)
225            .clone()
226    }
227
228    /// Do the backoff delay processing.
229    ///
230    /// The actual delay is a random time from the Delay itself to
231    /// `BACKOFF_RANGE_MULTIPLIER` times the delay. This is to prevent hammering the
232    /// service at regular intervals.
233    async fn backoff(&self) {
234        if let Some(backoff) = self.backoff_delay {
235            let mut rng = rand::rngs::StdRng::from_entropy();
236            let actual_backoff =
237                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
238
239            tokio::time::sleep(actual_backoff).await;
240        }
241    }
242}
243
244/// Sync a portion of the blockchain.
245/// Set end to `Point::TIP` to sync the tip continuously.
246#[allow(clippy::too_many_lines)]
247fn sync_subchain(
248    params: SyncParams,
249    mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
250) -> tokio::task::JoinHandle<SyncParams> {
251    tokio::spawn(async move {
252        // Backoff hitting the database if we need to.
253        params.backoff().await;
254
255        // Wait for indexing DB to be ready before continuing.
256        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
257        info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
258
259        let mut first_indexed_block = params.first_indexed_block.clone();
260        let mut first_immutable = params.first_is_immutable;
261        let mut last_indexed_block = params.last_indexed_block.clone();
262        let mut last_immutable = params.last_is_immutable;
263        let mut blocks_synced = 0u64;
264
265        let mut follower =
266            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
267        while let Some(chain_update) = follower.next().await {
268            let tips = ChainFollower::get_tips(params.chain).await;
269            let immutable_slot = tips.0.slot_or_default();
270            let live_slot = tips.1.slot_or_default();
271            metrics_updater::current_tip_slot(live_slot, immutable_slot);
272            match chain_update.kind {
273                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
274                    // We only process these on the follower tracking the TIP.
275                    if params.end == Point::TIP {
276                        // What we need to do here is tell the primary follower to start a new sync
277                        // for the new immutable data, and then purge the volatile database of the
278                        // old data (after the immutable data has synced).
279                        info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
280                        let mut result = params.done(
281                            first_indexed_block,
282                            first_immutable,
283                            last_indexed_block,
284                            last_immutable,
285                            blocks_synced,
286                            Ok(()),
287                        );
288                        // Signal the point the immutable chain rolled forward to.
289                        // If this is live chain immediately stops to later run immutable sync tasks
290                        result.follower_roll_forward = Some(chain_update.block_data().point());
291                        return result;
292                    }
293                },
294                cardano_chain_follower::Kind::Block => {
295                    let block = chain_update.block_data();
296
297                    if let Err(error) =
298                        index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
299                    {
300                        let error_msg = format!("Failed to index block {}", block.point());
301                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
302                        return params.done(
303                            first_indexed_block,
304                            first_immutable,
305                            last_indexed_block,
306                            last_immutable,
307                            blocks_synced,
308                            Err(error.context(error_msg)),
309                        );
310                    }
311
312                    if chain_update.tip && !set_follower_live_first_reached_tip() {
313                        metrics_updater::reached_live_tip(true);
314                    }
315
316                    update_block_state(
317                        block,
318                        &mut first_indexed_block,
319                        &mut first_immutable,
320                        &mut last_indexed_block,
321                        &mut last_immutable,
322                        &mut blocks_synced,
323                    );
324                },
325
326                cardano_chain_follower::Kind::Rollback => {
327                    // Rollback occurs, need to purge forward
328                    let rollback_slot = chain_update.block_data().slot();
329
330                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
331                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
332                        error!(chain=%params.chain, error=%error,
333                            "Chain follower rollback, purging volatile data task failed."
334                        );
335                    } else {
336                        // How many slots are purged
337                        #[allow(clippy::arithmetic_side_effects)]
338                        let purge_slots = params
339                            .last_indexed_block
340                            .as_ref()
341                            // Slots arithmetic has saturating semantic, so this is ok
342                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
343
344                        metrics_updater::forward_data_purge(purge_slots.into());
345
346                        // Purge success, now index the current block
347                        let block = chain_update.block_data();
348                        if let Err(error) =
349                            index_block(block, &mut pending_blocks, params.end.slot_or_default())
350                                .await
351                        {
352                            let error_msg =
353                                format!("Failed to index block after rollback {}", block.point());
354                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
355                            return params.done(
356                                first_indexed_block,
357                                first_immutable,
358                                last_indexed_block,
359                                last_immutable,
360                                blocks_synced,
361                                Err(error.context(error_msg)),
362                            );
363                        }
364
365                        update_block_state(
366                            block,
367                            &mut first_indexed_block,
368                            &mut first_immutable,
369                            &mut last_indexed_block,
370                            &mut last_immutable,
371                            &mut blocks_synced,
372                        );
373                    }
374                },
375            }
376        }
377
378        let result = params.done(
379            first_indexed_block,
380            first_immutable,
381            last_indexed_block,
382            last_immutable,
383            blocks_synced,
384            Ok(()),
385        );
386
387        info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
388
389        result
390    })
391}
392
393/// Update block related state.
394fn update_block_state(
395    block: &MultiEraBlock,
396    first_indexed_block: &mut Option<Point>,
397    first_immutable: &mut bool,
398    last_indexed_block: &mut Option<Point>,
399    last_immutable: &mut bool,
400    blocks_synced: &mut u64,
401) {
402    *last_immutable = block.is_immutable();
403    *last_indexed_block = Some(block.point());
404
405    if first_indexed_block.is_none() {
406        *first_immutable = *last_immutable;
407        *first_indexed_block = Some(block.point());
408    }
409
410    *blocks_synced = blocks_synced.saturating_add(1);
411}
412
413/// The synchronisation task, and its state.
414/// There should ONLY ever be one of these at any time.
415struct SyncTask {
416    /// Chain follower configuration.
417    cfg: chain_follower::EnvVars,
418
419    /// The current running sync tasks.
420    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,
421
422    /// How many immutable chain follower sync tasks we are running.
423    current_sync_tasks: u16,
424
425    /// Start for the next block we would sync.
426    start_slot: Slot,
427
428    /// The immutable tip slot.
429    immutable_tip_slot: Slot,
430
431    /// The live tip slot.
432    live_tip_slot: Slot,
433
434    /// Current Sync Status.
435    sync_status: Vec<SyncStatus>,
436
437    /// A channel that contains an end slot number for every immutable sync task.
438    ///
439    /// For example, if there are 3 tasks with `0..100`, `100..200` and `200..300` block
440    /// ranges (where the end bounds aren't inclusive), then the channel will contain the
441    /// following list: `[99, 199, 299]`. A slot value is removed when that task is done,
442    /// so when the second task is finished the list will be updated to `[99, 299]`.
443    pending_blocks: watch::Sender<BTreeSet<Slot>>,
444}
445
446impl SyncTask {
447    /// Create a new `SyncTask`.
448    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
449        Self {
450            cfg,
451            sync_tasks: FuturesUnordered::new(),
452            start_slot: 0.into(),
453            current_sync_tasks: 0,
454            immutable_tip_slot: 0.into(),
455            live_tip_slot: 0.into(),
456            sync_status: Vec::new(),
457            pending_blocks: watch::channel(BTreeSet::new()).0,
458        }
459    }
460
461    /// Add a new `SyncTask` to the queue.
462    fn add_sync_task(
463        &mut self,
464        params: SyncParams,
465    ) {
466        self.pending_blocks.send_modify(|blocks| {
467            blocks.insert(params.end.slot_or_default());
468        });
469        self.sync_tasks
470            .push(sync_subchain(params, self.pending_blocks.subscribe()));
471        self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
472        debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
473        metrics_updater::sync_tasks(self.current_sync_tasks);
474    }
475
476    /// Update `SyncTask` count.
477    fn sync_task_finished(&mut self) {
478        self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
479            error!("current_sync_tasks -= 1 overflow");
480            0
481        });
482        debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
483        metrics_updater::sync_tasks(self.current_sync_tasks);
484    }
485
486    /// Primary Chain Follower task.
487    ///
488    /// This continuously runs in the background, and never terminates.
489    ///
490    /// Sets the Index DB liveness flag to true if it is not already set.
491    ///
492    /// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
493    /// set.
494    #[allow(clippy::too_many_lines)]
495    async fn run(&mut self) {
496        // We can't sync until the local chain data is synced.
497        // This call will wait until we sync.
498        let tips = ChainFollower::get_tips(self.cfg.chain).await;
499        self.immutable_tip_slot = tips.0.slot_or_default();
500        self.live_tip_slot = tips.1.slot_or_default();
501        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
502
503        metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);
504
505        // Wait for indexing DB to be ready before continuing.
506        // We do this after the above, because other nodes may have finished already, and we don't
507        // want to wait do any work they already completed while we were fetching the blockchain.
508        //
509        // After waiting, we set the liveness flag to true if it is not already set.
510        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
511
512        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
513        self.sync_status = get_sync_status().await;
514        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
515
516        // Start the Live Chain sync task - This can never end because it is syncing to TIP.
517        // So, if it fails, it will automatically be restarted.
518        self.add_sync_task(SyncParams::new(
519            self.cfg.chain,
520            Point::fuzzy(self.immutable_tip_slot),
521            Point::TIP,
522        ));
523        metrics_updater::reached_live_tip(false);
524
525        self.start_immutable_followers();
526        // IF there is only 1 chain follower spawn, then the immutable state already indexed and
527        // filled in the db.
528        if self.sync_tasks.len() == 1 {
529            set_follower_immutable_first_reached_tip();
530        }
531
532        metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);
533
534        // Wait Sync tasks to complete.  If they fail and have not completed, reschedule them.
535        // If an immutable sync task ends OK, and we still have immutable data to sync then
536        // start a new task.
537        // They will return from this iterator in the order they complete.
538        // This iterator actually never ends, because the live sync task is always restarted.
539        while let Some(completed) = self.sync_tasks.next().await {
540            // update sync task count
541            self.sync_task_finished();
542
543            match completed {
544                Ok(finished) => {
545                    let tips = ChainFollower::get_tips(self.cfg.chain).await;
546                    let immutable_tip_slot = tips.0.slot_or_default();
547                    let live_tip_slot = tips.1.slot_or_default();
548                    info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
549                    // Sync task finished.  Check if it completed OK or had an error.
550                    // If it failed, we need to reschedule it.
551
552                    // The TIP follower should NEVER end, unless there is an immutable roll forward,
553                    // or there is an error.  If this is not a roll forward, log an error.
554                    // It can fail if the index DB goes down in some way.
555                    // Restart it always.
556                    if finished.end == Point::TIP {
557                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
558                            // Advance the known immutable tip, and try and start followers to reach
559                            // it.
560                            self.immutable_tip_slot = roll_forward_point.slot_or_default();
561
562                            metrics_updater::current_tip_slot(
563                                self.live_tip_slot,
564                                self.immutable_tip_slot,
565                            );
566
567                            info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");
568
569                            self.start_immutable_followers();
570                            metrics_updater::reached_immutable_tip(false);
571                        } else {
572                            error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
573                        }
574
575                        // Start the Live Chain sync task again from where it left off.
576                        self.add_sync_task(finished.retry());
577                    } else if let Some(result) = finished.result.as_ref() {
578                        match result {
579                            Ok(()) => {
580                                info!(chain=%self.cfg.chain, report=%finished,
581                                    "The Immutable follower completed successfully.");
582
583                                finished.last_indexed_block.as_ref().inspect(|block| {
584                                    metrics_updater::highest_complete_indexed_slot(
585                                        block.slot_or_default(),
586                                    );
587                                });
588
589                                self.pending_blocks.send_modify(|blocks| {
590                                    if !blocks.remove(&finished.end.slot_or_default()) {
591                                        error!(chain=%self.cfg.chain, end=?finished.end,
592                                            "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
593                                        );
594                                    }
595                                });
596
597                                // If we need more immutable chain followers to sync the block
598                                // chain, we can now start them.
599                                self.start_immutable_followers();
600                            },
601                            Err(error) => {
602                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
603                                        "An Immutable follower failed, restarting it.");
604                                // Restart the Immutable Chain sync task again from where it left
605                                // off.
606                                self.add_sync_task(finished.retry());
607                            },
608                        }
609                    } else {
610                        error!(chain=%self.cfg.chain, report=%finished,
611                        "BUG: The Immutable follower completed, but without a proper result.");
612                    }
613                },
614                Err(error) => {
615                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
616                },
617            }
618
619            // IF there is only 1 chain follower left in sync_tasks, then all
620            // immutable followers have finished.
621            // When this happens we need to purge the live index of any records that exist
622            // before the current immutable tip.
623            // Note: to prevent a data race when multiple nodes are syncing, we probably
624            // want to put a gap in this, so that there are X slots of overlap
625            // between the live chain and immutable chain.  This gap should be
626            // a parameter.
627            if self.sync_tasks.len() == 1 {
628                set_follower_immutable_first_reached_tip();
629                metrics_updater::reached_immutable_tip(true);
630
631                // Clear asset caches from the persistent Index DB session (volatile asset caches
632                // are disabled)
633                CassandraSession::get(true).inspect(|session| {
634                    session.caches().assets_ada().clear_cache();
635                    session.caches().assets_native().clear_cache();
636                });
637
638                // Purge data up to this slot
639                // Slots arithmetic has saturating semantic, so this is ok.
640                #[allow(clippy::arithmetic_side_effects)]
641                let purge_to_slot =
642                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
643                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
644                info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
645                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
646                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
647                } else {
648                    metrics_updater::backward_data_purge();
649                }
650            }
651        }
652
653        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
654    }
655
656    /// Start immutable followers, if we can
657    fn start_immutable_followers(&mut self) {
658        // Start the Immutable Chain sync tasks, as required.
659        // We will start at most the number of configured sync tasks.
660        // The live chain sync task is not counted as a sync task for this config value.
661
662        // Nothing to do if the start_slot is not less than the end of the immutable chain.
663        if self.start_slot < self.immutable_tip_slot {
664            // Will also break if there are no more slots left to sync.
665            while self.current_sync_tasks < self.cfg.sync_tasks {
666                let end_slot = self.immutable_tip_slot.min(
667                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
668                        .into(),
669                );
670
671                if let Some((first_point, last_point)) =
672                    self.get_syncable_range(self.start_slot, end_slot)
673                {
674                    self.add_sync_task(SyncParams::new(
675                        self.cfg.chain,
676                        first_point,
677                        last_point.clone(),
678                    ));
679                }
680
681                // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
682                // by one problems that may occur otherwise.
683                self.start_slot = end_slot;
684
685                if end_slot == self.immutable_tip_slot {
686                    break;
687                }
688            }
689        }
690    }
691
692    /// Check if the requested range has already been indexed.
693    /// If it hasn't just return the slots as points.
694    /// If it has, return a subset that hasn't been indexed if any, or None if its been
695    /// completely indexed already.
696    fn get_syncable_range(
697        &self,
698        start: Slot,
699        end: Slot,
700    ) -> Option<(Point, Point)> {
701        for sync_block in &self.sync_status {
702            // Check if we start within a previously synchronized block.
703            if start >= sync_block.start_slot && start <= sync_block.end_slot {
704                // Check if we are fully contained by the sync block, if so, nothing to sync.
705                if end <= sync_block.end_slot {
706                    return None;
707                }
708
709                // In theory, we could extend into another sync block, but because we could extend
710                // into an unbounded number of sync blocks, we would need to bust
711                // this range into an unbounded number of sub chunks.
712                // It is not a problem to sync the same data mutiple times, so for simplicity we do
713                // not account for this, if the requested range goes beyond the sync
714                // block it starts within we assume that the rest is not synced.
715                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
716            }
717        }
718
719        let start_slot = if start == 0.into() {
720            Point::ORIGIN
721        } else {
722            Point::fuzzy(start)
723        };
724
725        Some((start_slot, Point::fuzzy(end)))
726    }
727}
728
729/// Start followers as per defined in the config
730pub(crate) async fn start_followers() -> anyhow::Result<()> {
731    let cfg = Settings::follower_cfg();
732
733    // Log the chain follower configuration.
734    cfg.log();
735
736    // Start Syncing the blockchain, so we can consume its data as required.
737    start_sync_for(&cfg).await?;
738
739    tokio::spawn(async move {
740        let mut sync_task = SyncTask::new(cfg);
741
742        sync_task.run().await;
743    });
744
745    Ok(())
746}