cat_gateway/db/index/queries/purge/
rbac509_registration.rs1use std::{fmt::Debug, sync::Arc};
3
4use scylla::{
5 client::{pager::TypedRowStream, session::Session},
6 statement::prepared::PreparedStatement,
7 SerializeRow,
8};
9use tracing::error;
10
11use crate::{
12 db::{
13 index::{
14 queries::{
15 purge::{PreparedDeleteQuery, PreparedQueries, PreparedSelectQuery},
16 FallibleQueryResults, SizedBatch,
17 },
18 session::CassandraSession,
19 },
20 types::{DbCatalystId, DbSlot, DbTxnIndex},
21 },
22 settings::cassandra_db,
23};
24
25pub(crate) mod result {
26 use crate::db::types::{DbCatalystId, DbSlot, DbTxnIndex};
29
30 pub(crate) type PrimaryKey = (DbCatalystId, DbSlot, DbTxnIndex);
32}
33
/// CQL statement text embedded at compile time from `cql/get_rbac_registration.cql`.
///
/// Prepared by [`PrimaryKeyQuery::prepare`], and echoed back in that function's error
/// message when preparation fails.
const SELECT_QUERY: &str = include_str!("cql/get_rbac_registration.cql");
36
37#[derive(SerializeRow)]
39pub(crate) struct Params {
40 pub catalyst_id: DbCatalystId,
42 pub slot_no: DbSlot,
44 pub txn_index: DbTxnIndex,
46}
47
48impl Debug for Params {
49 fn fmt(
50 &self,
51 f: &mut std::fmt::Formatter<'_>,
52 ) -> std::fmt::Result {
53 f.debug_struct("Params")
54 .field("catalyst_id", &self.catalyst_id)
55 .field("slot_no", &self.slot_no)
56 .field("txn_index", &self.txn_index)
57 .finish()
58 }
59}
60
61impl From<result::PrimaryKey> for Params {
62 fn from(value: result::PrimaryKey) -> Self {
63 Self {
64 catalyst_id: value.0,
65 slot_no: value.1,
66 txn_index: value.2,
67 }
68 }
69}
70pub(crate) struct PrimaryKeyQuery;
72
73impl PrimaryKeyQuery {
74 pub(crate) async fn prepare(session: &Arc<Session>) -> anyhow::Result<PreparedStatement> {
76 PreparedQueries::prepare(
77 session.clone(),
78 SELECT_QUERY,
79 scylla::statement::Consistency::All,
80 true,
81 )
82 .await
83 .inspect_err(
84 |error| error!(error=%error, "Failed to prepare get RBAC 509 registration primary key query."),
85 )
86 .map_err(|error| anyhow::anyhow!("{error}\n--\n{SELECT_QUERY}"))
87 }
88
89 pub(crate) async fn execute(
91 session: &CassandraSession
92 ) -> anyhow::Result<TypedRowStream<result::PrimaryKey>> {
93 let iter = session
94 .purge_execute_iter(PreparedSelectQuery::Rbac509)
95 .await?
96 .rows_stream::<result::PrimaryKey>()?;
97
98 Ok(iter)
99 }
100}
101
/// CQL statement text embedded at compile time from `cql/delete_rbac_registration.cql`.
///
/// Prepared by [`DeleteQuery::prepare_batch`], and echoed back in that function's error
/// message when preparation fails.
const DELETE_QUERY: &str = include_str!("cql/delete_rbac_registration.cql");
104
105pub(crate) struct DeleteQuery;
107
108impl DeleteQuery {
109 pub(crate) async fn prepare_batch(
111 session: &Arc<Session>,
112 cfg: &cassandra_db::EnvVars,
113 ) -> anyhow::Result<SizedBatch> {
114 PreparedQueries::prepare_batch(
115 session.clone(),
116 DELETE_QUERY,
117 cfg,
118 scylla::statement::Consistency::Any,
119 true,
120 false,
121 )
122 .await
123 .inspect_err(
124 |error| error!(error=%error, "Failed to prepare delete RBAC 509 registration primary key query."),
125 )
126 .map_err(|error| anyhow::anyhow!("{error}\n--\n{DELETE_QUERY}"))
127 }
128
129 pub(crate) async fn execute(
131 session: &CassandraSession,
132 params: Vec<Params>,
133 ) -> FallibleQueryResults {
134 let results = session
135 .purge_execute_batch(PreparedDeleteQuery::Rbac509, params)
136 .await?;
137
138 Ok(results)
139 }
140}