cat_gateway/rbac/
get_chain.rs1use anyhow::{Context, Result, bail};
4use cardano_chain_follower::{ChainFollower, Network, Point, Slot, TxnIndex};
5use catalyst_types::catalyst_id::CatalystId;
6use futures::{TryFutureExt, TryStreamExt, future::try_join};
7use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
8
9use crate::{
10 db::index::{
11 queries::rbac::get_rbac_registrations::{
12 Query as RbacQuery, QueryParams as RbacQueryParams,
13 },
14 session::CassandraSession,
15 },
16 rbac::{
17 ChainInfo,
18 chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
19 },
20 settings::Settings,
21};
22
23pub async fn latest_rbac_chain(id: &CatalystId) -> Result<Option<ChainInfo>> {
26 let volatile_session =
27 CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
28 let (chain, volatile_regs) = try_join(
31 persistent_rbac_chain(id),
32 indexed_regs(&volatile_session, id),
33 )
34 .await?;
35
36 let mut last_persistent_txn = None;
37 let mut last_persistent_slot = 0.into();
38
39 let chain = match chain {
41 Some(c) => {
42 last_persistent_txn = Some(c.current_tx_id_hash());
43 last_persistent_slot = c.current_point().slot_or_default();
44 Some(apply_regs(c, volatile_regs).await?)
45 },
46 None => build_rbac_chain(volatile_regs).await?,
47 };
48
49 Ok(chain.map(|chain| {
50 let last_txn = Some(chain.current_tx_id_hash());
51 let last_volatile_txn = if last_persistent_txn == last_txn {
54 None
55 } else {
56 last_txn
57 };
58
59 ChainInfo {
60 chain,
61 last_persistent_txn,
62 last_volatile_txn,
63 last_persistent_slot,
64 }
65 }))
66}
67
68pub async fn persistent_rbac_chain(id: &CatalystId) -> Result<Option<RegistrationChain>> {
70 let session = CassandraSession::get(true).context("Failed to get Cassandra session")?;
71 if let Some(chain) = cached_persistent_rbac_chain(&session, id) {
72 return Ok(Some(chain));
73 }
74
75 let regs = indexed_regs(&session, id).await?;
76 let chain = build_rbac_chain(regs).await?.inspect(|c| {
77 cache_persistent_rbac_chain(id.clone(), c.clone());
78 });
79 Ok(chain)
80}
81
82async fn indexed_regs(
84 session: &CassandraSession,
85 id: &CatalystId,
86) -> Result<Vec<RbacQuery>> {
87 RbacQuery::execute(session, RbacQueryParams {
88 catalyst_id: id.clone().into(),
89 })
90 .and_then(|r| r.try_collect().map_err(Into::into))
91 .await
92}
93
94pub async fn build_rbac_chain(
96 regs: impl IntoIterator<Item = RbacQuery>
97) -> Result<Option<RegistrationChain>> {
98 let mut regs = regs.into_iter();
99 let Some(root) = regs.next() else {
100 return Ok(None);
101 };
102 if !root.removed_stake_addresses.is_empty() {
103 bail!("The root registration shouldn't contain removed stake addresses");
106 }
107 let root = cip509(
108 Settings::cardano_network(),
109 root.slot_no.into(),
110 root.txn_index.into(),
111 )
112 .await?;
113
114 let chain = RegistrationChain::new(root).context("Failed to start registration chain")?;
115 let chain = apply_regs(chain, regs).await?;
116 Ok(Some(chain))
117}
118
119pub async fn apply_regs(
121 mut chain: RegistrationChain,
122 regs: impl IntoIterator<Item = RbacQuery>,
123) -> Result<RegistrationChain> {
124 let network = Settings::cardano_network();
125
126 for reg in regs {
127 if !reg.removed_stake_addresses.is_empty() {
128 continue;
131 }
132 let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?;
133 chain = chain
134 .update(reg)
135 .context("Failed to update registration chain")?;
136 }
137
138 Ok(chain)
139}
140
141async fn cip509(
143 network: &Network,
144 slot: Slot,
145 txn_index: TxnIndex,
146) -> Result<Cip509> {
147 let point = Point::fuzzy(slot);
148 let block = ChainFollower::get_block(network, point)
149 .await
150 .context("Unable to get block")?
151 .data;
152 if block.point().slot_or_default() != slot {
153 bail!(
156 "Unable to find exact {slot:?} block. Found block slot {:?}",
157 block.point().slot_or_default()
158 );
159 }
160 Cip509::new(&block, txn_index, &[])
162 .with_context(|| {
163 format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
164 })?
165 .with_context(|| {
166 format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
167 })
168}