cat_gateway/db/index/block/
txi.rs1use std::sync::Arc;
4
5use cardano_chain_follower::{
6 hashes::{Blake2b256Hash, TransactionId},
7 Slot, TxnOutputOffset,
8};
9use scylla::{client::session::Session, SerializeRow};
10use tracing::error;
11
12use crate::{
13 db::{
14 index::{
15 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
16 session::CassandraSession,
17 },
18 types::{DbSlot, DbTransactionId, DbTxnOutputOffset},
19 },
20 settings::cassandra_db,
21};
22
23#[derive(SerializeRow, Debug)]
25pub(crate) struct TxiInsertParams {
26 txn_id: DbTransactionId,
28 txo: DbTxnOutputOffset,
30 slot_no: DbSlot,
32}
33
34impl TxiInsertParams {
35 pub fn new(
37 txn_id: TransactionId,
38 txo: TxnOutputOffset,
39 slot: Slot,
40 ) -> Self {
41 Self {
42 txn_id: txn_id.into(),
43 txo: txo.into(),
44 slot_no: slot.into(),
45 }
46 }
47}
48
49pub(crate) struct TxiInsertQuery {
51 txi_data: Vec<TxiInsertParams>,
53}
54
55const INSERT_TXI_QUERY: &str = include_str!("./cql/insert_txi.cql");
57
58impl TxiInsertQuery {
59 pub(crate) fn new() -> Self {
61 Self {
62 txi_data: Vec::new(),
63 }
64 }
65
66 pub(crate) async fn prepare_batch(
68 session: &Arc<Session>,
69 cfg: &cassandra_db::EnvVars,
70 ) -> anyhow::Result<SizedBatch> {
71 PreparedQueries::prepare_batch(
72 session.clone(),
73 INSERT_TXI_QUERY,
74 cfg,
75 scylla::statement::Consistency::Any,
76 true,
77 false,
78 )
79 .await
80 .inspect_err(|error| error!(error=%error,"Failed to prepare Insert TXI Query."))
81 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_TXI_QUERY}"))
82 }
83
84 pub(crate) fn index(
86 &mut self,
87 txs: &cardano_chain_follower::pallas_traverse::MultiEraTx<'_>,
88 slot_no: Slot,
89 ) {
90 for txi in txs.inputs() {
92 let txn_id = Blake2b256Hash::from(*txi.hash()).into();
93 let txo = txi.index().try_into().unwrap_or(i16::MAX).into();
94
95 self.txi_data
96 .push(TxiInsertParams::new(txn_id, txo, slot_no));
97 }
98 }
99
100 pub(crate) fn execute(
104 self,
105 session: &Arc<CassandraSession>,
106 ) -> FallibleQueryTasks {
107 let mut query_handles: FallibleQueryTasks = Vec::new();
108
109 let inner_session = session.clone();
110
111 query_handles.push(tokio::spawn(async move {
112 inner_session
113 .execute_batch(PreparedQuery::TxiInsertQuery, self.txi_data)
114 .await
115 }));
116
117 query_handles
118 }
119}