cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10    collections::{BTreeSet, HashMap, HashSet},
11    sync::Arc,
12};
13
14use anyhow::Context;
15use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId};
16use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4};
17use ed25519_dalek::VerifyingKey;
18use futures::{StreamExt, TryStreamExt};
19use rbac_registration::{
20    cardano::{cip509::Cip509, provider::RbacChainsProvider as _},
21    registration::cardano::RegistrationChain,
22};
23use scylla::client::session::Session;
24use tokio::sync::watch;
25use tracing::{debug, error};
26
27use crate::{
28    db::index::{
29        queries::{FallibleQueryTasks, SizedBatch},
30        session::CassandraSession,
31    },
32    metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
33    rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, provider::RbacChainsProvider},
34    settings::cassandra_db::EnvVars,
35};
36
/// Index RBAC 509 Registration Query Parameters.
///
/// Collects the parameters of every insert query produced while indexing RBAC 509
/// registrations, one list per query kind.
#[derive(Debug)]
pub(crate) struct Rbac509InsertQuery {
    /// Parameters for inserting RBAC registrations captured during indexing.
    pub(crate) registrations: Vec<insert_rbac509::Params>,
    /// Parameters for inserting RBAC registrations that were found to be invalid.
    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
    /// Parameters for inserting Catalyst ID for transaction ID records captured during
    /// indexing.
    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
    /// Parameters for inserting Catalyst ID for stake address records captured during
    /// indexing.
    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
    /// Parameters for inserting Catalyst ID for public key records captured during
    /// indexing.
    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
}
51
52impl Rbac509InsertQuery {
53    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
54    pub(crate) fn new() -> Self {
55        Rbac509InsertQuery {
56            registrations: Vec::new(),
57            invalid: Vec::new(),
58            catalyst_id_for_txn_id: Vec::new(),
59            catalyst_id_for_stake_address: Vec::new(),
60            catalyst_id_for_public_key: Vec::new(),
61        }
62    }
63
64    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
65    pub(crate) async fn prepare_batch(
66        session: &Arc<Session>,
67        cfg: &EnvVars,
68    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
69        Ok((
70            insert_rbac509::Params::prepare_batch(session, cfg).await?,
71            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
72            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
73            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
74            insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
75        ))
76    }
77
78    /// Index the RBAC 509 registrations in a transaction.
79    pub(crate) async fn index(
80        &mut self,
81        txn_hash: TransactionId,
82        index: TxnIndex,
83        block: &MultiEraBlock,
84        pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
85        our_end: Slot,
86        context: &mut RbacBlockIndexingContext,
87    ) -> anyhow::Result<()> {
88        let slot = block.slot();
89        let cip509 = match Cip509::new(block, index, &[]) {
90            Ok(Some(v)) => v,
91            Ok(None) => {
92                // Nothing to index.
93                return Ok(());
94            },
95            Err(e) => {
96                // This registration is either completely corrupted or someone else is using "our"
97                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
98                // incorrect.
99                debug!(
100                    slot = ?slot,
101                    index = ?index,
102                    err = ?e,
103                    "Invalid RBAC Registration Metadata in transaction"
104                );
105                return Ok(());
106            },
107        };
108
109        // This should never happen, but let's check anyway.
110        if slot != cip509.origin().point().slot_or_default() {
111            error!(
112                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
113                cip509.origin().point().slot_or_default()
114            );
115        }
116        if txn_hash != cip509.txn_hash() {
117            error!(
118                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
119                cip509.txn_hash()
120            );
121        }
122
123        // To properly validate a new registration we need to index all the previous blocks, so
124        // here we are going to wait till the other tasks have indexed the blocks before this one.
125        wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
126
127        if let Some(previous_txn) = cip509.previous_transaction() {
128            self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context)
129                .await?;
130        } else {
131            self.try_record_new_chain(&cip509, block.is_immutable(), context)
132                .await?;
133        }
134
135        Ok(())
136    }
137
138    /// Validating the provided registration as an update for an existing chain, preparing
139    /// corresponding updates
140    async fn try_record_chain_update(
141        &mut self,
142        reg: &Cip509,
143        previous_txn: TransactionId,
144        is_persistent: bool,
145        context: &mut RbacBlockIndexingContext,
146    ) -> anyhow::Result<()> {
147        // Find a chain this registration belongs to.
148        let state = RbacChainsProvider::new(is_persistent, context);
149
150        let slot = reg.origin().point().slot_or_default();
151        let txn_index = reg.origin().txn_index();
152        let txn_hash = reg.txn_hash();
153
154        let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else {
155            // This isn't a hard error because user input can contain invalid information.
156            // If there is no Catalyst ID, then we cannot record this
157            // registration as invalid and can only ignore (and log) it.
158            debug!(
159                slot = ?slot,
160                txn_index = ?txn_index,
161                txn_hash = ?txn_hash,
162                "Unable to determine Catalyst id for registration"
163            );
164            return Ok(());
165        };
166
167        let chain = state.chain(&catalyst_id).await?.context(format!(
168                "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'"
169        ))?;
170
171        // Try to add a new registration to the chain.
172        let Some(new_chain) = chain.update(reg, &state).await? else {
173            self.record_invalid_registration(
174                txn_hash,
175                txn_index,
176                slot,
177                chain.catalyst_id().clone(),
178                Some(previous_txn),
179                reg.purpose(),
180                reg.report().clone(),
181            );
182
183            return Ok(());
184        };
185
186        let previous_addresses = chain.stake_addresses();
187        let new_addresses = new_chain.stake_addresses();
188        let added_addresses: HashSet<_> = new_addresses
189            .difference(&previous_addresses)
190            .cloned()
191            .collect();
192        let removed_addresses: HashSet<_> = previous_addresses
193            .difference(&new_addresses)
194            .cloned()
195            .collect();
196        let public_keys = reg
197            .all_roles()
198            .iter()
199            .filter_map(|v| reg.signing_public_key_for_role(*v))
200            .collect::<HashSet<_>>();
201        // During the chain update, it cannot be an any stake addresses updates
202        let modified_chains = HashMap::new();
203        let purpose = reg.purpose();
204
205        if is_persistent {
206            cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
207        }
208
209        self.record_valid_registration(
210            txn_hash,
211            txn_index,
212            slot,
213            catalyst_id.clone(),
214            Some(previous_txn),
215            &added_addresses,
216            &removed_addresses,
217            public_keys,
218            modified_chains,
219            purpose,
220            context,
221        );
222
223        Ok(())
224    }
225
226    /// Validating the provided registration as a beginning of a new chain, preparing
227    /// corresponding updates
228    async fn try_record_new_chain(
229        &mut self,
230        reg: &Cip509,
231        is_persistent: bool,
232        context: &mut RbacBlockIndexingContext,
233    ) -> anyhow::Result<()> {
234        /// Size of the buffered list adaptor of futures.
235        const FUTURES_BUFFER_SIZE: usize = 10;
236
237        let provider = RbacChainsProvider::new(is_persistent, context);
238
239        let slot = reg.origin().point().slot_or_default();
240        let txn_index = reg.origin().txn_index();
241        let txn_hash = reg.txn_hash();
242
243        // Try to start a new chain.
244        let Some(new_chain) = RegistrationChain::new(reg, &provider).await? else {
245            if let Some(cat_id) = reg.catalyst_id() {
246                self.record_invalid_registration(
247                    txn_hash,
248                    txn_index,
249                    slot,
250                    cat_id.clone(),
251                    None,
252                    reg.purpose(),
253                    reg.report().clone(),
254                );
255            } else {
256                // This isn't a hard error because user input can contain invalid information.
257                // If there is no Catalyst ID, then we cannot record this
258                // registration as invalid and can only ignore (and log) it.
259                debug!(
260                    slot = ?slot,
261                    txn_index = ?txn_index,
262                    txn_hash = ?txn_hash,
263                    "Unable to determine Catalyst id for registration"
264                );
265            }
266
267            return Ok(());
268        };
269
270        let catalyst_id = new_chain.catalyst_id();
271        let stake_addresses = new_chain.stake_addresses();
272        let public_keys = reg
273            .all_roles()
274            .iter()
275            .filter_map(|v| reg.signing_public_key_for_role(*v))
276            .collect::<HashSet<_>>();
277        let purpose = reg.purpose();
278
279        let futures = reg.stake_addresses().clone().into_iter().map(|addr| {
280            async {
281                anyhow::Ok((
282                    provider.chain_catalyst_id_from_stake_address(&addr).await?,
283                    addr,
284                ))
285            }
286        });
287        let modified_chains = futures::stream::iter(futures)
288            .buffer_unordered(FUTURES_BUFFER_SIZE)
289            .try_fold(
290                HashMap::<CatalystId, HashSet<StakeAddress>>::new(),
291                |mut acc, (cat_id, addr)| {
292                    async {
293                        if let Some(cat_id) = cat_id {
294                            acc.entry(cat_id).or_default().insert(addr);
295                        }
296                        anyhow::Ok(acc)
297                    }
298                },
299            )
300            .await?;
301
302        self.record_valid_registration(
303            txn_hash,
304            txn_index,
305            slot,
306            catalyst_id.clone(),
307            None,
308            &stake_addresses,
309            &HashSet::new(),
310            public_keys,
311            modified_chains,
312            purpose,
313            context,
314        );
315
316        Ok(())
317    }
318
319    /// Making corresponding records according to the valid registration
320    #[allow(clippy::too_many_arguments)]
321    fn record_valid_registration(
322        &mut self,
323        txn_hash: TransactionId,
324        index: TxnIndex,
325        slot: Slot,
326        catalyst_id: CatalystId,
327        previous_transaction: Option<TransactionId>,
328        added_stake_addresses: &HashSet<StakeAddress>,
329        removed_stake_addresses: &HashSet<StakeAddress>,
330        public_keys: HashSet<VerifyingKey>,
331        modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
332        purpose: Option<UuidV4>,
333        context: &mut RbacBlockIndexingContext,
334    ) {
335        context.insert_transaction(txn_hash, catalyst_id.clone());
336        context.insert_addresses(added_stake_addresses.clone(), &catalyst_id);
337        context.remove_addresses(removed_stake_addresses);
338        context.insert_public_keys(public_keys.clone(), &catalyst_id);
339        context.insert_registration(
340            catalyst_id.clone(),
341            txn_hash,
342            slot,
343            index,
344            previous_transaction,
345        );
346
347        // Record the transaction identifier (hash) of a new registration.
348        self.catalyst_id_for_txn_id
349            .push(insert_catalyst_id_for_txn_id::Params::new(
350                catalyst_id.clone(),
351                txn_hash,
352                slot,
353            ));
354        // Record new stake addresses that are not marked as removed.
355        for address in added_stake_addresses
356            .difference(removed_stake_addresses)
357            .cloned()
358        {
359            self.catalyst_id_for_stake_address.push(
360                insert_catalyst_id_for_stake_address::Params::new(
361                    address,
362                    slot,
363                    index,
364                    catalyst_id.clone(),
365                ),
366            );
367        }
368
369        // Record new public keys.
370        for key in public_keys {
371            self.catalyst_id_for_public_key
372                .push(insert_catalyst_id_for_public_key::Params::new(
373                    key,
374                    slot,
375                    catalyst_id.clone(),
376                ));
377        }
378        // Update the chain this registration belongs to.
379        self.registrations.push(insert_rbac509::Params::new(
380            catalyst_id,
381            txn_hash,
382            slot,
383            index,
384            previous_transaction,
385            purpose,
386        ));
387
388        // Update other chains that were affected by this registration.
389        for (catalyst_id, _) in modified_chains {
390            self.registrations.push(insert_rbac509::Params::new(
391                catalyst_id.clone(),
392                txn_hash,
393                slot,
394                index,
395                // In this case the addresses were removed by another chain, so it doesn't
396                // make sense to include a previous transaction ID unrelated to the chain
397                // that is being updated.
398                None,
399                None,
400            ));
401        }
402    }
403
404    /// Making corresponding records according to the valid registration
405    #[allow(clippy::too_many_arguments)]
406    fn record_invalid_registration(
407        &mut self,
408        txn_hash: TransactionId,
409        index: TxnIndex,
410        slot: Slot,
411        catalyst_id: CatalystId,
412        previous_transaction: Option<TransactionId>,
413        purpose: Option<UuidV4>,
414        report: ProblemReport,
415    ) {
416        inc_invalid_rbac_reg_count();
417        self.invalid.push(insert_rbac509_invalid::Params::new(
418            catalyst_id,
419            txn_hash,
420            slot,
421            index,
422            purpose,
423            previous_transaction,
424            report,
425        ));
426    }
427
428    /// Execute the RBAC 509 Registration Indexing Queries.
429    ///
430    /// Consumes the `self` and returns a vector of futures.
431    pub(crate) fn execute(
432        self,
433        session: &Arc<CassandraSession>,
434    ) -> FallibleQueryTasks {
435        let mut query_handles: FallibleQueryTasks = Vec::new();
436
437        if !self.registrations.is_empty() {
438            let inner_session = session.clone();
439            query_handles.push(tokio::spawn(async move {
440                insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
441            }));
442        }
443
444        if !self.invalid.is_empty() {
445            let inner_session = session.clone();
446            query_handles.push(tokio::spawn(async move {
447                insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
448            }));
449        }
450
451        if !self.catalyst_id_for_txn_id.is_empty() {
452            let inner_session = session.clone();
453            query_handles.push(tokio::spawn(async move {
454                insert_catalyst_id_for_txn_id::Params::execute_batch(
455                    &inner_session,
456                    self.catalyst_id_for_txn_id,
457                )
458                .await
459            }));
460        }
461
462        if !self.catalyst_id_for_stake_address.is_empty() {
463            let inner_session = session.clone();
464            query_handles.push(tokio::spawn(async move {
465                insert_catalyst_id_for_stake_address::Params::execute_batch(
466                    &inner_session,
467                    self.catalyst_id_for_stake_address,
468                )
469                .await
470            }));
471        }
472
473        if !self.catalyst_id_for_public_key.is_empty() {
474            let inner_session = session.clone();
475            query_handles.push(tokio::spawn(async move {
476                insert_catalyst_id_for_public_key::Params::execute_batch(
477                    &inner_session,
478                    self.catalyst_id_for_public_key,
479                )
480                .await
481            }));
482        }
483
484        query_handles
485    }
486}
487
488/// Waits till all previous blocks are indexed.
489///
490/// The given `our_end` is excluded from the list of unprocessed blocks.
491async fn wait_for_previous_blocks(
492    pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
493    our_end: Slot,
494    current_slot: Slot,
495) -> anyhow::Result<()> {
496    loop {
497        if pending_blocks
498            .borrow_and_update()
499            .iter()
500            .filter(|&&v| v == our_end)
501            .all(|&slot| slot > current_slot)
502        {
503            return Ok(());
504        }
505
506        inc_index_sync();
507
508        pending_blocks
509            .changed()
510            .await
511            .context("Unprocessed blocks channel was closed unexpectedly")?;
512    }
513}