cat_gateway/cardano/
mod.rs

1//! Logic for orchestrating followers
2
3use std::{collections::BTreeSet, fmt::Display, sync::Arc, time::Duration};
4
5use cardano_chain_follower::{ChainFollower, ChainSyncConfig, MultiEraBlock, Network, Point, Slot};
6use duration_string::DurationString;
7use futures::{stream::FuturesUnordered, StreamExt};
8use rand::{Rng, SeedableRng};
9use tokio::sync::watch;
10use tracing::{debug, error, info};
11
12use crate::{
13    db::index::{
14        block::{
15            index_block,
16            roll_forward::{self, PurgeCondition},
17        },
18        queries::sync_status::{
19            get::{get_sync_status, SyncStatus},
20            update::update_sync_status,
21        },
22        session::CassandraSession,
23    },
24    metrics::chain_indexer::metrics_updater,
25    service::utilities::health::{
26        set_follower_immutable_first_reached_tip, set_follower_live_first_reached_tip,
27    },
28    settings::{chain_follower, Settings},
29};
30
/// How long we wait between checks for connection to the indexing DB to be ready.
///
/// Passed to `CassandraSession::wait_until_ready` by the sync tasks in this module.
pub(crate) const INDEXING_DB_READY_WAIT_INTERVAL: Duration = Duration::from_secs(1);
33
34/// Start syncing a particular network
35async fn start_sync_for(cfg: &chain_follower::EnvVars) -> anyhow::Result<()> {
36    let chain = cfg.chain;
37    let dl_config = cfg.dl_config.clone();
38
39    let mut cfg = ChainSyncConfig::default_for(chain);
40    cfg.mithril_cfg = cfg.mithril_cfg.with_dl_config(dl_config);
41    info!(chain = %chain, "Starting Chain Sync Task");
42
43    if let Err(error) = cfg.run().await {
44        error!(chain=%chain, error=%error, "Failed to start Chain Sync Task");
45        Err(error)?;
46    }
47
48    Ok(())
49}
50
/// Data we return from a sync task.
///
/// The same structure is also the input of a sync task: `result` holds `None` while the
/// value describes a task that is yet to run, and `Some(..)` once it reports the outcome
/// of a finished one.
#[derive(Clone)]
struct SyncParams {
    /// What blockchain are we syncing.
    chain: Network,
    /// The starting point of this sync.
    start: Point,
    /// The ending point of this sync.
    end: Point,
    /// The first block we successfully synced.
    first_indexed_block: Option<Point>,
    /// Is the starting point immutable? (True = immutable, false = don't know.)
    first_is_immutable: bool,
    /// The last block we successfully synced.
    last_indexed_block: Option<Point>,
    /// Is the ending point immutable? (True = immutable, false = don't know.)
    last_is_immutable: bool,
    /// The number of blocks we successfully synced overall.
    total_blocks_synced: u64,
    /// The number of blocks we successfully synced, in the last attempt.
    last_blocks_synced: u64,
    /// The number of retries so far on this sync task.
    retries: u64,
    /// The base delay to wait before this sync task starts, if any.
    /// `None` means the task starts immediately.
    backoff_delay: Option<Duration>,
    /// If the sync completed without error or not.
    /// `None` until the task has finished.
    // NOTE(review): presumably wrapped in `Arc` so the struct can derive `Clone` - confirm.
    result: Arc<Option<anyhow::Result<()>>>,
    /// Chain follower roll forward.
    /// Set to the point the immutable chain rolled forward to when the TIP follower stops
    /// because of an immutable roll forward.
    follower_roll_forward: Option<Point>,
}
81
82impl Display for SyncParams {
83    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
84        if self.result.is_none() {
85            write!(f, "Sync_Params {{ ")?;
86        } else {
87            write!(f, "Sync_Result {{ ")?;
88        }
89
90        write!(f, "start: {}, end: {}", self.start, self.end)?;
91
92        if let Some(first) = self.first_indexed_block.as_ref() {
93            write!(
94                f,
95                ", first_indexed_block: {first}{}",
96                if self.first_is_immutable { ":I" } else { "" }
97            )?;
98        }
99
100        if let Some(last) = self.last_indexed_block.as_ref() {
101            write!(
102                f,
103                ", last_indexed_block: {last}{}",
104                if self.last_is_immutable { ":I" } else { "" }
105            )?;
106        }
107
108        if self.retries > 0 {
109            write!(f, ", retries: {}", self.retries)?;
110        }
111
112        if self.retries > 0 || self.result.is_some() {
113            write!(f, ", synced_blocks: {}", self.total_blocks_synced)?;
114        }
115
116        if self.result.is_some() {
117            write!(f, ", last_sync: {}", self.last_blocks_synced)?;
118        }
119
120        if let Some(backoff) = self.backoff_delay.as_ref() {
121            write!(f, ", backoff: {}", DurationString::from(*backoff))?;
122        }
123
124        if let Some(result) = self.result.as_ref() {
125            match result {
126                Ok(()) => write!(f, ", Success")?,
127                Err(error) => write!(f, ", {error}")?,
128            };
129        }
130
131        f.write_str(" }")
132    }
133}
134
/// The range we generate random backoffs within given a base backoff value.
///
/// The upper bound of the random range is the base backoff multiplied by this value.
const BACKOFF_RANGE_MULTIPLIER: u32 = 3;
137
138impl SyncParams {
139    /// Create a new `SyncParams`.
140    fn new(chain: Network, start: Point, end: Point) -> Self {
141        Self {
142            chain,
143            start,
144            end,
145            first_indexed_block: None,
146            first_is_immutable: false,
147            last_indexed_block: None,
148            last_is_immutable: false,
149            total_blocks_synced: 0,
150            last_blocks_synced: 0,
151            retries: 0,
152            backoff_delay: None,
153            result: Arc::new(None),
154            follower_roll_forward: None,
155        }
156    }
157
158    /// Convert a result back into parameters for a retry.
159    fn retry(&self) -> Self {
160        let retry_count = self.retries.saturating_add(1);
161
162        let mut backoff = None;
163
164        // If we did sync any blocks last time, first retry is immediate.
165        // Otherwise we backoff progressively more as we do more retries.
166        if self.last_blocks_synced == 0 {
167            // Calculate backoff based on number of retries so far.
168            backoff = match retry_count {
169                1 => Some(Duration::from_secs(1)),     // 1-3 seconds
170                2..5 => Some(Duration::from_secs(10)), // 10-30 seconds
171                _ => Some(Duration::from_secs(30)),    // 30-90 seconds.
172            };
173        }
174
175        let mut retry = self.clone();
176        retry.last_blocks_synced = 0;
177        retry.retries = retry_count;
178        retry.backoff_delay = backoff;
179        retry.result = Arc::new(None);
180        retry.follower_roll_forward = None;
181
182        retry
183    }
184
185    /// Convert Params into the result of the sync.
186    pub(crate) fn done(
187        &self, first: Option<Point>, first_immutable: bool, last: Option<Point>,
188        last_immutable: bool, synced: u64, result: anyhow::Result<()>,
189    ) -> Self {
190        if result.is_ok() && self.end != Point::TIP {
191            // Update sync status in the Immutable DB.
192            // Can fire and forget, because failure to update DB will simply cause the chunk to be
193            // re-indexed, on recovery.
194            update_sync_status(self.end.slot_or_default(), self.start.slot_or_default());
195        }
196        let mut done = self.clone();
197        done.first_indexed_block = first;
198        done.first_is_immutable = first_immutable;
199        done.last_indexed_block = last;
200        done.last_is_immutable = last_immutable;
201        done.total_blocks_synced = done.total_blocks_synced.saturating_add(synced);
202        done.last_blocks_synced = synced;
203        done.result = Arc::new(Some(result));
204
205        done
206    }
207
208    /// Get where this sync run actually needs to start from.
209    fn actual_start(&self) -> Point {
210        self.last_indexed_block
211            .as_ref()
212            .unwrap_or(&self.start)
213            .clone()
214    }
215
216    /// Do the backoff delay processing.
217    ///
218    /// The actual delay is a random time from the Delay itself to
219    /// `BACKOFF_RANGE_MULTIPLIER` times the delay. This is to prevent hammering the
220    /// service at regular intervals.
221    async fn backoff(&self) {
222        if let Some(backoff) = self.backoff_delay {
223            let mut rng = rand::rngs::StdRng::from_entropy();
224            let actual_backoff =
225                rng.gen_range(backoff..backoff.saturating_mul(BACKOFF_RANGE_MULTIPLIER));
226
227            tokio::time::sleep(actual_backoff).await;
228        }
229    }
230}
231
232/// Sync a portion of the blockchain.
233/// Set end to `Point::TIP` to sync the tip continuously.
234#[allow(clippy::too_many_lines)]
235fn sync_subchain(
236    params: SyncParams, mut pending_blocks: watch::Receiver<BTreeSet<Slot>>,
237) -> tokio::task::JoinHandle<SyncParams> {
238    tokio::spawn(async move {
239        // Backoff hitting the database if we need to.
240        params.backoff().await;
241
242        // Wait for indexing DB to be ready before continuing.
243        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
244        info!(chain=%params.chain, params=%params,"Starting Chain Indexing Task");
245
246        let mut first_indexed_block = params.first_indexed_block.clone();
247        let mut first_immutable = params.first_is_immutable;
248        let mut last_indexed_block = params.last_indexed_block.clone();
249        let mut last_immutable = params.last_is_immutable;
250        let mut blocks_synced = 0u64;
251
252        let mut follower =
253            ChainFollower::new(params.chain, params.actual_start(), params.end.clone()).await;
254        while let Some(chain_update) = follower.next().await {
255            let tips = ChainFollower::get_tips(params.chain).await;
256            let immutable_slot = tips.0.slot_or_default();
257            let live_slot = tips.1.slot_or_default();
258            metrics_updater::current_tip_slot(live_slot, immutable_slot);
259            match chain_update.kind {
260                cardano_chain_follower::Kind::ImmutableBlockRollForward => {
261                    // We only process these on the follower tracking the TIP.
262                    if params.end == Point::TIP {
263                        // What we need to do here is tell the primary follower to start a new sync
264                        // for the new immutable data, and then purge the volatile database of the
265                        // old data (after the immutable data has synced).
266                        info!(chain=%params.chain, point=?chain_update.block_data().point(), "Immutable chain rolled forward.");
267                        let mut result = params.done(
268                            first_indexed_block,
269                            first_immutable,
270                            last_indexed_block,
271                            last_immutable,
272                            blocks_synced,
273                            Ok(()),
274                        );
275                        // Signal the point the immutable chain rolled forward to.
276                        // If this is live chain immediately stops to later run immutable sync tasks
277                        result.follower_roll_forward = Some(chain_update.block_data().point());
278                        return result;
279                    }
280                },
281                cardano_chain_follower::Kind::Block => {
282                    let block = chain_update.block_data();
283
284                    if let Err(error) =
285                        index_block(block, &mut pending_blocks, params.end.slot_or_default()).await
286                    {
287                        let error_msg = format!("Failed to index block {}", block.point());
288                        error!(chain=%params.chain, error=%error, params=%params, error_msg);
289                        return params.done(
290                            first_indexed_block,
291                            first_immutable,
292                            last_indexed_block,
293                            last_immutable,
294                            blocks_synced,
295                            Err(error.context(error_msg)),
296                        );
297                    }
298
299                    if chain_update.tip && !set_follower_live_first_reached_tip() {
300                        metrics_updater::reached_live_tip(true);
301                    }
302
303                    update_block_state(
304                        block,
305                        &mut first_indexed_block,
306                        &mut first_immutable,
307                        &mut last_indexed_block,
308                        &mut last_immutable,
309                        &mut blocks_synced,
310                    );
311                },
312
313                cardano_chain_follower::Kind::Rollback => {
314                    // Rollback occurs, need to purge forward
315                    let rollback_slot = chain_update.block_data().slot();
316
317                    let purge_condition = PurgeCondition::PurgeForwards(rollback_slot);
318                    if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
319                        error!(chain=%params.chain, error=%error,
320                            "Chain follower rollback, purging volatile data task failed."
321                        );
322                    } else {
323                        // How many slots are purged
324                        #[allow(clippy::arithmetic_side_effects)]
325                        let purge_slots = params
326                            .last_indexed_block
327                            .as_ref()
328                            // Slots arithmetic has saturating semantic, so this is ok
329                            .map_or(0.into(), |l| l.slot_or_default() - rollback_slot);
330
331                        metrics_updater::forward_data_purge(purge_slots.into());
332
333                        // Purge success, now index the current block
334                        let block = chain_update.block_data();
335                        if let Err(error) =
336                            index_block(block, &mut pending_blocks, params.end.slot_or_default())
337                                .await
338                        {
339                            let error_msg =
340                                format!("Failed to index block after rollback {}", block.point());
341                            error!(chain=%params.chain, error=%error, params=%params, error_msg);
342                            return params.done(
343                                first_indexed_block,
344                                first_immutable,
345                                last_indexed_block,
346                                last_immutable,
347                                blocks_synced,
348                                Err(error.context(error_msg)),
349                            );
350                        }
351
352                        update_block_state(
353                            block,
354                            &mut first_indexed_block,
355                            &mut first_immutable,
356                            &mut last_indexed_block,
357                            &mut last_immutable,
358                            &mut blocks_synced,
359                        );
360                    }
361                },
362            }
363        }
364
365        let result = params.done(
366            first_indexed_block,
367            first_immutable,
368            last_indexed_block,
369            last_immutable,
370            blocks_synced,
371            Ok(()),
372        );
373
374        info!(chain = %result.chain, result=%result, "Indexing Blockchain Completed: OK");
375
376        result
377    })
378}
379
380/// Update block related state.
381fn update_block_state(
382    block: &MultiEraBlock, first_indexed_block: &mut Option<Point>, first_immutable: &mut bool,
383    last_indexed_block: &mut Option<Point>, last_immutable: &mut bool, blocks_synced: &mut u64,
384) {
385    *last_immutable = block.is_immutable();
386    *last_indexed_block = Some(block.point());
387
388    if first_indexed_block.is_none() {
389        *first_immutable = *last_immutable;
390        *first_indexed_block = Some(block.point());
391    }
392
393    *blocks_synced = blocks_synced.saturating_add(1);
394}
395
/// The synchronisation task, and its state.
/// There should ONLY ever be one of these at any time.
struct SyncTask {
    /// Chain follower configuration.
    cfg: chain_follower::EnvVars,

    /// The current running sync tasks.
    sync_tasks: FuturesUnordered<tokio::task::JoinHandle<SyncParams>>,

    /// How many chain follower sync tasks we are running.
    ///
    /// Incremented by `add_sync_task` for every task it queues, which includes the live
    /// (TIP) follower as well as the immutable followers.
    current_sync_tasks: u16,

    /// Start for the next block we would sync.
    start_slot: Slot,

    /// The immutable tip slot.
    immutable_tip_slot: Slot,

    /// The live tip slot.
    live_tip_slot: Slot,

    /// Current Sync Status.
    sync_status: Vec<SyncStatus>,

    /// A channel that contains an end slot number for every immutable sync task.
    ///
    /// For example, if there are 3 tasks with `0..100`, `100..200` and `200..300` block
    /// ranges (where the end bounds aren't inclusive), then the channel will contain the
    /// following list: `[99, 199, 299]`. A slot value is removed when that task is done,
    /// so when the second task is finished the list will be updated to `[99, 299]`.
    ///
    // NOTE(review): `add_sync_task` also inserts the end slot of the live (TIP) task, as
    // `Point::TIP.slot_or_default()` - confirm whether that entry is intended.
    pending_blocks: watch::Sender<BTreeSet<Slot>>,
}
428
429impl SyncTask {
430    /// Create a new `SyncTask`.
431    fn new(cfg: chain_follower::EnvVars) -> SyncTask {
432        Self {
433            cfg,
434            sync_tasks: FuturesUnordered::new(),
435            start_slot: 0.into(),
436            current_sync_tasks: 0,
437            immutable_tip_slot: 0.into(),
438            live_tip_slot: 0.into(),
439            sync_status: Vec::new(),
440            pending_blocks: watch::channel(BTreeSet::new()).0,
441        }
442    }
443
444    /// Add a new `SyncTask` to the queue.
445    fn add_sync_task(&mut self, params: SyncParams) {
446        self.pending_blocks.send_modify(|blocks| {
447            blocks.insert(params.end.slot_or_default());
448        });
449        self.sync_tasks
450            .push(sync_subchain(params, self.pending_blocks.subscribe()));
451        self.current_sync_tasks = self.current_sync_tasks.saturating_add(1);
452        debug!(current_sync_tasks=%self.current_sync_tasks, "Added new Sync Task");
453        metrics_updater::sync_tasks(self.current_sync_tasks);
454    }
455
456    /// Update `SyncTask` count.
457    fn sync_task_finished(&mut self) {
458        self.current_sync_tasks = self.current_sync_tasks.checked_sub(1).unwrap_or_else(|| {
459            error!("current_sync_tasks -= 1 overflow");
460            0
461        });
462        debug!(current_sync_tasks=%self.current_sync_tasks, "Finished Sync Task");
463        metrics_updater::sync_tasks(self.current_sync_tasks);
464    }
465
466    /// Primary Chain Follower task.
467    ///
468    /// This continuously runs in the background, and never terminates.
469    ///
470    /// Sets the Index DB liveness flag to true if it is not already set.
471    ///
472    /// Sets the Chain Follower Has First Reached Tip flag to true if it is not already
473    /// set.
474    #[allow(clippy::too_many_lines)]
475    async fn run(&mut self) {
476        // We can't sync until the local chain data is synced.
477        // This call will wait until we sync.
478        let tips = ChainFollower::get_tips(self.cfg.chain).await;
479        self.immutable_tip_slot = tips.0.slot_or_default();
480        self.live_tip_slot = tips.1.slot_or_default();
481        info!(chain=%self.cfg.chain, immutable_tip=?self.immutable_tip_slot, live_tip=?self.live_tip_slot, "Running the primary blockchain follower task.");
482
483        metrics_updater::current_tip_slot(self.live_tip_slot, self.immutable_tip_slot);
484
485        // Wait for indexing DB to be ready before continuing.
486        // We do this after the above, because other nodes may have finished already, and we don't
487        // want to wait do any work they already completed while we were fetching the blockchain.
488        //
489        // After waiting, we set the liveness flag to true if it is not already set.
490        CassandraSession::wait_until_ready(INDEXING_DB_READY_WAIT_INTERVAL).await;
491
492        info!(chain=%self.cfg.chain, "Indexing DB is ready - Getting recovery state for indexing");
493        self.sync_status = get_sync_status().await;
494        debug!(chain=%self.cfg.chain, "Sync Status: {:?}", self.sync_status);
495
496        // Start the Live Chain sync task - This can never end because it is syncing to TIP.
497        // So, if it fails, it will automatically be restarted.
498        self.add_sync_task(SyncParams::new(
499            self.cfg.chain,
500            Point::fuzzy(self.immutable_tip_slot),
501            Point::TIP,
502        ));
503        metrics_updater::reached_live_tip(false);
504
505        self.start_immutable_followers();
506        // IF there is only 1 chain follower spawn, then the immutable state already indexed and
507        // filled in the db.
508        if self.sync_tasks.len() == 1 {
509            set_follower_immutable_first_reached_tip();
510        }
511
512        metrics_updater::reached_immutable_tip(self.sync_tasks.len() == 1);
513
514        // Wait Sync tasks to complete.  If they fail and have not completed, reschedule them.
515        // If an immutable sync task ends OK, and we still have immutable data to sync then
516        // start a new task.
517        // They will return from this iterator in the order they complete.
518        // This iterator actually never ends, because the live sync task is always restarted.
519        while let Some(completed) = self.sync_tasks.next().await {
520            // update sync task count
521            self.sync_task_finished();
522
523            match completed {
524                Ok(finished) => {
525                    let tips = ChainFollower::get_tips(self.cfg.chain).await;
526                    let immutable_tip_slot = tips.0.slot_or_default();
527                    let live_tip_slot = tips.1.slot_or_default();
528                    info!(immutable_tip_slot=?immutable_tip_slot, live_tip_slot=?live_tip_slot, "Chain Indexer task finished");
529                    // Sync task finished.  Check if it completed OK or had an error.
530                    // If it failed, we need to reschedule it.
531
532                    // The TIP follower should NEVER end, unless there is an immutable roll forward,
533                    // or there is an error.  If this is not a roll forward, log an error.
534                    // It can fail if the index DB goes down in some way.
535                    // Restart it always.
536                    if finished.end == Point::TIP {
537                        if let Some(ref roll_forward_point) = finished.follower_roll_forward {
538                            // Advance the known immutable tip, and try and start followers to reach
539                            // it.
540                            self.immutable_tip_slot = roll_forward_point.slot_or_default();
541
542                            metrics_updater::current_tip_slot(
543                                self.live_tip_slot,
544                                self.immutable_tip_slot,
545                            );
546
547                            info!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished reaching TIP.");
548
549                            self.start_immutable_followers();
550                            metrics_updater::reached_immutable_tip(false);
551                        } else {
552                            error!(chain=%self.cfg.chain, report=%finished, "Chain Indexer finished without to reach TIP.");
553                        }
554
555                        // Start the Live Chain sync task again from where it left off.
556                        self.add_sync_task(finished.retry());
557                    } else if let Some(result) = finished.result.as_ref() {
558                        match result {
559                            Ok(()) => {
560                                info!(chain=%self.cfg.chain, report=%finished,
561                                    "The Immutable follower completed successfully.");
562
563                                finished.last_indexed_block.as_ref().inspect(|block| {
564                                    metrics_updater::highest_complete_indexed_slot(
565                                        block.slot_or_default(),
566                                    );
567                                });
568
569                                self.pending_blocks.send_modify(|blocks| {
570                                    if !blocks.remove(&finished.end.slot_or_default()) {
571                                        error!(chain=%self.cfg.chain, end=?finished.end,
572                                            "The immutable follower completed successfully, but its end point isn't present in index_sync_channel"
573                                        );
574                                    }
575                                });
576
577                                // If we need more immutable chain followers to sync the block
578                                // chain, we can now start them.
579                                self.start_immutable_followers();
580                            },
581                            Err(error) => {
582                                error!(chain=%self.cfg.chain, report=%finished, error=%error,
583                                        "An Immutable follower failed, restarting it.");
584                                // Restart the Immutable Chain sync task again from where it left
585                                // off.
586                                self.add_sync_task(finished.retry());
587                            },
588                        }
589                    } else {
590                        error!(chain=%self.cfg.chain, report=%finished,
591                        "BUG: The Immutable follower completed, but without a proper result.");
592                    }
593                },
594                Err(error) => {
595                    error!(chain=%self.cfg.chain, error=%error, "BUG: Sync task failed. Can not restart it, not enough information.  Sync is probably failed at this point.");
596                },
597            }
598
599            // IF there is only 1 chain follower left in sync_tasks, then all
600            // immutable followers have finished.
601            // When this happens we need to purge the live index of any records that exist
602            // before the current immutable tip.
603            // Note: to prevent a data race when multiple nodes are syncing, we probably
604            // want to put a gap in this, so that there are X slots of overlap
605            // between the live chain and immutable chain.  This gap should be
606            // a parameter.
607            if self.sync_tasks.len() == 1 {
608                set_follower_immutable_first_reached_tip();
609                metrics_updater::reached_immutable_tip(true);
610
611                // Clear asset caches from the persistent Index DB session (volatile asset caches
612                // are disabled)
613                CassandraSession::get(true).inspect(|session| {
614                    session.caches().assets_ada().clear_cache();
615                    session.caches().assets_native().clear_cache();
616                });
617
618                // Purge data up to this slot
619                // Slots arithmetic has saturating semantic, so this is ok.
620                #[allow(clippy::arithmetic_side_effects)]
621                let purge_to_slot =
622                    self.immutable_tip_slot - Settings::purge_backward_slot_buffer();
623                let purge_condition = PurgeCondition::PurgeBackwards(purge_to_slot);
624                info!(chain=%self.cfg.chain, purge_to_slot=?purge_to_slot, "Backwards purging volatile data.");
625                if let Err(error) = roll_forward::purge_live_index(purge_condition).await {
626                    error!(chain=%self.cfg.chain, error=%error, "BUG: Purging volatile data task failed.");
627                } else {
628                    metrics_updater::backward_data_purge();
629                }
630            }
631        }
632
633        error!(chain=%self.cfg.chain,"BUG: Sync tasks have all stopped.  This is an unexpected error!");
634    }
635
636    /// Start immutable followers, if we can
637    fn start_immutable_followers(&mut self) {
638        // Start the Immutable Chain sync tasks, as required.
639        // We will start at most the number of configured sync tasks.
640        // The live chain sync task is not counted as a sync task for this config value.
641
642        // Nothing to do if the start_slot is not less than the end of the immutable chain.
643        if self.start_slot < self.immutable_tip_slot {
644            // Will also break if there are no more slots left to sync.
645            while self.current_sync_tasks < self.cfg.sync_tasks {
646                let end_slot = self.immutable_tip_slot.min(
647                    (u64::from(self.start_slot).saturating_add(self.cfg.sync_chunk_max_slots))
648                        .into(),
649                );
650
651                if let Some((first_point, last_point)) =
652                    self.get_syncable_range(self.start_slot, end_slot)
653                {
654                    self.add_sync_task(SyncParams::new(
655                        self.cfg.chain,
656                        first_point,
657                        last_point.clone(),
658                    ));
659                }
660
661                // The one slot overlap is deliberate, it doesn't hurt anything and prevents all off
662                // by one problems that may occur otherwise.
663                self.start_slot = end_slot;
664
665                if end_slot == self.immutable_tip_slot {
666                    break;
667                }
668            }
669        }
670    }
671
672    /// Check if the requested range has already been indexed.
673    /// If it hasn't just return the slots as points.
674    /// If it has, return a subset that hasn't been indexed if any, or None if its been
675    /// completely indexed already.
676    fn get_syncable_range(&self, start: Slot, end: Slot) -> Option<(Point, Point)> {
677        for sync_block in &self.sync_status {
678            // Check if we start within a previously synchronized block.
679            if start >= sync_block.start_slot && start <= sync_block.end_slot {
680                // Check if we are fully contained by the sync block, if so, nothing to sync.
681                if end <= sync_block.end_slot {
682                    return None;
683                }
684
685                // In theory, we could extend into another sync block, but because we could extend
686                // into an unbounded number of sync blocks, we would need to bust
687                // this range into an unbounded number of sub chunks.
688                // It is not a problem to sync the same data mutiple times, so for simplicity we do
689                // not account for this, if the requested range goes beyond the sync
690                // block it starts within we assume that the rest is not synced.
691                return Some((Point::fuzzy(sync_block.end_slot), Point::fuzzy(end)));
692            }
693        }
694
695        let start_slot = if start == 0.into() {
696            Point::ORIGIN
697        } else {
698            Point::fuzzy(start)
699        };
700
701        Some((start_slot, Point::fuzzy(end)))
702    }
703}
704
705/// Start followers as per defined in the config
706pub(crate) async fn start_followers() -> anyhow::Result<()> {
707    let cfg = Settings::follower_cfg();
708
709    // Log the chain follower configuration.
710    cfg.log();
711
712    // Start Syncing the blockchain, so we can consume its data as required.
713    start_sync_for(&cfg).await?;
714
715    tokio::spawn(async move {
716        let mut sync_task = SyncTask::new(cfg);
717
718        sync_task.run().await;
719    });
720
721    Ok(())
722}