cat_gateway/rbac/
validation.rs

1//! Utilities for RBAC registrations validation.
2
3use std::collections::{HashMap, HashSet};
4
5use anyhow::{Context, Result};
6use cardano_chain_follower::{hashes::TransactionId, StakeAddress};
7use catalyst_types::{
8    catalyst_id::{role_index::RoleId, CatalystId},
9    problem_report::ProblemReport,
10};
11use ed25519_dalek::VerifyingKey;
12use futures::StreamExt;
13use rbac_registration::{
14    cardano::cip509::{Cip0134UriSet, Cip509},
15    registration::cardano::RegistrationChain,
16};
17
18use crate::{
19    db::index::{
20        queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address,
21        session::CassandraSession,
22    },
23    rbac::{
24        chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
25        get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
26        latest_rbac_chain, RbacBlockIndexingContext, RbacValidationError, RbacValidationResult,
27        RbacValidationSuccess,
28    },
29};
30
31/// Validates a new registration by either starting a new chain or adding it to the
32/// existing one.
33///
34/// In case of failure a problem report from the given registration is updated and
35/// returned.
36pub async fn validate_rbac_registration(
37    reg: Cip509,
38    is_persistent: bool,
39    context: &mut RbacBlockIndexingContext,
40) -> RbacValidationResult {
41    match reg.previous_transaction() {
42        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
43        Some(previous_txn) => {
44            Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await
45        },
46        None => Box::pin(start_new_chain(reg, is_persistent, context)).await,
47    }
48}
49
50/// Tries to update an existing RBAC chain.
51async fn update_chain(
52    reg: Cip509,
53    previous_txn: TransactionId,
54    is_persistent: bool,
55    context: &mut RbacBlockIndexingContext,
56) -> RbacValidationResult {
57    let purpose = reg.purpose();
58    let report = reg.report().to_owned();
59
60    // Find a chain this registration belongs to.
61    let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await?
62    else {
63        // We are unable to determine a Catalyst ID, so there is no sense to update the problem
64        // report because we would be unable to store this registration anyway.
65        return Err(RbacValidationError::UnknownCatalystId);
66    };
67    let chain = chain(&catalyst_id, is_persistent, context).await?
68        .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?;
69
70    // Check that addresses from the new registration aren't used in other chains.
71    let previous_addresses = chain.stake_addresses();
72    let reg_addresses = cip509_stake_addresses(&reg);
73    let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect();
74    for address in &new_addresses {
75        match catalyst_id_from_stake_address(address, is_persistent, context).await? {
76            None => {
77                // All good: the address wasn't used before.
78            },
79            Some(_) => {
80                report.functional_validation(
81                    &format!("{address} stake addresses is already used"),
82                    "It isn't allowed to use same stake address in multiple registration chains",
83                );
84            },
85        }
86    }
87
88    // Store values before consuming the registration.
89    let txn_id = reg.txn_hash();
90    let stake_addresses = cip509_stake_addresses(&reg);
91    let origin = reg.origin().to_owned();
92
93    // Try to add a new registration to the chain.
94    let new_chain = chain.update(reg).ok_or_else(|| {
95        RbacValidationError::InvalidRegistration {
96            catalyst_id: catalyst_id.clone(),
97            purpose,
98            report: report.clone(),
99        }
100    })?;
101
102    // Check that new public keys aren't used by other chains.
103    let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
104
105    // Return an error if any issues were recorded in the report.
106    if report.is_problematic() {
107        return Err(RbacValidationError::InvalidRegistration {
108            catalyst_id,
109            purpose,
110            report,
111        });
112    }
113
114    // Everything is fine: update the context.
115    context.insert_transaction(txn_id, catalyst_id.clone());
116    context.insert_addresses(stake_addresses.clone(), &catalyst_id);
117    context.insert_public_keys(public_keys.clone(), &catalyst_id);
118    context.insert_registration(
119        catalyst_id.clone(),
120        txn_id,
121        origin.point().slot_or_default(),
122        origin.txn_index(),
123        Some(previous_txn),
124        // Only a new chain can remove stake addresses from an existing one.
125        HashSet::new(),
126    );
127
128    if is_persistent {
129        cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
130    }
131
132    Ok(RbacValidationSuccess {
133        catalyst_id,
134        stake_addresses,
135        public_keys,
136        // Only new chains can take ownership of stake addresses of existing chains, so in this case
137        // other chains aren't affected.
138        modified_chains: Vec::new(),
139        purpose,
140    })
141}
142
143/// Tries to start a new RBAC chain.
144async fn start_new_chain(
145    reg: Cip509,
146    is_persistent: bool,
147    context: &mut RbacBlockIndexingContext,
148) -> RbacValidationResult {
149    let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id);
150    let purpose = reg.purpose();
151    let report = reg.report().to_owned();
152
153    // Try to start a new chain.
154    let new_chain = RegistrationChain::new(reg).ok_or_else(|| {
155        if let Some(catalyst_id) = catalyst_id {
156            RbacValidationError::InvalidRegistration {
157                catalyst_id,
158                purpose,
159                report: report.clone(),
160            }
161        } else {
162            RbacValidationError::UnknownCatalystId
163        }
164    })?;
165
166    // Verify that a Catalyst ID of this chain is unique.
167    let catalyst_id = new_chain.catalyst_id().as_short_id();
168    if is_chain_known(&catalyst_id, is_persistent, context).await? {
169        report.functional_validation(
170            &format!("{catalyst_id} is already used"),
171            "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains",
172        );
173        return Err(RbacValidationError::InvalidRegistration {
174            catalyst_id,
175            purpose,
176            report,
177        });
178    }
179
180    // Validate stake addresses.
181    let new_addresses = new_chain.stake_addresses();
182    let mut updated_chains: HashMap<_, HashSet<StakeAddress>> = HashMap::new();
183    for address in &new_addresses {
184        if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? {
185            // If an address is used in existing chain then a new chain must have different role 0
186            // signing key.
187            let previous_chain = chain(&id, is_persistent, context)
188                .await?
189                .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?;
190            if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
191                == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
192            {
193                report.functional_validation(
194                    &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})",
195                        previous_chain.catalyst_id().as_short_id()
196                    ),
197                    "It is only allowed to override the existing chain by using different public key",
198                );
199            } else {
200                // The new root registration "takes" an address(es) from the existing chain, so that
201                // chain needs to be updated.
202                updated_chains
203                    .entry(id)
204                    .and_modify(|e| {
205                        e.insert(address.clone());
206                    })
207                    .or_insert([address.clone()].into_iter().collect());
208            }
209        }
210    }
211
212    // Check that new public keys aren't used by other chains.
213    let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
214
215    if report.is_problematic() {
216        return Err(RbacValidationError::InvalidRegistration {
217            catalyst_id,
218            purpose,
219            report,
220        });
221    }
222
223    // Everything is fine: update the context.
224    context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
225    // This will also update the addresses that are already present in the context if they
226    // were reassigned to the new chain.
227    context.insert_addresses(new_addresses.clone(), &catalyst_id);
228    context.insert_public_keys(public_keys.clone(), &catalyst_id);
229    context.insert_registration(
230        catalyst_id.clone(),
231        new_chain.current_tx_id_hash(),
232        new_chain.current_point().slot_or_default(),
233        new_chain.current_txn_index(),
234        // No previous transaction for the root registration.
235        None,
236        // This chain has just been created, so no addresses have been removed from it.
237        HashSet::new(),
238    );
239
240    // This cache must be updated because these addresses previously belonged to other chains.
241    for (catalyst_id, addresses) in &updated_chains {
242        for address in addresses {
243            cache_stake_address(is_persistent, address.clone(), catalyst_id.clone());
244        }
245    }
246
247    Ok(RbacValidationSuccess {
248        catalyst_id,
249        stake_addresses: new_addresses,
250        public_keys,
251        modified_chains: updated_chains.into_iter().collect(),
252        purpose,
253    })
254}
255
256/// Returns a Catalyst ID corresponding to the given transaction hash.
257async fn catalyst_id_from_txn_id(
258    txn_id: TransactionId,
259    is_persistent: bool,
260    context: &mut RbacBlockIndexingContext,
261) -> Result<Option<CatalystId>> {
262    use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
263
264    // Check the context first.
265    if let Some(catalyst_id) = context.find_transaction(&txn_id) {
266        return Ok(Some(catalyst_id.to_owned()));
267    }
268
269    // Then try to find in the persistent database.
270    let session =
271        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
272    if let Some(id) = Query::get(&session, txn_id).await? {
273        return Ok(Some(id));
274    }
275
276    // Conditionally check the volatile database.
277    if !is_persistent {
278        let session =
279            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
280        return Query::get(&session, txn_id).await;
281    }
282
283    Ok(None)
284}
285
286/// Returns either persistent or "latest" (persistent + volatile) registration chain for
287/// the given Catalyst ID.
288async fn chain(
289    id: &CatalystId,
290    is_persistent: bool,
291    context: &mut RbacBlockIndexingContext,
292) -> Result<Option<RegistrationChain>> {
293    let chain = if is_persistent {
294        persistent_rbac_chain(id).await?
295    } else {
296        latest_rbac_chain(id).await?.map(|i| i.chain)
297    };
298
299    // Apply additional registrations from context if any.
300    if let Some(regs) = context.find_registrations(id) {
301        let regs = regs.iter().cloned();
302        match chain {
303            Some(c) => apply_regs(c, regs).await.map(Some),
304            None => build_rbac_chain(regs).await,
305        }
306    } else {
307        Ok(chain)
308    }
309}
310
311/// Returns a Catalyst ID corresponding to the given stake address.
312async fn catalyst_id_from_stake_address(
313    address: &StakeAddress,
314    is_persistent: bool,
315    context: &mut RbacBlockIndexingContext,
316) -> Result<Option<CatalystId>> {
317    use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
318
319    // Check the context first.
320    if let Some(catalyst_id) = context.find_address(address) {
321        return Ok(Some(catalyst_id.to_owned()));
322    }
323
324    // Then try to find in the persistent database.
325    let session =
326        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
327    if let Some(id) = Query::latest(&session, address).await? {
328        return Ok(Some(id));
329    }
330
331    // Conditionally check the volatile database.
332    if !is_persistent {
333        let session =
334            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
335        return Query::latest(&session, address).await;
336    }
337
338    Ok(None)
339}
340
341/// Checks that a new registration doesn't contain a signing key that was used by any
342/// other chain. Returns a list of public keys in the registration.
343async fn validate_public_keys(
344    chain: &RegistrationChain,
345    is_persistent: bool,
346    report: &ProblemReport,
347    context: &mut RbacBlockIndexingContext,
348) -> Result<HashSet<VerifyingKey>> {
349    let mut keys = HashSet::new();
350
351    let roles: Vec<_> = chain.role_data_history().keys().collect();
352    let catalyst_id = chain.catalyst_id().as_short_id();
353
354    for role in roles {
355        if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) {
356            keys.insert(key);
357            if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await?
358            {
359                if previous != catalyst_id {
360                    report.functional_validation(
361                        &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"),
362                        "It isn't allowed to use role 0 signing (certificate subject public) key in different chains",
363                    );
364                }
365            }
366        }
367    }
368
369    Ok(keys)
370}
371
372/// Returns a Catalyst ID corresponding to the given public key.
373async fn catalyst_id_from_public_key(
374    key: VerifyingKey,
375    is_persistent: bool,
376    context: &mut RbacBlockIndexingContext,
377) -> Result<Option<CatalystId>> {
378    use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
379
380    // Check the context first.
381    if let Some(catalyst_id) = context.find_public_key(&key) {
382        return Ok(Some(catalyst_id.to_owned()));
383    }
384
385    // Then try to find in the persistent database.
386    let session =
387        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
388    if let Some(id) = Query::get(&session, key).await? {
389        return Ok(Some(id));
390    }
391
392    // Conditionally check the volatile database.
393    if !is_persistent {
394        let session =
395            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
396        return Query::get(&session, key).await;
397    }
398
399    Ok(None)
400}
401
402/// Returns `true` if a chain with the given Catalyst ID already exists.
403///
404/// This function behaves in the same way as `latest_rbac_chain(...).is_some()` but the
405/// implementation is more optimized because we don't need to build the whole chain.
406pub async fn is_chain_known(
407    id: &CatalystId,
408    is_persistent: bool,
409    context: &mut RbacBlockIndexingContext,
410) -> Result<bool> {
411    if context.find_registrations(id).is_some() {
412        return Ok(true);
413    }
414
415    let session =
416        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
417
418    // We only cache persistent chains, so it is ok to check the cache regardless of the
419    // `is_persistent` parameter value.
420    if cached_persistent_rbac_chain(&session, id).is_some() {
421        return Ok(true);
422    }
423
424    if is_cat_id_known(&session, id).await? {
425        return Ok(true);
426    }
427
428    // Conditionally check the volatile database.
429    if !is_persistent {
430        let session =
431            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
432        if is_cat_id_known(&session, id).await? {
433            return Ok(true);
434        }
435    }
436
437    Ok(false)
438}
439
440/// Returns `true` if there is at least one registration with the given Catalyst ID.
441async fn is_cat_id_known(
442    session: &CassandraSession,
443    id: &CatalystId,
444) -> Result<bool> {
445    use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
446
447    Ok(Query::execute(session, QueryParams {
448        catalyst_id: id.clone().into(),
449    })
450    .await?
451    .next()
452    .await
453    .is_some())
454}
455
456/// Returns a set of stake addresses in the given registration.
457fn cip509_stake_addresses(cip509: &Cip509) -> HashSet<StakeAddress> {
458    cip509
459        .certificate_uris()
460        .map(Cip0134UriSet::stake_addresses)
461        .unwrap_or_default()
462}