cat_gateway/rbac/
state.rs1use std::collections::{HashMap, HashSet};
4
5use anyhow::Context;
6use cardano_chain_follower::{StakeAddress, hashes::TransactionId};
7use catalyst_types::catalyst_id::CatalystId;
8use ed25519_dalek::VerifyingKey;
9use futures::StreamExt;
10
11use crate::{
12 db::index::session::CassandraSession,
13 rbac::{
14 RbacBlockIndexingContext,
15 chains_cache::cached_persistent_rbac_chain,
16 get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
17 latest_rbac_chain,
18 },
19};
20
21pub(crate) struct RbacChainsState<'a> {
24 is_persistent: bool,
26 context: &'a RbacBlockIndexingContext,
28 modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
33}
34
35impl<'a> RbacChainsState<'a> {
36 pub fn new(
38 is_persistent: bool,
39 context: &'a RbacBlockIndexingContext,
40 ) -> Self {
41 Self {
42 is_persistent,
43 context,
44 modified_chains: HashMap::new(),
45 }
46 }
47
48 pub fn consume(self) -> HashMap<CatalystId, HashSet<StakeAddress>> {
50 self.modified_chains
51 }
52
53 pub async fn catalyst_id_from_txn_id(
55 &self,
56 txn_id: TransactionId,
57 ) -> anyhow::Result<Option<CatalystId>> {
58 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
59
60 if let Some(catalyst_id) = self.context.find_transaction(&txn_id) {
62 return Ok(Some(catalyst_id.to_owned()));
63 }
64
65 let session =
67 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
68 if let Some(id) = Query::get(&session, txn_id).await? {
69 return Ok(Some(id));
70 }
71
72 if !self.is_persistent {
74 let session =
75 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
76 return Query::get(&session, txn_id).await;
77 }
78
79 Ok(None)
80 }
81}
82
83impl rbac_registration::cardano::state::RbacChainsState for RbacChainsState<'_> {
84 async fn chain(
85 &self,
86 id: &CatalystId,
87 ) -> anyhow::Result<Option<rbac_registration::registration::cardano::RegistrationChain>> {
88 let chain = if self.is_persistent {
89 persistent_rbac_chain(id).await?
90 } else {
91 latest_rbac_chain(id).await?.map(|i| i.chain)
92 };
93
94 if let Some(regs) = self.context.find_registrations(id) {
96 let regs = regs.iter().cloned();
97 match chain {
98 Some(c) => return apply_regs(c, regs).await.map(Some),
99 None => return build_rbac_chain(regs).await,
100 }
101 }
102
103 Ok(chain)
104 }
105
106 async fn is_chain_known(
107 &self,
108 id: &CatalystId,
109 ) -> anyhow::Result<bool> {
110 if self.context.find_registrations(id).is_some() {
111 return Ok(true);
112 }
113
114 let session =
115 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
116
117 if cached_persistent_rbac_chain(&session, id).is_some() {
120 return Ok(true);
121 }
122
123 if is_cat_id_known(&session, id).await? {
124 return Ok(true);
125 }
126
127 if !self.is_persistent {
129 let session =
130 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
131 if is_cat_id_known(&session, id).await? {
132 return Ok(true);
133 }
134 }
135
136 Ok(false)
137 }
138
139 async fn is_stake_address_used(
140 &self,
141 address: &StakeAddress,
142 ) -> anyhow::Result<bool> {
143 catalyst_id_from_stake_address(address, self.is_persistent, self.context)
144 .await
145 .map(|v| v.is_some())
146 }
147
148 async fn chain_catalyst_id_from_signing_public_key(
149 &self,
150 key: &VerifyingKey,
151 ) -> anyhow::Result<Option<CatalystId>> {
152 use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
153
154 if let Some(catalyst_id) = self.context.find_public_key(key) {
156 return Ok(Some(catalyst_id.to_owned()));
157 }
158
159 let session =
161 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
162 if let Some(id) = Query::get(&session, *key).await? {
163 return Ok(Some(id));
164 }
165
166 if !self.is_persistent {
168 let session =
169 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
170 return Query::get(&session, *key).await;
171 }
172
173 Ok(None)
174 }
175
176 async fn take_stake_address_from_chains<I>(
177 &mut self,
178 addresses: I,
179 ) -> anyhow::Result<()>
180 where
181 I: IntoIterator<Item = StakeAddress> + Send,
182 <I as IntoIterator>::IntoIter: Send,
183 {
184 for addr in addresses {
185 if let Some(cat_id) =
186 catalyst_id_from_stake_address(&addr, self.is_persistent, self.context).await?
187 {
188 self.modified_chains
189 .entry(cat_id)
190 .and_modify(|e| {
191 e.insert(addr.clone());
192 })
193 .or_insert([addr].into_iter().collect());
194 }
195 }
196
197 Ok(())
198 }
199}
200
201async fn catalyst_id_from_stake_address(
203 address: &StakeAddress,
204 is_persistent: bool,
205 context: &RbacBlockIndexingContext,
206) -> anyhow::Result<Option<CatalystId>> {
207 use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
208
209 if let Some(catalyst_id) = context.find_address(address) {
211 return Ok(Some(catalyst_id.to_owned()));
212 }
213
214 let session =
216 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
217 if let Some(id) = Query::latest(&session, address).await? {
218 return Ok(Some(id));
219 }
220
221 if !is_persistent {
223 let session =
224 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
225 return Query::latest(&session, address).await;
226 }
227
228 Ok(None)
229}
230
231async fn is_cat_id_known(
233 session: &CassandraSession,
234 id: &CatalystId,
235) -> anyhow::Result<bool> {
236 use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
237
238 Ok(Query::execute(session, QueryParams {
239 catalyst_id: id.clone().into(),
240 })
241 .await?
242 .next()
243 .await
244 .is_some())
245}