cat_gateway/db/event/
mod.rs

1//! Catalyst Election Database crate
2use std::{
3    str::FromStr,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, OnceLock,
7    },
8};
9
10use bb8::{Pool, PooledConnection};
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::{
18    service::utilities::health::{event_db_is_live, set_event_db_liveness},
19    settings::Settings,
20};
21
22pub(crate) mod common;
23pub(crate) mod config;
24pub(crate) mod error;
25pub(crate) mod schema_check;
26pub(crate) mod signed_docs;
27
28/// Database version this crate matches.
29/// Must equal the last Migrations Version Number from `event-db/migrations`.
30pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 4;
31
32/// Postgres Connection Manager DB Pool
33type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
34
35/// Postgres Connection Manager DB Pool Instance
36static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
37
38/// Is Deep Query Analysis enabled or not?
39static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
40
41/// The Catalyst Event SQL Database
42pub(crate) struct EventDB {}
43
44/// `EventDB` Errors
45#[derive(thiserror::Error, Debug, PartialEq, Eq)]
46pub(crate) enum EventDBConnectionError {
47    /// Failed to get a DB Pool
48    #[error("DB Pool uninitialized")]
49    DbPoolUninitialized,
50    /// Failed to get a DB Pool Connection
51    #[error("DB Pool connection is unavailable")]
52    PoolConnectionUnavailable,
53}
54
55impl EventDB {
56    /// Get a connection from the pool.
57    async fn get_pool_connection<'a>(
58    ) -> Result<PooledConnection<'a, PostgresConnectionManager<NoTls>>, EventDBConnectionError>
59    {
60        let pool = EVENT_DB_POOL
61            .get()
62            .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
63        pool.get()
64            .await
65            .map_err(|_| EventDBConnectionError::PoolConnectionUnavailable)
66    }
67
68    /// Determine if deep query inspection is enabled.
69    pub(crate) fn is_deep_query_enabled() -> bool {
70        DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
71    }
72
73    /// Modify the deep query inspection setting.
74    ///
75    /// # Arguments
76    ///
77    /// * `enable` - Set the `DeepQueryInspection` setting to this value.
78    pub(crate) fn modify_deep_query(enable: bool) {
79        DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
80    }
81
82    /// Query the database.
83    ///
84    /// If deep query inspection is enabled, this will log the query plan inside a
85    /// rolled-back transaction, before running the query.
86    ///
87    /// # Arguments
88    ///
89    /// * `stmt` - `&str` SQL statement.
90    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
91    ///
92    /// # Returns
93    ///
94    /// `anyhow::Result<Vec<Row>>`
95    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
96    pub(crate) async fn query(
97        stmt: &str, params: &[&(dyn ToSql + Sync)],
98    ) -> anyhow::Result<Vec<Row>> {
99        if Self::is_deep_query_enabled() {
100            Self::explain_analyze_rollback(stmt, params).await?;
101        }
102        let conn = Self::get_pool_connection().await?;
103        let rows = conn.query(stmt, params).await?;
104        Ok(rows)
105    }
106
107    /// Query the database and return a async stream of rows.
108    ///
109    /// If deep query inspection is enabled, this will log the query plan inside a
110    /// rolled-back transaction, before running the query.
111    ///
112    /// # Arguments
113    ///
114    /// * `stmt` - `&str` SQL statement.
115    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
116    ///
117    /// # Returns
118    ///
119    /// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
120    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
121    pub(crate) async fn query_stream(
122        stmt: &str, params: &[&(dyn ToSql + Sync)],
123    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
124        if Self::is_deep_query_enabled() {
125            Self::explain_analyze_rollback(stmt, params).await?;
126        }
127        let conn = Self::get_pool_connection().await?;
128        let rows = conn.query_raw(stmt, params.iter().copied()).await?;
129        Ok(rows.map_err(Into::into).boxed())
130    }
131
132    /// Query the database for a single row.
133    ///
134    /// # Arguments
135    ///
136    /// * `stmt` - `&str` SQL statement.
137    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
138    ///
139    /// # Returns
140    ///
141    /// `Result<Row, anyhow::Error>`
142    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
143    pub(crate) async fn query_one(
144        stmt: &str, params: &[&(dyn ToSql + Sync)],
145    ) -> anyhow::Result<Row> {
146        if Self::is_deep_query_enabled() {
147            Self::explain_analyze_rollback(stmt, params).await?;
148        }
149        let conn = Self::get_pool_connection().await?;
150        let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
151        Ok(row)
152    }
153
154    /// Modify the database.
155    ///
156    /// Use this for `UPDATE`, `DELETE`, and other DB statements that
157    /// don't return data.
158    ///
159    /// # Arguments
160    ///
161    /// * `stmt` - `&str` SQL statement.
162    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
163    ///
164    /// # Returns
165    ///
166    /// `anyhow::Result<()>`
167    pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
168        if Self::is_deep_query_enabled() {
169            Self::explain_analyze_commit(stmt, params).await?;
170        } else {
171            let conn = Self::get_pool_connection().await?;
172            conn.query(stmt, params).await?;
173        }
174        Ok(())
175    }
176
177    /// Checks that connection to `EventDB` is available.
178    pub(crate) async fn connection_is_ok() -> bool {
179        let event_db_liveness = event_db_is_live();
180        Self::get_pool_connection()
181            .await
182            .inspect(|_| {
183                if !event_db_liveness {
184                    set_event_db_liveness(true);
185                }
186            })
187            .inspect_err(|_| {
188                if event_db_liveness {
189                    set_event_db_liveness(false);
190                }
191            })
192            .is_ok()
193    }
194
195    /// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
196    async fn explain_analyze_rollback(
197        stmt: &str, params: &[&(dyn ToSql + Sync)],
198    ) -> anyhow::Result<()> {
199        Self::explain_analyze(stmt, params, true).await
200    }
201
202    /// Prepend `EXPLAIN ANALYZE` to the query, and commit the transaction.
203    async fn explain_analyze_commit(
204        stmt: &str, params: &[&(dyn ToSql + Sync)],
205    ) -> anyhow::Result<()> {
206        Self::explain_analyze(stmt, params, false).await
207    }
208
209    /// Prepend `EXPLAIN ANALYZE` to the query.
210    ///
211    /// Log the query plan inside a transaction that may be committed or rolled back.
212    ///
213    /// # Arguments
214    ///
215    /// * `stmt` - `&str` SQL statement.
216    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
217    /// * `rollback` - `bool` whether to roll back the transaction or not.
218    async fn explain_analyze(
219        stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
220    ) -> anyhow::Result<()> {
221        let span = debug_span!(
222            "query_plan",
223            query_statement = stmt,
224            params = format!("{:?}", params),
225            uuid = uuid::Uuid::new_v4().to_string()
226        );
227
228        async move {
229            let mut conn = Self::get_pool_connection().await?;
230            let transaction = conn.transaction().await?;
231            let explain_stmt = transaction
232                .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
233                .await?;
234            let rows = transaction.query(&explain_stmt, params).await?;
235            for r in rows {
236                let query_plan_str: String = r.get("QUERY PLAN");
237                debug!("{}", query_plan_str);
238            }
239            if rollback {
240                transaction.rollback().await?;
241            } else {
242                transaction.commit().await?;
243            }
244            Ok(())
245        }
246        .instrument(span)
247        .await
248    }
249}
250
251/// Establish a connection to the database, and check the schema is up-to-date.
252///
253/// # Parameters
254///
255/// * `url` set to the postgres connection string needed to connect to the database.  IF
256///   it is None, then the env var "`DATABASE_URL`" will be used for this connection
257///   string. eg: "`postgres://catalyst-dev:CHANGE_ME@localhost/CatalystDev`"
258/// * `do_schema_check` boolean flag to decide whether to verify the schema version or
259///   not. If it is `true`, a query is made to verify the DB schema version.
260///
261/// # Errors
262///
263/// This function will return an error if:
264/// * `url` is None and the environment variable "`DATABASE_URL`" isn't set.
265/// * There is any error communicating the the database to check its schema.
266/// * The database schema in the DB does not 100% match the schema supported by this
267///   library.
268///
269/// # Notes
270///
271/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
272/// `.env` file.
273///
274/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
275pub async fn establish_connection_pool() {
276    let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
277        Settings::event_db_settings();
278    debug!("Establishing connection with Event DB pool");
279
280    // This was pre-validated and can't fail, but provide default in the impossible case it
281    // does.
282    let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
283        error!(url = url, "Postgres URL Pre Validation has failed.");
284        tokio_postgres::config::Config::default()
285    });
286    if let Some(user) = user {
287        config.user(user);
288    }
289    if let Some(pass) = pass {
290        config.password(pass);
291    }
292
293    let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
294
295    match Pool::builder()
296        .max_size(max_connections)
297        .max_lifetime(Some(core::time::Duration::from_secs(max_lifetime.into())))
298        .min_idle(min_idle)
299        .connection_timeout(core::time::Duration::from_secs(connection_timeout.into()))
300        .build(pg_mgr)
301        .await
302    {
303        Ok(pool) => {
304            debug!("Event DB pool configured.");
305            if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
306                error!("Failed to set EVENT_DB_POOL. Already set?");
307            }
308        },
309        Err(err) => {
310            error!(error = %err, "Failed to establish connection with EventDB pool");
311        },
312    }
313}