cat_gateway/db/index/block/
txi.rs

1//! Insert TXI Index Data Queries.
2
3use std::sync::Arc;
4
5use cardano_chain_follower::{
6    hashes::{Blake2b256Hash, TransactionId},
7    Slot, TxnOutputOffset,
8};
9use scylla::{client::session::Session, SerializeRow};
10use tracing::error;
11
12use crate::{
13    db::{
14        index::{
15            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
16            session::CassandraSession,
17        },
18        types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
19    },
20    settings::cassandra_db,
21};
22
23/// Insert TXI Query and Parameters
24#[derive(SerializeRow, Debug)]
25pub(crate) struct TxiInsertParams {
26    /// Spent Transactions Hash
27    txn_id: DbTransactionId,
28    /// TXO Index spent.
29    txo: DbTxnOutputOffset,
30    /// Block Slot Number when spend occurred.
31    slot_no: DbSlot,
32}
33
34impl TxiInsertParams {
35    /// Create a new record for this transaction.
36    pub fn new(
37        txn_id: TransactionId,
38        txo: TxnOutputOffset,
39        slot: Slot,
40    ) -> Self {
41        Self {
42            txn_id: txn_id.into(),
43            txo: txo.into(),
44            slot_no: slot.into(),
45        }
46    }
47}
48
49/// Insert TXI Query and Parameters
50pub(crate) struct TxiInsertQuery {
51    /// Transaction Input Data to be inserted.
52    txi_data: Vec<TxiInsertParams>,
53}
54
55/// TXI by Txn hash Index
56const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
57
58impl TxiInsertQuery {
59    /// Create a new record for this transaction.
60    pub(crate) fn new() -> Self {
61        Self {
62            txi_data: Vec::new(),
63        }
64    }
65
66    /// Prepare Batch of Insert TXI Index Data Queries
67    pub(crate) async fn prepare_batch(
68        session: &Arc<Session>,
69        cfg: &cassandra_db::EnvVars,
70    ) -> anyhow::Result<SizedBatch> {
71        PreparedQueries::prepare_batch(
72            session.clone(),
73            INSERT_TXI_QUERY,
74            cfg,
75            scylla::statement::Consistency::Any,
76            true,
77            false,
78        )
79        .await
80        .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
81        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
82    }
83
84    /// Index the transaction Inputs.
85    pub(crate) fn index(
86        &mut self,
87        txs: &cardano_chain_follower::pallas_traverse::MultiEraTx<'_>,
88        slot_no: Slot,
89    ) {
90        // Index the TXI's.
91        for txi in txs.inputs() {
92            let txn_id = Blake2b256Hash::from(*txi.hash()).into();
93            let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
94
95            self.txi_data
96                .push(TxiInsertParams::new(txn_id, txo, slot_no));
97        }
98    }
99
100    /// Execute the Certificate Indexing Queries.
101    ///
102    /// Consumes the `self` and returns a vector of futures.
103    pub(crate) fn execute(
104        self,
105        session: &Arc<CassandraSession>,
106    ) -> FallibleQueryTasks {
107        let mut query_handles: FallibleQueryTasks = Vec::new();
108
109        let inner_session = session.clone();
110
111        query_handles.push(tokio::spawn(async move {
112            inner_session
113                .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
114                .await
115        }));
116
117        query_handles
118    }
119}