cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10    collections::{BTreeSet, HashMap, HashSet},
11    sync::Arc,
12};
13
14use anyhow::Context;
15use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId};
16use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4};
17use ed25519_dalek::VerifyingKey;
18use rbac_registration::{
19    cardano::{cip509::Cip509, state::RbacChainsState as _},
20    registration::cardano::RegistrationChain,
21};
22use scylla::client::session::Session;
23use tokio::sync::watch;
24use tracing::{debug, error};
25
26use crate::{
27    db::index::{
28        queries::{FallibleQueryTasks, SizedBatch},
29        session::CassandraSession,
30    },
31    metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
32    rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, state::RbacChainsState},
33    settings::cassandra_db::EnvVars,
34};
35
/// Index RBAC 509 Registration Query Parameters.
///
/// Accumulates the rows produced while indexing RBAC registrations; each vector is a
/// separate batch that is handed to its own insert query by `execute`.
#[derive(Debug)]
pub(crate) struct Rbac509InsertQuery {
    /// RBAC Registration Data captured during indexing (rows for valid registrations).
    pub(crate) registrations: Vec<insert_rbac509::Params>,
    /// Invalid RBAC registration data (rows for registrations that failed validation).
    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
    /// Catalyst ID for transaction ID data captured during indexing.
    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
    /// Catalyst ID for stake address data captured during indexing.
    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
    /// Catalyst ID for public key data captured during indexing.
    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
}
50
51impl Rbac509InsertQuery {
52    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
53    pub(crate) fn new() -> Self {
54        Rbac509InsertQuery {
55            registrations: Vec::new(),
56            invalid: Vec::new(),
57            catalyst_id_for_txn_id: Vec::new(),
58            catalyst_id_for_stake_address: Vec::new(),
59            catalyst_id_for_public_key: Vec::new(),
60        }
61    }
62
63    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
64    pub(crate) async fn prepare_batch(
65        session: &Arc<Session>,
66        cfg: &EnvVars,
67    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
68        Ok((
69            insert_rbac509::Params::prepare_batch(session, cfg).await?,
70            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
71            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
72            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
73            insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
74        ))
75    }
76
77    /// Index the RBAC 509 registrations in a transaction.
78    pub(crate) async fn index(
79        &mut self,
80        txn_hash: TransactionId,
81        index: TxnIndex,
82        block: &MultiEraBlock,
83        pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
84        our_end: Slot,
85        context: &mut RbacBlockIndexingContext,
86    ) -> anyhow::Result<()> {
87        let slot = block.slot();
88        let cip509 = match Cip509::new(block, index, &[]) {
89            Ok(Some(v)) => v,
90            Ok(None) => {
91                // Nothing to index.
92                return Ok(());
93            },
94            Err(e) => {
95                // This registration is either completely corrupted or someone else is using "our"
96                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
97                // incorrect.
98                debug!(
99                    slot = ?slot,
100                    index = ?index,
101                    err = ?e,
102                    "Invalid RBAC Registration Metadata in transaction"
103                );
104                return Ok(());
105            },
106        };
107
108        // This should never happen, but let's check anyway.
109        if slot != cip509.origin().point().slot_or_default() {
110            error!(
111                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
112                cip509.origin().point().slot_or_default()
113            );
114        }
115        if txn_hash != cip509.txn_hash() {
116            error!(
117                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
118                cip509.txn_hash()
119            );
120        }
121
122        // To properly validate a new registration we need to index all the previous blocks, so
123        // here we are going to wait till the other tasks have indexed the blocks before this one.
124        wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
125
126        if let Some(previous_txn) = cip509.previous_transaction() {
127            self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context)
128                .await?;
129        } else {
130            self.try_record_new_chain(&cip509, block.is_immutable(), context)
131                .await?;
132        }
133
134        Ok(())
135    }
136
137    /// Validating the provided registration as an update for an existing chain, preparing
138    /// corresponding updates
139    async fn try_record_chain_update(
140        &mut self,
141        reg: &Cip509,
142        previous_txn: TransactionId,
143        is_persistent: bool,
144        context: &mut RbacBlockIndexingContext,
145    ) -> anyhow::Result<()> {
146        // Find a chain this registration belongs to.
147        let state = RbacChainsState::new(is_persistent, context);
148
149        let slot = reg.origin().point().slot_or_default();
150        let txn_index = reg.origin().txn_index();
151        let txn_hash = reg.txn_hash();
152
153        let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else {
154            // This isn't a hard error because user input can contain invalid information.
155            // If there is no Catalyst ID, then we cannot record this
156            // registration as invalid and can only ignore (and log) it.
157            debug!(
158                slot = ?slot,
159                txn_index = ?txn_index,
160                txn_hash = ?txn_hash,
161                "Unable to determine Catalyst id for registration"
162            );
163            return Ok(());
164        };
165
166        let chain = state.chain(&catalyst_id).await?.context(format!(
167                "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'"
168        ))?;
169
170        // Try to add a new registration to the chain.
171        let Some(new_chain) = chain.update(reg, &state).await? else {
172            self.record_invalid_registration(
173                txn_hash,
174                txn_index,
175                slot,
176                chain.catalyst_id().clone(),
177                Some(previous_txn),
178                reg.purpose(),
179                reg.report().clone(),
180            );
181
182            return Ok(());
183        };
184
185        let previous_addresses = chain.stake_addresses();
186        let stake_addresses: HashSet<_> = new_chain
187            .stake_addresses()
188            .difference(&previous_addresses)
189            .cloned()
190            .collect();
191        let public_keys = reg
192            .all_roles()
193            .iter()
194            .filter_map(|v| reg.signing_public_key_for_role(*v))
195            .collect::<HashSet<_>>();
196        let modified_chains = state.consume();
197        let purpose = reg.purpose();
198
199        if is_persistent {
200            cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
201        }
202
203        self.record_valid_registration(
204            txn_hash,
205            txn_index,
206            slot,
207            catalyst_id.clone(),
208            Some(previous_txn),
209            stake_addresses,
210            public_keys,
211            modified_chains,
212            purpose,
213            context,
214        );
215
216        Ok(())
217    }
218
219    /// Validating the provided registration as a beginning of a new chain, preparing
220    /// corresponding updates
221    async fn try_record_new_chain(
222        &mut self,
223        reg: &Cip509,
224        is_persistent: bool,
225        context: &mut RbacBlockIndexingContext,
226    ) -> anyhow::Result<()> {
227        let mut state = RbacChainsState::new(is_persistent, context);
228
229        let slot = reg.origin().point().slot_or_default();
230        let txn_index = reg.origin().txn_index();
231        let txn_hash = reg.txn_hash();
232
233        // Try to start a new chain.
234        let Some(new_chain) = RegistrationChain::new(reg, &mut state).await? else {
235            if let Some(cat_id) = reg.catalyst_id() {
236                self.record_invalid_registration(
237                    txn_hash,
238                    txn_index,
239                    slot,
240                    cat_id.clone(),
241                    None,
242                    reg.purpose(),
243                    reg.report().clone(),
244                );
245            } else {
246                // This isn't a hard error because user input can contain invalid information.
247                // If there is no Catalyst ID, then we cannot record this
248                // registration as invalid and can only ignore (and log) it.
249                debug!(
250                    slot = ?slot,
251                    txn_index = ?txn_index,
252                    txn_hash = ?txn_hash,
253                    "Unable to determine Catalyst id for registration"
254                );
255            }
256
257            return Ok(());
258        };
259
260        let catalyst_id = new_chain.catalyst_id();
261        let stake_addresses = new_chain.stake_addresses();
262        let public_keys = reg
263            .all_roles()
264            .iter()
265            .filter_map(|v| reg.signing_public_key_for_role(*v))
266            .collect::<HashSet<_>>();
267        let modified_chains = state.consume();
268        let purpose = reg.purpose();
269
270        self.record_valid_registration(
271            txn_hash,
272            txn_index,
273            slot,
274            catalyst_id.clone(),
275            None,
276            stake_addresses,
277            public_keys,
278            modified_chains,
279            purpose,
280            context,
281        );
282
283        Ok(())
284    }
285
286    /// Making corresponding records according to the valid registration
287    #[allow(clippy::too_many_arguments)]
288    fn record_valid_registration(
289        &mut self,
290        txn_hash: TransactionId,
291        index: TxnIndex,
292        slot: Slot,
293        catalyst_id: CatalystId,
294        previous_transaction: Option<TransactionId>,
295        stake_addresses: HashSet<StakeAddress>,
296        public_keys: HashSet<VerifyingKey>,
297        modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
298        purpose: Option<UuidV4>,
299        context: &mut RbacBlockIndexingContext,
300    ) {
301        context.insert_transaction(txn_hash, catalyst_id.clone());
302        context.insert_addresses(stake_addresses.clone(), &catalyst_id);
303        context.insert_public_keys(public_keys.clone(), &catalyst_id);
304        context.insert_registration(
305            catalyst_id.clone(),
306            txn_hash,
307            slot,
308            index,
309            previous_transaction,
310        );
311
312        // Record the transaction identifier (hash) of a new registration.
313        self.catalyst_id_for_txn_id
314            .push(insert_catalyst_id_for_txn_id::Params::new(
315                catalyst_id.clone(),
316                txn_hash,
317                slot,
318            ));
319        // Record new stake addresses.
320        for address in stake_addresses {
321            self.catalyst_id_for_stake_address.push(
322                insert_catalyst_id_for_stake_address::Params::new(
323                    address,
324                    slot,
325                    index,
326                    catalyst_id.clone(),
327                ),
328            );
329        }
330        // Record new public keys.
331        for key in public_keys {
332            self.catalyst_id_for_public_key
333                .push(insert_catalyst_id_for_public_key::Params::new(
334                    key,
335                    slot,
336                    catalyst_id.clone(),
337                ));
338        }
339        // Update the chain this registration belongs to.
340        self.registrations.push(insert_rbac509::Params::new(
341            catalyst_id,
342            txn_hash,
343            slot,
344            index,
345            previous_transaction,
346            purpose,
347        ));
348
349        // Update other chains that were affected by this registration.
350        for (catalyst_id, _) in modified_chains {
351            self.registrations.push(insert_rbac509::Params::new(
352                catalyst_id.clone(),
353                txn_hash,
354                slot,
355                index,
356                // In this case the addresses were removed by another chain, so it doesn't
357                // make sense to include a previous transaction ID unrelated to the chain
358                // that is being updated.
359                None,
360                None,
361            ));
362        }
363    }
364
365    /// Making corresponding records according to the valid registration
366    #[allow(clippy::too_many_arguments)]
367    fn record_invalid_registration(
368        &mut self,
369        txn_hash: TransactionId,
370        index: TxnIndex,
371        slot: Slot,
372        catalyst_id: CatalystId,
373        previous_transaction: Option<TransactionId>,
374        purpose: Option<UuidV4>,
375        report: ProblemReport,
376    ) {
377        inc_invalid_rbac_reg_count();
378        self.invalid.push(insert_rbac509_invalid::Params::new(
379            catalyst_id,
380            txn_hash,
381            slot,
382            index,
383            purpose,
384            previous_transaction,
385            report,
386        ));
387    }
388
389    /// Execute the RBAC 509 Registration Indexing Queries.
390    ///
391    /// Consumes the `self` and returns a vector of futures.
392    pub(crate) fn execute(
393        self,
394        session: &Arc<CassandraSession>,
395    ) -> FallibleQueryTasks {
396        let mut query_handles: FallibleQueryTasks = Vec::new();
397
398        if !self.registrations.is_empty() {
399            let inner_session = session.clone();
400            query_handles.push(tokio::spawn(async move {
401                insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
402            }));
403        }
404
405        if !self.invalid.is_empty() {
406            let inner_session = session.clone();
407            query_handles.push(tokio::spawn(async move {
408                insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
409            }));
410        }
411
412        if !self.catalyst_id_for_txn_id.is_empty() {
413            let inner_session = session.clone();
414            query_handles.push(tokio::spawn(async move {
415                insert_catalyst_id_for_txn_id::Params::execute_batch(
416                    &inner_session,
417                    self.catalyst_id_for_txn_id,
418                )
419                .await
420            }));
421        }
422
423        if !self.catalyst_id_for_stake_address.is_empty() {
424            let inner_session = session.clone();
425            query_handles.push(tokio::spawn(async move {
426                insert_catalyst_id_for_stake_address::Params::execute_batch(
427                    &inner_session,
428                    self.catalyst_id_for_stake_address,
429                )
430                .await
431            }));
432        }
433
434        if !self.catalyst_id_for_public_key.is_empty() {
435            let inner_session = session.clone();
436            query_handles.push(tokio::spawn(async move {
437                insert_catalyst_id_for_public_key::Params::execute_batch(
438                    &inner_session,
439                    self.catalyst_id_for_public_key,
440                )
441                .await
442            }));
443        }
444
445        query_handles
446    }
447}
448
449/// Waits till all previous blocks are indexed.
450///
451/// The given `our_end` is excluded from the list of unprocessed blocks.
452async fn wait_for_previous_blocks(
453    pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
454    our_end: Slot,
455    current_slot: Slot,
456) -> anyhow::Result<()> {
457    loop {
458        if pending_blocks
459            .borrow_and_update()
460            .iter()
461            .filter(|&&v| v == our_end)
462            .all(|&slot| slot > current_slot)
463        {
464            return Ok(());
465        }
466
467        inc_index_sync();
468
469        pending_blocks
470            .changed()
471            .await
472            .context("Unprocessed blocks channel was closed unexpectedly")?;
473    }
474}