cat_gateway/rbac/
state.rs

1//! An implementation of the `rbac_registration::cardano::state::RbacChainsState` trait
2
3use std::collections::{HashMap, HashSet};
4
5use anyhow::Context;
6use cardano_chain_follower::{StakeAddress, hashes::TransactionId};
7use catalyst_types::catalyst_id::CatalystId;
8use ed25519_dalek::VerifyingKey;
9use futures::StreamExt;
10
11use crate::{
12    db::index::session::CassandraSession,
13    rbac::{
14        RbacBlockIndexingContext,
15        chains_cache::cached_persistent_rbac_chain,
16        get_chain::{apply_regs, build_rbac_chain, persistent_rbac_chain},
17        latest_rbac_chain,
18    },
19};
20
21/// A helper struct to handle RBAC related state from the DB and caches.
22/// Implements `rbac_registration::cardano::state::RbacChainsState` trait.
23pub(crate) struct RbacChainsState<'a> {
24    /// `index-db` corresponding flag
25    is_persistent: bool,
26    /// `RbacBlockIndexingContext` reference
27    context: &'a RbacBlockIndexingContext,
28    /// Recorded modified registration chains by the `take_stake_address_from_chains`
29    /// method. During the `take_stake_address_from_chains` execution nothing is
30    /// written into the `index-db`, all these data would be written to the DB in a
31    /// batch. To consume this field call `consume` method
32    modified_chains: HashMap<CatalystId, HashSet<StakeAddress>>,
33}
34
35impl<'a> RbacChainsState<'a> {
36    /// Creates a new instance of `RbacChainsState`
37    pub fn new(
38        is_persistent: bool,
39        context: &'a RbacBlockIndexingContext,
40    ) -> Self {
41        Self {
42            is_persistent,
43            context,
44            modified_chains: HashMap::new(),
45        }
46    }
47
48    /// Consumes `RbacChainsState` instance and returns recorded `modified_chains` field.
49    pub fn consume(self) -> HashMap<CatalystId, HashSet<StakeAddress>> {
50        self.modified_chains
51    }
52
53    /// Returns a Catalyst ID corresponding to the given transaction hash.
54    pub async fn catalyst_id_from_txn_id(
55        &self,
56        txn_id: TransactionId,
57    ) -> anyhow::Result<Option<CatalystId>> {
58        use crate::db::index::queries::rbac::get_catalyst_id_from_transaction_id::Query;
59
60        // Check the context first.
61        if let Some(catalyst_id) = self.context.find_transaction(&txn_id) {
62            return Ok(Some(catalyst_id.to_owned()));
63        }
64
65        // Then try to find in the persistent database.
66        let session =
67            CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
68        if let Some(id) = Query::get(&session, txn_id).await? {
69            return Ok(Some(id));
70        }
71
72        // Conditionally check the volatile database.
73        if !self.is_persistent {
74            let session =
75                CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
76            return Query::get(&session, txn_id).await;
77        }
78
79        Ok(None)
80    }
81}
82
83impl rbac_registration::cardano::state::RbacChainsState for RbacChainsState<'_> {
84    async fn chain(
85        &self,
86        id: &CatalystId,
87    ) -> anyhow::Result<Option<rbac_registration::registration::cardano::RegistrationChain>> {
88        let chain = if self.is_persistent {
89            persistent_rbac_chain(id).await?
90        } else {
91            latest_rbac_chain(id).await?.map(|i| i.chain)
92        };
93
94        // Apply additional registrations from context if any.
95        if let Some(regs) = self.context.find_registrations(id) {
96            let regs = regs.iter().cloned();
97            match chain {
98                Some(c) => return apply_regs(c, regs).await.map(Some),
99                None => return build_rbac_chain(regs).await,
100            }
101        }
102
103        Ok(chain)
104    }
105
106    async fn is_chain_known(
107        &self,
108        id: &CatalystId,
109    ) -> anyhow::Result<bool> {
110        if self.context.find_registrations(id).is_some() {
111            return Ok(true);
112        }
113
114        let session =
115            CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
116
117        // We only cache persistent chains, so it is ok to check the cache regardless of the
118        // `is_persistent` parameter value.
119        if cached_persistent_rbac_chain(&session, id).is_some() {
120            return Ok(true);
121        }
122
123        if is_cat_id_known(&session, id).await? {
124            return Ok(true);
125        }
126
127        // Conditionally check the volatile database.
128        if !self.is_persistent {
129            let session =
130                CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
131            if is_cat_id_known(&session, id).await? {
132                return Ok(true);
133            }
134        }
135
136        Ok(false)
137    }
138
139    async fn is_stake_address_used(
140        &self,
141        address: &StakeAddress,
142    ) -> anyhow::Result<bool> {
143        catalyst_id_from_stake_address(address, self.is_persistent, self.context)
144            .await
145            .map(|v| v.is_some())
146    }
147
148    async fn chain_catalyst_id_from_signing_public_key(
149        &self,
150        key: &VerifyingKey,
151    ) -> anyhow::Result<Option<CatalystId>> {
152        use crate::db::index::queries::rbac::get_catalyst_id_from_public_key::Query;
153
154        // Check the context first.
155        if let Some(catalyst_id) = self.context.find_public_key(key) {
156            return Ok(Some(catalyst_id.to_owned()));
157        }
158
159        // Then try to find in the persistent database.
160        let session =
161            CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
162        if let Some(id) = Query::get(&session, *key).await? {
163            return Ok(Some(id));
164        }
165
166        // Conditionally check the volatile database.
167        if !self.is_persistent {
168            let session =
169                CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
170            return Query::get(&session, *key).await;
171        }
172
173        Ok(None)
174    }
175
176    async fn take_stake_address_from_chains<I>(
177        &mut self,
178        addresses: I,
179    ) -> anyhow::Result<()>
180    where
181        I: IntoIterator<Item = StakeAddress> + Send,
182        <I as IntoIterator>::IntoIter: Send,
183    {
184        for addr in addresses {
185            if let Some(cat_id) =
186                catalyst_id_from_stake_address(&addr, self.is_persistent, self.context).await?
187            {
188                self.modified_chains
189                    .entry(cat_id)
190                    .and_modify(|e| {
191                        e.insert(addr.clone());
192                    })
193                    .or_insert([addr].into_iter().collect());
194            }
195        }
196
197        Ok(())
198    }
199}
200
201/// Returns a Catalyst ID corresponding to the given stake address.
202async fn catalyst_id_from_stake_address(
203    address: &StakeAddress,
204    is_persistent: bool,
205    context: &RbacBlockIndexingContext,
206) -> anyhow::Result<Option<CatalystId>> {
207    use crate::db::index::queries::rbac::get_catalyst_id_from_stake_address::Query;
208
209    // Check the context first.
210    if let Some(catalyst_id) = context.find_address(address) {
211        return Ok(Some(catalyst_id.to_owned()));
212    }
213
214    // Then try to find in the persistent database.
215    let session =
216        CassandraSession::get(true).context("Failed to get Cassandra persistent session")?;
217    if let Some(id) = Query::latest(&session, address).await? {
218        return Ok(Some(id));
219    }
220
221    // Conditionally check the volatile database.
222    if !is_persistent {
223        let session =
224            CassandraSession::get(false).context("Failed to get Cassandra volatile session")?;
225        return Query::latest(&session, address).await;
226    }
227
228    Ok(None)
229}
230
231/// Returns `true` if there is at least one registration with the given Catalyst ID.
232async fn is_cat_id_known(
233    session: &CassandraSession,
234    id: &CatalystId,
235) -> anyhow::Result<bool> {
236    use crate::db::index::queries::rbac::get_rbac_registrations::{Query, QueryParams};
237
238    Ok(Query::execute(session, QueryParams {
239        catalyst_id: id.clone().into(),
240    })
241    .await?
242    .next()
243    .await
244    .is_some())
245}