cat_gateway/db/index/schema/
mod.rs1use std::sync::Arc;
4
5use anyhow::Context;
6use cardano_chain_follower::Network;
7use handlebars::Handlebars;
8use scylla::client::session::Session;
9use serde_json::json;
10use tracing::error;
11
12use crate::{settings::cassandra_db, utils::blake2b_hash::generate_uuid_string_from_data};
13
14const CREATE_NAMESPACE_CQL: &str = include_str!("./cql/namespace.cql");
16
17const SCHEMAS: &[(&str, &str)] = &[
19 (
20 include_str!("./cql/sync_status.cql"),
22 "Create Sync Status Table",
23 ),
24 (
25 include_str!("./cql/txo_by_stake_table.cql"),
27 "Create Table TXO By Stake Address",
28 ),
29 (
30 include_str!("./cql/txo_assets_by_stake_table.cql"),
32 "Create Table TXO Assets By Stake Address",
33 ),
34 (
35 include_str!("cql/unstaked_txo_by_txn_id.cql"),
37 "Create Table Unstaked TXO By Txn Hash",
38 ),
39 (
40 include_str!("cql/unstaked_txo_assets_by_txn_id.cql"),
42 "Create Table Unstaked TXO Assets By Txn Hash",
43 ),
44 (
45 include_str!("cql/txi_by_txn_id_table.cql"),
47 "Create Table TXI By Stake Address",
48 ),
49 (
50 include_str!("cql/stake_registration.cql"),
52 "Create Table Stake Registration",
53 ),
54 (
55 include_str!("cql/cip36_registration.cql"),
57 "Create Table CIP-36 Registration",
58 ),
59 (
60 include_str!("cql/cip36_registration_invalid.cql"),
62 "Create Table CIP-36 Registration Invalid",
63 ),
64 (
65 include_str!("cql/cip36_registration_for_vote_key.cql"),
67 "Create Table CIP-36 Registration For a stake address",
68 ),
69 (
70 include_str!("cql/rbac_registration.cql"),
72 "Create Table RBAC Registration",
73 ),
74 (
75 include_str!("cql/rbac_invalid_registration.cql"),
77 "Create Table Invalid RBAC Registration",
78 ),
79 (
80 include_str!("cql/catalyst_id_for_txn_id.cql"),
82 "Create table Catalyst ID for transaction ID",
83 ),
84 (
85 include_str!("cql/catalyst_id_for_stake_address.cql"),
87 "Create table Catalyst ID for stake address",
88 ),
89 (
90 include_str!("cql/catalyst_id_for_public_key.cql"),
92 "Create table Catalyst ID for public key",
93 ),
94];
95
96fn remove_comments_and_join_query_lines(text: &str) -> String {
109 let raw_lines: Vec<&str> = text.lines().collect();
111 let mut clean_lines: Vec<String> = Vec::new();
112
113 for line in raw_lines {
115 let mut clean_line = line.to_string();
116 if let Some(no_comment) = line.split_once("--") {
117 clean_line = no_comment.0.to_string();
118 }
119 clean_line = clean_line
120 .split_whitespace()
121 .collect::<Vec<&str>>()
122 .join(" ")
123 .trim()
124 .to_string();
125 if !clean_line.is_empty() {
126 clean_lines.push(clean_line);
127 }
128 }
129 clean_lines.join("\n")
130}
131
132fn generate_cql_schema_version() -> String {
145 let mut clean_schemas: Vec<String> = Vec::new();
147
148 for (schema, _) in SCHEMAS {
150 let schema = remove_comments_and_join_query_lines(schema);
151 if !schema.is_empty() {
152 clean_schemas.push(schema);
153 }
154 }
155
156 clean_schemas.sort();
159
160 generate_uuid_string_from_data("Catalyst-Gateway Index Database Schema", &clean_schemas)
163}
164
165pub(crate) fn namespace(
167 persistent: bool,
168 network: Network,
169) -> String {
170 let namespace = if persistent { "p" } else { "v" };
172 format!(
173 "{namespace}_{network}_{}",
174 generate_cql_schema_version().replace('-', "_")
175 )
176}
177
178async fn create_namespace(
181 session: &mut Arc<Session>,
182 cfg: &cassandra_db::EnvVars,
183 persistent: bool,
184 network: Network,
185) -> anyhow::Result<()> {
186 let keyspace = namespace(persistent, network);
187
188 let mut reg = Handlebars::new();
189 reg.register_escape_fn(|s| s.into());
192 let query = reg
193 .render_template(
194 CREATE_NAMESPACE_CQL,
195 &json!({"keyspace": keyspace,"options": cfg.deployment.clone().to_string()}),
196 )
197 .context(format!("Keyspace: {keyspace}"))?;
198
199 let stmt = session
201 .prepare(query)
202 .await
203 .context(format!("Keyspace: {keyspace}"))?;
204 session
205 .execute_unpaged(&stmt, ())
206 .await
207 .context(format!("Keyspace: {keyspace}"))?;
208
209 session.await_schema_agreement().await?;
211
212 if let Err(error) = session.use_keyspace(keyspace.clone(), false).await {
214 error!(keyspace = keyspace, error = %error, "Failed to set keyspace");
215 }
216
217 Ok(())
218}
219
220pub(crate) async fn create_schema(
222 session: &mut Arc<Session>,
223 cfg: &cassandra_db::EnvVars,
224 persistent: bool,
225 network: Network,
226) -> anyhow::Result<()> {
227 create_namespace(session, cfg, persistent, network)
228 .await
229 .context("Creating Namespace")?;
230
231 let mut errors = Vec::with_capacity(SCHEMAS.len());
232
233 for (schema, schema_name) in SCHEMAS {
234 match session.prepare(*schema).await {
235 Ok(stmt) => {
236 if let Err(err) = session.execute_unpaged(&stmt, ()).await {
237 error!(schema=schema_name, error=%err, "Failed to Execute Create Schema Query");
238 errors.push(anyhow::anyhow!(
239 "Failed to Execute Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
240 ));
241 };
242 },
243 Err(err) => {
244 error!(schema=schema_name, error=%err, "Failed to Prepare Create Schema Query");
245 errors.push(anyhow::anyhow!(
246 "Failed to Prepare Create Schema Query: {err}\n--\nSchema: {schema_name}\n--\n{schema}"
247 ));
248 },
249 }
250 }
251
252 if !errors.is_empty() {
253 let fmt_err: Vec<_> = errors.into_iter().map(|err| format!("{err}")).collect();
254 return Err(anyhow::anyhow!(format!(
255 "{} Error(s): {}",
256 fmt_err.len(),
257 fmt_err.join("\n")
258 )));
259 }
260
261 session.await_schema_agreement().await?;
263
264 Ok(())
265}
266
267#[cfg(test)]
268mod tests {
269 use super::*;
270
271 const SCHEMA_VERSION: &str = "69e28bc1-be89-8407-83dc-9c4cc408d3a9";
278
279 #[test]
280 fn check_schema_version_has_not_changed() {
285 let calculated_version = generate_cql_schema_version();
286 assert_eq!(SCHEMA_VERSION, calculated_version);
287 }
288
289 #[test]
290 fn test_namespace_persistent() {
291 let network = Network::Preprod;
292 let persistent = true;
293 let namespace = namespace(persistent, network);
294 let schema_version = generate_cql_schema_version().replace('-', "_");
295 let expected = format!("p_{network}_{schema_version}");
296 assert_eq!(namespace, expected);
297 }
298
299 #[test]
300 fn test_namespace_volatile() {
301 let network = Network::Preprod;
302 let persistent = false;
303 let namespace = namespace(persistent, network);
304 let schema_version = generate_cql_schema_version().replace('-', "_");
305 let expected = format!("v_{network}_{schema_version}");
306 assert_eq!(namespace, expected);
307 }
308
309 #[test]
310 fn test_no_comments() {
311 let input = "SELECT * FROM table1;";
312 let expected_output = "SELECT * FROM table1;";
313 assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
314 }
315
316 #[test]
317 fn test_single_line_comment() {
318 let input = "SELECT -- some comment * FROM table1;";
319 let expected_output = "SELECT";
320 assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
321 }
322
323 #[test]
324 fn test_multi_line_comment() {
325 let input = "SELECT -- some comment\n* FROM table1;";
326 let expected_output = "SELECT\n* FROM table1;";
327 assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
328 }
329
330 #[test]
331 fn test_multiple_lines() {
332 let input = "SELECT * FROM table1;\n-- another comment\nSELECT * FROM table2;";
333 let expected_output = "SELECT * FROM table1;\nSELECT * FROM table2;";
334 assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
335 }
336
337 #[test]
338 fn test_empty_lines() {
339 let input = "\n\nSELECT * FROM table1;\n-- comment here\n\n";
340 let expected_output = "SELECT * FROM table1;";
341 assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
342 }
343
344 #[test]
345 fn test_whitespace_only() {
346 let input = " \n -- comment here\n ";
347 let expected_output = "";
348 assert_eq!(remove_comments_and_join_query_lines(input), expected_output);
349 }
350}