vector/sinks/datadog/logs/
service.rs

1use std::{
2    sync::Arc,
3    task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use futures::future::BoxFuture;
8use http::{
9    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
10    HeaderValue, Request, Uri,
11};
12use hyper::Body;
13use std::collections::BTreeMap;
14use tower::Service;
15use tracing::Instrument;
16use vector_lib::event::{EventFinalizers, EventStatus, Finalizable};
17use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
18use vector_lib::stream::DriverResponse;
19
20use crate::{
21    http::HttpClient,
22    sinks::util::{retries::RetryLogic, Compression},
23    sinks::{
24        datadog::DatadogApiError,
25        util::http::{validate_headers, OrderedHeaderName},
26    },
27};
28
29#[derive(Debug, Default, Clone)]
30pub struct LogApiRetry;
31
32impl RetryLogic for LogApiRetry {
33    type Error = DatadogApiError;
34    type Request = LogApiRequest;
35    type Response = LogApiResponse;
36
37    fn is_retriable_error(&self, error: &Self::Error) -> bool {
38        error.is_retriable()
39    }
40}
41
42#[derive(Debug, Clone)]
43pub struct LogApiRequest {
44    pub api_key: Arc<str>,
45    pub compression: Compression,
46    pub body: Bytes,
47    pub finalizers: EventFinalizers,
48    pub uncompressed_size: usize,
49    pub metadata: RequestMetadata,
50}
51
52impl Finalizable for LogApiRequest {
53    fn take_finalizers(&mut self) -> EventFinalizers {
54        std::mem::take(&mut self.finalizers)
55    }
56}
57
58impl MetaDescriptive for LogApiRequest {
59    fn get_metadata(&self) -> &RequestMetadata {
60        &self.metadata
61    }
62
63    fn metadata_mut(&mut self) -> &mut RequestMetadata {
64        &mut self.metadata
65    }
66}
67
68#[derive(Debug)]
69pub struct LogApiResponse {
70    event_status: EventStatus,
71    events_byte_size: GroupedCountByteSize,
72    raw_byte_size: usize,
73}
74
75impl DriverResponse for LogApiResponse {
76    fn event_status(&self) -> EventStatus {
77        self.event_status
78    }
79
80    fn events_sent(&self) -> &GroupedCountByteSize {
81        &self.events_byte_size
82    }
83
84    fn bytes_sent(&self) -> Option<usize> {
85        Some(self.raw_byte_size)
86    }
87}
88
89/// Wrapper for the Datadog API.
90///
91/// Provides a `tower::Service` for the Datadog Logs API, allowing it to be
92/// composed within a Tower "stack", such that we can easily and transparently
93/// provide retries, concurrency limits, rate limits, and more.
94#[derive(Debug, Clone)]
95pub struct LogApiService {
96    client: HttpClient,
97    uri: Uri,
98    user_provided_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
99    dd_evp_headers: BTreeMap<OrderedHeaderName, HeaderValue>,
100}
101
102impl LogApiService {
103    pub fn new(
104        client: HttpClient,
105        uri: Uri,
106        headers: BTreeMap<String, String>,
107        dd_evp_origin: String,
108    ) -> crate::Result<Self> {
109        let user_provided_headers = validate_headers(&headers)?;
110
111        let dd_evp_headers: BTreeMap<String, String> = [
112            ("DD-EVP-ORIGIN".to_string(), dd_evp_origin),
113            ("DD-EVP-ORIGIN-VERSION".to_string(), crate::get_version()),
114        ]
115        .into_iter()
116        .collect();
117        let dd_evp_headers = validate_headers(&dd_evp_headers)?;
118
119        Ok(Self {
120            client,
121            uri,
122            user_provided_headers,
123            dd_evp_headers,
124        })
125    }
126}
127
128impl Service<LogApiRequest> for LogApiService {
129    type Response = LogApiResponse;
130    type Error = DatadogApiError;
131    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
132
133    // Emission of Error internal event is handled upstream by the caller
134    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
135        Poll::Ready(Ok(()))
136    }
137
138    // Emission of Error internal event is handled upstream by the caller
139    fn call(&mut self, mut request: LogApiRequest) -> Self::Future {
140        let mut client = self.client.clone();
141        let http_request = Request::post(&self.uri)
142            .header(CONTENT_TYPE, "application/json")
143            .header("DD-API-KEY", request.api_key.to_string());
144
145        let http_request = if let Some(ce) = request.compression.content_encoding() {
146            http_request.header(CONTENT_ENCODING, ce)
147        } else {
148            http_request
149        };
150
151        let metadata = std::mem::take(request.metadata_mut());
152        let events_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
153        let raw_byte_size = request.uncompressed_size;
154
155        let mut http_request = http_request.header(CONTENT_LENGTH, request.body.len());
156
157        if let Some(headers) = http_request.headers_mut() {
158            for (name, value) in &self.user_provided_headers {
159                // Replace rather than append to any existing header values
160                headers.insert(name.inner(), value.clone());
161            }
162            // Set DD EVP headers last so that they cannot be overridden.
163            for (name, value) in &self.dd_evp_headers {
164                headers.insert(name.inner(), value.clone());
165            }
166        }
167
168        let http_request = http_request
169            .body(Body::from(request.body))
170            .expect("building HTTP request failed unexpectedly");
171
172        Box::pin(async move {
173            DatadogApiError::from_result(client.call(http_request).in_current_span().await).map(
174                |_| LogApiResponse {
175                    event_status: EventStatus::Delivered,
176                    events_byte_size,
177                    raw_byte_size,
178                },
179            )
180        })
181    }
182}