cat_gateway/service/api/cardano/staking/
assets_get.rs1use std::{
4 collections::{HashMap, HashSet},
5 sync::Arc,
6};
7
8use cardano_chain_follower::{hashes::TransactionId, Slot, StakeAddress, TxnIndex};
9use futures::TryStreamExt;
10use poem_openapi::{payload::Json, ApiResponse};
11
12use crate::{
13 db::index::{
14 queries::staked_ada::{
15 get_assets_by_stake_address::{
16 GetAssetsByStakeAddressParams, GetAssetsByStakeAddressQuery,
17 GetAssetsByStakeAddressQueryKey, GetAssetsByStakeAddressQueryValue,
18 },
19 get_txi_by_txn_hash::{GetTxiByTxnHashesQuery, GetTxiByTxnHashesQueryParams},
20 get_txo_by_stake_address::{
21 GetTxoByStakeAddressQuery, GetTxoByStakeAddressQueryParams,
22 },
23 update_txo_spent::{UpdateTxoSpentQuery, UpdateTxoSpentQueryParams},
24 },
25 session::{CassandraSession, CassandraSessionError},
26 },
27 service::common::{
28 objects::cardano::{
29 network::Network,
30 stake_info::{FullStakeInfo, StakeInfo, StakedTxoAssetInfo},
31 },
32 responses::WithErrorResponses,
33 types::cardano::{
34 ada_value::AdaValue, asset_name::AssetName, asset_value::AssetValue,
35 cip19_stake_address::Cip19StakeAddress, hash28::HexEncodedHash28, slot_no::SlotNo,
36 },
37 },
38 settings::Settings,
39};
40
41#[derive(ApiResponse)]
43pub(crate) enum Responses {
44 #[oai(status = 200)]
48 Ok(Json<FullStakeInfo>),
49 #[oai(status = 404)]
53 NotFound,
54}
55
56pub(crate) type AllResponses = WithErrorResponses<Responses>;
58
59pub(crate) async fn endpoint(
61 stake_address: Cip19StakeAddress,
62 provided_network: Option<Network>,
63 slot_num: Option<SlotNo>,
64) -> AllResponses {
65 match build_full_stake_info_response(stake_address, provided_network, slot_num).await {
66 Ok(None) => Responses::NotFound.into(),
67 Ok(Some(full_stake_info)) => Responses::Ok(Json(full_stake_info)).into(),
68 Err(err) => AllResponses::handle_error(&err),
69 }
70}
71
72#[derive(Clone)]
74struct TxoInfo {
75 value: num_bigint::BigInt,
77 txn_index: TxnIndex,
79 txo: i16,
81 slot_no: Slot,
83 spent_slot_no: Option<Slot>,
85}
86
87async fn build_full_stake_info_response(
89 stake_address: Cip19StakeAddress,
90 provided_network: Option<Network>,
91 slot_num: Option<SlotNo>,
92) -> anyhow::Result<Option<FullStakeInfo>> {
93 if let Some(provided_network) = provided_network {
94 if cardano_chain_follower::Network::from(provided_network) != Settings::cardano_network() {
95 return Ok(None);
96 }
97 }
98 let persistent_session =
99 CassandraSession::get(true).ok_or(CassandraSessionError::FailedAcquiringSession)?;
100 let volatile_session =
101 CassandraSession::get(false).ok_or(CassandraSessionError::FailedAcquiringSession)?;
102 let adjusted_slot_num = slot_num.unwrap_or(SlotNo::MAXIMUM);
103
104 let persistent_txo_state = calculate_assets_state(
105 persistent_session,
106 stake_address.clone(),
107 TxoAssetsState::default(),
108 )
109 .await?;
110
111 let volatile_txo_state = calculate_assets_state(
112 volatile_session,
113 stake_address.clone(),
114 persistent_txo_state.clone(),
115 )
116 .await?;
117
118 if volatile_txo_state.is_empty() && persistent_txo_state.is_empty() {
119 return Ok(None);
120 }
121 let persistent_stake_info = build_stake_info(persistent_txo_state, adjusted_slot_num)?;
122
123 let volatile_stake_info = build_stake_info(volatile_txo_state, adjusted_slot_num)?;
124
125 Ok(Some(FullStakeInfo {
126 volatile: volatile_stake_info.into(),
127 persistent: persistent_stake_info.into(),
128 }))
129}
130
131async fn calculate_assets_state(
136 session: Arc<CassandraSession>,
137 stake_address: Cip19StakeAddress,
138 mut txo_base_state: TxoAssetsState,
139) -> anyhow::Result<TxoAssetsState> {
140 let address: StakeAddress = stake_address.try_into()?;
141
142 let (mut txos, txo_assets) = futures::try_join!(
143 get_txo(&session, &address),
144 get_txo_assets(&session, &address)
145 )?;
146
147 let params = update_spent(&session, &address, &mut txo_base_state.txos, &mut txos).await?;
148
149 let txos = txo_base_state.txos.into_iter().chain(txos).collect();
151 let txo_assets: HashMap<_, _> = txo_base_state
152 .txo_assets
153 .into_iter()
154 .chain(txo_assets)
155 .collect();
156
157 tokio::spawn(async move {
159 if let Err(err) = UpdateTxoSpentQuery::execute(&session, params).await {
160 tracing::error!("Failed to update TXO spent info, err: {err}");
161 }
162 });
163
164 Ok(TxoAssetsState { txos, txo_assets })
165}
166
167type TxoMap = HashMap<(TransactionId, i16), TxoInfo>;
169
170async fn get_txo(
172 session: &CassandraSession,
173 stake_address: &StakeAddress,
174) -> anyhow::Result<TxoMap> {
175 let txos_stream = GetTxoByStakeAddressQuery::execute(
176 session,
177 GetTxoByStakeAddressQueryParams::new(stake_address.clone()),
178 )
179 .await?;
180
181 let txo_map = txos_stream.iter().fold(HashMap::new(), |mut txo_map, row| {
182 let query_value = row.value.read().unwrap_or_else(|err| {
183 tracing::error!(
184 error = %err,
185 "UTXO Assets Cache entry lock is poisoned, recovering lock.");
186 row.value.clear_poison();
187 err.into_inner()
188 });
189 let query_key = &row.key;
190 let key = (query_value.txn_id.into(), query_key.txo.into());
191 txo_map.insert(key, TxoInfo {
192 value: query_value.value.clone(),
193 txn_index: query_key.txn_index.into(),
194 txo: query_key.txo.into(),
195 slot_no: query_key.slot_no.into(),
196 spent_slot_no: query_value.spent_slot.map(Into::into),
197 });
198 txo_map
199 });
200 Ok(txo_map)
201}
202
203type TxoAssetsMap =
205 HashMap<Arc<GetAssetsByStakeAddressQueryKey>, Vec<Arc<GetAssetsByStakeAddressQueryValue>>>;
206
207#[derive(Default, Clone)]
209struct TxoAssetsState {
210 txos: TxoMap,
212 txo_assets: TxoAssetsMap,
214}
215
216impl TxoAssetsState {
217 fn is_empty(&self) -> bool {
219 self.txos.is_empty() && self.txo_assets.is_empty()
220 }
221}
222
223async fn get_txo_assets(
225 session: &CassandraSession,
226 stake_address: &StakeAddress,
227) -> anyhow::Result<TxoAssetsMap> {
228 let assets_txos_stream = GetAssetsByStakeAddressQuery::execute(
229 session,
230 GetAssetsByStakeAddressParams::new(stake_address.clone()),
231 )
232 .await?;
233
234 let tokens_map =
235 assets_txos_stream
236 .iter()
237 .fold(HashMap::new(), |mut tokens_map: TxoAssetsMap, row| {
238 let key = row.key.clone();
239 match tokens_map.entry(key) {
240 std::collections::hash_map::Entry::Occupied(mut o) => {
241 o.get_mut().push(row.value.clone());
242 },
243 std::collections::hash_map::Entry::Vacant(v) => {
244 v.insert(vec![row.value.clone()]);
245 },
246 }
247 tokens_map
248 });
249 Ok(tokens_map)
250}
251
252async fn update_spent(
258 session: &CassandraSession,
259 stake_address: &StakeAddress,
260 base_txos: &mut TxoMap,
261 txos: &mut TxoMap,
262) -> anyhow::Result<Vec<UpdateTxoSpentQueryParams>> {
263 let txn_hashes = txos
264 .iter()
265 .chain(base_txos.iter())
266 .filter(|(_, txo)| txo.spent_slot_no.is_none())
267 .map(|((tx_id, _), _)| *tx_id)
268 .collect::<HashSet<_>>()
269 .into_iter()
270 .collect::<Vec<_>>();
271
272 let mut params = Vec::new();
273
274 for chunk in txn_hashes.chunks(100) {
275 let mut txi_stream = GetTxiByTxnHashesQuery::execute(
276 session,
277 GetTxiByTxnHashesQueryParams::new(chunk.to_vec()),
278 )
279 .await?;
280
281 while let Some(row) = txi_stream.try_next().await? {
282 let key = (row.txn_id.into(), row.txo.into());
283 if let Some(txo_info) = txos.get_mut(&key) {
284 params.push(UpdateTxoSpentQueryParams {
285 stake_address: stake_address.clone().into(),
286 txn_index: txo_info.txn_index.into(),
287 txo: txo_info.txo.into(),
288 slot_no: txo_info.slot_no.into(),
289 spent_slot: row.slot_no,
290 });
291
292 txo_info.spent_slot_no = Some(row.slot_no.into());
293 }
294 if let Some(txo_info) = base_txos.get_mut(&key) {
295 txo_info.spent_slot_no = Some(row.slot_no.into());
296 }
297 }
298 }
299
300 Ok(params)
301}
302
303fn build_stake_info(
305 mut txo_state: TxoAssetsState,
306 slot_num: SlotNo,
307) -> anyhow::Result<StakeInfo> {
308 let slot_num = slot_num.into();
309 let mut total_ada_amount = AdaValue::default();
310 let mut last_slot_num = SlotNo::default();
311 let mut assets = HashMap::<(HexEncodedHash28, AssetName), AssetValue>::new();
312
313 for txo_info in txo_state.txos.into_values() {
314 if txo_info.slot_no >= slot_num {
316 continue;
317 }
318 if let Some(spent_slot) = txo_info.spent_slot_no {
320 if spent_slot <= slot_num {
321 continue;
322 }
323 }
324
325 let value = AdaValue::try_from(txo_info.value)?;
326 total_ada_amount = total_ada_amount.saturating_add(value);
327
328 let key = GetAssetsByStakeAddressQueryKey {
329 slot_no: txo_info.slot_no.into(),
330 txn_index: txo_info.txn_index.into(),
331 txo: txo_info.txo.into(),
332 };
333 if let Some(native_assets) = txo_state.txo_assets.remove(&key) {
334 for native_asset in native_assets {
335 let amount = (&native_asset.value).into();
336 match assets.entry((
337 (&native_asset.policy_id).try_into()?,
338 (&native_asset.asset_name).into(),
339 )) {
340 std::collections::hash_map::Entry::Occupied(mut o) => {
341 *o.get_mut() = o.get().saturating_add(&amount);
342 },
343 std::collections::hash_map::Entry::Vacant(v) => {
344 v.insert(amount.clone());
345 },
346 }
347 }
348 }
349
350 let slot_no = txo_info.slot_no.into();
351 if last_slot_num < slot_no {
352 last_slot_num = slot_no;
353 }
354 }
355
356 Ok(StakeInfo {
357 ada_amount: total_ada_amount,
358 slot_number: last_slot_num,
359 assets: assets
360 .into_iter()
361 .map(|((policy_hash, asset_name), amount)| {
362 StakedTxoAssetInfo {
363 policy_hash,
364 asset_name,
365 amount,
366 }
367 })
368 .collect::<Vec<_>>()
369 .into(),
370 })
371}