1use std::collections::{HashMap, HashSet};
4
5use anyhow::{Context, Result};
6use cardano_chain_follower::{hashes::TransactionId, StakeAddress};
7use catalyst_types::{
8 catalyst_id::{role_index::RoleId, CatalystId},
9 problem_report::ProblemReport,
10};
11use ed25519_dalek::VerifyingKey;
12use futures::StreamExt;
13use rbac_registration::{
14 cardano::cip509::{Cip0134UriSet, Cip509},
15 registration::cardano::RegistrationChain,
16};
17
18use crate::{
19 db::index::{
20 queries::rbac::get_catalyst_id_from_stake_address::cache_stake_address,
21 session::CassandraSession,
22 },
23 rbac::{
24 chains_cache::{cache_persistent_rbac_chain, cached_persistent_rbac_chain},
25 get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
26 latest_rbac_chain, RbacBlockIndexingContext, RbacValidationError, RbacValidationResult,
27 RbacValidationSuccess,
28 },
29};
30
31pub async fn validate_rbac_registration(
37 reg: Cip509,
38 is_persistent: bool,
39 context: &mut RbacBlockIndexingContext,
40) -> RbacValidationResult {
41 match reg.previous_transaction() {
42 Some(previous_txn) => {
44 Box::pin(update_chain(reg, previous_txn, is_persistent, context)).await
45 },
46 None => Box::pin(start_new_chain(reg, is_persistent, context)).await,
47 }
48}
49
50async fn update_chain(
52 reg: Cip509,
53 previous_txn: TransactionId,
54 is_persistent: bool,
55 context: &mut RbacBlockIndexingContext,
56) -> RbacValidationResult {
57 let purpose = reg.purpose();
58 let report = reg.report().to_owned();
59
60 let Some(catalyst_id) = catalyst_id_from_txn_id(previous_txn, is_persistent, context).await?
62 else {
63 return Err(RbacValidationError::UnknownCatalystId);
66 };
67 let chain = chain(&catalyst_id, is_persistent, context).await?
68 .context("{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'")?;
69
70 let previous_addresses = chain.stake_addresses();
72 let reg_addresses = cip509_stake_addresses(®);
73 let new_addresses: Vec<_> = reg_addresses.difference(&previous_addresses).collect();
74 for address in &new_addresses {
75 match catalyst_id_from_stake_address(address, is_persistent, context).await? {
76 None => {
77 },
79 Some(_) => {
80 report.functional_validation(
81 &format!("{address} stake addresses is already used"),
82 "It isn't allowed to use same stake address in multiple registration chains",
83 );
84 },
85 }
86 }
87
88 let txn_id = reg.txn_hash();
90 let stake_addresses = cip509_stake_addresses(®);
91 let origin = reg.origin().to_owned();
92
93 let new_chain = chain.update(reg).ok_or_else(|| {
95 RbacValidationError::InvalidRegistration {
96 catalyst_id: catalyst_id.clone(),
97 purpose,
98 report: report.clone(),
99 }
100 })?;
101
102 let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
104
105 if report.is_problematic() {
107 return Err(RbacValidationError::InvalidRegistration {
108 catalyst_id,
109 purpose,
110 report,
111 });
112 }
113
114 context.insert_transaction(txn_id, catalyst_id.clone());
116 context.insert_addresses(stake_addresses.clone(), &catalyst_id);
117 context.insert_public_keys(public_keys.clone(), &catalyst_id);
118 context.insert_registration(
119 catalyst_id.clone(),
120 txn_id,
121 origin.point().slot_or_default(),
122 origin.txn_index(),
123 Some(previous_txn),
124 HashSet::new(),
126 );
127
128 if is_persistent {
129 cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
130 }
131
132 Ok(RbacValidationSuccess {
133 catalyst_id,
134 stake_addresses,
135 public_keys,
136 modified_chains: Vec::new(),
139 purpose,
140 })
141}
142
143async fn start_new_chain(
145 reg: Cip509,
146 is_persistent: bool,
147 context: &mut RbacBlockIndexingContext,
148) -> RbacValidationResult {
149 let catalyst_id = reg.catalyst_id().map(CatalystId::as_short_id);
150 let purpose = reg.purpose();
151 let report = reg.report().to_owned();
152
153 let new_chain = RegistrationChain::new(reg).ok_or_else(|| {
155 if let Some(catalyst_id) = catalyst_id {
156 RbacValidationError::InvalidRegistration {
157 catalyst_id,
158 purpose,
159 report: report.clone(),
160 }
161 } else {
162 RbacValidationError::UnknownCatalystId
163 }
164 })?;
165
166 let catalyst_id = new_chain.catalyst_id().as_short_id();
168 if is_chain_known(&catalyst_id, is_persistent, context).await? {
169 report.functional_validation(
170 &format!("{catalyst_id} is already used"),
171 "It isn't allowed to use same Catalyst ID (certificate subject public key) in multiple registration chains",
172 );
173 return Err(RbacValidationError::InvalidRegistration {
174 catalyst_id,
175 purpose,
176 report,
177 });
178 }
179
180 let new_addresses = new_chain.stake_addresses();
182 let mut updated_chains: HashMap<_, HashSet<StakeAddress>> = HashMap::new();
183 for address in &new_addresses {
184 if let Some(id) = catalyst_id_from_stake_address(address, is_persistent, context).await? {
185 let previous_chain = chain(&id, is_persistent, context)
188 .await?
189 .context("{id} is present in 'catalyst_id_for_stake_address', but not in 'rbac_registration'")?;
190 if previous_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
191 == new_chain.get_latest_signing_pk_for_role(&RoleId::Role0)
192 {
193 report.functional_validation(
194 &format!("A new registration ({catalyst_id}) uses the same public key as the previous one ({})",
195 previous_chain.catalyst_id().as_short_id()
196 ),
197 "It is only allowed to override the existing chain by using different public key",
198 );
199 } else {
200 updated_chains
203 .entry(id)
204 .and_modify(|e| {
205 e.insert(address.clone());
206 })
207 .or_insert([address.clone()].into_iter().collect());
208 }
209 }
210 }
211
212 let public_keys = validate_public_keys(&new_chain, is_persistent, &report, context).await?;
214
215 if report.is_problematic() {
216 return Err(RbacValidationError::InvalidRegistration {
217 catalyst_id,
218 purpose,
219 report,
220 });
221 }
222
223 context.insert_transaction(new_chain.current_tx_id_hash(), catalyst_id.clone());
225 context.insert_addresses(new_addresses.clone(), &catalyst_id);
228 context.insert_public_keys(public_keys.clone(), &catalyst_id);
229 context.insert_registration(
230 catalyst_id.clone(),
231 new_chain.current_tx_id_hash(),
232 new_chain.current_point().slot_or_default(),
233 new_chain.current_txn_index(),
234 None,
236 HashSet::new(),
238 );
239
240 for (catalyst_id, addresses) in &updated_chains {
242 for address in addresses {
243 cache_stake_address(is_persistent, address.clone(), catalyst_id.clone());
244 }
245 }
246
247 Ok(RbacValidationSuccess {
248 catalyst_id,
249 stake_addresses: new_addresses,
250 public_keys,
251 modified_chains: updated_chains.into_iter().collect(),
252 purpose,
253 })
254}
255
256async fn catalyst_id_from_txn_id(
258 txn_id: TransactionId,
259 is_persistent: bool,
260 context: &mut RbacBlockIndexingContext,
261) -> Result<Option<CatalystId>> {
262 use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
263
264 if let Some(catalyst_id) = context.find_transaction(&txn_id) {
266 return Ok(Some(catalyst_id.to_owned()));
267 }
268
269 let session =
271 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
272 if let Some(id) = Query::get(&session, txn_id).await? {
273 return Ok(Some(id));
274 }
275
276 if !is_persistent {
278 let session =
279 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
280 return Query::get(&session, txn_id).await;
281 }
282
283 Ok(None)
284}
285
286async fn chain(
289 id: &CatalystId,
290 is_persistent: bool,
291 context: &mut RbacBlockIndexingContext,
292) -> Result<Option<RegistrationChain>> {
293 let chain = if is_persistent {
294 persistent_rbac_chain(id).await?
295 } else {
296 latest_rbac_chain(id).await?.map(|i| i.chain)
297 };
298
299 if let Some(regs) = context.find_registrations(id) {
301 let regs = regs.iter().cloned();
302 match chain {
303 Some(c) => apply_regs(c, regs).await.map(Some),
304 None => build_rbac_chain(regs).await,
305 }
306 } else {
307 Ok(chain)
308 }
309}
310
311async fn catalyst_id_from_stake_address(
313 address: &StakeAddress,
314 is_persistent: bool,
315 context: &mut RbacBlockIndexingContext,
316) -> Result<Option<CatalystId>> {
317 use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
318
319 if let Some(catalyst_id) = context.find_address(address) {
321 return Ok(Some(catalyst_id.to_owned()));
322 }
323
324 let session =
326 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
327 if let Some(id) = Query::latest(&session, address).await? {
328 return Ok(Some(id));
329 }
330
331 if !is_persistent {
333 let session =
334 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
335 return Query::latest(&session, address).await;
336 }
337
338 Ok(None)
339}
340
341async fn validate_public_keys(
344 chain: &RegistrationChain,
345 is_persistent: bool,
346 report: &ProblemReport,
347 context: &mut RbacBlockIndexingContext,
348) -> Result<HashSet<VerifyingKey>> {
349 let mut keys = HashSet::new();
350
351 let roles: Vec<_> = chain.role_data_history().keys().collect();
352 let catalyst_id = chain.catalyst_id().as_short_id();
353
354 for role in roles {
355 if let Some((key, _)) = chain.get_latest_signing_pk_for_role(role) {
356 keys.insert(key);
357 if let Some(previous) = catalyst_id_from_public_key(key, is_persistent, context).await?
358 {
359 if previous != catalyst_id {
360 report.functional_validation(
361 &format!("An update to {catalyst_id} registration chain uses the same public key ({key:?}) as {previous} chain"),
362 "It isn't allowed to use role 0 signing (certificate subject public) key in different chains",
363 );
364 }
365 }
366 }
367 }
368
369 Ok(keys)
370}
371
372async fn catalyst_id_from_public_key(
374 key: VerifyingKey,
375 is_persistent: bool,
376 context: &mut RbacBlockIndexingContext,
377) -> Result<Option<CatalystId>> {
378 use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
379
380 if let Some(catalyst_id) = context.find_public_key(&key) {
382 return Ok(Some(catalyst_id.to_owned()));
383 }
384
385 let session =
387 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
388 if let Some(id) = Query::get(&session, key).await? {
389 return Ok(Some(id));
390 }
391
392 if !is_persistent {
394 let session =
395 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
396 return Query::get(&session, key).await;
397 }
398
399 Ok(None)
400}
401
402pub async fn is_chain_known(
407 id: &CatalystId,
408 is_persistent: bool,
409 context: &mut RbacBlockIndexingContext,
410) -> Result<bool> {
411 if context.find_registrations(id).is_some() {
412 return Ok(true);
413 }
414
415 let session =
416 CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
417
418 if cached_persistent_rbac_chain(&session, id).is_some() {
421 return Ok(true);
422 }
423
424 if is_cat_id_known(&session, id).await? {
425 return Ok(true);
426 }
427
428 if !is_persistent {
430 let session =
431 CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
432 if is_cat_id_known(&session, id).await? {
433 return Ok(true);
434 }
435 }
436
437 Ok(false)
438}
439
440async fn is_cat_id_known(
442 session: &CassandraSession,
443 id: &CatalystId,
444) -> Result<bool> {
445 use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
446
447 Ok(Query::execute(session, QueryParams {
448 catalyst_id: id.clone().into(),
449 })
450 .await?
451 .next()
452 .await
453 .is_some())
454}
455
456fn cip509_stake_addresses(cip509: &Cip509) -> HashSet<StakeAddress> {
458 cip509
459 .certificate_uris()
460 .map(Cip0134UriSet::stake_addresses)
461 .unwrap_or_default()
462}