cat_gateway/db/index/session/mod.rs

1//! Session creation and storage
2mod cache_manager;
3
4use std::{
5    fmt::Debug,
6    path::PathBuf,
7    sync::{Arc, LazyLock, Mutex, OnceLock},
8    time::Duration,
9};
10
11use cardano_chain_follower::Network;
12use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
13use scylla::{
14    client::{
15        execution_profile::ExecutionProfile, pager::QueryPager, session::Session,
16        session_builder::SessionBuilder,
17    },
18    frame::Compression,
19    serialize::row::SerializeRow,
20};
21use thiserror::Error;
22use tokio::{fs, task::JoinHandle};
23use tracing::{debug, error, info};
24
25use super::{
26    queries::{
27        purge::{self, PreparedDeleteQuery},
28        FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
29        PreparedUpsertQuery,
30    },
31    schema::create_schema,
32};
33use crate::{
34    service::utilities::health::{index_db_is_live, set_index_db_liveness},
35    settings::{cassandra_db, Settings},
36};
37
38/// Configuration Choices for compression
39#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
40#[strum(ascii_case_insensitive)]
41pub(crate) enum CompressionChoice {
42    /// LZ4 link data compression.
43    Lz4,
44    /// Snappy link data compression.
45    Snappy,
46    /// No compression.
47    None,
48}
49
50/// Configuration Choices for TLS.
51#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
52#[strum(ascii_case_insensitive)]
53pub(crate) enum TlsChoice {
54    /// Disable TLS.
55    Disabled,
56    /// Verifies that the peer's certificate is trusted.
57    Verified,
58    /// Disables verification of the peer's certificate.
59    Unverified,
60}
61
62/// Represents errors that can occur while interacting with a Cassandra session.
63#[derive(Debug, Error)]
64pub(crate) enum CassandraSessionError {
65    /// Error when connecting to database.
66    #[error("Database connection failed: {source}")]
67    ConnectionUnavailable {
68        /// The underlying error that caused the connection failure.
69        #[source]
70        source: anyhow::Error,
71    },
72    /// Error when preparing queries fails.
73    #[error("Preparing queries failed: {source}")]
74    PreparingQueriesFailed {
75        /// The underlying error that caused query preparation to fail.
76        #[source]
77        source: anyhow::Error,
78    },
79    /// Should be used by the caller when it fails to acquire the initialized database
80    /// session.
81    #[error("Failed acquiring database session")]
82    FailedAcquiringSession,
83}
84
85/// All interaction with cassandra goes through this struct.
86#[derive(Clone)]
87pub(crate) struct CassandraSession {
88    /// Is the session to the persistent or volatile DB?
89    persistent: bool,
90    /// Configuration for this session.
91    cfg: Arc<cassandra_db::EnvVars>,
92    /// The actual session.
93    session: Arc<Session>,
94    /// All prepared queries we can use on this session.
95    queries: Arc<PreparedQueries>,
96    /// All prepared purge queries we can use on this session.
97    purge_queries: Arc<purge::PreparedQueries>,
98    /// Manager for all caches used in this session.
99    caches: Arc<cache_manager::Caches>,
100}
101
102/// Persistent DB Session.
103static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
104
105/// Volatile DB Session.
106static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
107
108/// Background Index DB probe check
109static INDEX_DB_PROBE_TASK: LazyLock<Mutex<Option<JoinHandle<()>>>> =
110    LazyLock::new(|| Mutex::new(None));
111
112impl CassandraSession {
113    /// Initialise the Cassandra Cluster Connections.
114    /// Should be called only once.
115    pub(crate) fn init() {
116        let (persistent, volatile) = Settings::cassandra_db_cfg();
117        let network = Settings::cardano_network();
118
119        let _join_handle =
120            tokio::task::spawn(
121                async move { Box::pin(retry_init(persistent, network, true)).await },
122            );
123        let _join_handle =
124            tokio::task::spawn(async move { Box::pin(retry_init(volatile, network, false)).await });
125    }
126
127    /// Check to see if the Cassandra Indexing DB is ready for use
128    pub(crate) async fn is_ready() -> bool {
129        let persistent_ready = if let Some(cassandra) = PERSISTENT_SESSION.get() {
130            cassandra
131                .session
132                .refresh_metadata()
133                .await
134                .inspect_err(|e| {
135                    error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
136                })
137                .is_ok()
138        } else {
139            debug!(is_persistent = true, "Session has not been created");
140            false
141        };
142        let volatile_ready = if let Some(cassandra) = VOLATILE_SESSION.get() {
143            cassandra
144                .session
145                .refresh_metadata()
146                .await
147                .inspect_err(|e| {
148                    error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
149                })
150                .is_ok()
151        } else {
152            debug!(is_persistent = false, "Session has not been created");
153            false
154        };
155        let current_liveness = index_db_is_live();
156        let is_ready = persistent_ready && volatile_ready;
157        if is_ready {
158            if !current_liveness {
159                set_index_db_liveness(true);
160            }
161        } else if current_liveness {
162            set_index_db_liveness(false);
163        }
164        is_ready
165    }
166
167    /// Wait for the Cassandra Indexing DB to be ready before continuing
168    pub(crate) async fn wait_until_ready(interval: Duration) {
169        loop {
170            if Self::is_ready().await {
171                return;
172            }
173
174            tokio::time::sleep(interval).await;
175        }
176    }
177
178    /// Spawns a background task checking for the Index DB to be
179    /// ready.
180    /// Could spawn only one background task at a time
181    pub(crate) fn spawn_ready_probe() {
182        let spawning = || -> anyhow::Result<()> {
183            let mut task = INDEX_DB_PROBE_TASK
184                .lock()
185                .map_err(|e| anyhow::anyhow!("{e}"))?;
186
187            if task.as_ref().is_none_or(JoinHandle::is_finished) {
188                /// Index DB probe check wait interval
189                const INTERVAL: Duration = Duration::from_secs(1);
190
191                *task = Some(tokio::spawn(async move {
192                    Self::wait_until_ready(INTERVAL).await;
193                    info!("Index DB is ready");
194                }));
195            }
196            Ok(())
197        };
198
199        debug!("Waiting for the Index DB background probe check task to be spawned");
200        while let Err(e) = spawning() {
201            error!(error = ?e, "INDEX_DB_PROBE_TASK is poisoned, should never happen");
202            INDEX_DB_PROBE_TASK.clear_poison();
203        }
204    }
205
206    /// Get the session needed to perform a query.
207    pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
208        if persistent {
209            PERSISTENT_SESSION.get().cloned()
210        } else {
211            VOLATILE_SESSION.get().cloned()
212        }
213    }
214
215    /// Executes a select query with the given parameters.
216    ///
217    /// Returns an iterator that iterates over all the result pages that the query
218    /// returns.
219    pub(crate) async fn execute_iter<P>(
220        &self,
221        select_query: PreparedSelectQuery,
222        params: P,
223    ) -> anyhow::Result<QueryPager>
224    where
225        P: SerializeRow,
226    {
227        let session = self.session.clone();
228        let queries = self.queries.clone();
229
230        queries.execute_iter(session, select_query, params).await
231    }
232
233    /// Execute a Batch query with the given parameters.
234    ///
235    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
236    /// the same, and must match the query being executed.
237    ///
238    /// This will divide the batch into optimal sized chunks and execute them until all
239    /// values have been executed or the first error is encountered.
240    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
241        &self,
242        query: PreparedQuery,
243        values: Vec<T>,
244    ) -> FallibleQueryResults {
245        let session = self.session.clone();
246        let cfg = self.cfg.clone();
247        let queries = self.queries.clone();
248
249        queries.execute_batch(session, cfg, query, values).await
250    }
251
252    /// Execute a query which returns no results, except an error if it fails.
253    /// Can not be batched, takes a single set of parameters.
254    pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
255        &self,
256        query: PreparedUpsertQuery,
257        value: T,
258    ) -> anyhow::Result<()> {
259        let session = self.session.clone();
260        let queries = self.queries.clone();
261
262        queries.execute_upsert(session, query, value).await
263    }
264
265    /// Execute a purge query with the given parameters.
266    ///
267    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
268    /// the same, and must match the query being executed.
269    ///
270    /// This will divide the batch into optimal sized chunks and execute them until all
271    /// values have been executed or the first error is encountered.
272    ///
273    /// NOTE: This is currently only used to purge volatile data.
274    pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
275        &self,
276        query: PreparedDeleteQuery,
277        values: Vec<T>,
278    ) -> FallibleQueryResults {
279        // Only execute purge queries on the volatile session
280        let persistent = false;
281        let Some(volatile_db) = Self::get(persistent) else {
282            // This should never happen
283            anyhow::bail!("Volatile DB Session not found");
284        };
285        let cfg = self.cfg.clone();
286        let queries = self.purge_queries.clone();
287        let session = volatile_db.session.clone();
288
289        queries.execute_batch(session, cfg, query, values).await
290    }
291
292    /// Execute a select query to gather primary keys for purging.
293    pub(crate) async fn purge_execute_iter(
294        &self,
295        query: purge::PreparedSelectQuery,
296    ) -> anyhow::Result<QueryPager> {
297        // Only execute purge queries on the volatile session
298        let persistent = false;
299        let Some(volatile_db) = Self::get(persistent) else {
300            // This should never happen
301            anyhow::bail!("Volatile DB Session not found");
302        };
303        let queries = self.purge_queries.clone();
304
305        queries
306            .execute_iter(volatile_db.session.clone(), query)
307            .await
308    }
309
310    /// Get underlying Raw Cassandra Session.
311    pub(crate) fn get_raw_session(&self) -> Arc<Session> {
312        self.session.clone()
313    }
314
315    /// Returns `true` if the database session is persistent.
316    pub fn is_persistent(&self) -> bool {
317        self.persistent
318    }
319
320    /// Get a handle to the session Cache manager.
321    pub fn caches(&self) -> Arc<cache_manager::Caches> {
322        self.caches.clone()
323    }
324}
325
326/// Create a new execution profile based on the given configuration.
327///
328/// The intention here is that we should be able to tune this based on configuration,
329/// but for now we don't so the `cfg` is not used yet.
330fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
331    ExecutionProfile::builder()
332        .consistency(scylla::statement::Consistency::LocalQuorum)
333        .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
334        .retry_policy(Arc::new(scylla::policies::retry::DefaultRetryPolicy::new()))
335        .load_balancing_policy(
336            scylla::policies::load_balancing::DefaultPolicyBuilder::new()
337                .permit_dc_failover(true)
338                .build(),
339        )
340        .speculative_execution_policy(Some(Arc::new(
341            scylla::policies::speculative_execution::SimpleSpeculativeExecutionPolicy {
342                max_retry_count: 3,
343                retry_interval: Duration::from_millis(100),
344            },
345        )))
346        .build()
347}
348
349/// Construct a session based on the given configuration.
350async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
351    let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
352
353    let mut sb = SessionBuilder::new()
354        .known_nodes(cluster_urls)
355        .auto_await_schema_agreement(false);
356
357    let profile_handle = make_execution_profile(cfg).into_handle();
358    sb = sb.default_execution_profile_handle(profile_handle);
359
360    sb = match cfg.compression {
361        CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
362        CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
363        CompressionChoice::None => sb.compression(None),
364    };
365
366    if cfg.tls != TlsChoice::Disabled {
367        let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
368
369        if let Some(cert_name) = &cfg.tls_cert {
370            let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
371            context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
372        }
373
374        if cfg.tls == TlsChoice::Verified {
375            context_builder.set_verify(SslVerifyMode::PEER);
376        } else {
377            context_builder.set_verify(SslVerifyMode::NONE);
378        }
379
380        let ssl_context = context_builder.build();
381
382        sb = sb.tls_context(Some(ssl_context));
383    }
384
385    // Set the username and password, if required.
386    if let Some(username) = &cfg.username {
387        if let Some(password) = &cfg.password {
388            sb = sb.user(username.as_str(), password.as_str());
389        }
390    }
391
392    let session = Box::pin(sb.build()).await?;
393
394    Ok(Arc::new(session))
395}
396
397/// Continuously try and init the DB, if it fails, backoff.
398///
399/// Display reasonable logs to help diagnose DB connection issues.
400async fn retry_init(
401    cfg: cassandra_db::EnvVars,
402    network: Network,
403    persistent: bool,
404) {
405    let mut retry_delay = Duration::from_secs(0);
406    let db_type = if persistent { "Persistent" } else { "Volatile" };
407
408    info!(
409        db_type = db_type,
410        network = %network,
411        "Index DB Session Creation: Started."
412    );
413
414    cfg.log(persistent, network);
415
416    loop {
417        tokio::time::sleep(retry_delay).await;
418        retry_delay = Duration::from_secs(30); // 30 seconds if we every try again.
419
420        info!(
421            db_type = db_type,
422            network = %network,
423            "Attempting to connect to Cassandra DB..."
424        );
425
426        // Create a Session to the Cassandra DB.
427        let session = match make_session(&cfg).await {
428            Ok(session) => session,
429            Err(error) => {
430                error!(
431                    db_type = db_type,
432                    network = %network,
433                    error = format!("{error:?}"),
434                    "Failed to Create Cassandra DB Session"
435                );
436                continue;
437            },
438        };
439
440        // Set up the Schema for it.
441        if let Err(error) = create_schema(&mut session.clone(), &cfg, persistent, network).await {
442            error!(
443                db_type = db_type,
444                network = %network,
445                error = format!("{error:?}"),
446                "Failed to Create Cassandra DB Schema"
447            );
448            continue;
449        }
450
451        let queries = match PreparedQueries::new(session.clone(), &cfg).await {
452            Ok(queries) => Arc::new(queries),
453            Err(error) => {
454                error!(
455                    db_type = db_type,
456                    network = %network,
457                    error = %error,
458                    "Failed to Create Cassandra Prepared Queries"
459                );
460                continue;
461            },
462        };
463
464        let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
465        {
466            Ok(queries) => Arc::new(queries),
467            Err(error) => {
468                error!(
469                    db_type = db_type,
470                    network = %network,
471                    error = %error,
472                    "Failed to Create Cassandra Prepared Purge Queries"
473                );
474                continue;
475            },
476        };
477
478        let cassandra_session = CassandraSession {
479            persistent,
480            cfg: Arc::new(cfg),
481            session,
482            queries,
483            purge_queries,
484            caches: Arc::new(cache_manager::Caches::new(persistent)),
485        };
486
487        // Save the session so we can execute queries on the DB
488        if persistent {
489            if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
490                error!("Persistent Session already set.  This should not happen.");
491            }
492        } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
493            error!("Volatile Session already set.  This should not happen.");
494        }
495
496        // IF we get here, then everything seems to have worked, so finish init.
497        break;
498    }
499
500    info!(db_type = db_type, network = %network, "Index DB Session Creation: OK.");
501}