cat_gateway/db/event/
mod.rs

1//! Catalyst Election Database crate
2use std::{
3    str::FromStr,
4    sync::{
5        Arc, LazyLock, Mutex, OnceLock,
6        atomic::{AtomicBool, Ordering},
7    },
8    time::Duration,
9};
10
11use error::NotFoundError;
12use futures::{Stream, StreamExt, TryStreamExt};
13use tokio::task::JoinHandle;
14use tokio_postgres::{Row, types::ToSql};
15use tracing::{Instrument, debug, debug_span, error, info};
16
17use crate::{
18    service::utilities::health::{event_db_is_live, set_event_db_liveness},
19    settings::Settings,
20};
21
22pub(crate) mod common;
23pub(crate) mod config;
24pub(crate) mod error;
25pub(crate) mod schema_check;
26pub(crate) mod signed_docs;
27
/// Database version this crate matches.
/// Must equal the last Migrations Version Number from `event-db/migrations`.
pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 3;

/// Postgres Connection Manager DB Pool
type SqlDbPool = Arc<deadpool::managed::Pool<deadpool_postgres::Manager>>;

/// Postgres Connection Manager DB Pool Instance
///
/// Set by `establish_connection_pool`. Until it is set, `EventDB::get_pool_connection`
/// fails with `EventDBConnectionError::DbPoolUninitialized`.
static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();

/// Background Event DB probe check
///
/// Holds the handle of the task spawned by `EventDB::spawn_ready_probe`; `None` until a
/// probe has been spawned.
static EVENT_DB_PROBE_TASK: LazyLock<Mutex<Option<JoinHandle<()>>>> =
    LazyLock::new(|| Mutex::new(None));

/// Is Deep Query Analysis enabled or not?
///
/// Starts as `false`; read through `EventDB::is_deep_query_enabled` and written through
/// `EventDB::modify_deep_query`.
static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
44
/// The Catalyst Event SQL Database
///
/// Carries no fields: every operation in its `impl` block is an associated function
/// that works on the module-level connection pool.
pub(crate) struct EventDB {}
47
/// `EventDB` Errors
///
/// Returned by `EventDB::get_pool_connection`; neither variant carries the underlying
/// cause.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub(crate) enum EventDBConnectionError {
    /// Failed to get a DB Pool: `EVENT_DB_POOL` has not been set.
    #[error("DB Pool uninitialized")]
    DbPoolUninitialized,
    /// Failed to get a DB Pool Connection: the pool's `get()` call returned an error.
    #[error("DB Pool connection is unavailable")]
    PoolConnectionUnavailable,
}
58
59impl EventDB {
60    /// Get a connection from the pool.
61    async fn get_pool_connection()
62    -> Result<deadpool::managed::Object<deadpool_postgres::Manager>, EventDBConnectionError> {
63        let pool = EVENT_DB_POOL
64            .get()
65            .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
66        let res = pool
67            .get()
68            .await
69            .map_err(|_| EventDBConnectionError::PoolConnectionUnavailable)?;
70        Ok(res)
71    }
72
73    /// Determine if deep query inspection is enabled.
74    pub(crate) fn is_deep_query_enabled() -> bool {
75        DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
76    }
77
78    /// Modify the deep query inspection setting.
79    ///
80    /// # Arguments
81    ///
82    /// * `enable` - Set the `DeepQueryInspection` setting to this value.
83    pub(crate) fn modify_deep_query(enable: bool) {
84        DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
85    }
86
87    /// Query the database.
88    ///
89    /// If deep query inspection is enabled, this will log the query plan inside a
90    /// rolled-back transaction, before running the query.
91    ///
92    /// # Arguments
93    ///
94    /// * `stmt` - `&str` SQL statement.
95    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
96    ///
97    /// # Returns
98    ///
99    /// `anyhow::Result<Vec<Row>>`
100    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
101    pub(crate) async fn query(
102        stmt: &str,
103        params: &[&(dyn ToSql + Sync)],
104    ) -> anyhow::Result<Vec<Row>> {
105        if Self::is_deep_query_enabled() {
106            Self::explain_analyze_rollback(stmt, params).await?;
107        }
108        let conn = Self::get_pool_connection().await?;
109        let rows = conn.query(stmt, params).await?;
110        Ok(rows)
111    }
112
113    /// Query the database and return a async stream of rows.
114    ///
115    /// If deep query inspection is enabled, this will log the query plan inside a
116    /// rolled-back transaction, before running the query.
117    ///
118    /// # Arguments
119    ///
120    /// * `stmt` - `&str` SQL statement.
121    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
122    ///
123    /// # Returns
124    ///
125    /// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
126    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
127    pub(crate) async fn query_stream(
128        stmt: &str,
129        params: &[&(dyn ToSql + Sync)],
130    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>> + use<>> {
131        if Self::is_deep_query_enabled() {
132            Self::explain_analyze_rollback(stmt, params).await?;
133        }
134        let conn = Self::get_pool_connection().await?;
135        let rows = conn.query_raw(stmt, params.iter().copied()).await?;
136        Ok(rows.map_err(Into::into).boxed())
137    }
138
139    /// Query the database for a single row.
140    ///
141    /// # Arguments
142    ///
143    /// * `stmt` - `&str` SQL statement.
144    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
145    ///
146    /// # Returns
147    ///
148    /// `Result<Row, anyhow::Error>`
149    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
150    pub(crate) async fn query_one(
151        stmt: &str,
152        params: &[&(dyn ToSql + Sync)],
153    ) -> anyhow::Result<Row> {
154        if Self::is_deep_query_enabled() {
155            Self::explain_analyze_rollback(stmt, params).await?;
156        }
157        let conn = Self::get_pool_connection().await?;
158        let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
159        Ok(row)
160    }
161
162    /// Modify the database.
163    ///
164    /// Use this for `UPDATE`, `DELETE`, and other DB statements that
165    /// don't return data.
166    ///
167    /// # Arguments
168    ///
169    /// * `stmt` - `&str` SQL statement.
170    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
171    ///
172    /// # Returns
173    ///
174    /// `anyhow::Result<()>`
175    pub(crate) async fn modify(
176        stmt: &str,
177        params: &[&(dyn ToSql + Sync)],
178    ) -> anyhow::Result<()> {
179        if Self::is_deep_query_enabled() {
180            Self::explain_analyze_commit(stmt, params).await?;
181        } else {
182            let conn = Self::get_pool_connection().await?;
183            conn.query(stmt, params).await?;
184        }
185        Ok(())
186    }
187
188    /// Checks that connection to `EventDB` is available.
189    pub(crate) async fn connection_is_ok() -> bool {
190        let event_db_liveness = event_db_is_live();
191        Self::schema_version_check()
192            .await
193            .inspect(|_| {
194                if !event_db_liveness {
195                    set_event_db_liveness(true);
196                }
197            })
198            .inspect_err(|err| {
199                error!(err = err.to_string(), "Event DB connection issues");
200                if event_db_liveness {
201                    set_event_db_liveness(false);
202                }
203            })
204            .is_ok()
205    }
206
207    /// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
208    async fn explain_analyze_rollback(
209        stmt: &str,
210        params: &[&(dyn ToSql + Sync)],
211    ) -> anyhow::Result<()> {
212        Self::explain_analyze(stmt, params, true).await
213    }
214
215    /// Prepend `EXPLAIN ANALYZE` to the query, and commit the transaction.
216    async fn explain_analyze_commit(
217        stmt: &str,
218        params: &[&(dyn ToSql + Sync)],
219    ) -> anyhow::Result<()> {
220        Self::explain_analyze(stmt, params, false).await
221    }
222
223    /// Prepend `EXPLAIN ANALYZE` to the query.
224    ///
225    /// Log the query plan inside a transaction that may be committed or rolled back.
226    ///
227    /// # Arguments
228    ///
229    /// * `stmt` - `&str` SQL statement.
230    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
231    /// * `rollback` - `bool` whether to roll back the transaction or not.
232    async fn explain_analyze(
233        stmt: &str,
234        params: &[&(dyn ToSql + Sync)],
235        rollback: bool,
236    ) -> anyhow::Result<()> {
237        let span = debug_span!(
238            "query_plan",
239            query_statement = stmt,
240            params = format!("{:?}", params),
241            uuid = uuid::Uuid::new_v4().to_string()
242        );
243
244        async move {
245            let mut conn = Self::get_pool_connection().await?;
246            let transaction = conn.transaction().await?;
247            let explain_stmt = transaction
248                .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
249                .await?;
250            let rows = transaction.query(&explain_stmt, params).await?;
251            for r in rows {
252                let query_plan_str: String = r.get("QUERY PLAN");
253                debug!("{}", query_plan_str);
254            }
255            if rollback {
256                transaction.rollback().await?;
257            } else {
258                transaction.commit().await?;
259            }
260            Ok(())
261        }
262        .instrument(span)
263        .await
264    }
265
266    /// Wait for the Event DB to be ready before continuing
267    pub(crate) async fn wait_until_ready(interval: Duration) {
268        loop {
269            if Self::connection_is_ok().await {
270                return;
271            }
272
273            tokio::time::sleep(interval).await;
274        }
275    }
276
277    /// Spawns a background task checking for the Event DB to be
278    /// ready.
279    /// Could spawn only one background task at a time
280    pub(crate) fn spawn_ready_probe() {
281        let spawning = || -> anyhow::Result<()> {
282            let mut task = EVENT_DB_PROBE_TASK
283                .lock()
284                .map_err(|e| anyhow::anyhow!("{e}"))?;
285
286            if task.as_ref().is_none_or(JoinHandle::is_finished) {
287                /// Event DB probe check wait interval
288                const INTERVAL: Duration = Duration::from_secs(1);
289
290                *task = Some(tokio::spawn(async move {
291                    Self::wait_until_ready(INTERVAL).await;
292                    info!("Event DB is ready");
293                }));
294            }
295            Ok(())
296        };
297
298        debug!("Waiting for the Event DB background probe check task to be spawned");
299        while let Err(e) = spawning() {
300            error!(error = ?e, "EVENT_DB_PROBE_TASK is poisoned, should never happen");
301            EVENT_DB_PROBE_TASK.clear_poison();
302        }
303    }
304}
305
306/// Establish a connection pool to the database.
307/// After successful initialisation of the connection pool, spawns a background ready
308/// probe task.
309///
310/// # Parameters
311///
312/// * `url` set to the postgres connection string needed to connect to the database.  IF
313///   it is None, then the env var "`DATABASE_URL`" will be used for this connection
314///   string. eg: "`postgres://catalyst-dev:CHANGE_ME@localhost/CatalystDev`"
315///
316/// # Notes
317///
318/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
319/// `.env` file.
320///
321/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
322pub fn establish_connection_pool() {
323    debug!("Establishing connection with Event DB pool");
324
325    // This was pre-validated and can't fail, but provide default in the impossible case it
326    // does.
327    let url = Settings::event_db_settings().url();
328    let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
329        error!(url = url, "Postgres URL Pre Validation has failed.");
330        tokio_postgres::config::Config::default()
331    });
332    if let Some(user) = Settings::event_db_settings().username() {
333        config.user(user);
334    }
335    if let Some(pass) = Settings::event_db_settings().password() {
336        config.password(pass);
337    }
338
339    let pg_mgr = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
340    match deadpool::managed::Pool::builder(pg_mgr)
341        .max_size(Settings::event_db_settings().max_connections() as usize)
342        .create_timeout(Some(
343            Settings::event_db_settings().connection_creation_timeout(),
344        ))
345        .wait_timeout(Some(Settings::event_db_settings().slot_wait_timeout()))
346        .recycle_timeout(Some(
347            Settings::event_db_settings().connection_recycle_timeout(),
348        ))
349        .runtime(deadpool::Runtime::Tokio1)
350        .build()
351    {
352        Ok(pool) => {
353            debug!("Event DB pool configured.");
354            if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
355                error!("Failed to set EVENT_DB_POOL. Already set?");
356            }
357            EventDB::spawn_ready_probe();
358        },
359        Err(err) => {
360            error!(error = %err, "Failed to establish connection with EventDB pool");
361        },
362    }
363}