cat_gateway/cardano/
mod.rs

1//! Logic for orchestrating followers
2
3use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{StreamExt, stream::FuturesUnordered};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13    db::index::{
14        block::{
15            index_block,
16            roll_forward::{self, PurgeCondition},
17        },
18        queries::sync_status::{
19            get::{SyncStatus, get_sync_status},
20            update::update_sync_status,
21        },
22        session::CassandraSession,
23    },
24    metrics::chain_indexer::metrics_updater,
25    service::utilities::health::{
26        set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27    },
28    settings::{Settings, chain_follower},
29};
30
31/// How long we wait between checks for connection to the indexing DB to be ready.
32pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
33
34/// Start syncing a particular network
35async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36    info!(chain = %cfg.chain(), "Starting Chain Sync Task");
37
38    if let Err(error) = cfg.cfg.clone().run().await {
39        error!(chain=%cfg.chain(), error=%error, "Failed to start Chain Sync Task");
40        Err(error)?;
41    }
42
43    Ok(())
44}
45
46/// Data we return from a sync task.
47#[derive(Clone)]
48struct SyncParams {
49    /// What blockchain are we syncing.
50    chain: Network,
51    /// The starting point of this sync.
52    start: Point,
53    /// The ending point of this sync.
54    end: Point,
55    /// The first block we successfully synced.
56    first_indexed_block: Option<Point>,
57    /// Is the starting point immutable? (True = immutable, false = don't know.)
58    first_is_immutable: bool,
59    /// The last block we successfully synced.
60    last_indexed_block: Option<Point>,
61    /// Is the ending point immutable? (True = immutable, false = don't know.)
62    last_is_immutable: bool,
63    /// The number of blocks we successfully synced overall.
64    total_blocks_synced: u64,
65    /// The number of blocks we successfully synced, in the last attempt.
66    last_blocks_synced: u64,
67    /// The number of retries so far on this sync task.
68    retries: u64,
69    /// The number of retries so far on this sync task.
70    backoff_delay: Option<Duration>,
71    /// If the sync completed without error or not.
72    result: Arc<Option<anyhow::Result<()>>>,
73    /// Chain follower roll forward.
74    follower_roll_forward: Option<Point>,
75}
76
77impl Display for SyncParams {
78    fn fmt(
79        &self,
80        f: &mut std::fmt::Formatter<'_>,
81    ) -> std::fmt::Result {
82        if self.result.is_none() {
83            write!(f, "Sync_Params {{ ")?;
84        } else {
85            write!(f, "Sync_Result {{ ")?;
86        }
87
88        write!(f, "start: {}, end: {}", self.start, self.end)?;
89
90        if let Some(first) = self.first_indexed_block.as_ref() {
91            write!(
92                f,
93                ", first_indexed_block: {first}{}",
94                if self.first_is_immutable { ":I" } else { "" }
95            )?;
96        }
97
98        if let Some(last) = self.last_indexed_block.as_ref() {
99            write!(
100                f,
101                ", last_indexed_block: {last}{}",
102                if self.last_is_immutable { ":I" } else { "" }
103            )?;
104        }
105
106        if self.retries > 0 {
107            write!(f, ", retries: {}", self.retries)?;
108        }
109
110        if self.retries > 0 || self.result.is_some() {
111            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
112        }
113
114        if self.result.is_some() {
115            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
116        }
117
118        if let Some(backoff) = self.backoff_delay.as_ref() {
119            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
120        }
121
122        if let Some(result) = self.result.as_ref() {
123            match result {
124                Ok(()) => write!(f, ", Success")?,
125                Err(error) => write!(f, ", {error}")?,
126            }
127        }
128
129        f.write_str(" }")
130    }
131}
132
133/// The range we generate random backoffs within given a base backoff value.
134const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
135
136impl SyncParams {
137    /// Create a new `SyncParams`.
138    fn new(
139        chain: Network,
140        start: Point,
141        end: Point,
142    ) -> Self {
143        Self {
144            chain,
145            start,
146            end,
147            first_indexed_block: None,
148            first_is_immutable: false,
149            last_indexed_block: None,
150            last_is_immutable: false,
151            total_blocks_synced: 0,
152            last_blocks_synced: 0,
153            retries: 0,
154            backoff_delay: None,
155            result: Arc::new(None),
156            follower_roll_forward: None,
157        }
158    }
159
160    /// Convert a result back into parameters for a retry.
161    fn retry(&self) -> Self {
162        let retry_count = self.retries.saturating_add(1);
163
164        let mut backoff = None;
165
166        // If we did sync any blocks last time, first retry is immediate.
167        // Otherwise we backoff progressively more as we do more retries.
168        if self.last_blocks_synced == 0 {
169            // Calculate backoff based on number of retries so far.
170            backoff = match retry_count {
171                1 => Some(Duration::from_secs(1)),     // 1-3 seconds
172                2..5 => Some(Duration::from_secs(10)), // 10-30 seconds
173                _ => Some(Duration::from_secs(30)),    // 30-90 seconds.
174            };
175        }
176
177        let mut retry = self.clone();
178        retry.last_blocks_synced = 0;
179        retry.retries = retry_count;
180        retry.backoff_delay = backoff;
181        retry.result = Arc::new(None);
182        retry.follower_roll_forward = None;
183
184        retry
185    }
186
187    /// Convert Params into the result of the sync.
188    pub(crate) fn done(
189        &self,
190        first: Option<Point>,
191        first_immutable: bool,
192        last: Option<Point>,
193        last_immutable: bool,
194        synced: u64,
195        result: anyhow::Result<()>,
196    ) -> Self {
197        if result.is_ok() && self.end != Point::TIP {
198            // Update sync status in the Immutable DB.
199            // Can fire and forget, because failure to update DB will simply cause the chunk to be
200            // re-indexed, on recovery.
201            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
202        }
203        let mut done = self.clone();
204        done.first_indexed_block = first;
205        done.first_is_immutable = first_immutable;
206        done.last_indexed_block = last;
207        done.last_is_immutable = last_immutable;
208        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
209        done.last_blocks_synced = synced;
210        done.result = Arc::new(Some(result));
211
212        done
213    }
214
215    /// Get where this sync run actually needs to start from.
216    fn actual_start(&self) -> Point {
217        self.last_indexed_block
218            .as_ref()
219            .unwrap_or(&self.start)
220            .clone()
221    }
222
223    /// Do the backoff delay processing.
224    ///
225    /// The actual delay is a random time from the Delay itself to
226    /// `BACKOFF_RANGE_MULTIPLIER` times the delay. This is to prevent hammering the
227    /// service at regular intervals.
228    async fn backoff(&self) {
229        if let Some(backoff) = self.backoff_delay {
230            let mut rng = rand::rngs::StdRng::from_entropy();
231            let actual_backoff =
232                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
233
234            tokio::time::sleep(actual_backoff).await;
235        }
236    }
237}
238
239/// Sync a portion of the blockchain.
240/// Set end to `Point::TIP` to sync the tip continuously.
241#[allow(clippy::too_many_lines)]
242fn sync_subchain(
243    params: SyncParams,
244    mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
245) -> tokio::task::JoinHandle<SyncParams> {
246    tokio::spawn(async move {
247        // Backoff hitting the database if we need to.
248        params.backoff().await;
249
250        // Wait for indexing DB to be ready before continuing.
251        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
252        info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
253
254        let mut first_indexed_block = params.first_indexed_block.clone();
255        let mut first_immutable = params.first_is_immutable;
256        let mut last_indexed_block = params.last_indexed_block.clone();
257        let mut last_immutable = params.last_is_immutable;
258        let mut blocks_synced = 0u64;
259
260        let mut follower =
261            ChainFollower::new(&params.chain, params.actual_start(), params.end.clone()).await;
262        while let Some(chain_update) = follower.next().await {
263            let tips = ChainFollower::get_tips(&params.chain).await;
264            let immutable_slot = tips.0.slot_or_default();
265            let live_slot = tips.1.slot_or_default();
266            metrics_updater::current_tip_slot(live_slot, immutable_slot);
267            match chain_update.kind {
268                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
269                    // We only process these on the follower tracking the TIP.
270                    if params.end == Point::TIP {
271                        // What we need to do here is tell the primary follower to start a new sync
272                        // for the new immutable data, and then purge the volatile database of the
273                        // old data (after the immutable data has synced).
274                        info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
275                        let mut result = params.done(
276                            first_indexed_block,
277                            first_immutable,
278                            last_indexed_block,
279                            last_immutable,
280                            blocks_synced,
281                            Ok(()),
282                        );
283                        // Signal the point the immutable chain rolled forward to.
284                        // If this is live chain immediately stops to later run immutable sync tasks
285                        result.follower_roll_forward = Some(chain_update.block_data().point());
286                        return result;
287                    }
288                },
289                cardano_chain_follower::Kind::Block => {
290                    let block = chain_update.block_data();
291
292                    if let Err(error) =
293                        index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
294                    {
295                        let error_msg = format!("Failed to index block {}", block.point());
296                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
297                        return params.done(
298                            first_indexed_block,
299                            first_immutable,
300                            last_indexed_block,
301                            last_immutable,
302                            blocks_synced,
303                            Err(error.context(error_msg)),
304                        );
305                    }
306
307                    if chain_update.tip && !set_follower_live_first_reached_tip() {
308                        metrics_updater::reached_live_tip(true);
309                    }
310
311                    update_block_state(
312                        block,
313                        &mut first_indexed_block,
314                        &mut first_immutable,
315                        &mut last_indexed_block,
316                        &mut last_immutable,
317                        &mut blocks_synced,
318                    );
319                },
320
321                cardano_chain_follower::Kind::Rollback => {
322                    // Rollback occurs, need to purge forward
323                    let rollback_slot = chain_update.block_data().slot();
324
325                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
326                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
327                        error!(chain=%params.chain, error=%error,
328                            "Chain follower rollback, purging volatile data task failed."
329                        );
330                    } else {
331                        // How many slots are purged
332                        #[allow(clippy::arithmetic_side_effects)]
333                        let purge_slots = params
334                            .last_indexed_block
335                            .as_ref()
336                            // Slots arithmetic has saturating semantic, so this is ok
337                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
338
339                        metrics_updater::forward_data_purge(purge_slots.into());
340
341                        // Purge success, now index the current block
342                        let block = chain_update.block_data();
343                        if let Err(error) =
344                            index_block(block, &mut pending_blocks, params.end.slot_or_default())
345                                .await
346                        {
347                            let error_msg =
348                                format!("Failed to index block after rollback {}", block.point());
349                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
350                            return params.done(
351                                first_indexed_block,
352                                first_immutable,
353                                last_indexed_block,
354                                last_immutable,
355                                blocks_synced,
356                                Err(error.context(error_msg)),
357                            );
358                        }
359
360                        update_block_state(
361                            block,
362                            &mut first_indexed_block,
363                            &mut first_immutable,
364                            &mut last_indexed_block,
365                            &mut last_immutable,
366                            &mut blocks_synced,
367                        );
368                    }
369                },
370            }
371        }
372
373        let result = params.done(
374            first_indexed_block,
375            first_immutable,
376            last_indexed_block,
377            last_immutable,
378            blocks_synced,
379            Ok(()),
380        );
381
382        info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
383
384        result
385    })
386}
387
388/// Update block related state.
389fn update_block_state(
390    block: &MultiEraBlock,
391    first_indexed_block: &mut Option<Point>,
392    first_immutable: &mut bool,
393    last_indexed_block: &mut Option<Point>,
394    last_immutable: &mut bool,
395    blocks_synced: &mut u64,
396) {
397    *last_immutable = block.is_immutable();
398    *last_indexed_block = Some(block.point());
399
400    if first_indexed_block.is_none() {
401        *first_immutable = *last_immutable;
402        *first_indexed_block = Some(block.point());
403    }
404
405    *blocks_synced = blocks_synced.saturating_add(1);
406}
407
408/// The synchronisation task, and its state.
409/// There should ONLY ever be one of these at any time.
410struct SyncTask {
411    /// Chain follower configuration.
412    cfg: chain_follower::EnvVars,
413
414    /// The current running sync tasks.
415    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,
416
417    /// How many immutable chain follower sync tasks we are running.
418    current_sync_tasks: u16,
419
420    /// Start for the next block we would sync.
421    start_slot: Slot,
422
423    /// The immutable tip slot.
424    immutable_tip_slot: Slot,
425
426    /// The live tip slot.
427    live_tip_slot: Slot,
428
429    /// Current Sync Status.
430    sync_status: Vec<SyncStatus>,
431
432    /// A channel that contains an end slot number for every immutable sync task.
433    ///
434    /// For example, if there are 3 tasks with `0..100`, `100..200` and `200..300` block
435    /// ranges (where the end bounds aren't inclusive), then the channel will contain the
436    /// following list: `[99, 199, 299]`. A slot value is removed when that task is done,
437    /// so when the second task is finished the list will be updated to `[99, 299]`.
438    pending_blocks: watch::Sender<BTreeSet<Slot>>,
439}
440
441impl SyncTask {
442    /// Create a new `SyncTask`.
443    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
444        Self {
445            cfg,
446            sync_tasks: FuturesUnordered::new(),
447            start_slot: 0.into(),
448            current_sync_tasks: 0,
449            immutable_tip_slot: 0.into(),
450            live_tip_slot: 0.into(),
451            sync_status: Vec::new(),
452            pending_blocks: watch::channel(BTreeSet::new()).0,
453        }
454    }
455
456    /// Add a new `SyncTask` to the queue.
457    fn add_sync_task(
458        &mut self,
459        params: SyncParams,
460    ) {
461        self.pending_blocks.send_modify(|blocks| {
462            blocks.insert(params.end.slot_or_default());
463        });
464        self.sync_tasks
465            .push(sync_subchain(params, self.pending_blocks.subscribe()));
466        self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
467        debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
468        metrics_updater::sync_tasks(self.current_sync_tasks);
469    }
470
471    /// Update `SyncTask` count.
472    fn sync_task_finished(&mut self) {
473        self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
474            error!("current_sync_tasks -= 1 overflow");
475            0
476        });
477        debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
478        metrics_updater::sync_tasks(self.current_sync_tasks);
479    }
480
481    /// Primary Chain Follower task.
482    ///
483    /// This continuously runs in the background, and never terminates.
484    ///
485    /// Sets the Index DB liveness flag to true if it is not already set.
486    ///
487    /// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
488    /// set.
489    #[allow(clippy::too_many_lines)]
490    async fn run(&mut self) {
491        // We can't sync until the local chain data is synced.
492        // This call will wait until we sync.
493        let tips = ChainFollower::get_tips(self.cfg.chain()).await;
494        self.immutable_tip_slot = tips.0.slot_or_default();
495        self.live_tip_slot = tips.1.slot_or_default();
496        info!(chain=%self.cfg.chain(), immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
497
498        metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);
499
500        // Wait for indexing DB to be ready before continuing.
501        // We do this after the above, because other nodes may have finished already, and we don't
502        // want to wait do any work they already completed while we were fetching the blockchain.
503        //
504        // After waiting, we set the liveness flag to true if it is not already set.
505        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
506
507        info!(chain=%self.cfg.chain(), "Indexing DB is ready - Getting recovery state for indexing");
508        self.sync_status = get_sync_status().await;
509        debug!(chain=%self.cfg.chain(), "Sync Status: {:?}", self.sync_status);
510
511        // Start the Live Chain sync task - This can never end because it is syncing to TIP.
512        // So, if it fails, it will automatically be restarted.
513        self.add_sync_task(SyncParams::new(
514            self.cfg.chain().clone(),
515            Point::fuzzy(self.immutable_tip_slot),
516            Point::TIP,
517        ));
518        metrics_updater::reached_live_tip(false);
519
520        self.start_immutable_followers();
521        // IF there is only 1 chain follower spawn, then the immutable state already indexed and
522        // filled in the db.
523        if self.sync_tasks.len() == 1 {
524            set_follower_immutable_first_reached_tip();
525        }
526
527        metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);
528
529        // Wait Sync tasks to complete.  If they fail and have not completed, reschedule them.
530        // If an immutable sync task ends OK, and we still have immutable data to sync then
531        // start a new task.
532        // They will return from this iterator in the order they complete.
533        // This iterator actually never ends, because the live sync task is always restarted.
534        while let Some(completed) = self.sync_tasks.next().await {
535            // update sync task count
536            self.sync_task_finished();
537
538            match completed {
539                Ok(finished) => {
540                    let tips = ChainFollower::get_tips(self.cfg.chain()).await;
541                    let immutable_tip_slot = tips.0.slot_or_default();
542                    let live_tip_slot = tips.1.slot_or_default();
543                    info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
544                    // Sync task finished.  Check if it completed OK or had an error.
545                    // If it failed, we need to reschedule it.
546
547                    // The TIP follower should NEVER end, unless there is an immutable roll forward,
548                    // or there is an error.  If this is not a roll forward, log an error.
549                    // It can fail if the index DB goes down in some way.
550                    // Restart it always.
551                    if finished.end == Point::TIP {
552                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
553                            // Advance the known immutable tip, and try and start followers to reach
554                            // it.
555                            self.immutable_tip_slot = roll_forward_point.slot_or_default();
556
557                            metrics_updater::current_tip_slot(
558                                self.live_tip_slot,
559                                self.immutable_tip_slot,
560                            );
561
562                            info!(chain=%self.cfg.chain(), report=%finished, "Chain Indexer finished reaching TIP.");
563
564                            self.start_immutable_followers();
565                            metrics_updater::reached_immutable_tip(false);
566                        } else {
567                            error!(chain=%self.cfg.chain(), report=%finished, "Chain Indexer finished without to reach TIP.");
568                        }
569
570                        // Start the Live Chain sync task again from where it left off.
571                        self.add_sync_task(finished.retry());
572                    } else if let Some(result) = finished.result.as_ref() {
573                        match result {
574                            Ok(()) => {
575                                info!(chain=%self.cfg.chain(), report=%finished,
576                                    "The Immutable follower completed successfully.");
577
578                                finished.last_indexed_block.as_ref().inspect(|block| {
579                                    metrics_updater::highest_complete_indexed_slot(
580                                        block.slot_or_default(),
581                                    );
582                                });
583
584                                self.pending_blocks.send_modify(|blocks| {
585                                    if !blocks.remove(&finished.end.slot_or_default()) {
586                                        error!(chain=%self.cfg.chain(), end=?finished.end,
587                                            "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
588                                        );
589                                    }
590                                });
591
592                                // If we need more immutable chain followers to sync the block
593                                // chain, we can now start them.
594                                self.start_immutable_followers();
595                            },
596                            Err(error) => {
597                                error!(chain=%self.cfg.chain(), report=%finished, error=%error,
598                                        "An Immutable follower failed, restarting it.");
599                                // Restart the Immutable Chain sync task again from where it left
600                                // off.
601                                self.add_sync_task(finished.retry());
602                            },
603                        }
604                    } else {
605                        error!(chain=%self.cfg.chain(), report=%finished,
606                        "BUG: The Immutable follower completed, but without a proper result.");
607                    }
608                },
609                Err(error) => {
610                    error!(chain=%self.cfg.chain(), error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
611                },
612            }
613
614            // IF there is only 1 chain follower left in sync_tasks, then all
615            // immutable followers have finished.
616            // When this happens we need to purge the live index of any records that exist
617            // before the current immutable tip.
618            // Note: to prevent a data race when multiple nodes are syncing, we probably
619            // want to put a gap in this, so that there are X slots of overlap
620            // between the live chain and immutable chain.  This gap should be
621            // a parameter.
622            if self.sync_tasks.len() == 1 {
623                set_follower_immutable_first_reached_tip();
624                metrics_updater::reached_immutable_tip(true);
625
626                // Clear asset caches from the persistent Index DB session (volatile asset caches
627                // are disabled)
628                CassandraSession::get(true).inspect(|session| {
629                    session.caches().assets_ada().clear_cache();
630                    session.caches().assets_native().clear_cache();
631                });
632
633                // Purge data up to this slot
634                // Slots arithmetic has saturating semantic, so this is ok.
635                #[allow(clippy::arithmetic_side_effects)]
636                let purge_to_slot =
637                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
638                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
639                info!(chain=%self.cfg.chain(), purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
640                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
641                    error!(chain=%self.cfg.chain(), error=%error, "BUG: Purging volatile data task failed.");
642                } else {
643                    metrics_updater::backward_data_purge();
644                }
645            }
646        }
647
648        error!(chain=%self.cfg.chain(),"BUG: Sync tasks have all stopped.  This is an unexpected error!");
649    }
650
651    /// Start immutable followers, if we can
652    fn start_immutable_followers(&mut self) {
653        // Start the Immutable Chain sync tasks, as required.
654        // We will start at most the number of configured sync tasks.
655        // The live chain sync task is not counted as a sync task for this config value.
656
657        // Nothing to do if the start_slot is not less than the end of the immutable chain.
658        if self.start_slot < self.immutable_tip_slot {
659            // Will also break if there are no more slots left to sync.
660            while self.current_sync_tasks < self.cfg.sync_tasks {
661                let end_slot = self.immutable_tip_slot.min(
662                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
663                        .into(),
664                );
665
666                if let Some((first_point, last_point)) =
667                    self.get_syncable_range(self.start_slot, end_slot)
668                {
669                    self.add_sync_task(SyncParams::new(
670                        self.cfg.chain().clone(),
671                        first_point,
672                        last_point.clone(),
673                    ));
674                }
675
676                // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
677                // by one problems that may occur otherwise.
678                self.start_slot = end_slot;
679
680                if end_slot == self.immutable_tip_slot {
681                    break;
682                }
683            }
684        }
685    }
686
687    /// Check if the requested range has already been indexed.
688    /// If it hasn't just return the slots as points.
689    /// If it has, return a subset that hasn't been indexed if any, or None if its been
690    /// completely indexed already.
691    fn get_syncable_range(
692        &self,
693        start: Slot,
694        end: Slot,
695    ) -> Option<(Point, Point)> {
696        for sync_block in &self.sync_status {
697            // Check if we start within a previously synchronized block.
698            if start >= sync_block.start_slot && start <= sync_block.end_slot {
699                // Check if we are fully contained by the sync block, if so, nothing to sync.
700                if end <= sync_block.end_slot {
701                    return None;
702                }
703
704                // In theory, we could extend into another sync block, but because we could extend
705                // into an unbounded number of sync blocks, we would need to bust
706                // this range into an unbounded number of sub chunks.
707                // It is not a problem to sync the same data mutiple times, so for simplicity we do
708                // not account for this, if the requested range goes beyond the sync
709                // block it starts within we assume that the rest is not synced.
710                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
711            }
712        }
713
714        let start_slot = if start == 0.into() {
715            Point::ORIGIN
716        } else {
717            Point::fuzzy(start)
718        };
719
720        Some((start_slot, Point::fuzzy(end)))
721    }
722}
723
724/// Start followers as per defined in the config
725pub(crate) async fn start_followers() -> anyhow::Result<()> {
726    let cfg = Settings::follower_cfg();
727
728    // Log the chain follower configuration.
729    cfg.log();
730
731    // Start Syncing the blockchain, so we can consume its data as required.
732    start_sync_for(&cfg).await?;
733
734    tokio::spawn(async move {
735        let mut sync_task = SyncTask::new(cfg);
736
737        sync_task.run().await;
738    });
739
740    Ok(())
741}