cat_gateway/db/index/block/
mod.rs

1//! Index a block
2//! Primary Data Indexing - Upsert operations
3
4pub(crate) mod certs;
5pub(crate) mod cip36;
6pub(crate) mod rbac509;
7pub(crate) mod roll_forward;
8pub(crate) mod txi;
9pub(crate) mod txo;
10
11use std::collections::BTreeSet;
12
13use cardano_chain_follower::{hashes::Blake2b256Hash, MultiEraBlock, Slot};
14use certs::CertInsertQuery;
15use cip36::Cip36InsertQuery;
16use rbac509::Rbac509InsertQuery;
17use tokio::sync::watch;
18use tracing::error;
19use txi::TxiInsertQuery;
20use txo::TxoInsertQuery;
21
22use super::{queries::FallibleQueryTasks, session::CassandraSession};
23use crate::rbac::RbacBlockIndexingContext;
24
25/// Add all data needed from the block into the indexes.
26pub(crate) async fn index_block(
27    block: &MultiEraBlock,
28    pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
29    our_end: Slot,
30) -> anyhow::Result<()> {
31    // Get the session.  This should never fail.
32    let Some(session) = CassandraSession::get(block.is_immutable()) else {
33        anyhow::bail!("Failed to get Index DB Session. Cannot index block.");
34    };
35
36    let mut cert_index = CertInsertQuery::new();
37    let mut cip36_index = Cip36InsertQuery::new();
38    let mut rbac509_index = Rbac509InsertQuery::new();
39
40    let mut txi_index = TxiInsertQuery::new();
41    let mut txo_index = TxoInsertQuery::new();
42
43    let slot_no = block.point().slot_or_default();
44
45    let mut rbac_context = RbacBlockIndexingContext::new();
46
47    // We add all transactions in the block to their respective index data sets.
48    for (index, txn) in block.enumerate_txs() {
49        let txn_id = Blake2b256Hash::from(txn.hash()).into();
50
51        // Index the TXIs.
52        txi_index.index(&txn, slot_no);
53
54        // TODO: Index minting.
55        // let mint = txs.mints().iter() {};
56
57        // TODO: Index Metadata.
58        cip36_index.index(index, slot_no, block)?;
59
60        // Index Certificates inside the transaction.
61        cert_index.index(&txn, slot_no, index, block);
62
63        // Index the TXOs.
64        txo_index.index(block.network(), &txn, slot_no, txn_id, index);
65
66        // Index RBAC 509 inside the transaction.
67        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
68        Box::pin(rbac509_index.index(
69            txn_id,
70            index,
71            block,
72            pending_blocks,
73            our_end,
74            &mut rbac_context,
75        ))
76        .await?;
77    }
78
79    // We then execute each batch of data from the block.
80    // This maximizes batching opportunities.
81    let mut query_handles: FallibleQueryTasks = Vec::new();
82
83    query_handles.extend(txo_index.execute(&session));
84    query_handles.extend(txi_index.execute(&session));
85    query_handles.extend(cert_index.execute(&session));
86    query_handles.extend(cip36_index.execute(&session));
87    query_handles.extend(rbac509_index.execute(&session));
88
89    let mut result: anyhow::Result<()> = Ok(());
90
91    // Wait for operations to complete, and display any errors
92    for handle in query_handles {
93        if result.is_err() {
94            // Try and cancel all futures waiting tasks and return the first error we encountered.
95            handle.abort();
96            continue;
97        }
98        match handle.await {
99            Ok(join_res) => {
100                if let Err(error) = join_res {
101                    // IF a query fails, assume everything else is broken.
102                    error!(error=%error,"Query Failed");
103                    result = Err(error);
104                }
105            },
106            Err(error) => {
107                error!(error=%error,"Query Join Failed");
108                result = Err(error.into());
109            },
110        }
111    }
112
113    result
114}