vector/sinks/datadog/events/
service.rs1use std::task::{Context, Poll};
2
3use bytes::Bytes;
4use futures::{
5 future,
6 future::{BoxFuture, Ready},
7};
8use http::Request;
9use hyper::Body;
10use tower::{Service, ServiceExt};
11use vector_lib::stream::DriverResponse;
12use vector_lib::{
13 request_metadata::{GroupedCountByteSize, MetaDescriptive},
14 sensitive_string::SensitiveString,
15};
16
17use crate::{
18 event::EventStatus,
19 http::HttpClient,
20 sinks::{
21 datadog::events::request_builder::DatadogEventsRequest,
22 util::{http::HttpBatchService, sink::Response},
23 },
24};
25
26pub struct DatadogEventsResponse {
27 pub(self) event_status: EventStatus,
28 pub http_status: http::StatusCode,
29 pub event_byte_size: GroupedCountByteSize,
30}
31
32impl DriverResponse for DatadogEventsResponse {
33 fn event_status(&self) -> EventStatus {
34 self.event_status
35 }
36
37 fn events_sent(&self) -> &GroupedCountByteSize {
38 &self.event_byte_size
39 }
40
41 fn bytes_sent(&self) -> Option<usize> {
42 None
44 }
45}
46
47#[derive(Clone)]
48pub struct DatadogEventsService {
49 batch_http_service:
53 HttpBatchService<Ready<Result<http::Request<Bytes>, crate::Error>>, DatadogEventsRequest>,
54}
55
56impl DatadogEventsService {
57 pub fn new(
58 endpoint: http::Uri,
59 default_api_key: SensitiveString,
60 http_client: HttpClient<Body>,
61 ) -> Self {
62 let batch_http_service = HttpBatchService::new(http_client, move |req| {
63 let req: DatadogEventsRequest = req;
64
65 let api_key = match req.metadata.api_key.as_ref() {
66 Some(x) => x.as_ref(),
67 None => default_api_key.inner(),
68 };
69
70 let request = Request::post(&endpoint)
71 .header("Content-Type", "application/json")
72 .header("DD-API-KEY", api_key)
73 .header("Content-Length", req.body.len())
74 .body(req.body)
75 .map_err(|x| x.into());
76 future::ready(request)
77 });
78
79 Self { batch_http_service }
80 }
81}
82
83impl Service<DatadogEventsRequest> for DatadogEventsService {
84 type Response = DatadogEventsResponse;
85 type Error = crate::Error;
86 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
87
88 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
90 Poll::Ready(Ok(()))
91 }
92
93 fn call(&mut self, mut req: DatadogEventsRequest) -> Self::Future {
95 let mut http_service = self.batch_http_service.clone();
96
97 Box::pin(async move {
98 let metadata = std::mem::take(req.metadata_mut());
99 http_service.ready().await?;
100 let event_byte_size = metadata.into_events_estimated_json_encoded_byte_size();
101 let http_response = http_service.call(req).await?;
102 let event_status = if http_response.is_successful() {
103 EventStatus::Delivered
104 } else if http_response.is_transient() {
105 EventStatus::Errored
106 } else {
107 EventStatus::Rejected
108 };
109 Ok(DatadogEventsResponse {
110 event_status,
111 http_status: http_response.status(),
112 event_byte_size,
113 })
114 })
115 }
116}