cat_gateway/db/index/queries/caches/assets/
ada.rs1use std::sync::Arc;
3
4use moka::policy::EvictionPolicy;
5use tracing::{debug, error};
6
7use crate::{
8 db::{
9 index::queries::staked_ada::{
10 get_txo_by_stake_address::{GetTxoByStakeAddressQuery, GetTxoByStakeAddressQueryKey},
11 update_txo_spent::UpdateTxoSpentQueryParams,
12 },
13 types::DbStakeAddress,
14 },
15 metrics::caches::txo_assets::{txo_assets_hits_inc, txo_assets_misses_inc},
16 service::utilities::cache::Cache,
17 settings::Settings,
18};
19
20pub(crate) struct AssetsAdaCache {
22 inner: Cache<DbStakeAddress, Arc<Vec<GetTxoByStakeAddressQuery>>>,
24}
25
26impl AssetsAdaCache {
27 const CACHE_NAME: &str = "Cardano UTXO ADA Assets Cache";
29
30 pub(crate) fn new(is_persistent: bool) -> Self {
37 let max_capacity = if is_persistent {
38 Settings::cardano_assets_cache().utxo_cache_size()
39 } else {
40 0
41 };
42 Self {
43 inner: Cache::new(Self::CACHE_NAME, EvictionPolicy::lru(), max_capacity),
44 }
45 }
46
47 pub(crate) fn get(
49 &self,
50 key: &DbStakeAddress,
51 ) -> Option<Arc<Vec<GetTxoByStakeAddressQuery>>> {
52 if !self.inner.is_enabled() {
54 return None;
55 }
56 self.inner
57 .get(key)
58 .inspect(|_| txo_assets_hits_inc())
59 .or_else(|| {
60 txo_assets_misses_inc();
61 None
62 })
63 }
64
65 pub(crate) fn insert(
67 &self,
68 key: DbStakeAddress,
69 value: Arc<Vec<GetTxoByStakeAddressQuery>>,
70 ) {
71 self.inner.insert(key, value);
72 }
73
74 pub(crate) fn clear_cache(&self) {
76 self.inner.clear_cache();
77 }
78
79 pub(crate) fn weighted_size(&self) -> u64 {
81 self.inner.weighted_size()
82 }
83
84 pub(crate) fn entry_count(&self) -> u64 {
86 self.inner.entry_count()
87 }
88
89 pub(crate) fn update(
91 &self,
92 params: Vec<UpdateTxoSpentQueryParams>,
93 ) {
94 if !self.inner.is_enabled() {
96 return;
97 }
98 for txo_update in params {
99 let stake_address = &txo_update.stake_address;
100 let update_key = &GetTxoByStakeAddressQueryKey {
101 txn_index: txo_update.txn_index,
102 txo: txo_update.txo,
103 slot_no: txo_update.slot_no,
104 };
105 let Some(txo_rows) = self.inner.get(stake_address) else {
106 debug!(
107 %stake_address,
108 txn_index = i16::from(txo_update.txn_index),
109 txo = i16::from(txo_update.txo),
110 slot_no = u64::from(txo_update.slot_no),
111 "Stake Address not found in TXO Assets Cache, cannot update.");
112 continue;
113 };
114 let Some(txo) = txo_rows.iter().find(|tx| tx.key.as_ref() == update_key) else {
115 debug!(
116 %stake_address,
117 txn_index = i16::from(update_key.txn_index),
118 txo = i16::from(update_key.txo),
119 slot_no = u64::from(update_key.slot_no),
120 "TXO for Stake Address does not exist, cannot update.");
121 continue;
122 };
123 if txo.is_spent() {
125 debug!(
126 %stake_address,
127 txn_index = i16::from(update_key.txn_index),
128 txo = i16::from(update_key.txo),
129 slot_no = u64::from(update_key.slot_no),
130 "TXO for Stake Address was already spent!");
131 continue;
132 }
133
134 let mut value = txo.value.write().unwrap_or_else(|error| {
135 error!(
136 %stake_address,
137 txn_index = i16::from(update_key.txn_index),
138 txo = i16::from(update_key.txo),
139 slot_no = u64::from(update_key.slot_no),
140 %error,
141 "RwLock for cache entry is poisoned, recovering.");
142 txo.value.clear_poison();
143 error.into_inner()
144 });
145 value.spent_slot.replace(txo_update.spent_slot);
147 debug!(
148 %stake_address,
149 txn_index = i16::from(update_key.txn_index),
150 txo = i16::from(update_key.txo),
151 slot_no = u64::from(update_key.slot_no),
152 "Updated TXO for Stake Address");
153 }
154 }
155}