cat_gateway/db/index/block/txo/
mod.rs1pub(crate) mod insert_txo;
6pub(crate) mod insert_txo_asset;
7pub(crate) mod insert_unstaked_txo;
8pub(crate) mod insert_unstaked_txo_asset;
9
10use std::sync::Arc;
11
12use cardano_chain_follower::{
13 hashes::TransactionId, Network, Slot, StakeAddress, TxnIndex, TxnOutputOffset,
14};
15use scylla::client::session::Session;
16use tracing::{error, warn};
17
18use crate::{
19 db::index::{
20 queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
21 session::CassandraSession,
22 },
23 settings::cassandra_db,
24};
25
26pub(crate) struct TxoInsertQuery {
30 staked_txo: Vec<insert_txo::Params>,
32 unstaked_txo: Vec<insert_unstaked_txo::Params>,
34 staked_txo_asset: Vec<insert_txo_asset::Params>,
36 unstaked_txo_asset: Vec<insert_unstaked_txo_asset::Params>,
38}
39
40impl TxoInsertQuery {
41 pub(crate) fn new() -> Self {
43 TxoInsertQuery {
44 staked_txo: Vec::new(),
45 unstaked_txo: Vec::new(),
46 staked_txo_asset: Vec::new(),
47 unstaked_txo_asset: Vec::new(),
48 }
49 }
50
51 pub(crate) async fn prepare_batch(
53 session: &Arc<Session>,
54 cfg: &cassandra_db::EnvVars,
55 ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
56 let txo_staked_insert_batch = insert_txo::Params::prepare_batch(session, cfg).await;
57 let txo_unstaked_insert_batch =
58 insert_unstaked_txo::Params::prepare_batch(session, cfg).await;
59 let txo_staked_asset_insert_batch =
60 insert_txo_asset::Params::prepare_batch(session, cfg).await;
61 let txo_unstaked_asset_insert_batch =
62 insert_unstaked_txo_asset::Params::prepare_batch(session, cfg).await;
63
64 Ok((
65 txo_staked_insert_batch?,
66 txo_unstaked_insert_batch?,
67 txo_staked_asset_insert_batch?,
68 txo_unstaked_asset_insert_batch?,
69 ))
70 }
71
72 fn extract_stake_address(
80 network: Network,
81 txo: &cardano_chain_follower::pallas_traverse::MultiEraOutput<'_>,
82 slot_no: Slot,
83 txn_id: &str,
84 ) -> Option<(Option<StakeAddress>, String)> {
85 let stake_address = match txo.address() {
86 Ok(address) => {
87 match address {
88 cardano_chain_follower::pallas_addresses::Address::Byron(_) => {
90 return None;
91 },
92 cardano_chain_follower::pallas_addresses::Address::Shelley(address) => {
93 let address_string = match address.to_bech32() {
94 Ok(address) => address,
95 Err(error) => {
96 error!(error=%error, slot=?slot_no, txn=txn_id,"Error converting to bech32: skipping.");
98 return None;
99 },
100 };
101
102 let address = match address.delegation() {
103 cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Script(hash) => {
104 Some(StakeAddress::new(network, true, (*hash).into()))
105 },
106 cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Key(hash) => {
107 Some(StakeAddress::new(network, false, (*hash).into()))
108 },
109 cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Pointer(_pointer) => {
110 None
113 },
114 cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Null => None,
115 };
116 (address, address_string)
117 },
118 cardano_chain_follower::pallas_addresses::Address::Stake(_) => {
119 warn!(
122 slot = ?slot_no,
123 txn = txn_id,
124 "Unexpected Stake address found in TXO. Refusing to index."
125 );
126 return None;
127 },
128 }
129 },
130 Err(error) => {
131 error!(error=%error, slot = ?slot_no, txn = txn_id, "Failed to get Address from TXO. Skipping TXO.");
133 return None;
134 },
135 };
136
137 Some(stake_address)
138 }
139
140 pub(crate) fn index(
142 &mut self,
143 network: Network,
144 txn: &cardano_chain_follower::pallas_traverse::MultiEraTx<'_>,
145 slot_no: Slot,
146 txn_hash: TransactionId,
147 index: TxnIndex,
148 ) {
149 let txn_id = txn_hash.to_string();
150
151 for (txo_index, txo) in txn.outputs().iter().enumerate() {
153 let Some((stake_address, address)) =
155 Self::extract_stake_address(network, txo, slot_no, &txn_id)
156 else {
157 continue;
158 };
159
160 let txo_index = TxnOutputOffset::from(txo_index);
161 if let Some(stake_address) = stake_address.clone() {
162 let params = insert_txo::Params::new(
163 stake_address,
164 slot_no,
165 index,
166 txo_index,
167 &address,
168 txo.value().coin(),
169 txn_hash,
170 );
171 self.staked_txo.push(params);
172 } else {
173 let params = insert_unstaked_txo::Params::new(
174 txn_hash,
175 txo_index,
176 slot_no,
177 index,
178 &address,
179 txo.value().coin(),
180 );
181 self.unstaked_txo.push(params);
182 }
183
184 for asset in txo.value().assets() {
185 let policy_id = asset.policy().to_vec();
186 for policy_asset in asset.assets() {
187 if policy_asset.is_output() {
188 let asset_name = policy_asset.name();
189 let value = policy_asset.any_coin();
190
191 if let Some(stake_address) = stake_address.clone() {
192 let params = insert_txo_asset::Params::new(
193 stake_address,
194 slot_no,
195 index,
196 txo_index,
197 &policy_id,
198 asset_name,
199 value,
200 );
201 self.staked_txo_asset.push(params);
202 } else {
203 let params = insert_unstaked_txo_asset::Params::new(
204 txn_hash, txo_index, &policy_id, asset_name, slot_no, index, value,
205 );
206 self.unstaked_txo_asset.push(params);
207 }
208 } else {
209 error!("Minting MultiAsset in TXO.");
210 }
211 }
212 }
213 }
214 }
215
216 pub(crate) fn execute(
220 self,
221 session: &Arc<CassandraSession>,
222 ) -> FallibleQueryTasks {
223 let mut query_handles: FallibleQueryTasks = Vec::new();
224
225 if !self.staked_txo.is_empty() {
226 let inner_session = session.clone();
227 query_handles.push(tokio::spawn(async move {
228 inner_session
229 .execute_batch(PreparedQuery::TxoAdaInsertQuery, self.staked_txo)
230 .await
231 }));
232 }
233
234 if !self.unstaked_txo.is_empty() {
235 let inner_session = session.clone();
236 query_handles.push(tokio::spawn(async move {
237 inner_session
238 .execute_batch(PreparedQuery::UnstakedTxoAdaInsertQuery, self.unstaked_txo)
239 .await
240 }));
241 }
242
243 if !self.staked_txo_asset.is_empty() {
244 let inner_session = session.clone();
245 query_handles.push(tokio::spawn(async move {
246 inner_session
247 .execute_batch(PreparedQuery::TxoAssetInsertQuery, self.staked_txo_asset)
248 .await
249 }));
250 }
251 if !self.unstaked_txo_asset.is_empty() {
252 let inner_session = session.clone();
253 query_handles.push(tokio::spawn(async move {
254 inner_session
255 .execute_batch(
256 PreparedQuery::UnstakedTxoAssetInsertQuery,
257 self.unstaked_txo_asset,
258 )
259 .await
260 }));
261 }
262
263 query_handles
264 }
265}