cat_gateway/db/index/queries/caches/assets/
ada.rs

1//! Cache for TXO ADA Assets by Stake Address Queries
2use std::sync::Arc;
3
4use moka::policy::EvictionPolicy;
5use tracing::{debug, error};
6
7use crate::{
8    db::{
9        index::queries::staked_ada::{
10            get_txo_by_stake_address::{GetTxoByStakeAddressQuery, GetTxoByStakeAddressQueryKey},
11            update_txo_spent::UpdateTxoSpentQueryParams,
12        },
13        types::DbStakeAddress,
14    },
15    metrics::caches::txo_assets::{txo_assets_hits_inc, txo_assets_misses_inc},
16    service::utilities::cache::Cache,
17    settings::Settings,
18};
19
20/// Cache for TXO ADA Assets by Stake Address Queries for Persistent Sessions
21pub(crate) struct AssetsAdaCache {
22    /// Interior cache type.
23    inner: Cache<DbStakeAddress, Arc<Vec<GetTxoByStakeAddressQuery>>>,
24}
25
26impl AssetsAdaCache {
27    /// Name for cache
28    const CACHE_NAME: &str = "Cardano UTXO ADA Assets Cache";
29
30    /// New ADA Assets Cache instance.
31    ///
32    /// # Arguments
33    /// * `is_persistent` - If the `CassandraSession` is persistent.
34    ///
35    /// Disables the cache for Volatile sessions (`is_persistent` is `false`).
36    pub(crate) fn new(is_persistent: bool) -> Self {
37        let max_capacity = if is_persistent {
38            Settings::cardano_assets_cache().utxo_cache_size()
39        } else {
40            0
41        };
42        Self {
43            inner: Cache::new(Self::CACHE_NAME, EvictionPolicy::lru(), max_capacity),
44        }
45    }
46
47    /// Get an entry from the cache
48    pub(crate) fn get(
49        &self,
50        key: &DbStakeAddress,
51    ) -> Option<Arc<Vec<GetTxoByStakeAddressQuery>>> {
52        // Exit if cache is not enabled to avoid metric updates.
53        if !self.inner.is_enabled() {
54            return None;
55        }
56        self.inner
57            .get(key)
58            .inspect(|_| txo_assets_hits_inc())
59            .or_else(|| {
60                txo_assets_misses_inc();
61                None
62            })
63    }
64
65    /// Get an entry from the cache
66    pub(crate) fn insert(
67        &self,
68        key: DbStakeAddress,
69        value: Arc<Vec<GetTxoByStakeAddressQuery>>,
70    ) {
71        self.inner.insert(key, value);
72    }
73
74    /// Clear (invalidates) the cache entries.
75    pub(crate) fn clear_cache(&self) {
76        self.inner.clear_cache();
77    }
78
79    /// Weighted-size of the cache.
80    pub(crate) fn weighted_size(&self) -> u64 {
81        self.inner.weighted_size()
82    }
83
84    /// Number of entries in the cache.
85    pub(crate) fn entry_count(&self) -> u64 {
86        self.inner.entry_count()
87    }
88
89    /// Update spent TXO Assets in Cache.
90    pub(crate) fn update(
91        &self,
92        params: Vec<UpdateTxoSpentQueryParams>,
93    ) {
94        // Exit if cache is not enabled to avoid processing params.
95        if !self.inner.is_enabled() {
96            return;
97        }
98        for txo_update in params {
99            let stake_address = &txo_update.stake_address;
100            let update_key = &GetTxoByStakeAddressQueryKey {
101                txn_index: txo_update.txn_index,
102                txo: txo_update.txo,
103                slot_no: txo_update.slot_no,
104            };
105            let Some(txo_rows) = self.inner.get(stake_address) else {
106                debug!(
107                    %stake_address,
108                    txn_index = i16::from(txo_update.txn_index),
109                    txo = i16::from(txo_update.txo),
110                    slot_no = u64::from(txo_update.slot_no),
111                    "Stake Address not found in TXO Assets Cache, cannot update.");
112                continue;
113            };
114            let Some(txo) = txo_rows.iter().find(|tx| tx.key.as_ref() == update_key) else {
115                debug!(
116                    %stake_address,
117                    txn_index = i16::from(update_key.txn_index),
118                    txo = i16::from(update_key.txo),
119                    slot_no = u64::from(update_key.slot_no),
120                    "TXO for Stake Address does not exist, cannot update.");
121                continue;
122            };
123            // Avoid writing if txo has already been spent,
124            if txo.is_spent() {
125                debug!(
126                    %stake_address,
127                    txn_index = i16::from(update_key.txn_index),
128                    txo = i16::from(update_key.txo),
129                    slot_no = u64::from(update_key.slot_no),
130                    "TXO for Stake Address was already spent!");
131                continue;
132            }
133
134            let mut value = txo.value.write().unwrap_or_else(|error| {
135                error!(
136                    %stake_address,
137                    txn_index = i16::from(update_key.txn_index),
138                    txo = i16::from(update_key.txo),
139                    slot_no = u64::from(update_key.slot_no),
140                    %error,
141                    "RwLock for cache entry is poisoned, recovering.");
142                txo.value.clear_poison();
143                error.into_inner()
144            });
145            // update spent_slot in cache
146            value.spent_slot.replace(txo_update.spent_slot);
147            debug!(
148                %stake_address,
149                txn_index = i16::from(update_key.txn_index),
150                txo = i16::from(update_key.txo),
151                slot_no = u64::from(update_key.slot_no),
152                "Updated TXO for Stake Address");
153        }
154    }
155}