cat_gateway/db/index/schema/
mod.rs

1//! Index Schema
2
3use std::sync::Arc;
4
5use anyhow::Context;
6use cardano_chain_follower::Network;
7use handlebars::Handlebars;
8use scylla::client::session::Session;
9use serde_json::json;
10use tracing::error;
11
12use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_from_data};
13
14/// Keyspace Create (Templated)
15const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql");
16
17/// All Schema Creation Statements
18const SCHEMAS: &[(&str, &str)] = &[
19    (
20        // Sync Status Table Schema
21        include_str!("./cql/sync_status.cql"),
22        "Create Sync Status Table",
23    ),
24    (
25        // TXO by Stake Address Table Schema
26        include_str!("./cql/txo_by_stake_table.cql"),
27        "Create Table TXO By Stake Address",
28    ),
29    (
30        // TXO Assets by Stake Address Table Schema
31        include_str!("./cql/txo_assets_by_stake_table.cql"),
32        "Create Table TXO Assets By Stake Address",
33    ),
34    (
35        // TXO Unstaked Table Schema
36        include_str!("cql/unstaked_txo_by_txn_id.cql"),
37        "Create Table Unstaked TXO By Txn Hash",
38    ),
39    (
40        // TXO Unstaked Assets Table Schema
41        include_str!("cql/unstaked_txo_assets_by_txn_id.cql"),
42        "Create Table Unstaked TXO Assets By Txn Hash",
43    ),
44    (
45        // TXI by Stake Address table schema.
46        include_str!("cql/txi_by_txn_id_table.cql"),
47        "Create Table TXI By Stake Address",
48    ),
49    (
50        // Stake Address/Registration Table Schema
51        include_str!("cql/stake_registration.cql"),
52        "Create Table Stake Registration",
53    ),
54    (
55        // CIP-36 Registration Table Schema
56        include_str!("cql/cip36_registration.cql"),
57        "Create Table CIP-36 Registration",
58    ),
59    (
60        // CIP-36 invalid registration table schema.
61        include_str!("cql/cip36_registration_invalid.cql"),
62        "Create Table CIP-36 Registration Invalid",
63    ),
64    (
65        // CIP-36 registration for vote key table schema.
66        include_str!("cql/cip36_registration_for_vote_key.cql"),
67        "Create Table CIP-36 Registration For a stake address",
68    ),
69    (
70        // RBAC registration table schema.
71        include_str!("cql/rbac_registration.cql"),
72        "Create Table RBAC Registration",
73    ),
74    (
75        // RBAC invalid registration table schema.
76        include_str!("cql/rbac_invalid_registration.cql"),
77        "Create Table Invalid RBAC Registration",
78    ),
79    (
80        // Catalyst ID for transaction ID table schema.
81        include_str!("cql/catalyst_id_for_txn_id.cql"),
82        "Create table Catalyst ID for transaction ID",
83    ),
84    (
85        // Catalyst ID for stake address table schema.
86        include_str!("cql/catalyst_id_for_stake_address.cql"),
87        "Create table Catalyst ID for stake address",
88    ),
89    (
90        // Catalyst ID for public key table schema.
91        include_str!("cql/catalyst_id_for_public_key.cql"),
92        "Create table Catalyst ID for public key",
93    ),
94];
95
/// Normalises CQL text by stripping `--` comments and collapsing whitespace.
///
/// Each line is processed independently:
/// 1. everything from the first `--` to the end of the line is removed;
/// 2. every run of whitespace is collapsed to a single space, and leading/trailing
///    whitespace is dropped;
/// 3. lines that end up empty are discarded.
///
/// The surviving lines are joined with `\n`.
///
/// NOTE(review): `--` is treated as a comment marker even inside string literals or
/// quoted identifiers. Changing that would alter the generated schema version, so it
/// is intentionally left as-is.
///
/// # Arguments
///
/// * `text`: The query text to clean.
///
/// # Returns
///
/// A new string containing the cleaned, non-empty lines separated by `\n`.
fn remove_comments_and_join_query_lines(text: &str) -> String {
    text.lines()
        // Keep only the part of the line before the first `--` comment marker.
        .map(|line| line.split_once("--").map_or(line, |(code, _comment)| code))
        // Collapse whitespace runs; this also trims both ends of the line.
        .map(|code| code.split_whitespace().collect::<Vec<_>>().join(" "))
        // Drop lines that were blank or comment-only.
        .filter(|code| !code.is_empty())
        .collect::<Vec<_>>()
        .join("\n")
}
131
132/// Generates a unique schema version identifier based on the content of all CQL schemas.
133///
134/// This function processes each CQL schema, removes comments from its lines and joins
135/// them into a single string. It then sorts these processed strings to ensure consistency
136/// in schema versions regardless of their order in the list. Finally, it generates a UUID
137/// from a 127 bit hash of this sorted collection of schema contents, which serves as a
138/// unique identifier for the current version of all schemas.
139///
140/// # Returns
141///
142/// A string representing the UUID derived from the concatenated and cleaned CQL
143/// schema contents.
144fn generate_cql_schema_version() -> String {
145    // Where we will actually store the bytes we derive the UUID from.
146    let mut clean_schemas: Vec<String> = Vec::new();
147
148    // Iterate through each CQL schema and add it to the list of clean schemas documents.
149    for (schema, _) in SCHEMAS {
150        let schema = remove_comments_and_join_query_lines(schema);
151        if !schema.is_empty() {
152            clean_schemas.push(schema);
153        }
154    }
155
156    // make sure any re-ordering of the schemas in the list does not effect the generated
157    // schema version
158    clean_schemas.sort();
159
160    // Generate a unique hash of the clean schemas,
161    // and use it to form a UUID to identify the schema version.
162    generate_uuid_string_from_data("Catalyst-Gateway Index Database Schema", &clean_schemas)
163}
164
165/// Get the namespace for a particular db configuration
166pub(crate) fn namespace(
167    persistent: bool,
168    network: Network,
169) -> String {
170    // Build and set the Keyspace to use.
171    let namespace = if persistent { "p" } else { "v" };
172    format!(
173        "{namespace}_{network}_{}",
174        generate_cql_schema_version().replace('-', "_")
175    )
176}
177
178/// Create the namespace we will use for this session
179/// Ok to run this if the namespace already exists.
180async fn create_namespace(
181    session: &mut Arc<Session>,
182    cfg: &cassandra_db::EnvVars,
183    persistent: bool,
184    network: Network,
185) -> anyhow::Result<()> {
186    let keyspace = namespace(persistent, network);
187
188    let mut reg = Handlebars::new();
189    // disable default `html_escape` function
190    // which transforms `<`, `>` symbols to `&lt`, `&gt`
191    reg.register_escape_fn(|s| s.into());
192    let query = reg
193        .render_template(
194            CREATE_NAMESPACE_CQL,
195            &json!({"keyspace": keyspace,"options": cfg.deployment.clone().to_string()}),
196        )
197        .context(format!("Keyspace: {keyspace}"))?;
198
199    // Create the Keyspace if it doesn't exist already.
200    let stmt = session
201        .prepare(query)
202        .await
203        .context(format!("Keyspace: {keyspace}"))?;
204    session
205        .execute_unpaged(&stmt, ())
206        .await
207        .context(format!("Keyspace: {keyspace}"))?;
208
209    // Wait for the Schema to be ready.
210    session.await_schema_agreement().await?;
211
212    // Set the Keyspace to use for this session.
213    if let Err(error) = session.use_keyspace(keyspace.clone(), false).await {
214        error!(keyspace = keyspace, error = %error, "Failed to set keyspace");
215    }
216
217    Ok(())
218}
219
220/// Create the Schema on the connected Cassandra DB
221pub(crate) async fn create_schema(
222    session: &mut Arc<Session>,
223    cfg: &cassandra_db::EnvVars,
224    persistent: bool,
225    network: Network,
226) -> anyhow::Result<()> {
227    create_namespace(session, cfg, persistent, network)
228        .await
229        .context("Creating Namespace")?;
230
231    let mut errors = Vec::with_capacity(SCHEMAS.len());
232
233    for (schema, schema_name) in SCHEMAS {
234        match session.prepare(*schema).await {
235            Ok(stmt) => {
236                if let Err(err) = session.execute_unpaged(&stmt, ()).await {
237                    error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
238                    errors.push(anyhow::anyhow!(
239                        "Failed to Execute Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
240                    ));
241                };
242            },
243            Err(err) => {
244                error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
245                errors.push(anyhow::anyhow!(
246                    "Failed to Prepare Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
247                ));
248            },
249        }
250    }
251
252    if !errors.is_empty() {
253        let fmt_err: Vec<_> = errors.into_iter().map(|err| format!("{err}")).collect();
254        return Err(anyhow::anyhow!(format!(
255            "{} Error(s): {}",
256            fmt_err.len(),
257            fmt_err.join("\n")
258        )));
259    }
260
261    // Wait for the Schema to be ready.
262    session.await_schema_agreement().await?;
263
264    Ok(())
265}
266
#[cfg(test)]
mod tests {
    use std::collections::HashSet;

    use super::*;

    /// The version of the Index DB Schema we SHOULD BE using.
    /// DO NOT change this unless you are intentionally changing the Schema.
    ///
    /// This constant is ONLY used by Unit tests to identify when the schema version will
    /// change accidentally, and is NOT to be used directly to set the schema version of
    /// the table namespaces.
    const SCHEMA_VERSION: &str = "69e28bc1-be89-8407-83dc-9c4cc408d3a9";

    /// This test is designed to fail if the schema version has changed.
    /// It is used to help detect inadvertent schema version changes.
    /// If you did NOT intend to change the index db schema and this test fails,
    /// then revert or fix your changes to the schema files.
    #[test]
    fn check_schema_version_has_not_changed() {
        let calculated_version = generate_cql_schema_version();
        assert_eq!(SCHEMA_VERSION, calculated_version);
    }

    /// Every schema label must be unique so log/error messages identify one statement.
    #[test]
    fn test_schema_names_are_unique() {
        let names: HashSet<&str> = SCHEMAS.iter().map(|(_, name)| *name).collect();
        assert_eq!(names.len(), SCHEMAS.len());
    }

    #[test]
    fn test_namespace_persistent() {
        let network = Network::Preprod;
        let persistent = true;
        let keyspace = namespace(persistent, network);
        let schema_version = generate_cql_schema_version().replace('-', "_");
        let expected = format!("p_{network}_{schema_version}");
        assert_eq!(keyspace, expected);
    }

    #[test]
    fn test_namespace_volatile() {
        let network = Network::Preprod;
        let persistent = false;
        let keyspace = namespace(persistent, network);
        let schema_version = generate_cql_schema_version().replace('-', "_");
        let expected = format!("v_{network}_{schema_version}");
        assert_eq!(keyspace, expected);
    }

    #[test]
    fn test_no_comments() {
        let input = "SELECT * FROM table1;";
        let expected_output = "SELECT * FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_single_line_comment() {
        let input = "SELECT -- some comment * FROM table1;";
        let expected_output = "SELECT";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_multi_line_comment() {
        let input = "SELECT -- some comment\n* FROM table1;";
        let expected_output = "SELECT\n* FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_multiple_lines() {
        let input = "SELECT * FROM table1;\n-- another comment\nSELECT * FROM table2;";
        let expected_output = "SELECT * FROM table1;\nSELECT * FROM table2;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_empty_lines() {
        let input = "\n\nSELECT * FROM table1;\n-- comment here\n\n";
        let expected_output = "SELECT * FROM table1;";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_whitespace_only() {
        let input = "   \n  -- comment here\n   ";
        let expected_output = "";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    #[test]
    fn test_empty_input() {
        assert_eq!(remove_comments_and_join_query_lines(""), "");
    }

    /// Internal whitespace runs (including tabs) collapse to one space, and CRLF line
    /// endings are handled like LF.
    #[test]
    fn test_whitespace_is_collapsed() {
        let input = "  CREATE   TABLE\tfoo (  -- trailing\r\n   id int  );\r\n";
        let expected_output = "CREATE TABLE foo (\nid int );";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }

    /// Everything after the first `--` on a line is discarded.
    #[test]
    fn test_first_comment_marker_wins() {
        let input = "a -- b -- c";
        let expected_output = "a";
        assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
    }
}