vector/sinks/datadog/traces/
service.rs1use std::{
2 collections::BTreeMap,
3 task::{Context, Poll},
4};
5
6use bytes::{Buf, Bytes};
7use futures::future::BoxFuture;
8use http::{Request, StatusCode, Uri};
9use http_body::{Body as _, Collected};
10use hyper::Body;
11use snafu::ResultExt;
12use tower::Service;
13use vector_lib::{
14 event::{EventFinalizers, EventStatus, Finalizable},
15 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
16 stream::DriverResponse,
17};
18
19use crate::{
20 http::{BuildRequestSnafu, CallRequestSnafu, HttpClient, HttpError},
21 sinks::util::retries::{RetryAction, RetryLogic},
22};
23
/// Retry policy for [`TraceApiRequest`] / [`TraceApiResponse`] exchanges.
///
/// The type carries no state; the policy itself is expressed entirely by its
/// [`RetryLogic`] implementation.
#[derive(Clone, Debug, Default)]
pub struct TraceApiRetry;
26
27impl RetryLogic for TraceApiRetry {
28 type Error = HttpError;
29 type Request = TraceApiRequest;
30 type Response = TraceApiResponse;
31
32 fn is_retriable_error(&self, _error: &Self::Error) -> bool {
33 true
34 }
35
36 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
37 let status = response.status_code;
38 match status {
39 StatusCode::FORBIDDEN => RetryAction::Retry("forbidden".into()),
48 StatusCode::REQUEST_TIMEOUT => RetryAction::Retry("request timeout".into()),
49 _ if status.is_server_error() => RetryAction::Retry(
50 format!("{}: {}", status, String::from_utf8_lossy(&response.body)).into(),
51 ),
52 _ if status.is_success() => RetryAction::Successful,
53 _ => RetryAction::DontRetry(format!("response status: {status}").into()),
54 }
55 }
56}
57
58#[derive(Debug, Clone)]
59pub struct TraceApiRequest {
60 pub body: Bytes,
61 pub headers: BTreeMap<String, String>,
62 pub finalizers: EventFinalizers,
63 pub uri: Uri,
64 pub uncompressed_size: usize,
65 pub metadata: RequestMetadata,
66}
67
68impl TraceApiRequest {
69 pub fn into_http_request(self) -> http::Result<Request<Body>> {
70 let mut request = Request::post(self.uri);
71 for (k, v) in self.headers.iter() {
72 request = request.header(k, v);
73 }
74 request.body(Body::from(self.body))
75 }
76}
77
78impl Finalizable for TraceApiRequest {
79 fn take_finalizers(&mut self) -> EventFinalizers {
80 std::mem::take(&mut self.finalizers)
81 }
82}
83
84impl MetaDescriptive for TraceApiRequest {
85 fn get_metadata(&self) -> &RequestMetadata {
86 &self.metadata
87 }
88
89 fn metadata_mut(&mut self) -> &mut RequestMetadata {
90 &mut self.metadata
91 }
92}
93
94#[derive(Debug)]
95pub struct TraceApiResponse {
96 status_code: StatusCode,
97 body: Bytes,
98 byte_size: GroupedCountByteSize,
99 uncompressed_size: usize,
100}
101
102impl DriverResponse for TraceApiResponse {
103 fn event_status(&self) -> EventStatus {
104 if self.status_code.is_success() {
105 EventStatus::Delivered
106 } else if self.status_code.is_client_error() {
107 EventStatus::Rejected
108 } else {
109 EventStatus::Errored
110 }
111 }
112
113 fn events_sent(&self) -> &GroupedCountByteSize {
114 &self.byte_size
115 }
116
117 fn bytes_sent(&self) -> Option<usize> {
118 Some(self.uncompressed_size)
119 }
120}
121
122#[derive(Debug, Clone)]
128pub struct TraceApiService {
129 client: HttpClient,
130}
131
132impl TraceApiService {
133 pub const fn new(client: HttpClient) -> Self {
134 Self { client }
135 }
136}
137
138impl Service<TraceApiRequest> for TraceApiService {
139 type Response = TraceApiResponse;
140 type Error = HttpError;
141 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
142
143 fn poll_ready(&mut self, cx: &mut Context) -> Poll<Result<(), Self::Error>> {
145 self.client.poll_ready(cx)
146 }
147
148 fn call(&mut self, mut request: TraceApiRequest) -> Self::Future {
150 let client = self.client.clone();
151
152 Box::pin(async move {
153 let metadata = std::mem::take(request.metadata_mut());
154 let byte_size = metadata.into_events_estimated_json_encoded_byte_size();
155 let uncompressed_size = request.uncompressed_size;
156 let http_request = request.into_http_request().context(BuildRequestSnafu)?;
157
158 let response = client.send(http_request).await?;
159 let (parts, body) = response.into_parts();
160 let mut body = body
161 .collect()
162 .await
163 .map(Collected::aggregate)
164 .context(CallRequestSnafu)?;
165 let body = body.copy_to_bytes(body.remaining());
166
167 Ok(TraceApiResponse {
168 status_code: parts.status,
169 body,
170 byte_size,
171 uncompressed_size,
172 })
173 })
174 }
175}