cat_gateway/db/index/session/
mod.rs1mod cache_manager;
3
4use std::{
5 fmt::Debug,
6 path::PathBuf,
7 sync::{Arc, LazyLock, Mutex, OnceLock},
8 time::Duration,
9};
10
11use cardano_chain_follower::Network;
12use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
13use scylla::{
14 client::{
15 execution_profile::ExecutionProfile, pager::QueryPager, session::Session,
16 session_builder::SessionBuilder,
17 },
18 frame::Compression,
19 serialize::row::SerializeRow,
20};
21use thiserror::Error;
22use tokio::{fs, task::JoinHandle};
23use tracing::{debug, error, info};
24
25use super::{
26 queries::{
27 purge::{self, PreparedDeleteQuery},
28 FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
29 PreparedUpsertQuery,
30 },
31 schema::create_schema,
32};
33use crate::{
34 service::utilities::health::{index_db_is_live, set_index_db_liveness},
35 settings::{cassandra_db, Settings},
36};
37
38#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
40#[strum(ascii_case_insensitive)]
41pub(crate) enum CompressionChoice {
42 Lz4,
44 Snappy,
46 None,
48}
49
50#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
52#[strum(ascii_case_insensitive)]
53pub(crate) enum TlsChoice {
54 Disabled,
56 Verified,
58 Unverified,
60}
61
62#[derive(Debug, Error)]
64pub(crate) enum CassandraSessionError {
65 #[error("Database connection failed: {source}")]
67 ConnectionUnavailable {
68 #[source]
70 source: anyhow::Error,
71 },
72 #[error("Preparing queries failed: {source}")]
74 PreparingQueriesFailed {
75 #[source]
77 source: anyhow::Error,
78 },
79 #[error("Failed acquiring database session")]
82 FailedAcquiringSession,
83}
84
85#[derive(Clone)]
87pub(crate) struct CassandraSession {
88 persistent: bool,
90 cfg: Arc<cassandra_db::EnvVars>,
92 session: Arc<Session>,
94 queries: Arc<PreparedQueries>,
96 purge_queries: Arc<purge::PreparedQueries>,
98 caches: Arc<cache_manager::Caches>,
100}
101
102static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
104
105static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
107
108static INDEX_DB_PROBE_TASK: LazyLock<Mutex<Option<JoinHandle<()>>>> =
110 LazyLock::new(|| Mutex::new(None));
111
112impl CassandraSession {
113 pub(crate) fn init() {
116 let (persistent, volatile) = Settings::cassandra_db_cfg();
117 let network = Settings::cardano_network();
118
119 let _join_handle =
120 tokio::task::spawn(
121 async move { Box::pin(retry_init(persistent, network, true)).await },
122 );
123 let _join_handle =
124 tokio::task::spawn(async move { Box::pin(retry_init(volatile, network, false)).await });
125 }
126
127 pub(crate) async fn is_ready() -> bool {
129 let persistent_ready = if let Some(cassandra) = PERSISTENT_SESSION.get() {
130 cassandra
131 .session
132 .refresh_metadata()
133 .await
134 .inspect_err(|e| {
135 error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
136 })
137 .is_ok()
138 } else {
139 debug!(is_persistent = true, "Session has not been created");
140 false
141 };
142 let volatile_ready = if let Some(cassandra) = VOLATILE_SESSION.get() {
143 cassandra
144 .session
145 .refresh_metadata()
146 .await
147 .inspect_err(|e| {
148 error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
149 })
150 .is_ok()
151 } else {
152 debug!(is_persistent = false, "Session has not been created");
153 false
154 };
155 let current_liveness = index_db_is_live();
156 let is_ready = persistent_ready && volatile_ready;
157 if is_ready {
158 if !current_liveness {
159 set_index_db_liveness(true);
160 }
161 } else if current_liveness {
162 set_index_db_liveness(false);
163 }
164 is_ready
165 }
166
167 pub(crate) async fn wait_until_ready(interval: Duration) {
169 loop {
170 if Self::is_ready().await {
171 return;
172 }
173
174 tokio::time::sleep(interval).await;
175 }
176 }
177
178 pub(crate) fn spawn_ready_probe() {
182 let spawning = || -> anyhow::Result<()> {
183 let mut task = INDEX_DB_PROBE_TASK
184 .lock()
185 .map_err(|e| anyhow::anyhow!("{e}"))?;
186
187 if task.as_ref().is_none_or(JoinHandle::is_finished) {
188 const INTERVAL: Duration = Duration::from_secs(1);
190
191 *task = Some(tokio::spawn(async move {
192 Self::wait_until_ready(INTERVAL).await;
193 info!("Index DB is ready");
194 }));
195 }
196 Ok(())
197 };
198
199 debug!("Waiting for the Index DB background probe check task to be spawned");
200 while let Err(e) = spawning() {
201 error!(error = ?e, "INDEX_DB_PROBE_TASK is poisoned, should never happen");
202 INDEX_DB_PROBE_TASK.clear_poison();
203 }
204 }
205
206 pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
208 if persistent {
209 PERSISTENT_SESSION.get().cloned()
210 } else {
211 VOLATILE_SESSION.get().cloned()
212 }
213 }
214
215 pub(crate) async fn execute_iter<P>(
220 &self,
221 select_query: PreparedSelectQuery,
222 params: P,
223 ) -> anyhow::Result<QueryPager>
224 where
225 P: SerializeRow,
226 {
227 let session = self.session.clone();
228 let queries = self.queries.clone();
229
230 queries.execute_iter(session, select_query, params).await
231 }
232
233 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
241 &self,
242 query: PreparedQuery,
243 values: Vec<T>,
244 ) -> FallibleQueryResults {
245 let session = self.session.clone();
246 let cfg = self.cfg.clone();
247 let queries = self.queries.clone();
248
249 queries.execute_batch(session, cfg, query, values).await
250 }
251
252 pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
255 &self,
256 query: PreparedUpsertQuery,
257 value: T,
258 ) -> anyhow::Result<()> {
259 let session = self.session.clone();
260 let queries = self.queries.clone();
261
262 queries.execute_upsert(session, query, value).await
263 }
264
265 pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
275 &self,
276 query: PreparedDeleteQuery,
277 values: Vec<T>,
278 ) -> FallibleQueryResults {
279 let persistent = false;
281 let Some(volatile_db) = Self::get(persistent) else {
282 anyhow::bail!("Volatile DB Session not found");
284 };
285 let cfg = self.cfg.clone();
286 let queries = self.purge_queries.clone();
287 let session = volatile_db.session.clone();
288
289 queries.execute_batch(session, cfg, query, values).await
290 }
291
292 pub(crate) async fn purge_execute_iter(
294 &self,
295 query: purge::PreparedSelectQuery,
296 ) -> anyhow::Result<QueryPager> {
297 let persistent = false;
299 let Some(volatile_db) = Self::get(persistent) else {
300 anyhow::bail!("Volatile DB Session not found");
302 };
303 let queries = self.purge_queries.clone();
304
305 queries
306 .execute_iter(volatile_db.session.clone(), query)
307 .await
308 }
309
310 pub(crate) fn get_raw_session(&self) -> Arc<Session> {
312 self.session.clone()
313 }
314
315 pub fn is_persistent(&self) -> bool {
317 self.persistent
318 }
319
320 pub fn caches(&self) -> Arc<cache_manager::Caches> {
322 self.caches.clone()
323 }
324}
325
326fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
331 ExecutionProfile::builder()
332 .consistency(scylla::statement::Consistency::LocalQuorum)
333 .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
334 .retry_policy(Arc::new(scylla::policies::retry::DefaultRetryPolicy::new()))
335 .load_balancing_policy(
336 scylla::policies::load_balancing::DefaultPolicyBuilder::new()
337 .permit_dc_failover(true)
338 .build(),
339 )
340 .speculative_execution_policy(Some(Arc::new(
341 scylla::policies::speculative_execution::SimpleSpeculativeExecutionPolicy {
342 max_retry_count: 3,
343 retry_interval: Duration::from_millis(100),
344 },
345 )))
346 .build()
347}
348
349async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
351 let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
352
353 let mut sb = SessionBuilder::new()
354 .known_nodes(cluster_urls)
355 .auto_await_schema_agreement(false);
356
357 let profile_handle = make_execution_profile(cfg).into_handle();
358 sb = sb.default_execution_profile_handle(profile_handle);
359
360 sb = match cfg.compression {
361 CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
362 CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
363 CompressionChoice::None => sb.compression(None),
364 };
365
366 if cfg.tls != TlsChoice::Disabled {
367 let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
368
369 if let Some(cert_name) = &cfg.tls_cert {
370 let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
371 context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
372 }
373
374 if cfg.tls == TlsChoice::Verified {
375 context_builder.set_verify(SslVerifyMode::PEER);
376 } else {
377 context_builder.set_verify(SslVerifyMode::NONE);
378 }
379
380 let ssl_context = context_builder.build();
381
382 sb = sb.tls_context(Some(ssl_context));
383 }
384
385 if let Some(username) = &cfg.username {
387 if let Some(password) = &cfg.password {
388 sb = sb.user(username.as_str(), password.as_str());
389 }
390 }
391
392 let session = Box::pin(sb.build()).await?;
393
394 Ok(Arc::new(session))
395}
396
397async fn retry_init(
401 cfg: cassandra_db::EnvVars,
402 network: Network,
403 persistent: bool,
404) {
405 let mut retry_delay = Duration::from_secs(0);
406 let db_type = if persistent { "Persistent" } else { "Volatile" };
407
408 info!(
409 db_type = db_type,
410 network = %network,
411 "Index DB Session Creation: Started."
412 );
413
414 cfg.log(persistent, network);
415
416 loop {
417 tokio::time::sleep(retry_delay).await;
418 retry_delay = Duration::from_secs(30); info!(
421 db_type = db_type,
422 network = %network,
423 "Attempting to connect to Cassandra DB..."
424 );
425
426 let session = match make_session(&cfg).await {
428 Ok(session) => session,
429 Err(error) => {
430 error!(
431 db_type = db_type,
432 network = %network,
433 error = format!("{error:?}"),
434 "Failed to Create Cassandra DB Session"
435 );
436 continue;
437 },
438 };
439
440 if let Err(error) = create_schema(&mut session.clone(), &cfg, persistent, network).await {
442 error!(
443 db_type = db_type,
444 network = %network,
445 error = format!("{error:?}"),
446 "Failed to Create Cassandra DB Schema"
447 );
448 continue;
449 }
450
451 let queries = match PreparedQueries::new(session.clone(), &cfg).await {
452 Ok(queries) => Arc::new(queries),
453 Err(error) => {
454 error!(
455 db_type = db_type,
456 network = %network,
457 error = %error,
458 "Failed to Create Cassandra Prepared Queries"
459 );
460 continue;
461 },
462 };
463
464 let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
465 {
466 Ok(queries) => Arc::new(queries),
467 Err(error) => {
468 error!(
469 db_type = db_type,
470 network = %network,
471 error = %error,
472 "Failed to Create Cassandra Prepared Purge Queries"
473 );
474 continue;
475 },
476 };
477
478 let cassandra_session = CassandraSession {
479 persistent,
480 cfg: Arc::new(cfg),
481 session,
482 queries,
483 purge_queries,
484 caches: Arc::new(cache_manager::Caches::new(persistent)),
485 };
486
487 if persistent {
489 if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
490 error!("Persistent Session already set. This should not happen.");
491 }
492 } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
493 error!("Volatile Session already set. This should not happen.");
494 }
495
496 break;
498 }
499
500 info!(db_type = db_type, network = %network, "Index DB Session Creation: OK.");
501}