cat_gateway/service/utilities/middleware/
tracing_mw.rs1use std::time::Instant;
4
5use cpu_time::ProcessTime; use poem::{
7 http::{header, HeaderMap, StatusCode},
8 web::RealIp,
9 Endpoint, Error, FromRequest, IntoResponse, Middleware, PathPattern, Request, Response, Result,
10};
11use poem_openapi::OperationId;
12use tracing::{error, field, warn, Instrument, Level, Span};
13use ulid::Ulid;
14use uuid::Uuid;
15
16use crate::{
17 metrics::endpoint::{
18 CLIENT_REQUEST_COUNT, HTTP_REQUEST_COUNT, HTTP_REQ_CPU_TIME_MS, HTTP_REQ_DURATION_MS,
19 NOT_FOUND_COUNT,
20 },
21 settings::Settings,
22 utils::blake2b_hash::generate_uuid_string_from_data,
23};
24
25#[derive(Default)]
53pub(crate) struct Tracing;
54
55impl<E: Endpoint> Middleware<E> for Tracing {
56 type Output = TracingEndpoint<E>;
57
58 fn transform(
59 &self,
60 ep: E,
61 ) -> Self::Output {
62 TracingEndpoint { inner: ep }
63 }
64}
65
66pub(crate) struct TracingEndpoint<E> {
68 inner: E,
70}
71
72fn anonymize_ip_address(remote_addr: &str) -> String {
74 let addr: Vec<String> = vec![remote_addr.to_string()];
75 generate_uuid_string_from_data(Settings::client_id_key(), &addr)
76}
77
78async fn anonymous_client_id(req: &Request) -> String {
86 let remote_addr = RealIp::from_request_without_body(req)
87 .await
88 .ok()
89 .and_then(|real_ip| real_ip.0)
90 .map_or_else(|| req.remote_addr().to_string(), |addr| addr.to_string());
91
92 anonymize_ip_address(&remote_addr)
93}
94
95struct ResponseData {
97 duration: f64,
99 cpu_time: f64,
101 status_code: u16,
103 endpoint: String,
105 }
107
108impl ResponseData {
109 fn new(
112 duration: f64,
113 cpu_time: f64,
114 resp: &Response,
115 panic: Option<Uuid>,
116 span: &Span,
117 ) -> Self {
118 let oid = resp
120 .data::<OperationId>()
121 .map_or("Unknown".to_string(), std::string::ToString::to_string);
122
123 let status = resp.status().as_u16();
124
125 let endpoint = resp.data::<PathPattern>();
127 let endpoint = endpoint.map_or("Unknown".to_string(), |endpoint| {
128 endpoint.0.trim_end_matches('/').to_string()
130 });
131
132 span.record("duration_ms", duration);
135 span.record("cpu_time_ms", cpu_time);
136 span.record("oid", &oid);
137 span.record("status", status);
138 span.record("endpoint", &endpoint);
139
140 if let Some(panic) = panic {
142 span.record("panic", panic.to_string());
143 }
144
145 add_interesting_headers_to_span(span, "resp", resp.headers());
146
147 Self {
148 duration,
149 cpu_time,
150 status_code: status,
151 endpoint,
152 }
154 }
155}
156
157fn add_interesting_headers_to_span(
160 span: &Span,
161 prefix: &str,
162 headers: &HeaderMap,
163) {
164 let size_field = prefix.to_string() + "_size";
165 let content_type_field = prefix.to_string() + "_content_type";
166 let encoding_field = prefix.to_string() + "_encoding";
167
168 if let Some(size) = headers.get(header::CONTENT_LENGTH) {
170 if let Ok(size) = size.to_str() {
171 span.record(size_field.as_str(), size);
172 }
173 }
174
175 if let Some(content_type) = headers.get(header::CONTENT_TYPE) {
177 if let Ok(content_type) = content_type.to_str() {
178 span.record(content_type_field.as_str(), content_type);
179 }
180 }
181
182 if let Some(encoding) = headers.get(header::CONTENT_ENCODING) {
184 if let Ok(encoding) = encoding.to_str() {
185 span.record(encoding_field.as_str(), encoding);
186 }
187 }
188}
189
190async fn mk_request_span(req: &Request) -> (Span, String, String, String) {
192 let client_id = anonymous_client_id(req).await;
193 let conn_id = Ulid::new();
194
195 let uri_path = req.uri().path().to_string();
196 let uri_query = req.uri().query().unwrap_or("").to_string();
197
198 let method = req.method().to_string();
199
200 let span = tracing::span!(
201 target: "Endpoint",
202 Level::INFO,
203 "request",
204 client = %client_id,
205 conn = %conn_id,
206 version = ?req.version(),
207 method = %method,
208 path = %uri_path,
209 query_size = field::Empty,
210 req_size = field::Empty,
211 req_content_type = field::Empty,
212 req_encoding = field::Empty,
213 resp_size = field::Empty,
214 resp_content_type = field::Empty,
215 resp_encoding = field::Empty,
216 endpoint = field::Empty,
217 duration_ms = field::Empty,
218 cpu_time_ms = field::Empty,
219 oid = field::Empty,
220 status = field::Empty,
221 panic = field::Empty,
222 );
223
224 if !uri_query.is_empty() {
226 span.record("query_size", uri_query.len());
227 }
228
229 add_interesting_headers_to_span(&span, "req", req.headers());
230
231 if let Some(endpoint) = req.data::<PathPattern>() {
233 let endpoint = endpoint.0.trim_end_matches('/').to_string();
234 span.record("endpoint", endpoint);
235 }
236
237 if let Some(oid) = req.data::<OperationId>() {
239 span.record("oid", oid.0.to_string());
240 }
241
242 (span, uri_path, method, client_id)
243}
244
245impl<E: Endpoint> Endpoint for TracingEndpoint<E> {
246 type Output = Response;
247
248 async fn call(
249 &self,
250 req: Request,
251 ) -> Result<Self::Output> {
252 let (span, uri_path, method, client_id) = mk_request_span(&req).await;
254
255 let inner_span = span.clone();
256
257 let (response, resp_data) = async move {
258 let now = Instant::now();
259 let now_proc = ProcessTime::now();
260
261 let resp = self.inner.call(req).await;
262
263 #[allow(clippy::cast_precision_loss)] let duration_proc = now_proc.elapsed().as_micros() as f64 / 1000.0;
265
266 #[allow(clippy::cast_precision_loss)] let duration = now.elapsed().as_micros() as f64 / 1000.0;
268
269 match resp {
270 Ok(resp) => {
271 let resp = resp.into_response();
279
280 let response_data =
281 ResponseData::new(duration, duration_proc, &resp, None, &inner_span);
282
283 (Ok(resp), response_data)
284 },
285 Err(err) => {
286 let panic: Option<Uuid> = None;
293
294 let error_message = err.to_string();
296 let resp = err.into_response();
297 let status = resp.status();
298
299 if status == StatusCode::NOT_FOUND {
301 if Settings::log_not_found() {
302 warn!(
303 %status);
304 }
305 } else {
307 error!(%error_message, %status, "HTTP Response Error");
308 }
309
310 let response_data =
311 ResponseData::new(duration, duration_proc, &resp, panic, &inner_span);
312
313 let mut error = Error::from_response(resp);
315 if !error_message.is_empty() {
316 error.set_error_message(&error_message);
317 }
318
319 (Err(error), response_data)
320 },
321 }
322 }
323 .instrument(span.clone())
324 .await;
325
326 span.in_scope(|| {
327 if resp_data.status_code == StatusCode::NOT_FOUND {
329 NOT_FOUND_COUNT.inc();
330 } else {
331 let path = if resp_data.endpoint.is_empty() {
334 uri_path
335 } else {
336 resp_data.endpoint
337 };
338
339 HTTP_REQ_DURATION_MS
340 .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
341 .observe(resp_data.duration);
342 HTTP_REQ_CPU_TIME_MS
343 .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
344 .observe(resp_data.cpu_time);
345 HTTP_REQUEST_COUNT
349 .with_label_values(&[&path, &method, &resp_data.status_code.to_string()])
350 .inc();
351 CLIENT_REQUEST_COUNT
352 .with_label_values(&[&client_id, &resp_data.status_code.to_string()])
353 .inc();
354 }
358 });
359
360 response
361 }
362}