cat_gateway/rbac/
validation.rs

1//! Utilities for RBAC registrations validation.
2
3use std::collections::{HashMap, HashSet};
4
5use anyhow::{Context, Result};
6use cardano_chain_follower::{hashes::TransactionId, StakeAddress};
7use catalyst_types::{
8    catalyst_id::{role_index::RoleId, CatalystId},
9    problem_report::ProblemReport,
10};
11use ed25519_dalek::VerifyingKey;
12use futures::StreamExt;
13use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
14
15use crate::{
16    db::index::{
17        queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address,
18        session::CassandraSession,
19    },
20    rbac::{
21        chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
22        get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
23        latest_rbac_chain, RbacBlockIndexingContext, RbacValidationError, RbacValidationResult,
24        RbacValidationSuccess,
25    },
26};
27
28/// Validates a new registration by either starting a new chain or adding it to the
29/// existing one.
30///
31/// In case of failure a problem report from the given registration is updated and
32/// returned.
33pub async fn validate_rbac_registration(
34    reg: Cip509,
35    is_persistent: bool,
36    context: &mut RbacBlockIndexingContext,
37) -> RbacValidationResult {
38    match reg.previous_transaction() {
39        // `Box::pin` is used here because of the future size (`clippy::large_futures` lint).
40        Some(previous_txn) => {
41            Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await
42        },
43        None => Box::pin(start_new_chain(reg, is_persistent, context)).await,
44    }
45}
46
47/// Tries to update an existing RBAC chain.
48async fn update_chain(
49    reg: Cip509,
50    previous_txn: TransactionId,
51    is_persistent: bool,
52    context: &mut RbacBlockIndexingContext,
53) -> RbacValidationResult {
54    let purpose = reg.purpose();
55    let report = reg.report().to_owned();
56
57    // Find a chain this registration belongs to.
58    let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await?
59    else {
60        // We are unable to determine a Catalyst ID, so there is no sense to update the problem
61        // report because we would be unable to store this registration anyway.
62        return Err(RbacValidationError::UnknownCatalystId);
63    };
64    let chain = chain(&catalyst_id, is_persistent, context).await?
65        .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?;
66
67    // Check that addresses from the new registration aren't used in other chains.
68    let previous_addresses = chain.role_0_stake_addresses();
69    let reg_addresses = reg.role_0_stake_addresses();
70    let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect();
71    for address in &new_addresses {
72        match catalyst_id_from_stake_address(address, is_persistent, context).await? {
73            None => {
74                // All good: the address wasn't used before.
75            },
76            Some(_) => {
77                report.functional_validation(
78                    &format!("{address} stake addresses is already used"),
79                    "It isn't allowed to use same stake address in multiple registration chains",
80                );
81            },
82        }
83    }
84
85    // Store values before consuming the registration.
86    let txn_id = reg.txn_hash();
87    let stake_addresses = reg.role_0_stake_addresses();
88    let origin = reg.origin().to_owned();
89
90    // Try to add a new registration to the chain.
91    let new_chain = chain.update(reg).ok_or_else(|| {
92        RbacValidationError::InvalidRegistration {
93            catalyst_id: catalyst_id.clone(),
94            purpose,
95            report: report.clone(),
96        }
97    })?;
98
99    // Check that new public keys aren't used by other chains.
100    let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
101
102    // Return an error if any issues were recorded in the report.
103    if report.is_problematic() {
104        return Err(RbacValidationError::InvalidRegistration {
105            catalyst_id,
106            purpose,
107            report,
108        });
109    }
110
111    // Everything is fine: update the context.
112    context.insert_transaction(txn_id, catalyst_id.clone());
113    context.insert_addresses(stake_addresses.clone(), &catalyst_id);
114    context.insert_public_keys(public_keys.clone(), &catalyst_id);
115    context.insert_registration(
116        catalyst_id.clone(),
117        txn_id,
118        origin.point().slot_or_default(),
119        origin.txn_index(),
120        Some(previous_txn),
121        // Only a new chain can remove stake addresses from an existing one.
122        HashSet::new(),
123    );
124
125    if is_persistent {
126        cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
127    }
128
129    Ok(RbacValidationSuccess {
130        catalyst_id,
131        stake_addresses,
132        public_keys,
133        // Only new chains can take ownership of stake addresses of existing chains, so in this case
134        // other chains aren't affected.
135        modified_chains: Vec::new(),
136        purpose,
137    })
138}
139
140/// Tries to start a new RBAC chain.
141async fn start_new_chain(
142    reg: Cip509,
143    is_persistent: bool,
144    context: &mut RbacBlockIndexingContext,
145) -> RbacValidationResult {
146    let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id);
147    let purpose = reg.purpose();
148    let report = reg.report().to_owned();
149
150    // Try to start a new chain.
151    let new_chain = RegistrationChain::new(reg).ok_or_else(|| {
152        if let Some(catalyst_id) = catalyst_id {
153            RbacValidationError::InvalidRegistration {
154                catalyst_id,
155                purpose,
156                report: report.clone(),
157            }
158        } else {
159            RbacValidationError::UnknownCatalystId
160        }
161    })?;
162
163    // Verify that a Catalyst ID of this chain is unique.
164    let catalyst_id = new_chain.catalyst_id().as_short_id();
165    if is_chain_known(&catalyst_id, is_persistent, context).await? {
166        report.functional_validation(
167            &format!("{catalyst_id} is already used"),
168            "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains",
169        );
170        return Err(RbacValidationError::InvalidRegistration {
171            catalyst_id,
172            purpose,
173            report,
174        });
175    }
176
177    // Validate stake addresses.
178    let new_addresses = new_chain.role_0_stake_addresses();
179    let mut updated_chains: HashMap<_, HashSet<StakeAddress>> = HashMap::new();
180    for address in &new_addresses {
181        if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? {
182            // If an address is used in existing chain then a new chain must have different role 0
183            // signing key.
184            let previous_chain = chain(&id, is_persistent, context)
185                .await?
186                .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?;
187            if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
188                == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
189            {
190                report.functional_validation(
191                    &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})",
192                        previous_chain.catalyst_id().as_short_id()
193                    ),
194                    "It is only allowed to override the existing chain by using different public key",
195                );
196            } else {
197                // The new root registration "takes" an address(es) from the existing chain, so that
198                // chain needs to be updated.
199                updated_chains
200                    .entry(id)
201                    .and_modify(|e| {
202                        e.insert(address.clone());
203                    })
204                    .or_insert([address.clone()].into_iter().collect());
205            }
206        }
207    }
208
209    // Check that new public keys aren't used by other chains.
210    let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
211
212    if report.is_problematic() {
213        return Err(RbacValidationError::InvalidRegistration {
214            catalyst_id,
215            purpose,
216            report,
217        });
218    }
219
220    // Everything is fine: update the context.
221    context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
222    // This will also update the addresses that are already present in the context if they
223    // were reassigned to the new chain.
224    context.insert_addresses(new_addresses.clone(), &catalyst_id);
225    context.insert_public_keys(public_keys.clone(), &catalyst_id);
226    context.insert_registration(
227        catalyst_id.clone(),
228        new_chain.current_tx_id_hash(),
229        new_chain.current_point().slot_or_default(),
230        new_chain.current_txn_index(),
231        // No previous transaction for the root registration.
232        None,
233        // This chain has just been created, so no addresses have been removed from it.
234        HashSet::new(),
235    );
236
237    // This cache must be updated because these addresses previously belonged to other chains.
238    for (catalyst_id, addresses) in &updated_chains {
239        for address in addresses {
240            cache_stake_address(is_persistent, address.clone(), catalyst_id.clone());
241        }
242    }
243
244    Ok(RbacValidationSuccess {
245        catalyst_id,
246        stake_addresses: new_addresses,
247        public_keys,
248        modified_chains: updated_chains.into_iter().collect(),
249        purpose,
250    })
251}
252
253/// Returns a Catalyst ID corresponding to the given transaction hash.
254async fn catalyst_id_from_txn_id(
255    txn_id: TransactionId,
256    is_persistent: bool,
257    context: &mut RbacBlockIndexingContext,
258) -> Result<Option<CatalystId>> {
259    use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
260
261    // Check the context first.
262    if let Some(catalyst_id) = context.find_transaction(&txn_id) {
263        return Ok(Some(catalyst_id.to_owned()));
264    }
265
266    // Then try to find in the persistent database.
267    let session =
268        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
269    if let Some(id) = Query::get(&session, txn_id).await? {
270        return Ok(Some(id));
271    }
272
273    // Conditionally check the volatile database.
274    if !is_persistent {
275        let session =
276            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
277        return Query::get(&session, txn_id).await;
278    }
279
280    Ok(None)
281}
282
283/// Returns either persistent or "latest" (persistent + volatile) registration chain for
284/// the given Catalyst ID.
285async fn chain(
286    id: &CatalystId,
287    is_persistent: bool,
288    context: &mut RbacBlockIndexingContext,
289) -> Result<Option<RegistrationChain>> {
290    let chain = if is_persistent {
291        persistent_rbac_chain(id).await?
292    } else {
293        latest_rbac_chain(id).await?.map(|i| i.chain)
294    };
295
296    // Apply additional registrations from context if any.
297    if let Some(regs) = context.find_registrations(id) {
298        let regs = regs.iter().cloned();
299        match chain {
300            Some(c) => apply_regs(c, regs).await.map(Some),
301            None => build_rbac_chain(regs).await,
302        }
303    } else {
304        Ok(chain)
305    }
306}
307
308/// Returns a Catalyst ID corresponding to the given stake address.
309async fn catalyst_id_from_stake_address(
310    address: &StakeAddress,
311    is_persistent: bool,
312    context: &mut RbacBlockIndexingContext,
313) -> Result<Option<CatalystId>> {
314    use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
315
316    // Check the context first.
317    if let Some(catalyst_id) = context.find_address(address) {
318        return Ok(Some(catalyst_id.to_owned()));
319    }
320
321    // Then try to find in the persistent database.
322    let session =
323        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
324    if let Some(id) = Query::latest(&session, address).await? {
325        return Ok(Some(id));
326    }
327
328    // Conditionally check the volatile database.
329    if !is_persistent {
330        let session =
331            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
332        return Query::latest(&session, address).await;
333    }
334
335    Ok(None)
336}
337
338/// Checks that a new registration doesn't contain a signing key that was used by any
339/// other chain. Returns a list of public keys in the registration.
340async fn validate_public_keys(
341    chain: &RegistrationChain,
342    is_persistent: bool,
343    report: &ProblemReport,
344    context: &mut RbacBlockIndexingContext,
345) -> Result<HashSet<VerifyingKey>> {
346    let mut keys = HashSet::new();
347
348    let roles: Vec<_> = chain.role_data_history().keys().collect();
349    let catalyst_id = chain.catalyst_id().as_short_id();
350
351    for role in roles {
352        if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) {
353            keys.insert(key);
354            if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await?
355            {
356                if previous != catalyst_id {
357                    report.functional_validation(
358                        &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"),
359                        "It isn't allowed to use role 0 signing (certificate subject public) key in different chains",
360                    );
361                }
362            }
363        }
364    }
365
366    Ok(keys)
367}
368
369/// Returns a Catalyst ID corresponding to the given public key.
370async fn catalyst_id_from_public_key(
371    key: VerifyingKey,
372    is_persistent: bool,
373    context: &mut RbacBlockIndexingContext,
374) -> Result<Option<CatalystId>> {
375    use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
376
377    // Check the context first.
378    if let Some(catalyst_id) = context.find_public_key(&key) {
379        return Ok(Some(catalyst_id.to_owned()));
380    }
381
382    // Then try to find in the persistent database.
383    let session =
384        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
385    if let Some(id) = Query::get(&session, key).await? {
386        return Ok(Some(id));
387    }
388
389    // Conditionally check the volatile database.
390    if !is_persistent {
391        let session =
392            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
393        return Query::get(&session, key).await;
394    }
395
396    Ok(None)
397}
398
399/// Returns `true` if a chain with the given Catalyst ID already exists.
400///
401/// This function behaves in the same way as `latest_rbac_chain(...).is_some()` but the
402/// implementation is more optimized because we don't need to build the whole chain.
403pub async fn is_chain_known(
404    id: &CatalystId,
405    is_persistent: bool,
406    context: &mut RbacBlockIndexingContext,
407) -> Result<bool> {
408    if context.find_registrations(id).is_some() {
409        return Ok(true);
410    }
411
412    let session =
413        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
414
415    // We only cache persistent chains, so it is ok to check the cache regardless of the
416    // `is_persistent` parameter value.
417    if cached_persistent_rbac_chain(&session, id).is_some() {
418        return Ok(true);
419    }
420
421    if is_cat_id_known(&session, id).await? {
422        return Ok(true);
423    }
424
425    // Conditionally check the volatile database.
426    if !is_persistent {
427        let session =
428            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
429        if is_cat_id_known(&session, id).await? {
430            return Ok(true);
431        }
432    }
433
434    Ok(false)
435}
436
437/// Returns `true` if there is at least one registration with the given Catalyst ID.
438async fn is_cat_id_known(
439    session: &CassandraSession,
440    id: &CatalystId,
441) -> Result<bool> {
442    use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
443
444    Ok(Query::execute(session, QueryParams {
445        catalyst_id: id.clone().into(),
446    })
447    .await?
448    .next()
449    .await
450    .is_some())
451}