cat_gateway/service/api/cardano/staking/
assets_get.rs

1//! Implementation of the GET `../assets` endpoint
2
3use std::{
4    collections::{HashMap, HashSet},
5    sync::Arc,
6};
7
8use cardano_chain_follower::{hashes::TransactionId, Slot, StakeAddress, TxnIndex};
9use futures::TryStreamExt;
10use poem_openapi::{payload::Json, ApiResponse};
11
12use crate::{
13    db::index::{
14        queries::staked_ada::{
15            get_assets_by_stake_address::{
16                GetAssetsByStakeAddressParams, GetAssetsByStakeAddressQuery,
17                GetAssetsByStakeAddressQueryKey, GetAssetsByStakeAddressQueryValue,
18            },
19            get_txi_by_txn_hash::{GetTxiByTxnHashesQuery, GetTxiByTxnHashesQueryParams},
20            get_txo_by_stake_address::{
21                GetTxoByStakeAddressQuery, GetTxoByStakeAddressQueryParams,
22            },
23            update_txo_spent::{UpdateTxoSpentQuery, UpdateTxoSpentQueryParams},
24        },
25        session::{CassandraSession, CassandraSessionError},
26    },
27    service::common::{
28        objects::cardano::{
29            network::Network,
30            stake_info::{FullStakeInfo, StakeInfo, StakedTxoAssetInfo},
31        },
32        responses::WithErrorResponses,
33        types::cardano::{
34            ada_value::AdaValue, asset_name::AssetName, asset_value::AssetValue,
35            cip19_stake_address::Cip19StakeAddress, hash28::HexEncodedHash28, slot_no::SlotNo,
36        },
37    },
38    settings::Settings,
39};
40
/// Endpoint responses.
// NOTE(review): `poem_openapi` derive macros normally copy `///` doc comments into the
// generated OpenAPI descriptions, so the variant docs below are presumably user-facing
// text. They still talk about "ADA staked" although this module implements the
// `../assets` endpoint — confirm the intended wording before changing them.
#[derive(ApiResponse)]
pub(crate) enum Responses {
    /// ## Ok
    ///
    /// The amount of ADA staked by the queried stake address, as at the indicated slot.
    #[oai(status = 200)]
    Ok(Json<FullStakeInfo>),
    /// ## Not Found
    ///
    /// The queried stake address was not found at the requested slot number.
    #[oai(status = 404)]
    NotFound,
}
55
/// All responses of this endpoint: the endpoint specific [`Responses`] wrapped in
/// [`WithErrorResponses`].
pub(crate) type AllResponses = WithErrorResponses<Responses>;
58
59/// # GET `/staked_ada`
60pub(crate) async fn endpoint(
61    stake_address: Cip19StakeAddress,
62    provided_network: Option<Network>,
63    slot_num: Option<SlotNo>,
64) -> AllResponses {
65    match build_full_stake_info_response(stake_address, provided_network, slot_num).await {
66        Ok(None) => Responses::NotFound.into(),
67        Ok(Some(full_stake_info)) => Responses::Ok(Json(full_stake_info)).into(),
68        Err(err) => AllResponses::handle_error(&err),
69    }
70}
71
/// TXO information used when calculating a user's stake info.
#[derive(Clone)]
struct TxoInfo {
    /// TXO value.
    value: num_bigint::BigInt,
    /// TXO transaction index within the slot.
    txn_index: TxnIndex,
    /// TXO index.
    txo: i16,
    /// TXO transaction slot number.
    slot_no: Slot,
    /// Slot in which the TXO was spent, or `None` if no spend has been recorded for it.
    spent_slot_no: Option<Slot>,
}
86
87/// Building a full stake info response from the provided arguments.
88async fn build_full_stake_info_response(
89    stake_address: Cip19StakeAddress,
90    provided_network: Option<Network>,
91    slot_num: Option<SlotNo>,
92) -> anyhow::Result<Option<FullStakeInfo>> {
93    if let Some(provided_network) = provided_network {
94        if cardano_chain_follower::Network::from(provided_network) != Settings::cardano_network() {
95            return Ok(None);
96        }
97    }
98    let persistent_session =
99        CassandraSession::get(true).ok_or(CassandraSessionError::FailedAcquiringSession)?;
100    let volatile_session =
101        CassandraSession::get(false).ok_or(CassandraSessionError::FailedAcquiringSession)?;
102    let adjusted_slot_num = slot_num.unwrap_or(SlotNo::MAXIMUM);
103
104    let persistent_txo_state = calculate_assets_state(
105        persistent_session,
106        stake_address.clone(),
107        TxoAssetsState::default(),
108    )
109    .await?;
110
111    let volatile_txo_state = calculate_assets_state(
112        volatile_session,
113        stake_address.clone(),
114        persistent_txo_state.clone(),
115    )
116    .await?;
117
118    if volatile_txo_state.is_empty() && persistent_txo_state.is_empty() {
119        return Ok(None);
120    }
121    let persistent_stake_info = build_stake_info(persistent_txo_state, adjusted_slot_num)?;
122
123    let volatile_stake_info = build_stake_info(volatile_txo_state, adjusted_slot_num)?;
124
125    Ok(Some(FullStakeInfo {
126        volatile: volatile_stake_info.into(),
127        persistent: persistent_stake_info.into(),
128    }))
129}
130
131/// Calculate the assets state info for a given stake address.
132///
133/// This function also updates the spent column if it detects that a TXO was spent
134/// between lookups.
135async fn calculate_assets_state(
136    session: Arc<CassandraSession>,
137    stake_address: Cip19StakeAddress,
138    mut txo_base_state: TxoAssetsState,
139) -> anyhow::Result<TxoAssetsState> {
140    let address: StakeAddress = stake_address.try_into()?;
141
142    let (mut txos, txo_assets) = futures::try_join!(
143        get_txo(&session, &address),
144        get_txo_assets(&session, &address)
145    )?;
146
147    let params = update_spent(&session, &address, &mut txo_base_state.txos, &mut txos).await?;
148
149    // Extend the base state with current session data (used to calculate volatile data)
150    let txos = txo_base_state.txos.into_iter().chain(txos).collect();
151    let txo_assets: HashMap<_, _> = txo_base_state
152        .txo_assets
153        .into_iter()
154        .chain(txo_assets)
155        .collect();
156
157    // Sets TXOs as spent in the database in the background.
158    tokio::spawn(async move {
159        if let Err(err) = UpdateTxoSpentQuery::execute(&session, params).await {
160            tracing::error!("Failed to update TXO spent info, err: {err}");
161        }
162    });
163
164    Ok(TxoAssetsState { txos, txo_assets })
165}
166
/// `TxoInfo` map type alias.
///
/// Entries are keyed by the pair (transaction id, TXO index).
type TxoMap = HashMap<(TransactionId, i16), TxoInfo>;
169
170/// Returns a map of TXO infos for the given stake address.
171async fn get_txo(
172    session: &CassandraSession,
173    stake_address: &StakeAddress,
174) -> anyhow::Result<TxoMap> {
175    let txos_stream = GetTxoByStakeAddressQuery::execute(
176        session,
177        GetTxoByStakeAddressQueryParams::new(stake_address.clone()),
178    )
179    .await?;
180
181    let txo_map = txos_stream.iter().fold(HashMap::new(), |mut txo_map, row| {
182        let query_value = row.value.read().unwrap_or_else(|err| {
183            tracing::error!(
184                    error = %err,
185                    "UTXO Assets Cache entry lock is poisoned, recovering lock.");
186            row.value.clear_poison();
187            err.into_inner()
188        });
189        let query_key = &row.key;
190        let key = (query_value.txn_id.into(), query_key.txo.into());
191        txo_map.insert(key, TxoInfo {
192            value: query_value.value.clone(),
193            txn_index: query_key.txn_index.into(),
194            txo: query_key.txo.into(),
195            slot_no: query_key.slot_no.into(),
196            spent_slot_no: query_value.spent_slot.map(Into::into),
197        });
198        txo_map
199    });
200    Ok(txo_map)
201}
202
/// TXO Assets map type alias.
///
/// Maps the key of an assets query row to all asset values that were returned under
/// that key.
type TxoAssetsMap =
    HashMap<Arc<GetAssetsByStakeAddressQueryKey>, Vec<Arc<GetAssetsByStakeAddressQueryValue>>>;
206
/// TXO Assets state: the TXOs of a stake address together with the native assets
/// attached to them.
#[derive(Default, Clone)]
struct TxoAssetsState {
    /// TXO Info map
    txos: TxoMap,
    /// TXO Assets map
    txo_assets: TxoAssetsMap,
}
215
216impl TxoAssetsState {
217    /// Returns true if underlying `txos` and `txo_assets` are empty, false otherwise
218    fn is_empty(&self) -> bool {
219        self.txos.is_empty() && self.txo_assets.is_empty()
220    }
221}
222
223/// Returns a map of txo asset infos for the given stake address.
224async fn get_txo_assets(
225    session: &CassandraSession,
226    stake_address: &StakeAddress,
227) -> anyhow::Result<TxoAssetsMap> {
228    let assets_txos_stream = GetAssetsByStakeAddressQuery::execute(
229        session,
230        GetAssetsByStakeAddressParams::new(stake_address.clone()),
231    )
232    .await?;
233
234    let tokens_map =
235        assets_txos_stream
236            .iter()
237            .fold(HashMap::new(), |mut tokens_map: TxoAssetsMap, row| {
238                let key = row.key.clone();
239                match tokens_map.entry(key) {
240                    std::collections::hash_map::Entry::Occupied(mut o) => {
241                        o.get_mut().push(row.value.clone());
242                    },
243                    std::collections::hash_map::Entry::Vacant(v) => {
244                        v.insert(vec![row.value.clone()]);
245                    },
246                }
247                tokens_map
248            });
249    Ok(tokens_map)
250}
251
252/// Checks if the given TXOs were spent and mark then as such.
253/// Separating `base_txos` and `txos` because we dont want to make an update inside the db
254/// for the `base_txos` data (it is covering the case when inside the persistent part we
255/// have a txo which is spent inside the volatile, so to not incorrectly mix up records
256/// from these two tables, inserting some rows from persistent to volatile section).
257async fn update_spent(
258    session: &CassandraSession,
259    stake_address: &StakeAddress,
260    base_txos: &mut TxoMap,
261    txos: &mut TxoMap,
262) -> anyhow::Result<Vec<UpdateTxoSpentQueryParams>> {
263    let txn_hashes = txos
264        .iter()
265        .chain(base_txos.iter())
266        .filter(|(_, txo)| txo.spent_slot_no.is_none())
267        .map(|((tx_id, _), _)| *tx_id)
268        .collect::<HashSet<_>>()
269        .into_iter()
270        .collect::<Vec<_>>();
271
272    let mut params = Vec::new();
273
274    for chunk in txn_hashes.chunks(100) {
275        let mut txi_stream = GetTxiByTxnHashesQuery::execute(
276            session,
277            GetTxiByTxnHashesQueryParams::new(chunk.to_vec()),
278        )
279        .await?;
280
281        while let Some(row) = txi_stream.try_next().await? {
282            let key = (row.txn_id.into(), row.txo.into());
283            if let Some(txo_info) = txos.get_mut(&key) {
284                params.push(UpdateTxoSpentQueryParams {
285                    stake_address: stake_address.clone().into(),
286                    txn_index: txo_info.txn_index.into(),
287                    txo: txo_info.txo.into(),
288                    slot_no: txo_info.slot_no.into(),
289                    spent_slot: row.slot_no,
290                });
291
292                txo_info.spent_slot_no = Some(row.slot_no.into());
293            }
294            if let Some(txo_info) = base_txos.get_mut(&key) {
295                txo_info.spent_slot_no = Some(row.slot_no.into());
296            }
297        }
298    }
299
300    Ok(params)
301}
302
303/// Builds an instance of [`StakeInfo`] based on the TXOs given.
304fn build_stake_info(
305    mut txo_state: TxoAssetsState,
306    slot_num: SlotNo,
307) -> anyhow::Result<StakeInfo> {
308    let slot_num = slot_num.into();
309    let mut total_ada_amount = AdaValue::default();
310    let mut last_slot_num = SlotNo::default();
311    let mut assets = HashMap::<(HexEncodedHash28, AssetName), AssetValue>::new();
312
313    for txo_info in txo_state.txos.into_values() {
314        // Filter out spent TXOs.
315        if txo_info.slot_no >= slot_num {
316            continue;
317        }
318        // Filter out spent TXOs.
319        if let Some(spent_slot) = txo_info.spent_slot_no {
320            if spent_slot <= slot_num {
321                continue;
322            }
323        }
324
325        let value = AdaValue::try_from(txo_info.value)?;
326        total_ada_amount = total_ada_amount.saturating_add(value);
327
328        let key = GetAssetsByStakeAddressQueryKey {
329            slot_no: txo_info.slot_no.into(),
330            txn_index: txo_info.txn_index.into(),
331            txo: txo_info.txo.into(),
332        };
333        if let Some(native_assets) = txo_state.txo_assets.remove(&key) {
334            for native_asset in native_assets {
335                let amount = (&native_asset.value).into();
336                match assets.entry((
337                    (&native_asset.policy_id).try_into()?,
338                    (&native_asset.asset_name).into(),
339                )) {
340                    std::collections::hash_map::Entry::Occupied(mut o) => {
341                        *o.get_mut() = o.get().saturating_add(&amount);
342                    },
343                    std::collections::hash_map::Entry::Vacant(v) => {
344                        v.insert(amount.clone());
345                    },
346                }
347            }
348        }
349
350        let slot_no = txo_info.slot_no.into();
351        if last_slot_num < slot_no {
352            last_slot_num = slot_no;
353        }
354    }
355
356    Ok(StakeInfo {
357        ada_amount: total_ada_amount,
358        slot_number: last_slot_num,
359        assets: assets
360            .into_iter()
361            .map(|((policy_hash, asset_name), amount)| {
362                StakedTxoAssetInfo {
363                    policy_hash,
364                    asset_name,
365                    amount,
366                }
367            })
368            .collect::<Vec<_>>()
369            .into(),
370    })
371}