vector/sinks/datadog/logs/
service.rs

1use std::{
2    collections::BTreeMap,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use bytes::Bytes;
8use futures::future::BoxFuture;
9use http::{
10    HeaderValue, Request, Uri,
11    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
12};
13use hyper::Body;
14use tower::Service;
15use tracing::Instrument;
16use vector_lib::{
17    event::{EventFinalizers, EventStatus, Finalizable},
18    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
19    stream::DriverResponse,
20};
21
22use crate::{
23    http::HttpClient,
24    sinks::{
25        datadog::DatadogApiError,
26        util::{
27            Compression,
28            http::{OrderedHeaderName, validate_headers},
29            retries::RetryLogic,
30        },
31    },
32};
33
34#[derive(Debug, Default, Clone)]
35pub struct LogApiRetry;
36
37impl RetryLogic for LogApiRetry {
38    type Error = DatadogApiError;
39    type Request = LogApiRequest;
40    type Response = LogApiResponse;
41
42    fn is_retriable_error(&self, error: &Self::Error) -> bool {
43        error.is_retriable()
44    }
45}
46
47#[derive(Debug, Clone)]
48pub struct LogApiRequest {
49    pub api_key: Arc<str>,
50    pub compression: Compression,
51    pub body: Bytes,
52    pub finalizers: EventFinalizers,
53    pub uncompressed_size: usize,
54    pub metadata: RequestMetadata,
55}
56
57impl Finalizable for LogApiRequest {
58    fn take_finalizers(&mut self) -> EventFinalizers {
59        std::mem::take(&mut self.finalizers)
60    }
61}
62
63impl MetaDescriptive for LogApiRequest {
64    fn get_metadata(&self) -> &RequestMetadata {
65        &self.metadata
66    }
67
68    fn metadata_mut(&mut self) -> &mut RequestMetadata {
69        &mut self.metadata
70    }
71}
72
73#[derive(Debug)]
74pub struct LogApiResponse {
75    event_status: EventStatus,
76    events_byte_size: GroupedCountByteSize,
77    raw_byte_size: usize,
78}
79
80impl DriverResponse for LogApiResponse {
81    fn event_status(&self) -> EventStatus {
82        self.event_status
83    }
84
85    fn events_sent(&self) -> &GroupedCountByteSize {
86        &self.events_byte_size
87    }
88
89    fn bytes_sent(&self) -> Option<usize> {
90        Some(self.raw_byte_size)
91    }
92}
93
94/// Wrapper for the Datadog API.
95///
96/// Provides a `tower::Service` for the Datadog Logs API, allowing it to be
97/// composed within a Tower "stack", such that we can easily and transparently
98/// provide retries, concurrency limits, rate limits, and more.
99#[derive(Debug, Clone)]
100pub struct LogApiService {
101    client: HttpClient,
102    uri: Uri,
103    user_provided_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
104    dd_evp_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
105}
106
107impl LogApiService {
108    pub fn new(
109        client: HttpClient,
110        uri: Uri,
111        headers: BTreeMap<String, String>,
112        dd_evp_origin: String,
113    ) -> crate::Result<Self> {
114        let user_provided_headers = validate_headers(&headers)?;
115
116        let dd_evp_headers: BTreeMap<String, String> = [
117            ("DD-EVP-ORIGIN".to_string(), dd_evp_origin),
118            ("DD-EVP-ORIGIN-VERSION".to_string(), crate::get_version()),
119        ]
120        .into_iter()
121        .collect();
122        let dd_evp_headers = validate_headers(&dd_evp_headers)?;
123
124        Ok(Self {
125            client,
126            uri,
127            user_provided_headers,
128            dd_evp_headers,
129        })
130    }
131}
132
133impl Service<LogApiRequest> for LogApiService {
134    type Response = LogApiResponse;
135    type Error = DatadogApiError;
136    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
137
138    // Emission of Error internal event is handled upstream by the caller
139    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
140        Poll::Ready(Ok(()))
141    }
142
143    // Emission of Error internal event is handled upstream by the caller
144    fn call(&mut self, mut request: LogApiRequest) -> Self::Future {
145        let mut client = self.client.clone();
146        let http_request = Request::post(&self.uri)
147            .header(CONTENT_TYPE, "application/json")
148            .header("DD-API-KEY", request.api_key.to_string());
149
150        let http_request = if let Some(ce) = request.compression.content_encoding() {
151            http_request.header(CONTENT_ENCODING, ce)
152        } else {
153            http_request
154        };
155
156        let metadata = std::mem::take(request.metadata_mut());
157        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
158        let raw_byte_size = request.uncompressed_size;
159
160        let mut http_request = http_request.header(CONTENT_LENGTH, request.body.len());
161
162        if let Some(headers) = http_request.headers_mut() {
163            for (name, value) in &self.user_provided_headers {
164                // Replace rather than append to any existing header values
165                headers.insert(name.inner(), value.clone());
166            }
167            // Set DD EVP headers last so that they cannot be overridden.
168            for (name, value) in &self.dd_evp_headers {
169                headers.insert(name.inner(), value.clone());
170            }
171        }
172
173        let http_request = http_request
174            .body(Body::from(request.body))
175            .expect("building HTTP request failed unexpectedly");
176
177        Box::pin(async move {
178            DatadogApiError::from_result(client.call(http_request).in_current_span().await).map(
179                |_| LogApiResponse {
180                    event_status: EventStatus::Delivered,
181                    events_byte_size,
182                    raw_byte_size,
183                },
184            )
185        })
186    }
187}