cat_gateway/db/index/block/
mod.rs1pub(crate) mod certs;
5pub(crate) mod cip36;
6pub(crate) mod rbac509;
7pub(crate) mod roll_forward;
8pub(crate) mod txi;
9pub(crate) mod txo;
10
11use std::collections::BTreeSet;
12
13use cardano_chain_follower::{hashes::Blake2b256Hash, MultiEraBlock, Slot};
14use certs::CertInsertQuery;
15use cip36::Cip36InsertQuery;
16use rbac509::Rbac509InsertQuery;
17use tokio::sync::watch;
18use tracing::error;
19use txi::TxiInsertQuery;
20use txo::TxoInsertQuery;
21
22use super::{queries::FallibleQueryTasks, session::CassandraSession};
23use crate::rbac::RbacBlockIndexingContext;
24
25pub(crate) async fn index_block(
27 block: &MultiEraBlock,
28 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
29 our_end: Slot,
30) -> anyhow::Result<()> {
31 let Some(session) = CassandraSession::get(block.is_immutable()) else {
33 anyhow::bail!("Failed to get Index DB Session. Cannot index block.");
34 };
35
36 let mut cert_index = CertInsertQuery::new();
37 let mut cip36_index = Cip36InsertQuery::new();
38 let mut rbac509_index = Rbac509InsertQuery::new();
39
40 let mut txi_index = TxiInsertQuery::new();
41 let mut txo_index = TxoInsertQuery::new();
42
43 let slot_no = block.point().slot_or_default();
44
45 let mut rbac_context = RbacBlockIndexingContext::new();
46
47 for (index, txn) in block.enumerate_txs() {
49 let txn_id = Blake2b256Hash::from(txn.hash()).into();
50
51 txi_index.index(&txn, slot_no);
53
54 cip36_index.index(index, slot_no, block)?;
59
60 cert_index.index(&txn, slot_no, index, block);
62
63 txo_index.index(block.network(), &txn, slot_no, txn_id, index);
65
66 Box::pin(rbac509_index.index(
69 txn_id,
70 index,
71 block,
72 pending_blocks,
73 our_end,
74 &mut rbac_context,
75 ))
76 .await?;
77 }
78
79 let mut query_handles: FallibleQueryTasks = Vec::new();
82
83 query_handles.extend(txo_index.execute(&session));
84 query_handles.extend(txi_index.execute(&session));
85 query_handles.extend(cert_index.execute(&session));
86 query_handles.extend(cip36_index.execute(&session));
87 query_handles.extend(rbac509_index.execute(&session));
88
89 let mut result: anyhow::Result<()> = Ok(());
90
91 for handle in query_handles {
93 if result.is_err() {
94 handle.abort();
96 continue;
97 }
98 match handle.await {
99 Ok(join_res) => {
100 if let Err(error) = join_res {
101 error!(error=%error,"Query Failed");
103 result = Err(error);
104 }
105 },
106 Err(error) => {
107 error!(error=%error,"Query Join Failed");
108 result = Err(error.into());
109 },
110 }
111 }
112
113 result
114}