cat_gateway/db/index/block/
roll_forward.rs

1//! Immutable Roll Forward logic.
2
3use std::sync::Arc;
4
5use cardano_chain_follower::Slot;
6use futures::StreamExt;
7
8use crate::db::index::{
9    block::CassandraSession,
10    queries::{
11        purge,
12        rbac::{
13            get_catalyst_id_from_public_key::invalidate_public_keys_cache,
14            get_catalyst_id_from_stake_address::invalidate_stake_addresses_cache,
15            get_catalyst_id_from_transaction_id::invalidate_transactions_ids_cache,
16        },
17    },
18};
19
20/// Purge condition option
21#[derive(Debug, Clone, Copy, PartialEq)]
22pub(crate) enum PurgeCondition {
23    /// Purge all data before the provided slot number (including)
24    PurgeBackwards(Slot),
25    /// Purge all data after the provided slot number (including)
26    PurgeForwards(Slot),
27}
28
29impl PurgeCondition {
30    /// A filtering condition of the `PurgeOption` and provided `slot` value
31    fn filter(
32        &self,
33        slot: Slot,
34    ) -> bool {
35        match self {
36            Self::PurgeBackwards(purge_to_slot) => &slot <= purge_to_slot,
37            Self::PurgeForwards(purge_to_slot) => &slot >= purge_to_slot,
38        }
39    }
40}
41
42/// Purge cardano Live Index data from the volatile db session
43pub(crate) async fn purge_live_index(purge_condition: PurgeCondition) -> anyhow::Result<()> {
44    let persistent = false; // get volatile session
45    let Some(session) = CassandraSession::get(persistent) else {
46        anyhow::bail!("Failed to acquire db session");
47    };
48
49    purge_txi_by_hash(&session, purge_condition).await?;
50    purge_cip36_registration(&session, purge_condition).await?;
51    purge_cip36_registration_for_vote_key(&session, purge_condition).await?;
52    purge_cip36_registration_invalid(&session, purge_condition).await?;
53    purge_rbac509_registration(&session, purge_condition).await?;
54    purge_invalid_rbac509_registration(&session, purge_condition).await?;
55    purge_catalyst_id_for_stake_address(&session, purge_condition).await?;
56    purge_catalyst_id_for_txn_id(&session, purge_condition).await?;
57    purge_catalyst_id_for_public_key(&session, purge_condition).await?;
58    purge_stake_registration(&session, purge_condition).await?;
59    purge_txo_ada(&session, purge_condition).await?;
60    purge_txo_assets(&session, purge_condition).await?;
61    purge_unstaked_txo_ada(&session, purge_condition).await?;
62    purge_unstaked_txo_assets(&session, purge_condition).await?;
63
64    Ok(())
65}
66
67/// Purge data from `txi_by_hash`.
68async fn purge_txi_by_hash(
69    session: &Arc<CassandraSession>,
70    purge_condition: PurgeCondition,
71) -> anyhow::Result<()> {
72    use purge::txi_by_hash::{DeleteQuery, Params, PrimaryKeyQuery};
73
74    // Get all keys
75    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
76    // Filter
77    let mut delete_params: Vec<Params> = Vec::new();
78    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
79        if purge_condition.filter(primary_key.2.into()) {
80            let params: Params = primary_key.into();
81            delete_params.push(params);
82        }
83    }
84    // Delete filtered keys
85    DeleteQuery::execute(session, delete_params).await?;
86    Ok(())
87}
88
89/// Purge data from `cip36_registration`.
90async fn purge_cip36_registration(
91    session: &Arc<CassandraSession>,
92    purge_condition: PurgeCondition,
93) -> anyhow::Result<()> {
94    use purge::cip36_registration::{DeleteQuery, Params, PrimaryKeyQuery};
95
96    // Get all keys
97    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
98    // Filter
99    let mut delete_params: Vec<Params> = Vec::new();
100    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
101        let params: Params = primary_key.into();
102        if purge_condition.filter(params.slot_no.into()) {
103            delete_params.push(params);
104        }
105    }
106    // Delete filtered keys
107    DeleteQuery::execute(session, delete_params).await?;
108    Ok(())
109}
110
111/// Purge data from `cip36_registration_for_vote_key`.
112async fn purge_cip36_registration_for_vote_key(
113    session: &Arc<CassandraSession>,
114    purge_condition: PurgeCondition,
115) -> anyhow::Result<()> {
116    use purge::cip36_registration_for_vote_key::{DeleteQuery, Params, PrimaryKeyQuery};
117
118    // Get all keys
119    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
120    // Filter
121    let mut delete_params: Vec<Params> = Vec::new();
122    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
123        let params: Params = primary_key.into();
124        if purge_condition.filter(params.slot_no.into()) {
125            delete_params.push(params);
126        }
127    }
128    // Delete filtered keys
129    DeleteQuery::execute(session, delete_params).await?;
130    Ok(())
131}
132
133/// Purge data from `cip36_registration_invalid`.
134async fn purge_cip36_registration_invalid(
135    session: &Arc<CassandraSession>,
136    purge_condition: PurgeCondition,
137) -> anyhow::Result<()> {
138    use purge::cip36_registration_invalid::{DeleteQuery, Params, PrimaryKeyQuery};
139
140    // Get all keys
141    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
142    // Filter
143    let mut delete_params: Vec<Params> = Vec::new();
144    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
145        let params: Params = primary_key.into();
146        if purge_condition.filter(params.slot_no.into()) {
147            delete_params.push(params);
148        }
149    }
150    // Delete filtered keys
151    DeleteQuery::execute(session, delete_params).await?;
152    Ok(())
153}
154
155/// Purge data from `rbac509_registration`.
156async fn purge_rbac509_registration(
157    session: &Arc<CassandraSession>,
158    purge_condition: PurgeCondition,
159) -> anyhow::Result<()> {
160    use purge::rbac509_registration::{DeleteQuery, Params, PrimaryKeyQuery};
161
162    // Get all keys
163    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
164    // Filter
165    let mut delete_params: Vec<Params> = Vec::new();
166    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
167        if purge_condition.filter(primary_key.1.into()) {
168            delete_params.push(primary_key.into());
169        }
170    }
171    // Delete filtered keys
172    DeleteQuery::execute(session, delete_params).await?;
173    Ok(())
174}
175
176/// Purges the data from `rbac509_invalid_registration`.
177async fn purge_invalid_rbac509_registration(
178    session: &Arc<CassandraSession>,
179    purge_condition: PurgeCondition,
180) -> anyhow::Result<()> {
181    use purge::rbac509_invalid_registration::{DeleteQuery, Params, PrimaryKeyQuery};
182
183    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
184    let mut delete_params: Vec<Params> = Vec::new();
185    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
186        if purge_condition.filter(primary_key.2.into()) {
187            delete_params.push(primary_key.into());
188        }
189    }
190
191    DeleteQuery::execute(session, delete_params).await?;
192    Ok(())
193}
194
195/// Purges the data from the `catalyst_id_for_stake_address` table.
196async fn purge_catalyst_id_for_stake_address(
197    session: &Arc<CassandraSession>,
198    purge_condition: PurgeCondition,
199) -> anyhow::Result<()> {
200    use purge::catalyst_id_for_stake_address::{DeleteQuery, Params, PrimaryKeyQuery};
201
202    invalidate_stake_addresses_cache(false);
203
204    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
205
206    let mut delete_params: Vec<Params> = Vec::new();
207    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
208        if purge_condition.filter(primary_key.1.into()) {
209            delete_params.push(primary_key.into());
210        }
211    }
212
213    DeleteQuery::execute(session, delete_params).await?;
214    Ok(())
215}
216
217/// Purges the data from the `catalyst_id_for_txn_id` table.
218async fn purge_catalyst_id_for_txn_id(
219    session: &Arc<CassandraSession>,
220    purge_condition: PurgeCondition,
221) -> anyhow::Result<()> {
222    use purge::catalyst_id_for_txn_id::{DeleteQuery, Params, PrimaryKeyQuery};
223
224    invalidate_transactions_ids_cache(false);
225
226    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
227
228    let mut delete_params: Vec<Params> = Vec::new();
229    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
230        if purge_condition.filter(primary_key.1.into()) {
231            delete_params.push(primary_key.into());
232        }
233    }
234
235    DeleteQuery::execute(session, delete_params).await?;
236    Ok(())
237}
238
239/// Purges the data from the `catalyst_id_for_public_key` table.
240async fn purge_catalyst_id_for_public_key(
241    session: &Arc<CassandraSession>,
242    purge_condition: PurgeCondition,
243) -> anyhow::Result<()> {
244    use purge::catalyst_id_for_public_key::{DeleteQuery, Params, PrimaryKeyQuery};
245
246    invalidate_public_keys_cache(false);
247
248    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
249
250    let mut delete_params: Vec<Params> = Vec::new();
251    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
252        if purge_condition.filter(primary_key.1.into()) {
253            delete_params.push(primary_key.into());
254        }
255    }
256
257    DeleteQuery::execute(session, delete_params).await?;
258    Ok(())
259}
260
261/// Purge data from `stake_registration`.
262async fn purge_stake_registration(
263    session: &Arc<CassandraSession>,
264    purge_condition: PurgeCondition,
265) -> anyhow::Result<()> {
266    use purge::stake_registration::{DeleteQuery, Params, PrimaryKeyQuery};
267
268    // Get all keys
269    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
270    // Filter
271    let mut delete_params: Vec<Params> = Vec::new();
272    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
273        let params: Params = primary_key.into();
274        if purge_condition.filter(params.slot_no.into()) {
275            delete_params.push(params);
276        }
277    }
278    // Delete filtered keys
279    DeleteQuery::execute(session, delete_params).await?;
280    Ok(())
281}
282
283/// Purge data from `txo_ada`.
284async fn purge_txo_ada(
285    session: &Arc<CassandraSession>,
286    purge_condition: PurgeCondition,
287) -> anyhow::Result<()> {
288    use purge::txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
289
290    // Get all keys
291    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
292    // Filter
293    let mut delete_params: Vec<Params> = Vec::new();
294    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
295        let params: Params = primary_key.into();
296        if purge_condition.filter(params.slot_no.into()) {
297            delete_params.push(params);
298        }
299    }
300    // Delete filtered keys
301    DeleteQuery::execute(session, delete_params).await?;
302    Ok(())
303}
304
305/// Purge data from `txo_assets`.
306async fn purge_txo_assets(
307    session: &Arc<CassandraSession>,
308    purge_condition: PurgeCondition,
309) -> anyhow::Result<()> {
310    use purge::txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
311
312    // Get all keys
313    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
314    // Filter
315    let mut delete_params: Vec<Params> = Vec::new();
316    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
317        let params: Params = primary_key.into();
318        if purge_condition.filter(params.slot_no.into()) {
319            delete_params.push(params);
320        }
321    }
322    // Delete filtered keys
323    DeleteQuery::execute(session, delete_params).await?;
324    Ok(())
325}
326
327/// Purge data from `unstaked_txo_ada`.
328async fn purge_unstaked_txo_ada(
329    session: &Arc<CassandraSession>,
330    purge_condition: PurgeCondition,
331) -> anyhow::Result<()> {
332    use purge::unstaked_txo_ada::{DeleteQuery, Params, PrimaryKeyQuery};
333
334    // Get all keys
335    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
336    // Filter
337    let mut delete_params: Vec<Params> = Vec::new();
338    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
339        if purge_condition.filter(primary_key.2.clone().into()) {
340            let params: Params = primary_key.into();
341            delete_params.push(params);
342        }
343    }
344    // Delete filtered keys
345    DeleteQuery::execute(session, delete_params).await?;
346    Ok(())
347}
348
349/// Purge data from `unstaked_txo_assets`.
350async fn purge_unstaked_txo_assets(
351    session: &Arc<CassandraSession>,
352    purge_condition: PurgeCondition,
353) -> anyhow::Result<()> {
354    use purge::unstaked_txo_assets::{DeleteQuery, Params, PrimaryKeyQuery};
355
356    // Get all keys
357    let mut primary_keys_stream = PrimaryKeyQuery::execute(session).await?;
358    // Filter
359    let mut delete_params: Vec<Params> = Vec::new();
360    while let Some(Ok(primary_key)) = primary_keys_stream.next().await {
361        if purge_condition.filter(primary_key.4.clone().into()) {
362            let params: Params = primary_key.into();
363            delete_params.push(params);
364        }
365    }
366    // Delete filtered keys
367    DeleteQuery::execute(session, delete_params).await?;
368    Ok(())
369}