cat_gateway/rbac/
get_chain.rs1use anyhow::{bail, Context, Result};
4use cardano_chain_follower::{ChainFollower, Network, Point, Slot, StakeAddress, TxnIndex};
5use catalyst_types::catalyst_id::CatalystId;
6use futures::{future::try_join, TryFutureExt, TryStreamExt};
7use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
8
9use crate::{
10 db::index::{
11 queries::rbac::{
12 get_catalyst_id_from_stake_address::Query as CatalystIdQuery,
13 get_rbac_registrations::{Query as RbacQuery, QueryParams as RbacQueryParams},
14 },
15 session::CassandraSession,
16 },
17 rbac::{
18 chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
19 ChainInfo,
20 },
21 settings::Settings,
22};
23
24pub async fn latest_rbac_chain(id: &CatalystId) -> Result<Option<ChainInfo>> {
27 let id = id.as_short_id();
28
29 let volatile_session =
30 CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
31 let (chain, volatile_regs) = try_join(
34 persistent_rbac_chain(&id),
35 indexed_regs(&volatile_session, &id),
36 )
37 .await?;
38
39 let mut last_persistent_txn = None;
40 let mut last_persistent_slot = 0.into();
41
42 let chain = match chain {
44 Some(c) => {
45 last_persistent_txn = Some(c.current_tx_id_hash());
46 last_persistent_slot = c.current_point().slot_or_default();
47 Some(apply_regs(c, volatile_regs).await?)
48 },
49 None => build_rbac_chain(volatile_regs).await?,
50 };
51
52 Ok(chain.map(|chain| {
53 let last_txn = Some(chain.current_tx_id_hash());
54 let last_volatile_txn = if last_persistent_txn == last_txn {
57 None
58 } else {
59 last_txn
60 };
61
62 ChainInfo {
63 chain,
64 last_persistent_txn,
65 last_volatile_txn,
66 last_persistent_slot,
67 }
68 }))
69}
70
71pub async fn latest_rbac_chain_by_address(address: &StakeAddress) -> Result<Option<ChainInfo>> {
74 let persistent_session =
75 CassandraSession::get(true).context("Failed to get persistent Cassandra session")?;
76 let volatile_session =
77 CassandraSession::get(false).context("Failed to get volatile Cassandra session")?;
78
79 let id = match CatalystIdQuery::latest(&volatile_session, address).await? {
81 Some(id) => id,
82 None => {
83 match CatalystIdQuery::latest(&persistent_session, address).await? {
84 Some(id) => id,
85 None => return Ok(None),
86 }
87 },
88 };
89
90 latest_rbac_chain(&id).await
91}
92
93pub async fn persistent_rbac_chain(id: &CatalystId) -> Result<Option<RegistrationChain>> {
95 let session = CassandraSession::get(true).context("Failed to get Cassandra session")?;
96
97 let id = id.as_short_id();
98
99 if let Some(chain) = cached_persistent_rbac_chain(&session, &id) {
100 return Ok(Some(chain));
101 }
102
103 let regs = indexed_regs(&session, &id).await?;
104 let chain = build_rbac_chain(regs).await?.inspect(|c| {
105 cache_persistent_rbac_chain(id.clone(), c.clone());
106 });
107 Ok(chain)
108}
109
110async fn indexed_regs(
112 session: &CassandraSession,
113 id: &CatalystId,
114) -> Result<Vec<RbacQuery>> {
115 RbacQuery::execute(session, RbacQueryParams {
116 catalyst_id: id.clone().into(),
117 })
118 .and_then(|r| r.try_collect().map_err(Into::into))
119 .await
120}
121
122pub async fn build_rbac_chain(
124 regs: impl IntoIterator<Item = RbacQuery>
125) -> Result<Option<RegistrationChain>> {
126 let mut regs = regs.into_iter();
127 let Some(root) = regs.next() else {
128 return Ok(None);
129 };
130 if !root.removed_stake_addresses.is_empty() {
131 bail!("The root registration shouldn't contain removed stake addresses");
134 }
135 let root = cip509(
136 Settings::cardano_network(),
137 root.slot_no.into(),
138 root.txn_index.into(),
139 )
140 .await?;
141
142 let chain = RegistrationChain::new(root).context("Failed to start registration chain")?;
143 let chain = apply_regs(chain, regs).await?;
144 Ok(Some(chain))
145}
146
147pub async fn apply_regs(
149 mut chain: RegistrationChain,
150 regs: impl IntoIterator<Item = RbacQuery>,
151) -> Result<RegistrationChain> {
152 let network = Settings::cardano_network();
153
154 for reg in regs {
155 if !reg.removed_stake_addresses.is_empty() {
156 continue;
159 }
160 let reg = cip509(network, reg.slot_no.into(), reg.txn_index.into()).await?;
161 chain = chain
162 .update(reg)
163 .context("Failed to update registration chain")?;
164 }
165
166 Ok(chain)
167}
168
169async fn cip509(
171 network: Network,
172 slot: Slot,
173 txn_index: TxnIndex,
174) -> Result<Cip509> {
175 let point = Point::fuzzy(slot);
176 let block = ChainFollower::get_block(network, point)
177 .await
178 .context("Unable to get block")?
179 .data;
180 if block.point().slot_or_default() != slot {
181 bail!(
184 "Unable to find exact {slot:?} block. Found block slot {:?}",
185 block.point().slot_or_default()
186 );
187 }
188 Cip509::new(&block, txn_index, &[])
190 .with_context(|| {
191 format!("Invalid RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
192 })?
193 .with_context(|| {
194 format!("No RBAC registration, slot = {slot:?}, transaction index = {txn_index:?}")
195 })
196}