vector/sinks/datadog/traces/
service.rs1use std::{
2 collections::BTreeMap,
3 task::{Context, Poll},
4};
5
6use bytes::{Buf, Bytes};
7use futures::future::BoxFuture;
8use http::{Request, StatusCode, Uri};
9use hyper::Body;
10use snafu::ResultExt;
11use tower::Service;
12use vector_lib::event::{EventFinalizers, EventStatus, Finalizable};
13use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
14use vector_lib::stream::DriverResponse;
15
16use crate::{
17 http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError},
18 sinks::util::retries::{RetryAction, RetryLogic},
19};
20
/// Stateless retry policy for trace API requests.
///
/// All of the decision making lives in its `RetryLogic` implementation, which
/// inspects `HttpError`s and `TraceApiResponse`s.
#[derive(Clone, Debug, Default)]
pub struct TraceApiRetry;
23
24impl RetryLogic for TraceApiRetry {
25 type Error = HttpError;
26 type Request = TraceApiRequest;
27 type Response = TraceApiResponse;
28
29 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
30 true
31 }
32
33 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
34 let status = response.status_code;
35 match status {
36 StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
45 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
46 _ if status.is_server_error() => RetryAction::Retry(
47 format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
48 ),
49 _ if status.is_success() => RetryAction::Successful,
50 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
51 }
52 }
53}
54
55#[derive(Debug, Clone)]
56pub struct TraceApiRequest {
57 pub body: Bytes,
58 pub headers: BTreeMap<String, String>,
59 pub finalizers: EventFinalizers,
60 pub uri: Uri,
61 pub uncompressed_size: usize,
62 pub metadata: RequestMetadata,
63}
64
65impl TraceApiRequest {
66 pub fn into_http_request(self) -> http::Result<Request<Body>> {
67 let mut request = Request::post(self.uri);
68 for (k, v) in self.headers.iter() {
69 request = request.header(k, v);
70 }
71 request.body(Body::from(self.body))
72 }
73}
74
75impl Finalizable for TraceApiRequest {
76 fn take_finalizers(&mut self) -> EventFinalizers {
77 std::mem::take(&mut self.finalizers)
78 }
79}
80
81impl MetaDescriptive for TraceApiRequest {
82 fn get_metadata(&self) -> &RequestMetadata {
83 &self.metadata
84 }
85
86 fn metadata_mut(&mut self) -> &mut RequestMetadata {
87 &mut self.metadata
88 }
89}
90
91#[derive(Debug)]
92pub struct TraceApiResponse {
93 status_code: StatusCode,
94 body: Bytes,
95 byte_size: GroupedCountByteSize,
96 uncompressed_size: usize,
97}
98
99impl DriverResponse for TraceApiResponse {
100 fn event_status(&self) -> EventStatus {
101 if self.status_code.is_success() {
102 EventStatus::Delivered
103 } else if self.status_code.is_client_error() {
104 EventStatus::Rejected
105 } else {
106 EventStatus::Errored
107 }
108 }
109
110 fn events_sent(&self) -> &GroupedCountByteSize {
111 &self.byte_size
112 }
113
114 fn bytes_sent(&self) -> Option<usize> {
115 Some(self.uncompressed_size)
116 }
117}
118
119#[derive(Debug, Clone)]
125pub struct TraceApiService {
126 client: HttpClient,
127}
128
129impl TraceApiService {
130 pub const fn new(client: HttpClient) -> Self {
131 Self { client }
132 }
133}
134
135impl Service<TraceApiRequest> for TraceApiService {
136 type Response = TraceApiResponse;
137 type Error = HttpError;
138 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
139
140 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
142 self.client.poll_ready(cx)
143 }
144
145 fn call(&mut self, mut request: TraceApiRequest) -> Self::Future {
147 let client = self.client.clone();
148
149 Box::pin(async move {
150 let metadata = std::mem::take(request.metadata_mut());
151 let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
152 let uncompressed_size = request.uncompressed_size;
153 let http_request = request.into_http_request().context(BuildRequestSnafu)?;
154
155 let response = client.send(http_request).await?;
156 let (parts, body) = response.into_parts();
157 let mut body = hyper::body::aggregate(body)
158 .await
159 .context(CallRequestSnafu)?;
160 let body = body.copy_to_bytes(body.remaining());
161
162 Ok(TraceApiResponse {
163 status_code: parts.status,
164 body,
165 byte_size,
166 uncompressed_size,
167 })
168 })
169 }
170}