vector/sinks/datadog/events/
request_builder.rs

1use std::{io, sync::Arc};
2
3use bytes::Bytes;
4use vector_lib::{
5    ByteSizeOf,
6    codecs::JsonSerializerConfig,
7    lookup::lookup_v2::ConfigValuePath,
8    request_metadata::{MetaDescriptive, RequestMetadata},
9};
10
11use crate::{
12    codecs::{Encoder, TimestampFormat, Transformer},
13    event::{Event, EventFinalizers, Finalizable},
14    sinks::util::{
15        Compression, ElementCount, RequestBuilder, metadata::RequestMetadataBuilder,
16        request_builder::EncodeResult,
17    },
18};
19
/// A request carrying one encoded event for the Datadog Events API.
///
/// Instances are assembled by `DatadogEventsRequestBuilder::build_request`.
#[derive(Clone)]
pub struct DatadogEventsRequest {
    /// Encoded payload, as produced by the request builder's encoder.
    pub body: Bytes,
    /// Per-event data split off in `split_input`: finalizers and the optional API key.
    pub metadata: Metadata,
    /// Request metadata exposed through the `MetaDescriptive` implementation.
    request_metadata: RequestMetadata,
}
26
27impl Finalizable for DatadogEventsRequest {
28    fn take_finalizers(&mut self) -> EventFinalizers {
29        std::mem::take(&mut self.metadata.finalizers)
30    }
31}
32
33impl ByteSizeOf for DatadogEventsRequest {
34    fn allocated_bytes(&self) -> usize {
35        self.body.allocated_bytes() + self.metadata.finalizers.allocated_bytes()
36    }
37}
38
39impl ElementCount for DatadogEventsRequest {
40    fn element_count(&self) -> usize {
41        // Datadog Events api only accepts a single event per request
42        1
43    }
44}
45
46impl MetaDescriptive for DatadogEventsRequest {
47    fn get_metadata(&self) -> &RequestMetadata {
48        &self.request_metadata
49    }
50
51    fn metadata_mut(&mut self) -> &mut RequestMetadata {
52        &mut self.request_metadata
53    }
54}
55
/// Per-event data that `split_input` separates from the event before it is encoded.
#[derive(Clone)]
pub struct Metadata {
    /// Finalizers taken from the source log event.
    pub finalizers: EventFinalizers,
    /// Datadog API key read from the event's metadata, if one is present.
    pub api_key: Option<Arc<str>>,
}
61
/// Request builder that turns individual events into `DatadogEventsRequest`s.
pub struct DatadogEventsRequestBuilder {
    /// Transformer/encoder pair created by the module-level `encoder()` function.
    encoder: (Transformer, Encoder<()>),
}
65
66impl Default for DatadogEventsRequestBuilder {
67    fn default() -> Self {
68        Self::new()
69    }
70}
71
72impl DatadogEventsRequestBuilder {
73    pub fn new() -> DatadogEventsRequestBuilder {
74        DatadogEventsRequestBuilder { encoder: encoder() }
75    }
76}
77
78impl RequestBuilder<Event> for DatadogEventsRequestBuilder {
79    type Metadata = Metadata;
80    type Events = Event;
81    type Encoder = (Transformer, Encoder<()>);
82    type Payload = Bytes;
83    type Request = DatadogEventsRequest;
84    type Error = io::Error;
85
86    fn compression(&self) -> Compression {
87        Compression::None
88    }
89
90    fn encoder(&self) -> &Self::Encoder {
91        &self.encoder
92    }
93
94    fn split_input(&self, event: Event) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
95        let builder = RequestMetadataBuilder::from_event(&event);
96
97        let mut log = event.into_log();
98        let metadata = Metadata {
99            finalizers: log.take_finalizers(),
100            api_key: log.metadata_mut().datadog_api_key(),
101        };
102
103        (metadata, builder, Event::from(log))
104    }
105
106    fn build_request(
107        &self,
108        metadata: Self::Metadata,
109        request_metadata: RequestMetadata,
110        payload: EncodeResult<Self::Payload>,
111    ) -> Self::Request {
112        DatadogEventsRequest {
113            body: payload.into_payload(),
114            metadata,
115            request_metadata,
116        }
117    }
118}
119
120fn encoder() -> (Transformer, Encoder<()>) {
121    // DataDog Event API allows only some fields, and refuses
122    // to accept event if it contains any other field.
123    let only_fields = Some(
124        [
125            "aggregation_key",
126            "alert_type",
127            "date_happened",
128            "device_name",
129            "host",
130            "priority",
131            "related_event_id",
132            "source_type_name",
133            "tags",
134            "text",
135            "title",
136        ]
137        .iter()
138        .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap())
139        .collect(),
140    );
141    // DataDog Event API requires unix timestamp.
142    let timestamp_format = Some(TimestampFormat::Unix);
143
144    (
145        Transformer::new(only_fields, None, timestamp_format)
146            .expect("transformer configuration must be valid"),
147        Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
148    )
149}