cat_gateway/rbac/
get_chain.rs

1//! Utilities for obtaining a RBAC registration chain (`RegistrationChain`).
2
3use anyhow::{bail, Context, Result};
4use cardano_chain_follower::{ChainFollower, Network, Point, Slot, TxnIndex};
5use catalyst_types::catalyst_id::CatalystId;
6use futures::{future::try_join, TryFutureExt, TryStreamExt};
7use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
8
9use crate::{
10    db::index::{
11        queries::rbac::get_rbac_registrations::{
12            Query as RbacQuery, QueryParams as RbacQueryParams,
13        },
14        session::CassandraSession,
15    },
16    rbac::{
17        chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
18        ChainInfo,
19    },
20    settings::Settings,
21};
22
23/// Returns the latest (including the volatile part) registration chain by the given
24/// Catalyst ID.
25pub async fn latest_rbac_chain(id: &CatalystId) -> Result<Option<ChainInfo>> {
26    let id = id.as_short_id();
27
28    let volatile_session =
29        CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
30    // Get the persistent part of the chain and volatile registrations. Both of these parts
31    // can be non-existing.
32    let (chain, volatile_regs) = try_join(
33        persistent_rbac_chain(&id),
34        indexed_regs(&volatile_session, &id),
35    )
36    .await?;
37
38    let mut last_persistent_txn = None;
39    let mut last_persistent_slot = 0.into();
40
41    // Either update the persistent chain or build a new one.
42    let chain = match chain {
43        Some(c) => {
44            last_persistent_txn = Some(c.current_tx_id_hash());
45            last_persistent_slot = c.current_point().slot_or_default();
46            Some(apply_regs(c, volatile_regs).await?)
47        },
48        None => build_rbac_chain(volatile_regs).await?,
49    };
50
51    Ok(chain.map(|chain| {
52        let last_txn = Some(chain.current_tx_id_hash());
53        // If the last persistent transaction ID is the same as the last one, then there are no
54        // volatile registrations in this chain.
55        let last_volatile_txn = if last_persistent_txn == last_txn {
56            None
57        } else {
58            last_txn
59        };
60
61        ChainInfo {
62            chain,
63            last_persistent_txn,
64            last_volatile_txn,
65            last_persistent_slot,
66        }
67    }))
68}
69
70/// Returns only the persistent part of a registration chain by the given Catalyst ID.
71pub async fn persistent_rbac_chain(id: &CatalystId) -> Result<Option<RegistrationChain>> {
72    let session = CassandraSession::get(true).context("Failed to get Cassandra session")?;
73
74    let id = id.as_short_id();
75
76    if let Some(chain) = cached_persistent_rbac_chain(&session, &id) {
77        return Ok(Some(chain));
78    }
79
80    let regs = indexed_regs(&session, &id).await?;
81    let chain = build_rbac_chain(regs).await?.inspect(|c| {
82        cache_persistent_rbac_chain(id.clone(), c.clone());
83    });
84    Ok(chain)
85}
86
87/// Queries indexed RBAC registrations from the database.
88async fn indexed_regs(
89    session: &CassandraSession,
90    id: &CatalystId,
91) -> Result<Vec<RbacQuery>> {
92    RbacQuery::execute(session, RbacQueryParams {
93        catalyst_id: id.clone().into(),
94    })
95    .and_then(|r| r.try_collect().map_err(Into::into))
96    .await
97}
98
99/// Builds a chain from the given registrations.
100pub async fn build_rbac_chain(
101    regs: impl IntoIterator<Item = RbacQuery>
102) -> Result<Option<RegistrationChain>> {
103    let mut regs = regs.into_iter();
104    let Some(root) = regs.next() else {
105        return Ok(None);
106    };
107    if !root.removed_stake_addresses.is_empty() {
108        // This set contains addresses that were removed from the chain. It is impossible to
109        // remove an address before the chain was even started.
110        bail!("The root registration shouldn't contain removed stake addresses");
111    }
112    let root = cip509(
113        Settings::cardano_network(),
114        root.slot_no.into(),
115        root.txn_index.into(),
116    )
117    .await?;
118
119    let chain = RegistrationChain::new(root).context("Failed to start registration chain")?;
120    let chain = apply_regs(chain, regs).await?;
121    Ok(Some(chain))
122}
123
124/// Applies the given registration to the given chain.
125pub async fn apply_regs(
126    mut chain: RegistrationChain,
127    regs: impl IntoIterator<Item = RbacQuery>,
128) -> Result<RegistrationChain> {
129    let network = Settings::cardano_network();
130
131    for reg in regs {
132        if !reg.removed_stake_addresses.is_empty() {
133            // TODO: This should be handled as a part of the
134            // https://github.com/input-output-hk/catalyst-voices/issues/2599 task.
135            continue;
136        }
137        let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?;
138        chain = chain
139            .update(reg)
140            .context("Failed to update registration chain")?;
141    }
142
143    Ok(chain)
144}
145
146/// Loads and parses a `Cip509` registration from a block using chain follower.
147async fn cip509(
148    network: Network,
149    slot: Slot,
150    txn_index: TxnIndex,
151) -> Result<Cip509> {
152    let point = Point::fuzzy(slot);
153    let block = ChainFollower::get_block(network, point)
154        .await
155        .context("Unable to get block")?
156        .data;
157    if block.point().slot_or_default() != slot {
158        // The `ChainFollower::get_block` function can return the next consecutive block if it
159        // cannot find the exact one. This shouldn't happen, but we need to check anyway.
160        bail!(
161            "Unable to find exact {slot:?} block. Found block slot {:?}",
162            block.point().slot_or_default()
163        );
164    }
165    // We perform validation during indexing, so this normally should never fail.
166    Cip509::new(&block, txn_index, &[])
167        .with_context(|| {
168            format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
169        })?
170        .with_context(|| {
171            format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
172        })
173}