cat_gateway/rbac/
get_chain.rs1use anyhow::{bail, Context, Result};
4use cardano_chain_follower::{ChainFollower, Network, Point, Slot, TxnIndex};
5use catalyst_types::catalyst_id::CatalystId;
6use futures::{future::try_join, TryFutureExt, TryStreamExt};
7use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
8
9use crate::{
10 db::index::{
11 queries::rbac::get_rbac_registrations::{
12 Query as RbacQuery, QueryParams as RbacQueryParams,
13 },
14 session::CassandraSession,
15 },
16 rbac::{
17 chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
18 ChainInfo,
19 },
20 settings::Settings,
21};
22
23pub async fn latest_rbac_chain(id: &CatalystId) -> Result<Option<ChainInfo>> {
26 let id = id.as_short_id();
27
28 let volatile_session =
29 CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
30 let (chain, volatile_regs) = try_join(
33 persistent_rbac_chain(&id),
34 indexed_regs(&volatile_session, &id),
35 )
36 .await?;
37
38 let mut last_persistent_txn = None;
39 let mut last_persistent_slot = 0.into();
40
41 let chain = match chain {
43 Some(c) => {
44 last_persistent_txn = Some(c.current_tx_id_hash());
45 last_persistent_slot = c.current_point().slot_or_default();
46 Some(apply_regs(c, volatile_regs).await?)
47 },
48 None => build_rbac_chain(volatile_regs).await?,
49 };
50
51 Ok(chain.map(|chain| {
52 let last_txn = Some(chain.current_tx_id_hash());
53 let last_volatile_txn = if last_persistent_txn == last_txn {
56 None
57 } else {
58 last_txn
59 };
60
61 ChainInfo {
62 chain,
63 last_persistent_txn,
64 last_volatile_txn,
65 last_persistent_slot,
66 }
67 }))
68}
69
70pub async fn persistent_rbac_chain(id: &CatalystId) -> Result<Option<RegistrationChain>> {
72 let session = CassandraSession::get(true).context("Failed to get Cassandra session")?;
73
74 let id = id.as_short_id();
75
76 if let Some(chain) = cached_persistent_rbac_chain(&session, &id) {
77 return Ok(Some(chain));
78 }
79
80 let regs = indexed_regs(&session, &id).await?;
81 let chain = build_rbac_chain(regs).await?.inspect(|c| {
82 cache_persistent_rbac_chain(id.clone(), c.clone());
83 });
84 Ok(chain)
85}
86
87async fn indexed_regs(
89 session: &CassandraSession,
90 id: &CatalystId,
91) -> Result<Vec<RbacQuery>> {
92 RbacQuery::execute(session, RbacQueryParams {
93 catalyst_id: id.clone().into(),
94 })
95 .and_then(|r| r.try_collect().map_err(Into::into))
96 .await
97}
98
99pub async fn build_rbac_chain(
101 regs: impl IntoIterator<Item = RbacQuery>
102) -> Result<Option<RegistrationChain>> {
103 let mut regs = regs.into_iter();
104 let Some(root) = regs.next() else {
105 return Ok(None);
106 };
107 if !root.removed_stake_addresses.is_empty() {
108 bail!("The root registration shouldn't contain removed stake addresses");
111 }
112 let root = cip509(
113 Settings::cardano_network(),
114 root.slot_no.into(),
115 root.txn_index.into(),
116 )
117 .await?;
118
119 let chain = RegistrationChain::new(root).context("Failed to start registration chain")?;
120 let chain = apply_regs(chain, regs).await?;
121 Ok(Some(chain))
122}
123
124pub async fn apply_regs(
126 mut chain: RegistrationChain,
127 regs: impl IntoIterator<Item = RbacQuery>,
128) -> Result<RegistrationChain> {
129 let network = Settings::cardano_network();
130
131 for reg in regs {
132 if !reg.removed_stake_addresses.is_empty() {
133 continue;
136 }
137 let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?;
138 chain = chain
139 .update(reg)
140 .context("Failed to update registration chain")?;
141 }
142
143 Ok(chain)
144}
145
146async fn cip509(
148 network: Network,
149 slot: Slot,
150 txn_index: TxnIndex,
151) -> Result<Cip509> {
152 let point = Point::fuzzy(slot);
153 let block = ChainFollower::get_block(network, point)
154 .await
155 .context("Unable to get block")?
156 .data;
157 if block.point().slot_or_default() != slot {
158 bail!(
161 "Unable to find exact {slot:?} block. Found block slot {:?}",
162 block.point().slot_or_default()
163 );
164 }
165 Cip509::new(&block, txn_index, &[])
167 .with_context(|| {
168 format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
169 })?
170 .with_context(|| {
171 format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
172 })
173}