cat_gateway/db/index/queries/
mod.rs

1//! Pre-prepare queries for a given session.
2//!
3//! This improves query execution time.
4
5pub(crate) mod caches;
6pub(crate) mod purge;
7pub(crate) mod rbac;
8pub(crate) mod registrations;
9pub(crate) mod staked_ada;
10pub(crate) mod sync_status;
11
12use std::{fmt::Debug, sync::Arc};
13
14use anyhow::bail;
15use crossbeam_skiplist::SkipMap;
16use registrations::{
17    get_all_invalids::GetAllInvalidRegistrationsQuery,
18    get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery,
19    get_from_stake_address::GetStakeAddrQuery, get_from_vote_key::GetStakeAddrFromVoteKeyQuery,
20    get_invalid::GetInvalidRegistrationQuery,
21};
22use scylla::{
23    client::{pager::QueryPager, session::Session},
24    errors::{ExecutionError, NextPageError, PagerExecutionError, PrepareError, RequestError},
25    response::query_result::QueryResult,
26    serialize::row::SerializeRow,
27    statement::{batch::Batch, prepared::PreparedStatement},
28};
29use staked_ada::{
30    get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
31    get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
32    get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
33};
34use sync_status::update::SyncStatusInsertQuery;
35use tracing::error;
36
37use super::block::{
38    certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
39    txi::TxiInsertQuery, txo::TxoInsertQuery,
40};
41use crate::{
42    db::index::{
43        queries::rbac::{
44            get_catalyst_id_from_public_key, get_catalyst_id_from_stake_address,
45            get_catalyst_id_from_transaction_id, get_rbac_invalid_registrations,
46            get_rbac_registrations,
47        },
48        session::CassandraSessionError,
49    },
50    service::utilities::health::set_index_db_liveness,
51    settings::cassandra_db,
52};
53
/// Batches of different sizes, prepared and ready for use.
///
/// The key is the batch size an entry was prepared for; `session_execute_batch` looks
/// a batch up by the number of values in the chunk it is about to execute.
pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
56
/// All Prepared batched write Queries (inserts and the TXO spent update) that we know
/// about.
///
/// A variant is passed to `PreparedQueries::execute_batch` to select which set of
/// pre-sized batches gets executed. The derived `Display` implementation supplies the
/// query name that is included in error logs and error messages when a batch fails.
#[derive(strum_macros::Display)]
// All variants share the `Query` suffix, which this lint would otherwise flag.
#[allow(clippy::enum_variant_names)]
pub(crate) enum PreparedQuery {
    /// TXO Insert query.
    TxoAdaInsertQuery,
    /// TXO Asset Insert query.
    TxoAssetInsertQuery,
    /// Unstaked TXO Insert query.
    UnstakedTxoAdaInsertQuery,
    /// Unstaked TXO Asset Insert query.
    UnstakedTxoAssetInsertQuery,
    /// TXI Insert query.
    TxiInsertQuery,
    /// Stake Registration Insert query.
    StakeRegistrationInsertQuery,
    /// CIP 36 Registration Insert Query.
    Cip36RegistrationInsertQuery,
    /// CIP 36 Registration Error Insert query.
    Cip36RegistrationInsertErrorQuery,
    /// CIP 36 Registration for voting key Insert query.
    Cip36RegistrationForVoteKeyInsertQuery,
    /// TXO spent Update query.
    TxoSpentUpdateQuery,
    /// RBAC 509 Registration Insert query.
    Rbac509InsertQuery,
    /// An invalid RBAC 509 registration Insert query.
    Rbac509InvalidInsertQuery,
    /// A Catalyst ID for transaction ID insert query.
    CatalystIdForTxnIdInsertQuery,
    /// A Catalyst ID for stake address insert query.
    CatalystIdForStakeAddressInsertQuery,
    /// A Catalyst ID for public key insert query.
    CatalystIdForPublicKeyInsertQuery,
}
92
/// All prepared SELECT query statements (return data).
///
/// A variant is passed to `PreparedQueries::execute_iter` to select which prepared
/// statement gets executed.
pub(crate) enum PreparedSelectQuery {
    /// Get TXO by stake address query.
    TxoByStakeAddress,
    /// Get TXI by transaction hash query.
    TxiByTransactionHash,
    /// Get native assets by stake address query.
    AssetsByStakeAddress,
    /// Get Registrations.
    RegistrationFromStakeAddr,
    /// Get invalid Registration.
    InvalidRegistrationsFromStakeAddr,
    /// Get stake addr from stake hash.
    StakeAddrFromStakeHash,
    /// Get stake addr from vote key.
    StakeAddrFromVoteKey,
    /// Get RBAC registrations by Catalyst ID.
    RbacRegistrationsByCatalystId,
    /// Get invalid RBAC registrations by Catalyst ID.
    RbacInvalidRegistrationsByCatalystId,
    /// Get Catalyst ID by transaction ID.
    CatalystIdByTransactionId,
    /// Get Catalyst ID by stake address.
    CatalystIdByStakeAddress,
    /// Get Catalyst ID by public key.
    CatalystIdByPublicKey,
    /// Get all registrations for snapshot.
    GetAllRegistrations,
    /// Get all invalid registrations for snapshot.
    GetAllInvalidRegistrations,
}
124
/// All prepared UPSERT query statements (inserts/updates a single value of data).
///
/// A variant is passed to `PreparedQueries::execute_upsert` to select which prepared
/// statement gets executed.
pub(crate) enum PreparedUpsertQuery {
    /// Sync Status Insert.
    SyncStatusInsert,
}
130
/// All prepared queries for a session.
///
/// `SizedBatch` fields hold pre-sized batches executed through `execute_batch`;
/// `PreparedStatement` fields hold single statements executed through `execute_iter`
/// or `execute_upsert`.
#[allow(clippy::struct_field_names)]
pub(crate) struct PreparedQueries {
    /// TXO Insert query.
    txo_insert_queries: SizedBatch,
    /// TXO Asset Insert query.
    txo_asset_insert_queries: SizedBatch,
    /// Unstaked TXO Insert query.
    unstaked_txo_insert_queries: SizedBatch,
    /// Unstaked TXO Asset Insert query.
    unstaked_txo_asset_insert_queries: SizedBatch,
    /// TXI Insert query.
    txi_insert_queries: SizedBatch,
    /// Stake Registration Insert query.
    stake_registration_insert_queries: SizedBatch,
    /// CIP36 Registrations.
    cip36_registration_insert_queries: SizedBatch,
    /// CIP36 Registration errors.
    cip36_registration_error_insert_queries: SizedBatch,
    /// CIP36 Registration for voting key Insert query.
    cip36_registration_for_vote_key_insert_queries: SizedBatch,
    /// Update TXO spent query.
    txo_spent_update_queries: SizedBatch,
    /// Get TXO by stake address query.
    txo_by_stake_address_query: PreparedStatement,
    /// Get TXI by transaction hash.
    txi_by_txn_hash_query: PreparedStatement,
    /// RBAC 509 Registrations.
    rbac509_registration_insert_queries: SizedBatch,
    /// Invalid RBAC 509 registrations.
    rbac509_invalid_registration_insert_queries: SizedBatch,
    /// Catalyst ID for transaction ID insert query.
    catalyst_id_for_txn_id_insert_queries: SizedBatch,
    /// Catalyst ID for stake address insert query.
    catalyst_id_for_stake_address_insert_queries: SizedBatch,
    /// Catalyst ID for public key insert query.
    catalyst_id_for_public_key_insert_queries: SizedBatch,
    /// Get native assets by stake address query.
    native_assets_by_stake_address_query: PreparedStatement,
    /// Get registrations
    registration_from_stake_addr_query: PreparedStatement,
    /// stake addr from stake hash
    stake_addr_from_stake_address_query: PreparedStatement,
    /// stake addr from vote key
    stake_addr_from_vote_key_query: PreparedStatement,
    /// Get invalid registrations
    invalid_registrations_from_stake_addr_query: PreparedStatement,
    /// Insert Sync Status update.
    sync_status_insert: PreparedStatement,
    /// Get RBAC registrations by Catalyst ID.
    rbac_registrations_by_catalyst_id_query: PreparedStatement,
    /// Get invalid RBAC registrations by Catalyst ID.
    rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement,
    /// Get Catalyst ID by stake address.
    catalyst_id_by_stake_address_query: PreparedStatement,
    /// Get Catalyst ID by transaction ID.
    catalyst_id_by_transaction_id_query: PreparedStatement,
    /// Get Catalyst ID by public key.
    catalyst_id_by_public_key_query: PreparedStatement,
    /// Get all registrations for snapshot
    get_all_registrations_query: PreparedStatement,
    /// Get all invalid registrations for snapshot
    get_all_invalid_registrations_query: PreparedStatement,
}
195
/// A set of query responses that can fail.
///
/// This is what `PreparedQueries::execute_batch` returns: on success, one
/// `QueryResult` for every batch chunk that was executed.
pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
/// A set of query responses from tasks that can fail.
///
/// Each element is a tokio `JoinHandle` whose task resolves to a
/// `FallibleQueryResults`.
pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
200
201impl PreparedQueries {
202    /// Create new prepared queries for a given session.
203    #[allow(clippy::too_many_lines)]
204    pub(crate) async fn new(
205        session: Arc<Session>,
206        cfg: &cassandra_db::EnvVars,
207    ) -> anyhow::Result<Self> {
208        // We initialize like this, so that all errors preparing querys get shown before aborting.
209        let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?;
210        let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await;
211        let stake_registration_insert_queries =
212            CertInsertQuery::prepare_batch(&session, cfg).await?;
213        let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await;
214        let txo_spent_update_queries =
215            UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?;
216        let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await;
217        let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await;
218        let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
219        let native_assets_by_stake_address_query =
220            GetAssetsByStakeAddressQuery::prepare(session.clone()).await;
221        let registration_from_stake_addr_query =
222            GetRegistrationQuery::prepare(session.clone()).await;
223        let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await;
224        let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await;
225        let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await;
226        let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await;
227        let get_all_invalid_registrations_query =
228            GetAllInvalidRegistrationsQuery::prepare(session.clone()).await;
229        let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?;
230        let rbac_registrations_by_catalyst_id_query =
231            get_rbac_registrations::Query::prepare(session.clone()).await?;
232        let rbac_invalid_registrations_by_catalyst_id_query =
233            get_rbac_invalid_registrations::Query::prepare(session.clone()).await?;
234        let catalyst_id_by_stake_address_query =
235            get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?;
236        let catalyst_id_by_transaction_id_query =
237            get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?;
238        let catalyst_id_by_public_key_query =
239            get_catalyst_id_from_public_key::Query::prepare(session.clone()).await?;
240
241        let (
242            txo_insert_queries,
243            unstaked_txo_insert_queries,
244            txo_asset_insert_queries,
245            unstaked_txo_asset_insert_queries,
246        ) = all_txo_queries?;
247
248        let (
249            cip36_registration_insert_queries,
250            cip36_registration_error_insert_queries,
251            cip36_registration_for_vote_key_insert_queries,
252        ) = all_cip36_queries?;
253
254        let (
255            rbac509_registration_insert_queries,
256            rbac509_invalid_registration_insert_queries,
257            catalyst_id_for_txn_id_insert_queries,
258            catalyst_id_for_stake_address_insert_queries,
259            catalyst_id_for_public_key_insert_queries,
260        ) = all_rbac_queries?;
261
262        Ok(Self {
263            txo_insert_queries,
264            txo_asset_insert_queries,
265            unstaked_txo_insert_queries,
266            unstaked_txo_asset_insert_queries,
267            txi_insert_queries,
268            stake_registration_insert_queries,
269            cip36_registration_insert_queries,
270            cip36_registration_error_insert_queries,
271            cip36_registration_for_vote_key_insert_queries,
272            txo_spent_update_queries,
273            txo_by_stake_address_query: txo_by_stake_address_query?,
274            txi_by_txn_hash_query: txi_by_txn_hash_query?,
275            rbac509_registration_insert_queries,
276            rbac509_invalid_registration_insert_queries,
277            catalyst_id_for_txn_id_insert_queries,
278            catalyst_id_for_stake_address_insert_queries,
279            catalyst_id_for_public_key_insert_queries,
280            native_assets_by_stake_address_query: native_assets_by_stake_address_query?,
281            registration_from_stake_addr_query: registration_from_stake_addr_query?,
282            stake_addr_from_stake_address_query: stake_addr_from_stake_address?,
283            stake_addr_from_vote_key_query: stake_addr_from_vote_key?,
284            invalid_registrations_from_stake_addr_query: invalid_registrations?,
285            sync_status_insert,
286            rbac_registrations_by_catalyst_id_query,
287            rbac_invalid_registrations_by_catalyst_id_query,
288            catalyst_id_by_stake_address_query,
289            catalyst_id_by_transaction_id_query,
290            catalyst_id_by_public_key_query,
291            get_all_registrations_query: get_all_registrations_query?,
292            get_all_invalid_registrations_query: get_all_invalid_registrations_query?,
293        })
294    }
295
296    /// Prepares a statement.
297    pub(crate) async fn prepare(
298        session: Arc<Session>,
299        query: &str,
300        consistency: scylla::statement::Consistency,
301        idempotent: bool,
302    ) -> anyhow::Result<PreparedStatement> {
303        let mut prepared = session
304            .prepare(query)
305            .await
306            .map_err(|e| {
307                match e {
308                    PrepareError::ConnectionPoolError(err) => {
309                        set_index_db_liveness(false);
310                        error!(error = %err, "Index DB connection failed when preparing. Liveness set to false.");
311                        CassandraSessionError::ConnectionUnavailable { source: err.into() }
312                    },
313                    _ => CassandraSessionError::PreparingQueriesFailed { source: e.into() },
314                }
315            })?;
316        prepared.set_consistency(consistency);
317        prepared.set_is_idempotent(idempotent);
318
319        Ok(prepared)
320    }
321
322    /// Prepares all permutations of the batch from 1 to max.
323    /// It is necessary to do this because batches are pre-sized, they can not be dynamic.
324    /// Preparing the batches in advance is a very larger performance increase.
325    pub(crate) async fn prepare_batch(
326        session: Arc<Session>,
327        query: &str,
328        cfg: &cassandra_db::EnvVars,
329        consistency: scylla::statement::Consistency,
330        idempotent: bool,
331        logged: bool,
332    ) -> anyhow::Result<SizedBatch> {
333        let sized_batches: SizedBatch = SkipMap::new();
334
335        // First prepare the query. Only needs to be done once, all queries on a batch are the
336        // same.
337        let prepared = Self::prepare(session, query, consistency, idempotent).await?;
338
339        for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
340            let mut batch: Batch = Batch::new(if logged {
341                scylla::statement::batch::BatchType::Logged
342            } else {
343                scylla::statement::batch::BatchType::Unlogged
344            });
345            batch.set_consistency(consistency);
346            batch.set_is_idempotent(idempotent);
347            for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size {
348                batch.append_statement(prepared.clone());
349            }
350
351            sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
352        }
353
354        Ok(sized_batches)
355    }
356
357    /// Executes a single query with the given parameters.
358    ///
359    /// Returns no data, and an error if the query fails.
360    pub(crate) async fn execute_upsert<P>(
361        &self,
362        session: Arc<Session>,
363        upsert_query: PreparedUpsertQuery,
364        params: P,
365    ) -> anyhow::Result<()>
366    where
367        P: SerializeRow,
368    {
369        let prepared_stmt = match upsert_query {
370            PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
371        };
372
373        session
374            .execute_unpaged(prepared_stmt, params)
375            .await
376            .map_err(|e| {
377                match e {
378                    ExecutionError::ConnectionPoolError(err) => {
379                        set_index_db_liveness(false);
380                        error!(error = %err, "Index DB connection failed. Liveness set to false.");
381                        CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
382                    },
383                    _ => anyhow::anyhow!(e),
384                }
385            })?;
386
387        Ok(())
388    }
389
390    /// Executes a select query with the given parameters.
391    ///
392    /// Returns an iterator that iterates over all the result pages that the query
393    /// returns.
394    pub(crate) async fn execute_iter<P>(
395        &self,
396        session: Arc<Session>,
397        select_query: PreparedSelectQuery,
398        params: P,
399    ) -> anyhow::Result<QueryPager>
400    where
401        P: SerializeRow,
402    {
403        let prepared_stmt = match select_query {
404            PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
405            PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
406            PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
407            PreparedSelectQuery::RegistrationFromStakeAddr => {
408                &self.registration_from_stake_addr_query
409            },
410            PreparedSelectQuery::StakeAddrFromStakeHash => {
411                &self.stake_addr_from_stake_address_query
412            },
413            PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
414            PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
415                &self.invalid_registrations_from_stake_addr_query
416            },
417            PreparedSelectQuery::RbacRegistrationsByCatalystId => {
418                &self.rbac_registrations_by_catalyst_id_query
419            },
420            PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => {
421                &self.rbac_invalid_registrations_by_catalyst_id_query
422            },
423            PreparedSelectQuery::CatalystIdByTransactionId => {
424                &self.catalyst_id_by_transaction_id_query
425            },
426            PreparedSelectQuery::CatalystIdByStakeAddress => {
427                &self.catalyst_id_by_stake_address_query
428            },
429            PreparedSelectQuery::CatalystIdByPublicKey => &self.catalyst_id_by_public_key_query,
430            PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query,
431            PreparedSelectQuery::GetAllInvalidRegistrations => {
432                &self.get_all_invalid_registrations_query
433            },
434        };
435        session_execute_iter(session, prepared_stmt, params).await
436    }
437
438    /// Execute a Batch query with the given parameters.
439    ///
440    /// Values should be a Vec of values which implement `SerializeRow` and they MUST be
441    /// the same, and must match the query being executed.
442    ///
443    /// This will divide the batch into optimal sized chunks and execute them until all
444    /// values have been executed or the first error is encountered.
445    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
446        &self,
447        session: Arc<Session>,
448        cfg: Arc<cassandra_db::EnvVars>,
449        query: PreparedQuery,
450        values: Vec<T>,
451    ) -> FallibleQueryResults {
452        let query_map = match query {
453            PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
454            PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
455            PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
456            PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
457            PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
458            PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
459            PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
460            PreparedQuery::Cip36RegistrationInsertErrorQuery => {
461                &self.cip36_registration_error_insert_queries
462            },
463            PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery => {
464                &self.cip36_registration_for_vote_key_insert_queries
465            },
466            PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
467            PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
468            PreparedQuery::Rbac509InvalidInsertQuery => {
469                &self.rbac509_invalid_registration_insert_queries
470            },
471            PreparedQuery::CatalystIdForTxnIdInsertQuery => {
472                &self.catalyst_id_for_txn_id_insert_queries
473            },
474            PreparedQuery::CatalystIdForStakeAddressInsertQuery => {
475                &self.catalyst_id_for_stake_address_insert_queries
476            },
477            PreparedQuery::CatalystIdForPublicKeyInsertQuery => {
478                &self.catalyst_id_for_public_key_insert_queries
479            },
480        };
481        session_execute_batch(session, query_map, cfg, query, values).await
482    }
483}
484
485/// Execute a Batch query with the given parameters.
486///
487/// Values should be a Vec of values which implement `SerializeRow` and they MUST be
488/// the same, and must match the query being executed.
489///
490/// This will divide the batch into optimal sized chunks and execute them until all
491/// values have been executed or the first error is encountered.
492async fn session_execute_batch<T: SerializeRow + Debug, Q: std::fmt::Display>(
493    session: Arc<Session>,
494    query_map: &SizedBatch,
495    cfg: Arc<cassandra_db::EnvVars>,
496    query: Q,
497    values: Vec<T>,
498) -> FallibleQueryResults {
499    let mut results: Vec<QueryResult> = Vec::new();
500    let mut errors = Vec::new();
501
502    let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1));
503    let query_str = format!("{query}");
504
505    for chunk in chunks {
506        let chunk_size: u16 = chunk.len().try_into()?;
507        let Some(batch_query) = query_map.get(&chunk_size) else {
508            // This should not actually occur.
509            bail!("No batch query found for size {}", chunk_size);
510        };
511        let batch_query_statements = batch_query.value().clone();
512        match session.batch(&batch_query_statements, chunk).await {
513            Ok(result) => results.push(result),
514            Err(error) => {
515                let chunk_str = format!("{chunk:?}");
516                if let ExecutionError::ConnectionPoolError(err) = error {
517                    set_index_db_liveness(false);
518                    error!(error=%err, query=query_str, chunk=chunk_str, "Index DB connection failed. Liveness set to false.");
519                    bail!(CassandraSessionError::ConnectionUnavailable { source: err.into() })
520                }
521                error!(%error, query=query_str, chunk=chunk_str, "Query Execution Failed");
522                errors.push(error);
523                // Defer failure until all batches have been processed.
524            },
525        }
526    }
527
528    if !errors.is_empty() {
529        bail!("Query Failed: {query_str}! {errors:?}");
530    }
531
532    Ok(results)
533}
534
535/// Executes a select query with the given parameters.
536///
537/// Returns an iterator that iterates over all the result pages that the query
538/// returns.
539pub(crate) async fn session_execute_iter<P>(
540    session: Arc<Session>,
541    prepared_stmt: &PreparedStatement,
542    params: P,
543) -> anyhow::Result<QueryPager>
544where
545    P: SerializeRow,
546{
547    session
548        .execute_iter(prepared_stmt.clone(), params)
549        .await
550        .map_err(|e| {
551            match e {
552                PagerExecutionError::PrepareError(PrepareError::ConnectionPoolError(err)) => {
553                    set_index_db_liveness(false);
554                    error!(error = %err, "Index DB connection failed when preparing. Liveness set to false.");
555                    CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
556                },
557                PagerExecutionError::NextPageError(NextPageError::RequestFailure(RequestError::ConnectionPoolError(err))) => {
558                    set_index_db_liveness(false);
559                    error!(error = %err, "Index DB connection failed during request. Liveness set to false.");
560                    CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
561                },
562                _ => e.into(),
563            }
564        })
565}