1use std::{fmt::Debug, sync::Arc};
4
5use cardano_chain_follower::{
6 pallas_primitives::{alonzo, conway},
7 pallas_traverse::{MultiEraCert, MultiEraTx},
8 MultiEraBlock, Slot, StakeAddress, TxnIndex, VKeyHash,
9};
10use ed25519_dalek::VerifyingKey;
11use scylla::{client::session::Session, value::MaybeUnset, SerializeRow};
12use tracing::error;
13
14use crate::{
15 db::{
16 index::{
17 queries::{FallibleQueryTasks, PreparedQueries, PreparedQuery, SizedBatch},
18 session::CassandraSession,
19 },
20 types::{DbPublicKey, DbSlot, DbStakeAddress, DbTxnIndex},
21 },
22 settings::cassandra_db,
23};
24
25#[derive(SerializeRow)]
27pub(crate) struct StakeRegistrationInsertQuery {
28 stake_address: DbStakeAddress,
30 slot_no: DbSlot,
32 txn_index: DbTxnIndex,
34 stake_public_key: DbPublicKey,
36 script: bool,
38 register: MaybeUnset<bool>,
40 deregister: MaybeUnset<bool>,
42 cip36: MaybeUnset<bool>,
44 pool_delegation: MaybeUnset<Vec<u8>>,
46}
47
48impl Debug for StakeRegistrationInsertQuery {
49 fn fmt(
50 &self,
51 f: &mut std::fmt::Formatter<'_>,
52 ) -> Result<(), std::fmt::Error> {
53 let stake_public_key = hex::encode(self.stake_public_key.as_ref());
54 let register = match self.register {
55 MaybeUnset::Unset => "UNSET",
56 MaybeUnset::Set(v) => &format!("{v:?}"),
57 };
58 let deregister = match self.deregister {
59 MaybeUnset::Unset => "UNSET",
60 MaybeUnset::Set(v) => &format!("{v:?}"),
61 };
62 let cip36 = match self.cip36 {
63 MaybeUnset::Unset => "UNSET",
64 MaybeUnset::Set(v) => &format!("{v:?}"),
65 };
66 let pool_delegation = match self.pool_delegation {
67 MaybeUnset::Unset => "UNSET",
68 MaybeUnset::Set(ref v) => &hex::encode(v),
69 };
70
71 f.debug_struct("StakeRegistrationInsertQuery")
72 .field("stake_address", &format!("{}", self.stake_address))
73 .field("slot_no", &self.slot_no)
74 .field("txn_index", &self.txn_index)
75 .field("stake_public_key", &stake_public_key)
76 .field("script", &self.script)
77 .field("register", ®ister)
78 .field("deregister", &deregister)
79 .field("cip36", &cip36)
80 .field("pool_delegation", &pool_delegation)
81 .finish()
82 }
83}
84
85const INSERT_STAKE_REGISTRATION_QUERY: &str = include_str!("./cql/insert_stake_registration.cql");
87
88impl StakeRegistrationInsertQuery {
89 #[allow(clippy::too_many_arguments, clippy::fn_params_excessive_bools)]
91 pub fn new(
92 stake_address: StakeAddress,
93 slot_no: Slot,
94 txn_index: TxnIndex,
95 stake_public_key: VerifyingKey,
96 script: bool,
97 register: bool,
98 deregister: bool,
99 cip36: bool,
100 pool_delegation: Option<Vec<u8>>,
101 ) -> Self {
102 StakeRegistrationInsertQuery {
103 stake_address: stake_address.into(),
104 slot_no: slot_no.into(),
105 txn_index: txn_index.into(),
106 stake_public_key: stake_public_key.into(),
107 script,
108 register: if register {
109 MaybeUnset::Set(true)
110 } else {
111 MaybeUnset::Unset
112 },
113 deregister: if deregister {
114 MaybeUnset::Set(true)
115 } else {
116 MaybeUnset::Unset
117 },
118 cip36: if cip36 {
119 MaybeUnset::Set(true)
120 } else {
121 MaybeUnset::Unset
122 },
123 pool_delegation: if let Some(pool_delegation) = pool_delegation {
124 MaybeUnset::Set(pool_delegation)
125 } else {
126 MaybeUnset::Unset
127 },
128 }
129 }
130
131 pub(crate) async fn prepare_batch(
133 session: &Arc<Session>,
134 cfg: &cassandra_db::EnvVars,
135 ) -> anyhow::Result<SizedBatch> {
136 PreparedQueries::prepare_batch(
137 session.clone(),
138 INSERT_STAKE_REGISTRATION_QUERY,
139 cfg,
140 scylla::statement::Consistency::Any,
141 true,
142 false,
143 )
144 .await
145 .inspect_err(
146 |error| error!(error=%error,"Failed to prepare Insert Stake Registration Query."),
147 )
148 .map_err(|error| anyhow::anyhow!("{error}\n--\n{INSERT_STAKE_REGISTRATION_QUERY}"))
149 }
150}
151
152pub(crate) struct CertInsertQuery {
154 stake_reg_data: Vec<StakeRegistrationInsertQuery>,
156}
157
158impl CertInsertQuery {
159 pub(crate) fn new() -> Self {
161 CertInsertQuery {
162 stake_reg_data: Vec::new(),
163 }
164 }
165
166 pub(crate) async fn prepare_batch(
168 session: &Arc<Session>,
169 cfg: &cassandra_db::EnvVars,
170 ) -> anyhow::Result<SizedBatch> {
171 StakeRegistrationInsertQuery::prepare_batch(session, cfg).await
174 }
175
176 #[allow(clippy::too_many_arguments)]
178 fn stake_address(
179 &mut self,
180 cred: &alonzo::StakeCredential,
181 slot_no: Slot,
182 txn: TxnIndex,
183 register: bool,
184 deregister: bool,
185 delegation: Option<Vec<u8>>,
186 block: &MultiEraBlock,
187 ) {
188 let (stake_address, pubkey, script) = match *cred {
189 conway::StakeCredential::AddrKeyhash(cred) => {
190 let stake_address = StakeAddress::new(block.network(), false, cred.into());
191 let addr = block.witness_for_tx(&VKeyHash::from(*cred), txn);
192 (stake_address, addr, false)
195 },
196 conway::StakeCredential::ScriptHash(h) => {
197 (
198 StakeAddress::new(block.network(), true, h.into()),
199 None,
200 true,
201 )
202 },
203 };
204
205 if pubkey.is_none() && !script && deregister {
206 error!("Stake Deregistration Certificate {stake_address} is NOT Witnessed.");
207 }
208
209 if pubkey.is_none() && !script && delegation.is_some() {
210 error!("Stake Delegation Certificate {stake_address} is NOT Witnessed.");
211 }
212
213 if let Some(pubkey) = pubkey {
215 self.stake_reg_data.push(StakeRegistrationInsertQuery::new(
216 stake_address,
217 slot_no,
218 txn,
219 pubkey,
220 script,
221 register,
222 deregister,
223 false,
224 delegation,
225 ));
226 }
227 }
228
229 fn index_alonzo_cert(
231 &mut self,
232 cert: &alonzo::Certificate,
233 slot: Slot,
234 index: TxnIndex,
235 block: &MultiEraBlock,
236 ) {
237 #[allow(clippy::match_same_arms)]
238 match cert {
239 alonzo::Certificate::StakeRegistration(cred) => {
240 self.stake_address(cred, slot, index, true, false, None, block);
242 },
243 alonzo::Certificate::StakeDeregistration(cred) => {
244 self.stake_address(cred, slot, index, false, true, None, block);
245 },
246 alonzo::Certificate::StakeDelegation(cred, pool) => {
247 self.stake_address(cred, slot, index, false, false, Some(pool.to_vec()), block);
248 },
249 alonzo::Certificate::PoolRegistration { .. } => {},
250 alonzo::Certificate::PoolRetirement(..) => {},
251 alonzo::Certificate::GenesisKeyDelegation(..) => {},
252 alonzo::Certificate::MoveInstantaneousRewardsCert(_) => {},
253 }
254 }
255
256 fn index_conway_cert(
258 &mut self,
259 cert: &conway::Certificate,
260 slot_no: Slot,
261 txn: TxnIndex,
262 block: &MultiEraBlock,
263 ) {
264 #[allow(clippy::match_same_arms)]
265 match cert {
266 conway::Certificate::StakeRegistration(cred) => {
267 self.stake_address(cred, slot_no, txn, true, false, None, block);
269 },
270 conway::Certificate::StakeDeregistration(cred) => {
271 self.stake_address(cred, slot_no, txn, false, true, None, block);
272 },
273 conway::Certificate::StakeDelegation(cred, pool) => {
274 self.stake_address(cred, slot_no, txn, false, false, Some(pool.to_vec()), block);
275 },
276 conway::Certificate::PoolRegistration { .. } => {},
277 conway::Certificate::PoolRetirement(..) => {},
278 conway::Certificate::Reg(..) => {},
279 conway::Certificate::UnReg(..) => {},
280 conway::Certificate::VoteDeleg(..) => {},
281 conway::Certificate::StakeVoteDeleg(..) => {},
282 conway::Certificate::StakeRegDeleg(..) => {},
283 conway::Certificate::VoteRegDeleg(..) => {},
284 conway::Certificate::StakeVoteRegDeleg(..) => {},
285 conway::Certificate::AuthCommitteeHot(..) => {},
286 conway::Certificate::ResignCommitteeCold(..) => {},
287 conway::Certificate::RegDRepCert(..) => {},
288 conway::Certificate::UnRegDRepCert(..) => {},
289 conway::Certificate::UpdateDRepCert(..) => {},
290 }
291 }
292
293 pub(crate) fn index(
295 &mut self,
296 txs: &MultiEraTx<'_>,
297 slot: Slot,
298 index: TxnIndex,
299 block: &MultiEraBlock,
300 ) {
301 #[allow(clippy::match_same_arms)]
302 txs.certs().iter().for_each(|cert| {
303 match cert {
304 MultiEraCert::NotApplicable => {},
305 MultiEraCert::AlonzoCompatible(cert) => {
306 self.index_alonzo_cert(cert, slot, index, block);
307 },
308 MultiEraCert::Conway(cert) => {
309 self.index_conway_cert(cert, slot, index, block);
310 },
311 _ => {},
312 }
313 });
314 }
315
316 pub(crate) fn execute(
320 self,
321 session: &Arc<CassandraSession>,
322 ) -> FallibleQueryTasks {
323 let mut query_handles: FallibleQueryTasks = Vec::new();
324
325 let inner_session = session.clone();
326
327 query_handles.push(tokio::spawn(async move {
328 inner_session
329 .execute_batch(
330 PreparedQuery::StakeRegistrationInsertQuery,
331 self.stake_reg_data,
332 )
333 .await
334 }));
335
336 query_handles
337 }
338}