cat_gateway/db/index/block/
mod.rs1pub(crate) mod certs;
5pub(crate) mod cip36;
6pub(crate) mod rbac509;
7pub(crate) mod roll_forward;
8pub(crate) mod txi;
9pub(crate) mod txo;
10
11use std::collections::BTreeSet;
12
13use cardano_chain_follower::{hashes::Blake2b256Hash, MultiEraBlock, Slot};
14use certs::CertInsertQuery;
15use cip36::Cip36InsertQuery;
16use rbac509::Rbac509InsertQuery;
17use tokio::sync::watch;
18use tracing::error;
19use txi::TxiInsertQuery;
20use txo::TxoInsertQuery;
21
22use super::{queries::FallibleQueryTasks, session::CassandraSession};
23use crate::rbac::RbacBlockIndexingContext;
24
25pub(crate) async fn index_block(
27 block: &MultiEraBlock, pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>, our_end: Slot,
28) -> anyhow::Result<()> {
29 let Some(session) = CassandraSession::get(block.is_immutable()) else {
31 anyhow::bail!("Failed to get Index DB Session. Cannot index block.");
32 };
33
34 let mut cert_index = CertInsertQuery::new();
35 let mut cip36_index = Cip36InsertQuery::new();
36 let mut rbac509_index = Rbac509InsertQuery::new();
37
38 let mut txi_index = TxiInsertQuery::new();
39 let mut txo_index = TxoInsertQuery::new();
40
41 let slot_no = block.point().slot_or_default();
42
43 let mut rbac_context = RbacBlockIndexingContext::new();
44
45 for (index, txn) in block.enumerate_txs() {
47 let txn_id = Blake2b256Hash::from(txn.hash()).into();
48
49 txi_index.index(&txn, slot_no);
51
52 cip36_index.index(index, slot_no, block)?;
57
58 cert_index.index(&txn, slot_no, index, block);
60
61 txo_index.index(block.network(), &txn, slot_no, txn_id, index);
63
64 Box::pin(rbac509_index.index(
67 txn_id,
68 index,
69 block,
70 pending_blocks,
71 our_end,
72 &mut rbac_context,
73 ))
74 .await?;
75 }
76
77 let mut query_handles: FallibleQueryTasks = Vec::new();
80
81 query_handles.extend(txo_index.execute(&session));
82 query_handles.extend(txi_index.execute(&session));
83 query_handles.extend(cert_index.execute(&session));
84 query_handles.extend(cip36_index.execute(&session));
85 query_handles.extend(rbac509_index.execute(&session));
86
87 let mut result: anyhow::Result<()> = Ok(());
88
89 for handle in query_handles {
91 if result.is_err() {
92 handle.abort();
94 continue;
95 }
96 match handle.await {
97 Ok(join_res) => {
98 if let Err(error) = join_res {
99 error!(error=%error,"Query Failed");
101 result = Err(error);
102 }
103 },
104 Err(error) => {
105 error!(error=%error,"Query Join Failed");
106 result = Err(error.into());
107 },
108 }
109 }
110
111 result
112}