vector/sinks/datadog/events/
request_builder.rs

1use std::{io, sync::Arc};
2
3use bytes::Bytes;
4use vector_lib::codecs::JsonSerializerConfig;
5use vector_lib::lookup::lookup_v2::ConfigValuePath;
6use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
7use vector_lib::ByteSizeOf;
8
9use crate::{
10    codecs::{Encoder, TimestampFormat, Transformer},
11    event::{Event, EventFinalizers, Finalizable},
12    sinks::util::{
13        metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, ElementCount,
14        RequestBuilder,
15    },
16};
17
/// A single request destined for the Datadog Events API.
///
/// Instances are assembled by `DatadogEventsRequestBuilder::build_request`
/// from an encoded payload plus the metadata split off the source event.
#[derive(Clone)]
pub struct DatadogEventsRequest {
    /// Encoded request payload.
    pub body: Bytes,
    /// Sink-specific metadata (finalizers and optional API key) carried
    /// alongside the payload.
    pub metadata: Metadata,
    /// Request metadata exposed through the `MetaDescriptive` implementation.
    request_metadata: RequestMetadata,
}
24
25impl Finalizable for DatadogEventsRequest {
26    fn take_finalizers(&mut self) -> EventFinalizers {
27        std::mem::take(&mut self.metadata.finalizers)
28    }
29}
30
31impl ByteSizeOf for DatadogEventsRequest {
32    fn allocated_bytes(&self) -> usize {
33        self.body.allocated_bytes() + self.metadata.finalizers.allocated_bytes()
34    }
35}
36
37impl ElementCount for DatadogEventsRequest {
38    fn element_count(&self) -> usize {
39        // Datadog Events api only accepts a single event per request
40        1
41    }
42}
43
44impl MetaDescriptive for DatadogEventsRequest {
45    fn get_metadata(&self) -> &RequestMetadata {
46        &self.request_metadata
47    }
48
49    fn metadata_mut(&mut self) -> &mut RequestMetadata {
50        &mut self.request_metadata
51    }
52}
53
/// Per-event data that is split off before encoding and re-attached to the
/// built request.
#[derive(Clone)]
pub struct Metadata {
    /// Finalizers taken from the source log event in `split_input`.
    pub finalizers: EventFinalizers,
    /// Datadog API key read from the event's metadata, if one is present.
    pub api_key: Option<Arc<str>>,
}
59
/// Builds `DatadogEventsRequest`s from individual events.
pub struct DatadogEventsRequestBuilder {
    /// Transformer/encoder pair produced by the module-level `encoder()`
    /// function and handed out through `RequestBuilder::encoder`.
    encoder: (Transformer, Encoder<()>),
}
63
64impl Default for DatadogEventsRequestBuilder {
65    fn default() -> Self {
66        Self::new()
67    }
68}
69
70impl DatadogEventsRequestBuilder {
71    pub fn new() -> DatadogEventsRequestBuilder {
72        DatadogEventsRequestBuilder { encoder: encoder() }
73    }
74}
75
76impl RequestBuilder<Event> for DatadogEventsRequestBuilder {
77    type Metadata = Metadata;
78    type Events = Event;
79    type Encoder = (Transformer, Encoder<()>);
80    type Payload = Bytes;
81    type Request = DatadogEventsRequest;
82    type Error = io::Error;
83
84    fn compression(&self) -> Compression {
85        Compression::None
86    }
87
88    fn encoder(&self) -> &Self::Encoder {
89        &self.encoder
90    }
91
92    fn split_input(&self, event: Event) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
93        let builder = RequestMetadataBuilder::from_event(&event);
94
95        let mut log = event.into_log();
96        let metadata = Metadata {
97            finalizers: log.take_finalizers(),
98            api_key: log.metadata_mut().datadog_api_key(),
99        };
100
101        (metadata, builder, Event::from(log))
102    }
103
104    fn build_request(
105        &self,
106        metadata: Self::Metadata,
107        request_metadata: RequestMetadata,
108        payload: EncodeResult<Self::Payload>,
109    ) -> Self::Request {
110        DatadogEventsRequest {
111            body: payload.into_payload(),
112            metadata,
113            request_metadata,
114        }
115    }
116}
117
118fn encoder() -> (Transformer, Encoder<()>) {
119    // DataDog Event API allows only some fields, and refuses
120    // to accept event if it contains any other field.
121    let only_fields = Some(
122        [
123            "aggregation_key",
124            "alert_type",
125            "date_happened",
126            "device_name",
127            "host",
128            "priority",
129            "related_event_id",
130            "source_type_name",
131            "tags",
132            "text",
133            "title",
134        ]
135        .iter()
136        .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap())
137        .collect(),
138    );
139    // DataDog Event API requires unix timestamp.
140    let timestamp_format = Some(TimestampFormat::Unix);
141
142    (
143        Transformer::new(only_fields, None, timestamp_format)
144            .expect("transformer configuration must be valid"),
145        Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
146    )
147}