vector/sinks/datadog/traces/
service.rs

1use std::{
2    collections::BTreeMap,
3    task::{Context, Poll},
4};
5
6use bytes::{Buf, Bytes};
7use futures::future::BoxFuture;
8use http::{Request, StatusCode, Uri};
9use hyper::Body;
10use snafu::ResultExt;
11use tower::Service;
12use vector_lib::event::{EventFinalizers, EventStatus, Finalizable};
13use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
14use vector_lib::stream::DriverResponse;
15
16use crate::{
17    http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError},
18    sinks::util::retries::{RetryAction, RetryLogic},
19};
20
21#[derive(Debug, Default, Clone)]
22pub struct TraceApiRetry;
23
24impl RetryLogic for TraceApiRetry {
25    type Error = HttpError;
26    type Request = TraceApiRequest;
27    type Response = TraceApiResponse;
28
29    fn is_retriable_error(&self, _error: &Self::Error) -> bool {
30        true
31    }
32
33    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
34        let status = response.status_code;
35        match status {
36            // Use the same status code/retry policy as the Trace agent, additionally retrying
37            // forbidden requests.
38            //
39            // This retry logic will be expanded further, but specifically retrying unauthorized
40            // requests for now. I verified using `curl` that `403` is the response code for this.
41            //
42            // https://github.com/vectordotdev/vector/issues/10870
43            // https://github.com/vectordotdev/vector/issues/12220
44            StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
45            StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
46            _ if status.is_server_error() => RetryAction::Retry(
47                format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
48            ),
49            _ if status.is_success() => RetryAction::Successful,
50            _ => RetryAction::DontRetry(format!("response status: {status}").into()),
51        }
52    }
53}
54
55#[derive(Debug, Clone)]
56pub struct TraceApiRequest {
57    pub body: Bytes,
58    pub headers: BTreeMap<String, String>,
59    pub finalizers: EventFinalizers,
60    pub uri: Uri,
61    pub uncompressed_size: usize,
62    pub metadata: RequestMetadata,
63}
64
65impl TraceApiRequest {
66    pub fn into_http_request(self) -> http::Result<Request<Body>> {
67        let mut request = Request::post(self.uri);
68        for (k, v) in self.headers.iter() {
69            request = request.header(k, v);
70        }
71        request.body(Body::from(self.body))
72    }
73}
74
75impl Finalizable for TraceApiRequest {
76    fn take_finalizers(&mut self) -> EventFinalizers {
77        std::mem::take(&mut self.finalizers)
78    }
79}
80
81impl MetaDescriptive for TraceApiRequest {
82    fn get_metadata(&self) -> &RequestMetadata {
83        &self.metadata
84    }
85
86    fn metadata_mut(&mut self) -> &mut RequestMetadata {
87        &mut self.metadata
88    }
89}
90
91#[derive(Debug)]
92pub struct TraceApiResponse {
93    status_code: StatusCode,
94    body: Bytes,
95    byte_size: GroupedCountByteSize,
96    uncompressed_size: usize,
97}
98
99impl DriverResponse for TraceApiResponse {
100    fn event_status(&self) -> EventStatus {
101        if self.status_code.is_success() {
102            EventStatus::Delivered
103        } else if self.status_code.is_client_error() {
104            EventStatus::Rejected
105        } else {
106            EventStatus::Errored
107        }
108    }
109
110    fn events_sent(&self) -> &GroupedCountByteSize {
111        &self.byte_size
112    }
113
114    fn bytes_sent(&self) -> Option<usize> {
115        Some(self.uncompressed_size)
116    }
117}
118
119/// Wrapper for the Datadog API.
120///
121/// Provides a `tower::Service` for the Datadog Traces API, allowing it to be
122/// composed within a Tower "stack", such that we can easily and transparently
123/// provide retries, concurrency limits, rate limits, and more.
124#[derive(Debug, Clone)]
125pub struct TraceApiService {
126    client: HttpClient,
127}
128
129impl TraceApiService {
130    pub const fn new(client: HttpClient) -> Self {
131        Self { client }
132    }
133}
134
135impl Service<TraceApiRequest> for TraceApiService {
136    type Response = TraceApiResponse;
137    type Error = HttpError;
138    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
139
140    // Emission of Error internal event is handled upstream by the caller
141    fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
142        self.client.poll_ready(cx)
143    }
144
145    // Emission of Error internal event is handled upstream by the caller
146    fn call(&mut self, mut request: TraceApiRequest) -> Self::Future {
147        let client = self.client.clone();
148
149        Box::pin(async move {
150            let metadata = std::mem::take(request.metadata_mut());
151            let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
152            let uncompressed_size = request.uncompressed_size;
153            let http_request = request.into_http_request().context(BuildRequestSnafu)?;
154
155            let response = client.send(http_request).await?;
156            let (parts, body) = response.into_parts();
157            let mut body = hyper::body::aggregate(body)
158                .await
159                .context(CallRequestSnafu)?;
160            let body = body.copy_to_bytes(body.remaining());
161
162            Ok(TraceApiResponse {
163                status_code: parts.status,
164                body,
165                byte_size,
166                uncompressed_size,
167            })
168        })
169    }
170}