cat_gateway/db/index/queries/purge/
txo_assets.rs

1//! TXO Assets by Stake Address Queries used in purging data.
2use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5    client::{pager::TypedRowStream, session::Session},
6    statement::prepared::PreparedStatement,
7    SerializeRow,
8};
9use tracing::error;
10
11use crate::{
12    db::{
13        index::{
14            queries::{
15                purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16                FallibleQueryResults, SizedBatch,
17            },
18            session::CassandraSession,
19        },
20        types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
21    },
22    settings::cassandra_db,
23};
24
pub(crate) mod result {
    //! Return values for TXO Assets by Stake Address purge queries.

    use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};

    /// Primary Key Row
    ///
    /// A positional tuple. The element order below is the order in which
    /// `From<PrimaryKey>` for `Params` (in the parent module) reads the values.
    pub(crate) type PrimaryKey = (
        // 0: stake address.
        DbStakeAddress,
        // 1: slot number.
        DbSlot,
        // 2: transaction index.
        DbTxnIndex,
        // 3: transaction output offset.
        DbTxnOutputOffset,
        // 4: policy id bytes.
        Vec<u8>,
        // 5: asset name bytes.
        Vec<u8>,
    );
}
40
/// Select primary keys for TXO Assets by Stake Address.
///
/// The CQL text is embedded at compile time from the file named in `include_str!`.
const SELECT_QUERY: &str = include_str!("./cql/get_txo_assets_by_stake_addr.cql");
43
/// Primary Key Value.
///
/// Derives `SerializeRow`. A `Vec<Params>` is the argument that
/// `DeleteQuery::execute` forwards to `purge_execute_batch`, and a value can be
/// built from a `result::PrimaryKey` tuple through the `From` impl.
#[derive(SerializeRow)]
pub(crate) struct Params {
    /// Stake Address - Binary 29 bytes.
    pub(crate) stake_address: DbStakeAddress,
    /// Block Slot Number
    pub(crate) slot_no: DbSlot,
    /// Transaction Offset inside the block.
    pub(crate) txn_index: DbTxnIndex,
    /// Transaction Output Offset inside the transaction.
    pub(crate) txo: DbTxnOutputOffset,
    /// Asset policy hash (28 bytes).
    // Private, unlike the fields above: within this file it is only set by the
    // `From<result::PrimaryKey>` conversion.
    policy_id: Vec<u8>,
    /// Asset name (range of 0 - 32 bytes).
    // Private as well; also set by the `From<result::PrimaryKey>` conversion.
    asset_name: Vec<u8>,
}
60
61impl Debug for Params {
62    fn fmt(
63        &self,
64        f: &mut std::fmt::Formatter<'_>,
65    ) -> std::fmt::Result {
66        f.debug_struct("Params")
67            .field("stake_address", &self.stake_address)
68            .field("slot_no", &self.slot_no)
69            .field("txn_index", &self.txn_index)
70            .field("txo", &self.txo)
71            .field("policy_id", &self.policy_id)
72            .field("asset_name", &self.asset_name)
73            .finish()
74    }
75}
76
77impl From<result::PrimaryKey> for Params {
78    fn from(value: result::PrimaryKey) -> Self {
79        Self {
80            stake_address: value.0,
81            slot_no: value.1,
82            txn_index: value.2,
83            txo: value.3,
84            policy_id: value.4,
85            asset_name: value.5,
86        }
87    }
88}
89/// Get primary key for TXO Assets by Stake Address query.
90pub(crate) struct PrimaryKeyQuery;
91
92impl PrimaryKeyQuery {
93    /// Prepares a query to get all TXO Assets by stake address primary keys.
94    pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
95        PreparedQueries::prepare(
96            session.clone(),
97            SELECT_QUERY,
98            scylla::statement::Consistency::All,
99            true,
100        )
101        .await
102        .inspect_err(
103            |error| error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query."),
104        )
105        .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
106    }
107
108    /// Executes a query to get all TXO Assets by stake address primary keys.
109    pub(crate) async fn execute(
110        session: &CassandraSession
111    ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
112        let iter = session
113            .purge_execute_iter(PreparedSelectQuery::TxoAssets)
114            .await?
115            .rows_stream::<result::PrimaryKey>()?;
116
117        Ok(iter)
118    }
119}
120
/// Delete TXO Assets by Stake Address
///
/// The CQL text is embedded at compile time from the file named in `include_str!`.
const DELETE_QUERY: &str = include_str!("cql/delete_txo_assets_by_stake_address.cql");
123
124/// Delete TXO Assets by Stake Address Query
125pub(crate) struct DeleteQuery;
126
127impl DeleteQuery {
128    /// Prepare Batch of Delete Queries
129    pub(crate) async fn prepare_batch(
130        session: &Arc<Session>,
131        cfg: &cassandra_db::EnvVars,
132    ) -> anyhow::Result<SizedBatch> {
133        PreparedQueries::prepare_batch(
134            session.clone(),
135            DELETE_QUERY,
136            cfg,
137            scylla::statement::Consistency::Any,
138            true,
139            false,
140        )
141        .await
142        .inspect_err(
143            |error| error!(error=%error, "Failed to prepare delete TXO Assets by stake address primary key query."),
144        )
145        .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
146    }
147
148    /// Executes a DELETE Query
149    pub(crate) async fn execute(
150        session: &CassandraSession,
151        params: Vec<Params>,
152    ) -> FallibleQueryResults {
153        let results = session
154            .purge_execute_batch(PreparedDeleteQuery::TxoAssets, params)
155            .await?;
156
157        Ok(results)
158    }
159}