cat_gateway/db/index/block/
certs.rs

1//! Index certs found in a transaction.
2
3use std::{fmt::Debug, sync::Arc};
4
5use cardano_chain_follower::{
6    pallas_primitives::{alonzo, conway},
7    pallas_traverse::{MultiEraCert, MultiEraTx},
8    MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash,
9};
10use ed25519_dalek::VerifyingKey;
11use scylla::{client::session::Session, value::MaybeUnset, SerializeRow};
12use tracing::error;
13
14use crate::{
15    db::{
16        index::{
17            queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
18            session::CassandraSession,
19        },
20        types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
21    },
22    settings::cassandra_db,
23};
24
/// Insert stake registration query.
///
/// Holds the values for a single stake registration row. The optional columns use
/// `MaybeUnset` so that a value which does not apply is left unset rather than written.
#[derive(SerializeRow)]
pub(crate) struct StakeRegistrationInsertQuery {
    /// Stake address (29 bytes).
    stake_address: DbStakeAddress,
    /// Slot Number the cert is in.
    slot_no: DbSlot,
    /// Transaction Index.
    txn_index: DbTxnIndex,
    /// Full Stake Public Key (32 byte Ed25519 Public key, not hashed).
    stake_public_key: DbPublicKey,
    /// Is the stake address a script or not.
    script: bool,
    /// Set to `true` when the certificate registers the stake address, otherwise unset.
    register: MaybeUnset<bool>,
    /// Set to `true` when the certificate deregisters the stake address, otherwise unset.
    deregister: MaybeUnset<bool>,
    /// Set to `true` when the stake address has a CIP36 registration, otherwise unset.
    cip36: MaybeUnset<bool>,
    /// Raw bytes of the pool the stake is delegated to, unset when there is no delegation.
    pool_delegation: MaybeUnset<Vec<u8>>,
}
47
48impl Debug for StakeRegistrationInsertQuery {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
50        let stake_public_key = hex::encode(self.stake_public_key.as_ref());
51        let register = match self.register {
52            MaybeUnset::Unset => "UNSET",
53            MaybeUnset::Set(v) => &format!("{v:?}"),
54        };
55        let deregister = match self.deregister {
56            MaybeUnset::Unset => "UNSET",
57            MaybeUnset::Set(v) => &format!("{v:?}"),
58        };
59        let cip36 = match self.cip36 {
60            MaybeUnset::Unset => "UNSET",
61            MaybeUnset::Set(v) => &format!("{v:?}"),
62        };
63        let pool_delegation = match self.pool_delegation {
64            MaybeUnset::Unset => "UNSET",
65            MaybeUnset::Set(ref v) => &hex::encode(v),
66        };
67
68        f.debug_struct("StakeRegistrationInsertQuery")
69            .field("stake_address", &format!("{}", self.stake_address))
70            .field("slot_no", &self.slot_no)
71            .field("txn_index", &self.txn_index)
72            .field("stake_public_key", &stake_public_key)
73            .field("script", &self.script)
74            .field("register", &register)
75            .field("deregister", &deregister)
76            .field("cip36", &cip36)
77            .field("pool_delegation", &pool_delegation)
78            .finish()
79    }
80}
81
/// CQL statement that inserts a stake registration row, embedded at compile time from
/// `./cql/insert_stake_registration.cql`.
const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
84
85impl StakeRegistrationInsertQuery {
86    /// Create a new Insert Query.
87    #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
88    pub fn new(
89        stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
90        stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
91        cip36: bool, pool_delegation: Option<Vec<u8>>,
92    ) -> Self {
93        StakeRegistrationInsertQuery {
94            stake_address: stake_address.into(),
95            slot_no: slot_no.into(),
96            txn_index: txn_index.into(),
97            stake_public_key: stake_public_key.into(),
98            script,
99            register: if register {
100                MaybeUnset::Set(true)
101            } else {
102                MaybeUnset::Unset
103            },
104            deregister: if deregister {
105                MaybeUnset::Set(true)
106            } else {
107                MaybeUnset::Unset
108            },
109            cip36: if cip36 {
110                MaybeUnset::Set(true)
111            } else {
112                MaybeUnset::Unset
113            },
114            pool_delegation: if let Some(pool_delegation) = pool_delegation {
115                MaybeUnset::Set(pool_delegation)
116            } else {
117                MaybeUnset::Unset
118            },
119        }
120    }
121
122    /// Prepare Batch of Insert stake registration.
123    pub(crate) async fn prepare_batch(
124        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
125    ) -> anyhow::Result<SizedBatch> {
126        PreparedQueries::prepare_batch(
127            session.clone(),
128            INSERT_STAKE_REGISTRATION_QUERY,
129            cfg,
130            scylla::statement::Consistency::Any,
131            true,
132            false,
133        )
134        .await
135        .inspect_err(
136            |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
137        )
138        .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
139    }
140}
141
/// Insert Cert Queries.
///
/// Collects the rows produced while indexing certificates so they can be executed
/// together as a batch.
pub(crate) struct CertInsertQuery {
    /// Stake Registration Data captured during indexing.
    stake_reg_data: Vec<StakeRegistrationInsertQuery>,
}
147
148impl CertInsertQuery {
149    /// Create new data set for Cert Insert Query Batch.
150    pub(crate) fn new() -> Self {
151        CertInsertQuery {
152            stake_reg_data: Vec::new(),
153        }
154    }
155
156    /// Prepare Batch of Insert TXI Index Data Queries
157    pub(crate) async fn prepare_batch(
158        session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
159    ) -> anyhow::Result<SizedBatch> {
160        // Note: for now we have one query, but there are many certs, and later we may have more
161        // to add here.
162        StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
163    }
164
165    /// Get the stake address for a hash, return an empty address if one can not be found.
166    #[allow(clippy::too_many_arguments)]
167    fn stake_address(
168        &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
169        deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
170    ) {
171        let (stake_address, pubkey, script) = match *cred {
172            conway::StakeCredential::AddrKeyhash(cred) => {
173                let stake_address = StakeAddress::new(block.network(), false, cred.into());
174                let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
175                // Note: it is totally possible for the Registration Certificate to not be
176                // witnessed.
177                (stake_address, addr, false)
178            },
179            conway::StakeCredential::ScriptHash(h) => {
180                (
181                    StakeAddress::new(block.network(), true, h.into()),
182                    None,
183                    true,
184                )
185            },
186        };
187
188        if pubkey.is_none() && !script && deregister {
189            error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
190        }
191
192        if pubkey.is_none() && !script && delegation.is_some() {
193            error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
194        }
195
196        // This may not be witnessed, its normal but disappointing.
197        if let Some(pubkey) = pubkey {
198            self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
199                stake_address,
200                slot_no,
201                txn,
202                pubkey,
203                script,
204                register,
205                deregister,
206                false,
207                delegation,
208            ));
209        }
210    }
211
212    /// Index an Alonzo Era certificate into the database.
213    fn index_alonzo_cert(
214        &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
215    ) {
216        #[allow(clippy::match_same_arms)]
217        match cert {
218            alonzo::Certificate::StakeRegistration(cred) => {
219                // This may not be witnessed, its normal but disappointing.
220                self.stake_address(cred, slot, index, true, false, None, block);
221            },
222            alonzo::Certificate::StakeDeregistration(cred) => {
223                self.stake_address(cred, slot, index, false, true, None, block);
224            },
225            alonzo::Certificate::StakeDelegation(cred, pool) => {
226                self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
227            },
228            alonzo::Certificate::PoolRegistration { .. } => {},
229            alonzo::Certificate::PoolRetirement(..) => {},
230            alonzo::Certificate::GenesisKeyDelegation(..) => {},
231            alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
232        }
233    }
234
235    /// Index a certificate from a conway transaction.
236    fn index_conway_cert(
237        &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
238    ) {
239        #[allow(clippy::match_same_arms)]
240        match cert {
241            conway::Certificate::StakeRegistration(cred) => {
242                // This may not be witnessed, its normal but disappointing.
243                self.stake_address(cred, slot_no, txn, true, false, None, block);
244            },
245            conway::Certificate::StakeDeregistration(cred) => {
246                self.stake_address(cred, slot_no, txn, false, true, None, block);
247            },
248            conway::Certificate::StakeDelegation(cred, pool) => {
249                self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
250            },
251            conway::Certificate::PoolRegistration { .. } => {},
252            conway::Certificate::PoolRetirement(..) => {},
253            conway::Certificate::Reg(..) => {},
254            conway::Certificate::UnReg(..) => {},
255            conway::Certificate::VoteDeleg(..) => {},
256            conway::Certificate::StakeVoteDeleg(..) => {},
257            conway::Certificate::StakeRegDeleg(..) => {},
258            conway::Certificate::VoteRegDeleg(..) => {},
259            conway::Certificate::StakeVoteRegDeleg(..) => {},
260            conway::Certificate::AuthCommitteeHot(..) => {},
261            conway::Certificate::ResignCommitteeCold(..) => {},
262            conway::Certificate::RegDRepCert(..) => {},
263            conway::Certificate::UnRegDRepCert(..) => {},
264            conway::Certificate::UpdateDRepCert(..) => {},
265        }
266    }
267
268    /// Index the certificates in a transaction.
269    pub(crate) fn index(
270        &mut self, txs: &MultiEraTx<'_>, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
271    ) {
272        #[allow(clippy::match_same_arms)]
273        txs.certs().iter().for_each(|cert| {
274            match cert {
275                MultiEraCert::NotApplicable => {},
276                MultiEraCert::AlonzoCompatible(cert) => {
277                    self.index_alonzo_cert(cert, slot, index, block);
278                },
279                MultiEraCert::Conway(cert) => {
280                    self.index_conway_cert(cert, slot, index, block);
281                },
282                _ => {},
283            }
284        });
285    }
286
287    /// Execute the Certificate Indexing Queries.
288    ///
289    /// Consumes the `self` and returns a vector of futures.
290    pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
291        let mut query_handles: FallibleQueryTasks = Vec::new();
292
293        let inner_session = session.clone();
294
295        query_handles.push(tokio::spawn(async move {
296            inner_session
297                .execute_batch(
298                    PreparedQuery::StakeRegistrationInsertQuery,
299                    self.stake_reg_data,
300                )
301                .await
302        }));
303
304        query_handles
305    }
306}