1use std::collections::{HashMap, HashSet};
4
5use anyhow::{Context, Result};
6use cardano_chain_follower::{hashes::TransactionId, StakeAddress};
7use catalyst_types::{
8 catalyst_id::{role_index::RoleId, CatalystId},
9 problem_report::ProblemReport,
10};
11use ed25519_dalek::VerifyingKey;
12use futures::StreamExt;
13use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
14
15use crate::{
16 db::index::{
17 queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address,
18 session::CassandraSession,
19 },
20 rbac::{
21 chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
22 get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
23 latest_rbac_chain, RbacBlockIndexingContext, RbacValidationError, RbacValidationResult,
24 RbacValidationSuccess,
25 },
26};
27
28pub async fn validate_rbac_registration(
34 reg: Cip509, is_persistent: bool, context: &mut RbacBlockIndexingContext,
35) -> RbacValidationResult {
36 match reg.previous_transaction() {
37 Some(previous_txn) => {
39 Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await
40 },
41 None => Box::pin(start_new_chain(reg, is_persistent, context)).await,
42 }
43}
44
45async fn update_chain(
47 reg: Cip509, previous_txn: TransactionId, is_persistent: bool,
48 context: &mut RbacBlockIndexingContext,
49) -> RbacValidationResult {
50 let purpose = reg.purpose();
51 let report = reg.report().to_owned();
52
53 let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await?
55 else {
56 return Err(RbacValidationError::UnknownCatalystId);
59 };
60 let chain = chain(&catalyst_id, is_persistent, context).await?
61 .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?;
62
63 let previous_addresses = chain.role_0_stake_addresses();
65 let reg_addresses = reg.role_0_stake_addresses();
66 let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect();
67 for address in &new_addresses {
68 match catalyst_id_from_stake_address(address, is_persistent, context).await? {
69 None => {
70 },
72 Some(_) => {
73 report.functional_validation(
74 &format!("{address} stake addresses is already used"),
75 "It isn't allowed to use same stake address in multiple registration chains",
76 );
77 },
78 }
79 }
80
81 let txn_id = reg.txn_hash();
83 let stake_addresses = reg.role_0_stake_addresses();
84 let origin = reg.origin().to_owned();
85
86 let new_chain = chain.update(reg).ok_or_else(|| {
88 RbacValidationError::InvalidRegistration {
89 catalyst_id: catalyst_id.clone(),
90 purpose,
91 report: report.clone(),
92 }
93 })?;
94
95 let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
97
98 if report.is_problematic() {
100 return Err(RbacValidationError::InvalidRegistration {
101 catalyst_id,
102 purpose,
103 report,
104 });
105 }
106
107 context.insert_transaction(txn_id, catalyst_id.clone());
109 context.insert_addresses(stake_addresses.clone(), &catalyst_id);
110 context.insert_public_keys(public_keys.clone(), &catalyst_id);
111 context.insert_registration(
112 catalyst_id.clone(),
113 txn_id,
114 origin.point().slot_or_default(),
115 origin.txn_index(),
116 Some(previous_txn),
117 HashSet::new(),
119 );
120
121 if is_persistent {
122 cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
123 }
124
125 Ok(RbacValidationSuccess {
126 catalyst_id,
127 stake_addresses,
128 public_keys,
129 modified_chains: Vec::new(),
132 purpose,
133 })
134}
135
136async fn start_new_chain(
138 reg: Cip509, is_persistent: bool, context: &mut RbacBlockIndexingContext,
139) -> RbacValidationResult {
140 let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id);
141 let purpose = reg.purpose();
142 let report = reg.report().to_owned();
143
144 let new_chain = RegistrationChain::new(reg).ok_or_else(|| {
146 if let Some(catalyst_id) = catalyst_id {
147 RbacValidationError::InvalidRegistration {
148 catalyst_id,
149 purpose,
150 report: report.clone(),
151 }
152 } else {
153 RbacValidationError::UnknownCatalystId
154 }
155 })?;
156
157 let catalyst_id = new_chain.catalyst_id().as_short_id();
159 if is_chain_known(&catalyst_id, is_persistent, context).await? {
160 report.functional_validation(
161 &format!("{catalyst_id} is already used"),
162 "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains",
163 );
164 return Err(RbacValidationError::InvalidRegistration {
165 catalyst_id,
166 purpose,
167 report,
168 });
169 }
170
171 let new_addresses = new_chain.role_0_stake_addresses();
173 let mut updated_chains: HashMap<_, HashSet<StakeAddress>> = HashMap::new();
174 for address in &new_addresses {
175 if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? {
176 let previous_chain = chain(&id, is_persistent, context)
179 .await?
180 .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?;
181 if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
182 == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
183 {
184 report.functional_validation(
185 &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})",
186 previous_chain.catalyst_id().as_short_id()
187 ),
188 "It is only allowed to override the existing chain by using different public key",
189 );
190 } else {
191 updated_chains
194 .entry(id)
195 .and_modify(|e| {
196 e.insert(address.clone());
197 })
198 .or_insert([address.clone()].into_iter().collect());
199 }
200 }
201 }
202
203 let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
205
206 if report.is_problematic() {
207 return Err(RbacValidationError::InvalidRegistration {
208 catalyst_id,
209 purpose,
210 report,
211 });
212 }
213
214 context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
216 context.insert_addresses(new_addresses.clone(), &catalyst_id);
219 context.insert_public_keys(public_keys.clone(), &catalyst_id);
220 context.insert_registration(
221 catalyst_id.clone(),
222 new_chain.current_tx_id_hash(),
223 new_chain.current_point().slot_or_default(),
224 new_chain.current_txn_index(),
225 None,
227 HashSet::new(),
229 );
230
231 for (catalyst_id, addresses) in &updated_chains {
233 for address in addresses {
234 cache_stake_address(is_persistent, address.clone(), catalyst_id.clone());
235 }
236 }
237
238 Ok(RbacValidationSuccess {
239 catalyst_id,
240 stake_addresses: new_addresses,
241 public_keys,
242 modified_chains: updated_chains.into_iter().collect(),
243 purpose,
244 })
245}
246
247async fn catalyst_id_from_txn_id(
249 txn_id: TransactionId, is_persistent: bool, context: &mut RbacBlockIndexingContext,
250) -> Result<Option<CatalystId>> {
251 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
252
253 if let Some(catalyst_id) = context.find_transaction(&txn_id) {
255 return Ok(Some(catalyst_id.to_owned()));
256 }
257
258 let session =
260 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
261 if let Some(id) = Query::get(&session, txn_id).await? {
262 return Ok(Some(id));
263 };
264
265 if !is_persistent {
267 let session =
268 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
269 return Query::get(&session, txn_id).await;
270 }
271
272 Ok(None)
273}
274
275async fn chain(
278 id: &CatalystId, is_persistent: bool, context: &mut RbacBlockIndexingContext,
279) -> Result<Option<RegistrationChain>> {
280 let chain = if is_persistent {
281 persistent_rbac_chain(id).await?
282 } else {
283 latest_rbac_chain(id).await?.map(|i| i.chain)
284 };
285
286 if let Some(regs) = context.find_registrations(id) {
288 let regs = regs.iter().cloned();
289 match chain {
290 Some(c) => apply_regs(c, regs).await.map(Some),
291 None => build_rbac_chain(regs).await,
292 }
293 } else {
294 Ok(chain)
295 }
296}
297
298async fn catalyst_id_from_stake_address(
300 address: &StakeAddress, is_persistent: bool, context: &mut RbacBlockIndexingContext,
301) -> Result<Option<CatalystId>> {
302 use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
303
304 if let Some(catalyst_id) = context.find_address(address) {
306 return Ok(Some(catalyst_id.to_owned()));
307 }
308
309 let session =
311 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
312 if let Some(id) = Query::latest(&session, address).await? {
313 return Ok(Some(id));
314 };
315
316 if !is_persistent {
318 let session =
319 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
320 return Query::latest(&session, address).await;
321 }
322
323 Ok(None)
324}
325
326async fn validate_public_keys(
329 chain: &RegistrationChain, is_persistent: bool, report: &ProblemReport,
330 context: &mut RbacBlockIndexingContext,
331) -> Result<HashSet<VerifyingKey>> {
332 let mut keys = HashSet::new();
333
334 let roles: Vec<_> = chain.role_data_history().keys().collect();
335 let catalyst_id = chain.catalyst_id().as_short_id();
336
337 for role in roles {
338 if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) {
339 keys.insert(key);
340 if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await?
341 {
342 if previous != catalyst_id {
343 report.functional_validation(
344 &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"),
345 "It isn't allowed to use role 0 signing (certificate subject public) key in different chains",
346 );
347 }
348 }
349 }
350 }
351
352 Ok(keys)
353}
354
355async fn catalyst_id_from_public_key(
357 key: VerifyingKey, is_persistent: bool, context: &mut RbacBlockIndexingContext,
358) -> Result<Option<CatalystId>> {
359 use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
360
361 if let Some(catalyst_id) = context.find_public_key(&key) {
363 return Ok(Some(catalyst_id.to_owned()));
364 }
365
366 let session =
368 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
369 if let Some(id) = Query::get(&session, key).await? {
370 return Ok(Some(id));
371 };
372
373 if !is_persistent {
375 let session =
376 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
377 return Query::get(&session, key).await;
378 }
379
380 Ok(None)
381}
382
383pub async fn is_chain_known(
388 id: &CatalystId, is_persistent: bool, context: &mut RbacBlockIndexingContext,
389) -> Result<bool> {
390 if context.find_registrations(id).is_some() {
391 return Ok(true);
392 }
393
394 let session =
395 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
396
397 if cached_persistent_rbac_chain(&session, id).is_some() {
400 return Ok(true);
401 }
402
403 if is_cat_id_known(&session, id).await? {
404 return Ok(true);
405 }
406
407 if !is_persistent {
409 let session =
410 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
411 if is_cat_id_known(&session, id).await? {
412 return Ok(true);
413 }
414 }
415
416 Ok(false)
417}
418
419async fn is_cat_id_known(session: &CassandraSession, id: &CatalystId) -> Result<bool> {
421 use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
422
423 Ok(Query::execute(session, QueryParams {
424 catalyst_id: id.clone().into(),
425 })
426 .await?
427 .next()
428 .await
429 .is_some())
430}