cat_gateway/db/index/block/cip36/
mod.rs1pub(crate) mod insert_cip36;
4pub(crate) mod insert_cip36_for_vote_key;
5pub(crate) mod insert_cip36_invalid;
6
7use std::sync::Arc;
8
9use cardano_chain_follower::{
10 hashes::Blake2b224Hash, Cip36, MultiEraBlock, Slot, StakeAddress, TxnIndex,
11};
12use scylla::client::session::Session;
13
14use super::certs;
15use crate::{
16 db::index::{
17 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
18 session::CassandraSession,
19 },
20 settings::cassandra_db,
21};
22
23pub(crate) struct Cip36InsertQuery {
25 registrations: Vec<insert_cip36::Params>,
27 invalid: Vec<insert_cip36_invalid::Params>,
29 for_vote_key: Vec<insert_cip36_for_vote_key::Params>,
31 stake_regs: Vec<certs::StakeRegistrationInsertQuery>,
33}
34
35impl Cip36InsertQuery {
36 pub(crate) fn new() -> Self {
38 Cip36InsertQuery {
39 registrations: Vec::new(),
40 invalid: Vec::new(),
41 for_vote_key: Vec::new(),
42 stake_regs: Vec::new(),
43 }
44 }
45
46 pub(crate) async fn prepare_batch(
48 session: &Arc<Session>,
49 cfg: &cassandra_db::EnvVars,
50 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch)> {
51 let insert_cip36_batch = insert_cip36::Params::prepare_batch(session, cfg).await;
52 let insert_cip36_invalid_batch =
53 insert_cip36_invalid::Params::prepare_batch(session, cfg).await;
54 let insert_cip36_for_vote_key_addr_batch =
55 insert_cip36_for_vote_key::Params::prepare_batch(session, cfg).await;
56 Ok((
61 insert_cip36_batch?,
62 insert_cip36_invalid_batch?,
63 insert_cip36_for_vote_key_addr_batch?,
64 ))
65 }
66
67 pub(crate) fn index(
69 &mut self,
70 index: TxnIndex,
71 slot_no: Slot,
72 block: &MultiEraBlock,
73 ) -> anyhow::Result<()> {
74 match Cip36::new(block, index, true) {
76 Ok(Some(cip36)) if cip36.is_valid() => {
79 let voting_key = cip36.voting_pks().first().ok_or(anyhow::anyhow!(
81 "Valid CIP36 registration must have one voting key"
82 ))?;
83
84 let stake_pk = cip36.stake_pk().ok_or(anyhow::anyhow!(
85 "Valid CIP36 registration must have one stake public key"
86 ))?;
87 let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
88 let stake_address = StakeAddress::new(block.network(), false, stake_pk_hash);
89
90 self.registrations.push(insert_cip36::Params::new(
91 voting_key, slot_no, index, &cip36,
92 ));
93 self.for_vote_key
94 .push(insert_cip36_for_vote_key::Params::new(
95 voting_key, slot_no, index, &cip36, true,
96 ));
97 self.stake_regs
98 .push(certs::StakeRegistrationInsertQuery::new(
99 stake_address,
100 slot_no,
101 index,
102 *stake_pk,
103 false,
104 false,
105 false,
106 true,
107 None,
108 ));
109 },
110 Ok(Some(cip36)) => {
112 if let Some(stake_pk) = cip36.stake_pk() {
114 if cip36.voting_pks().is_empty() {
115 self.invalid.push(insert_cip36_invalid::Params::new(
116 None, slot_no, index, &cip36,
117 ));
118 } else {
119 for voting_key in cip36.voting_pks() {
120 self.invalid.push(insert_cip36_invalid::Params::new(
121 Some(voting_key),
122 slot_no,
123 index,
124 &cip36,
125 ));
126 self.for_vote_key
127 .push(insert_cip36_for_vote_key::Params::new(
128 voting_key, slot_no, index, &cip36, false,
129 ));
130 }
131 }
132
133 let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
134 let stake_address = StakeAddress::new(block.network(), false, stake_pk_hash);
135 self.stake_regs
136 .push(certs::StakeRegistrationInsertQuery::new(
137 stake_address,
138 slot_no,
139 index,
140 *stake_pk,
141 false,
142 false,
143 false,
144 true,
145 None,
146 ));
147 }
148 },
149 _ => {},
150 }
151 Ok(())
152 }
153
154 pub(crate) fn execute(
158 self,
159 session: &Arc<CassandraSession>,
160 ) -> FallibleQueryTasks {
161 let mut query_handles: FallibleQueryTasks = Vec::new();
162
163 if !self.registrations.is_empty() {
164 let inner_session = session.clone();
165 query_handles.push(tokio::spawn(async move {
166 inner_session
167 .execute_batch(
168 PreparedQuery::Cip36RegistrationInsertQuery,
169 self.registrations,
170 )
171 .await
172 }));
173 }
174
175 if !self.invalid.is_empty() {
176 let inner_session = session.clone();
177 query_handles.push(tokio::spawn(async move {
178 inner_session
179 .execute_batch(
180 PreparedQuery::Cip36RegistrationInsertErrorQuery,
181 self.invalid,
182 )
183 .await
184 }));
185 }
186
187 if !self.for_vote_key.is_empty() {
188 let inner_session = session.clone();
189 query_handles.push(tokio::spawn(async move {
190 inner_session
191 .execute_batch(
192 PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery,
193 self.for_vote_key,
194 )
195 .await
196 }));
197 }
198
199 if !self.stake_regs.is_empty() {
200 let inner_session = session.clone();
201 query_handles.push(tokio::spawn(async move {
202 inner_session
203 .execute_batch(PreparedQuery::StakeRegistrationInsertQuery, self.stake_regs)
204 .await
205 }));
206 }
207
208 query_handles
209 }
210}
211
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::index::tests::test_utils;

    /// Indexing transaction 0 of test block 2 must produce one valid registration,
    /// one vote-key row, one stake registration and no invalid rows.
    #[test]
    fn index() {
        let test_block = test_utils::block_2();
        let mut cip36_query = Cip36InsertQuery::new();

        cip36_query
            .index(0.into(), 0.into(), &test_block)
            .unwrap();

        assert_eq!(1, cip36_query.registrations.len());
        assert!(cip36_query.invalid.is_empty());
        assert_eq!(1, cip36_query.for_vote_key.len());
        assert_eq!(1, cip36_query.stake_regs.len());
    }
}