cat_gateway/db/index/session/
mod.rs1mod cache_manager;
3
4use std::{
5 fmt::Debug,
6 path::PathBuf,
7 sync::{Arc, OnceLock},
8 time::Duration,
9};
10
11use cardano_chain_follower::Network;
12use openssl::ssl::{SslContextBuilder, SslFiletype, SslMethod, SslVerifyMode};
13use scylla::{
14 client::{
15 execution_profile::ExecutionProfile, pager::QueryPager, session::Session,
16 session_builder::SessionBuilder,
17 },
18 frame::Compression,
19 serialize::row::SerializeRow,
20};
21use thiserror::Error;
22use tokio::fs;
23use tracing::{debug, error, info};
24
25use super::{
26 queries::{
27 purge::{self, PreparedDeleteQuery},
28 FallibleQueryResults, PreparedQueries, PreparedQuery, PreparedSelectQuery,
29 PreparedUpsertQuery,
30 },
31 schema::create_schema,
32};
33use crate::{
34 service::utilities::health::{index_db_is_live, set_index_db_liveness},
35 settings::{cassandra_db, Settings},
36};
37
38#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames)]
40#[strum(ascii_case_insensitive)]
41pub(crate) enum CompressionChoice {
42 Lz4,
44 Snappy,
46 None,
48}
49
50#[derive(Clone, strum::EnumString, strum::Display, strum::VariantNames, PartialEq)]
52#[strum(ascii_case_insensitive)]
53pub(crate) enum TlsChoice {
54 Disabled,
56 Verified,
58 Unverified,
60}
61
62#[derive(Debug, Error)]
64pub(crate) enum CassandraSessionError {
65 #[error("Database connection failed: {source}")]
67 ConnectionUnavailable {
68 #[source]
70 source: anyhow::Error,
71 },
72 #[error("Preparing queries failed: {source}")]
74 PreparingQueriesFailed {
75 #[source]
77 source: anyhow::Error,
78 },
79 #[error("Failed acquiring database session")]
82 FailedAcquiringSession,
83}
84
85#[derive(Clone)]
87pub(crate) struct CassandraSession {
88 persistent: bool,
90 cfg: Arc<cassandra_db::EnvVars>,
92 session: Arc<Session>,
94 queries: Arc<PreparedQueries>,
96 purge_queries: Arc<purge::PreparedQueries>,
98 caches: Arc<cache_manager::Caches>,
100}
101
102static PERSISTENT_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
104
105static VOLATILE_SESSION: OnceLock<Arc<CassandraSession>> = OnceLock::new();
107
108impl CassandraSession {
109 pub(crate) fn init() {
112 let (persistent, volatile) = Settings::cassandra_db_cfg();
113 let network = Settings::cardano_network();
114
115 let _join_handle =
116 tokio::task::spawn(
117 async move { Box::pin(retry_init(persistent, network, true)).await },
118 );
119 let _join_handle =
120 tokio::task::spawn(async move { Box::pin(retry_init(volatile, network, false)).await });
121 }
122
123 pub(crate) async fn is_ready() -> bool {
125 let persistent_ready = if let Some(cassandra) = PERSISTENT_SESSION.get() {
126 cassandra
127 .session
128 .refresh_metadata()
129 .await
130 .inspect_err(|e| {
131 error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
132 })
133 .is_ok()
134 } else {
135 debug!(is_persistent = true, "Session has not been created");
136 false
137 };
138 let volatile_ready = if let Some(cassandra) = VOLATILE_SESSION.get() {
139 cassandra
140 .session
141 .refresh_metadata()
142 .await
143 .inspect_err(|e| {
144 error!(error=%e, is_persistent=cassandra.is_persistent(), "Session connection failed");
145 })
146 .is_ok()
147 } else {
148 debug!(is_persistent = false, "Session has not been created");
149 false
150 };
151 let current_liveness = index_db_is_live();
152 let is_ready = persistent_ready && volatile_ready;
153 if is_ready {
154 if !current_liveness {
155 set_index_db_liveness(true);
156 }
157 } else if current_liveness {
158 set_index_db_liveness(false);
159 }
160 is_ready
161 }
162
163 pub(crate) async fn wait_until_ready(interval: Duration) {
165 loop {
166 if Self::is_ready().await {
167 return;
168 }
169
170 tokio::time::sleep(interval).await;
171 }
172 }
173
174 pub(crate) fn get(persistent: bool) -> Option<Arc<CassandraSession>> {
176 if persistent {
177 PERSISTENT_SESSION.get().cloned()
178 } else {
179 VOLATILE_SESSION.get().cloned()
180 }
181 }
182
183 pub(crate) async fn execute_iter<P>(
188 &self, select_query: PreparedSelectQuery, params: P,
189 ) -> anyhow::Result<QueryPager>
190 where P: SerializeRow {
191 let session = self.session.clone();
192 let queries = self.queries.clone();
193
194 queries.execute_iter(session, select_query, params).await
195 }
196
197 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
205 &self, query: PreparedQuery, values: Vec<T>,
206 ) -> FallibleQueryResults {
207 let session = self.session.clone();
208 let cfg = self.cfg.clone();
209 let queries = self.queries.clone();
210
211 queries.execute_batch(session, cfg, query, values).await
212 }
213
214 pub(crate) async fn execute_upsert<T: SerializeRow + Debug>(
217 &self, query: PreparedUpsertQuery, value: T,
218 ) -> anyhow::Result<()> {
219 let session = self.session.clone();
220 let queries = self.queries.clone();
221
222 queries.execute_upsert(session, query, value).await
223 }
224
225 pub(crate) async fn purge_execute_batch<T: SerializeRow + Debug>(
235 &self, query: PreparedDeleteQuery, values: Vec<T>,
236 ) -> FallibleQueryResults {
237 let persistent = false;
239 let Some(volatile_db) = Self::get(persistent) else {
240 anyhow::bail!("Volatile DB Session not found");
242 };
243 let cfg = self.cfg.clone();
244 let queries = self.purge_queries.clone();
245 let session = volatile_db.session.clone();
246
247 queries.execute_batch(session, cfg, query, values).await
248 }
249
250 pub(crate) async fn purge_execute_iter(
252 &self, query: purge::PreparedSelectQuery,
253 ) -> anyhow::Result<QueryPager> {
254 let persistent = false;
256 let Some(volatile_db) = Self::get(persistent) else {
257 anyhow::bail!("Volatile DB Session not found");
259 };
260 let queries = self.purge_queries.clone();
261
262 queries
263 .execute_iter(volatile_db.session.clone(), query)
264 .await
265 }
266
267 pub(crate) fn get_raw_session(&self) -> Arc<Session> {
269 self.session.clone()
270 }
271
272 pub fn is_persistent(&self) -> bool {
274 self.persistent
275 }
276
277 pub fn caches(&self) -> Arc<cache_manager::Caches> {
279 self.caches.clone()
280 }
281}
282
283fn make_execution_profile(_cfg: &cassandra_db::EnvVars) -> ExecutionProfile {
288 ExecutionProfile::builder()
289 .consistency(scylla::statement::Consistency::LocalQuorum)
290 .serial_consistency(Some(scylla::statement::SerialConsistency::LocalSerial))
291 .retry_policy(Arc::new(scylla::policies::retry::DefaultRetryPolicy::new()))
292 .load_balancing_policy(
293 scylla::policies::load_balancing::DefaultPolicyBuilder::new()
294 .permit_dc_failover(true)
295 .build(),
296 )
297 .speculative_execution_policy(Some(Arc::new(
298 scylla::policies::speculative_execution::SimpleSpeculativeExecutionPolicy {
299 max_retry_count: 3,
300 retry_interval: Duration::from_millis(100),
301 },
302 )))
303 .build()
304}
305
306async fn make_session(cfg: &cassandra_db::EnvVars) -> anyhow::Result<Arc<Session>> {
308 let cluster_urls: Vec<&str> = cfg.url.as_str().split(',').collect();
309
310 let mut sb = SessionBuilder::new()
311 .known_nodes(cluster_urls)
312 .auto_await_schema_agreement(false);
313
314 let profile_handle = make_execution_profile(cfg).into_handle();
315 sb = sb.default_execution_profile_handle(profile_handle);
316
317 sb = match cfg.compression {
318 CompressionChoice::Lz4 => sb.compression(Some(Compression::Lz4)),
319 CompressionChoice::Snappy => sb.compression(Some(Compression::Snappy)),
320 CompressionChoice::None => sb.compression(None),
321 };
322
323 if cfg.tls != TlsChoice::Disabled {
324 let mut context_builder = SslContextBuilder::new(SslMethod::tls())?;
325
326 if let Some(cert_name) = &cfg.tls_cert {
327 let certdir = fs::canonicalize(PathBuf::from(cert_name.as_str())).await?;
328 context_builder.set_certificate_file(certdir.as_path(), SslFiletype::PEM)?;
329 }
330
331 if cfg.tls == TlsChoice::Verified {
332 context_builder.set_verify(SslVerifyMode::PEER);
333 } else {
334 context_builder.set_verify(SslVerifyMode::NONE);
335 }
336
337 let ssl_context = context_builder.build();
338
339 sb = sb.tls_context(Some(ssl_context));
340 }
341
342 if let Some(username) = &cfg.username {
344 if let Some(password) = &cfg.password {
345 sb = sb.user(username.as_str(), password.as_str());
346 }
347 }
348
349 let session = Box::pin(sb.build()).await?;
350
351 Ok(Arc::new(session))
352}
353
354async fn retry_init(cfg: cassandra_db::EnvVars, network: Network, persistent: bool) {
358 let mut retry_delay = Duration::from_secs(0);
359 let db_type = if persistent { "Persistent" } else { "Volatile" };
360
361 info!(
362 db_type = db_type,
363 network = %network,
364 "Index DB Session Creation: Started."
365 );
366
367 cfg.log(persistent, network);
368
369 loop {
370 tokio::time::sleep(retry_delay).await;
371 retry_delay = Duration::from_secs(30); info!(
374 db_type = db_type,
375 network = %network,
376 "Attempting to connect to Cassandra DB..."
377 );
378
379 let session = match make_session(&cfg).await {
381 Ok(session) => session,
382 Err(error) => {
383 error!(
384 db_type = db_type,
385 network = %network,
386 error = format!("{error:?}"),
387 "Failed to Create Cassandra DB Session"
388 );
389 continue;
390 },
391 };
392
393 if let Err(error) = create_schema(&mut session.clone(), &cfg, persistent, network).await {
395 error!(
396 db_type = db_type,
397 network = %network,
398 error = format!("{error:?}"),
399 "Failed to Create Cassandra DB Schema"
400 );
401 continue;
402 }
403
404 let queries = match PreparedQueries::new(session.clone(), &cfg).await {
405 Ok(queries) => Arc::new(queries),
406 Err(error) => {
407 error!(
408 db_type = db_type,
409 network = %network,
410 error = %error,
411 "Failed to Create Cassandra Prepared Queries"
412 );
413 continue;
414 },
415 };
416
417 let purge_queries = match Box::pin(purge::PreparedQueries::new(session.clone(), &cfg)).await
418 {
419 Ok(queries) => Arc::new(queries),
420 Err(error) => {
421 error!(
422 db_type = db_type,
423 network = %network,
424 error = %error,
425 "Failed to Create Cassandra Prepared Purge Queries"
426 );
427 continue;
428 },
429 };
430
431 let cassandra_session = CassandraSession {
432 persistent,
433 cfg: Arc::new(cfg),
434 session,
435 queries,
436 purge_queries,
437 caches: Arc::new(cache_manager::Caches::new(persistent)),
438 };
439
440 if persistent {
442 if PERSISTENT_SESSION.set(Arc::new(cassandra_session)).is_err() {
443 error!("Persistent Session already set. This should not happen.");
444 };
445 } else if VOLATILE_SESSION.set(Arc::new(cassandra_session)).is_err() {
446 error!("Volatile Session already set. This should not happen.");
447 };
448
449 break;
451 }
452
453 info!(db_type = db_type, network = %network, "Index DB Session Creation: OK.");
454}