cat_gateway/db/event/
mod.rs

1//! Catalyst Election Database crate
2use std::{
3    str::FromStr,
4    sync::{
5        atomic::{AtomicBool, Ordering},
6        Arc, LazyLock, Mutex, OnceLock,
7    },
8    time::Duration,
9};
10
11use error::NotFoundError;
12use futures::{Stream, StreamExt, TryStreamExt};
13use tokio::task::JoinHandle;
14use tokio_postgres::{types::ToSql, Row};
15use tracing::{debug, debug_span, error, info, Instrument};
16
17use crate::{
18    service::utilities::health::{event_db_is_live, set_event_db_liveness},
19    settings::Settings,
20};
21
22pub(crate) mod common;
23pub(crate) mod config;
24pub(crate) mod error;
25pub(crate) mod schema_check;
26pub(crate) mod signed_docs;
27
/// Database version this crate matches.
/// Must equal the last Migrations Version Number from `event-db/migrations`.
pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 4;

/// Postgres Connection Manager DB Pool
///
/// Shared (`Arc`) handle to a `deadpool` managed pool of `deadpool_postgres` connections.
type SqlDbPool = Arc<deadpool::managed::Pool<deadpool_postgres::Manager>>;

/// Postgres Connection Manager DB Pool Instance
///
/// Set once by `establish_connection_pool()` and read by
/// `EventDB::get_pool_connection()`. It stays unset until the pool has been built.
static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();

/// Background Event DB probe check
///
/// Holds the join handle of the ready-probe task spawned by
/// `EventDB::spawn_ready_probe()`, or `None` if no probe has been spawned yet.
/// The mutex lets `spawn_ready_probe()` check the previous handle and replace it
/// in one critical section, so only one probe task is kept at a time.
static EVENT_DB_PROBE_TASK: LazyLock<Mutex<Option<JoinHandle<()>>>> =
    LazyLock::new(|| Mutex::new(None));

/// Is Deep Query Analysis enabled or not?
///
/// Starts as `false`; read via `EventDB::is_deep_query_enabled()` and changed via
/// `EventDB::modify_deep_query()`.
static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
44
/// The Catalyst Event SQL Database
///
/// Field-less type: every operation in its `impl` is an associated function (no `self`)
/// that works against the process-wide `EVENT_DB_POOL`.
pub(crate) struct EventDB {}
47
/// `EventDB` Errors
///
/// Returned by `EventDB::get_pool_connection()` when a pooled connection cannot be
/// obtained.
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub(crate) enum EventDBConnectionError {
    /// Failed to get a DB Pool: `EVENT_DB_POOL` has not been set yet.
    #[error("DB Pool uninitialized")]
    DbPoolUninitialized,
    /// Failed to get a DB Pool Connection: the pool exists but did not hand out a
    /// connection.
    #[error("DB Pool connection is unavailable")]
    PoolConnectionUnavailable,
}
58
59impl EventDB {
60    /// Get a connection from the pool.
61    async fn get_pool_connection(
62    ) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>, EventDBConnectionError> {
63        let pool = EVENT_DB_POOL
64            .get()
65            .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
66        let res = pool
67            .get()
68            .await
69            .map_err(|_| EventDBConnectionError::PoolConnectionUnavailable)?;
70        Ok(res)
71    }
72
73    /// Determine if deep query inspection is enabled.
74    pub(crate) fn is_deep_query_enabled() -> bool {
75        DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
76    }
77
78    /// Modify the deep query inspection setting.
79    ///
80    /// # Arguments
81    ///
82    /// * `enable` - Set the `DeepQueryInspection` setting to this value.
83    pub(crate) fn modify_deep_query(enable: bool) {
84        DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
85    }
86
87    /// Query the database.
88    ///
89    /// If deep query inspection is enabled, this will log the query plan inside a
90    /// rolled-back transaction, before running the query.
91    ///
92    /// # Arguments
93    ///
94    /// * `stmt` - `&str` SQL statement.
95    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
96    ///
97    /// # Returns
98    ///
99    /// `anyhow::Result<Vec<Row>>`
100    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
101    pub(crate) async fn query(
102        stmt: &str,
103        params: &[&(dyn ToSql + Sync)],
104    ) -> anyhow::Result<Vec<Row>> {
105        if Self::is_deep_query_enabled() {
106            Self::explain_analyze_rollback(stmt, params).await?;
107        }
108        let conn = Self::get_pool_connection().await?;
109        let rows = conn.query(stmt, params).await?;
110        Ok(rows)
111    }
112
113    /// Query the database and return a async stream of rows.
114    ///
115    /// If deep query inspection is enabled, this will log the query plan inside a
116    /// rolled-back transaction, before running the query.
117    ///
118    /// # Arguments
119    ///
120    /// * `stmt` - `&str` SQL statement.
121    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
122    ///
123    /// # Returns
124    ///
125    /// `anyhow::Result<impl Stream<Item = anyhow::Result<Row>>>`
126    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
127    pub(crate) async fn query_stream(
128        stmt: &str,
129        params: &[&(dyn ToSql + Sync)],
130    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
131        if Self::is_deep_query_enabled() {
132            Self::explain_analyze_rollback(stmt, params).await?;
133        }
134        let conn = Self::get_pool_connection().await?;
135        let rows = conn.query_raw(stmt, params.iter().copied()).await?;
136        Ok(rows.map_err(Into::into).boxed())
137    }
138
139    /// Query the database for a single row.
140    ///
141    /// # Arguments
142    ///
143    /// * `stmt` - `&str` SQL statement.
144    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
145    ///
146    /// # Returns
147    ///
148    /// `Result<Row, anyhow::Error>`
149    #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
150    pub(crate) async fn query_one(
151        stmt: &str,
152        params: &[&(dyn ToSql + Sync)],
153    ) -> anyhow::Result<Row> {
154        if Self::is_deep_query_enabled() {
155            Self::explain_analyze_rollback(stmt, params).await?;
156        }
157        let conn = Self::get_pool_connection().await?;
158        let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
159        Ok(row)
160    }
161
162    /// Modify the database.
163    ///
164    /// Use this for `UPDATE`, `DELETE`, and other DB statements that
165    /// don't return data.
166    ///
167    /// # Arguments
168    ///
169    /// * `stmt` - `&str` SQL statement.
170    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
171    ///
172    /// # Returns
173    ///
174    /// `anyhow::Result<()>`
175    pub(crate) async fn modify(
176        stmt: &str,
177        params: &[&(dyn ToSql + Sync)],
178    ) -> anyhow::Result<()> {
179        if Self::is_deep_query_enabled() {
180            Self::explain_analyze_commit(stmt, params).await?;
181        } else {
182            let conn = Self::get_pool_connection().await?;
183            conn.query(stmt, params).await?;
184        }
185        Ok(())
186    }
187
188    /// Checks that connection to `EventDB` is available.
189    pub(crate) async fn connection_is_ok() -> bool {
190        let event_db_liveness = event_db_is_live();
191        Self::get_pool_connection()
192            .await
193            .inspect(|_| {
194                if !event_db_liveness {
195                    set_event_db_liveness(true);
196                }
197            })
198            .inspect_err(|_| {
199                if event_db_liveness {
200                    set_event_db_liveness(false);
201                }
202            })
203            .is_ok()
204    }
205
206    /// Prepend `EXPLAIN ANALYZE` to the query, and rollback the transaction.
207    async fn explain_analyze_rollback(
208        stmt: &str,
209        params: &[&(dyn ToSql + Sync)],
210    ) -> anyhow::Result<()> {
211        Self::explain_analyze(stmt, params, true).await
212    }
213
214    /// Prepend `EXPLAIN ANALYZE` to the query, and commit the transaction.
215    async fn explain_analyze_commit(
216        stmt: &str,
217        params: &[&(dyn ToSql + Sync)],
218    ) -> anyhow::Result<()> {
219        Self::explain_analyze(stmt, params, false).await
220    }
221
222    /// Prepend `EXPLAIN ANALYZE` to the query.
223    ///
224    /// Log the query plan inside a transaction that may be committed or rolled back.
225    ///
226    /// # Arguments
227    ///
228    /// * `stmt` - `&str` SQL statement.
229    /// * `params` - `&[&(dyn ToSql + Sync)]` SQL parameters.
230    /// * `rollback` - `bool` whether to roll back the transaction or not.
231    async fn explain_analyze(
232        stmt: &str,
233        params: &[&(dyn ToSql + Sync)],
234        rollback: bool,
235    ) -> anyhow::Result<()> {
236        let span = debug_span!(
237            "query_plan",
238            query_statement = stmt,
239            params = format!("{:?}", params),
240            uuid = uuid::Uuid::new_v4().to_string()
241        );
242
243        async move {
244            let mut conn = Self::get_pool_connection().await?;
245            let transaction = conn.transaction().await?;
246            let explain_stmt = transaction
247                .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
248                .await?;
249            let rows = transaction.query(&explain_stmt, params).await?;
250            for r in rows {
251                let query_plan_str: String = r.get("QUERY PLAN");
252                debug!("{}", query_plan_str);
253            }
254            if rollback {
255                transaction.rollback().await?;
256            } else {
257                transaction.commit().await?;
258            }
259            Ok(())
260        }
261        .instrument(span)
262        .await
263    }
264
265    /// Wait for the Event DB to be ready before continuing
266    pub(crate) async fn wait_until_ready(interval: Duration) {
267        loop {
268            if Self::connection_is_ok().await {
269                return;
270            }
271
272            tokio::time::sleep(interval).await;
273        }
274    }
275
276    /// Spawns a background task checking for the Event DB to be
277    /// ready.
278    /// Could spawn only one background task at a time
279    pub(crate) fn spawn_ready_probe() {
280        let spawning = || -> anyhow::Result<()> {
281            let mut task = EVENT_DB_PROBE_TASK
282                .lock()
283                .map_err(|e| anyhow::anyhow!("{e}"))?;
284
285            if task.as_ref().is_none_or(JoinHandle::is_finished) {
286                /// Event DB probe check wait interval
287                const INTERVAL: Duration = Duration::from_secs(1);
288
289                *task = Some(tokio::spawn(async move {
290                    Self::wait_until_ready(INTERVAL).await;
291                    info!("Event DB is ready");
292                }));
293            }
294            Ok(())
295        };
296
297        debug!("Waiting for the Event DB background probe check task to be spawned");
298        while let Err(e) = spawning() {
299            error!(error = ?e, "EVENT_DB_PROBE_TASK is poisoned, should never happen");
300            EVENT_DB_PROBE_TASK.clear_poison();
301        }
302    }
303}
304
305/// Establish a connection pool to the database.
306/// After successful initialisation of the connection pool, spawns a background ready
307/// probe task.
308///
309/// # Parameters
310///
311/// * `url` set to the postgres connection string needed to connect to the database.  IF
312///   it is None, then the env var "`DATABASE_URL`" will be used for this connection
313///   string. eg: "`postgres://catalyst-dev:CHANGE_ME@localhost/CatalystDev`"
314///
315/// # Notes
316///
317/// The env var "`DATABASE_URL`" can be set directly as an anv var, or in a
318/// `.env` file.
319///
320/// If connection to the pool is `OK`, the `LIVE_EVENT_DB` atomic flag is set to `true`.
321pub fn establish_connection_pool() {
322    debug!("Establishing connection with Event DB pool");
323
324    // This was pre-validated and can't fail, but provide default in the impossible case it
325    // does.
326    let url = Settings::event_db_settings().url();
327    let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
328        error!(url = url, "Postgres URL Pre Validation has failed.");
329        tokio_postgres::config::Config::default()
330    });
331    if let Some(user) = Settings::event_db_settings().username() {
332        config.user(user);
333    }
334    if let Some(pass) = Settings::event_db_settings().password() {
335        config.password(pass);
336    }
337
338    let pg_mgr = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
339
340    match deadpool::managed::Pool::builder(pg_mgr)
341        .max_size(Settings::event_db_settings().max_connections() as usize)
342        .create_timeout(Settings::event_db_settings().connection_creation_timeout())
343        .wait_timeout(Settings::event_db_settings().slot_wait_timeout())
344        .recycle_timeout(Settings::event_db_settings().connection_recycle_timeout())
345        .runtime(deadpool::Runtime::Tokio1)
346        .build()
347    {
348        Ok(pool) => {
349            debug!("Event DB pool configured.");
350            if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
351                error!("Failed to set EVENT_DB_POOL. Already set?");
352            }
353            EventDB::spawn_ready_probe();
354        },
355        Err(err) => {
356            error!(error = %err, "Failed to establish connection with EventDB pool");
357        },
358    }
359}