cat_gateway/db/index/block/
roll_forward.rs1use std::sync::Arc;
4
5use cardano_chain_follower::Slot;
6use futures::StreamExt;
7
8use crate::db::index::{
9 block::CassandraSession,
10 queries::{
11 purge,
12 rbac::{
13 get_catalyst_id_from_public_key::invalidate_public_keys_cache,
14 get_catalyst_id_from_stake_address::invalidate_stake_addresses_cache,
15 get_catalyst_id_from_transaction_id::invalidate_transactions_ids_cache,
16 },
17 },
18};
19
20#[derive(Debug, Clone, Copy, PartialEq)]
22pub(crate) enum PurgeCondition {
23 PurgeBackwards(Slot),
25 PurgeForwards(Slot),
27}
28
29impl PurgeCondition {
30 fn filter(
32 &self,
33 slot: Slot,
34 ) -> bool {
35 match self {
36 Self::PurgeBackwards(purge_to_slot) => &slot <= purge_to_slot,
37 Self::PurgeForwards(purge_to_slot) => &slot >= purge_to_slot,
38 }
39 }
40}
41
42pub(crate) async fn purge_live_index(purge_condition: PurgeCondition) -> anyhow::Result<()> {
44 let persistent = false; let Some(session) = CassandraSession::get(persistent) else {
46 anyhow::bail!("Failed to acquire db session");
47 };
48
49 purge_txi_by_hash(&session, purge_condition).await?;
50 purge_cip36_registration(&session, purge_condition).await?;
51 purge_cip36_registration_for_vote_key(&session, purge_condition).await?;
52 purge_cip36_registration_invalid(&session, purge_condition).await?;
53 purge_rbac509_registration(&session, purge_condition).await?;
54 purge_invalid_rbac509_registration(&session, purge_condition).await?;
55 purge_catalyst_id_for_stake_address(&session, purge_condition).await?;
56 purge_catalyst_id_for_txn_id(&session, purge_condition).await?;
57 purge_catalyst_id_for_public_key(&session, purge_condition).await?;
58 purge_stake_registration(&session, purge_condition).await?;
59 purge_txo_ada(&session, purge_condition).await?;
60 purge_txo_assets(&session, purge_condition).await?;
61 purge_unstaked_txo_ada(&session, purge_condition).await?;
62 purge_unstaked_txo_assets(&session, purge_condition).await?;
63
64 Ok(())
65}
66
67async fn purge_txi_by_hash(
69 session: &Arc<CassandraSession>,
70 purge_condition: PurgeCondition,
71) -> anyhow::Result<()> {
72 use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
73
74 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
76 let mut delete_params: Vec<Params> = Vec::new();
78 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
79 if purge_condition.filter(primary_key.2.into()) {
80 let params: Params = primary_key.into();
81 delete_params.push(params);
82 }
83 }
84 DeleteQuery::execute(session, delete_params).await?;
86 Ok(())
87}
88
89async fn purge_cip36_registration(
91 session: &Arc<CassandraSession>,
92 purge_condition: PurgeCondition,
93) -> anyhow::Result<()> {
94 use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
95
96 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
98 let mut delete_params: Vec<Params> = Vec::new();
100 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
101 let params: Params = primary_key.into();
102 if purge_condition.filter(params.slot_no.into()) {
103 delete_params.push(params);
104 }
105 }
106 DeleteQuery::execute(session, delete_params).await?;
108 Ok(())
109}
110
111async fn purge_cip36_registration_for_vote_key(
113 session: &Arc<CassandraSession>,
114 purge_condition: PurgeCondition,
115) -> anyhow::Result<()> {
116 use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
117
118 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
120 let mut delete_params: Vec<Params> = Vec::new();
122 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
123 let params: Params = primary_key.into();
124 if purge_condition.filter(params.slot_no.into()) {
125 delete_params.push(params);
126 }
127 }
128 DeleteQuery::execute(session, delete_params).await?;
130 Ok(())
131}
132
133async fn purge_cip36_registration_invalid(
135 session: &Arc<CassandraSession>,
136 purge_condition: PurgeCondition,
137) -> anyhow::Result<()> {
138 use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
139
140 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
142 let mut delete_params: Vec<Params> = Vec::new();
144 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
145 let params: Params = primary_key.into();
146 if purge_condition.filter(params.slot_no.into()) {
147 delete_params.push(params);
148 }
149 }
150 DeleteQuery::execute(session, delete_params).await?;
152 Ok(())
153}
154
155async fn purge_rbac509_registration(
157 session: &Arc<CassandraSession>,
158 purge_condition: PurgeCondition,
159) -> anyhow::Result<()> {
160 use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
161
162 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
164 let mut delete_params: Vec<Params> = Vec::new();
166 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
167 if purge_condition.filter(primary_key.1.into()) {
168 delete_params.push(primary_key.into());
169 }
170 }
171 DeleteQuery::execute(session, delete_params).await?;
173 Ok(())
174}
175
176async fn purge_invalid_rbac509_registration(
178 session: &Arc<CassandraSession>,
179 purge_condition: PurgeCondition,
180) -> anyhow::Result<()> {
181 use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
182
183 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
184 let mut delete_params: Vec<Params> = Vec::new();
185 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
186 if purge_condition.filter(primary_key.2.into()) {
187 delete_params.push(primary_key.into());
188 }
189 }
190
191 DeleteQuery::execute(session, delete_params).await?;
192 Ok(())
193}
194
195async fn purge_catalyst_id_for_stake_address(
197 session: &Arc<CassandraSession>,
198 purge_condition: PurgeCondition,
199) -> anyhow::Result<()> {
200 use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
201
202 invalidate_stake_addresses_cache(false);
203
204 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
205
206 let mut delete_params: Vec<Params> = Vec::new();
207 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
208 if purge_condition.filter(primary_key.1.into()) {
209 delete_params.push(primary_key.into());
210 }
211 }
212
213 DeleteQuery::execute(session, delete_params).await?;
214 Ok(())
215}
216
217async fn purge_catalyst_id_for_txn_id(
219 session: &Arc<CassandraSession>,
220 purge_condition: PurgeCondition,
221) -> anyhow::Result<()> {
222 use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
223
224 invalidate_transactions_ids_cache(false);
225
226 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
227
228 let mut delete_params: Vec<Params> = Vec::new();
229 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
230 if purge_condition.filter(primary_key.1.into()) {
231 delete_params.push(primary_key.into());
232 }
233 }
234
235 DeleteQuery::execute(session, delete_params).await?;
236 Ok(())
237}
238
239async fn purge_catalyst_id_for_public_key(
241 session: &Arc<CassandraSession>,
242 purge_condition: PurgeCondition,
243) -> anyhow::Result<()> {
244 use purge::catalyst_id_for_public_key::{DeleteQuery, Params, PrimaryKeyQuery};
245
246 invalidate_public_keys_cache(false);
247
248 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
249
250 let mut delete_params: Vec<Params> = Vec::new();
251 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
252 if purge_condition.filter(primary_key.1.into()) {
253 delete_params.push(primary_key.into());
254 }
255 }
256
257 DeleteQuery::execute(session, delete_params).await?;
258 Ok(())
259}
260
261async fn purge_stake_registration(
263 session: &Arc<CassandraSession>,
264 purge_condition: PurgeCondition,
265) -> anyhow::Result<()> {
266 use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
267
268 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
270 let mut delete_params: Vec<Params> = Vec::new();
272 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
273 let params: Params = primary_key.into();
274 if purge_condition.filter(params.slot_no.into()) {
275 delete_params.push(params);
276 }
277 }
278 DeleteQuery::execute(session, delete_params).await?;
280 Ok(())
281}
282
283async fn purge_txo_ada(
285 session: &Arc<CassandraSession>,
286 purge_condition: PurgeCondition,
287) -> anyhow::Result<()> {
288 use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
289
290 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
292 let mut delete_params: Vec<Params> = Vec::new();
294 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
295 let params: Params = primary_key.into();
296 if purge_condition.filter(params.slot_no.into()) {
297 delete_params.push(params);
298 }
299 }
300 DeleteQuery::execute(session, delete_params).await?;
302 Ok(())
303}
304
305async fn purge_txo_assets(
307 session: &Arc<CassandraSession>,
308 purge_condition: PurgeCondition,
309) -> anyhow::Result<()> {
310 use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
311
312 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
314 let mut delete_params: Vec<Params> = Vec::new();
316 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
317 let params: Params = primary_key.into();
318 if purge_condition.filter(params.slot_no.into()) {
319 delete_params.push(params);
320 }
321 }
322 DeleteQuery::execute(session, delete_params).await?;
324 Ok(())
325}
326
327async fn purge_unstaked_txo_ada(
329 session: &Arc<CassandraSession>,
330 purge_condition: PurgeCondition,
331) -> anyhow::Result<()> {
332 use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
333
334 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
336 let mut delete_params: Vec<Params> = Vec::new();
338 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
339 if purge_condition.filter(primary_key.2.clone().into()) {
340 let params: Params = primary_key.into();
341 delete_params.push(params);
342 }
343 }
344 DeleteQuery::execute(session, delete_params).await?;
346 Ok(())
347}
348
349async fn purge_unstaked_txo_assets(
351 session: &Arc<CassandraSession>,
352 purge_condition: PurgeCondition,
353) -> anyhow::Result<()> {
354 use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
355
356 let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
358 let mut delete_params: Vec<Params> = Vec::new();
360 while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
361 if purge_condition.filter(primary_key.4.clone().into()) {
362 let params: Params = primary_key.into();
363 delete_params.push(params);
364 }
365 }
366 DeleteQuery::execute(session, delete_params).await?;
368 Ok(())
369}