1pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10 collections::{BTreeSet, HashMap, HashSet},
11 sync::Arc,
12};
13
14use anyhow::Context;
15use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId};
16use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4};
17use ed25519_dalek::VerifyingKey;
18use futures::{StreamExt, TryStreamExt};
19use rbac_registration::{
20 cardano::{cip509::Cip509, provider::RbacChainsProvider as _},
21 registration::cardano::RegistrationChain,
22};
23use scylla::client::session::Session;
24use tokio::sync::watch;
25use tracing::{debug, error};
26
27use crate::{
28 db::index::{
29 queries::{FallibleQueryTasks, SizedBatch},
30 session::CassandraSession,
31 },
32 metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
33 rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, provider::RbacChainsProvider},
34 settings::cassandra_db::EnvVars,
35};
36
/// Rows collected while indexing RBAC 509 registrations.
///
/// The rows are accumulated by `index` and written to the database by `execute`.
#[derive(Debug)]
pub(crate) struct Rbac509InsertQuery {
    /// Rows for the registrations that passed validation.
    pub(crate) registrations: Vec<insert_rbac509::Params>,
    /// Rows for the registrations that were rejected during validation.
    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
    /// Rows associating a Catalyst ID with a transaction ID.
    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
    /// Rows associating a Catalyst ID with a stake address.
    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
    /// Rows associating a Catalyst ID with a public key.
    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
}
51
52impl Rbac509InsertQuery {
53 pub(crate) fn new() -> Self {
55 Rbac509InsertQuery {
56 registrations: Vec::new(),
57 invalid: Vec::new(),
58 catalyst_id_for_txn_id: Vec::new(),
59 catalyst_id_for_stake_address: Vec::new(),
60 catalyst_id_for_public_key: Vec::new(),
61 }
62 }
63
64 pub(crate) async fn prepare_batch(
66 session: &Arc<Session>,
67 cfg: &EnvVars,
68 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
69 Ok((
70 insert_rbac509::Params::prepare_batch(session, cfg).await?,
71 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
72 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
73 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
74 insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
75 ))
76 }
77
78 pub(crate) async fn index(
80 &mut self,
81 txn_hash: TransactionId,
82 index: TxnIndex,
83 block: &MultiEraBlock,
84 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
85 our_end: Slot,
86 context: &mut RbacBlockIndexingContext,
87 ) -> anyhow::Result<()> {
88 let slot = block.slot();
89 let cip509 = match Cip509::new(block, index, &[]) {
90 Ok(Some(v)) => v,
91 Ok(None) => {
92 return Ok(());
94 },
95 Err(e) => {
96 debug!(
100 slot = ?slot,
101 index = ?index,
102 err = ?e,
103 "Invalid RBAC Registration Metadata in transaction"
104 );
105 return Ok(());
106 },
107 };
108
109 if slot != cip509.origin().point().slot_or_default() {
111 error!(
112 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
113 cip509.origin().point().slot_or_default()
114 );
115 }
116 if txn_hash != cip509.txn_hash() {
117 error!(
118 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
119 cip509.txn_hash()
120 );
121 }
122
123 wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
126
127 if let Some(previous_txn) = cip509.previous_transaction() {
128 self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context)
129 .await?;
130 } else {
131 self.try_record_new_chain(&cip509, block.is_immutable(), context)
132 .await?;
133 }
134
135 Ok(())
136 }
137
138 async fn try_record_chain_update(
141 &mut self,
142 reg: &Cip509,
143 previous_txn: TransactionId,
144 is_persistent: bool,
145 context: &mut RbacBlockIndexingContext,
146 ) -> anyhow::Result<()> {
147 let state = RbacChainsProvider::new(is_persistent, context);
149
150 let slot = reg.origin().point().slot_or_default();
151 let txn_index = reg.origin().txn_index();
152 let txn_hash = reg.txn_hash();
153
154 let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else {
155 debug!(
159 slot = ?slot,
160 txn_index = ?txn_index,
161 txn_hash = ?txn_hash,
162 "Unable to determine Catalyst id for registration"
163 );
164 return Ok(());
165 };
166
167 let chain = state.chain(&catalyst_id).await?.context(format!(
168 "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'"
169 ))?;
170
171 let Some(new_chain) = chain.update(reg, &state).await? else {
173 self.record_invalid_registration(
174 txn_hash,
175 txn_index,
176 slot,
177 chain.catalyst_id().clone(),
178 Some(previous_txn),
179 reg.purpose(),
180 reg.report().clone(),
181 );
182
183 return Ok(());
184 };
185
186 let previous_addresses = chain.stake_addresses();
187 let new_addresses = new_chain.stake_addresses();
188 let added_addresses: HashSet<_> = new_addresses
189 .difference(&previous_addresses)
190 .cloned()
191 .collect();
192 let removed_addresses: HashSet<_> = previous_addresses
193 .difference(&new_addresses)
194 .cloned()
195 .collect();
196 let public_keys = reg
197 .all_roles()
198 .iter()
199 .filter_map(|v| reg.signing_public_key_for_role(*v))
200 .collect::<HashSet<_>>();
201 let modified_chains = HashMap::new();
203 let purpose = reg.purpose();
204
205 if is_persistent {
206 cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
207 }
208
209 self.record_valid_registration(
210 txn_hash,
211 txn_index,
212 slot,
213 catalyst_id.clone(),
214 Some(previous_txn),
215 &added_addresses,
216 &removed_addresses,
217 public_keys,
218 modified_chains,
219 purpose,
220 context,
221 );
222
223 Ok(())
224 }
225
226 async fn try_record_new_chain(
229 &mut self,
230 reg: &Cip509,
231 is_persistent: bool,
232 context: &mut RbacBlockIndexingContext,
233 ) -> anyhow::Result<()> {
234 const FUTURES_BUFFER_SIZE: usize = 10;
236
237 let provider = RbacChainsProvider::new(is_persistent, context);
238
239 let slot = reg.origin().point().slot_or_default();
240 let txn_index = reg.origin().txn_index();
241 let txn_hash = reg.txn_hash();
242
243 let Some(new_chain) = RegistrationChain::new(reg, &provider).await? else {
245 if let Some(cat_id) = reg.catalyst_id() {
246 self.record_invalid_registration(
247 txn_hash,
248 txn_index,
249 slot,
250 cat_id.clone(),
251 None,
252 reg.purpose(),
253 reg.report().clone(),
254 );
255 } else {
256 debug!(
260 slot = ?slot,
261 txn_index = ?txn_index,
262 txn_hash = ?txn_hash,
263 "Unable to determine Catalyst id for registration"
264 );
265 }
266
267 return Ok(());
268 };
269
270 let catalyst_id = new_chain.catalyst_id();
271 let stake_addresses = new_chain.stake_addresses();
272 let public_keys = reg
273 .all_roles()
274 .iter()
275 .filter_map(|v| reg.signing_public_key_for_role(*v))
276 .collect::<HashSet<_>>();
277 let purpose = reg.purpose();
278
279 let futures = reg.stake_addresses().clone().into_iter().map(|addr| {
280 async {
281 anyhow::Ok((
282 provider.chain_catalyst_id_from_stake_address(&addr).await?,
283 addr,
284 ))
285 }
286 });
287 let modified_chains = futures::stream::iter(futures)
288 .buffer_unordered(FUTURES_BUFFER_SIZE)
289 .try_fold(
290 HashMap::<CatalystId, HashSet<StakeAddress>>::new(),
291 |mut acc, (cat_id, addr)| {
292 async {
293 if let Some(cat_id) = cat_id {
294 acc.entry(cat_id).or_default().insert(addr);
295 }
296 anyhow::Ok(acc)
297 }
298 },
299 )
300 .await?;
301
302 self.record_valid_registration(
303 txn_hash,
304 txn_index,
305 slot,
306 catalyst_id.clone(),
307 None,
308 &stake_addresses,
309 &HashSet::new(),
310 public_keys,
311 modified_chains,
312 purpose,
313 context,
314 );
315
316 Ok(())
317 }
318
319 #[allow(clippy::too_many_arguments)]
321 fn record_valid_registration(
322 &mut self,
323 txn_hash: TransactionId,
324 index: TxnIndex,
325 slot: Slot,
326 catalyst_id: CatalystId,
327 previous_transaction: Option<TransactionId>,
328 added_stake_addresses: &HashSet<StakeAddress>,
329 removed_stake_addresses: &HashSet<StakeAddress>,
330 public_keys: HashSet<VerifyingKey>,
331 modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
332 purpose: Option<UuidV4>,
333 context: &mut RbacBlockIndexingContext,
334 ) {
335 context.insert_transaction(txn_hash, catalyst_id.clone());
336 context.insert_addresses(added_stake_addresses.clone(), &catalyst_id);
337 context.remove_addresses(removed_stake_addresses);
338 context.insert_public_keys(public_keys.clone(), &catalyst_id);
339 context.insert_registration(
340 catalyst_id.clone(),
341 txn_hash,
342 slot,
343 index,
344 previous_transaction,
345 );
346
347 self.catalyst_id_for_txn_id
349 .push(insert_catalyst_id_for_txn_id::Params::new(
350 catalyst_id.clone(),
351 txn_hash,
352 slot,
353 ));
354 for address in added_stake_addresses
356 .difference(removed_stake_addresses)
357 .cloned()
358 {
359 self.catalyst_id_for_stake_address.push(
360 insert_catalyst_id_for_stake_address::Params::new(
361 address,
362 slot,
363 index,
364 catalyst_id.clone(),
365 ),
366 );
367 }
368
369 for key in public_keys {
371 self.catalyst_id_for_public_key
372 .push(insert_catalyst_id_for_public_key::Params::new(
373 key,
374 slot,
375 catalyst_id.clone(),
376 ));
377 }
378 self.registrations.push(insert_rbac509::Params::new(
380 catalyst_id,
381 txn_hash,
382 slot,
383 index,
384 previous_transaction,
385 purpose,
386 ));
387
388 for (catalyst_id, _) in modified_chains {
390 self.registrations.push(insert_rbac509::Params::new(
391 catalyst_id.clone(),
392 txn_hash,
393 slot,
394 index,
395 None,
399 None,
400 ));
401 }
402 }
403
404 #[allow(clippy::too_many_arguments)]
406 fn record_invalid_registration(
407 &mut self,
408 txn_hash: TransactionId,
409 index: TxnIndex,
410 slot: Slot,
411 catalyst_id: CatalystId,
412 previous_transaction: Option<TransactionId>,
413 purpose: Option<UuidV4>,
414 report: ProblemReport,
415 ) {
416 inc_invalid_rbac_reg_count();
417 self.invalid.push(insert_rbac509_invalid::Params::new(
418 catalyst_id,
419 txn_hash,
420 slot,
421 index,
422 purpose,
423 previous_transaction,
424 report,
425 ));
426 }
427
428 pub(crate) fn execute(
432 self,
433 session: &Arc<CassandraSession>,
434 ) -> FallibleQueryTasks {
435 let mut query_handles: FallibleQueryTasks = Vec::new();
436
437 if !self.registrations.is_empty() {
438 let inner_session = session.clone();
439 query_handles.push(tokio::spawn(async move {
440 insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
441 }));
442 }
443
444 if !self.invalid.is_empty() {
445 let inner_session = session.clone();
446 query_handles.push(tokio::spawn(async move {
447 insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
448 }));
449 }
450
451 if !self.catalyst_id_for_txn_id.is_empty() {
452 let inner_session = session.clone();
453 query_handles.push(tokio::spawn(async move {
454 insert_catalyst_id_for_txn_id::Params::execute_batch(
455 &inner_session,
456 self.catalyst_id_for_txn_id,
457 )
458 .await
459 }));
460 }
461
462 if !self.catalyst_id_for_stake_address.is_empty() {
463 let inner_session = session.clone();
464 query_handles.push(tokio::spawn(async move {
465 insert_catalyst_id_for_stake_address::Params::execute_batch(
466 &inner_session,
467 self.catalyst_id_for_stake_address,
468 )
469 .await
470 }));
471 }
472
473 if !self.catalyst_id_for_public_key.is_empty() {
474 let inner_session = session.clone();
475 query_handles.push(tokio::spawn(async move {
476 insert_catalyst_id_for_public_key::Params::execute_batch(
477 &inner_session,
478 self.catalyst_id_for_public_key,
479 )
480 .await
481 }));
482 }
483
484 query_handles
485 }
486}
487
488async fn wait_for_previous_blocks(
492 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
493 our_end: Slot,
494 current_slot: Slot,
495) -> anyhow::Result<()> {
496 loop {
497 if pending_blocks
498 .borrow_and_update()
499 .iter()
500 .filter(|&&v| v == our_end)
501 .all(|&slot| slot > current_slot)
502 {
503 return Ok(());
504 }
505
506 inc_index_sync();
507
508 pending_blocks
509 .changed()
510 .await
511 .context("Unprocessed blocks channel was closed unexpectedly")?;
512 }
513}