cat_gateway/db/index/queries/purge/
mod.rs

1//! Queries for purging volatile data.
2
3pub(crate) mod catalyst_id_for_public_key;
4pub(crate) mod catalyst_id_for_stake_address;
5pub(crate) mod catalyst_id_for_txn_id;
6pub(crate) mod cip36_registration;
7pub(crate) mod cip36_registration_for_vote_key;
8pub(crate) mod cip36_registration_invalid;
9pub(crate) mod rbac509_invalid_registration;
10pub(crate) mod rbac509_registration;
11pub(crate) mod stake_registration;
12pub(crate) mod txi_by_hash;
13pub(crate) mod txo_ada;
14pub(crate) mod txo_assets;
15pub(crate) mod unstaked_txo_ada;
16pub(crate) mod unstaked_txo_assets;
17
18use std::{fmt::Debug, sync::Arc};
19
20use scylla::{
21    client::{pager::QueryPager, session::Session},
22    serialize::row::SerializeRow,
23    statement::prepared::PreparedStatement,
24};
25
26use super::{FallibleQueryResults, SizedBatch};
27use crate::settings::cassandra_db;
28
29/// No parameters
30const NO_PARAMS: () = ();
31
32/// All prepared DELETE query statements (purge DB table rows).
33#[derive(strum_macros::Display)]
34pub(crate) enum PreparedDeleteQuery {
35    /// TXO Delete query.
36    TxoAda,
37    /// TXO Assets Delete query.
38    TxoAssets,
39    /// Unstaked TXO Delete query.
40    UnstakedTxoAda,
41    /// Unstaked TXO Asset Delete query.
42    UnstakedTxoAsset,
43    /// TXI by TXN Hash Delete query.
44    Txi,
45    /// Stake Registration Delete query.
46    StakeRegistration,
47    /// CIP 36 Registration Delete Query.
48    Cip36Registration,
49    /// CIP 36 Registration Invalid Delete query.
50    Cip36RegistrationInvalid,
51    /// CIP 36 Registration for vote key Delete query.
52    Cip36RegistrationForVoteKey,
53    /// RBAC 509 Registration Delete query.
54    Rbac509,
55    /// Invalid RBAC 509 Registration Delete query.
56    Rbac509Invalid,
57    /// Catalyst ID for transaction ID delete query.
58    CatalystIdForTxnId,
59    /// Catalyst ID for stake address delete query.
60    CatalystIdForStakeAddress,
61    /// Catalyst ID for public key delete query.
62    CatalystIdForPublicKey,
63}
64
65/// All prepared SELECT query statements (primary keys from table).
66#[derive(strum_macros::Display)]
67pub(crate) enum PreparedSelectQuery {
68    /// TXO Select query.
69    TxoAda,
70    /// TXO Asset Select query.
71    TxoAssets,
72    /// Unstaked TXO Select query.
73    UnstakedTxoAda,
74    /// Unstaked TXO Asset Select query.
75    UnstakedTxoAsset,
76    /// TXI by TXN Hash Select query.
77    Txi,
78    /// Stake Registration Select query.
79    StakeRegistration,
80    /// CIP 36 Registration Select Query.
81    Cip36Registration,
82    /// CIP 36 Registration Invalid Select query.
83    Cip36RegistrationInvalid,
84    /// CIP 36 Registration for vote key Select query.
85    Cip36RegistrationForVoteKey,
86    /// RBAC 509 Registration Select query.
87    Rbac509,
88    /// Invalid RBAC 509 Registration Select query.
89    Rbac509Invalid,
90    /// Catalyst ID for transaction ID select query.
91    CatalystIdForTxnId,
92    /// Catalyst ID for stake address select query.
93    CatalystIdForStakeAddress,
94    /// Catalyst ID for public key select query.
95    CatalystIdForPublicKey,
96}
97
98/// All prepared purge queries for a session.
99pub(crate) struct PreparedQueries {
100    /// TXO ADA Primary Key Query.
101    select_txo_ada: PreparedStatement,
102    /// TXO Delete Query.
103    delete_txo_ada: SizedBatch,
104    /// TXO Asset Primary Key Query.
105    select_txo_assets: PreparedStatement,
106    /// TXO Assets Delete Query.
107    delete_txo_assets: SizedBatch,
108    /// Unstaked TXO ADA Primary Key Query.
109    select_unstaked_txo_ada: PreparedStatement,
110    /// Unstaked TXO ADA Delete Query.
111    delete_unstaked_txo_ada: SizedBatch,
112    /// Unstaked TXO Assets Primary Key Query.
113    select_unstaked_txo_assets: PreparedStatement,
114    /// Unstaked TXO Asset Delete Query.
115    delete_unstaked_txo_assets: SizedBatch,
116    /// TXI by TXN Hash by TXN Hash Primary Key Query.
117    select_txi_by_hash: PreparedStatement,
118    /// TXI by TXN Hash Delete Query.
119    delete_txi_by_hash: SizedBatch,
120    /// Stake Registration Primary Key Query.
121    select_stake_registration: PreparedStatement,
122    /// Stake Registration Delete Query.
123    delete_stake_registration: SizedBatch,
124    /// CIP36 Registrations Primary Key Query.
125    select_cip36_registration: PreparedStatement,
126    /// CIP36 Registrations Delete Query.
127    delete_cip36_registration: SizedBatch,
128    /// CIP36 Registration Invalid Primary Key Query.
129    select_cip36_registration_invalid: PreparedStatement,
130    /// CIP36 Registration Invalid Delete Query.
131    delete_cip36_registration_invalid: SizedBatch,
132    /// CIP36 Registration for Vote Key Primary Key Query.
133    select_cip36_registration_for_vote_key: PreparedStatement,
134    /// CIP36 Registration for Vote Key Delete Query.
135    delete_cip36_registration_for_vote_key: SizedBatch,
136    /// RBAC 509 Registrations Primary Key Query.
137    select_rbac509_registration: PreparedStatement,
138    /// RBAC 509 Registrations Delete Query.
139    delete_rbac509_registration: SizedBatch,
140    /// RBAC 509 invalid registrations Primary Key Query.
141    select_rbac509_invalid_registration: PreparedStatement,
142    /// RBAC 509 invalid registrations Delete Query.
143    delete_rbac509_invalid_registration: SizedBatch,
144    /// A Catalyst ID for transaction ID primary key query.
145    select_catalyst_id_for_txn_id: PreparedStatement,
146    /// A Catalyst ID for transaction ID delete query.
147    delete_catalyst_id_for_txn_id: SizedBatch,
148    /// A Catalyst ID for stake address primary key query.
149    select_catalyst_id_for_stake_address: PreparedStatement,
150    /// A Catalyst ID for stake address delete query.
151    delete_catalyst_id_for_stake_address: SizedBatch,
152    /// A Catalyst ID for public key primary key query.
153    select_catalyst_id_for_public_key: PreparedStatement,
154    /// A Catalyst ID for public key delete query.
155    delete_catalyst_id_for_public_key: SizedBatch,
156}
157
158impl PreparedQueries {
159    /// Create new prepared queries for a given session.
160    pub(crate) async fn new(
161        session: Arc<Session>,
162        cfg: &cassandra_db::EnvVars,
163    ) -> anyhow::Result<Self> {
164        // We initialize like this, so that all errors preparing querys get shown before aborting.
165        Ok(Self {
166            select_txo_ada: txo_ada::PrimaryKeyQuery::prepare(&session).await?,
167            delete_txo_ada: txo_ada::DeleteQuery::prepare_batch(&session, cfg).await?,
168            select_txo_assets: txo_assets::PrimaryKeyQuery::prepare(&session).await?,
169            delete_txo_assets: txo_assets::DeleteQuery::prepare_batch(&session, cfg).await?,
170            select_unstaked_txo_ada: unstaked_txo_ada::PrimaryKeyQuery::prepare(&session).await?,
171            delete_unstaked_txo_ada: unstaked_txo_ada::DeleteQuery::prepare_batch(&session, cfg)
172                .await?,
173            select_unstaked_txo_assets: unstaked_txo_assets::PrimaryKeyQuery::prepare(&session)
174                .await?,
175            delete_unstaked_txo_assets: unstaked_txo_assets::DeleteQuery::prepare_batch(
176                &session, cfg,
177            )
178            .await?,
179            select_txi_by_hash: txi_by_hash::PrimaryKeyQuery::prepare(&session).await?,
180            delete_txi_by_hash: txi_by_hash::DeleteQuery::prepare_batch(&session, cfg).await?,
181            select_stake_registration: stake_registration::PrimaryKeyQuery::prepare(&session)
182                .await?,
183            delete_stake_registration: stake_registration::DeleteQuery::prepare_batch(
184                &session, cfg,
185            )
186            .await?,
187            select_cip36_registration: cip36_registration::PrimaryKeyQuery::prepare(&session)
188                .await?,
189            delete_cip36_registration: cip36_registration::DeleteQuery::prepare_batch(
190                &session, cfg,
191            )
192            .await?,
193            select_cip36_registration_invalid:
194                cip36_registration_invalid::PrimaryKeyQuery::prepare(&session).await?,
195            delete_cip36_registration_invalid:
196                cip36_registration_invalid::DeleteQuery::prepare_batch(&session, cfg).await?,
197            select_cip36_registration_for_vote_key:
198                cip36_registration_for_vote_key::PrimaryKeyQuery::prepare(&session).await?,
199            delete_cip36_registration_for_vote_key:
200                cip36_registration_for_vote_key::DeleteQuery::prepare_batch(&session, cfg).await?,
201            select_rbac509_registration: rbac509_registration::PrimaryKeyQuery::prepare(&session)
202                .await?,
203            delete_rbac509_registration: rbac509_registration::DeleteQuery::prepare_batch(
204                &session, cfg,
205            )
206            .await?,
207            select_rbac509_invalid_registration:
208                rbac509_invalid_registration::PrimaryKeyQuery::prepare(&session).await?,
209            delete_rbac509_invalid_registration:
210                rbac509_invalid_registration::DeleteQuery::prepare_batch(&session, cfg).await?,
211            select_catalyst_id_for_txn_id: catalyst_id_for_txn_id::PrimaryKeyQuery::prepare(
212                &session,
213            )
214            .await?,
215            delete_catalyst_id_for_txn_id: catalyst_id_for_txn_id::DeleteQuery::prepare_batch(
216                &session, cfg,
217            )
218            .await?,
219            select_catalyst_id_for_stake_address:
220                catalyst_id_for_stake_address::PrimaryKeyQuery::prepare(&session).await?,
221            delete_catalyst_id_for_stake_address:
222                catalyst_id_for_stake_address::DeleteQuery::prepare_batch(&session, cfg).await?,
223            select_catalyst_id_for_public_key:
224                catalyst_id_for_public_key::PrimaryKeyQuery::prepare(&session).await?,
225            delete_catalyst_id_for_public_key:
226                catalyst_id_for_public_key::DeleteQuery::prepare_batch(&session, cfg).await?,
227        })
228    }
229
230    /// Prepares a statement.
231    pub(crate) async fn prepare(
232        session: Arc<Session>,
233        query: &str,
234        consistency: scylla::statement::Consistency,
235        idempotent: bool,
236    ) -> anyhow::Result<PreparedStatement> {
237        super::PreparedQueries::prepare(session, query, consistency, idempotent).await
238    }
239
240    /// Prepares all permutations of the batch from 1 to max.
241    /// It is necessary to do this because batches are pre-sized, they can not be dynamic.
242    /// Preparing the batches in advance is a very larger performance increase.
243    pub(crate) async fn prepare_batch(
244        session: Arc<Session>,
245        query: &str,
246        cfg: &cassandra_db::EnvVars,
247        consistency: scylla::statement::Consistency,
248        idempotent: bool,
249        logged: bool,
250    ) -> anyhow::Result<SizedBatch> {
251        super::PreparedQueries::prepare_batch(session, query, cfg, consistency, idempotent, logged)
252            .await
253    }
254
255    /// Executes a select query with the given parameters.
256    ///
257    /// Returns an iterator that iterates over all the result pages that the query
258    /// returns.
259    pub(crate) async fn execute_iter(
260        &self,
261        session: Arc<Session>,
262        select_query: PreparedSelectQuery,
263    ) -> anyhow::Result<QueryPager> {
264        let prepared_stmt = match select_query {
265            PreparedSelectQuery::TxoAda => &self.select_txo_ada,
266            PreparedSelectQuery::TxoAssets => &self.select_txo_assets,
267            PreparedSelectQuery::UnstakedTxoAda => &self.select_unstaked_txo_ada,
268            PreparedSelectQuery::UnstakedTxoAsset => &self.select_unstaked_txo_assets,
269            PreparedSelectQuery::Txi => &self.select_txi_by_hash,
270            PreparedSelectQuery::StakeRegistration => &self.select_stake_registration,
271            PreparedSelectQuery::Cip36Registration => &self.select_cip36_registration,
272            PreparedSelectQuery::Cip36RegistrationInvalid => {
273                &self.select_cip36_registration_invalid
274            },
275            PreparedSelectQuery::Cip36RegistrationForVoteKey => {
276                &self.select_cip36_registration_for_vote_key
277            },
278            PreparedSelectQuery::Rbac509 => &self.select_rbac509_registration,
279            PreparedSelectQuery::Rbac509Invalid => &self.select_rbac509_invalid_registration,
280            PreparedSelectQuery::CatalystIdForTxnId => &self.select_catalyst_id_for_txn_id,
281            PreparedSelectQuery::CatalystIdForStakeAddress => {
282                &self.select_catalyst_id_for_stake_address
283            },
284            PreparedSelectQuery::CatalystIdForPublicKey => &self.select_catalyst_id_for_public_key,
285        };
286
287        super::session_execute_iter(session, prepared_stmt, NO_PARAMS).await
288    }
289
290    /// Execute a purge query with the given parameters.
291    pub(crate) async fn execute_batch<T: SerializeRow + Debug>(
292        &self,
293        session: Arc<Session>,
294        cfg: Arc<cassandra_db::EnvVars>,
295        query: PreparedDeleteQuery,
296        values: Vec<T>,
297    ) -> FallibleQueryResults {
298        let query_map = match query {
299            PreparedDeleteQuery::TxoAda => &self.delete_txo_ada,
300            PreparedDeleteQuery::TxoAssets => &self.delete_txo_assets,
301            PreparedDeleteQuery::UnstakedTxoAda => &self.delete_unstaked_txo_ada,
302            PreparedDeleteQuery::UnstakedTxoAsset => &self.delete_unstaked_txo_assets,
303            PreparedDeleteQuery::Txi => &self.delete_txi_by_hash,
304            PreparedDeleteQuery::StakeRegistration => &self.delete_stake_registration,
305            PreparedDeleteQuery::Cip36Registration => &self.delete_cip36_registration,
306            PreparedDeleteQuery::Cip36RegistrationInvalid => {
307                &self.delete_cip36_registration_invalid
308            },
309            PreparedDeleteQuery::Cip36RegistrationForVoteKey => {
310                &self.delete_cip36_registration_for_vote_key
311            },
312            PreparedDeleteQuery::Rbac509 => &self.delete_rbac509_registration,
313            PreparedDeleteQuery::Rbac509Invalid => &self.delete_rbac509_invalid_registration,
314            PreparedDeleteQuery::CatalystIdForTxnId => &self.delete_catalyst_id_for_txn_id,
315            PreparedDeleteQuery::CatalystIdForStakeAddress => {
316                &self.delete_catalyst_id_for_stake_address
317            },
318            PreparedDeleteQuery::CatalystIdForPublicKey => &self.delete_catalyst_id_for_public_key,
319        };
320
321        super::session_execute_batch(session, query_map, cfg, query, values).await
322    }
323}