vector/sinks/datadog/logs/
service.rs1use std::{
2 collections::BTreeMap,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::future::BoxFuture;
9use http::{
10 HeaderValue, Request, Uri,
11 header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
12};
13use hyper::Body;
14use tower::Service;
15use tracing::Instrument;
16use vector_lib::{
17 event::{EventFinalizers, EventStatus, Finalizable},
18 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
19 stream::DriverResponse,
20};
21
22use crate::{
23 http::HttpClient,
24 sinks::{
25 datadog::DatadogApiError,
26 util::{
27 Compression,
28 http::{OrderedHeaderName, validate_headers},
29 retries::RetryLogic,
30 },
31 },
32};
33
/// Retry policy for requests sent to the Datadog logs API.
///
/// The type is a stateless unit struct; the retry decision itself lives in its
/// [`RetryLogic`] implementation.
#[derive(Clone, Debug, Default)]
pub struct LogApiRetry;
36
37impl RetryLogic for LogApiRetry {
38 type Error = DatadogApiError;
39 type Request = LogApiRequest;
40 type Response = LogApiResponse;
41
42 fn is_retriable_error(&self, error: &Self::Error) -> bool {
43 error.is_retriable()
44 }
45}
46
47#[derive(Debug, Clone)]
48pub struct LogApiRequest {
49 pub api_key: Arc<str>,
50 pub compression: Compression,
51 pub body: Bytes,
52 pub finalizers: EventFinalizers,
53 pub uncompressed_size: usize,
54 pub metadata: RequestMetadata,
55}
56
57impl Finalizable for LogApiRequest {
58 fn take_finalizers(&mut self) -> EventFinalizers {
59 std::mem::take(&mut self.finalizers)
60 }
61}
62
63impl MetaDescriptive for LogApiRequest {
64 fn get_metadata(&self) -> &RequestMetadata {
65 &self.metadata
66 }
67
68 fn metadata_mut(&mut self) -> &mut RequestMetadata {
69 &mut self.metadata
70 }
71}
72
73#[derive(Debug)]
74pub struct LogApiResponse {
75 event_status: EventStatus,
76 events_byte_size: GroupedCountByteSize,
77 raw_byte_size: usize,
78}
79
80impl DriverResponse for LogApiResponse {
81 fn event_status(&self) -> EventStatus {
82 self.event_status
83 }
84
85 fn events_sent(&self) -> &GroupedCountByteSize {
86 &self.events_byte_size
87 }
88
89 fn bytes_sent(&self) -> Option<usize> {
90 Some(self.raw_byte_size)
91 }
92}
93
94#[derive(Debug, Clone)]
100pub struct LogApiService {
101 client: HttpClient,
102 uri: Uri,
103 user_provided_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
104 dd_evp_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
105}
106
107impl LogApiService {
108 pub fn new(
109 client: HttpClient,
110 uri: Uri,
111 headers: BTreeMap<String, String>,
112 dd_evp_origin: String,
113 ) -> crate::Result<Self> {
114 let user_provided_headers = validate_headers(&headers)?;
115
116 let dd_evp_headers: BTreeMap<String, String> = [
117 ("DD-EVP-ORIGIN".to_string(), dd_evp_origin),
118 ("DD-EVP-ORIGIN-VERSION".to_string(), crate::get_version()),
119 ]
120 .into_iter()
121 .collect();
122 let dd_evp_headers = validate_headers(&dd_evp_headers)?;
123
124 Ok(Self {
125 client,
126 uri,
127 user_provided_headers,
128 dd_evp_headers,
129 })
130 }
131}
132
133impl Service<LogApiRequest> for LogApiService {
134 type Response = LogApiResponse;
135 type Error = DatadogApiError;
136 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
137
138 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
140 Poll::Ready(Ok(()))
141 }
142
143 fn call(&mut self, mut request: LogApiRequest) -> Self::Future {
145 let mut client = self.client.clone();
146 let http_request = Request::post(&self.uri)
147 .header(CONTENT_TYPE, "application/json")
148 .header("DD-API-KEY", request.api_key.to_string());
149
150 let http_request = if let Some(ce) = request.compression.content_encoding() {
151 http_request.header(CONTENT_ENCODING, ce)
152 } else {
153 http_request
154 };
155
156 let metadata = std::mem::take(request.metadata_mut());
157 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
158 let raw_byte_size = request.uncompressed_size;
159
160 let mut http_request = http_request.header(CONTENT_LENGTH, request.body.len());
161
162 if let Some(headers) = http_request.headers_mut() {
163 for (name, value) in &self.user_provided_headers {
164 headers.insert(name.inner(), value.clone());
166 }
167 for (name, value) in &self.dd_evp_headers {
169 headers.insert(name.inner(), value.clone());
170 }
171 }
172
173 let http_request = http_request
174 .body(Body::from(request.body))
175 .expect("building HTTP request failed unexpectedly");
176
177 Box::pin(async move {
178 DatadogApiError::from_result(client.call(http_request).in_current_span().await).map(
179 |_| LogApiResponse {
180 event_status: EventStatus::Delivered,
181 events_byte_size,
182 raw_byte_size,
183 },
184 )
185 })
186 }
187}