// vector/sinks/datadog/logs/service.rs
use std::{
2 sync::Arc,
3 task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::future::BoxFuture;
8use http::{
9 header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
10 HeaderValue, Request, Uri,
11};
12use hyper::Body;
13use std::collections::BTreeMap;
14use tower::Service;
15use tracing::Instrument;
16use vector_lib::event::{EventFinalizers, EventStatus, Finalizable};
17use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
18use vector_lib::stream::DriverResponse;
19
20use crate::{
21 http::HttpClient,
22 sinks::util::{retries::RetryLogic, Compression},
23 sinks::{
24 datadog::DatadogApiError,
25 util::http::{validate_headers, OrderedHeaderName},
26 },
27};
28
29#[derive(Debug, Default, Clone)]
30pub struct LogApiRetry;
31
32impl RetryLogic for LogApiRetry {
33 type Error = DatadogApiError;
34 type Request = LogApiRequest;
35 type Response = LogApiResponse;
36
37 fn is_retriable_error(&self, error: &Self::Error) -> bool {
38 error.is_retriable()
39 }
40}
41
42#[derive(Debug, Clone)]
43pub struct LogApiRequest {
44 pub api_key: Arc<str>,
45 pub compression: Compression,
46 pub body: Bytes,
47 pub finalizers: EventFinalizers,
48 pub uncompressed_size: usize,
49 pub metadata: RequestMetadata,
50}
51
52impl Finalizable for LogApiRequest {
53 fn take_finalizers(&mut self) -> EventFinalizers {
54 std::mem::take(&mut self.finalizers)
55 }
56}
57
58impl MetaDescriptive for LogApiRequest {
59 fn get_metadata(&self) -> &RequestMetadata {
60 &self.metadata
61 }
62
63 fn metadata_mut(&mut self) -> &mut RequestMetadata {
64 &mut self.metadata
65 }
66}
67
68#[derive(Debug)]
69pub struct LogApiResponse {
70 event_status: EventStatus,
71 events_byte_size: GroupedCountByteSize,
72 raw_byte_size: usize,
73}
74
75impl DriverResponse for LogApiResponse {
76 fn event_status(&self) -> EventStatus {
77 self.event_status
78 }
79
80 fn events_sent(&self) -> &GroupedCountByteSize {
81 &self.events_byte_size
82 }
83
84 fn bytes_sent(&self) -> Option<usize> {
85 Some(self.raw_byte_size)
86 }
87}
88
89#[derive(Debug, Clone)]
95pub struct LogApiService {
96 client: HttpClient,
97 uri: Uri,
98 user_provided_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
99 dd_evp_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
100}
101
102impl LogApiService {
103 pub fn new(
104 client: HttpClient,
105 uri: Uri,
106 headers: BTreeMap<String, String>,
107 dd_evp_origin: String,
108 ) -> crate::Result<Self> {
109 let user_provided_headers = validate_headers(&headers)?;
110
111 let dd_evp_headers: BTreeMap<String, String> = [
112 ("DD-EVP-ORIGIN".to_string(), dd_evp_origin),
113 ("DD-EVP-ORIGIN-VERSION".to_string(), crate::get_version()),
114 ]
115 .into_iter()
116 .collect();
117 let dd_evp_headers = validate_headers(&dd_evp_headers)?;
118
119 Ok(Self {
120 client,
121 uri,
122 user_provided_headers,
123 dd_evp_headers,
124 })
125 }
126}
127
128impl Service<LogApiRequest> for LogApiService {
129 type Response = LogApiResponse;
130 type Error = DatadogApiError;
131 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
132
133 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
135 Poll::Ready(Ok(()))
136 }
137
138 fn call(&mut self, mut request: LogApiRequest) -> Self::Future {
140 let mut client = self.client.clone();
141 let http_request = Request::post(&self.uri)
142 .header(CONTENT_TYPE, "application/json")
143 .header("DD-API-KEY", request.api_key.to_string());
144
145 let http_request = if let Some(ce) = request.compression.content_encoding() {
146 http_request.header(CONTENT_ENCODING, ce)
147 } else {
148 http_request
149 };
150
151 let metadata = std::mem::take(request.metadata_mut());
152 let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
153 let raw_byte_size = request.uncompressed_size;
154
155 let mut http_request = http_request.header(CONTENT_LENGTH, request.body.len());
156
157 if let Some(headers) = http_request.headers_mut() {
158 for (name, value) in &self.user_provided_headers {
159 headers.insert(name.inner(), value.clone());
161 }
162 for (name, value) in &self.dd_evp_headers {
164 headers.insert(name.inner(), value.clone());
165 }
166 }
167
168 let http_request = http_request
169 .body(Body::from(request.body))
170 .expect("building HTTP request failed unexpectedly");
171
172 Box::pin(async move {
173 DatadogApiError::from_result(client.call(http_request).in_current_span().await).map(
174 |_| LogApiResponse {
175 event_status: EventStatus::Delivered,
176 events_byte_size,
177 raw_byte_size,
178 },
179 )
180 })
181 }
182}