cat_gateway/db/index/queries/purge/
txo_ada.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 client::{pager::TypedRowStream, session::Session},
6 statement::prepared::PreparedStatement,
7 SerializeRow,
8};
9use tracing::error;
10
11use crate::{
12 db::{
13 index::{
14 queries::{
15 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16 FallibleQueryResults, SizedBatch,
17 },
18 session::CassandraSession,
19 },
20 types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
21 },
22 settings::cassandra_db,
23};
24
25pub(crate) mod result {
26 use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
29
30 pub(crate) type PrimaryKey = (DbStakeAddress, DbSlot, DbTxnIndex, DbTxnOutputOffset);
32}
33
34const SELECT_QUERY: &str = include_str!("./cql/get_txo_by_stake_address.cql");
36
37#[derive(SerializeRow)]
39pub(crate) struct Params {
40 pub(crate) stake_address: DbStakeAddress,
42 pub(crate) slot_no: DbSlot,
44 pub(crate) txn_index: DbTxnIndex,
46 pub(crate) txo: DbTxnOutputOffset,
48}
49
50impl Debug for Params {
51 fn fmt(
52 &self,
53 f: &mut std::fmt::Formatter<'_>,
54 ) -> std::fmt::Result {
55 f.debug_struct("Params")
56 .field("stake_address", &self.stake_address)
57 .field("slot_no", &self.slot_no)
58 .field("txn_index", &self.txn_index)
59 .field("txo", &self.txo)
60 .finish()
61 }
62}
63
64impl From<result::PrimaryKey> for Params {
65 fn from(value: result::PrimaryKey) -> Self {
66 Self {
67 stake_address: value.0,
68 slot_no: value.1,
69 txn_index: value.2,
70 txo: value.3,
71 }
72 }
73}
74pub(crate) struct PrimaryKeyQuery;
76
77impl PrimaryKeyQuery {
78 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
80 PreparedQueries::prepare(
81 session.clone(),
82 SELECT_QUERY,
83 scylla::statement::Consistency::All,
84 true,
85 )
86 .await
87 .inspect_err(
88 |error| error!(error=%error, "Failed to prepare get TXO by stake address primary key query."),
89 )
90 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
91 }
92
93 pub(crate) async fn execute(
95 session: &CassandraSession
96 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
97 let iter = session
98 .purge_execute_iter(PreparedSelectQuery::TxoAda)
99 .await?
100 .rows_stream::<result::PrimaryKey>()?;
101
102 Ok(iter)
103 }
104}
105
106const DELETE_QUERY: &str = include_str!("cql/delete_txo_by_stake_address.cql");
108
109pub(crate) struct DeleteQuery;
111
112impl DeleteQuery {
113 pub(crate) async fn prepare_batch(
115 session: &Arc<Session>,
116 cfg: &cassandra_db::EnvVars,
117 ) -> anyhow::Result<SizedBatch> {
118 PreparedQueries::prepare_batch(
119 session.clone(),
120 DELETE_QUERY,
121 cfg,
122 scylla::statement::Consistency::Any,
123 true,
124 false,
125 )
126 .await
127 .inspect_err(
128 |error| error!(error=%error, "Failed to prepare delete TXO by stake address primary key query."),
129 )
130 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
131 }
132
133 pub(crate) async fn execute(
135 session: &CassandraSession,
136 params: Vec<Params>,
137 ) -> FallibleQueryResults {
138 let results = session
139 .purge_execute_batch(PreparedDeleteQuery::TxoAda, params)
140 .await?;
141
142 Ok(results)
143 }
144}