cat_gateway/db/index/queries/purge/
txo_assets.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 client::{pager::TypedRowStream, session::Session},
6 statement::prepared::PreparedStatement,
7 SerializeRow,
8};
9use tracing::error;
10
11use crate::{
12 db::{
13 index::{
14 queries::{
15 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16 FallibleQueryResults, SizedBatch,
17 },
18 session::CassandraSession,
19 },
20 types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset},
21 },
22 settings::cassandra_db,
23};
24
25pub(crate) mod result {
26 use crate::db::types::{DbSlot, DbStakeAddress, DbTxnIndex, DbTxnOutputOffset};
29
30 pub(crate) type PrimaryKey = (
32 DbStakeAddress,
33 DbSlot,
34 DbTxnIndex,
35 DbTxnOutputOffset,
36 Vec<u8>,
37 Vec<u8>,
38 );
39}
40
41const SELECT_QUERY: &str = include_str!("./cql/get_txo_assets_by_stake_addr.cql");
43
44#[derive(SerializeRow)]
46pub(crate) struct Params {
47 pub(crate) stake_address: DbStakeAddress,
49 pub(crate) slot_no: DbSlot,
51 pub(crate) txn_index: DbTxnIndex,
53 pub(crate) txo: DbTxnOutputOffset,
55 policy_id: Vec<u8>,
57 asset_name: Vec<u8>,
59}
60
61impl Debug for Params {
62 fn fmt(
63 &self,
64 f: &mut std::fmt::Formatter<'_>,
65 ) -> std::fmt::Result {
66 f.debug_struct("Params")
67 .field("stake_address", &self.stake_address)
68 .field("slot_no", &self.slot_no)
69 .field("txn_index", &self.txn_index)
70 .field("txo", &self.txo)
71 .field("policy_id", &self.policy_id)
72 .field("asset_name", &self.asset_name)
73 .finish()
74 }
75}
76
77impl From<result::PrimaryKey> for Params {
78 fn from(value: result::PrimaryKey) -> Self {
79 Self {
80 stake_address: value.0,
81 slot_no: value.1,
82 txn_index: value.2,
83 txo: value.3,
84 policy_id: value.4,
85 asset_name: value.5,
86 }
87 }
88}
89pub(crate) struct PrimaryKeyQuery;
91
92impl PrimaryKeyQuery {
93 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
95 PreparedQueries::prepare(
96 session.clone(),
97 SELECT_QUERY,
98 scylla::statement::Consistency::All,
99 true,
100 )
101 .await
102 .inspect_err(
103 |error| error!(error=%error, "Failed to prepare get TXO Assets by stake address primary key query."),
104 )
105 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
106 }
107
108 pub(crate) async fn execute(
110 session: &CassandraSession
111 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
112 let iter = session
113 .purge_execute_iter(PreparedSelectQuery::TxoAssets)
114 .await?
115 .rows_stream::<result::PrimaryKey>()?;
116
117 Ok(iter)
118 }
119}
120
121const DELETE_QUERY: &str = include_str!("cql/delete_txo_assets_by_stake_address.cql");
123
124pub(crate) struct DeleteQuery;
126
127impl DeleteQuery {
128 pub(crate) async fn prepare_batch(
130 session: &Arc<Session>,
131 cfg: &cassandra_db::EnvVars,
132 ) -> anyhow::Result<SizedBatch> {
133 PreparedQueries::prepare_batch(
134 session.clone(),
135 DELETE_QUERY,
136 cfg,
137 scylla::statement::Consistency::Any,
138 true,
139 false,
140 )
141 .await
142 .inspect_err(
143 |error| error!(error=%error, "Failed to prepare delete TXO Assets by stake address primary key query."),
144 )
145 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
146 }
147
148 pub(crate) async fn execute(
150 session: &CassandraSession,
151 params: Vec<Params>,
152 ) -> FallibleQueryResults {
153 let results = session
154 .purge_execute_batch(PreparedDeleteQuery::TxoAssets, params)
155 .await?;
156
157 Ok(results)
158 }
159}