cat_gateway/db/index/queries/sync_status/
update.rs

1//! Read and write the synchronisation status.
2
3use std::{sync::Arc, time::SystemTime};
4
5use cardano_chain_follower::Slot;
6use row::SyncStatusQueryParams;
7use scylla::{
8    client::session::Session, statement::prepared::PreparedStatement, value::CqlTimestamp,
9};
10use tokio::task;
11use tracing::{debug, error, warn};
12
13use crate::{
14    db::index::{
15        queries::{PreparedQueries, PreparedUpsertQuery},
16        session::CassandraSession,
17    },
18    service::utilities::convert::from_saturating,
19    settings::Settings,
20};
21
/// Insert Sync Status query string.
///
/// The CQL text is embedded at compile time from `../cql/insert_sync_status.cql`
/// (path relative to this source file).
const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql");
24
25/// Sync Status Row Record Module
26pub(crate) mod row {
27    use scylla::{value::CqlTimestamp, DeserializeRow, SerializeRow};
28
29    use crate::db::types::DbSlot;
30
31    /// Sync Status Record Row (used for both Insert and Query response)
32    #[derive(SerializeRow, DeserializeRow, Debug)]
33    pub(crate) struct SyncStatusQueryParams {
34        /// End Slot.
35        pub(crate) end_slot: DbSlot,
36        /// Start Slot.
37        pub(crate) start_slot: DbSlot,
38        /// Sync Time.
39        pub(crate) sync_time: CqlTimestamp,
40        /// Node ID
41        pub(crate) node_id: String,
42    }
43}
44
45impl SyncStatusQueryParams {
46    /// Create a new instance of [`SyncStatusQueryParams`]
47    pub(crate) fn new(
48        end_slot: Slot,
49        start_slot: Slot,
50    ) -> Self {
51        let sync_time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
52            Ok(now) => now.as_millis(),
53            Err(_) => 0, // Shouldn't actually happen.
54        };
55
56        Self {
57            end_slot: end_slot.into(),
58            start_slot: start_slot.into(),
59            sync_time: CqlTimestamp(from_saturating(sync_time)),
60            node_id: Settings::service_id().to_owned(),
61        }
62    }
63}
64
65/// Sync Status Insert query.
66pub(crate) struct SyncStatusInsertQuery;
67
68impl SyncStatusInsertQuery {
69    /// Prepares a Sync Status Insert query.
70    pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
71        PreparedQueries::prepare(
72            session,
73            INSERT_SYNC_STATUS_QUERY,
74            scylla::statement::Consistency::All,
75            true,
76        )
77        .await
78        .inspect_err(
79            |error| error!(error=%error, "Failed to prepare get Sync Status Insert query."),
80        )
81        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_SYNC_STATUS_QUERY}"))
82    }
83
84    /// Executes a sync status insert query.
85    pub(crate) async fn execute(
86        session: &CassandraSession,
87        params: SyncStatusQueryParams,
88    ) -> anyhow::Result<()> {
89        session
90            .execute_upsert(PreparedUpsertQuery::SyncStatusInsert, params)
91            .await
92    }
93}
94
95/// Update the sync status of the immutable database.
96///
97/// Note: There is no need to update the sync status of the volatile database.
98///
99/// Regarding failures:
100/// Failures of this function to record status, fail safely.
101/// This data is only used to recover sync
102/// There fore this function is both fire and forget, and returns no status.
103pub(crate) fn update_sync_status(
104    end_slot: Slot,
105    start_slot: Slot,
106) {
107    task::spawn(async move {
108        let Some(session) = CassandraSession::get(true) else {
109            warn!(
110                start_slot = ?start_slot,
111                end_slot = ?end_slot,
112                "Failed to get Cassandra Session, trying to record indexing status"
113            );
114            return;
115        };
116
117        if let Err(err) = SyncStatusInsertQuery::execute(
118            &session,
119            SyncStatusQueryParams::new(end_slot, start_slot),
120        )
121        .await
122        {
123            warn!(
124                error=%err,
125                start_slot = ?start_slot,
126                end_slot = ?end_slot,
127                "Failed to store Sync Status"
128            );
129        }
130        debug!(start_slot = ?start_slot, end_slot = ?end_slot, "Sync Status updated");
131    });
132}