1pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10 collections::{BTreeSet, HashMap, HashSet},
11 sync::Arc,
12};
13
14use anyhow::Context;
15use cardano_chain_follower::{MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::TransactionId};
16use catalyst_types::{catalyst_id::CatalystId, problem_report::ProblemReport, uuid::UuidV4};
17use ed25519_dalek::VerifyingKey;
18use rbac_registration::{
19 cardano::{cip509::Cip509, state::RbacChainsState as _},
20 registration::cardano::RegistrationChain,
21};
22use scylla::client::session::Session;
23use tokio::sync::watch;
24use tracing::{debug, error};
25
26use crate::{
27 db::index::{
28 queries::{FallibleQueryTasks, SizedBatch},
29 session::CassandraSession,
30 },
31 metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
32 rbac::{RbacBlockIndexingContext, cache_persistent_rbac_chain, state::RbacChainsState},
33 settings::cassandra_db::EnvVars,
34};
35
36#[derive(Debug)]
38pub(crate) struct Rbac509InsertQuery {
39 pub(crate) registrations: Vec<insert_rbac509::Params>,
41 pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
43 catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
45 catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
47 catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
49}
50
51impl Rbac509InsertQuery {
52 pub(crate) fn new() -> Self {
54 Rbac509InsertQuery {
55 registrations: Vec::new(),
56 invalid: Vec::new(),
57 catalyst_id_for_txn_id: Vec::new(),
58 catalyst_id_for_stake_address: Vec::new(),
59 catalyst_id_for_public_key: Vec::new(),
60 }
61 }
62
63 pub(crate) async fn prepare_batch(
65 session: &Arc<Session>,
66 cfg: &EnvVars,
67 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
68 Ok((
69 insert_rbac509::Params::prepare_batch(session, cfg).await?,
70 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
71 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
72 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
73 insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
74 ))
75 }
76
77 pub(crate) async fn index(
79 &mut self,
80 txn_hash: TransactionId,
81 index: TxnIndex,
82 block: &MultiEraBlock,
83 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
84 our_end: Slot,
85 context: &mut RbacBlockIndexingContext,
86 ) -> anyhow::Result<()> {
87 let slot = block.slot();
88 let cip509 = match Cip509::new(block, index, &[]) {
89 Ok(Some(v)) => v,
90 Ok(None) => {
91 return Ok(());
93 },
94 Err(e) => {
95 debug!(
99 slot = ?slot,
100 index = ?index,
101 err = ?e,
102 "Invalid RBAC Registration Metadata in transaction"
103 );
104 return Ok(());
105 },
106 };
107
108 if slot != cip509.origin().point().slot_or_default() {
110 error!(
111 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
112 cip509.origin().point().slot_or_default()
113 );
114 }
115 if txn_hash != cip509.txn_hash() {
116 error!(
117 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
118 cip509.txn_hash()
119 );
120 }
121
122 wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
125
126 if let Some(previous_txn) = cip509.previous_transaction() {
127 self.try_record_chain_update(&cip509, previous_txn, block.is_immutable(), context)
128 .await?;
129 } else {
130 self.try_record_new_chain(&cip509, block.is_immutable(), context)
131 .await?;
132 }
133
134 Ok(())
135 }
136
137 async fn try_record_chain_update(
140 &mut self,
141 reg: &Cip509,
142 previous_txn: TransactionId,
143 is_persistent: bool,
144 context: &mut RbacBlockIndexingContext,
145 ) -> anyhow::Result<()> {
146 let state = RbacChainsState::new(is_persistent, context);
148
149 let slot = reg.origin().point().slot_or_default();
150 let txn_index = reg.origin().txn_index();
151 let txn_hash = reg.txn_hash();
152
153 let Some(catalyst_id) = state.catalyst_id_from_txn_id(previous_txn).await? else {
154 debug!(
158 slot = ?slot,
159 txn_index = ?txn_index,
160 txn_hash = ?txn_hash,
161 "Unable to determine Catalyst id for registration"
162 );
163 return Ok(());
164 };
165
166 let chain = state.chain(&catalyst_id).await?.context(format!(
167 "{catalyst_id} is present in 'catalyst_id_for_txn_id' table, but not in 'rbac_registration'"
168 ))?;
169
170 let Some(new_chain) = chain.update(reg, &state).await? else {
172 self.record_invalid_registration(
173 txn_hash,
174 txn_index,
175 slot,
176 chain.catalyst_id().clone(),
177 Some(previous_txn),
178 reg.purpose(),
179 reg.report().clone(),
180 );
181
182 return Ok(());
183 };
184
185 let previous_addresses = chain.stake_addresses();
186 let stake_addresses: HashSet<_> = new_chain
187 .stake_addresses()
188 .difference(&previous_addresses)
189 .cloned()
190 .collect();
191 let public_keys = reg
192 .all_roles()
193 .iter()
194 .filter_map(|v| reg.signing_public_key_for_role(*v))
195 .collect::<HashSet<_>>();
196 let modified_chains = state.consume();
197 let purpose = reg.purpose();
198
199 if is_persistent {
200 cache_persistent_rbac_chain(catalyst_id.clone(), new_chain);
201 }
202
203 self.record_valid_registration(
204 txn_hash,
205 txn_index,
206 slot,
207 catalyst_id.clone(),
208 Some(previous_txn),
209 stake_addresses,
210 public_keys,
211 modified_chains,
212 purpose,
213 context,
214 );
215
216 Ok(())
217 }
218
219 async fn try_record_new_chain(
222 &mut self,
223 reg: &Cip509,
224 is_persistent: bool,
225 context: &mut RbacBlockIndexingContext,
226 ) -> anyhow::Result<()> {
227 let mut state = RbacChainsState::new(is_persistent, context);
228
229 let slot = reg.origin().point().slot_or_default();
230 let txn_index = reg.origin().txn_index();
231 let txn_hash = reg.txn_hash();
232
233 let Some(new_chain) = RegistrationChain::new(reg, &mut state).await? else {
235 if let Some(cat_id) = reg.catalyst_id() {
236 self.record_invalid_registration(
237 txn_hash,
238 txn_index,
239 slot,
240 cat_id.clone(),
241 None,
242 reg.purpose(),
243 reg.report().clone(),
244 );
245 } else {
246 debug!(
250 slot = ?slot,
251 txn_index = ?txn_index,
252 txn_hash = ?txn_hash,
253 "Unable to determine Catalyst id for registration"
254 );
255 }
256
257 return Ok(());
258 };
259
260 let catalyst_id = new_chain.catalyst_id();
261 let stake_addresses = new_chain.stake_addresses();
262 let public_keys = reg
263 .all_roles()
264 .iter()
265 .filter_map(|v| reg.signing_public_key_for_role(*v))
266 .collect::<HashSet<_>>();
267 let modified_chains = state.consume();
268 let purpose = reg.purpose();
269
270 self.record_valid_registration(
271 txn_hash,
272 txn_index,
273 slot,
274 catalyst_id.clone(),
275 None,
276 stake_addresses,
277 public_keys,
278 modified_chains,
279 purpose,
280 context,
281 );
282
283 Ok(())
284 }
285
286 #[allow(clippy::too_many_arguments)]
288 fn record_valid_registration(
289 &mut self,
290 txn_hash: TransactionId,
291 index: TxnIndex,
292 slot: Slot,
293 catalyst_id: CatalystId,
294 previous_transaction: Option<TransactionId>,
295 stake_addresses: HashSet<StakeAddress>,
296 public_keys: HashSet<VerifyingKey>,
297 modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
298 purpose: Option<UuidV4>,
299 context: &mut RbacBlockIndexingContext,
300 ) {
301 context.insert_transaction(txn_hash, catalyst_id.clone());
302 context.insert_addresses(stake_addresses.clone(), &catalyst_id);
303 context.insert_public_keys(public_keys.clone(), &catalyst_id);
304 context.insert_registration(
305 catalyst_id.clone(),
306 txn_hash,
307 slot,
308 index,
309 previous_transaction,
310 );
311
312 self.catalyst_id_for_txn_id
314 .push(insert_catalyst_id_for_txn_id::Params::new(
315 catalyst_id.clone(),
316 txn_hash,
317 slot,
318 ));
319 for address in stake_addresses {
321 self.catalyst_id_for_stake_address.push(
322 insert_catalyst_id_for_stake_address::Params::new(
323 address,
324 slot,
325 index,
326 catalyst_id.clone(),
327 ),
328 );
329 }
330 for key in public_keys {
332 self.catalyst_id_for_public_key
333 .push(insert_catalyst_id_for_public_key::Params::new(
334 key,
335 slot,
336 catalyst_id.clone(),
337 ));
338 }
339 self.registrations.push(insert_rbac509::Params::new(
341 catalyst_id,
342 txn_hash,
343 slot,
344 index,
345 previous_transaction,
346 purpose,
347 ));
348
349 for (catalyst_id, _) in modified_chains {
351 self.registrations.push(insert_rbac509::Params::new(
352 catalyst_id.clone(),
353 txn_hash,
354 slot,
355 index,
356 None,
360 None,
361 ));
362 }
363 }
364
365 #[allow(clippy::too_many_arguments)]
367 fn record_invalid_registration(
368 &mut self,
369 txn_hash: TransactionId,
370 index: TxnIndex,
371 slot: Slot,
372 catalyst_id: CatalystId,
373 previous_transaction: Option<TransactionId>,
374 purpose: Option<UuidV4>,
375 report: ProblemReport,
376 ) {
377 inc_invalid_rbac_reg_count();
378 self.invalid.push(insert_rbac509_invalid::Params::new(
379 catalyst_id,
380 txn_hash,
381 slot,
382 index,
383 purpose,
384 previous_transaction,
385 report,
386 ));
387 }
388
389 pub(crate) fn execute(
393 self,
394 session: &Arc<CassandraSession>,
395 ) -> FallibleQueryTasks {
396 let mut query_handles: FallibleQueryTasks = Vec::new();
397
398 if !self.registrations.is_empty() {
399 let inner_session = session.clone();
400 query_handles.push(tokio::spawn(async move {
401 insert_rbac509::Params::execute_batch(&inner_session, self.registrations).await
402 }));
403 }
404
405 if !self.invalid.is_empty() {
406 let inner_session = session.clone();
407 query_handles.push(tokio::spawn(async move {
408 insert_rbac509_invalid::Params::execute_batch(&inner_session, self.invalid).await
409 }));
410 }
411
412 if !self.catalyst_id_for_txn_id.is_empty() {
413 let inner_session = session.clone();
414 query_handles.push(tokio::spawn(async move {
415 insert_catalyst_id_for_txn_id::Params::execute_batch(
416 &inner_session,
417 self.catalyst_id_for_txn_id,
418 )
419 .await
420 }));
421 }
422
423 if !self.catalyst_id_for_stake_address.is_empty() {
424 let inner_session = session.clone();
425 query_handles.push(tokio::spawn(async move {
426 insert_catalyst_id_for_stake_address::Params::execute_batch(
427 &inner_session,
428 self.catalyst_id_for_stake_address,
429 )
430 .await
431 }));
432 }
433
434 if !self.catalyst_id_for_public_key.is_empty() {
435 let inner_session = session.clone();
436 query_handles.push(tokio::spawn(async move {
437 insert_catalyst_id_for_public_key::Params::execute_batch(
438 &inner_session,
439 self.catalyst_id_for_public_key,
440 )
441 .await
442 }));
443 }
444
445 query_handles
446 }
447}
448
449async fn wait_for_previous_blocks(
453 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
454 our_end: Slot,
455 current_slot: Slot,
456) -> anyhow::Result<()> {
457 loop {
458 if pending_blocks
459 .borrow_and_update()
460 .iter()
461 .filter(|&&v| v == our_end)
462 .all(|&slot| slot > current_slot)
463 {
464 return Ok(());
465 }
466
467 inc_index_sync();
468
469 pending_blocks
470 .changed()
471 .await
472 .context("Unprocessed blocks channel was closed unexpectedly")?;
473 }
474}