cat_gateway/db/index/block/rbac509/
mod.rs

1//! Index Role-Based Access Control (RBAC) Registration.
2
3pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10    collections::{BTreeSet, HashSet},
11    sync::Arc,
12};
13
14use anyhow::{Context, Result};
15use cardano_chain_follower::{hashes::TransactionId, MultiEraBlock, Slot, TxnIndex};
16use rbac_registration::cardano::cip509::Cip509;
17use scylla::client::session::Session;
18use tokio::sync::watch;
19use tracing::{debug, error};
20
21use crate::{
22    db::index::{
23        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
24        session::CassandraSession,
25    },
26    metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
27    rbac::{
28        validate_rbac_registration, RbacBlockIndexingContext, RbacValidationError,
29        RbacValidationSuccess,
30    },
31    settings::cassandra_db::EnvVars,
32};
33
34/// Index RBAC 509 Registration Query Parameters
35#[derive(Debug)]
36pub(crate) struct Rbac509InsertQuery {
37    /// RBAC Registration Data captured during indexing.
38    pub(crate) registrations: Vec<insert_rbac509::Params>,
39    /// An invalid RBAC registration data.
40    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
41    /// A Catalyst ID for transaction ID Data captured during indexing.
42    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
43    /// A Catalyst ID for stake address data captured during indexing.
44    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
45    /// A Catalyst ID for public key data captured during indexing.
46    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
47}
48
49impl Rbac509InsertQuery {
50    /// Creates new data set for RBAC 509 Registrations Insert Query Batch.
51    pub(crate) fn new() -> Self {
52        Rbac509InsertQuery {
53            registrations: Vec::new(),
54            invalid: Vec::new(),
55            catalyst_id_for_txn_id: Vec::new(),
56            catalyst_id_for_stake_address: Vec::new(),
57            catalyst_id_for_public_key: Vec::new(),
58        }
59    }
60
61    /// Prepare Batch of Insert RBAC 509 Registration Data Queries
62    pub(crate) async fn prepare_batch(
63        session: &Arc<Session>,
64        cfg: &EnvVars,
65    ) -> Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
66        Ok((
67            insert_rbac509::Params::prepare_batch(session, cfg).await?,
68            insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
69            insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
70            insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
71            insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
72        ))
73    }
74
75    /// Index the RBAC 509 registrations in a transaction.
76    #[allow(clippy::too_many_lines)]
77    pub(crate) async fn index(
78        &mut self,
79        txn_hash: TransactionId,
80        index: TxnIndex,
81        block: &MultiEraBlock,
82        pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
83        our_end: Slot,
84        context: &mut RbacBlockIndexingContext,
85    ) -> Result<()> {
86        let slot = block.slot();
87        let cip509 = match Cip509::new(block, index, &[]) {
88            Ok(Some(v)) => v,
89            Ok(None) => {
90                // Nothing to index.
91                return Ok(());
92            },
93            Err(e) => {
94                // This registration is either completely corrupted or someone else is using "our"
95                // label (`MetadatumLabel::CIP509_RBAC`). We don't want to index it even as
96                // incorrect.
97                debug!(
98                    slot = ?slot,
99                    index = ?index,
100                    err = ?e,
101                    "Invalid RBAC Registration Metadata in transaction"
102                );
103                return Ok(());
104            },
105        };
106
107        // This should never happen, but let's check anyway.
108        if slot != cip509.origin().point().slot_or_default() {
109            error!(
110                "Cip509 slot mismatch: expected {slot:?}, got {:?}",
111                cip509.origin().point().slot_or_default()
112            );
113        }
114        if txn_hash != cip509.txn_hash() {
115            error!(
116                "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
117                cip509.txn_hash()
118            );
119        }
120
121        // To properly validate a new registration we need to index all the previous blocks, so
122        // here we are going to wait till the other tasks have indexed the blocks before this one.
123        wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
124
125        let previous_transaction = cip509.previous_transaction();
126        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
127        match Box::pin(validate_rbac_registration(
128            cip509,
129            block.is_immutable(),
130            context,
131        ))
132        .await
133        {
134            // Write updates to the database. There can be multiple updates in one registration
135            // because a new chain can take ownership of stake addresses of the existing chains and
136            // in that case we want to record changes to all those chains as well as the new one.
137            Ok(RbacValidationSuccess {
138                catalyst_id,
139                stake_addresses,
140                public_keys,
141                modified_chains,
142                purpose,
143            }) => {
144                // Record the transaction identifier (hash) of a new registration.
145                self.catalyst_id_for_txn_id
146                    .push(insert_catalyst_id_for_txn_id::Params::new(
147                        catalyst_id.clone(),
148                        txn_hash,
149                        slot,
150                    ));
151                // Record new stake addresses.
152                for address in stake_addresses {
153                    self.catalyst_id_for_stake_address.push(
154                        insert_catalyst_id_for_stake_address::Params::new(
155                            address,
156                            slot,
157                            index,
158                            catalyst_id.clone(),
159                        ),
160                    );
161                }
162                // Record new public keys.
163                for key in public_keys {
164                    self.catalyst_id_for_public_key.push(
165                        insert_catalyst_id_for_public_key::Params::new(
166                            key,
167                            slot,
168                            catalyst_id.clone(),
169                        ),
170                    );
171                }
172                // Update the chain this registration belongs to.
173                self.registrations.push(insert_rbac509::Params::new(
174                    catalyst_id,
175                    txn_hash,
176                    slot,
177                    index,
178                    previous_transaction,
179                    // Addresses can only be removed from other chains, so this list is always
180                    // empty for the chain that is being updated.
181                    HashSet::new(),
182                    purpose,
183                ));
184
185                // Update other chains that were affected by this registration.
186                for (catalyst_id, removed_addresses) in modified_chains {
187                    self.registrations.push(insert_rbac509::Params::new(
188                        catalyst_id.clone(),
189                        txn_hash,
190                        slot,
191                        index,
192                        // In this case the addresses were removed by another chain, so it doesn't
193                        // make sense to include a previous transaction ID unrelated to the chain
194                        // that is being updated.
195                        None,
196                        removed_addresses,
197                        None,
198                    ));
199                }
200            },
201            // Invalid registrations are being recorded in order to report failure.
202            Err(RbacValidationError::InvalidRegistration {
203                catalyst_id,
204                purpose,
205                report,
206            }) => {
207                inc_invalid_rbac_reg_count();
208                self.invalid.push(insert_rbac509_invalid::Params::new(
209                    catalyst_id,
210                    txn_hash,
211                    slot,
212                    index,
213                    purpose,
214                    previous_transaction,
215                    &report,
216                ));
217            },
218            // This isn't a hard error because user input can contain invalid information. If there
219            // is no Catalyst ID, then we cannot record this registration as invalid and can only
220            // ignore (and log) it.
221            Err(RbacValidationError::UnknownCatalystId) => {
222                debug!(
223                    slot = ?slot,
224                    index = ?index,
225                    txn_hash = ?txn_hash,
226                    "Unable to determine Catalyst id for registration"
227                );
228            },
229            Err(RbacValidationError::Fatal(e)) => {
230                error!(
231                   slot = ?slot,
232                   index = ?index,
233                   txn_hash = ?txn_hash,
234                   err = ?e,
235                   "Error indexing RBAC registration"
236                );
237                // Propagate an error.
238                return Err(e);
239            },
240        }
241
242        Ok(())
243    }
244
245    /// Execute the RBAC 509 Registration Indexing Queries.
246    ///
247    /// Consumes the `self` and returns a vector of futures.
248    pub(crate) fn execute(
249        self,
250        session: &Arc<CassandraSession>,
251    ) -> FallibleQueryTasks {
252        let mut query_handles: FallibleQueryTasks = Vec::new();
253
254        if !self.registrations.is_empty() {
255            let inner_session = session.clone();
256            query_handles.push(tokio::spawn(async move {
257                inner_session
258                    .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
259                    .await
260            }));
261        }
262
263        if !self.invalid.is_empty() {
264            let inner_session = session.clone();
265            query_handles.push(tokio::spawn(async move {
266                inner_session
267                    .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
268                    .await
269            }));
270        }
271
272        if !self.catalyst_id_for_txn_id.is_empty() {
273            let inner_session = session.clone();
274            query_handles.push(tokio::spawn(async move {
275                inner_session
276                    .execute_batch(
277                        PreparedQuery::CatalystIdForTxnIdInsertQuery,
278                        self.catalyst_id_for_txn_id,
279                    )
280                    .await
281            }));
282        }
283
284        if !self.catalyst_id_for_stake_address.is_empty() {
285            let inner_session = session.clone();
286            query_handles.push(tokio::spawn(async move {
287                inner_session
288                    .execute_batch(
289                        PreparedQuery::CatalystIdForStakeAddressInsertQuery,
290                        self.catalyst_id_for_stake_address,
291                    )
292                    .await
293            }));
294        }
295
296        if !self.catalyst_id_for_public_key.is_empty() {
297            let inner_session = session.clone();
298            query_handles.push(tokio::spawn(async move {
299                inner_session
300                    .execute_batch(
301                        PreparedQuery::CatalystIdForPublicKeyInsertQuery,
302                        self.catalyst_id_for_public_key,
303                    )
304                    .await
305            }));
306        }
307
308        query_handles
309    }
310}
311
312/// Waits till all previous blocks are indexed.
313///
314/// The given `our_end` is excluded from the list of unprocessed blocks.
315async fn wait_for_previous_blocks(
316    pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
317    our_end: Slot,
318    current_slot: Slot,
319) -> Result<()> {
320    loop {
321        if pending_blocks
322            .borrow_and_update()
323            .iter()
324            .filter(|&&v| v == our_end)
325            .all(|&slot| slot > current_slot)
326        {
327            return Ok(());
328        }
329
330        inc_index_sync();
331
332        pending_blocks
333            .changed()
334            .await
335            .context("Unprocessed blocks channel was closed unexpectedly")?;
336    }
337}