cat_gateway/db/index/block/cip36/
mod.rs1pub(crate) mod insert_cip36;
4pub(crate) mod insert_cip36_for_vote_key;
5pub(crate) mod insert_cip36_invalid;
6
7use std::sync::Arc;
8
9use cardano_chain_follower::{
10 hashes::Blake2b224Hash, Cip36, MultiEraBlock, Slot, StakeAddress, TxnIndex,
11};
12use scylla::client::session::Session;
13
14use super::certs;
15use crate::{
16 db::index::{
17 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
18 session::CassandraSession,
19 },
20 settings::cassandra_db,
21};
22
23pub(crate) struct Cip36InsertQuery {
25 registrations: Vec<insert_cip36::Params>,
27 invalid: Vec<insert_cip36_invalid::Params>,
29 for_vote_key: Vec<insert_cip36_for_vote_key::Params>,
31 stake_regs: Vec<certs::StakeRegistrationInsertQuery>,
33}
34
35impl Cip36InsertQuery {
36 pub(crate) fn new() -> Self {
38 Cip36InsertQuery {
39 registrations: Vec::new(),
40 invalid: Vec::new(),
41 for_vote_key: Vec::new(),
42 stake_regs: Vec::new(),
43 }
44 }
45
46 pub(crate) async fn prepare_batch(
48 session: &Arc<Session>, cfg: &cassandra_db::EnvVars,
49 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch)> {
50 let insert_cip36_batch = insert_cip36::Params::prepare_batch(session, cfg).await;
51 let insert_cip36_invalid_batch =
52 insert_cip36_invalid::Params::prepare_batch(session, cfg).await;
53 let insert_cip36_for_vote_key_addr_batch =
54 insert_cip36_for_vote_key::Params::prepare_batch(session, cfg).await;
55 Ok((
60 insert_cip36_batch?,
61 insert_cip36_invalid_batch?,
62 insert_cip36_for_vote_key_addr_batch?,
63 ))
64 }
65
66 pub(crate) fn index(
68 &mut self, index: TxnIndex, slot_no: Slot, block: &MultiEraBlock,
69 ) -> anyhow::Result<()> {
70 match Cip36::new(block, index, true) {
72 Ok(Some(cip36)) if cip36.is_valid() => {
75 let voting_key = cip36.voting_pks().first().ok_or(anyhow::anyhow!(
77 "Valid CIP36 registration must have one voting key"
78 ))?;
79
80 let stake_pk = cip36.stake_pk().ok_or(anyhow::anyhow!(
81 "Valid CIP36 registration must have one stake public key"
82 ))?;
83 let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
84 let stake_address = StakeAddress::new(block.network(), false, stake_pk_hash);
85
86 self.registrations.push(insert_cip36::Params::new(
87 voting_key, slot_no, index, &cip36,
88 ));
89 self.for_vote_key
90 .push(insert_cip36_for_vote_key::Params::new(
91 voting_key, slot_no, index, &cip36, true,
92 ));
93 self.stake_regs
94 .push(certs::StakeRegistrationInsertQuery::new(
95 stake_address,
96 slot_no,
97 index,
98 *stake_pk,
99 false,
100 false,
101 false,
102 true,
103 None,
104 ));
105 },
106 Ok(Some(cip36)) => {
108 if let Some(stake_pk) = cip36.stake_pk() {
110 if cip36.voting_pks().is_empty() {
111 self.invalid.push(insert_cip36_invalid::Params::new(
112 None, slot_no, index, &cip36,
113 ));
114 } else {
115 for voting_key in cip36.voting_pks() {
116 self.invalid.push(insert_cip36_invalid::Params::new(
117 Some(voting_key),
118 slot_no,
119 index,
120 &cip36,
121 ));
122 self.for_vote_key
123 .push(insert_cip36_for_vote_key::Params::new(
124 voting_key, slot_no, index, &cip36, false,
125 ));
126 }
127 }
128
129 let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
130 let stake_address = StakeAddress::new(block.network(), false, stake_pk_hash);
131 self.stake_regs
132 .push(certs::StakeRegistrationInsertQuery::new(
133 stake_address,
134 slot_no,
135 index,
136 *stake_pk,
137 false,
138 false,
139 false,
140 true,
141 None,
142 ));
143 }
144 },
145 _ => {},
146 }
147 Ok(())
148 }
149
150 pub(crate) fn execute(self, session: &Arc<CassandraSession>) -> FallibleQueryTasks {
154 let mut query_handles: FallibleQueryTasks = Vec::new();
155
156 if !self.registrations.is_empty() {
157 let inner_session = session.clone();
158 query_handles.push(tokio::spawn(async move {
159 inner_session
160 .execute_batch(
161 PreparedQuery::Cip36RegistrationInsertQuery,
162 self.registrations,
163 )
164 .await
165 }));
166 }
167
168 if !self.invalid.is_empty() {
169 let inner_session = session.clone();
170 query_handles.push(tokio::spawn(async move {
171 inner_session
172 .execute_batch(
173 PreparedQuery::Cip36RegistrationInsertErrorQuery,
174 self.invalid,
175 )
176 .await
177 }));
178 }
179
180 if !self.for_vote_key.is_empty() {
181 let inner_session = session.clone();
182 query_handles.push(tokio::spawn(async move {
183 inner_session
184 .execute_batch(
185 PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery,
186 self.for_vote_key,
187 )
188 .await
189 }));
190 }
191
192 if !self.stake_regs.is_empty() {
193 let inner_session = session.clone();
194 query_handles.push(tokio::spawn(async move {
195 inner_session
196 .execute_batch(PreparedQuery::StakeRegistrationInsertQuery, self.stake_regs)
197 .await
198 }));
199 }
200
201 query_handles
202 }
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::index::tests::test_utils;

    /// Indexing transaction 0 of the `block_2` fixture must record exactly one
    /// valid registration, one vote-key row, one stake registration and no
    /// invalid rows.
    #[test]
    fn index() {
        let mut cip36_queries = Cip36InsertQuery::new();
        let fixture = test_utils::block_2();

        cip36_queries.index(0.into(), 0.into(), &fixture).unwrap();

        assert_eq!(1, cip36_queries.registrations.len());
        assert!(cip36_queries.invalid.is_empty());
        assert_eq!(1, cip36_queries.for_vote_key.len());
        assert_eq!(1, cip36_queries.stake_regs.len());
    }
}
220}