cat_gateway/db/index/block/txo/
mod.rs

1//! Insert TXO Indexed Data Queries.
2//!
3//! Note, there are multiple ways TXO Data is indexed and they all happen in here.
4
5pub(crate) mod insert_txo;
6pub(crate) mod insert_txo_asset;
7pub(crate) mod insert_unstaked_txo;
8pub(crate) mod insert_unstaked_txo_asset;
9
10use std::sync::Arc;
11
12use cardano_chain_follower::{
13    hashes::TransactionId, Network, Slot, StakeAddress, TxnIndex, TxnOutputOffset,
14};
15use scylla::client::session::Session;
16use tracing::{error, warn};
17
18use crate::{
19    db::index::{
20        queries::{FallibleQueryTasks, PreparedQuery, SizedBatch},
21        session::CassandraSession,
22    },
23    settings::cassandra_db,
24};
25
26/// Insert TXO Query and Parameters
27///
28/// There are multiple possible parameters to a query, which are represented separately.
29pub(crate) struct TxoInsertQuery {
30    /// Staked TXO Data Parameters
31    staked_txo: Vec<insert_txo::Params>,
32    /// Unstaked TXO Data Parameters
33    unstaked_txo: Vec<insert_unstaked_txo::Params>,
34    /// Staked TXO Asset Data Parameters
35    staked_txo_asset: Vec<insert_txo_asset::Params>,
36    /// Unstaked TXO Asset Data Parameters
37    unstaked_txo_asset: Vec<insert_unstaked_txo_asset::Params>,
38}
39
40impl TxoInsertQuery {
41    /// Create a new Insert TXO Query Batch
42    pub(crate) fn new() -> Self {
43        TxoInsertQuery {
44            staked_txo: Vec::new(),
45            unstaked_txo: Vec::new(),
46            staked_txo_asset: Vec::new(),
47            unstaked_txo_asset: Vec::new(),
48        }
49    }
50
51    /// Prepare Batch of Insert TXI Index Data Queries
52    pub(crate) async fn prepare_batch(
53        session: &Arc<Session>,
54        cfg: &cassandra_db::EnvVars,
55    ) -> anyhow::Result<(SizedBatch, SizedBatch, SizedBatch, SizedBatch)> {
56        let txo_staked_insert_batch = insert_txo::Params::prepare_batch(session, cfg).await;
57        let txo_unstaked_insert_batch =
58            insert_unstaked_txo::Params::prepare_batch(session, cfg).await;
59        let txo_staked_asset_insert_batch =
60            insert_txo_asset::Params::prepare_batch(session, cfg).await;
61        let txo_unstaked_asset_insert_batch =
62            insert_unstaked_txo_asset::Params::prepare_batch(session, cfg).await;
63
64        Ok((
65            txo_staked_insert_batch?,
66            txo_unstaked_insert_batch?,
67            txo_staked_asset_insert_batch?,
68            txo_unstaked_asset_insert_batch?,
69        ))
70    }
71
72    /// Extracts a stake address from a TXO if possible.
73    /// Returns None if it is not possible.
74    /// If we want to index, but can not determine a stake key hash, then return a Vec
75    /// with a single 0 byte.    This is because the index DB needs data in the
76    /// primary key, so we use a single byte of 0 to indicate    that there is no
77    /// stake address, and still have a primary key on the table. Otherwise return the
78    /// header and the stake key hash as a vec of 29 bytes.
79    fn extract_stake_address(
80        network: Network,
81        txo: &cardano_chain_follower::pallas_traverse::MultiEraOutput<'_>,
82        slot_no: Slot,
83        txn_id: &str,
84    ) -> Option<(Option<StakeAddress>, String)> {
85        let stake_address = match txo.address() {
86            Ok(address) => {
87                match address {
88                    // Byron addresses do not have stake addresses and are not supported.
89                    cardano_chain_follower::pallas_addresses::Address::Byron(_) => {
90                        return None;
91                    },
92                    cardano_chain_follower::pallas_addresses::Address::Shelley(address) => {
93                        let address_string = match address.to_bech32() {
94                            Ok(address) => address,
95                            Err(error) => {
96                                // Shouldn't happen, but if it does error and don't index.
97                                error!(error=%error, slot=?slot_no, txn=txn_id,"Error converting to bech32: skipping.");
98                                return None;
99                            },
100                        };
101
102                        let address = match address.delegation() {
103                            cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Script(hash) => {
104                                Some(StakeAddress::new(network, true, (*hash).into()))
105                            },
106                            cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Key(hash) => {
107                                Some(StakeAddress::new(network, false, (*hash).into()))
108                            },
109                            cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Pointer(_pointer) => {
110                                // These are not supported from Conway, so we don't support them
111                                // either.
112                                None
113                            },
114                            cardano_chain_follower::pallas_addresses::ShelleyDelegationPart::Null => None,
115                        };
116                        (address, address_string)
117                    },
118                    cardano_chain_follower::pallas_addresses::Address::Stake(_) => {
119                        // This should NOT appear in a TXO, so report if it does. But don't index it
120                        // as a stake address.
121                        warn!(
122                            slot = ?slot_no,
123                            txn = txn_id,
124                            "Unexpected Stake address found in TXO. Refusing to index."
125                        );
126                        return None;
127                    },
128                }
129            },
130            Err(error) => {
131                // This should not ever happen.
132                error!(error=%error, slot = ?slot_no, txn = txn_id, "Failed to get Address from TXO. Skipping TXO.");
133                return None;
134            },
135        };
136
137        Some(stake_address)
138    }
139
140    /// Index the transaction Inputs.
141    pub(crate) fn index(
142        &mut self,
143        network: Network,
144        txn: &cardano_chain_follower::pallas_traverse::MultiEraTx<'_>,
145        slot_no: Slot,
146        txn_hash: TransactionId,
147        index: TxnIndex,
148    ) {
149        let txn_id = txn_hash.to_string();
150
151        // Accumulate all the data we want to insert from this transaction here.
152        for (txo_index, txo) in txn.outputs().iter().enumerate() {
153            // This will only return None if the TXO is not to be indexed (Byron Addresses)
154            let Some((stake_address, address)) =
155                Self::extract_stake_address(network, txo, slot_no, &txn_id)
156            else {
157                continue;
158            };
159
160            let txo_index = TxnOutputOffset::from(txo_index);
161            if let Some(stake_address) = stake_address.clone() {
162                let params = insert_txo::Params::new(
163                    stake_address,
164                    slot_no,
165                    index,
166                    txo_index,
167                    &address,
168                    txo.value().coin(),
169                    txn_hash,
170                );
171                self.staked_txo.push(params);
172            } else {
173                let params = insert_unstaked_txo::Params::new(
174                    txn_hash,
175                    txo_index,
176                    slot_no,
177                    index,
178                    &address,
179                    txo.value().coin(),
180                );
181                self.unstaked_txo.push(params);
182            }
183
184            for asset in txo.value().assets() {
185                let policy_id = asset.policy().to_vec();
186                for policy_asset in asset.assets() {
187                    if policy_asset.is_output() {
188                        let asset_name = policy_asset.name();
189                        let value = policy_asset.any_coin();
190
191                        if let Some(stake_address) = stake_address.clone() {
192                            let params = insert_txo_asset::Params::new(
193                                stake_address,
194                                slot_no,
195                                index,
196                                txo_index,
197                                &policy_id,
198                                asset_name,
199                                value,
200                            );
201                            self.staked_txo_asset.push(params);
202                        } else {
203                            let params = insert_unstaked_txo_asset::Params::new(
204                                txn_hash, txo_index, &policy_id, asset_name, slot_no, index, value,
205                            );
206                            self.unstaked_txo_asset.push(params);
207                        }
208                    } else {
209                        error!("Minting MultiAsset in TXO.");
210                    }
211                }
212            }
213        }
214    }
215
216    /// Index the transaction Inputs.
217    ///
218    /// Consumes `self` and returns a vector of futures.
219    pub(crate) fn execute(
220        self,
221        session: &Arc<CassandraSession>,
222    ) -> FallibleQueryTasks {
223        let mut query_handles: FallibleQueryTasks = Vec::new();
224
225        if !self.staked_txo.is_empty() {
226            let inner_session = session.clone();
227            query_handles.push(tokio::spawn(async move {
228                inner_session
229                    .execute_batch(PreparedQuery::TxoAdaInsertQuery, self.staked_txo)
230                    .await
231            }));
232        }
233
234        if !self.unstaked_txo.is_empty() {
235            let inner_session = session.clone();
236            query_handles.push(tokio::spawn(async move {
237                inner_session
238                    .execute_batch(PreparedQuery::UnstakedTxoAdaInsertQuery, self.unstaked_txo)
239                    .await
240            }));
241        }
242
243        if !self.staked_txo_asset.is_empty() {
244            let inner_session = session.clone();
245            query_handles.push(tokio::spawn(async move {
246                inner_session
247                    .execute_batch(PreparedQuery::TxoAssetInsertQuery, self.staked_txo_asset)
248                    .await
249            }));
250        }
251        if !self.unstaked_txo_asset.is_empty() {
252            let inner_session = session.clone();
253            query_handles.push(tokio::spawn(async move {
254                inner_session
255                    .execute_batch(
256                        PreparedQuery::UnstakedTxoAssetInsertQuery,
257                        self.unstaked_txo_asset,
258                    )
259                    .await
260            }));
261        }
262
263        query_handles
264    }
265}