cat_gateway/db/index/block/rbac509/
mod.rs1pub(crate) mod insert_catalyst_id_for_public_key;
4pub(crate) mod insert_catalyst_id_for_stake_address;
5pub(crate) mod insert_catalyst_id_for_txn_id;
6pub(crate) mod insert_rbac509;
7pub(crate) mod insert_rbac509_invalid;
8
9use std::{
10 collections::{BTreeSet, HashSet},
11 sync::Arc,
12};
13
14use anyhow::{Context, Result};
15use cardano_chain_follower::{hashes::TransactionId, MultiEraBlock, Slot, TxnIndex};
16use rbac_registration::cardano::cip509::Cip509;
17use scylla::client::session::Session;
18use tokio::sync::watch;
19use tracing::{debug, error};
20
21use crate::{
22 db::index::{
23 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
24 session::CassandraSession,
25 },
26 metrics::caches::rbac::{inc_index_sync, inc_invalid_rbac_reg_count},
27 rbac::{
28 validate_rbac_registration, RbacBlockIndexingContext, RbacValidationError,
29 RbacValidationSuccess,
30 },
31 settings::cassandra_db::EnvVars,
32};
33
34#[derive(Debug)]
36pub(crate) struct Rbac509InsertQuery {
37 pub(crate) registrations: Vec<insert_rbac509::Params>,
39 pub(crate) invalid: Vec<insert_rbac509_invalid::Params>,
41 catalyst_id_for_txn_id: Vec<insert_catalyst_id_for_txn_id::Params>,
43 catalyst_id_for_stake_address: Vec<insert_catalyst_id_for_stake_address::Params>,
45 catalyst_id_for_public_key: Vec<insert_catalyst_id_for_public_key::Params>,
47}
48
49impl Rbac509InsertQuery {
50 pub(crate) fn new() -> Self {
52 Rbac509InsertQuery {
53 registrations: Vec::new(),
54 invalid: Vec::new(),
55 catalyst_id_for_txn_id: Vec::new(),
56 catalyst_id_for_stake_address: Vec::new(),
57 catalyst_id_for_public_key: Vec::new(),
58 }
59 }
60
61 pub(crate) async fn prepare_batch(
63 session: &Arc<Session>, cfg: &EnvVars,
64 ) -> Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
65 Ok((
66 insert_rbac509::Params::prepare_batch(session, cfg).await?,
67 insert_rbac509_invalid::Params::prepare_batch(session, cfg).await?,
68 insert_catalyst_id_for_txn_id::Params::prepare_batch(session, cfg).await?,
69 insert_catalyst_id_for_stake_address::Params::prepare_batch(session, cfg).await?,
70 insert_catalyst_id_for_public_key::Params::prepare_batch(session, cfg).await?,
71 ))
72 }
73
74 #[allow(clippy::too_many_lines)]
76 pub(crate) async fn index(
77 &mut self, txn_hash: TransactionId, index: TxnIndex, block: &MultiEraBlock,
78 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>, our_end: Slot,
79 context: &mut RbacBlockIndexingContext,
80 ) -> Result<()> {
81 let slot = block.slot();
82 let cip509 = match Cip509::new(block, index, &[]) {
83 Ok(Some(v)) => v,
84 Ok(None) => {
85 return Ok(());
87 },
88 Err(e) => {
89 debug!(
93 slot = ?slot,
94 index = ?index,
95 err = ?e,
96 "Invalid RBAC Registration Metadata in transaction"
97 );
98 return Ok(());
99 },
100 };
101
102 if slot != cip509.origin().point().slot_or_default() {
104 error!(
105 "Cip509 slot mismatch: expected {slot:?}, got {:?}",
106 cip509.origin().point().slot_or_default()
107 );
108 }
109 if txn_hash != cip509.txn_hash() {
110 error!(
111 "Cip509 txn hash mismatch: expected {txn_hash}, got {}",
112 cip509.txn_hash()
113 );
114 }
115
116 wait_for_previous_blocks(pending_blocks, our_end, block.slot()).await?;
119
120 let previous_transaction = cip509.previous_transaction();
121 match Box::pin(validate_rbac_registration(
123 cip509,
124 block.is_immutable(),
125 context,
126 ))
127 .await
128 {
129 Ok(RbacValidationSuccess {
133 catalyst_id,
134 stake_addresses,
135 public_keys,
136 modified_chains,
137 purpose,
138 }) => {
139 self.catalyst_id_for_txn_id
141 .push(insert_catalyst_id_for_txn_id::Params::new(
142 catalyst_id.clone(),
143 txn_hash,
144 slot,
145 ));
146 for address in stake_addresses {
148 self.catalyst_id_for_stake_address.push(
149 insert_catalyst_id_for_stake_address::Params::new(
150 address,
151 slot,
152 index,
153 catalyst_id.clone(),
154 ),
155 );
156 }
157 for key in public_keys {
159 self.catalyst_id_for_public_key.push(
160 insert_catalyst_id_for_public_key::Params::new(
161 key,
162 slot,
163 catalyst_id.clone(),
164 ),
165 );
166 }
167 self.registrations.push(insert_rbac509::Params::new(
169 catalyst_id,
170 txn_hash,
171 slot,
172 index,
173 previous_transaction,
174 HashSet::new(),
177 purpose,
178 ));
179
180 for (catalyst_id, removed_addresses) in modified_chains {
182 self.registrations.push(insert_rbac509::Params::new(
183 catalyst_id.clone(),
184 txn_hash,
185 slot,
186 index,
187 None,
191 removed_addresses,
192 None,
193 ));
194 }
195 },
196 Err(RbacValidationError::InvalidRegistration {
198 catalyst_id,
199 purpose,
200 report,
201 }) => {
202 inc_invalid_rbac_reg_count();
203 self.invalid.push(insert_rbac509_invalid::Params::new(
204 catalyst_id,
205 txn_hash,
206 slot,
207 index,
208 purpose,
209 previous_transaction,
210 &report,
211 ));
212 },
213 Err(RbacValidationError::UnknownCatalystId) => {
217 debug!(
218 slot = ?slot,
219 index = ?index,
220 txn_hash = ?txn_hash,
221 "Unable to determine Catalyst id for registration"
222 );
223 },
224 Err(RbacValidationError::Fatal(e)) => {
225 error!(
226 slot = ?slot,
227 index = ?index,
228 txn_hash = ?txn_hash,
229 err = ?e,
230 "Error indexing RBAC registration"
231 );
232 return Err(e);
234 },
235 }
236
237 Ok(())
238 }
239
240 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
244 let mut query_handles: FallibleQueryTasks = Vec::new();
245
246 if !self.registrations.is_empty() {
247 let inner_session = session.clone();
248 query_handles.push(tokio::spawn(async move {
249 inner_session
250 .execute_batch(PreparedQuery::Rbac509InsertQuery, self.registrations)
251 .await
252 }));
253 }
254
255 if !self.invalid.is_empty() {
256 let inner_session = session.clone();
257 query_handles.push(tokio::spawn(async move {
258 inner_session
259 .execute_batch(PreparedQuery::Rbac509InvalidInsertQuery, self.invalid)
260 .await
261 }));
262 }
263
264 if !self.catalyst_id_for_txn_id.is_empty() {
265 let inner_session = session.clone();
266 query_handles.push(tokio::spawn(async move {
267 inner_session
268 .execute_batch(
269 PreparedQuery::CatalystIdForTxnIdInsertQuery,
270 self.catalyst_id_for_txn_id,
271 )
272 .await
273 }));
274 }
275
276 if !self.catalyst_id_for_stake_address.is_empty() {
277 let inner_session = session.clone();
278 query_handles.push(tokio::spawn(async move {
279 inner_session
280 .execute_batch(
281 PreparedQuery::CatalystIdForStakeAddressInsertQuery,
282 self.catalyst_id_for_stake_address,
283 )
284 .await
285 }));
286 }
287
288 if !self.catalyst_id_for_public_key.is_empty() {
289 let inner_session = session.clone();
290 query_handles.push(tokio::spawn(async move {
291 inner_session
292 .execute_batch(
293 PreparedQuery::CatalystIdForPublicKeyInsertQuery,
294 self.catalyst_id_for_public_key,
295 )
296 .await
297 }));
298 }
299
300 query_handles
301 }
302}
303
304async fn wait_for_previous_blocks(
308 pending_blocks: &mut watch::Receiver<BTreeSet<Slot>>, our_end: Slot, current_slot: Slot,
309) -> Result<()> {
310 loop {
311 if pending_blocks
312 .borrow_and_update()
313 .iter()
314 .filter(|&&v| v == our_end)
315 .all(|&slot| slot > current_slot)
316 {
317 return Ok(());
318 }
319
320 inc_index_sync();
321
322 pending_blocks
323 .changed()
324 .await
325 .context("Unprocessed blocks channel was closed unexpectedly")?;
326 }
327}