vector/sinks/datadog/traces/
service.rs

1use std::{
2    collections::BTreeMap,
3    task::{Context, Poll},
4};
5
6use bytes::{Buf, Bytes};
7use futures::future::BoxFuture;
8use http::{Request, StatusCode, Uri};
9use hyper::Body;
10use snafu::ResultExt;
11use tower::Service;
12use vector_lib::{
13    event::{EventFinalizers, EventStatus, Finalizable},
14    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
15    stream::DriverResponse,
16};
17
18use crate::{
19    http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError},
20    sinks::util::retries::{RetryAction, RetryLogic},
21};
22
/// Retry policy for requests sent to the Datadog traces API.
///
/// This is a field-less marker type: all of the decision logic lives in its
/// `RetryLogic` implementation.
#[derive(Debug, Default, Clone)]
pub struct TraceApiRetry;
25
26impl RetryLogic for TraceApiRetry {
27    type Error = HttpError;
28    type Request = TraceApiRequest;
29    type Response = TraceApiResponse;
30
31    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
32        true
33    }
34
35    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
36        let status = response.status_code;
37        match status {
38            // Use the same status code/retry policy as the Trace agent, additionally retrying
39            // forbidden requests.
40            //
41            // This retry logic will be expanded further, but specifically retrying unauthorized
42            // requests for now. I verified using `curl` that `403` is the response code for this.
43            //
44            // https://github.com/vectordotdev/vector/issues/10870
45            // https://github.com/vectordotdev/vector/issues/12220
46            StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
47            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
48            _ if status.is_server_error() => RetryAction::Retry(
49                format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
50            ),
51            _ if status.is_success() => RetryAction::Successful,
52            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
53        }
54    }
55}
56
/// A single request destined for the Datadog traces API.
///
/// It is converted into an HTTP `POST` request by `into_http_request`.
#[derive(Debug, Clone)]
pub struct TraceApiRequest {
    /// Payload used as the HTTP request body.
    pub body: Bytes,
    /// Header name/value pairs added to the outgoing HTTP request.
    pub headers: BTreeMap<String, String>,
    /// Event finalizers, handed out through `Finalizable::take_finalizers`.
    pub finalizers: EventFinalizers,
    /// URI the request is posted to.
    pub uri: Uri,
    /// Size that the service copies onto the response and reports as bytes sent.
    /// NOTE(review): presumably the payload size before compression; confirm
    /// against the code that builds this request.
    pub uncompressed_size: usize,
    /// Request metadata, exposed through the `MetaDescriptive` implementation.
    pub metadata: RequestMetadata,
}
66
67impl TraceApiRequest {
68    pub fn into_http_request(self) -> http::Result<Request<Body>> {
69        let mut request = Request::post(self.uri);
70        for (k, v) in self.headers.iter() {
71            request = request.header(k, v);
72        }
73        request.body(Body::from(self.body))
74    }
75}
76
77impl Finalizable for TraceApiRequest {
78    fn take_finalizers(&mut self) -> EventFinalizers {
79        std::mem::take(&mut self.finalizers)
80    }
81}
82
83impl MetaDescriptive for TraceApiRequest {
84    fn get_metadata(&self) -> &RequestMetadata {
85        &self.metadata
86    }
87
88    fn metadata_mut(&mut self) -> &mut RequestMetadata {
89        &mut self.metadata
90    }
91}
92
/// Outcome of sending a `TraceApiRequest`, as produced by `TraceApiService`.
#[derive(Debug)]
pub struct TraceApiResponse {
    /// HTTP status code of the response.
    status_code: StatusCode,
    /// Fully buffered response body.
    body: Bytes,
    /// Event count/byte size derived from the metadata of the originating request.
    byte_size: GroupedCountByteSize,
    /// `uncompressed_size` copied from the originating request.
    uncompressed_size: usize,
}
100
101impl DriverResponse for TraceApiResponse {
102    fn event_status(&self) -> EventStatus {
103        if self.status_code.is_success() {
104            EventStatus::Delivered
105        } else if self.status_code.is_client_error() {
106            EventStatus::Rejected
107        } else {
108            EventStatus::Errored
109        }
110    }
111
112    fn events_sent(&self) -> &GroupedCountByteSize {
113        &self.byte_size
114    }
115
116    fn bytes_sent(&self) -> Option<usize> {
117        Some(self.uncompressed_size)
118    }
119}
120
/// Wrapper for the Datadog API.
///
/// Provides a `tower::Service` for the Datadog Traces API, allowing it to be
/// composed within a Tower "stack", such that we can easily and transparently
/// provide retries, concurrency limits, rate limits, and more.
#[derive(Debug, Clone)]
pub struct TraceApiService {
    /// HTTP client used to send requests; it is cloned for each call.
    client: HttpClient,
}
130
131impl TraceApiService {
132    pub const fn new(client: HttpClient) -> Self {
133        Self { client }
134    }
135}
136
137impl Service<TraceApiRequest> for TraceApiService {
138    type Response = TraceApiResponse;
139    type Error = HttpError;
140    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
141
142    // Emission of Error internal event is handled upstream by the caller
143    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
144        self.client.poll_ready(cx)
145    }
146
147    // Emission of Error internal event is handled upstream by the caller
148    fn call(&mut self, mut request: TraceApiRequest) -> Self::Future {
149        let client = self.client.clone();
150
151        Box::pin(async move {
152            let metadata = std::mem::take(request.metadata_mut());
153            let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
154            let uncompressed_size = request.uncompressed_size;
155            let http_request = request.into_http_request().context(BuildRequestSnafu)?;
156
157            let response = client.send(http_request).await?;
158            let (parts, body) = response.into_parts();
159            let mut body = hyper::body::aggregate(body)
160                .await
161                .context(CallRequestSnafu)?;
162            let body = body.copy_to_bytes(body.remaining());
163
164            Ok(TraceApiResponse {
165                status_code: parts.status,
166                body,
167                byte_size,
168                uncompressed_size,
169            })
170        })
171    }
172}