1pub(crate) mod caches;
6pub(crate) mod purge;
7pub(crate) mod rbac;
8pub(crate) mod registrations;
9pub(crate) mod staked_ada;
10pub(crate) mod sync_status;
11
12use std::{fmt::Debug, sync::Arc};
13
14use anyhow::bail;
15use crossbeam_skiplist::SkipMap;
16use registrations::{
17 get_all_invalids::GetAllInvalidRegistrationsQuery,
18 get_all_registrations::GetAllRegistrationsQuery, get_from_stake_addr::GetRegistrationQuery,
19 get_from_stake_address::GetStakeAddrQuery, get_from_vote_key::GetStakeAddrFromVoteKeyQuery,
20 get_invalid::GetInvalidRegistrationQuery,
21};
22use scylla::{
23 client::{pager::QueryPager, session::Session},
24 errors::{ExecutionError, NextPageError, PagerExecutionError, PrepareError, RequestError},
25 response::query_result::QueryResult,
26 serialize::row::SerializeRow,
27 statement::{batch::Batch, prepared::PreparedStatement},
28};
29use staked_ada::{
30 get_assets_by_stake_address::GetAssetsByStakeAddressQuery,
31 get_txi_by_txn_hash::GetTxiByTxnHashesQuery,
32 get_txo_by_stake_address::GetTxoByStakeAddressQuery, update_txo_spent::UpdateTxoSpentQuery,
33};
34use sync_status::update::SyncStatusInsertQuery;
35use tracing::error;
36
37use super::block::{
38 certs::CertInsertQuery, cip36::Cip36InsertQuery, rbac509::Rbac509InsertQuery,
39 txi::TxiInsertQuery, txo::TxoInsertQuery,
40};
41use crate::{
42 db::index::{
43 queries::rbac::{
44 get_catalyst_id_from_public_key, get_catalyst_id_from_stake_address,
45 get_catalyst_id_from_transaction_id, get_rbac_invalid_registrations,
46 get_rbac_registrations,
47 },
48 session::CassandraSessionError,
49 },
50 service::utilities::health::set_index_db_liveness,
51 settings::cassandra_db,
52};
53
54pub(crate) type SizedBatch = SkipMap<u16, Arc<Batch>>;
56
57#[derive(strum_macros::Display)]
59#[allow(clippy::enum_variant_names)]
60pub(crate) enum PreparedQuery {
61 TxoAdaInsertQuery,
63 TxoAssetInsertQuery,
65 UnstakedTxoAdaInsertQuery,
67 UnstakedTxoAssetInsertQuery,
69 TxiInsertQuery,
71 StakeRegistrationInsertQuery,
73 Cip36RegistrationInsertQuery,
75 Cip36RegistrationInsertErrorQuery,
77 Cip36RegistrationForVoteKeyInsertQuery,
79 TxoSpentUpdateQuery,
81 Rbac509InsertQuery,
83 Rbac509InvalidInsertQuery,
85 CatalystIdForTxnIdInsertQuery,
87 CatalystIdForStakeAddressInsertQuery,
89 CatalystIdForPublicKeyInsertQuery,
91}
92
93pub(crate) enum PreparedSelectQuery {
95 TxoByStakeAddress,
97 TxiByTransactionHash,
99 AssetsByStakeAddress,
101 RegistrationFromStakeAddr,
103 InvalidRegistrationsFromStakeAddr,
105 StakeAddrFromStakeHash,
107 StakeAddrFromVoteKey,
109 RbacRegistrationsByCatalystId,
111 RbacInvalidRegistrationsByCatalystId,
113 CatalystIdByTransactionId,
115 CatalystIdByStakeAddress,
117 CatalystIdByPublicKey,
119 GetAllRegistrations,
121 GetAllInvalidRegistrations,
123}
124
125pub(crate) enum PreparedUpsertQuery {
127 SyncStatusInsert,
129}
130
131#[allow(clippy::struct_field_names)]
133pub(crate) struct PreparedQueries {
134 txo_insert_queries: SizedBatch,
136 txo_asset_insert_queries: SizedBatch,
138 unstaked_txo_insert_queries: SizedBatch,
140 unstaked_txo_asset_insert_queries: SizedBatch,
142 txi_insert_queries: SizedBatch,
144 stake_registration_insert_queries: SizedBatch,
146 cip36_registration_insert_queries: SizedBatch,
148 cip36_registration_error_insert_queries: SizedBatch,
150 cip36_registration_for_vote_key_insert_queries: SizedBatch,
152 txo_spent_update_queries: SizedBatch,
154 txo_by_stake_address_query: PreparedStatement,
156 txi_by_txn_hash_query: PreparedStatement,
158 rbac509_registration_insert_queries: SizedBatch,
160 rbac509_invalid_registration_insert_queries: SizedBatch,
162 catalyst_id_for_txn_id_insert_queries: SizedBatch,
164 catalyst_id_for_stake_address_insert_queries: SizedBatch,
166 catalyst_id_for_public_key_insert_queries: SizedBatch,
168 native_assets_by_stake_address_query: PreparedStatement,
170 registration_from_stake_addr_query: PreparedStatement,
172 stake_addr_from_stake_address_query: PreparedStatement,
174 stake_addr_from_vote_key_query: PreparedStatement,
176 invalid_registrations_from_stake_addr_query: PreparedStatement,
178 sync_status_insert: PreparedStatement,
180 rbac_registrations_by_catalyst_id_query: PreparedStatement,
182 rbac_invalid_registrations_by_catalyst_id_query: PreparedStatement,
184 catalyst_id_by_stake_address_query: PreparedStatement,
186 catalyst_id_by_transaction_id_query: PreparedStatement,
188 catalyst_id_by_public_key_query: PreparedStatement,
190 get_all_registrations_query: PreparedStatement,
192 get_all_invalid_registrations_query: PreparedStatement,
194}
195
196pub(crate) type FallibleQueryResults = anyhow::Result<Vec<QueryResult>>;
198pub(crate) type FallibleQueryTasks = Vec<tokio::task::JoinHandle<FallibleQueryResults>>;
200
201impl PreparedQueries {
202 #[allow(clippy::too_many_lines)]
204 pub(crate) async fn new(
205 session: Arc<Session>,
206 cfg: &cassandra_db::EnvVars,
207 ) -> anyhow::Result<Self> {
208 let txi_insert_queries = TxiInsertQuery::prepare_batch(&session, cfg).await?;
210 let all_txo_queries = TxoInsertQuery::prepare_batch(&session, cfg).await;
211 let stake_registration_insert_queries =
212 CertInsertQuery::prepare_batch(&session, cfg).await?;
213 let all_cip36_queries = Cip36InsertQuery::prepare_batch(&session, cfg).await;
214 let txo_spent_update_queries =
215 UpdateTxoSpentQuery::prepare_batch(session.clone(), cfg).await?;
216 let txo_by_stake_address_query = GetTxoByStakeAddressQuery::prepare(session.clone()).await;
217 let txi_by_txn_hash_query = GetTxiByTxnHashesQuery::prepare(session.clone()).await;
218 let all_rbac_queries = Rbac509InsertQuery::prepare_batch(&session, cfg).await;
219 let native_assets_by_stake_address_query =
220 GetAssetsByStakeAddressQuery::prepare(session.clone()).await;
221 let registration_from_stake_addr_query =
222 GetRegistrationQuery::prepare(session.clone()).await;
223 let stake_addr_from_stake_address = GetStakeAddrQuery::prepare(session.clone()).await;
224 let stake_addr_from_vote_key = GetStakeAddrFromVoteKeyQuery::prepare(session.clone()).await;
225 let invalid_registrations = GetInvalidRegistrationQuery::prepare(session.clone()).await;
226 let get_all_registrations_query = GetAllRegistrationsQuery::prepare(session.clone()).await;
227 let get_all_invalid_registrations_query =
228 GetAllInvalidRegistrationsQuery::prepare(session.clone()).await;
229 let sync_status_insert = SyncStatusInsertQuery::prepare(session.clone()).await?;
230 let rbac_registrations_by_catalyst_id_query =
231 get_rbac_registrations::Query::prepare(session.clone()).await?;
232 let rbac_invalid_registrations_by_catalyst_id_query =
233 get_rbac_invalid_registrations::Query::prepare(session.clone()).await?;
234 let catalyst_id_by_stake_address_query =
235 get_catalyst_id_from_stake_address::Query::prepare(session.clone()).await?;
236 let catalyst_id_by_transaction_id_query =
237 get_catalyst_id_from_transaction_id::Query::prepare(session.clone()).await?;
238 let catalyst_id_by_public_key_query =
239 get_catalyst_id_from_public_key::Query::prepare(session.clone()).await?;
240
241 let (
242 txo_insert_queries,
243 unstaked_txo_insert_queries,
244 txo_asset_insert_queries,
245 unstaked_txo_asset_insert_queries,
246 ) = all_txo_queries?;
247
248 let (
249 cip36_registration_insert_queries,
250 cip36_registration_error_insert_queries,
251 cip36_registration_for_vote_key_insert_queries,
252 ) = all_cip36_queries?;
253
254 let (
255 rbac509_registration_insert_queries,
256 rbac509_invalid_registration_insert_queries,
257 catalyst_id_for_txn_id_insert_queries,
258 catalyst_id_for_stake_address_insert_queries,
259 catalyst_id_for_public_key_insert_queries,
260 ) = all_rbac_queries?;
261
262 Ok(Self {
263 txo_insert_queries,
264 txo_asset_insert_queries,
265 unstaked_txo_insert_queries,
266 unstaked_txo_asset_insert_queries,
267 txi_insert_queries,
268 stake_registration_insert_queries,
269 cip36_registration_insert_queries,
270 cip36_registration_error_insert_queries,
271 cip36_registration_for_vote_key_insert_queries,
272 txo_spent_update_queries,
273 txo_by_stake_address_query: txo_by_stake_address_query?,
274 txi_by_txn_hash_query: txi_by_txn_hash_query?,
275 rbac509_registration_insert_queries,
276 rbac509_invalid_registration_insert_queries,
277 catalyst_id_for_txn_id_insert_queries,
278 catalyst_id_for_stake_address_insert_queries,
279 catalyst_id_for_public_key_insert_queries,
280 native_assets_by_stake_address_query: native_assets_by_stake_address_query?,
281 registration_from_stake_addr_query: registration_from_stake_addr_query?,
282 stake_addr_from_stake_address_query: stake_addr_from_stake_address?,
283 stake_addr_from_vote_key_query: stake_addr_from_vote_key?,
284 invalid_registrations_from_stake_addr_query: invalid_registrations?,
285 sync_status_insert,
286 rbac_registrations_by_catalyst_id_query,
287 rbac_invalid_registrations_by_catalyst_id_query,
288 catalyst_id_by_stake_address_query,
289 catalyst_id_by_transaction_id_query,
290 catalyst_id_by_public_key_query,
291 get_all_registrations_query: get_all_registrations_query?,
292 get_all_invalid_registrations_query: get_all_invalid_registrations_query?,
293 })
294 }
295
296 pub(crate) async fn prepare(
298 session: Arc<Session>,
299 query: &str,
300 consistency: scylla::statement::Consistency,
301 idempotent: bool,
302 ) -> anyhow::Result<PreparedStatement> {
303 let mut prepared = session
304 .prepare(query)
305 .await
306 .map_err(|e| {
307 match e {
308 PrepareError::ConnectionPoolError(err) => {
309 set_index_db_liveness(false);
310 error!(error = %err, "Index DB connection failed when preparing. Liveness set to false.");
311 CassandraSessionError::ConnectionUnavailable { source: err.into() }
312 },
313 _ => CassandraSessionError::PreparingQueriesFailed { source: e.into() },
314 }
315 })?;
316 prepared.set_consistency(consistency);
317 prepared.set_is_idempotent(idempotent);
318
319 Ok(prepared)
320 }
321
322 pub(crate) async fn prepare_batch(
326 session: Arc<Session>,
327 query: &str,
328 cfg: &cassandra_db::EnvVars,
329 consistency: scylla::statement::Consistency,
330 idempotent: bool,
331 logged: bool,
332 ) -> anyhow::Result<SizedBatch> {
333 let sized_batches: SizedBatch = SkipMap::new();
334
335 let prepared = Self::prepare(session, query, consistency, idempotent).await?;
338
339 for batch_size in cassandra_db::MIN_BATCH_SIZE..=cfg.max_batch_size {
340 let mut batch: Batch = Batch::new(if logged {
341 scylla::statement::batch::BatchType::Logged
342 } else {
343 scylla::statement::batch::BatchType::Unlogged
344 });
345 batch.set_consistency(consistency);
346 batch.set_is_idempotent(idempotent);
347 for _ in cassandra_db::MIN_BATCH_SIZE..=batch_size {
348 batch.append_statement(prepared.clone());
349 }
350
351 sized_batches.insert(batch_size.try_into()?, Arc::new(batch));
352 }
353
354 Ok(sized_batches)
355 }
356
357 pub(crate) async fn execute_upsert<P>(
361 &self,
362 session: Arc<Session>,
363 upsert_query: PreparedUpsertQuery,
364 params: P,
365 ) -> anyhow::Result<()>
366 where
367 P: SerializeRow,
368 {
369 let prepared_stmt = match upsert_query {
370 PreparedUpsertQuery::SyncStatusInsert => &self.sync_status_insert,
371 };
372
373 session
374 .execute_unpaged(prepared_stmt, params)
375 .await
376 .map_err(|e| {
377 match e {
378 ExecutionError::ConnectionPoolError(err) => {
379 set_index_db_liveness(false);
380 error!(error = %err, "Index DB connection failed. Liveness set to false.");
381 CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
382 },
383 _ => anyhow::anyhow!(e),
384 }
385 })?;
386
387 Ok(())
388 }
389
390 pub(crate) async fn execute_iter<P>(
395 &self,
396 session: Arc<Session>,
397 select_query: PreparedSelectQuery,
398 params: P,
399 ) -> anyhow::Result<QueryPager>
400 where
401 P: SerializeRow,
402 {
403 let prepared_stmt = match select_query {
404 PreparedSelectQuery::TxoByStakeAddress => &self.txo_by_stake_address_query,
405 PreparedSelectQuery::TxiByTransactionHash => &self.txi_by_txn_hash_query,
406 PreparedSelectQuery::AssetsByStakeAddress => &self.native_assets_by_stake_address_query,
407 PreparedSelectQuery::RegistrationFromStakeAddr => {
408 &self.registration_from_stake_addr_query
409 },
410 PreparedSelectQuery::StakeAddrFromStakeHash => {
411 &self.stake_addr_from_stake_address_query
412 },
413 PreparedSelectQuery::StakeAddrFromVoteKey => &self.stake_addr_from_vote_key_query,
414 PreparedSelectQuery::InvalidRegistrationsFromStakeAddr => {
415 &self.invalid_registrations_from_stake_addr_query
416 },
417 PreparedSelectQuery::RbacRegistrationsByCatalystId => {
418 &self.rbac_registrations_by_catalyst_id_query
419 },
420 PreparedSelectQuery::RbacInvalidRegistrationsByCatalystId => {
421 &self.rbac_invalid_registrations_by_catalyst_id_query
422 },
423 PreparedSelectQuery::CatalystIdByTransactionId => {
424 &self.catalyst_id_by_transaction_id_query
425 },
426 PreparedSelectQuery::CatalystIdByStakeAddress => {
427 &self.catalyst_id_by_stake_address_query
428 },
429 PreparedSelectQuery::CatalystIdByPublicKey => &self.catalyst_id_by_public_key_query,
430 PreparedSelectQuery::GetAllRegistrations => &self.get_all_registrations_query,
431 PreparedSelectQuery::GetAllInvalidRegistrations => {
432 &self.get_all_invalid_registrations_query
433 },
434 };
435 session_execute_iter(session, prepared_stmt, params).await
436 }
437
438 pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
446 &self,
447 session: Arc<Session>,
448 cfg: Arc<cassandra_db::EnvVars>,
449 query: PreparedQuery,
450 values: Vec<T>,
451 ) -> FallibleQueryResults {
452 let query_map = match query {
453 PreparedQuery::TxoAdaInsertQuery => &self.txo_insert_queries,
454 PreparedQuery::TxoAssetInsertQuery => &self.txo_asset_insert_queries,
455 PreparedQuery::UnstakedTxoAdaInsertQuery => &self.unstaked_txo_insert_queries,
456 PreparedQuery::UnstakedTxoAssetInsertQuery => &self.unstaked_txo_asset_insert_queries,
457 PreparedQuery::TxiInsertQuery => &self.txi_insert_queries,
458 PreparedQuery::StakeRegistrationInsertQuery => &self.stake_registration_insert_queries,
459 PreparedQuery::Cip36RegistrationInsertQuery => &self.cip36_registration_insert_queries,
460 PreparedQuery::Cip36RegistrationInsertErrorQuery => {
461 &self.cip36_registration_error_insert_queries
462 },
463 PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery => {
464 &self.cip36_registration_for_vote_key_insert_queries
465 },
466 PreparedQuery::TxoSpentUpdateQuery => &self.txo_spent_update_queries,
467 PreparedQuery::Rbac509InsertQuery => &self.rbac509_registration_insert_queries,
468 PreparedQuery::Rbac509InvalidInsertQuery => {
469 &self.rbac509_invalid_registration_insert_queries
470 },
471 PreparedQuery::CatalystIdForTxnIdInsertQuery => {
472 &self.catalyst_id_for_txn_id_insert_queries
473 },
474 PreparedQuery::CatalystIdForStakeAddressInsertQuery => {
475 &self.catalyst_id_for_stake_address_insert_queries
476 },
477 PreparedQuery::CatalystIdForPublicKeyInsertQuery => {
478 &self.catalyst_id_for_public_key_insert_queries
479 },
480 };
481 session_execute_batch(session, query_map, cfg, query, values).await
482 }
483}
484
485async fn session_execute_batch<T: SerializeRow + Debug, Q: std::fmt::Display>(
493 session: Arc<Session>,
494 query_map: &SizedBatch,
495 cfg: Arc<cassandra_db::EnvVars>,
496 query: Q,
497 values: Vec<T>,
498) -> FallibleQueryResults {
499 let mut results: Vec<QueryResult> = Vec::new();
500 let mut errors = Vec::new();
501
502 let chunks = values.chunks(cfg.max_batch_size.try_into().unwrap_or(1));
503 let query_str = format!("{query}");
504
505 for chunk in chunks {
506 let chunk_size: u16 = chunk.len().try_into()?;
507 let Some(batch_query) = query_map.get(&chunk_size) else {
508 bail!("No batch query found for size {}", chunk_size);
510 };
511 let batch_query_statements = batch_query.value().clone();
512 match session.batch(&batch_query_statements, chunk).await {
513 Ok(result) => results.push(result),
514 Err(error) => {
515 let chunk_str = format!("{chunk:?}");
516 if let ExecutionError::ConnectionPoolError(err) = error {
517 set_index_db_liveness(false);
518 error!(error=%err, query=query_str, chunk=chunk_str, "Index DB connection failed. Liveness set to false.");
519 bail!(CassandraSessionError::ConnectionUnavailable { source: err.into() })
520 }
521 error!(%error, query=query_str, chunk=chunk_str, "Query Execution Failed");
522 errors.push(error);
523 },
525 }
526 }
527
528 if !errors.is_empty() {
529 bail!("Query Failed: {query_str}! {errors:?}");
530 }
531
532 Ok(results)
533}
534
535pub(crate) async fn session_execute_iter<P>(
540 session: Arc<Session>,
541 prepared_stmt: &PreparedStatement,
542 params: P,
543) -> anyhow::Result<QueryPager>
544where
545 P: SerializeRow,
546{
547 session
548 .execute_iter(prepared_stmt.clone(), params)
549 .await
550 .map_err(|e| {
551 match e {
552 PagerExecutionError::PrepareError(PrepareError::ConnectionPoolError(err)) => {
553 set_index_db_liveness(false);
554 error!(error = %err, "Index DB connection failed when preparing. Liveness set to false.");
555 CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
556 },
557 PagerExecutionError::NextPageError(NextPageError::RequestFailure(RequestError::ConnectionPoolError(err))) => {
558 set_index_db_liveness(false);
559 error!(error = %err, "Index DB connection failed during request. Liveness set to false.");
560 CassandraSessionError::ConnectionUnavailable { source: err.into() }.into()
561 },
562 _ => e.into(),
563 }
564 })
565}