cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10    collections::{BTreeSet, HashSet},
11    sync::Arc,
12};
13
14use anyhow::{Context, Result};
15use cardano_chain_follower::{hashes::TransactionId, MultiEraBlock, Slot, TxnIndex};
16use rbac_registration::cardano::cip509::Cip509;
17use scylla::client::session::Session;
18use tokio::sync::watch;
19use tracing::{debug, error};
20
21use crate::{
22    db::index::{
23        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
24        session::CassandraSession,
25    },
26    metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
27    rbac::{
28        validate_rbac_registration, RbacBlockIndexingContext, RbacValidationError,
29        RbacValidationSuccess,
30    },
31    settings::cassandra_db::EnvVars,
32};
33
34/// Index RBAC 509 Registration Query Parameters
35#[derive(Debug)]
36pub(crate) struct Rbac509InsertQuery {
37    /// RBAC Registration Data captured during indexing.
38    pub(crate) registrations: Vec<insert_rbac509::Params>,
39    /// An invalid RBAC registration data.
40    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
41    /// A Catalyst ID for transaction ID Data captured during indexing.
42    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
43    /// A Catalyst ID for stake address data captured during indexing.
44    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
45    /// A Catalyst ID for public key data captured during indexing.
46    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
47}
48
49impl Rbac509InsertQuery {
50    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
51    pub(crate) fn new() -> Self {
52        Rbac509InsertQuery {
53            registrations: Vec::new(),
54            invalid: Vec::new(),
55            catalyst_id_for_txn_id: Vec::new(),
56            catalyst_id_for_stake_address: Vec::new(),
57            catalyst_id_for_public_key: Vec::new(),
58        }
59    }
60
61    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
62    pub(crate) async fn prepare_batch(
63        session: &Arc<Session>, cfg: &EnvVars,
64    ) -> Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
65        Ok((
66            insert_rbac509::Params::prepare_batch(session, cfg).await?,
67            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
68            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
69            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
70            insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
71        ))
72    }
73
74    /// Index the RBAC 509 registrations in a transaction.
75    #[allow(clippy::too_many_lines)]
76    pub(crate) async fn index(
77        &mut self, txn_hash: TransactionId, index: TxnIndex, block: &MultiEraBlock,
78        pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>, our_end: Slot,
79        context: &mut RbacBlockIndexingContext,
80    ) -> Result<()> {
81        let slot = block.slot();
82        let cip509 = match Cip509::new(block, index, &[]) {
83            Ok(Some(v)) => v,
84            Ok(None) => {
85                // Nothing to index.
86                return Ok(());
87            },
88            Err(e) => {
89                // This registration is either completely corrupted or someone else is using "our"
90                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
91                // incorrect.
92                debug!(
93                    slot = ?slot,
94                    index = ?index,
95                    err = ?e,
96                    "Invalid RBAC Registration Metadata in transaction"
97                );
98                return Ok(());
99            },
100        };
101
102        // This should never happen, but let's check anyway.
103        if slot != cip509.origin().point().slot_or_default() {
104            error!(
105                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
106                cip509.origin().point().slot_or_default()
107            );
108        }
109        if txn_hash != cip509.txn_hash() {
110            error!(
111                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
112                cip509.txn_hash()
113            );
114        }
115
116        // To properly validate a new registration we need to index all the previous blocks, so
117        // here we are going to wait till the other tasks have indexed the blocks before this one.
118        wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
119
120        let previous_transaction = cip509.previous_transaction();
121        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
122        match Box::pin(validate_rbac_registration(
123            cip509,
124            block.is_immutable(),
125            context,
126        ))
127        .await
128        {
129            // Write updates to the database. There can be multiple updates in one registration
130            // because a new chain can take ownership of stake addresses of the existing chains and
131            // in that case we want to record changes to all those chains as well as the new one.
132            Ok(RbacValidationSuccess {
133                catalyst_id,
134                stake_addresses,
135                public_keys,
136                modified_chains,
137                purpose,
138            }) => {
139                // Record the transaction identifier (hash) of a new registration.
140                self.catalyst_id_for_txn_id
141                    .push(insert_catalyst_id_for_txn_id::Params::new(
142                        catalyst_id.clone(),
143                        txn_hash,
144                        slot,
145                    ));
146                // Record new stake addresses.
147                for address in stake_addresses {
148                    self.catalyst_id_for_stake_address.push(
149                        insert_catalyst_id_for_stake_address::Params::new(
150                            address,
151                            slot,
152                            index,
153                            catalyst_id.clone(),
154                        ),
155                    );
156                }
157                // Record new public keys.
158                for key in public_keys {
159                    self.catalyst_id_for_public_key.push(
160                        insert_catalyst_id_for_public_key::Params::new(
161                            key,
162                            slot,
163                            catalyst_id.clone(),
164                        ),
165                    );
166                }
167                // Update the chain this registration belongs to.
168                self.registrations.push(insert_rbac509::Params::new(
169                    catalyst_id,
170                    txn_hash,
171                    slot,
172                    index,
173                    previous_transaction,
174                    // Addresses can only be removed from other chains, so this list is always
175                    // empty for the chain that is being updated.
176                    HashSet::new(),
177                    purpose,
178                ));
179
180                // Update other chains that were affected by this registration.
181                for (catalyst_id, removed_addresses) in modified_chains {
182                    self.registrations.push(insert_rbac509::Params::new(
183                        catalyst_id.clone(),
184                        txn_hash,
185                        slot,
186                        index,
187                        // In this case the addresses were removed by another chain, so it doesn't
188                        // make sense to include a previous transaction ID unrelated to the chain
189                        // that is being updated.
190                        None,
191                        removed_addresses,
192                        None,
193                    ));
194                }
195            },
196            // Invalid registrations are being recorded in order to report failure.
197            Err(RbacValidationError::InvalidRegistration {
198                catalyst_id,
199                purpose,
200                report,
201            }) => {
202                inc_invalid_rbac_reg_count();
203                self.invalid.push(insert_rbac509_invalid::Params::new(
204                    catalyst_id,
205                    txn_hash,
206                    slot,
207                    index,
208                    purpose,
209                    previous_transaction,
210                    &report,
211                ));
212            },
213            // This isn't a hard error because user input can contain invalid information. If there
214            // is no Catalyst ID, then we cannot record this registration as invalid and can only
215            // ignore (and log) it.
216            Err(RbacValidationError::UnknownCatalystId) => {
217                debug!(
218                    slot = ?slot,
219                    index = ?index,
220                    txn_hash = ?txn_hash,
221                    "Unable to determine Catalyst id for registration"
222                );
223            },
224            Err(RbacValidationError::Fatal(e)) => {
225                error!(
226                   slot = ?slot,
227                   index = ?index,
228                   txn_hash = ?txn_hash,
229                   err = ?e,
230                   "Error indexing RBAC registration"
231                );
232                // Propagate an error.
233                return Err(e);
234            },
235        }
236
237        Ok(())
238    }
239
240    /// Execute the RBAC 509 Registration Indexing Queries.
241    ///
242    /// Consumes the `self` and returns a vector of futures.
243    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
244        let mut query_handles: FallibleQueryTasks = Vec::new();
245
246        if !self.registrations.is_empty() {
247            let inner_session = session.clone();
248            query_handles.push(tokio::spawn(async move {
249                inner_session
250                    .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
251                    .await
252            }));
253        }
254
255        if !self.invalid.is_empty() {
256            let inner_session = session.clone();
257            query_handles.push(tokio::spawn(async move {
258                inner_session
259                    .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
260                    .await
261            }));
262        }
263
264        if !self.catalyst_id_for_txn_id.is_empty() {
265            let inner_session = session.clone();
266            query_handles.push(tokio::spawn(async move {
267                inner_session
268                    .execute_batch(
269                        PreparedQuery::CatalystIdForTxnIdInsertQuery,
270                        self.catalyst_id_for_txn_id,
271                    )
272                    .await
273            }));
274        }
275
276        if !self.catalyst_id_for_stake_address.is_empty() {
277            let inner_session = session.clone();
278            query_handles.push(tokio::spawn(async move {
279                inner_session
280                    .execute_batch(
281                        PreparedQuery::CatalystIdForStakeAddressInsertQuery,
282                        self.catalyst_id_for_stake_address,
283                    )
284                    .await
285            }));
286        }
287
288        if !self.catalyst_id_for_public_key.is_empty() {
289            let inner_session = session.clone();
290            query_handles.push(tokio::spawn(async move {
291                inner_session
292                    .execute_batch(
293                        PreparedQuery::CatalystIdForPublicKeyInsertQuery,
294                        self.catalyst_id_for_public_key,
295                    )
296                    .await
297            }));
298        }
299
300        query_handles
301    }
302}
303
304/// Waits till all previous blocks are indexed.
305///
306/// The given `our_end` is excluded from the list of unprocessed blocks.
307async fn wait_for_previous_blocks(
308    pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>, our_end: Slot, current_slot: Slot,
309) -> Result<()> {
310    loop {
311        if pending_blocks
312            .borrow_and_update()
313            .iter()
314            .filter(|&&v| v == our_end)
315            .all(|&slot| slot > current_slot)
316        {
317            return Ok(());
318        }
319
320        inc_index_sync();
321
322        pending_blocks
323            .changed()
324            .await
325            .context("Unprocessed blocks channel was closed unexpectedly")?;
326    }
327}