cat_gateway/db/index/session/
mod.rs

1//! Session creation and storage
2mod cache_manager;
3
4use std::{
5    fmt::Debug,
6    path::PathBuf,
7    sync::{Arc, OnceLock},
8    time::Duration,
9};
10
11use cardano_chain_follower::Network;
12use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
13use scylla::{
14    client::{
15        execution_profile::ExecutionProfile, pager::QueryPager, session::Session,
16        session_builder::SessionBuilder,
17    },
18    frame::Compression,
19    serialize::row::SerializeRow,
20};
21use thiserror::Error;
22use tokio::fs;
23use tracing::{debug, error, info};
24
25use super::{
26    queries::{
27        purge::{self, PreparedDeleteQuery},
28        FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
29        PreparedUpsertQuery,
30    },
31    schema::create_schema,
32};
33use crate::{
34    service::utilities::health::{index_db_is_live, set_index_db_liveness},
35    settings::{cassandra_db, Settings},
36};
37
38/// Configuration Choices for compression
39#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
40#[strum(ascii_case_insensitive)]
41pub(crate) enum CompressionChoice {
42    /// LZ4 link data compression.
43    Lz4,
44    /// Snappy link data compression.
45    Snappy,
46    /// No compression.
47    None,
48}
49
50/// Configuration Choices for TLS.
51#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
52#[strum(ascii_case_insensitive)]
53pub(crate) enum TlsChoice {
54    /// Disable TLS.
55    Disabled,
56    /// Verifies that the peer's certificate is trusted.
57    Verified,
58    /// Disables verification of the peer's certificate.
59    Unverified,
60}
61
62/// Represents errors that can occur while interacting with a Cassandra session.
63#[derive(Debug, Error)]
64pub(crate) enum CassandraSessionError {
65    /// Error when connecting to database.
66    #[error("Database connection failed: {source}")]
67    ConnectionUnavailable {
68        /// The underlying error that caused the connection failure.
69        #[source]
70        source: anyhow::Error,
71    },
72    /// Error when preparing queries fails.
73    #[error("Preparing queries failed: {source}")]
74    PreparingQueriesFailed {
75        /// The underlying error that caused query preparation to fail.
76        #[source]
77        source: anyhow::Error,
78    },
79    /// Should be used by the caller when it fails to acquire the initialized database
80    /// session.
81    #[error("Failed acquiring database session")]
82    FailedAcquiringSession,
83}
84
/// All interaction with cassandra goes through this struct.
///
/// Bundles one driver session with the configuration it was created from, the
/// statements prepared on it, and its caches. Cloning is cheap: apart from the
/// `persistent` flag, every field is an `Arc`.
#[derive(Clone)]
pub(crate) struct CassandraSession {
    /// Is the session to the persistent or volatile DB? (`true` = persistent.)
    persistent: bool,
    /// Configuration this session was built from.
    cfg: Arc<cassandra_db::EnvVars>,
    /// The underlying driver session.
    session: Arc<Session>,
    /// All prepared queries we can use on this session.
    queries: Arc<PreparedQueries>,
    /// All prepared purge queries we can use on this session.
    purge_queries: Arc<purge::PreparedQueries>,
    /// Manager for all caches used in this session.
    caches: Arc<cache_manager::Caches>,
}
101
/// Persistent DB Session.
///
/// Write-once slot, filled by `retry_init` (with `persistent == true`) once the
/// persistent DB has been fully initialised; read through `CassandraSession::get(true)`.
static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();

/// Volatile DB Session.
///
/// Write-once slot, filled by `retry_init` (with `persistent == false`) once the
/// volatile DB has been fully initialised; read through `CassandraSession::get(false)`.
static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
107
108impl CassandraSession {
109    /// Initialise the Cassandra Cluster Connections.
110    /// Should be called only once.
111    pub(crate) fn init() {
112        let (persistent, volatile) = Settings::cassandra_db_cfg();
113        let network = Settings::cardano_network();
114
115        let _join_handle =
116            tokio::task::spawn(
117                async move { Box::pin(retry_init(persistent, network, true)).await },
118            );
119        let _join_handle =
120            tokio::task::spawn(async move { Box::pin(retry_init(volatile, network, false)).await });
121    }
122
123    /// Check to see if the Cassandra Indexing DB is ready for use
124    pub(crate) async fn is_ready() -> bool {
125        let persistent_ready = if let Some(cassandra) = PERSISTENT_SESSION.get() {
126            cassandra
127                .session
128                .refresh_metadata()
129                .await
130                .inspect_err(|e| {
131                    error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
132                })
133                .is_ok()
134        } else {
135            debug!(is_persistent = true, "Session has not been created");
136            false
137        };
138        let volatile_ready = if let Some(cassandra) = VOLATILE_SESSION.get() {
139            cassandra
140                .session
141                .refresh_metadata()
142                .await
143                .inspect_err(|e| {
144                    error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
145                })
146                .is_ok()
147        } else {
148            debug!(is_persistent = false, "Session has not been created");
149            false
150        };
151        let current_liveness = index_db_is_live();
152        let is_ready = persistent_ready && volatile_ready;
153        if is_ready {
154            if !current_liveness {
155                set_index_db_liveness(true);
156            }
157        } else if current_liveness {
158            set_index_db_liveness(false);
159        }
160        is_ready
161    }
162
163    /// Wait for the Cassandra Indexing DB to be ready before continuing
164    pub(crate) async fn wait_until_ready(interval: Duration) {
165        loop {
166            if Self::is_ready().await {
167                return;
168            }
169
170            tokio::time::sleep(interval).await;
171        }
172    }
173
174    /// Get the session needed to perform a query.
175    pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
176        if persistent {
177            PERSISTENT_SESSION.get().cloned()
178        } else {
179            VOLATILE_SESSION.get().cloned()
180        }
181    }
182
183    /// Executes a select query with the given parameters.
184    ///
185    /// Returns an iterator that iterates over all the result pages that the query
186    /// returns.
187    pub(crate) async fn execute_iter<P>(
188        &self, select_query: PreparedSelectQuery, params: P,
189    ) -> anyhow::Result<QueryPager>
190    where P: SerializeRow {
191        let session = self.session.clone();
192        let queries = self.queries.clone();
193
194        queries.execute_iter(session, select_query, params).await
195    }
196
197    /// Execute a Batch query with the given parameters.
198    ///
199    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
200    /// the same, and must match the query being executed.
201    ///
202    /// This will divide the batch into optimal sized chunks and execute them until all
203    /// values have been executed or the first error is encountered.
204    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
205        &self, query: PreparedQuery, values: Vec<T>,
206    ) -> FallibleQueryResults {
207        let session = self.session.clone();
208        let cfg = self.cfg.clone();
209        let queries = self.queries.clone();
210
211        queries.execute_batch(session, cfg, query, values).await
212    }
213
214    /// Execute a query which returns no results, except an error if it fails.
215    /// Can not be batched, takes a single set of parameters.
216    pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
217        &self, query: PreparedUpsertQuery, value: T,
218    ) -> anyhow::Result<()> {
219        let session = self.session.clone();
220        let queries = self.queries.clone();
221
222        queries.execute_upsert(session, query, value).await
223    }
224
225    /// Execute a purge query with the given parameters.
226    ///
227    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
228    /// the same, and must match the query being executed.
229    ///
230    /// This will divide the batch into optimal sized chunks and execute them until all
231    /// values have been executed or the first error is encountered.
232    ///
233    /// NOTE: This is currently only used to purge volatile data.
234    pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
235        &self, query: PreparedDeleteQuery, values: Vec<T>,
236    ) -> FallibleQueryResults {
237        // Only execute purge queries on the volatile session
238        let persistent = false;
239        let Some(volatile_db) = Self::get(persistent) else {
240            // This should never happen
241            anyhow::bail!("Volatile DB Session not found");
242        };
243        let cfg = self.cfg.clone();
244        let queries = self.purge_queries.clone();
245        let session = volatile_db.session.clone();
246
247        queries.execute_batch(session, cfg, query, values).await
248    }
249
250    /// Execute a select query to gather primary keys for purging.
251    pub(crate) async fn purge_execute_iter(
252        &self, query: purge::PreparedSelectQuery,
253    ) -> anyhow::Result<QueryPager> {
254        // Only execute purge queries on the volatile session
255        let persistent = false;
256        let Some(volatile_db) = Self::get(persistent) else {
257            // This should never happen
258            anyhow::bail!("Volatile DB Session not found");
259        };
260        let queries = self.purge_queries.clone();
261
262        queries
263            .execute_iter(volatile_db.session.clone(), query)
264            .await
265    }
266
267    /// Get underlying Raw Cassandra Session.
268    pub(crate) fn get_raw_session(&self) -> Arc<Session> {
269        self.session.clone()
270    }
271
272    /// Returns `true` if the database session is persistent.
273    pub fn is_persistent(&self) -> bool {
274        self.persistent
275    }
276
277    /// Get a handle to the session Cache manager.
278    pub fn caches(&self) -> Arc<cache_manager::Caches> {
279        self.caches.clone()
280    }
281}
282
283/// Create a new execution profile based on the given configuration.
284///
285/// The intention here is that we should be able to tune this based on configuration,
286/// but for now we don't so the `cfg` is not used yet.
287fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
288    ExecutionProfile::builder()
289        .consistency(scylla::statement::Consistency::LocalQuorum)
290        .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
291        .retry_policy(Arc::new(scylla::policies::retry::DefaultRetryPolicy::new()))
292        .load_balancing_policy(
293            scylla::policies::load_balancing::DefaultPolicyBuilder::new()
294                .permit_dc_failover(true)
295                .build(),
296        )
297        .speculative_execution_policy(Some(Arc::new(
298            scylla::policies::speculative_execution::SimpleSpeculativeExecutionPolicy {
299                max_retry_count: 3,
300                retry_interval: Duration::from_millis(100),
301            },
302        )))
303        .build()
304}
305
306/// Construct a session based on the given configuration.
307async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
308    let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
309
310    let mut sb = SessionBuilder::new()
311        .known_nodes(cluster_urls)
312        .auto_await_schema_agreement(false);
313
314    let profile_handle = make_execution_profile(cfg).into_handle();
315    sb = sb.default_execution_profile_handle(profile_handle);
316
317    sb = match cfg.compression {
318        CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
319        CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
320        CompressionChoice::None => sb.compression(None),
321    };
322
323    if cfg.tls != TlsChoice::Disabled {
324        let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
325
326        if let Some(cert_name) = &cfg.tls_cert {
327            let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
328            context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
329        }
330
331        if cfg.tls == TlsChoice::Verified {
332            context_builder.set_verify(SslVerifyMode::PEER);
333        } else {
334            context_builder.set_verify(SslVerifyMode::NONE);
335        }
336
337        let ssl_context = context_builder.build();
338
339        sb = sb.tls_context(Some(ssl_context));
340    }
341
342    // Set the username and password, if required.
343    if let Some(username) = &cfg.username {
344        if let Some(password) = &cfg.password {
345            sb = sb.user(username.as_str(), password.as_str());
346        }
347    }
348
349    let session = Box::pin(sb.build()).await?;
350
351    Ok(Arc::new(session))
352}
353
354/// Continuously try and init the DB, if it fails, backoff.
355///
356/// Display reasonable logs to help diagnose DB connection issues.
357async fn retry_init(cfg: cassandra_db::EnvVars, network: Network, persistent: bool) {
358    let mut retry_delay = Duration::from_secs(0);
359    let db_type = if persistent { "Persistent" } else { "Volatile" };
360
361    info!(
362        db_type = db_type,
363        network = %network,
364        "Index DB Session Creation: Started."
365    );
366
367    cfg.log(persistent, network);
368
369    loop {
370        tokio::time::sleep(retry_delay).await;
371        retry_delay = Duration::from_secs(30); // 30 seconds if we every try again.
372
373        info!(
374            db_type = db_type,
375            network = %network,
376            "Attempting to connect to Cassandra DB..."
377        );
378
379        // Create a Session to the Cassandra DB.
380        let session = match make_session(&cfg).await {
381            Ok(session) => session,
382            Err(error) => {
383                error!(
384                    db_type = db_type,
385                    network = %network,
386                    error = format!("{error:?}"),
387                    "Failed to Create Cassandra DB Session"
388                );
389                continue;
390            },
391        };
392
393        // Set up the Schema for it.
394        if let Err(error) = create_schema(&mut session.clone(), &cfg, persistent, network).await {
395            error!(
396                db_type = db_type,
397                network = %network,
398                error = format!("{error:?}"),
399                "Failed to Create Cassandra DB Schema"
400            );
401            continue;
402        }
403
404        let queries = match PreparedQueries::new(session.clone(), &cfg).await {
405            Ok(queries) => Arc::new(queries),
406            Err(error) => {
407                error!(
408                    db_type = db_type,
409                    network = %network,
410                    error = %error,
411                    "Failed to Create Cassandra Prepared Queries"
412                );
413                continue;
414            },
415        };
416
417        let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
418        {
419            Ok(queries) => Arc::new(queries),
420            Err(error) => {
421                error!(
422                    db_type = db_type,
423                    network = %network,
424                    error = %error,
425                    "Failed to Create Cassandra Prepared Purge Queries"
426                );
427                continue;
428            },
429        };
430
431        let cassandra_session = CassandraSession {
432            persistent,
433            cfg: Arc::new(cfg),
434            session,
435            queries,
436            purge_queries,
437            caches: Arc::new(cache_manager::Caches::new(persistent)),
438        };
439
440        // Save the session so we can execute queries on the DB
441        if persistent {
442            if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
443                error!("Persistent Session already set.  This should not happen.");
444            };
445        } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
446            error!("Volatile Session already set.  This should not happen.");
447        };
448
449        // IF we get here, then everything seems to have worked, so finish init.
450        break;
451    }
452
453    info!(db_type = db_type, network = %network, "Index DB Session Creation: OK.");
454}