vector/sinks/datadog/traces/
service.rs

1use std::{
2    collections::BTreeMap,
3    task::{Context, Poll},
4};
5
6use bytes::{Buf, Bytes};
7use futures::future::BoxFuture;
8use http::{Request, StatusCode, Uri};
9use http_body::{Body as _, Collected};
10use hyper::Body;
11use snafu::ResultExt;
12use tower::Service;
13use vector_lib::{
14    event::{EventFinalizers, EventStatus, Finalizable},
15    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
16    stream::DriverResponse,
17};
18
19use crate::{
20    http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError},
21    sinks::util::retries::{RetryAction, RetryLogic},
22};
23
24#[derive(Debug, Default, Clone)]
25pub struct TraceApiRetry;
26
27impl RetryLogic for TraceApiRetry {
28    type Error = HttpError;
29    type Request = TraceApiRequest;
30    type Response = TraceApiResponse;
31
32    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
33        true
34    }
35
36    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
37        let status = response.status_code;
38        match status {
39            // Use the same status code/retry policy as the Trace agent, additionally retrying
40            // forbidden requests.
41            //
42            // This retry logic will be expanded further, but specifically retrying unauthorized
43            // requests for now. I verified using `curl` that `403` is the response code for this.
44            //
45            // https://github.com/vectordotdev/vector/issues/10870
46            // https://github.com/vectordotdev/vector/issues/12220
47            StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
48            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
49            _ if status.is_server_error() => RetryAction::Retry(
50                format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
51            ),
52            _ if status.is_success() => RetryAction::Successful,
53            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
54        }
55    }
56}
57
58#[derive(Debug, Clone)]
59pub struct TraceApiRequest {
60    pub body: Bytes,
61    pub headers: BTreeMap<String, String>,
62    pub finalizers: EventFinalizers,
63    pub uri: Uri,
64    pub uncompressed_size: usize,
65    pub metadata: RequestMetadata,
66}
67
68impl TraceApiRequest {
69    pub fn into_http_request(self) -> http::Result<Request<Body>> {
70        let mut request = Request::post(self.uri);
71        for (k, v) in self.headers.iter() {
72            request = request.header(k, v);
73        }
74        request.body(Body::from(self.body))
75    }
76}
77
78impl Finalizable for TraceApiRequest {
79    fn take_finalizers(&mut self) -> EventFinalizers {
80        std::mem::take(&mut self.finalizers)
81    }
82}
83
84impl MetaDescriptive for TraceApiRequest {
85    fn get_metadata(&self) -> &RequestMetadata {
86        &self.metadata
87    }
88
89    fn metadata_mut(&mut self) -> &mut RequestMetadata {
90        &mut self.metadata
91    }
92}
93
94#[derive(Debug)]
95pub struct TraceApiResponse {
96    status_code: StatusCode,
97    body: Bytes,
98    byte_size: GroupedCountByteSize,
99    uncompressed_size: usize,
100}
101
102impl DriverResponse for TraceApiResponse {
103    fn event_status(&self) -> EventStatus {
104        if self.status_code.is_success() {
105            EventStatus::Delivered
106        } else if self.status_code.is_client_error() {
107            EventStatus::Rejected
108        } else {
109            EventStatus::Errored
110        }
111    }
112
113    fn events_sent(&self) -> &GroupedCountByteSize {
114        &self.byte_size
115    }
116
117    fn bytes_sent(&self) -> Option<usize> {
118        Some(self.uncompressed_size)
119    }
120}
121
122/// Wrapper for the Datadog API.
123///
124/// Provides a `tower::Service` for the Datadog Traces API, allowing it to be
125/// composed within a Tower "stack", such that we can easily and transparently
126/// provide retries, concurrency limits, rate limits, and more.
127#[derive(Debug, Clone)]
128pub struct TraceApiService {
129    client: HttpClient,
130}
131
132impl TraceApiService {
133    pub const fn new(client: HttpClient) -> Self {
134        Self { client }
135    }
136}
137
138impl Service<TraceApiRequest> for TraceApiService {
139    type Response = TraceApiResponse;
140    type Error = HttpError;
141    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
142
143    // Emission of Error internal event is handled upstream by the caller
144    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
145        self.client.poll_ready(cx)
146    }
147
148    // Emission of Error internal event is handled upstream by the caller
149    fn call(&mut self, mut request: TraceApiRequest) -> Self::Future {
150        let client = self.client.clone();
151
152        Box::pin(async move {
153            let metadata = std::mem::take(request.metadata_mut());
154            let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
155            let uncompressed_size = request.uncompressed_size;
156            let http_request = request.into_http_request().context(BuildRequestSnafu)?;
157
158            let response = client.send(http_request).await?;
159            let (parts, body) = response.into_parts();
160            let mut body = body
161                .collect()
162                .await
163                .map(Collected::aggregate)
164                .context(CallRequestSnafu)?;
165            let body = body.copy_to_bytes(body.remaining());
166
167            Ok(TraceApiResponse {
168                status_code: parts.status,
169                body,
170                byte_size,
171                uncompressed_size,
172            })
173        })
174    }
175}