cat_gateway/db/index/block/rbac509/
mod.rs1pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10 collections::{BTreeSet, HashSet},
11 sync::Arc,
12};
13
14use anyhow::{Context, Result};
15use cardano_chain_follower::{hashes::TransactionId, MultiEraBlock, Slot, TxnIndex};
16use rbac_registration::cardano::cip509::Cip509;
17use scylla::client::session::Session;
18use tokio::sync::watch;
19use tracing::{debug, error};
20
21use crate::{
22 db::index::{
23 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
24 session::CassandraSession,
25 },
26 metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
27 rbac::{
28 validate_rbac_registration, RbacBlockIndexingContext, RbacValidationError,
29 RbacValidationSuccess,
30 },
31 settings::cassandra_db::EnvVars,
32};
33
/// Accumulates the query parameters produced while indexing RBAC 509 registrations,
/// grouped by the insert query each row is destined for.
#[derive(Debug)]
pub(crate) struct Rbac509InsertQuery {
    /// Rows for the RBAC 509 registration insert query (filled for registrations that
    /// passed validation, plus one row per chain modified by such a registration).
    pub(crate) registrations: Vec<insert_rbac509::Params>,
    /// Rows for the invalid RBAC 509 registration insert query.
    pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
    /// Rows for the "Catalyst ID for transaction ID" insert query.
    catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
    /// Rows for the "Catalyst ID for stake address" insert query.
    catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
    /// Rows for the "Catalyst ID for public key" insert query.
    catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
}
48
49impl Rbac509InsertQuery {
50 pub(crate) fn new() -> Self {
52 Rbac509InsertQuery {
53 registrations: Vec::new(),
54 invalid: Vec::new(),
55 catalyst_id_for_txn_id: Vec::new(),
56 catalyst_id_for_stake_address: Vec::new(),
57 catalyst_id_for_public_key: Vec::new(),
58 }
59 }
60
61 pub(crate) async fn prepare_batch(
63 session: &Arc<Session>,
64 cfg: &EnvVars,
65 ) -> Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
66 Ok((
67 insert_rbac509::Params::prepare_batch(session, cfg).await?,
68 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
69 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
70 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
71 insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
72 ))
73 }
74
75 #[allow(clippy::too_many_lines)]
77 pub(crate) async fn index(
78 &mut self,
79 txn_hash: TransactionId,
80 index: TxnIndex,
81 block: &MultiEraBlock,
82 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
83 our_end: Slot,
84 context: &mut RbacBlockIndexingContext,
85 ) -> Result<()> {
86 let slot = block.slot();
87 let cip509 = match Cip509::new(block, index, &[]) {
88 Ok(Some(v)) => v,
89 Ok(None) => {
90 return Ok(());
92 },
93 Err(e) => {
94 debug!(
98 slot = ?slot,
99 index = ?index,
100 err = ?e,
101 "Invalid RBAC Registration Metadata in transaction"
102 );
103 return Ok(());
104 },
105 };
106
107 if slot != cip509.origin().point().slot_or_default() {
109 error!(
110 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
111 cip509.origin().point().slot_or_default()
112 );
113 }
114 if txn_hash != cip509.txn_hash() {
115 error!(
116 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
117 cip509.txn_hash()
118 );
119 }
120
121 wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
124
125 let previous_transaction = cip509.previous_transaction();
126 match Box::pin(validate_rbac_registration(
128 cip509,
129 block.is_immutable(),
130 context,
131 ))
132 .await
133 {
134 Ok(RbacValidationSuccess {
138 catalyst_id,
139 stake_addresses,
140 public_keys,
141 modified_chains,
142 purpose,
143 }) => {
144 self.catalyst_id_for_txn_id
146 .push(insert_catalyst_id_for_txn_id::Params::new(
147 catalyst_id.clone(),
148 txn_hash,
149 slot,
150 ));
151 for address in stake_addresses {
153 self.catalyst_id_for_stake_address.push(
154 insert_catalyst_id_for_stake_address::Params::new(
155 address,
156 slot,
157 index,
158 catalyst_id.clone(),
159 ),
160 );
161 }
162 for key in public_keys {
164 self.catalyst_id_for_public_key.push(
165 insert_catalyst_id_for_public_key::Params::new(
166 key,
167 slot,
168 catalyst_id.clone(),
169 ),
170 );
171 }
172 self.registrations.push(insert_rbac509::Params::new(
174 catalyst_id,
175 txn_hash,
176 slot,
177 index,
178 previous_transaction,
179 HashSet::new(),
182 purpose,
183 ));
184
185 for (catalyst_id, removed_addresses) in modified_chains {
187 self.registrations.push(insert_rbac509::Params::new(
188 catalyst_id.clone(),
189 txn_hash,
190 slot,
191 index,
192 None,
196 removed_addresses,
197 None,
198 ));
199 }
200 },
201 Err(RbacValidationError::InvalidRegistration {
203 catalyst_id,
204 purpose,
205 report,
206 }) => {
207 inc_invalid_rbac_reg_count();
208 self.invalid.push(insert_rbac509_invalid::Params::new(
209 catalyst_id,
210 txn_hash,
211 slot,
212 index,
213 purpose,
214 previous_transaction,
215 &report,
216 ));
217 },
218 Err(RbacValidationError::UnknownCatalystId) => {
222 debug!(
223 slot = ?slot,
224 index = ?index,
225 txn_hash = ?txn_hash,
226 "Unable to determine Catalyst id for registration"
227 );
228 },
229 Err(RbacValidationError::Fatal(e)) => {
230 error!(
231 slot = ?slot,
232 index = ?index,
233 txn_hash = ?txn_hash,
234 err = ?e,
235 "Error indexing RBAC registration"
236 );
237 return Err(e);
239 },
240 }
241
242 Ok(())
243 }
244
245 pub(crate) fn execute(
249 self,
250 session: &Arc<CassandraSession>,
251 ) -> FallibleQueryTasks {
252 let mut query_handles: FallibleQueryTasks = Vec::new();
253
254 if !self.registrations.is_empty() {
255 let inner_session = session.clone();
256 query_handles.push(tokio::spawn(async move {
257 inner_session
258 .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
259 .await
260 }));
261 }
262
263 if !self.invalid.is_empty() {
264 let inner_session = session.clone();
265 query_handles.push(tokio::spawn(async move {
266 inner_session
267 .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
268 .await
269 }));
270 }
271
272 if !self.catalyst_id_for_txn_id.is_empty() {
273 let inner_session = session.clone();
274 query_handles.push(tokio::spawn(async move {
275 inner_session
276 .execute_batch(
277 PreparedQuery::CatalystIdForTxnIdInsertQuery,
278 self.catalyst_id_for_txn_id,
279 )
280 .await
281 }));
282 }
283
284 if !self.catalyst_id_for_stake_address.is_empty() {
285 let inner_session = session.clone();
286 query_handles.push(tokio::spawn(async move {
287 inner_session
288 .execute_batch(
289 PreparedQuery::CatalystIdForStakeAddressInsertQuery,
290 self.catalyst_id_for_stake_address,
291 )
292 .await
293 }));
294 }
295
296 if !self.catalyst_id_for_public_key.is_empty() {
297 let inner_session = session.clone();
298 query_handles.push(tokio::spawn(async move {
299 inner_session
300 .execute_batch(
301 PreparedQuery::CatalystIdForPublicKeyInsertQuery,
302 self.catalyst_id_for_public_key,
303 )
304 .await
305 }));
306 }
307
308 query_handles
309 }
310}
311
312async fn wait_for_previous_blocks(
316 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>,
317 our_end: Slot,
318 current_slot: Slot,
319) -> Result<()> {
320 loop {
321 if pending_blocks
322 .borrow_and_update()
323 .iter()
324 .filter(|&&v| v == our_end)
325 .all(|&slot| slot > current_slot)
326 {
327 return Ok(());
328 }
329
330 inc_index_sync();
331
332 pending_blocks
333 .changed()
334 .await
335 .context("Unprocessed blocks channel was closed unexpectedly")?;
336 }
337}