1pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10 collections::{BTreeSet, HashMap, HashSet},
11 sync::Arc,
12};
13
14use anyhow::Context;
15use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId};
16use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4};
17use ed25519_dalek::VerifyingKey;
18use futures::{StreamExt, TryStreamExt};
19use rbac_registration::{
20 cardano::{cip509::Cip509, provider::RbacChainsProvider as _},
21 registration::cardano::RegistrationChain,
22};
23use scylla::client::session::Session;
24use tokio::sync::watch;
25use tracing::{debug, error};
26
27use crate::{
28 db::index::{
29 queries::{FallibleQueryTasks, SizedBatch},
30 session::CassandraSession,
31 },
32 metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
33 rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, provider::RbacChainsProvider},
34 settings::cassandra_db::EnvVars,
35};
36
37#[derive(Debug)]
39pub(crate) struct Rbac509InsertQuery {
40 pub(crate) registrations: Vec<insert_rbac509::Params>,
42 pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
44 catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
46 catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
48 catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
50}
51
52impl Rbac509InsertQuery {
53 pub(crate) fn new() -> Self {
55 Rbac509InsertQuery {
56 registrations: Vec::new(),
57 invalid: Vec::new(),
58 catalyst_id_for_txn_id: Vec::new(),
59 catalyst_id_for_stake_address: Vec::new(),
60 catalyst_id_for_public_key: Vec::new(),
61 }
62 }
63
64 pub(crate) async fn prepare_batch(
66 session: &Arc<Session>,
67 cfg: &EnvVars,
68 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
69 Ok((
70 insert_rbac509::Params::prepare_batch(session, cfg).await?,
71 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
72 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
73 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
74 insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
75 ))
76 }
77
78 pub(crate) async fn index(
80 &mut self,
81 txn_hash: TransactionId,
82 index: TxnIndex,
83 block: &MultiEraBlock,
84 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
85 our_end: Slot,
86 context: &mut RbacBlockIndexingContext,
87 ) -> anyhow::Result<()> {
88 let slot = block.slot();
89 let cip509 = match Cip509::new(block, index, &[]) {
90 Ok(Some(v)) => v,
91 Ok(None) => {
92 return Ok(());
94 },
95 Err(e) => {
96 debug!(
100 slot = ?slot,
101 index = ?index,
102 err = ?e,
103 "Invalid RBAC Registration Metadata in transaction"
104 );
105 return Ok(());
106 },
107 };
108
109 if slot != cip509.origin().point().slot_or_default() {
111 error!(
112 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
113 cip509.origin().point().slot_or_default()
114 );
115 }
116 if txn_hash != cip509.txn_hash() {
117 error!(
118 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
119 cip509.txn_hash()
120 );
121 }
122
123 wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
126
127 if let Some(previous_txn) = cip509.previous_transaction() {
128 self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context)
129 .await?;
130 } else {
131 self.try_record_new_chain(&cip509, block.is_immutable(), context)
132 .await?;
133 }
134
135 Ok(())
136 }
137
138 async fn try_record_chain_update(
141 &mut self,
142 reg: &Cip509,
143 previous_txn: TransactionId,
144 is_persistent: bool,
145 context: &mut RbacBlockIndexingContext,
146 ) -> anyhow::Result<()> {
147 let state = RbacChainsProvider::new(is_persistent, context);
149
150 let slot = reg.origin().point().slot_or_default();
151 let txn_index = reg.origin().txn_index();
152 let txn_hash = reg.txn_hash();
153
154 let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else {
155 debug!(
159 slot = ?slot,
160 txn_index = ?txn_index,
161 txn_hash = ?txn_hash,
162 "Unable to determine Catalyst id for registration"
163 );
164 return Ok(());
165 };
166
167 let chain = state.chain(&catalyst_id).await?.context(format!(
168 "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'"
169 ))?;
170
171 let Some(new_chain) = chain.update(reg, &state).await? else {
173 self.record_invalid_registration(
174 txn_hash,
175 txn_index,
176 slot,
177 chain.catalyst_id().clone(),
178 Some(previous_txn),
179 reg.purpose(),
180 reg.report().clone(),
181 );
182
183 return Ok(());
184 };
185
186 let previous_addresses = chain.stake_addresses();
187 let stake_addresses: HashSet<_> = new_chain
188 .stake_addresses()
189 .difference(&previous_addresses)
190 .cloned()
191 .collect();
192 let public_keys = reg
193 .all_roles()
194 .iter()
195 .filter_map(|v| reg.signing_public_key_for_role(*v))
196 .collect::<HashSet<_>>();
197 let modified_chains = HashMap::new();
199 let purpose = reg.purpose();
200
201 if is_persistent {
202 cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
203 }
204
205 self.record_valid_registration(
206 txn_hash,
207 txn_index,
208 slot,
209 catalyst_id.clone(),
210 Some(previous_txn),
211 stake_addresses,
212 public_keys,
213 modified_chains,
214 purpose,
215 context,
216 );
217
218 Ok(())
219 }
220
221 async fn try_record_new_chain(
224 &mut self,
225 reg: &Cip509,
226 is_persistent: bool,
227 context: &mut RbacBlockIndexingContext,
228 ) -> anyhow::Result<()> {
229 const FUTURES_BUFFER_SIZE: usize = 10;
231
232 let provider = RbacChainsProvider::new(is_persistent, context);
233
234 let slot = reg.origin().point().slot_or_default();
235 let txn_index = reg.origin().txn_index();
236 let txn_hash = reg.txn_hash();
237
238 let Some(new_chain) = RegistrationChain::new(reg, &provider).await? else {
240 if let Some(cat_id) = reg.catalyst_id() {
241 self.record_invalid_registration(
242 txn_hash,
243 txn_index,
244 slot,
245 cat_id.clone(),
246 None,
247 reg.purpose(),
248 reg.report().clone(),
249 );
250 } else {
251 debug!(
255 slot = ?slot,
256 txn_index = ?txn_index,
257 txn_hash = ?txn_hash,
258 "Unable to determine Catalyst id for registration"
259 );
260 }
261
262 return Ok(());
263 };
264
265 let catalyst_id = new_chain.catalyst_id();
266 let stake_addresses = new_chain.stake_addresses();
267 let public_keys = reg
268 .all_roles()
269 .iter()
270 .filter_map(|v| reg.signing_public_key_for_role(*v))
271 .collect::<HashSet<_>>();
272 let purpose = reg.purpose();
273
274 let futures = reg.stake_addresses().clone().into_iter().map(|addr| {
275 async {
276 anyhow::Ok((
277 provider.chain_catalyst_id_from_stake_address(&addr).await?,
278 addr,
279 ))
280 }
281 });
282 let modified_chains = futures::stream::iter(futures)
283 .buffer_unordered(FUTURES_BUFFER_SIZE)
284 .try_fold(
285 HashMap::<CatalystId, HashSet<StakeAddress>>::new(),
286 |mut acc, (cat_id, addr)| {
287 async {
288 if let Some(cat_id) = cat_id {
289 acc.entry(cat_id).or_default().insert(addr);
290 }
291 anyhow::Ok(acc)
292 }
293 },
294 )
295 .await?;
296
297 self.record_valid_registration(
298 txn_hash,
299 txn_index,
300 slot,
301 catalyst_id.clone(),
302 None,
303 stake_addresses,
304 public_keys,
305 modified_chains,
306 purpose,
307 context,
308 );
309
310 Ok(())
311 }
312
313 #[allow(clippy::too_many_arguments)]
315 fn record_valid_registration(
316 &mut self,
317 txn_hash: TransactionId,
318 index: TxnIndex,
319 slot: Slot,
320 catalyst_id: CatalystId,
321 previous_transaction: Option<TransactionId>,
322 stake_addresses: HashSet<StakeAddress>,
323 public_keys: HashSet<VerifyingKey>,
324 modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
325 purpose: Option<UuidV4>,
326 context: &mut RbacBlockIndexingContext,
327 ) {
328 context.insert_transaction(txn_hash, catalyst_id.clone());
329 context.insert_addresses(stake_addresses.clone(), &catalyst_id);
330 context.insert_public_keys(public_keys.clone(), &catalyst_id);
331 context.insert_registration(
332 catalyst_id.clone(),
333 txn_hash,
334 slot,
335 index,
336 previous_transaction,
337 );
338
339 self.catalyst_id_for_txn_id
341 .push(insert_catalyst_id_for_txn_id::Params::new(
342 catalyst_id.clone(),
343 txn_hash,
344 slot,
345 ));
346 for address in stake_addresses {
348 self.catalyst_id_for_stake_address.push(
349 insert_catalyst_id_for_stake_address::Params::new(
350 address,
351 slot,
352 index,
353 catalyst_id.clone(),
354 ),
355 );
356 }
357 for key in public_keys {
359 self.catalyst_id_for_public_key
360 .push(insert_catalyst_id_for_public_key::Params::new(
361 key,
362 slot,
363 catalyst_id.clone(),
364 ));
365 }
366 self.registrations.push(insert_rbac509::Params::new(
368 catalyst_id,
369 txn_hash,
370 slot,
371 index,
372 previous_transaction,
373 purpose,
374 ));
375
376 for (catalyst_id, _) in modified_chains {
378 self.registrations.push(insert_rbac509::Params::new(
379 catalyst_id.clone(),
380 txn_hash,
381 slot,
382 index,
383 None,
387 None,
388 ));
389 }
390 }
391
392 #[allow(clippy::too_many_arguments)]
394 fn record_invalid_registration(
395 &mut self,
396 txn_hash: TransactionId,
397 index: TxnIndex,
398 slot: Slot,
399 catalyst_id: CatalystId,
400 previous_transaction: Option<TransactionId>,
401 purpose: Option<UuidV4>,
402 report: ProblemReport,
403 ) {
404 inc_invalid_rbac_reg_count();
405 self.invalid.push(insert_rbac509_invalid::Params::new(
406 catalyst_id,
407 txn_hash,
408 slot,
409 index,
410 purpose,
411 previous_transaction,
412 report,
413 ));
414 }
415
416 pub(crate) fn execute(
420 self,
421 session: &Arc<CassandraSession>,
422 ) -> FallibleQueryTasks {
423 let mut query_handles: FallibleQueryTasks = Vec::new();
424
425 if !self.registrations.is_empty() {
426 let inner_session = session.clone();
427 query_handles.push(tokio::spawn(async move {
428 insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
429 }));
430 }
431
432 if !self.invalid.is_empty() {
433 let inner_session = session.clone();
434 query_handles.push(tokio::spawn(async move {
435 insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
436 }));
437 }
438
439 if !self.catalyst_id_for_txn_id.is_empty() {
440 let inner_session = session.clone();
441 query_handles.push(tokio::spawn(async move {
442 insert_catalyst_id_for_txn_id::Params::execute_batch(
443 &inner_session,
444 self.catalyst_id_for_txn_id,
445 )
446 .await
447 }));
448 }
449
450 if !self.catalyst_id_for_stake_address.is_empty() {
451 let inner_session = session.clone();
452 query_handles.push(tokio::spawn(async move {
453 insert_catalyst_id_for_stake_address::Params::execute_batch(
454 &inner_session,
455 self.catalyst_id_for_stake_address,
456 )
457 .await
458 }));
459 }
460
461 if !self.catalyst_id_for_public_key.is_empty() {
462 let inner_session = session.clone();
463 query_handles.push(tokio::spawn(async move {
464 insert_catalyst_id_for_public_key::Params::execute_batch(
465 &inner_session,
466 self.catalyst_id_for_public_key,
467 )
468 .await
469 }));
470 }
471
472 query_handles
473 }
474}
475
476async fn wait_for_previous_blocks(
480 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
481 our_end: Slot,
482 current_slot: Slot,
483) -> anyhow::Result<()> {
484 loop {
485 if pending_blocks
486 .borrow_and_update()
487 .iter()
488 .filter(|&&v| v == our_end)
489 .all(|&slot| slot > current_slot)
490 {
491 return Ok(());
492 }
493
494 inc_index_sync();
495
496 pending_blocks
497 .changed()
498 .await
499 .context("Unprocessed blocks channel was closed unexpectedly")?;
500 }
501}