cat_gateway/db/event/
mod.rs1use std::{
3 str::FromStr,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, OnceLock,
7 },
8};
9
10use bb8::{Pool, PooledConnection};
11use bb8_postgres::PostgresConnectionManager;
12use error::NotFoundError;
13use futures::{Stream, StreamExt, TryStreamExt};
14use tokio_postgres::{types::ToSql, NoTls, Row};
15use tracing::{debug, debug_span, error, Instrument};
16
17use crate::{
18 service::utilities::health::{event_db_is_live, set_event_db_liveness},
19 settings::Settings,
20};
21
22pub(crate) mod common;
23pub(crate) mod config;
24pub(crate) mod error;
25pub(crate) mod schema_check;
26pub(crate) mod signed_docs;
27
28pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 4;
31
32type SqlDbPool = Arc<Pool<PostgresConnectionManager<NoTls>>>;
34
35static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
37
38static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
40
/// Namespace type for Event DB access.
///
/// It carries no state of its own: every operation is an associated function that
/// works against the process-wide connection pool.
pub(crate) struct EventDB {}
43
44#[derive(thiserror::Error, Debug, PartialEq, Eq)]
46pub(crate) enum EventDBConnectionError {
47 #[error("DB Pool uninitialized")]
49 DbPoolUninitialized,
50 #[error("DB Pool connection is unavailable")]
52 PoolConnectionUnavailable,
53}
54
55impl EventDB {
56 async fn get_pool_connection<'a>(
58 ) -> Result<PooledConnection<'a, PostgresConnectionManager<NoTls>>, EventDBConnectionError>
59 {
60 let pool = EVENT_DB_POOL
61 .get()
62 .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
63 pool.get()
64 .await
65 .map_err(|_| EventDBConnectionError::PoolConnectionUnavailable)
66 }
67
68 pub(crate) fn is_deep_query_enabled() -> bool {
70 DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
71 }
72
73 pub(crate) fn modify_deep_query(enable: bool) {
79 DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
80 }
81
82 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
96 pub(crate) async fn query(
97 stmt: &str, params: &[&(dyn ToSql + Sync)],
98 ) -> anyhow::Result<Vec<Row>> {
99 if Self::is_deep_query_enabled() {
100 Self::explain_analyze_rollback(stmt, params).await?;
101 }
102 let conn = Self::get_pool_connection().await?;
103 let rows = conn.query(stmt, params).await?;
104 Ok(rows)
105 }
106
107 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
121 pub(crate) async fn query_stream(
122 stmt: &str, params: &[&(dyn ToSql + Sync)],
123 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
124 if Self::is_deep_query_enabled() {
125 Self::explain_analyze_rollback(stmt, params).await?;
126 }
127 let conn = Self::get_pool_connection().await?;
128 let rows = conn.query_raw(stmt, params.iter().copied()).await?;
129 Ok(rows.map_err(Into::into).boxed())
130 }
131
132 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
143 pub(crate) async fn query_one(
144 stmt: &str, params: &[&(dyn ToSql + Sync)],
145 ) -> anyhow::Result<Row> {
146 if Self::is_deep_query_enabled() {
147 Self::explain_analyze_rollback(stmt, params).await?;
148 }
149 let conn = Self::get_pool_connection().await?;
150 let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
151 Ok(row)
152 }
153
154 pub(crate) async fn modify(stmt: &str, params: &[&(dyn ToSql + Sync)]) -> anyhow::Result<()> {
168 if Self::is_deep_query_enabled() {
169 Self::explain_analyze_commit(stmt, params).await?;
170 } else {
171 let conn = Self::get_pool_connection().await?;
172 conn.query(stmt, params).await?;
173 }
174 Ok(())
175 }
176
177 pub(crate) async fn connection_is_ok() -> bool {
179 let event_db_liveness = event_db_is_live();
180 Self::get_pool_connection()
181 .await
182 .inspect(|_| {
183 if !event_db_liveness {
184 set_event_db_liveness(true);
185 }
186 })
187 .inspect_err(|_| {
188 if event_db_liveness {
189 set_event_db_liveness(false);
190 }
191 })
192 .is_ok()
193 }
194
195 async fn explain_analyze_rollback(
197 stmt: &str, params: &[&(dyn ToSql + Sync)],
198 ) -> anyhow::Result<()> {
199 Self::explain_analyze(stmt, params, true).await
200 }
201
202 async fn explain_analyze_commit(
204 stmt: &str, params: &[&(dyn ToSql + Sync)],
205 ) -> anyhow::Result<()> {
206 Self::explain_analyze(stmt, params, false).await
207 }
208
209 async fn explain_analyze(
219 stmt: &str, params: &[&(dyn ToSql + Sync)], rollback: bool,
220 ) -> anyhow::Result<()> {
221 let span = debug_span!(
222 "query_plan",
223 query_statement = stmt,
224 params = format!("{:?}", params),
225 uuid = uuid::Uuid::new_v4().to_string()
226 );
227
228 async move {
229 let mut conn = Self::get_pool_connection().await?;
230 let transaction = conn.transaction().await?;
231 let explain_stmt = transaction
232 .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
233 .await?;
234 let rows = transaction.query(&explain_stmt, params).await?;
235 for r in rows {
236 let query_plan_str: String = r.get("QUERY PLAN");
237 debug!("{}", query_plan_str);
238 }
239 if rollback {
240 transaction.rollback().await?;
241 } else {
242 transaction.commit().await?;
243 }
244 Ok(())
245 }
246 .instrument(span)
247 .await
248 }
249}
250
251pub async fn establish_connection_pool() {
276 let (url, user, pass, max_connections, max_lifetime, min_idle, connection_timeout) =
277 Settings::event_db_settings();
278 debug!("Establishing connection with Event DB pool");
279
280 let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
283 error!(url = url, "Postgres URL Pre Validation has failed.");
284 tokio_postgres::config::Config::default()
285 });
286 if let Some(user) = user {
287 config.user(user);
288 }
289 if let Some(pass) = pass {
290 config.password(pass);
291 }
292
293 let pg_mgr = PostgresConnectionManager::new(config, tokio_postgres::NoTls);
294
295 match Pool::builder()
296 .max_size(max_connections)
297 .max_lifetime(Some(core::time::Duration::from_secs(max_lifetime.into())))
298 .min_idle(min_idle)
299 .connection_timeout(core::time::Duration::from_secs(connection_timeout.into()))
300 .build(pg_mgr)
301 .await
302 {
303 Ok(pool) => {
304 debug!("Event DB pool configured.");
305 if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
306 error!("Failed to set EVENT_DB_POOL. Already set?");
307 }
308 },
309 Err(err) => {
310 error!(error = %err, "Failed to establish connection with EventDB pool");
311 },
312 }
313}