cat_gateway/db/index/queries/sync_status/
update.rs1use std::{sync::Arc, time::SystemTime};
4
5use cardano_chain_follower::Slot;
6use row::SyncStatusQueryParams;
7use scylla::{
8 client::session::Session, statement::prepared::PreparedStatement, value::CqlTimestamp,
9};
10use tokio::task;
11use tracing::{debug, error, warn};
12
13use crate::{
14 db::index::{
15 queries::{PreparedQueries, PreparedUpsertQuery},
16 session::CassandraSession,
17 },
18 service::utilities::convert::from_saturating,
19 settings::Settings,
20};
21
/// CQL text of the sync status insert statement, embedded at compile time from
/// `../cql/insert_sync_status.cql` (path relative to this source file).
const INSERT_SYNC_STATUS_QUERY: &str = include_str!("../cql/insert_sync_status.cql");
24
25pub(crate) mod row {
27 use scylla::{value::CqlTimestamp, DeserializeRow, SerializeRow};
28
29 use crate::db::types::DbSlot;
30
31 #[derive(SerializeRow, DeserializeRow, Debug)]
33 pub(crate) struct SyncStatusQueryParams {
34 pub(crate) end_slot: DbSlot,
36 pub(crate) start_slot: DbSlot,
38 pub(crate) sync_time: CqlTimestamp,
40 pub(crate) node_id: String,
42 }
43}
44
45impl SyncStatusQueryParams {
46 pub(crate) fn new(
48 end_slot: Slot,
49 start_slot: Slot,
50 ) -> Self {
51 let sync_time = match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
52 Ok(now) => now.as_millis(),
53 Err(_) => 0, };
55
56 Self {
57 end_slot: end_slot.into(),
58 start_slot: start_slot.into(),
59 sync_time: CqlTimestamp(from_saturating(sync_time)),
60 node_id: Settings::service_id().to_owned(),
61 }
62 }
63}
64
65pub(crate) struct SyncStatusInsertQuery;
67
68impl SyncStatusInsertQuery {
69 pub(crate) async fn prepare(session: Arc<Session>) -> anyhow::Result<PreparedStatement> {
71 PreparedQueries::prepare(
72 session,
73 INSERT_SYNC_STATUS_QUERY,
74 scylla::statement::Consistency::All,
75 true,
76 )
77 .await
78 .inspect_err(
79 |error| error!(error=%error, "Failed to prepare get Sync Status Insert query."),
80 )
81 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_SYNC_STATUS_QUERY}"))
82 }
83
84 pub(crate) async fn execute(
86 session: &CassandraSession,
87 params: SyncStatusQueryParams,
88 ) -> anyhow::Result<()> {
89 session
90 .execute_upsert(PreparedUpsertQuery::SyncStatusInsert, params)
91 .await
92 }
93}
94
95pub(crate) fn update_sync_status(
104 end_slot: Slot,
105 start_slot: Slot,
106) {
107 task::spawn(async move {
108 let Some(session) = CassandraSession::get(true) else {
109 warn!(
110 start_slot = ?start_slot,
111 end_slot = ?end_slot,
112 "Failed to get Cassandra Session, trying to record indexing status"
113 );
114 return;
115 };
116
117 if let Err(err) = SyncStatusInsertQuery::execute(
118 &session,
119 SyncStatusQueryParams::new(end_slot, start_slot),
120 )
121 .await
122 {
123 warn!(
124 error=%err,
125 start_slot = ?start_slot,
126 end_slot = ?end_slot,
127 "Failed to store Sync Status"
128 );
129 }
130 debug!(start_slot = ?start_slot, end_slot = ?end_slot, "Sync Status updated");
131 });
132}