cat_gateway/db/event/
mod.rs1use std::{
3 str::FromStr,
4 sync::{
5 Arc, LazyLock, Mutex, OnceLock,
6 atomic::{AtomicBool, Ordering},
7 },
8 time::Duration,
9};
10
11use error::NotFoundError;
12use futures::{Stream, StreamExt, TryStreamExt};
13use tokio::task::JoinHandle;
14use tokio_postgres::{Row, types::ToSql};
15use tracing::{Instrument, debug, debug_span, error, info};
16
17use crate::{
18 service::utilities::health::{event_db_is_live, set_event_db_liveness},
19 settings::Settings,
20};
21
22pub(crate) mod common;
23pub(crate) mod config;
24pub(crate) mod error;
25pub(crate) mod schema_check;
26pub(crate) mod signed_docs;
27
/// Event DB schema version this module is written against.
///
/// NOTE(review): presumably compared against the live database by the
/// `schema_check` module — confirm there.
pub(crate) const DATABASE_SCHEMA_VERSION: i32 = 3;
31
32type SqlDbPool = Arc<deadpool::managed::Pool<deadpool_postgres::Manager>>;
34
35static EVENT_DB_POOL: OnceLock<SqlDbPool> = OnceLock::new();
37
38static EVENT_DB_PROBE_TASK: LazyLock<Mutex<Option<JoinHandle<()>>>> =
40 LazyLock::new(|| Mutex::new(None));
41
42static DEEP_QUERY_INSPECT: AtomicBool = AtomicBool::new(false);
44
/// Stateless handle for the Event DB.
///
/// Carries no data: every operation is an associated function that works on
/// the module-level connection pool.
pub(crate) struct EventDB {}
47
48#[derive(thiserror::Error, Debug, PartialEq, Eq)]
50pub(crate) enum EventDBConnectionError {
51 #[error("DB Pool uninitialized")]
53 DbPoolUninitialized,
54 #[error("DB Pool connection is unavailable")]
56 PoolConnectionUnavailable,
57}
58
59impl EventDB {
60 async fn get_pool_connection()
62 -> Result<deadpool::managed::Object<deadpool_postgres::Manager>, EventDBConnectionError> {
63 let pool = EVENT_DB_POOL
64 .get()
65 .ok_or(EventDBConnectionError::DbPoolUninitialized)?;
66 let res = pool
67 .get()
68 .await
69 .map_err(|_| EventDBConnectionError::PoolConnectionUnavailable)?;
70 Ok(res)
71 }
72
73 pub(crate) fn is_deep_query_enabled() -> bool {
75 DEEP_QUERY_INSPECT.load(Ordering::SeqCst)
76 }
77
78 pub(crate) fn modify_deep_query(enable: bool) {
84 DEEP_QUERY_INSPECT.store(enable, Ordering::SeqCst);
85 }
86
87 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
101 pub(crate) async fn query(
102 stmt: &str,
103 params: &[&(dyn ToSql + Sync)],
104 ) -> anyhow::Result<Vec<Row>> {
105 if Self::is_deep_query_enabled() {
106 Self::explain_analyze_rollback(stmt, params).await?;
107 }
108 let conn = Self::get_pool_connection().await?;
109 let rows = conn.query(stmt, params).await?;
110 Ok(rows)
111 }
112
113 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
127 pub(crate) async fn query_stream(
128 stmt: &str,
129 params: &[&(dyn ToSql + Sync)],
130 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<Row>> + use<>> {
131 if Self::is_deep_query_enabled() {
132 Self::explain_analyze_rollback(stmt, params).await?;
133 }
134 let conn = Self::get_pool_connection().await?;
135 let rows = conn.query_raw(stmt, params.iter().copied()).await?;
136 Ok(rows.map_err(Into::into).boxed())
137 }
138
139 #[must_use = "ONLY use this function for SELECT type operations which return row data, otherwise use `modify()`"]
150 pub(crate) async fn query_one(
151 stmt: &str,
152 params: &[&(dyn ToSql + Sync)],
153 ) -> anyhow::Result<Row> {
154 if Self::is_deep_query_enabled() {
155 Self::explain_analyze_rollback(stmt, params).await?;
156 }
157 let conn = Self::get_pool_connection().await?;
158 let row = conn.query_opt(stmt, params).await?.ok_or(NotFoundError)?;
159 Ok(row)
160 }
161
162 pub(crate) async fn modify(
176 stmt: &str,
177 params: &[&(dyn ToSql + Sync)],
178 ) -> anyhow::Result<()> {
179 if Self::is_deep_query_enabled() {
180 Self::explain_analyze_commit(stmt, params).await?;
181 } else {
182 let conn = Self::get_pool_connection().await?;
183 conn.query(stmt, params).await?;
184 }
185 Ok(())
186 }
187
188 pub(crate) async fn connection_is_ok() -> bool {
190 let event_db_liveness = event_db_is_live();
191 Self::schema_version_check()
192 .await
193 .inspect(|_| {
194 if !event_db_liveness {
195 set_event_db_liveness(true);
196 }
197 })
198 .inspect_err(|err| {
199 error!(err = err.to_string(), "Event DB connection issues");
200 if event_db_liveness {
201 set_event_db_liveness(false);
202 }
203 })
204 .is_ok()
205 }
206
207 async fn explain_analyze_rollback(
209 stmt: &str,
210 params: &[&(dyn ToSql + Sync)],
211 ) -> anyhow::Result<()> {
212 Self::explain_analyze(stmt, params, true).await
213 }
214
215 async fn explain_analyze_commit(
217 stmt: &str,
218 params: &[&(dyn ToSql + Sync)],
219 ) -> anyhow::Result<()> {
220 Self::explain_analyze(stmt, params, false).await
221 }
222
223 async fn explain_analyze(
233 stmt: &str,
234 params: &[&(dyn ToSql + Sync)],
235 rollback: bool,
236 ) -> anyhow::Result<()> {
237 let span = debug_span!(
238 "query_plan",
239 query_statement = stmt,
240 params = format!("{:?}", params),
241 uuid = uuid::Uuid::new_v4().to_string()
242 );
243
244 async move {
245 let mut conn = Self::get_pool_connection().await?;
246 let transaction = conn.transaction().await?;
247 let explain_stmt = transaction
248 .prepare(format!("EXPLAIN ANALYZE {stmt}").as_str())
249 .await?;
250 let rows = transaction.query(&explain_stmt, params).await?;
251 for r in rows {
252 let query_plan_str: String = r.get("QUERY PLAN");
253 debug!("{}", query_plan_str);
254 }
255 if rollback {
256 transaction.rollback().await?;
257 } else {
258 transaction.commit().await?;
259 }
260 Ok(())
261 }
262 .instrument(span)
263 .await
264 }
265
266 pub(crate) async fn wait_until_ready(interval: Duration) {
268 loop {
269 if Self::connection_is_ok().await {
270 return;
271 }
272
273 tokio::time::sleep(interval).await;
274 }
275 }
276
277 pub(crate) fn spawn_ready_probe() {
281 let spawning = || -> anyhow::Result<()> {
282 let mut task = EVENT_DB_PROBE_TASK
283 .lock()
284 .map_err(|e| anyhow::anyhow!("{e}"))?;
285
286 if task.as_ref().is_none_or(JoinHandle::is_finished) {
287 const INTERVAL: Duration = Duration::from_secs(1);
289
290 *task = Some(tokio::spawn(async move {
291 Self::wait_until_ready(INTERVAL).await;
292 info!("Event DB is ready");
293 }));
294 }
295 Ok(())
296 };
297
298 debug!("Waiting for the Event DB background probe check task to be spawned");
299 while let Err(e) = spawning() {
300 error!(error = ?e, "EVENT_DB_PROBE_TASK is poisoned, should never happen");
301 EVENT_DB_PROBE_TASK.clear_poison();
302 }
303 }
304}
305
306pub fn establish_connection_pool() {
323 debug!("Establishing connection with Event DB pool");
324
325 let url = Settings::event_db_settings().url();
328 let mut config = tokio_postgres::config::Config::from_str(url).unwrap_or_else(|_| {
329 error!(url = url, "Postgres URL Pre Validation has failed.");
330 tokio_postgres::config::Config::default()
331 });
332 if let Some(user) = Settings::event_db_settings().username() {
333 config.user(user);
334 }
335 if let Some(pass) = Settings::event_db_settings().password() {
336 config.password(pass);
337 }
338
339 let pg_mgr = deadpool_postgres::Manager::new(config, tokio_postgres::NoTls);
340 match deadpool::managed::Pool::builder(pg_mgr)
341 .max_size(Settings::event_db_settings().max_connections() as usize)
342 .create_timeout(Some(
343 Settings::event_db_settings().connection_creation_timeout(),
344 ))
345 .wait_timeout(Some(Settings::event_db_settings().slot_wait_timeout()))
346 .recycle_timeout(Some(
347 Settings::event_db_settings().connection_recycle_timeout(),
348 ))
349 .runtime(deadpool::Runtime::Tokio1)
350 .build()
351 {
352 Ok(pool) => {
353 debug!("Event DB pool configured.");
354 if EVENT_DB_POOL.set(Arc::new(pool)).is_err() {
355 error!("Failed to set EVENT_DB_POOL. Already set?");
356 }
357 EventDB::spawn_ready_probe();
358 },
359 Err(err) => {
360 error!(error = %err, "Failed to establish connection with EventDB pool");
361 },
362 }
363}