cat_gateway/db/index/block/
certs.rs

1//! Index certs found in a transaction.
2
3use std::{fmt::Debug, sync::Arc};
4
5use cardano_chain_follower::{
6    pallas_primitives::{alonzo, conway},
7    pallas_traverse::{MultiEraCert, MultiEraTx},
8    MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash,
9};
10use ed25519_dalek::VerifyingKey;
11use scylla::{client::session::Session, value::MaybeUnset, SerializeRow};
12use tracing::error;
13
14use crate::{
15    db::{
16        index::{
17            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
18            session::CassandraSession,
19        },
20        types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
21    },
22    settings::cassandra_db,
23};
24
25/// Insert stake registration query
26#[derive(SerializeRow)]
27pub(crate) struct StakeRegistrationInsertQuery {
28    /// Stake address (29 bytes).
29    stake_address: DbStakeAddress,
30    /// Slot Number the cert is in.
31    slot_no: DbSlot,
32    /// Transaction Index.
33    txn_index: DbTxnIndex,
34    /// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
35    stake_public_key: DbPublicKey,
36    /// Is the stake address a script or not.
37    script: bool,
38    /// Is the Cardano Certificate Registered
39    register: MaybeUnset<bool>,
40    /// Is the Cardano Certificate Deregistered
41    deregister: MaybeUnset<bool>,
42    /// Is the stake address contains CIP36 registration?
43    cip36: MaybeUnset<bool>,
44    /// Pool Delegation Address
45    pool_delegation: MaybeUnset<Vec<u8>>,
46}
47
48impl Debug for StakeRegistrationInsertQuery {
49    fn fmt(
50        &self,
51        f: &mut std::fmt::Formatter<'_>,
52    ) -> Result<(), std::fmt::Error> {
53        let stake_public_key = hex::encode(self.stake_public_key.as_ref());
54        let register = match self.register {
55            MaybeUnset::Unset => "UNSET",
56            MaybeUnset::Set(v) => &format!("{v:?}"),
57        };
58        let deregister = match self.deregister {
59            MaybeUnset::Unset => "UNSET",
60            MaybeUnset::Set(v) => &format!("{v:?}"),
61        };
62        let cip36 = match self.cip36 {
63            MaybeUnset::Unset => "UNSET",
64            MaybeUnset::Set(v) => &format!("{v:?}"),
65        };
66        let pool_delegation = match self.pool_delegation {
67            MaybeUnset::Unset => "UNSET",
68            MaybeUnset::Set(ref v) => &hex::encode(v),
69        };
70
71        f.debug_struct("StakeRegistrationInsertQuery")
72            .field("stake_address", &format!("{}", self.stake_address))
73            .field("slot_no", &self.slot_no)
74            .field("txn_index", &self.txn_index)
75            .field("stake_public_key", &stake_public_key)
76            .field("script", &self.script)
77            .field("register", &register)
78            .field("deregister", &deregister)
79            .field("cip36", &cip36)
80            .field("pool_delegation", &pool_delegation)
81            .finish()
82    }
83}
84
85/// Insert stake registration
86const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
87
88impl StakeRegistrationInsertQuery {
89    /// Create a new Insert Query.
90    #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
91    pub fn new(
92        stake_address: StakeAddress,
93        slot_no: Slot,
94        txn_index: TxnIndex,
95        stake_public_key: VerifyingKey,
96        script: bool,
97        register: bool,
98        deregister: bool,
99        cip36: bool,
100        pool_delegation: Option<Vec<u8>>,
101    ) -> Self {
102        StakeRegistrationInsertQuery {
103            stake_address: stake_address.into(),
104            slot_no: slot_no.into(),
105            txn_index: txn_index.into(),
106            stake_public_key: stake_public_key.into(),
107            script,
108            register: if register {
109                MaybeUnset::Set(true)
110            } else {
111                MaybeUnset::Unset
112            },
113            deregister: if deregister {
114                MaybeUnset::Set(true)
115            } else {
116                MaybeUnset::Unset
117            },
118            cip36: if cip36 {
119                MaybeUnset::Set(true)
120            } else {
121                MaybeUnset::Unset
122            },
123            pool_delegation: if let Some(pool_delegation) = pool_delegation {
124                MaybeUnset::Set(pool_delegation)
125            } else {
126                MaybeUnset::Unset
127            },
128        }
129    }
130
131    /// Prepare Batch of Insert stake registration.
132    pub(crate) async fn prepare_batch(
133        session: &Arc<Session>,
134        cfg: &cassandra_db::EnvVars,
135    ) -> anyhow::Result<SizedBatch> {
136        PreparedQueries::prepare_batch(
137            session.clone(),
138            INSERT_STAKE_REGISTRATION_QUERY,
139            cfg,
140            scylla::statement::Consistency::Any,
141            true,
142            false,
143        )
144        .await
145        .inspect_err(
146            |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
147        )
148        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
149    }
150}
151
152/// Insert Cert Queries
153pub(crate) struct CertInsertQuery {
154    /// Stake Registration Data captured during indexing.
155    stake_reg_data: Vec<StakeRegistrationInsertQuery>,
156}
157
158impl CertInsertQuery {
159    /// Create new data set for Cert Insert Query Batch.
160    pub(crate) fn new() -> Self {
161        CertInsertQuery {
162            stake_reg_data: Vec::new(),
163        }
164    }
165
166    /// Prepare Batch of Insert TXI Index Data Queries
167    pub(crate) async fn prepare_batch(
168        session: &Arc<Session>,
169        cfg: &cassandra_db::EnvVars,
170    ) -> anyhow::Result<SizedBatch> {
171        // Note: for now we have one query, but there are many certs, and later we may have more
172        // to add here.
173        StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
174    }
175
176    /// Get the stake address for a hash, return an empty address if one can not be found.
177    #[allow(clippy::too_many_arguments)]
178    fn stake_address(
179        &mut self,
180        cred: &alonzo::StakeCredential,
181        slot_no: Slot,
182        txn: TxnIndex,
183        register: bool,
184        deregister: bool,
185        delegation: Option<Vec<u8>>,
186        block: &MultiEraBlock,
187    ) {
188        let (stake_address, pubkey, script) = match *cred {
189            conway::StakeCredential::AddrKeyhash(cred) => {
190                let stake_address = StakeAddress::new(block.network(), false, cred.into());
191                let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
192                // Note: it is totally possible for the Registration Certificate to not be
193                // witnessed.
194                (stake_address, addr, false)
195            },
196            conway::StakeCredential::ScriptHash(h) => {
197                (
198                    StakeAddress::new(block.network(), true, h.into()),
199                    None,
200                    true,
201                )
202            },
203        };
204
205        if pubkey.is_none() && !script && deregister {
206            error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
207        }
208
209        if pubkey.is_none() && !script && delegation.is_some() {
210            error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
211        }
212
213        // This may not be witnessed, its normal but disappointing.
214        if let Some(pubkey) = pubkey {
215            self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
216                stake_address,
217                slot_no,
218                txn,
219                pubkey,
220                script,
221                register,
222                deregister,
223                false,
224                delegation,
225            ));
226        }
227    }
228
229    /// Index an Alonzo Era certificate into the database.
230    fn index_alonzo_cert(
231        &mut self,
232        cert: &alonzo::Certificate,
233        slot: Slot,
234        index: TxnIndex,
235        block: &MultiEraBlock,
236    ) {
237        #[allow(clippy::match_same_arms)]
238        match cert {
239            alonzo::Certificate::StakeRegistration(cred) => {
240                // This may not be witnessed, its normal but disappointing.
241                self.stake_address(cred, slot, index, true, false, None, block);
242            },
243            alonzo::Certificate::StakeDeregistration(cred) => {
244                self.stake_address(cred, slot, index, false, true, None, block);
245            },
246            alonzo::Certificate::StakeDelegation(cred, pool) => {
247                self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
248            },
249            alonzo::Certificate::PoolRegistration { .. } => {},
250            alonzo::Certificate::PoolRetirement(..) => {},
251            alonzo::Certificate::GenesisKeyDelegation(..) => {},
252            alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
253        }
254    }
255
256    /// Index a certificate from a conway transaction.
257    fn index_conway_cert(
258        &mut self,
259        cert: &conway::Certificate,
260        slot_no: Slot,
261        txn: TxnIndex,
262        block: &MultiEraBlock,
263    ) {
264        #[allow(clippy::match_same_arms)]
265        match cert {
266            conway::Certificate::StakeRegistration(cred) => {
267                // This may not be witnessed, its normal but disappointing.
268                self.stake_address(cred, slot_no, txn, true, false, None, block);
269            },
270            conway::Certificate::StakeDeregistration(cred) => {
271                self.stake_address(cred, slot_no, txn, false, true, None, block);
272            },
273            conway::Certificate::StakeDelegation(cred, pool) => {
274                self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
275            },
276            conway::Certificate::PoolRegistration { .. } => {},
277            conway::Certificate::PoolRetirement(..) => {},
278            conway::Certificate::Reg(..) => {},
279            conway::Certificate::UnReg(..) => {},
280            conway::Certificate::VoteDeleg(..) => {},
281            conway::Certificate::StakeVoteDeleg(..) => {},
282            conway::Certificate::StakeRegDeleg(..) => {},
283            conway::Certificate::VoteRegDeleg(..) => {},
284            conway::Certificate::StakeVoteRegDeleg(..) => {},
285            conway::Certificate::AuthCommitteeHot(..) => {},
286            conway::Certificate::ResignCommitteeCold(..) => {},
287            conway::Certificate::RegDRepCert(..) => {},
288            conway::Certificate::UnRegDRepCert(..) => {},
289            conway::Certificate::UpdateDRepCert(..) => {},
290        }
291    }
292
293    /// Index the certificates in a transaction.
294    pub(crate) fn index(
295        &mut self,
296        txs: &MultiEraTx<'_>,
297        slot: Slot,
298        index: TxnIndex,
299        block: &MultiEraBlock,
300    ) {
301        #[allow(clippy::match_same_arms)]
302        txs.certs().iter().for_each(|cert| {
303            match cert {
304                MultiEraCert::NotApplicable => {},
305                MultiEraCert::AlonzoCompatible(cert) => {
306                    self.index_alonzo_cert(cert, slot, index, block);
307                },
308                MultiEraCert::Conway(cert) => {
309                    self.index_conway_cert(cert, slot, index, block);
310                },
311                _ => {},
312            }
313        });
314    }
315
316    /// Execute the Certificate Indexing Queries.
317    ///
318    /// Consumes the `self` and returns a vector of futures.
319    pub(crate) fn execute(
320        self,
321        session: &Arc<CassandraSession>,
322    ) -> FallibleQueryTasks {
323        let mut query_handles: FallibleQueryTasks = Vec::new();
324
325        let inner_session = session.clone();
326
327        query_handles.push(tokio::spawn(async move {
328            inner_session
329                .execute_batch(
330                    PreparedQuery::StakeRegistrationInsertQuery,
331                    self.stake_reg_data,
332                )
333                .await
334        }));
335
336        query_handles
337    }
338}