cat_gateway/rbac/
validation.rs

1//! Utilities for RBAC registrations validation.
2
3use std::collections::{HashMap, HashSet};
4
5use anyhow::{Context, Result};
6use cardano_chain_follower::{hashes::TransactionId, StakeAddress};
7use catalyst_types::{
8    catalyst_id::{role_index::RoleId, CatalystId},
9    problem_report::ProblemReport,
10};
11use ed25519_dalek::VerifyingKey;
12use futures::StreamExt;
13use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
14
15use crate::{
16    db::index::{
17        queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address,
18        session::CassandraSession,
19    },
20    rbac::{
21        chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
22        get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
23        latest_rbac_chain, RbacBlockIndexingContext, RbacValidationError, RbacValidationResult,
24        RbacValidationSuccess,
25    },
26};
27
28/// Validates a new registration by either starting a new chain or adding it to the
29/// existing one.
30///
31/// In case of failure a problem report from the given registration is updated and
32/// returned.
33pub async fn validate_rbac_registration(
34    reg: Cip509, is_persistent: bool, context: &mut RbacBlockIndexingContext,
35) -> RbacValidationResult {
36    match reg.previous_transaction() {
37        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
38        Some(previous_txn) => {
39            Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await
40        },
41        None => Box::pin(start_new_chain(reg, is_persistent, context)).await,
42    }
43}
44
45/// Tries to update an existing RBAC chain.
46async fn update_chain(
47    reg: Cip509, previous_txn: TransactionId, is_persistent: bool,
48    context: &mut RbacBlockIndexingContext,
49) -> RbacValidationResult {
50    let purpose = reg.purpose();
51    let report = reg.report().to_owned();
52
53    // Find a chain this registration belongs to.
54    let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await?
55    else {
56        // We are unable to determine a Catalyst ID, so there is no sense to update the problem
57        // report because we would be unable to store this registration anyway.
58        return Err(RbacValidationError::UnknownCatalystId);
59    };
60    let chain = chain(&catalyst_id, is_persistent, context).await?
61        .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?;
62
63    // Check that addresses from the new registration aren't used in other chains.
64    let previous_addresses = chain.role_0_stake_addresses();
65    let reg_addresses = reg.role_0_stake_addresses();
66    let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect();
67    for address in &new_addresses {
68        match catalyst_id_from_stake_address(address, is_persistent, context).await? {
69            None => {
70                // All good: the address wasn't used before.
71            },
72            Some(_) => {
73                report.functional_validation(
74                    &format!("{address} stake addresses is already used"),
75                    "It isn't allowed to use same stake address in multiple registration chains",
76                );
77            },
78        }
79    }
80
81    // Store values before consuming the registration.
82    let txn_id = reg.txn_hash();
83    let stake_addresses = reg.role_0_stake_addresses();
84    let origin = reg.origin().to_owned();
85
86    // Try to add a new registration to the chain.
87    let new_chain = chain.update(reg).ok_or_else(|| {
88        RbacValidationError::InvalidRegistration {
89            catalyst_id: catalyst_id.clone(),
90            purpose,
91            report: report.clone(),
92        }
93    })?;
94
95    // Check that new public keys aren't used by other chains.
96    let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
97
98    // Return an error if any issues were recorded in the report.
99    if report.is_problematic() {
100        return Err(RbacValidationError::InvalidRegistration {
101            catalyst_id,
102            purpose,
103            report,
104        });
105    }
106
107    // Everything is fine: update the context.
108    context.insert_transaction(txn_id, catalyst_id.clone());
109    context.insert_addresses(stake_addresses.clone(), &catalyst_id);
110    context.insert_public_keys(public_keys.clone(), &catalyst_id);
111    context.insert_registration(
112        catalyst_id.clone(),
113        txn_id,
114        origin.point().slot_or_default(),
115        origin.txn_index(),
116        Some(previous_txn),
117        // Only a new chain can remove stake addresses from an existing one.
118        HashSet::new(),
119    );
120
121    if is_persistent {
122        cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
123    }
124
125    Ok(RbacValidationSuccess {
126        catalyst_id,
127        stake_addresses,
128        public_keys,
129        // Only new chains can take ownership of stake addresses of existing chains, so in this case
130        // other chains aren't affected.
131        modified_chains: Vec::new(),
132        purpose,
133    })
134}
135
136/// Tries to start a new RBAC chain.
137async fn start_new_chain(
138    reg: Cip509, is_persistent: bool, context: &mut RbacBlockIndexingContext,
139) -> RbacValidationResult {
140    let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id);
141    let purpose = reg.purpose();
142    let report = reg.report().to_owned();
143
144    // Try to start a new chain.
145    let new_chain = RegistrationChain::new(reg).ok_or_else(|| {
146        if let Some(catalyst_id) = catalyst_id {
147            RbacValidationError::InvalidRegistration {
148                catalyst_id,
149                purpose,
150                report: report.clone(),
151            }
152        } else {
153            RbacValidationError::UnknownCatalystId
154        }
155    })?;
156
157    // Verify that a Catalyst ID of this chain is unique.
158    let catalyst_id = new_chain.catalyst_id().as_short_id();
159    if is_chain_known(&catalyst_id, is_persistent, context).await? {
160        report.functional_validation(
161            &format!("{catalyst_id} is already used"),
162            "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains",
163        );
164        return Err(RbacValidationError::InvalidRegistration {
165            catalyst_id,
166            purpose,
167            report,
168        });
169    }
170
171    // Validate stake addresses.
172    let new_addresses = new_chain.role_0_stake_addresses();
173    let mut updated_chains: HashMap<_, HashSet<StakeAddress>> = HashMap::new();
174    for address in &new_addresses {
175        if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? {
176            // If an address is used in existing chain then a new chain must have different role 0
177            // signing key.
178            let previous_chain = chain(&id, is_persistent, context)
179                .await?
180                .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?;
181            if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
182                == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
183            {
184                report.functional_validation(
185                    &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})",
186                        previous_chain.catalyst_id().as_short_id()
187                    ),
188                    "It is only allowed to override the existing chain by using different public key",
189                );
190            } else {
191                // The new root registration "takes" an address(es) from the existing chain, so that
192                // chain needs to be updated.
193                updated_chains
194                    .entry(id)
195                    .and_modify(|e| {
196                        e.insert(address.clone());
197                    })
198                    .or_insert([address.clone()].into_iter().collect());
199            }
200        }
201    }
202
203    // Check that new public keys aren't used by other chains.
204    let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
205
206    if report.is_problematic() {
207        return Err(RbacValidationError::InvalidRegistration {
208            catalyst_id,
209            purpose,
210            report,
211        });
212    }
213
214    // Everything is fine: update the context.
215    context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
216    // This will also update the addresses that are already present in the context if they
217    // were reassigned to the new chain.
218    context.insert_addresses(new_addresses.clone(), &catalyst_id);
219    context.insert_public_keys(public_keys.clone(), &catalyst_id);
220    context.insert_registration(
221        catalyst_id.clone(),
222        new_chain.current_tx_id_hash(),
223        new_chain.current_point().slot_or_default(),
224        new_chain.current_txn_index(),
225        // No previous transaction for the root registration.
226        None,
227        // This chain has just been created, so no addresses have been removed from it.
228        HashSet::new(),
229    );
230
231    // This cache must be updated because these addresses previously belonged to other chains.
232    for (catalyst_id, addresses) in &updated_chains {
233        for address in addresses {
234            cache_stake_address(is_persistent, address.clone(), catalyst_id.clone());
235        }
236    }
237
238    Ok(RbacValidationSuccess {
239        catalyst_id,
240        stake_addresses: new_addresses,
241        public_keys,
242        modified_chains: updated_chains.into_iter().collect(),
243        purpose,
244    })
245}
246
247/// Returns a Catalyst ID corresponding to the given transaction hash.
248async fn catalyst_id_from_txn_id(
249    txn_id: TransactionId, is_persistent: bool, context: &mut RbacBlockIndexingContext,
250) -> Result<Option<CatalystId>> {
251    use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
252
253    // Check the context first.
254    if let Some(catalyst_id) = context.find_transaction(&txn_id) {
255        return Ok(Some(catalyst_id.to_owned()));
256    }
257
258    // Then try to find in the persistent database.
259    let session =
260        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
261    if let Some(id) = Query::get(&session, txn_id).await? {
262        return Ok(Some(id));
263    };
264
265    // Conditionally check the volatile database.
266    if !is_persistent {
267        let session =
268            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
269        return Query::get(&session, txn_id).await;
270    }
271
272    Ok(None)
273}
274
275/// Returns either persistent or "latest" (persistent + volatile) registration chain for
276/// the given Catalyst ID.
277async fn chain(
278    id: &CatalystId, is_persistent: bool, context: &mut RbacBlockIndexingContext,
279) -> Result<Option<RegistrationChain>> {
280    let chain = if is_persistent {
281        persistent_rbac_chain(id).await?
282    } else {
283        latest_rbac_chain(id).await?.map(|i| i.chain)
284    };
285
286    // Apply additional registrations from context if any.
287    if let Some(regs) = context.find_registrations(id) {
288        let regs = regs.iter().cloned();
289        match chain {
290            Some(c) => apply_regs(c, regs).await.map(Some),
291            None => build_rbac_chain(regs).await,
292        }
293    } else {
294        Ok(chain)
295    }
296}
297
298/// Returns a Catalyst ID corresponding to the given stake address.
299async fn catalyst_id_from_stake_address(
300    address: &StakeAddress, is_persistent: bool, context: &mut RbacBlockIndexingContext,
301) -> Result<Option<CatalystId>> {
302    use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
303
304    // Check the context first.
305    if let Some(catalyst_id) = context.find_address(address) {
306        return Ok(Some(catalyst_id.to_owned()));
307    }
308
309    // Then try to find in the persistent database.
310    let session =
311        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
312    if let Some(id) = Query::latest(&session, address).await? {
313        return Ok(Some(id));
314    };
315
316    // Conditionally check the volatile database.
317    if !is_persistent {
318        let session =
319            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
320        return Query::latest(&session, address).await;
321    }
322
323    Ok(None)
324}
325
326/// Checks that a new registration doesn't contain a signing key that was used by any
327/// other chain. Returns a list of public keys in the registration.
328async fn validate_public_keys(
329    chain: &RegistrationChain, is_persistent: bool, report: &ProblemReport,
330    context: &mut RbacBlockIndexingContext,
331) -> Result<HashSet<VerifyingKey>> {
332    let mut keys = HashSet::new();
333
334    let roles: Vec<_> = chain.role_data_history().keys().collect();
335    let catalyst_id = chain.catalyst_id().as_short_id();
336
337    for role in roles {
338        if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) {
339            keys.insert(key);
340            if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await?
341            {
342                if previous != catalyst_id {
343                    report.functional_validation(
344                        &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"),
345                        "It isn't allowed to use role 0 signing (certificate subject public) key in different chains",
346                    );
347                }
348            }
349        }
350    }
351
352    Ok(keys)
353}
354
355/// Returns a Catalyst ID corresponding to the given public key.
356async fn catalyst_id_from_public_key(
357    key: VerifyingKey, is_persistent: bool, context: &mut RbacBlockIndexingContext,
358) -> Result<Option<CatalystId>> {
359    use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
360
361    // Check the context first.
362    if let Some(catalyst_id) = context.find_public_key(&key) {
363        return Ok(Some(catalyst_id.to_owned()));
364    }
365
366    // Then try to find in the persistent database.
367    let session =
368        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
369    if let Some(id) = Query::get(&session, key).await? {
370        return Ok(Some(id));
371    };
372
373    // Conditionally check the volatile database.
374    if !is_persistent {
375        let session =
376            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
377        return Query::get(&session, key).await;
378    }
379
380    Ok(None)
381}
382
383/// Returns `true` if a chain with the given Catalyst ID already exists.
384///
385/// This function behaves in the same way as `latest_rbac_chain(...).is_some()` but the
386/// implementation is more optimized because we don't need to build the whole chain.
387pub async fn is_chain_known(
388    id: &CatalystId, is_persistent: bool, context: &mut RbacBlockIndexingContext,
389) -> Result<bool> {
390    if context.find_registrations(id).is_some() {
391        return Ok(true);
392    }
393
394    let session =
395        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
396
397    // We only cache persistent chains, so it is ok to check the cache regardless of the
398    // `is_persistent` parameter value.
399    if cached_persistent_rbac_chain(&session, id).is_some() {
400        return Ok(true);
401    }
402
403    if is_cat_id_known(&session, id).await? {
404        return Ok(true);
405    }
406
407    // Conditionally check the volatile database.
408    if !is_persistent {
409        let session =
410            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
411        if is_cat_id_known(&session, id).await? {
412            return Ok(true);
413        }
414    }
415
416    Ok(false)
417}
418
419/// Returns `true` if there is at least one registration with the given Catalyst ID.
420async fn is_cat_id_known(session: &CassandraSession, id: &CatalystId) -> Result<bool> {
421    use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
422
423    Ok(Query::execute(session, QueryParams {
424        catalyst_id: id.clone().into(),
425    })
426    .await?
427    .next()
428    .await
429    .is_some())
430}