cat_gateway/db/event/
mod.rs1use std::{
3 str::FromStr,
4 sync::{
5 atomic::{AtomicBool, Ordering},
6 Arc, LazyLock, Mutex, OnceLock,
7 },
8 time::Duration,
9};
10
11use error::NotFoundError;
12use futures::{Stream, StreamExt, TryStreamExt};
13use tokio::task::JoinHandle;
14use tokio_postgres::{types::ToSql, Row};
15use tracing::{debug, debug_span, error, info, Instrument};
16
17use crate::{
18 service::utilities::health::{event_db_is_live, set_event_db_liveness},
19 settings::Settings,
20};
21
22pub(crate) mod common;
23pub(crate) mod config;
24pub(crate) mod error;
25pub(crate) mod schema_check;
26pub(crate) mod signed_docs;
27
28pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 4;
31
32type SqlDbPool = Arc<deadpool::managed::Pool<deadpool_postgres::Manager>>;
34
35static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
37
38static EVENT_DB_PROBE_TASK: LazyLock<Mutex<Option<JoinHandle<()>>>> =
40 LazyLock::new(|| Mutex::new(None));
41
42static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
44
45pub(crate) struct EventDB {}
47
48#[derive(thiserror::Error, Debug, PartialEq, Eq)]
50pub(crate) enum EventDBConnectionError {
51 #[error("DB Pool uninitialized")]
53 DbPoolUninitialized,
54 #[error("DB Pool connection is unavailable")]
56 PoolConnectionUnavailable,
57}
58
59impl EventDB {
60 async fn get_pool_connection(
62 ) -> Result<deadpool::managed::Object<deadpool_postgres::Manager>, EventDBConnectionError> {
63 let pool = EVENT_DB_POOL
64 .get()
65 .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
66 let res = pool
67 .get()
68 .await
69 .map_err(|_| EventDBConnectionError::PoolConnectionUnavailable)?;
70 Ok(res)
71 }
72
73 pub(crate) fn is_deep_query_enabled() -> bool {
75 DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
76 }
77
78 pub(crate) fn modify_deep_query(enable: bool) {
84 DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
85 }
86
87 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
101 pub(crate) async fn query(
102 stmt: &str,
103 params: &[&(dyn ToSql + Sync)],
104 ) -> anyhow::Result<Vec<Row>> {
105 if Self::is_deep_query_enabled() {
106 Self::explain_analyze_rollback(stmt, params).await?;
107 }
108 let conn = Self::get_pool_connection().await?;
109 let rows = conn.query(stmt, params).await?;
110 Ok(rows)
111 }
112
113 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
127 pub(crate) async fn query_stream(
128 stmt: &str,
129 params: &[&(dyn ToSql + Sync)],
130 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>>> {
131 if Self::is_deep_query_enabled() {
132 Self::explain_analyze_rollback(stmt, params).await?;
133 }
134 let conn = Self::get_pool_connection().await?;
135 let rows = conn.query_raw(stmt, params.iter().copied()).await?;
136 Ok(rows.map_err(Into::into).boxed())
137 }
138
139 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
150 pub(crate) async fn query_one(
151 stmt: &str,
152 params: &[&(dyn ToSql + Sync)],
153 ) -> anyhow::Result<Row> {
154 if Self::is_deep_query_enabled() {
155 Self::explain_analyze_rollback(stmt, params).await?;
156 }
157 let conn = Self::get_pool_connection().await?;
158 let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
159 Ok(row)
160 }
161
162 pub(crate) async fn modify(
176 stmt: &str,
177 params: &[&(dyn ToSql + Sync)],
178 ) -> anyhow::Result<()> {
179 if Self::is_deep_query_enabled() {
180 Self::explain_analyze_commit(stmt, params).await?;
181 } else {
182 let conn = Self::get_pool_connection().await?;
183 conn.query(stmt, params).await?;
184 }
185 Ok(())
186 }
187
188 pub(crate) async fn connection_is_ok() -> bool {
190 let event_db_liveness = event_db_is_live();
191 Self::get_pool_connection()
192 .await
193 .inspect(|_| {
194 if !event_db_liveness {
195 set_event_db_liveness(true);
196 }
197 })
198 .inspect_err(|_| {
199 if event_db_liveness {
200 set_event_db_liveness(false);
201 }
202 })
203 .is_ok()
204 }
205
206 async fn explain_analyze_rollback(
208 stmt: &str,
209 params: &[&(dyn ToSql + Sync)],
210 ) -> anyhow::Result<()> {
211 Self::explain_analyze(stmt, params, true).await
212 }
213
214 async fn explain_analyze_commit(
216 stmt: &str,
217 params: &[&(dyn ToSql + Sync)],
218 ) -> anyhow::Result<()> {
219 Self::explain_analyze(stmt, params, false).await
220 }
221
222 async fn explain_analyze(
232 stmt: &str,
233 params: &[&(dyn ToSql + Sync)],
234 rollback: bool,
235 ) -> anyhow::Result<()> {
236 let span = debug_span!(
237 "query_plan",
238 query_statement = stmt,
239 params = format!("{:?}", params),
240 uuid = uuid::Uuid::new_v4().to_string()
241 );
242
243 async move {
244 let mut conn = Self::get_pool_connection().await?;
245 let transaction = conn.transaction().await?;
246 let explain_stmt = transaction
247 .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
248 .await?;
249 let rows = transaction.query(&explain_stmt, params).await?;
250 for r in rows {
251 let query_plan_str: String = r.get("QUERY PLAN");
252 debug!("{}", query_plan_str);
253 }
254 if rollback {
255 transaction.rollback().await?;
256 } else {
257 transaction.commit().await?;
258 }
259 Ok(())
260 }
261 .instrument(span)
262 .await
263 }
264
265 pub(crate) async fn wait_until_ready(interval: Duration) {
267 loop {
268 if Self::connection_is_ok().await {
269 return;
270 }
271
272 tokio::time::sleep(interval).await;
273 }
274 }
275
276 pub(crate) fn spawn_ready_probe() {
280 let spawning = || -> anyhow::Result<()> {
281 let mut task = EVENT_DB_PROBE_TASK
282 .lock()
283 .map_err(|e| anyhow::anyhow!("{e}"))?;
284
285 if task.as_ref().is_none_or(JoinHandle::is_finished) {
286 const INTERVAL: Duration = Duration::from_secs(1);
288
289 *task = Some(tokio::spawn(async move {
290 Self::wait_until_ready(INTERVAL).await;
291 info!("Event DB is ready");
292 }));
293 }
294 Ok(())
295 };
296
297 debug!("Waiting for the Event DB background probe check task to be spawned");
298 while let Err(e) = spawning() {
299 error!(error = ?e, "EVENT_DB_PROBE_TASK is poisoned, should never happen");
300 EVENT_DB_PROBE_TASK.clear_poison();
301 }
302 }
303}
304
305pub fn establish_connection_pool() {
322 debug!("Establishing connection with Event DB pool");
323
324 let url = Settings::event_db_settings().url();
327 let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
328 error!(url = url, "Postgres URL Pre Validation has failed.");
329 tokio_postgres::config::Config::default()
330 });
331 if let Some(user) = Settings::event_db_settings().username() {
332 config.user(user);
333 }
334 if let Some(pass) = Settings::event_db_settings().password() {
335 config.password(pass);
336 }
337
338 let pg_mgr = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
339
340 match deadpool::managed::Pool::builder(pg_mgr)
341 .max_size(Settings::event_db_settings().max_connections() as usize)
342 .create_timeout(Settings::event_db_settings().connection_creation_timeout())
343 .wait_timeout(Settings::event_db_settings().slot_wait_timeout())
344 .recycle_timeout(Settings::event_db_settings().connection_recycle_timeout())
345 .runtime(deadpool::Runtime::Tokio1)
346 .build()
347 {
348 Ok(pool) => {
349 debug!("Event DB pool configured.");
350 if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
351 error!("Failed to set EVENT_DB_POOL. Already set?");
352 }
353 EventDB::spawn_ready_probe();
354 },
355 Err(err) => {
356 error!(error = %err, "Failed to establish connection with EventDB pool");
357 },
358 }
359}