vector/sinks/datadog/traces/
service.rs1use std::{
2 collections::BTreeMap,
3 task::{Context, Poll},
4};
5
6use bytes::{Buf, Bytes};
7use futures::future::BoxFuture;
8use http::{Request, StatusCode, Uri};
9use hyper::Body;
10use snafu::ResultExt;
11use tower::Service;
12use vector_lib::{
13 event::{EventFinalizers, EventStatus, Finalizable},
14 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
15 stream::DriverResponse,
16};
17
18use crate::{
19 http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError},
20 sinks::util::retries::{RetryAction, RetryLogic},
21};
22
23#[derive(Debug, Default, Clone)]
24pub struct TraceApiRetry;
25
26impl RetryLogic for TraceApiRetry {
27 type Error = HttpError;
28 type Request = TraceApiRequest;
29 type Response = TraceApiResponse;
30
31 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
32 true
33 }
34
35 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
36 let status = response.status_code;
37 match status {
38 StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
47 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
48 _ if status.is_server_error() => RetryAction::Retry(
49 format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
50 ),
51 _ if status.is_success() => RetryAction::Successful,
52 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
53 }
54 }
55}
56
57#[derive(Debug, Clone)]
58pub struct TraceApiRequest {
59 pub body: Bytes,
60 pub headers: BTreeMap<String, String>,
61 pub finalizers: EventFinalizers,
62 pub uri: Uri,
63 pub uncompressed_size: usize,
64 pub metadata: RequestMetadata,
65}
66
67impl TraceApiRequest {
68 pub fn into_http_request(self) -> http::Result<Request<Body>> {
69 let mut request = Request::post(self.uri);
70 for (k, v) in self.headers.iter() {
71 request = request.header(k, v);
72 }
73 request.body(Body::from(self.body))
74 }
75}
76
77impl Finalizable for TraceApiRequest {
78 fn take_finalizers(&mut self) -> EventFinalizers {
79 std::mem::take(&mut self.finalizers)
80 }
81}
82
83impl MetaDescriptive for TraceApiRequest {
84 fn get_metadata(&self) -> &RequestMetadata {
85 &self.metadata
86 }
87
88 fn metadata_mut(&mut self) -> &mut RequestMetadata {
89 &mut self.metadata
90 }
91}
92
93#[derive(Debug)]
94pub struct TraceApiResponse {
95 status_code: StatusCode,
96 body: Bytes,
97 byte_size: GroupedCountByteSize,
98 uncompressed_size: usize,
99}
100
101impl DriverResponse for TraceApiResponse {
102 fn event_status(&self) -> EventStatus {
103 if self.status_code.is_success() {
104 EventStatus::Delivered
105 } else if self.status_code.is_client_error() {
106 EventStatus::Rejected
107 } else {
108 EventStatus::Errored
109 }
110 }
111
112 fn events_sent(&self) -> &GroupedCountByteSize {
113 &self.byte_size
114 }
115
116 fn bytes_sent(&self) -> Option<usize> {
117 Some(self.uncompressed_size)
118 }
119}
120
121#[derive(Debug, Clone)]
127pub struct TraceApiService {
128 client: HttpClient,
129}
130
131impl TraceApiService {
132 pub const fn new(client: HttpClient) -> Self {
133 Self { client }
134 }
135}
136
137impl Service<TraceApiRequest> for TraceApiService {
138 type Response = TraceApiResponse;
139 type Error = HttpError;
140 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
141
142 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
144 self.client.poll_ready(cx)
145 }
146
147 fn call(&mut self, mut request: TraceApiRequest) -> Self::Future {
149 let client = self.client.clone();
150
151 Box::pin(async move {
152 let metadata = std::mem::take(request.metadata_mut());
153 let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
154 let uncompressed_size = request.uncompressed_size;
155 let http_request = request.into_http_request().context(BuildRequestSnafu)?;
156
157 let response = client.send(http_request).await?;
158 let (parts, body) = response.into_parts();
159 let mut body = hyper::body::aggregate(body)
160 .await
161 .context(CallRequestSnafu)?;
162 let body = body.copy_to_bytes(body.remaining());
163
164 Ok(TraceApiResponse {
165 status_code: parts.status,
166 body,
167 byte_size,
168 uncompressed_size,
169 })
170 })
171 }
172}