vector/sinks/datadog/events/
service.rs

1use std::task::{Context, Poll};
2
3use bytes::Bytes;
4use futures::{
5    future,
6    future::{BoxFuture, Ready},
7};
8use http::Request;
9use hyper::Body;
10use tower::{Service, ServiceExt};
11use vector_lib::stream::DriverResponse;
12use vector_lib::{
13    request_metadata::{GroupedCountByteSize, MetaDescriptive},
14    sensitive_string::SensitiveString,
15};
16
17use crate::{
18    event::EventStatus,
19    http::HttpClient,
20    sinks::{
21        datadog::events::request_builder::DatadogEventsRequest,
22        util::{http::HttpBatchService, sink::Response},
23    },
24};
25
26pub struct DatadogEventsResponse {
27    pub(self) event_status: EventStatus,
28    pub http_status: http::StatusCode,
29    pub event_byte_size: GroupedCountByteSize,
30}
31
32impl DriverResponse for DatadogEventsResponse {
33    fn event_status(&self) -> EventStatus {
34        self.event_status
35    }
36
37    fn events_sent(&self) -> &GroupedCountByteSize {
38        &self.event_byte_size
39    }
40
41    fn bytes_sent(&self) -> Option<usize> {
42        // HttpBatchService emits EndpointBytesSend
43        None
44    }
45}
46
47#[derive(Clone)]
48pub struct DatadogEventsService {
49    // TODO: `HttpBatchService` has been deprecated for direct use in sinks.
50    //       This sink should undergo a refactor to utilize the `HttpService`
51    //       instead, which extracts much of the boilerplate code for `Service`.
52    batch_http_service:
53        HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, DatadogEventsRequest>,
54}
55
56impl DatadogEventsService {
57    pub fn new(
58        endpoint: http::Uri,
59        default_api_key: SensitiveString,
60        http_client: HttpClient<Body>,
61    ) -> Self {
62        let batch_http_service = HttpBatchService::new(http_client, move |req| {
63            let req: DatadogEventsRequest = req;
64
65            let api_key = match req.metadata.api_key.as_ref() {
66                Some(x) => x.as_ref(),
67                None => default_api_key.inner(),
68            };
69
70            let request = Request::post(&endpoint)
71                .header("Content-Type", "application/json")
72                .header("DD-API-KEY", api_key)
73                .header("Content-Length", req.body.len())
74                .body(req.body)
75                .map_err(|x| x.into());
76            future::ready(request)
77        });
78
79        Self { batch_http_service }
80    }
81}
82
83impl Service<DatadogEventsRequest> for DatadogEventsService {
84    type Response = DatadogEventsResponse;
85    type Error = crate::Error;
86    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
87
88    // Emission of Error internal event is handled upstream by the caller
89    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90        Poll::Ready(Ok(()))
91    }
92
93    // Emission of Error internal event is handled upstream by the caller
94    fn call(&mut self, mut req: DatadogEventsRequest) -> Self::Future {
95        let mut http_service = self.batch_http_service.clone();
96
97        Box::pin(async move {
98            let metadata = std::mem::take(req.metadata_mut());
99            http_service.ready().await?;
100            let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
101            let http_response = http_service.call(req).await?;
102            let event_status = if http_response.is_successful() {
103                EventStatus::Delivered
104            } else if http_response.is_transient() {
105                EventStatus::Errored
106            } else {
107                EventStatus::Rejected
108            };
109            Ok(DatadogEventsResponse {
110                event_status,
111                http_status: http_response.status(),
112                event_byte_size,
113            })
114        })
115    }
116}