cat_gateway/rbac/
get_chain.rs

1//! Utilities for obtaining an RBAC registration chain (`RegistrationChain`).
2
3use anyhow::{bail, Context, Result};
4use cardano_chain_follower::{ChainFollower, Network, Point, Slot, StakeAddress, TxnIndex};
5use catalyst_types::catalyst_id::CatalystId;
6use futures::{future::try_join, TryFutureExt, TryStreamExt};
7use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
8
9use crate::{
10    db::index::{
11        queries::rbac::{
12            get_catalyst_id_from_stake_address::Query as CatalystIdQuery,
13            get_rbac_registrations::{Query as RbacQuery, QueryParams as RbacQueryParams},
14        },
15        session::CassandraSession,
16    },
17    rbac::{
18        chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
19        ChainInfo,
20    },
21    settings::Settings,
22};
23
24/// Returns the latest (including the volatile part) registration chain by the given
25/// Catalyst ID.
26pub async fn latest_rbac_chain(id: &CatalystId) -> Result<Option<ChainInfo>> {
27    let id = id.as_short_id();
28
29    let volatile_session =
30        CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
31    // Get the persistent part of the chain and volatile registrations. Both of these parts
32    // can be non-existing.
33    let (chain, volatile_regs) = try_join(
34        persistent_rbac_chain(&id),
35        indexed_regs(&volatile_session, &id),
36    )
37    .await?;
38
39    let mut last_persistent_txn = None;
40    let mut last_persistent_slot = 0.into();
41
42    // Either update the persistent chain or build a new one.
43    let chain = match chain {
44        Some(c) => {
45            last_persistent_txn = Some(c.current_tx_id_hash());
46            last_persistent_slot = c.current_point().slot_or_default();
47            Some(apply_regs(c, volatile_regs).await?)
48        },
49        None => build_rbac_chain(volatile_regs).await?,
50    };
51
52    Ok(chain.map(|chain| {
53        let last_txn = Some(chain.current_tx_id_hash());
54        // If the last persistent transaction ID is the same as the last one, then there are no
55        // volatile registrations in this chain.
56        let last_volatile_txn = if last_persistent_txn == last_txn {
57            None
58        } else {
59            last_txn
60        };
61
62        ChainInfo {
63            chain,
64            last_persistent_txn,
65            last_volatile_txn,
66            last_persistent_slot,
67        }
68    }))
69}
70
71/// Returns the latest (including the volatile part) registration chain by the given stake
72/// address.
73pub async fn latest_rbac_chain_by_address(address: &StakeAddress) -> Result<Option<ChainInfo>> {
74    let persistent_session =
75        CassandraSession::get(true).context("Failed to get persistent Cassandra session")?;
76    let volatile_session =
77        CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
78
79    // We always check the latest (volatile) data first.
80    let id = match CatalystIdQuery::latest(&volatile_session, address).await? {
81        Some(id) => id,
82        None => {
83            match CatalystIdQuery::latest(&persistent_session, address).await? {
84                Some(id) => id,
85                None => return Ok(None),
86            }
87        },
88    };
89
90    latest_rbac_chain(&id).await
91}
92
93/// Returns only the persistent part of a registration chain by the given Catalyst ID.
94pub async fn persistent_rbac_chain(id: &CatalystId) -> Result<Option<RegistrationChain>> {
95    let session = CassandraSession::get(true).context("Failed to get Cassandra session")?;
96
97    let id = id.as_short_id();
98
99    if let Some(chain) = cached_persistent_rbac_chain(&session, &id) {
100        return Ok(Some(chain));
101    }
102
103    let regs = indexed_regs(&session, &id).await?;
104    let chain = build_rbac_chain(regs).await?.inspect(|c| {
105        cache_persistent_rbac_chain(id.clone(), c.clone());
106    });
107    Ok(chain)
108}
109
110/// Queries indexed RBAC registrations from the database.
111async fn indexed_regs(
112    session: &CassandraSession,
113    id: &CatalystId,
114) -> Result<Vec<RbacQuery>> {
115    RbacQuery::execute(session, RbacQueryParams {
116        catalyst_id: id.clone().into(),
117    })
118    .and_then(|r| r.try_collect().map_err(Into::into))
119    .await
120}
121
122/// Builds a chain from the given registrations.
123pub async fn build_rbac_chain(
124    regs: impl IntoIterator<Item = RbacQuery>
125) -> Result<Option<RegistrationChain>> {
126    let mut regs = regs.into_iter();
127    let Some(root) = regs.next() else {
128        return Ok(None);
129    };
130    if !root.removed_stake_addresses.is_empty() {
131        // This set contains addresses that were removed from the chain. It is impossible to
132        // remove an address before the chain was even started.
133        bail!("The root registration shouldn't contain removed stake addresses");
134    }
135    let root = cip509(
136        Settings::cardano_network(),
137        root.slot_no.into(),
138        root.txn_index.into(),
139    )
140    .await?;
141
142    let chain = RegistrationChain::new(root).context("Failed to start registration chain")?;
143    let chain = apply_regs(chain, regs).await?;
144    Ok(Some(chain))
145}
146
147/// Applies the given registration to the given chain.
148pub async fn apply_regs(
149    mut chain: RegistrationChain,
150    regs: impl IntoIterator<Item = RbacQuery>,
151) -> Result<RegistrationChain> {
152    let network = Settings::cardano_network();
153
154    for reg in regs {
155        if !reg.removed_stake_addresses.is_empty() {
156            // TODO: This should be handled as a part of the
157            // https://github.com/input-output-hk/catalyst-voices/issues/2599 task.
158            continue;
159        }
160        let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?;
161        chain = chain
162            .update(reg)
163            .context("Failed to update registration chain")?;
164    }
165
166    Ok(chain)
167}
168
169/// Loads and parses a `Cip509` registration from a block using chain follower.
170async fn cip509(
171    network: Network,
172    slot: Slot,
173    txn_index: TxnIndex,
174) -> Result<Cip509> {
175    let point = Point::fuzzy(slot);
176    let block = ChainFollower::get_block(network, point)
177        .await
178        .context("Unable to get block")?
179        .data;
180    if block.point().slot_or_default() != slot {
181        // The `ChainFollower::get_block` function can return the next consecutive block if it
182        // cannot find the exact one. This shouldn't happen, but we need to check anyway.
183        bail!(
184            "Unable to find exact {slot:?} block. Found block slot {:?}",
185            block.point().slot_or_default()
186        );
187    }
188    // We perform validation during indexing, so this normally should never fail.
189    Cip509::new(&block, txn_index, &[])
190        .with_context(|| {
191            format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
192        })?
193        .with_context(|| {
194            format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
195        })
196}