vector/sinks/datadog/events/
request_builder.rs1use std::{io, sync::Arc};
2
3use bytes::Bytes;
4use vector_lib::codecs::JsonSerializerConfig;
5use vector_lib::lookup::lookup_v2::ConfigValuePath;
6use vector_lib::request_metadata::{MetaDescriptive, RequestMetadata};
7use vector_lib::ByteSizeOf;
8
9use crate::{
10 codecs::{Encoder, TimestampFormat, Transformer},
11 event::{Event, EventFinalizers, Finalizable},
12 sinks::util::{
13 metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression, ElementCount,
14 RequestBuilder,
15 },
16};
17
18#[derive(Clone)]
19pub struct DatadogEventsRequest {
20 pub body: Bytes,
21 pub metadata: Metadata,
22 request_metadata: RequestMetadata,
23}
24
25impl Finalizable for DatadogEventsRequest {
26 fn take_finalizers(&mut self) -> EventFinalizers {
27 std::mem::take(&mut self.metadata.finalizers)
28 }
29}
30
31impl ByteSizeOf for DatadogEventsRequest {
32 fn allocated_bytes(&self) -> usize {
33 self.body.allocated_bytes() + self.metadata.finalizers.allocated_bytes()
34 }
35}
36
37impl ElementCount for DatadogEventsRequest {
38 fn element_count(&self) -> usize {
39 1
41 }
42}
43
44impl MetaDescriptive for DatadogEventsRequest {
45 fn get_metadata(&self) -> &RequestMetadata {
46 &self.request_metadata
47 }
48
49 fn metadata_mut(&mut self) -> &mut RequestMetadata {
50 &mut self.request_metadata
51 }
52}
53
54#[derive(Clone)]
55pub struct Metadata {
56 pub finalizers: EventFinalizers,
57 pub api_key: Option<Arc<str>>,
58}
59
60pub struct DatadogEventsRequestBuilder {
61 encoder: (Transformer, Encoder<()>),
62}
63
64impl Default for DatadogEventsRequestBuilder {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl DatadogEventsRequestBuilder {
71 pub fn new() -> DatadogEventsRequestBuilder {
72 DatadogEventsRequestBuilder { encoder: encoder() }
73 }
74}
75
76impl RequestBuilder<Event> for DatadogEventsRequestBuilder {
77 type Metadata = Metadata;
78 type Events = Event;
79 type Encoder = (Transformer, Encoder<()>);
80 type Payload = Bytes;
81 type Request = DatadogEventsRequest;
82 type Error = io::Error;
83
84 fn compression(&self) -> Compression {
85 Compression::None
86 }
87
88 fn encoder(&self) -> &Self::Encoder {
89 &self.encoder
90 }
91
92 fn split_input(&self, event: Event) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
93 let builder = RequestMetadataBuilder::from_event(&event);
94
95 let mut log = event.into_log();
96 let metadata = Metadata {
97 finalizers: log.take_finalizers(),
98 api_key: log.metadata_mut().datadog_api_key(),
99 };
100
101 (metadata, builder, Event::from(log))
102 }
103
104 fn build_request(
105 &self,
106 metadata: Self::Metadata,
107 request_metadata: RequestMetadata,
108 payload: EncodeResult<Self::Payload>,
109 ) -> Self::Request {
110 DatadogEventsRequest {
111 body: payload.into_payload(),
112 metadata,
113 request_metadata,
114 }
115 }
116}
117
118fn encoder() -> (Transformer, Encoder<()>) {
119 let only_fields = Some(
122 [
123 "aggregation_key",
124 "alert_type",
125 "date_happened",
126 "device_name",
127 "host",
128 "priority",
129 "related_event_id",
130 "source_type_name",
131 "tags",
132 "text",
133 "title",
134 ]
135 .iter()
136 .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap())
137 .collect(),
138 );
139 let timestamp_format = Some(TimestampFormat::Unix);
141
142 (
143 Transformer::new(only_fields, None, timestamp_format)
144 .expect("transformer configuration must be valid"),
145 Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
146 )
147}