cat_gateway/rbac/
get_chain.rs

1//! Utilities for obtaining a RBAC registration chain (`RegistrationChain`).
2
3use anyhow::{Context, Result, bail};
4use cardano_chain_follower::{ChainFollower, Network, Point, Slot, TxnIndex};
5use catalyst_types::catalyst_id::CatalystId;
6use futures::{TryFutureExt, TryStreamExt, future::try_join};
7use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
8
9use crate::{
10    db::index::{
11        queries::rbac::get_rbac_registrations::{
12            Query as RbacQuery, QueryParams as RbacQueryParams,
13        },
14        session::CassandraSession,
15    },
16    rbac::{
17        ChainInfo,
18        chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
19    },
20    settings::Settings,
21};
22
23/// Returns the latest (including the volatile part) registration chain by the given
24/// Catalyst ID.
25pub async fn latest_rbac_chain(id: &CatalystId) -> Result<Option<ChainInfo>> {
26    let volatile_session =
27        CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
28    // Get the persistent part of the chain and volatile registrations. Both of these parts
29    // can be non-existing.
30    let (chain, volatile_regs) = try_join(
31        persistent_rbac_chain(id),
32        indexed_regs(&volatile_session, id),
33    )
34    .await?;
35
36    let mut last_persistent_txn = None;
37    let mut last_persistent_slot = 0.into();
38
39    // Either update the persistent chain or build a new one.
40    let chain = match chain {
41        Some(c) => {
42            last_persistent_txn = Some(c.current_tx_id_hash());
43            last_persistent_slot = c.current_point().slot_or_default();
44            Some(apply_regs(c, volatile_regs).await?)
45        },
46        None => build_rbac_chain(volatile_regs).await?,
47    };
48
49    Ok(chain.map(|chain| {
50        let last_txn = Some(chain.current_tx_id_hash());
51        // If the last persistent transaction ID is the same as the last one, then there are no
52        // volatile registrations in this chain.
53        let last_volatile_txn = if last_persistent_txn == last_txn {
54            None
55        } else {
56            last_txn
57        };
58
59        ChainInfo {
60            chain,
61            last_persistent_txn,
62            last_volatile_txn,
63            last_persistent_slot,
64        }
65    }))
66}
67
68/// Returns only the persistent part of a registration chain by the given Catalyst ID.
69pub async fn persistent_rbac_chain(id: &CatalystId) -> Result<Option<RegistrationChain>> {
70    let session = CassandraSession::get(true).context("Failed to get Cassandra session")?;
71    if let Some(chain) = cached_persistent_rbac_chain(&session, id) {
72        return Ok(Some(chain));
73    }
74
75    let regs = indexed_regs(&session, id).await?;
76    let chain = build_rbac_chain(regs).await?.inspect(|c| {
77        cache_persistent_rbac_chain(id.clone(), c.clone());
78    });
79    Ok(chain)
80}
81
82/// Queries indexed RBAC registrations from the database.
83async fn indexed_regs(
84    session: &CassandraSession,
85    id: &CatalystId,
86) -> Result<Vec<RbacQuery>> {
87    RbacQuery::execute(session, RbacQueryParams {
88        catalyst_id: id.clone().into(),
89    })
90    .and_then(|r| r.try_collect().map_err(Into::into))
91    .await
92}
93
94/// Builds a chain from the given registrations.
95pub async fn build_rbac_chain(
96    regs: impl IntoIterator<Item = RbacQuery>
97) -> Result<Option<RegistrationChain>> {
98    let mut regs = regs.into_iter();
99    let Some(root) = regs.next() else {
100        return Ok(None);
101    };
102    if !root.removed_stake_addresses.is_empty() {
103        // This set contains addresses that were removed from the chain. It is impossible to
104        // remove an address before the chain was even started.
105        bail!("The root registration shouldn't contain removed stake addresses");
106    }
107    let root = cip509(
108        Settings::cardano_network(),
109        root.slot_no.into(),
110        root.txn_index.into(),
111    )
112    .await?;
113
114    let chain = RegistrationChain::new(root).context("Failed to start registration chain")?;
115    let chain = apply_regs(chain, regs).await?;
116    Ok(Some(chain))
117}
118
119/// Applies the given registration to the given chain.
120pub async fn apply_regs(
121    mut chain: RegistrationChain,
122    regs: impl IntoIterator<Item = RbacQuery>,
123) -> Result<RegistrationChain> {
124    let network = Settings::cardano_network();
125
126    for reg in regs {
127        if !reg.removed_stake_addresses.is_empty() {
128            // TODO: This should be handled as a part of the
129            // https://github.com/input-output-hk/catalyst-voices/issues/3464 task.
130            continue;
131        }
132        let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?;
133        chain = chain
134            .update(reg)
135            .context("Failed to update registration chain")?;
136    }
137
138    Ok(chain)
139}
140
141/// Loads and parses a `Cip509` registration from a block using chain follower.
142async fn cip509(
143    network: &Network,
144    slot: Slot,
145    txn_index: TxnIndex,
146) -> Result<Cip509> {
147    let point = Point::fuzzy(slot);
148    let block = ChainFollower::get_block(network, point)
149        .await
150        .context("Unable to get block")?
151        .data;
152    if block.point().slot_or_default() != slot {
153        // The `ChainFollower::get_block` function can return the next consecutive block if it
154        // cannot find the exact one. This shouldn't happen, but we need to check anyway.
155        bail!(
156            "Unable to find exact {slot:?} block. Found block slot {:?}",
157            block.point().slot_or_default()
158        );
159    }
160    // We perform validation during indexing, so this normally should never fail.
161    Cip509::new(&block, txn_index, &[])
162        .with_context(|| {
163            format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
164        })?
165        .with_context(|| {
166            format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
167        })
168}