cat_gateway/service/utilities/middleware/
tracing_mw.rs

1//! Full Tracing and metrics middleware.
2
3use std::time::Instant;
4
5use cpu_time::ProcessTime; // ThreadTime doesn't work.
6use poem::{
7    http::{header, HeaderMap, StatusCode},
8    web::RealIp,
9    Endpoint, Error, FromRequest, IntoResponse, Middleware, PathPattern, Request, Response, Result,
10};
11use poem_openapi::OperationId;
12use tracing::{error, field, warn, Instrument, Level, Span};
13use ulid::Ulid;
14use uuid::Uuid;
15
16use crate::{
17    metrics::endpoint::{
18        CLIENT_REQUEST_COUNT, HTTP_REQUEST_COUNT, HTTP_REQ_CPU_TIME_MS, HTTP_REQ_DURATION_MS,
19        NOT_FOUND_COUNT,
20    },
21    settings::Settings,
22    utils::blake2b_hash::generate_uuid_string_from_data,
23};
24
// TODO: There is currently no way to get these values.
// Panic request counter.
27// static PANIC_REQUEST_COUNT: LazyLock<IntCounterVec> = LazyLock::new(|| {
28// #[allow(clippy::ignored_unit_patterns)]
29// register_int_counter_vec!(
30// "panic_request_count",
31// "Number of HTTP requests that panicked",
32// &METRIC_LABELS
33// )
34// .unwrap()
35// });
36
37// Currently no way to get these values without reading the whole response which is BAD.
38// static ref HTTP_REQUEST_SIZE_BYTES: HistogramVec = register_histogram_vec!(
39// "http_request_size_bytes",
40// "Size of HTTP requests in bytes",
41// &METRIC_LABELS
42// )
43// .unwrap();
44// static ref HTTP_RESPONSE_SIZE_BYTES: HistogramVec = register_histogram_vec!(
45// "http_response_size_bytes",
46// "Size of HTTP responses in bytes",
47// &METRIC_LABELS
48// )
49// .unwrap();
50
51/// Middleware for [`tracing`](https://crates.io/crates/tracing).
52#[derive(Default)]
53pub(crate) struct Tracing;
54
55impl<E: Endpoint> Middleware<E> for Tracing {
56    type Output = TracingEndpoint<E>;
57
58    fn transform(
59        &self,
60        ep: E,
61    ) -> Self::Output {
62        TracingEndpoint { inner: ep }
63    }
64}
65
/// Endpoint wrapper produced by the `Tracing` middleware.
pub(crate) struct TracingEndpoint<E> {
    /// The wrapped endpoint that the request is forwarded to.
    inner: E,
}
71
72/// Given a Clients IP Address, return the anonymized version of it.
73fn anonymize_ip_address(remote_addr: &str) -> String {
74    let addr: Vec<String> = vec![remote_addr.to_string()];
75    generate_uuid_string_from_data(Settings::client_id_key(), &addr)
76}
77
78/// Get an anonymized client ID from the request.
79///
80/// This simply takes the clients IP address,
81/// adds a supplied key to it, and hashes the result.
82///
83/// The Hash is unique per client IP, but not able to
84/// be reversed or analyzed without both the client IP and the key.
85async fn anonymous_client_id(req: &Request) -> String {
86    let remote_addr = RealIp::from_request_without_body(req)
87        .await
88        .ok()
89        .and_then(|real_ip| real_ip.0)
90        .map_or_else(|| req.remote_addr().to_string(), |addr| addr.to_string());
91
92    anonymize_ip_address(&remote_addr)
93}
94
95/// Data we collected about the response
96struct ResponseData {
97    /// Duration of the request
98    duration: f64,
99    /// CPU time of the request
100    cpu_time: f64,
101    /// Status code returned
102    status_code: u16,
103    /// Endpoint name
104    endpoint: String,
105    // panic: bool,
106}
107
108impl ResponseData {
109    /// Create a new `ResponseData` set from the response.
110    /// In the process add relevant data to the span from the response.
111    fn new(
112        duration: f64,
113        cpu_time: f64,
114        resp: &Response,
115        panic: Option<Uuid>,
116        span: &Span,
117    ) -> Self {
118        // The OpenAPI Operation ID of this request.
119        let oid = resp
120            .data::<OperationId>()
121            .map_or("Unknown".to_string(), std::string::ToString::to_string);
122
123        let status = resp.status().as_u16();
124
125        // Get the endpoint (path pattern) (this prevents metrics explosion).
126        let endpoint = resp.data::<PathPattern>();
127        let endpoint = endpoint.map_or("Unknown".to_string(), |endpoint| {
128            // For some reason path patterns can have trailing slashes, so remove them.
129            endpoint.0.trim_end_matches('/').to_string()
130        });
131
132        // Distinguish between "internal" endpoints and truly unknown endpoints.
133
134        span.record("duration_ms", duration);
135        span.record("cpu_time_ms", cpu_time);
136        span.record("oid", &oid);
137        span.record("status", status);
138        span.record("endpoint", &endpoint);
139
140        // Record the panic field in the span if it was set.
141        if let Some(panic) = panic {
142            span.record("panic", panic.to_string());
143        }
144
145        add_interesting_headers_to_span(span, "resp", resp.headers());
146
147        Self {
148            duration,
149            cpu_time,
150            status_code: status,
151            endpoint,
152            // panic: panic.is_some(),
153        }
154    }
155}
156
157/// Add all interesting headers to the correct fields in a span.
158/// This logic is the same for both requests and responses.
159fn add_interesting_headers_to_span(
160    span: &Span,
161    prefix: &str,
162    headers: &HeaderMap,
163) {
164    let size_field = prefix.to_string() + "_size";
165    let content_type_field = prefix.to_string() + "_content_type";
166    let encoding_field = prefix.to_string() + "_encoding";
167
168    // Record request size if its specified.
169    if let Some(size) = headers.get(header::CONTENT_LENGTH) {
170        if let Ok(size) = size.to_str() {
171            span.record(size_field.as_str(), size);
172        }
173    }
174
175    // Record request content type if its specified.
176    if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
177        if let Ok(content_type) = content_type.to_str() {
178            span.record(content_type_field.as_str(), content_type);
179        }
180    }
181
182    // Record request encoding if its specified.
183    if let Some(encoding) = headers.get(header::CONTENT_ENCODING) {
184        if let Ok(encoding) = encoding.to_str() {
185            span.record(encoding_field.as_str(), encoding);
186        }
187    }
188}
189
190/// Make a span from the request
191async fn mk_request_span(req: &Request) -> (Span, String, String, String) {
192    let client_id = anonymous_client_id(req).await;
193    let conn_id = Ulid::new();
194
195    let uri_path = req.uri().path().to_string();
196    let uri_query = req.uri().query().unwrap_or("").to_string();
197
198    let method = req.method().to_string();
199
200    let span = tracing::span!(
201        target: "Endpoint",
202        Level::INFO,
203        "request",
204        client = %client_id,
205        conn = %conn_id,
206        version = ?req.version(),
207        method = %method,
208        path = %uri_path,
209        query_size = field::Empty,
210        req_size = field::Empty,
211        req_content_type = field::Empty,
212        req_encoding = field::Empty,
213        resp_size = field::Empty,
214        resp_content_type = field::Empty,
215        resp_encoding = field::Empty,
216        endpoint = field::Empty,
217        duration_ms = field::Empty,
218        cpu_time_ms = field::Empty,
219        oid = field::Empty,
220        status = field::Empty,
221        panic = field::Empty,
222    );
223
224    // Record query size (To see if we are sent enormous queries).
225    if !uri_query.is_empty() {
226        span.record("query_size", uri_query.len());
227    }
228
229    add_interesting_headers_to_span(&span, "req", req.headers());
230
231    // Try and get the endpoint as a path pattern (this prevents metrics explosion).
232    if let Some(endpoint) = req.data::<PathPattern>() {
233        let endpoint = endpoint.0.trim_end_matches('/').to_string();
234        span.record("endpoint", endpoint);
235    }
236
237    // Try and get the endpoint as a path pattern (this prevents metrics explosion).
238    if let Some(oid) = req.data::<OperationId>() {
239        span.record("oid", oid.0.to_string());
240    }
241
242    (span, uri_path, method, client_id)
243}
244
245impl<E: Endpoint> Endpoint for TracingEndpoint<E> {
246    type Output = Response;
247
248    async fn call(
249        &self,
250        req: Request,
251    ) -> Result<Self::Output> {
252        // Construct the span from the request.
253        let (span, uri_path, method, client_id) = mk_request_span(&req).await;
254
255        let inner_span = span.clone();
256
257        let (response, resp_data) = async move {
258            let now = Instant::now();
259            let now_proc = ProcessTime::now();
260
261            let resp = self.inner.call(req).await;
262
263            #[allow(clippy::cast_precision_loss)] // Precision loss is acceptable for this timing.
264            let duration_proc = now_proc.elapsed().as_micros() as f64 / 1000.0;
265
266            #[allow(clippy::cast_precision_loss)] // Precision loss is acceptable for this timing.
267            let duration = now.elapsed().as_micros() as f64 / 1000.0;
268
269            match resp {
270                Ok(resp) => {
271                    // let panic = if let Some(panic) = resp.downcast_ref::<ServerError>() {
272                    // Add panic ID to the span.
273                    //    Some(panic.id());
274                    //} else {
275                    //    None
276                    //};
277
278                    let resp = resp.into_response();
279
280                    let response_data =
281                        ResponseData::new(duration, duration_proc, &resp, None, &inner_span);
282
283                    (Ok(resp), response_data)
284                },
285                Err(err) => {
286                    // let panic = if let Some(panic) = err.downcast_ref::<ServerError>() {
287                    // Add panic ID to the span.
288                    //    Some(panic.id());
289                    //} else {
290                    //    None
291                    //};
292                    let panic: Option<Uuid> = None;
293
294                    // Convert the error into a response, so we can deal with the error
295                    let error_message = err.to_string();
296                    let resp = err.into_response();
297                    let status = resp.status();
298
299                    // Log 404 as warning, if env set
300                    if status == StatusCode::NOT_FOUND {
301                        if Settings::log_not_found() {
302                            warn!(
303                            %status);
304                        }
305                    // Other response error code
306                    } else {
307                        error!(%error_message, %status, "HTTP Response Error");
308                    }
309
310                    let response_data =
311                        ResponseData::new(duration, duration_proc, &resp, panic, &inner_span);
312
313                    // Convert the response back to an error, and try and recover the message.
314                    let mut error = Error::from_response(resp);
315                    if !error_message.is_empty() {
316                        error.set_error_message(&error_message);
317                    }
318
319                    (Err(error), response_data)
320                },
321            }
322        }
323        .instrument(span.clone())
324        .await;
325
326        span.in_scope(|| {
327            // Only count 404, no other metrics to avoid spam from crawlers
328            if resp_data.status_code == StatusCode::NOT_FOUND {
329                NOT_FOUND_COUNT.inc();
330            } else {
331                // We really want to use the path_pattern from the response, but if not set use the path
332                // from the request.
333                let path = if resp_data.endpoint.is_empty() {
334                    uri_path
335                } else {
336                    resp_data.endpoint
337                };
338
339                HTTP_REQ_DURATION_MS
340                    .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
341                    .observe(resp_data.duration);
342                HTTP_REQ_CPU_TIME_MS
343                    .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
344                    .observe(resp_data.cpu_time);
345                // HTTP_REQUEST_RATE
346                //.with_label_values(&[&uri_path, &method, &response.status_code.to_string()])
347                //.inc();
348                HTTP_REQUEST_COUNT
349                    .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
350                    .inc();
351                CLIENT_REQUEST_COUNT
352                    .with_label_values(&[&client_id, &resp_data.status_code.to_string()])
353                    .inc();
354                // HTTP_REQUEST_SIZE_BYTES
355                //.with_label_values(&[&uri_path, &method, &response.status_code.to_string()])
356                //.observe(response.body().len() as f64);
357            }
358        });
359
360        response
361    }
362}