cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10    collections::{BTreeSet, HashMap, HashSet},
11    sync::Arc,
12};
13
14use anyhow::Context;
15use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId};
16use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4};
17use ed25519_dalek::VerifyingKey;
18use futures::{StreamExt, TryStreamExt};
19use rbac_registration::{
20    cardano::{cip509::Cip509, provider::RbacChainsProvider as _},
21    registration::cardano::RegistrationChain,
22};
23use scylla::client::session::Session;
24use tokio::sync::watch;
25use tracing::{debug, error};
26
27use crate::{
28    db::index::{
29        queries::{FallibleQueryTasks, SizedBatch},
30        session::CassandraSession,
31    },
32    metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
33    rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, provider::RbacChainsProvider},
34    settings::cassandra_db::EnvVars,
35};
36
/// Index RBAC 509 Registration Query Parameters.
///
/// Collects, per target table, the insert parameters produced while indexing RBAC 509
/// registrations. Each list is handed to its own batch insert when the value is
/// consumed by `execute`.
#[derive(Debug)]
pub(crate) struct Rbac509InsertQuery {
    /// RBAC Registration Data captured during indexing.
    pub(crate) registrations: Vec<insert_rbac509::Params>,
    /// Invalid RBAC registration data captured during indexing.
    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
    /// Catalyst ID for transaction ID data captured during indexing.
    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
    /// Catalyst ID for stake address data captured during indexing.
    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
    /// Catalyst ID for public key data captured during indexing.
    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
}
51
52impl Rbac509InsertQuery {
53    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
54    pub(crate) fn new() -> Self {
55        Rbac509InsertQuery {
56            registrations: Vec::new(),
57            invalid: Vec::new(),
58            catalyst_id_for_txn_id: Vec::new(),
59            catalyst_id_for_stake_address: Vec::new(),
60            catalyst_id_for_public_key: Vec::new(),
61        }
62    }
63
64    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
65    pub(crate) async fn prepare_batch(
66        session: &Arc<Session>,
67        cfg: &EnvVars,
68    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
69        Ok((
70            insert_rbac509::Params::prepare_batch(session, cfg).await?,
71            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
72            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
73            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
74            insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
75        ))
76    }
77
78    /// Index the RBAC 509 registrations in a transaction.
79    pub(crate) async fn index(
80        &mut self,
81        txn_hash: TransactionId,
82        index: TxnIndex,
83        block: &MultiEraBlock,
84        pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
85        our_end: Slot,
86        context: &mut RbacBlockIndexingContext,
87    ) -> anyhow::Result<()> {
88        let slot = block.slot();
89        let cip509 = match Cip509::new(block, index, &[]) {
90            Ok(Some(v)) => v,
91            Ok(None) => {
92                // Nothing to index.
93                return Ok(());
94            },
95            Err(e) => {
96                // This registration is either completely corrupted or someone else is using "our"
97                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
98                // incorrect.
99                debug!(
100                    slot = ?slot,
101                    index = ?index,
102                    err = ?e,
103                    "Invalid RBAC Registration Metadata in transaction"
104                );
105                return Ok(());
106            },
107        };
108
109        // This should never happen, but let's check anyway.
110        if slot != cip509.origin().point().slot_or_default() {
111            error!(
112                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
113                cip509.origin().point().slot_or_default()
114            );
115        }
116        if txn_hash != cip509.txn_hash() {
117            error!(
118                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
119                cip509.txn_hash()
120            );
121        }
122
123        // To properly validate a new registration we need to index all the previous blocks, so
124        // here we are going to wait till the other tasks have indexed the blocks before this one.
125        wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
126
127        if let Some(previous_txn) = cip509.previous_transaction() {
128            self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context)
129                .await?;
130        } else {
131            self.try_record_new_chain(&cip509, block.is_immutable(), context)
132                .await?;
133        }
134
135        Ok(())
136    }
137
138    /// Validating the provided registration as an update for an existing chain, preparing
139    /// corresponding updates
140    async fn try_record_chain_update(
141        &mut self,
142        reg: &Cip509,
143        previous_txn: TransactionId,
144        is_persistent: bool,
145        context: &mut RbacBlockIndexingContext,
146    ) -> anyhow::Result<()> {
147        // Find a chain this registration belongs to.
148        let state = RbacChainsProvider::new(is_persistent, context);
149
150        let slot = reg.origin().point().slot_or_default();
151        let txn_index = reg.origin().txn_index();
152        let txn_hash = reg.txn_hash();
153
154        let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else {
155            // This isn't a hard error because user input can contain invalid information.
156            // If there is no Catalyst ID, then we cannot record this
157            // registration as invalid and can only ignore (and log) it.
158            debug!(
159                slot = ?slot,
160                txn_index = ?txn_index,
161                txn_hash = ?txn_hash,
162                "Unable to determine Catalyst id for registration"
163            );
164            return Ok(());
165        };
166
167        let chain = state.chain(&catalyst_id).await?.context(format!(
168                "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'"
169        ))?;
170
171        // Try to add a new registration to the chain.
172        let Some(new_chain) = chain.update(reg, &state).await? else {
173            self.record_invalid_registration(
174                txn_hash,
175                txn_index,
176                slot,
177                chain.catalyst_id().clone(),
178                Some(previous_txn),
179                reg.purpose(),
180                reg.report().clone(),
181            );
182
183            return Ok(());
184        };
185
186        let previous_addresses = chain.stake_addresses();
187        let stake_addresses: HashSet<_> = new_chain
188            .stake_addresses()
189            .difference(&previous_addresses)
190            .cloned()
191            .collect();
192        let public_keys = reg
193            .all_roles()
194            .iter()
195            .filter_map(|v| reg.signing_public_key_for_role(*v))
196            .collect::<HashSet<_>>();
197        // During the chain update, it cannot be an any stake addresses updates
198        let modified_chains = HashMap::new();
199        let purpose = reg.purpose();
200
201        if is_persistent {
202            cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
203        }
204
205        self.record_valid_registration(
206            txn_hash,
207            txn_index,
208            slot,
209            catalyst_id.clone(),
210            Some(previous_txn),
211            stake_addresses,
212            public_keys,
213            modified_chains,
214            purpose,
215            context,
216        );
217
218        Ok(())
219    }
220
221    /// Validating the provided registration as a beginning of a new chain, preparing
222    /// corresponding updates
223    async fn try_record_new_chain(
224        &mut self,
225        reg: &Cip509,
226        is_persistent: bool,
227        context: &mut RbacBlockIndexingContext,
228    ) -> anyhow::Result<()> {
229        /// Size of the buffered list adaptor of futures.
230        const FUTURES_BUFFER_SIZE: usize = 10;
231
232        let provider = RbacChainsProvider::new(is_persistent, context);
233
234        let slot = reg.origin().point().slot_or_default();
235        let txn_index = reg.origin().txn_index();
236        let txn_hash = reg.txn_hash();
237
238        // Try to start a new chain.
239        let Some(new_chain) = RegistrationChain::new(reg, &provider).await? else {
240            if let Some(cat_id) = reg.catalyst_id() {
241                self.record_invalid_registration(
242                    txn_hash,
243                    txn_index,
244                    slot,
245                    cat_id.clone(),
246                    None,
247                    reg.purpose(),
248                    reg.report().clone(),
249                );
250            } else {
251                // This isn't a hard error because user input can contain invalid information.
252                // If there is no Catalyst ID, then we cannot record this
253                // registration as invalid and can only ignore (and log) it.
254                debug!(
255                    slot = ?slot,
256                    txn_index = ?txn_index,
257                    txn_hash = ?txn_hash,
258                    "Unable to determine Catalyst id for registration"
259                );
260            }
261
262            return Ok(());
263        };
264
265        let catalyst_id = new_chain.catalyst_id();
266        let stake_addresses = new_chain.stake_addresses();
267        let public_keys = reg
268            .all_roles()
269            .iter()
270            .filter_map(|v| reg.signing_public_key_for_role(*v))
271            .collect::<HashSet<_>>();
272        let purpose = reg.purpose();
273
274        let futures = reg.stake_addresses().clone().into_iter().map(|addr| {
275            async {
276                anyhow::Ok((
277                    provider.chain_catalyst_id_from_stake_address(&addr).await?,
278                    addr,
279                ))
280            }
281        });
282        let modified_chains = futures::stream::iter(futures)
283            .buffer_unordered(FUTURES_BUFFER_SIZE)
284            .try_fold(
285                HashMap::<CatalystId, HashSet<StakeAddress>>::new(),
286                |mut acc, (cat_id, addr)| {
287                    async {
288                        if let Some(cat_id) = cat_id {
289                            acc.entry(cat_id).or_default().insert(addr);
290                        }
291                        anyhow::Ok(acc)
292                    }
293                },
294            )
295            .await?;
296
297        self.record_valid_registration(
298            txn_hash,
299            txn_index,
300            slot,
301            catalyst_id.clone(),
302            None,
303            stake_addresses,
304            public_keys,
305            modified_chains,
306            purpose,
307            context,
308        );
309
310        Ok(())
311    }
312
313    /// Making corresponding records according to the valid registration
314    #[allow(clippy::too_many_arguments)]
315    fn record_valid_registration(
316        &mut self,
317        txn_hash: TransactionId,
318        index: TxnIndex,
319        slot: Slot,
320        catalyst_id: CatalystId,
321        previous_transaction: Option<TransactionId>,
322        stake_addresses: HashSet<StakeAddress>,
323        public_keys: HashSet<VerifyingKey>,
324        modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
325        purpose: Option<UuidV4>,
326        context: &mut RbacBlockIndexingContext,
327    ) {
328        context.insert_transaction(txn_hash, catalyst_id.clone());
329        context.insert_addresses(stake_addresses.clone(), &catalyst_id);
330        context.insert_public_keys(public_keys.clone(), &catalyst_id);
331        context.insert_registration(
332            catalyst_id.clone(),
333            txn_hash,
334            slot,
335            index,
336            previous_transaction,
337        );
338
339        // Record the transaction identifier (hash) of a new registration.
340        self.catalyst_id_for_txn_id
341            .push(insert_catalyst_id_for_txn_id::Params::new(
342                catalyst_id.clone(),
343                txn_hash,
344                slot,
345            ));
346        // Record new stake addresses.
347        for address in stake_addresses {
348            self.catalyst_id_for_stake_address.push(
349                insert_catalyst_id_for_stake_address::Params::new(
350                    address,
351                    slot,
352                    index,
353                    catalyst_id.clone(),
354                ),
355            );
356        }
357        // Record new public keys.
358        for key in public_keys {
359            self.catalyst_id_for_public_key
360                .push(insert_catalyst_id_for_public_key::Params::new(
361                    key,
362                    slot,
363                    catalyst_id.clone(),
364                ));
365        }
366        // Update the chain this registration belongs to.
367        self.registrations.push(insert_rbac509::Params::new(
368            catalyst_id,
369            txn_hash,
370            slot,
371            index,
372            previous_transaction,
373            purpose,
374        ));
375
376        // Update other chains that were affected by this registration.
377        for (catalyst_id, _) in modified_chains {
378            self.registrations.push(insert_rbac509::Params::new(
379                catalyst_id.clone(),
380                txn_hash,
381                slot,
382                index,
383                // In this case the addresses were removed by another chain, so it doesn't
384                // make sense to include a previous transaction ID unrelated to the chain
385                // that is being updated.
386                None,
387                None,
388            ));
389        }
390    }
391
392    /// Making corresponding records according to the valid registration
393    #[allow(clippy::too_many_arguments)]
394    fn record_invalid_registration(
395        &mut self,
396        txn_hash: TransactionId,
397        index: TxnIndex,
398        slot: Slot,
399        catalyst_id: CatalystId,
400        previous_transaction: Option<TransactionId>,
401        purpose: Option<UuidV4>,
402        report: ProblemReport,
403    ) {
404        inc_invalid_rbac_reg_count();
405        self.invalid.push(insert_rbac509_invalid::Params::new(
406            catalyst_id,
407            txn_hash,
408            slot,
409            index,
410            purpose,
411            previous_transaction,
412            report,
413        ));
414    }
415
416    /// Execute the RBAC 509 Registration Indexing Queries.
417    ///
418    /// Consumes the `self` and returns a vector of futures.
419    pub(crate) fn execute(
420        self,
421        session: &Arc<CassandraSession>,
422    ) -> FallibleQueryTasks {
423        let mut query_handles: FallibleQueryTasks = Vec::new();
424
425        if !self.registrations.is_empty() {
426            let inner_session = session.clone();
427            query_handles.push(tokio::spawn(async move {
428                insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
429            }));
430        }
431
432        if !self.invalid.is_empty() {
433            let inner_session = session.clone();
434            query_handles.push(tokio::spawn(async move {
435                insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
436            }));
437        }
438
439        if !self.catalyst_id_for_txn_id.is_empty() {
440            let inner_session = session.clone();
441            query_handles.push(tokio::spawn(async move {
442                insert_catalyst_id_for_txn_id::Params::execute_batch(
443                    &inner_session,
444                    self.catalyst_id_for_txn_id,
445                )
446                .await
447            }));
448        }
449
450        if !self.catalyst_id_for_stake_address.is_empty() {
451            let inner_session = session.clone();
452            query_handles.push(tokio::spawn(async move {
453                insert_catalyst_id_for_stake_address::Params::execute_batch(
454                    &inner_session,
455                    self.catalyst_id_for_stake_address,
456                )
457                .await
458            }));
459        }
460
461        if !self.catalyst_id_for_public_key.is_empty() {
462            let inner_session = session.clone();
463            query_handles.push(tokio::spawn(async move {
464                insert_catalyst_id_for_public_key::Params::execute_batch(
465                    &inner_session,
466                    self.catalyst_id_for_public_key,
467                )
468                .await
469            }));
470        }
471
472        query_handles
473    }
474}
475
476/// Waits till all previous blocks are indexed.
477///
478/// The given `our_end` is excluded from the list of unprocessed blocks.
479async fn wait_for_previous_blocks(
480    pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
481    our_end: Slot,
482    current_slot: Slot,
483) -> anyhow::Result<()> {
484    loop {
485        if pending_blocks
486            .borrow_and_update()
487            .iter()
488            .filter(|&&v| v == our_end)
489            .all(|&slot| slot > current_slot)
490        {
491            return Ok(());
492        }
493
494        inc_index_sync();
495
496        pending_blocks
497            .changed()
498            .await
499            .context("Unprocessed blocks channel was closed unexpectedly")?;
500    }
501}