1use std::collections::{HashMap, HashSet};
4
5use anyhow::{Context, Result};
6use cardano_chain_follower::{hashes::TransactionId, StakeAddress};
7use catalyst_types::{
8 catalyst_id::{role_index::RoleId, CatalystId},
9 problem_report::ProblemReport,
10};
11use ed25519_dalek::VerifyingKey;
12use futures::StreamExt;
13use rbac_registration::{cardano::cip509::Cip509, registration::cardano::RegistrationChain};
14
15use crate::{
16 db::index::{
17 queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address,
18 session::CassandraSession,
19 },
20 rbac::{
21 chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
22 get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
23 latest_rbac_chain, RbacBlockIndexingContext, RbacValidationError, RbacValidationResult,
24 RbacValidationSuccess,
25 },
26};
27
28pub async fn validate_rbac_registration(
34 reg: Cip509,
35 is_persistent: bool,
36 context: &mut RbacBlockIndexingContext,
37) -> RbacValidationResult {
38 match reg.previous_transaction() {
39 Some(previous_txn) => {
41 Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await
42 },
43 None => Box::pin(start_new_chain(reg, is_persistent, context)).await,
44 }
45}
46
47async fn update_chain(
49 reg: Cip509,
50 previous_txn: TransactionId,
51 is_persistent: bool,
52 context: &mut RbacBlockIndexingContext,
53) -> RbacValidationResult {
54 let purpose = reg.purpose();
55 let report = reg.report().to_owned();
56
57 let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await?
59 else {
60 return Err(RbacValidationError::UnknownCatalystId);
63 };
64 let chain = chain(&catalyst_id, is_persistent, context).await?
65 .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?;
66
67 let previous_addresses = chain.role_0_stake_addresses();
69 let reg_addresses = reg.role_0_stake_addresses();
70 let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect();
71 for address in &new_addresses {
72 match catalyst_id_from_stake_address(address, is_persistent, context).await? {
73 None => {
74 },
76 Some(_) => {
77 report.functional_validation(
78 &format!("{address} stake addresses is already used"),
79 "It isn't allowed to use same stake address in multiple registration chains",
80 );
81 },
82 }
83 }
84
85 let txn_id = reg.txn_hash();
87 let stake_addresses = reg.role_0_stake_addresses();
88 let origin = reg.origin().to_owned();
89
90 let new_chain = chain.update(reg).ok_or_else(|| {
92 RbacValidationError::InvalidRegistration {
93 catalyst_id: catalyst_id.clone(),
94 purpose,
95 report: report.clone(),
96 }
97 })?;
98
99 let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
101
102 if report.is_problematic() {
104 return Err(RbacValidationError::InvalidRegistration {
105 catalyst_id,
106 purpose,
107 report,
108 });
109 }
110
111 context.insert_transaction(txn_id, catalyst_id.clone());
113 context.insert_addresses(stake_addresses.clone(), &catalyst_id);
114 context.insert_public_keys(public_keys.clone(), &catalyst_id);
115 context.insert_registration(
116 catalyst_id.clone(),
117 txn_id,
118 origin.point().slot_or_default(),
119 origin.txn_index(),
120 Some(previous_txn),
121 HashSet::new(),
123 );
124
125 if is_persistent {
126 cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
127 }
128
129 Ok(RbacValidationSuccess {
130 catalyst_id,
131 stake_addresses,
132 public_keys,
133 modified_chains: Vec::new(),
136 purpose,
137 })
138}
139
140async fn start_new_chain(
142 reg: Cip509,
143 is_persistent: bool,
144 context: &mut RbacBlockIndexingContext,
145) -> RbacValidationResult {
146 let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id);
147 let purpose = reg.purpose();
148 let report = reg.report().to_owned();
149
150 let new_chain = RegistrationChain::new(reg).ok_or_else(|| {
152 if let Some(catalyst_id) = catalyst_id {
153 RbacValidationError::InvalidRegistration {
154 catalyst_id,
155 purpose,
156 report: report.clone(),
157 }
158 } else {
159 RbacValidationError::UnknownCatalystId
160 }
161 })?;
162
163 let catalyst_id = new_chain.catalyst_id().as_short_id();
165 if is_chain_known(&catalyst_id, is_persistent, context).await? {
166 report.functional_validation(
167 &format!("{catalyst_id} is already used"),
168 "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains",
169 );
170 return Err(RbacValidationError::InvalidRegistration {
171 catalyst_id,
172 purpose,
173 report,
174 });
175 }
176
177 let new_addresses = new_chain.role_0_stake_addresses();
179 let mut updated_chains: HashMap<_, HashSet<StakeAddress>> = HashMap::new();
180 for address in &new_addresses {
181 if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? {
182 let previous_chain = chain(&id, is_persistent, context)
185 .await?
186 .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?;
187 if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
188 == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
189 {
190 report.functional_validation(
191 &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})",
192 previous_chain.catalyst_id().as_short_id()
193 ),
194 "It is only allowed to override the existing chain by using different public key",
195 );
196 } else {
197 updated_chains
200 .entry(id)
201 .and_modify(|e| {
202 e.insert(address.clone());
203 })
204 .or_insert([address.clone()].into_iter().collect());
205 }
206 }
207 }
208
209 let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
211
212 if report.is_problematic() {
213 return Err(RbacValidationError::InvalidRegistration {
214 catalyst_id,
215 purpose,
216 report,
217 });
218 }
219
220 context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
222 context.insert_addresses(new_addresses.clone(), &catalyst_id);
225 context.insert_public_keys(public_keys.clone(), &catalyst_id);
226 context.insert_registration(
227 catalyst_id.clone(),
228 new_chain.current_tx_id_hash(),
229 new_chain.current_point().slot_or_default(),
230 new_chain.current_txn_index(),
231 None,
233 HashSet::new(),
235 );
236
237 for (catalyst_id, addresses) in &updated_chains {
239 for address in addresses {
240 cache_stake_address(is_persistent, address.clone(), catalyst_id.clone());
241 }
242 }
243
244 Ok(RbacValidationSuccess {
245 catalyst_id,
246 stake_addresses: new_addresses,
247 public_keys,
248 modified_chains: updated_chains.into_iter().collect(),
249 purpose,
250 })
251}
252
253async fn catalyst_id_from_txn_id(
255 txn_id: TransactionId,
256 is_persistent: bool,
257 context: &mut RbacBlockIndexingContext,
258) -> Result<Option<CatalystId>> {
259 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
260
261 if let Some(catalyst_id) = context.find_transaction(&txn_id) {
263 return Ok(Some(catalyst_id.to_owned()));
264 }
265
266 let session =
268 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
269 if let Some(id) = Query::get(&session, txn_id).await? {
270 return Ok(Some(id));
271 }
272
273 if !is_persistent {
275 let session =
276 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
277 return Query::get(&session, txn_id).await;
278 }
279
280 Ok(None)
281}
282
283async fn chain(
286 id: &CatalystId,
287 is_persistent: bool,
288 context: &mut RbacBlockIndexingContext,
289) -> Result<Option<RegistrationChain>> {
290 let chain = if is_persistent {
291 persistent_rbac_chain(id).await?
292 } else {
293 latest_rbac_chain(id).await?.map(|i| i.chain)
294 };
295
296 if let Some(regs) = context.find_registrations(id) {
298 let regs = regs.iter().cloned();
299 match chain {
300 Some(c) => apply_regs(c, regs).await.map(Some),
301 None => build_rbac_chain(regs).await,
302 }
303 } else {
304 Ok(chain)
305 }
306}
307
308async fn catalyst_id_from_stake_address(
310 address: &StakeAddress,
311 is_persistent: bool,
312 context: &mut RbacBlockIndexingContext,
313) -> Result<Option<CatalystId>> {
314 use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
315
316 if let Some(catalyst_id) = context.find_address(address) {
318 return Ok(Some(catalyst_id.to_owned()));
319 }
320
321 let session =
323 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
324 if let Some(id) = Query::latest(&session, address).await? {
325 return Ok(Some(id));
326 }
327
328 if !is_persistent {
330 let session =
331 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
332 return Query::latest(&session, address).await;
333 }
334
335 Ok(None)
336}
337
338async fn validate_public_keys(
341 chain: &RegistrationChain,
342 is_persistent: bool,
343 report: &ProblemReport,
344 context: &mut RbacBlockIndexingContext,
345) -> Result<HashSet<VerifyingKey>> {
346 let mut keys = HashSet::new();
347
348 let roles: Vec<_> = chain.role_data_history().keys().collect();
349 let catalyst_id = chain.catalyst_id().as_short_id();
350
351 for role in roles {
352 if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) {
353 keys.insert(key);
354 if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await?
355 {
356 if previous != catalyst_id {
357 report.functional_validation(
358 &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"),
359 "It isn't allowed to use role 0 signing (certificate subject public) key in different chains",
360 );
361 }
362 }
363 }
364 }
365
366 Ok(keys)
367}
368
369async fn catalyst_id_from_public_key(
371 key: VerifyingKey,
372 is_persistent: bool,
373 context: &mut RbacBlockIndexingContext,
374) -> Result<Option<CatalystId>> {
375 use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
376
377 if let Some(catalyst_id) = context.find_public_key(&key) {
379 return Ok(Some(catalyst_id.to_owned()));
380 }
381
382 let session =
384 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
385 if let Some(id) = Query::get(&session, key).await? {
386 return Ok(Some(id));
387 }
388
389 if !is_persistent {
391 let session =
392 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
393 return Query::get(&session, key).await;
394 }
395
396 Ok(None)
397}
398
399pub async fn is_chain_known(
404 id: &CatalystId,
405 is_persistent: bool,
406 context: &mut RbacBlockIndexingContext,
407) -> Result<bool> {
408 if context.find_registrations(id).is_some() {
409 return Ok(true);
410 }
411
412 let session =
413 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
414
415 if cached_persistent_rbac_chain(&session, id).is_some() {
418 return Ok(true);
419 }
420
421 if is_cat_id_known(&session, id).await? {
422 return Ok(true);
423 }
424
425 if !is_persistent {
427 let session =
428 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
429 if is_cat_id_known(&session, id).await? {
430 return Ok(true);
431 }
432 }
433
434 Ok(false)
435}
436
437async fn is_cat_id_known(
439 session: &CassandraSession,
440 id: &CatalystId,
441) -> Result<bool> {
442 use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
443
444 Ok(Query::execute(session, QueryParams {
445 catalyst_id: id.clone().into(),
446 })
447 .await?
448 .next()
449 .await
450 .is_some())
451}