cat_gateway/db/index/queries/purge/
txo_ada.rs

1//! TXO by Stake Address Queries used in purging data.
2use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    client::{pager::TypedRowStream, session::Session},
6    statement::prepared::PreparedStatement,
7    SerializeRow,
8};
9use tracing::error;
10
11use crate::{
12    db::{
13        index::{
14            queries::{
15                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16                FallibleQueryResults, SizedBatch,
17            },
18            session::CassandraSession,
19        },
20        types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
21    },
22    settings::cassandra_db,
23};
24
25pub(crate) mod result {
26    //! Return values for TXO by Stake Address purge queries.
27
28    use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
29
30    /// Primary Key Row
31    pub(crate) type PrimaryKey = (DbStakeAddress, DbSlot, DbTxnIndex, DbTxnOutputOffset);
32}
33
34/// Select primary keys for TXO by Stake Address.
35const SELECT_QUERY: &str = include_str!("./cql/get_txo_by_stake_address.cql");
36
37/// Primary Key Value.
38#[derive(SerializeRow)]
39pub(crate) struct Params {
40    /// Stake Address - Binary 29 bytes.
41    pub(crate) stake_address: DbStakeAddress,
42    /// Block Slot Number
43    pub(crate) slot_no: DbSlot,
44    /// Transaction Offset inside the block.
45    pub(crate) txn_index: DbTxnIndex,
46    /// Transaction Output Offset inside the transaction.
47    pub(crate) txo: DbTxnOutputOffset,
48}
49
50impl Debug for Params {
51    fn fmt(
52        &self,
53        f: &mut std::fmt::Formatter<'_>,
54    ) -> std::fmt::Result {
55        f.debug_struct("Params")
56            .field("stake_address", &self.stake_address)
57            .field("slot_no", &self.slot_no)
58            .field("txn_index", &self.txn_index)
59            .field("txo", &self.txo)
60            .finish()
61    }
62}
63
64impl From<result::PrimaryKey> for Params {
65    fn from(value: result::PrimaryKey) -> Self {
66        Self {
67            stake_address: value.0,
68            slot_no: value.1,
69            txn_index: value.2,
70            txo: value.3,
71        }
72    }
73}
74/// Get primary key for TXO by Stake Address query.
75pub(crate) struct PrimaryKeyQuery;
76
77impl PrimaryKeyQuery {
78    /// Prepares a query to get all TXO by stake address primary keys.
79    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
80        PreparedQueries::prepare(
81            session.clone(),
82            SELECT_QUERY,
83            scylla::statement::Consistency::All,
84            true,
85        )
86        .await
87        .inspect_err(
88            |error| error!(error=%error, "Failed to prepare get TXO by stake address primary key query."),
89        )
90        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
91    }
92
93    /// Executes a query to get all TXO by stake address primary keys.
94    pub(crate) async fn execute(
95        session: &CassandraSession
96    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
97        let iter = session
98            .purge_execute_iter(PreparedSelectQuery::TxoAda)
99            .await?
100            .rows_stream::<result::PrimaryKey>()?;
101
102        Ok(iter)
103    }
104}
105
106/// Delete TXO by Stake Address
107const DELETE_QUERY: &str = include_str!("cql/delete_txo_by_stake_address.cql");
108
109/// Delete TXO by Stake Address Query
110pub(crate) struct DeleteQuery;
111
112impl DeleteQuery {
113    /// Prepare Batch of Delete Queries
114    pub(crate) async fn prepare_batch(
115        session: &Arc<Session>,
116        cfg: &cassandra_db::EnvVars,
117    ) -> anyhow::Result<SizedBatch> {
118        PreparedQueries::prepare_batch(
119            session.clone(),
120            DELETE_QUERY,
121            cfg,
122            scylla::statement::Consistency::Any,
123            true,
124            false,
125        )
126        .await
127        .inspect_err(
128            |error| error!(error=%error, "Failed to prepare delete TXO by stake address primary key query."),
129        )
130        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
131    }
132
133    /// Executes a DELETE Query
134    pub(crate) async fn execute(
135        session: &CassandraSession,
136        params: Vec<Params>,
137    ) -> FallibleQueryResults {
138        let results = session
139            .purge_execute_batch(PreparedDeleteQuery::TxoAda, params)
140            .await?;
141
142        Ok(results)
143    }
144}