cat_gateway/db/index/block/
mod.rs

1//! Index a block
2//! Primary Data Indexing - Upsert operations
3
4pub(crate) mod certs;
5pub(crate) mod cip36;
6pub(crate) mod rbac509;
7pub(crate) mod roll_forward;
8pub(crate) mod txi;
9pub(crate) mod txo;
10
11use std::collections::BTreeSet;
12
13use cardano_chain_follower::{hashes::Blake2b256Hash, MultiEraBlock, Slot};
14use certs::CertInsertQuery;
15use cip36::Cip36InsertQuery;
16use rbac509::Rbac509InsertQuery;
17use tokio::sync::watch;
18use tracing::error;
19use txi::TxiInsertQuery;
20use txo::TxoInsertQuery;
21
22use super::{queries::FallibleQueryTasks, session::CassandraSession};
23use crate::rbac::RbacBlockIndexingContext;
24
25/// Add all data needed from the block into the indexes.
26pub(crate) async fn index_block(
27    block: &MultiEraBlock, pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>, our_end: Slot,
28) -> anyhow::Result<()> {
29    // Get the session.  This should never fail.
30    let Some(session) = CassandraSession::get(block.is_immutable()) else {
31        anyhow::bail!("Failed to get Index DB Session. Cannot index block.");
32    };
33
34    let mut cert_index = CertInsertQuery::new();
35    let mut cip36_index = Cip36InsertQuery::new();
36    let mut rbac509_index = Rbac509InsertQuery::new();
37
38    let mut txi_index = TxiInsertQuery::new();
39    let mut txo_index = TxoInsertQuery::new();
40
41    let slot_no = block.point().slot_or_default();
42
43    let mut rbac_context = RbacBlockIndexingContext::new();
44
45    // We add all transactions in the block to their respective index data sets.
46    for (index, txn) in block.enumerate_txs() {
47        let txn_id = Blake2b256Hash::from(txn.hash()).into();
48
49        // Index the TXIs.
50        txi_index.index(&txn, slot_no);
51
52        // TODO: Index minting.
53        // let mint = txs.mints().iter() {};
54
55        // TODO: Index Metadata.
56        cip36_index.index(index, slot_no, block)?;
57
58        // Index Certificates inside the transaction.
59        cert_index.index(&txn, slot_no, index, block);
60
61        // Index the TXOs.
62        txo_index.index(block.network(), &txn, slot_no, txn_id, index);
63
64        // Index RBAC 509 inside the transaction.
65        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
66        Box::pin(rbac509_index.index(
67            txn_id,
68            index,
69            block,
70            pending_blocks,
71            our_end,
72            &mut rbac_context,
73        ))
74        .await?;
75    }
76
77    // We then execute each batch of data from the block.
78    // This maximizes batching opportunities.
79    let mut query_handles: FallibleQueryTasks = Vec::new();
80
81    query_handles.extend(txo_index.execute(&session));
82    query_handles.extend(txi_index.execute(&session));
83    query_handles.extend(cert_index.execute(&session));
84    query_handles.extend(cip36_index.execute(&session));
85    query_handles.extend(rbac509_index.execute(&session));
86
87    let mut result: anyhow::Result<()> = Ok(());
88
89    // Wait for operations to complete, and display any errors
90    for handle in query_handles {
91        if result.is_err() {
92            // Try and cancel all futures waiting tasks and return the first error we encountered.
93            handle.abort();
94            continue;
95        }
96        match handle.await {
97            Ok(join_res) => {
98                if let Err(error) = join_res {
99                    // IF a query fails, assume everything else is broken.
100                    error!(error=%error,"Query Failed");
101                    result = Err(error);
102                }
103            },
104            Err(error) => {
105                error!(error=%error,"Query Join Failed");
106                result = Err(error.into());
107            },
108        }
109    }
110
111    result
112}