cat_gateway/db/index/block/cip36/
mod.rs1pub(crate) mod insert_cip36;
4pub(crate) mod insert_cip36_for_vote_key;
5pub(crate) mod insert_cip36_invalid;
6
7use std::sync::Arc;
8
9use cardano_chain_follower::{
10 Cip36, MultiEraBlock, Slot, StakeAddress, TxnIndex, hashes::Blake2b224Hash,
11};
12use scylla::client::session::Session;
13
14use super::certs;
15use crate::{
16 db::index::{
17 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
18 session::CassandraSession,
19 },
20 settings::cassandra_db,
21};
22
23pub(crate) struct Cip36InsertQuery {
25 registrations: Vec<insert_cip36::Params>,
27 invalid: Vec<insert_cip36_invalid::Params>,
29 for_vote_key: Vec<insert_cip36_for_vote_key::Params>,
31 stake_regs: Vec<certs::StakeRegistrationInsertQuery>,
33}
34
35impl Cip36InsertQuery {
36 pub(crate) fn new() -> Self {
38 Cip36InsertQuery {
39 registrations: Vec::new(),
40 invalid: Vec::new(),
41 for_vote_key: Vec::new(),
42 stake_regs: Vec::new(),
43 }
44 }
45
46 pub(crate) async fn prepare_batch(
48 session: &Arc<Session>,
49 cfg: &cassandra_db::EnvVars,
50 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch)> {
51 let insert_cip36_batch = insert_cip36::Params::prepare_batch(session, cfg).await;
52 let insert_cip36_invalid_batch =
53 insert_cip36_invalid::Params::prepare_batch(session, cfg).await;
54 let insert_cip36_for_vote_key_addr_batch =
55 insert_cip36_for_vote_key::Params::prepare_batch(session, cfg).await;
56 Ok((
61 insert_cip36_batch?,
62 insert_cip36_invalid_batch?,
63 insert_cip36_for_vote_key_addr_batch?,
64 ))
65 }
66
67 pub(crate) fn index(
69 &mut self,
70 index: TxnIndex,
71 slot_no: Slot,
72 block: &MultiEraBlock,
73 ) -> anyhow::Result<()> {
74 match Cip36::new(block, index, true) {
76 Ok(Some(cip36)) if cip36.is_valid() => {
79 let voting_key = cip36.voting_pks().first().ok_or(anyhow::anyhow!(
81 "Valid CIP36 registration must have one voting key"
82 ))?;
83
84 let stake_pk = cip36.stake_pk().ok_or(anyhow::anyhow!(
85 "Valid CIP36 registration must have one stake public key"
86 ))?;
87 let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
88 let stake_address =
89 StakeAddress::new(block.network().clone(), false, stake_pk_hash);
90
91 self.registrations.push(insert_cip36::Params::new(
92 voting_key, slot_no, index, &cip36,
93 ));
94 self.for_vote_key
95 .push(insert_cip36_for_vote_key::Params::new(
96 voting_key, slot_no, index, &cip36, true,
97 ));
98 self.stake_regs
99 .push(certs::StakeRegistrationInsertQuery::new(
100 stake_address,
101 slot_no,
102 index,
103 *stake_pk,
104 false,
105 false,
106 false,
107 true,
108 None,
109 ));
110 },
111 Ok(Some(cip36)) => {
113 if let Some(stake_pk) = cip36.stake_pk() {
115 if cip36.voting_pks().is_empty() {
116 self.invalid.push(insert_cip36_invalid::Params::new(
117 None, slot_no, index, &cip36,
118 ));
119 } else {
120 for voting_key in cip36.voting_pks() {
121 self.invalid.push(insert_cip36_invalid::Params::new(
122 Some(voting_key),
123 slot_no,
124 index,
125 &cip36,
126 ));
127 self.for_vote_key
128 .push(insert_cip36_for_vote_key::Params::new(
129 voting_key, slot_no, index, &cip36, false,
130 ));
131 }
132 }
133
134 let stake_pk_hash = Blake2b224Hash::new(&stake_pk.to_bytes());
135 let stake_address =
136 StakeAddress::new(block.network().clone(), false, stake_pk_hash);
137 self.stake_regs
138 .push(certs::StakeRegistrationInsertQuery::new(
139 stake_address,
140 slot_no,
141 index,
142 *stake_pk,
143 false,
144 false,
145 false,
146 true,
147 None,
148 ));
149 }
150 },
151 _ => {},
152 }
153 Ok(())
154 }
155
156 pub(crate) fn execute(
160 self,
161 session: &Arc<CassandraSession>,
162 ) -> FallibleQueryTasks {
163 let mut query_handles: FallibleQueryTasks = Vec::new();
164
165 if !self.registrations.is_empty() {
166 let inner_session = session.clone();
167 query_handles.push(tokio::spawn(async move {
168 inner_session
169 .execute_batch(
170 PreparedQuery::Cip36RegistrationInsertQuery,
171 self.registrations,
172 )
173 .await
174 }));
175 }
176
177 if !self.invalid.is_empty() {
178 let inner_session = session.clone();
179 query_handles.push(tokio::spawn(async move {
180 inner_session
181 .execute_batch(
182 PreparedQuery::Cip36RegistrationInsertErrorQuery,
183 self.invalid,
184 )
185 .await
186 }));
187 }
188
189 if !self.for_vote_key.is_empty() {
190 let inner_session = session.clone();
191 query_handles.push(tokio::spawn(async move {
192 inner_session
193 .execute_batch(
194 PreparedQuery::Cip36RegistrationForVoteKeyInsertQuery,
195 self.for_vote_key,
196 )
197 .await
198 }));
199 }
200
201 if !self.stake_regs.is_empty() {
202 let inner_session = session.clone();
203 query_handles.push(tokio::spawn(async move {
204 inner_session
205 .execute_batch(PreparedQuery::StakeRegistrationInsertQuery, self.stake_regs)
206 .await
207 }));
208 }
209
210 query_handles
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use crate::db::index::tests::test_utils;

    /// Indexing the first transaction of the `block_2` fixture is expected to queue one
    /// valid registration, one vote-key row and one stake registration, and no invalid
    /// rows.
    #[test]
    fn index() {
        let test_block = test_utils::block_2();
        let mut inserts = Cip36InsertQuery::new();

        inserts.index(0.into(), 0.into(), &test_block).unwrap();

        assert_eq!(inserts.registrations.len(), 1);
        assert!(inserts.invalid.is_empty());
        assert_eq!(inserts.for_vote_key.len(), 1);
        assert_eq!(inserts.stake_regs.len(), 1);
    }
}