cat_gateway/db/index/block/
txi.rs

1//! Insert TXI Index Data Queries.
2
3use std::sync::Arc;
4
5use cardano_chain_follower::{
6    hashes::{Blake2b256Hash, TransactionId},
7    Slot, TxnOutputOffset,
8};
9use scylla::{client::session::Session, SerializeRow};
10use tracing::error;
11
12use crate::{
13    db::{
14        index::{
15            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
16            session::CassandraSession,
17        },
18        types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
19    },
20    settings::cassandra_db,
21};
22
23/// Insert TXI Query and Parameters
24#[derive(SerializeRow, Debug)]
25pub(crate) struct TxiInsertParams {
26    /// Spent Transactions Hash
27    txn_id: DbTransactionId,
28    /// TXO Index spent.
29    txo: DbTxnOutputOffset,
30    /// Block Slot Number when spend occurred.
31    slot_no: DbSlot,
32}
33
34impl TxiInsertParams {
35    /// Create a new record for this transaction.
36    pub fn new(txn_id: TransactionId, txo: TxnOutputOffset, slot: Slot) -> Self {
37        Self {
38            txn_id: txn_id.into(),
39            txo: txo.into(),
40            slot_no: slot.into(),
41        }
42    }
43}
44
45/// Insert TXI Query and Parameters
46pub(crate) struct TxiInsertQuery {
47    /// Transaction Input Data to be inserted.
48    txi_data: Vec<TxiInsertParams>,
49}
50
/// CQL text of the TXI insert statement, embedded at compile time.
52const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
53
54impl TxiInsertQuery {
55    /// Create a new record for this transaction.
56    pub(crate) fn new() -> Self {
57        Self {
58            txi_data: Vec::new(),
59        }
60    }
61
62    /// Prepare Batch of Insert TXI Index Data Queries
63    pub(crate) async fn prepare_batch(
64        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
65    ) -> anyhow::Result<SizedBatch> {
66        PreparedQueries::prepare_batch(
67            session.clone(),
68            INSERT_TXI_QUERY,
69            cfg,
70            scylla::statement::Consistency::Any,
71            true,
72            false,
73        )
74        .await
75        .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
76        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
77    }
78
79    /// Index the transaction Inputs.
80    pub(crate) fn index(
81        &mut self, txs: &cardano_chain_follower::pallas_traverse::MultiEraTx<'_>, slot_no: Slot,
82    ) {
83        // Index the TXI's.
84        for txi in txs.inputs() {
85            let txn_id = Blake2b256Hash::from(*txi.hash()).into();
86            let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
87
88            self.txi_data
89                .push(TxiInsertParams::new(txn_id, txo, slot_no));
90        }
91    }
92
93    /// Execute the Certificate Indexing Queries.
94    ///
95    /// Consumes the `self` and returns a vector of futures.
96    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
97        let mut query_handles: FallibleQueryTasks = Vec::new();
98
99        let inner_session = session.clone();
100
101        query_handles.push(tokio::spawn(async move {
102            inner_session
103                .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
104                .await
105        }));
106
107        query_handles
108    }
109}