cat_gateway/db/index/block/
txi.rs1use std::sync::Arc;
4
5use cardano_chain_follower::{
6 hashes::{Blake2b256Hash, TransactionId},
7 Slot, TxnOutputOffset,
8};
9use scylla::{client::session::Session, SerializeRow};
10use tracing::error;
11
12use crate::{
13 db::{
14 index::{
15 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
16 session::CassandraSession,
17 },
18 types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
19 },
20 settings::cassandra_db,
21};
22
23#[derive(SerializeRow, Debug)]
25pub(crate) struct TxiInsertParams {
26 txn_id: DbTransactionId,
28 txo: DbTxnOutputOffset,
30 slot_no: DbSlot,
32}
33
34impl TxiInsertParams {
35 pub fn new(txn_id: TransactionId, txo: TxnOutputOffset, slot: Slot) -> Self {
37 Self {
38 txn_id: txn_id.into(),
39 txo: txo.into(),
40 slot_no: slot.into(),
41 }
42 }
43}
44
45pub(crate) struct TxiInsertQuery {
47 txi_data: Vec<TxiInsertParams>,
49}
50
51const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
53
54impl TxiInsertQuery {
55 pub(crate) fn new() -> Self {
57 Self {
58 txi_data: Vec::new(),
59 }
60 }
61
62 pub(crate) async fn prepare_batch(
64 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
65 ) -> anyhow::Result<SizedBatch> {
66 PreparedQueries::prepare_batch(
67 session.clone(),
68 INSERT_TXI_QUERY,
69 cfg,
70 scylla::statement::Consistency::Any,
71 true,
72 false,
73 )
74 .await
75 .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
76 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
77 }
78
79 pub(crate) fn index(
81 &mut self, txs: &cardano_chain_follower::pallas_traverse::MultiEraTx<'_>, slot_no: Slot,
82 ) {
83 for txi in txs.inputs() {
85 let txn_id = Blake2b256Hash::from(*txi.hash()).into();
86 let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
87
88 self.txi_data
89 .push(TxiInsertParams::new(txn_id, txo, slot_no));
90 }
91 }
92
93 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
97 let mut query_handles: FallibleQueryTasks = Vec::new();
98
99 let inner_session = session.clone();
100
101 query_handles.push(tokio::spawn(async move {
102 inner_session
103 .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
104 .await
105 }));
106
107 query_handles
108 }
109}