1use std::{fmt::Debug, sync::Arc};
4
5use cardano_chain_follower::{
6 pallas_primitives::{alonzo, conway},
7 pallas_traverse::{MultiEraCert, MultiEraTx},
8 MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash,
9};
10use ed25519_dalek::VerifyingKey;
11use scylla::{client::session::Session, value::MaybeUnset, SerializeRow};
12use tracing::error;
13
14use crate::{
15 db::{
16 index::{
17 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
18 session::CassandraSession,
19 },
20 types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
21 },
22 settings::cassandra_db,
23};
24
25#[derive(SerializeRow)]
27pub(crate) struct StakeRegistrationInsertQuery {
28 stake_address: DbStakeAddress,
30 slot_no: DbSlot,
32 txn_index: DbTxnIndex,
34 stake_public_key: DbPublicKey,
36 script: bool,
38 register: MaybeUnset<bool>,
40 deregister: MaybeUnset<bool>,
42 cip36: MaybeUnset<bool>,
44 pool_delegation: MaybeUnset<Vec<u8>>,
46}
47
48impl Debug for StakeRegistrationInsertQuery {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> {
50 let stake_public_key = hex::encode(self.stake_public_key.as_ref());
51 let register = match self.register {
52 MaybeUnset::Unset => "UNSET",
53 MaybeUnset::Set(v) => &format!("{v:?}"),
54 };
55 let deregister = match self.deregister {
56 MaybeUnset::Unset => "UNSET",
57 MaybeUnset::Set(v) => &format!("{v:?}"),
58 };
59 let cip36 = match self.cip36 {
60 MaybeUnset::Unset => "UNSET",
61 MaybeUnset::Set(v) => &format!("{v:?}"),
62 };
63 let pool_delegation = match self.pool_delegation {
64 MaybeUnset::Unset => "UNSET",
65 MaybeUnset::Set(ref v) => &hex::encode(v),
66 };
67
68 f.debug_struct("StakeRegistrationInsertQuery")
69 .field("stake_address", &format!("{}", self.stake_address))
70 .field("slot_no", &self.slot_no)
71 .field("txn_index", &self.txn_index)
72 .field("stake_public_key", &stake_public_key)
73 .field("script", &self.script)
74 .field("register", ®ister)
75 .field("deregister", &deregister)
76 .field("cip36", &cip36)
77 .field("pool_delegation", &pool_delegation)
78 .finish()
79 }
80}
81
82const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
84
85impl StakeRegistrationInsertQuery {
86 #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
88 pub fn new(
89 stake_address: StakeAddress, slot_no: Slot, txn_index: TxnIndex,
90 stake_public_key: VerifyingKey, script: bool, register: bool, deregister: bool,
91 cip36: bool, pool_delegation: Option<Vec<u8>>,
92 ) -> Self {
93 StakeRegistrationInsertQuery {
94 stake_address: stake_address.into(),
95 slot_no: slot_no.into(),
96 txn_index: txn_index.into(),
97 stake_public_key: stake_public_key.into(),
98 script,
99 register: if register {
100 MaybeUnset::Set(true)
101 } else {
102 MaybeUnset::Unset
103 },
104 deregister: if deregister {
105 MaybeUnset::Set(true)
106 } else {
107 MaybeUnset::Unset
108 },
109 cip36: if cip36 {
110 MaybeUnset::Set(true)
111 } else {
112 MaybeUnset::Unset
113 },
114 pool_delegation: if let Some(pool_delegation) = pool_delegation {
115 MaybeUnset::Set(pool_delegation)
116 } else {
117 MaybeUnset::Unset
118 },
119 }
120 }
121
122 pub(crate) async fn prepare_batch(
124 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
125 ) -> anyhow::Result<SizedBatch> {
126 PreparedQueries::prepare_batch(
127 session.clone(),
128 INSERT_STAKE_REGISTRATION_QUERY,
129 cfg,
130 scylla::statement::Consistency::Any,
131 true,
132 false,
133 )
134 .await
135 .inspect_err(
136 |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
137 )
138 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
139 }
140}
141
142pub(crate) struct CertInsertQuery {
144 stake_reg_data: Vec<StakeRegistrationInsertQuery>,
146}
147
148impl CertInsertQuery {
149 pub(crate) fn new() -> Self {
151 CertInsertQuery {
152 stake_reg_data: Vec::new(),
153 }
154 }
155
156 pub(crate) async fn prepare_batch(
158 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
159 ) -> anyhow::Result<SizedBatch> {
160 StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
163 }
164
165 #[allow(clippy::too_many_arguments)]
167 fn stake_address(
168 &mut self, cred: &alonzo::StakeCredential, slot_no: Slot, txn: TxnIndex, register: bool,
169 deregister: bool, delegation: Option<Vec<u8>>, block: &MultiEraBlock,
170 ) {
171 let (stake_address, pubkey, script) = match *cred {
172 conway::StakeCredential::AddrKeyhash(cred) => {
173 let stake_address = StakeAddress::new(block.network(), false, cred.into());
174 let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
175 (stake_address, addr, false)
178 },
179 conway::StakeCredential::ScriptHash(h) => {
180 (
181 StakeAddress::new(block.network(), true, h.into()),
182 None,
183 true,
184 )
185 },
186 };
187
188 if pubkey.is_none() && !script && deregister {
189 error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
190 }
191
192 if pubkey.is_none() && !script && delegation.is_some() {
193 error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
194 }
195
196 if let Some(pubkey) = pubkey {
198 self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
199 stake_address,
200 slot_no,
201 txn,
202 pubkey,
203 script,
204 register,
205 deregister,
206 false,
207 delegation,
208 ));
209 }
210 }
211
212 fn index_alonzo_cert(
214 &mut self, cert: &alonzo::Certificate, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
215 ) {
216 #[allow(clippy::match_same_arms)]
217 match cert {
218 alonzo::Certificate::StakeRegistration(cred) => {
219 self.stake_address(cred, slot, index, true, false, None, block);
221 },
222 alonzo::Certificate::StakeDeregistration(cred) => {
223 self.stake_address(cred, slot, index, false, true, None, block);
224 },
225 alonzo::Certificate::StakeDelegation(cred, pool) => {
226 self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
227 },
228 alonzo::Certificate::PoolRegistration { .. } => {},
229 alonzo::Certificate::PoolRetirement(..) => {},
230 alonzo::Certificate::GenesisKeyDelegation(..) => {},
231 alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
232 }
233 }
234
235 fn index_conway_cert(
237 &mut self, cert: &conway::Certificate, slot_no: Slot, txn: TxnIndex, block: &MultiEraBlock,
238 ) {
239 #[allow(clippy::match_same_arms)]
240 match cert {
241 conway::Certificate::StakeRegistration(cred) => {
242 self.stake_address(cred, slot_no, txn, true, false, None, block);
244 },
245 conway::Certificate::StakeDeregistration(cred) => {
246 self.stake_address(cred, slot_no, txn, false, true, None, block);
247 },
248 conway::Certificate::StakeDelegation(cred, pool) => {
249 self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
250 },
251 conway::Certificate::PoolRegistration { .. } => {},
252 conway::Certificate::PoolRetirement(..) => {},
253 conway::Certificate::Reg(..) => {},
254 conway::Certificate::UnReg(..) => {},
255 conway::Certificate::VoteDeleg(..) => {},
256 conway::Certificate::StakeVoteDeleg(..) => {},
257 conway::Certificate::StakeRegDeleg(..) => {},
258 conway::Certificate::VoteRegDeleg(..) => {},
259 conway::Certificate::StakeVoteRegDeleg(..) => {},
260 conway::Certificate::AuthCommitteeHot(..) => {},
261 conway::Certificate::ResignCommitteeCold(..) => {},
262 conway::Certificate::RegDRepCert(..) => {},
263 conway::Certificate::UnRegDRepCert(..) => {},
264 conway::Certificate::UpdateDRepCert(..) => {},
265 }
266 }
267
268 pub(crate) fn index(
270 &mut self, txs: &MultiEraTx<'_>, slot: Slot, index: TxnIndex, block: &MultiEraBlock,
271 ) {
272 #[allow(clippy::match_same_arms)]
273 txs.certs().iter().for_each(|cert| {
274 match cert {
275 MultiEraCert::NotApplicable => {},
276 MultiEraCert::AlonzoCompatible(cert) => {
277 self.index_alonzo_cert(cert, slot, index, block);
278 },
279 MultiEraCert::Conway(cert) => {
280 self.index_conway_cert(cert, slot, index, block);
281 },
282 _ => {},
283 }
284 });
285 }
286
287 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
291 let mut query_handles: FallibleQueryTasks = Vec::new();
292
293 let inner_session = session.clone();
294
295 query_handles.push(tokio::spawn(async move {
296 inner_session
297 .execute_batch(
298 PreparedQuery::StakeRegistrationInsertQuery,
299 self.stake_reg_data,
300 )
301 .await
302 }));
303
304 query_handles
305 }
306}