vector/sinks/datadog/events/
request_builder.rs1use std::{io, sync::Arc};
2
3use bytes::Bytes;
4use vector_lib::{
5 ByteSizeOf,
6 codecs::JsonSerializerConfig,
7 lookup::lookup_v2::ConfigValuePath,
8 request_metadata::{MetaDescriptive, RequestMetadata},
9};
10
11use crate::{
12 codecs::{Encoder, TimestampFormat, Transformer},
13 event::{Event, EventFinalizers, Finalizable},
14 sinks::util::{
15 Compression, ElementCount, RequestBuilder, metadata::RequestMetadataBuilder,
16 request_builder::EncodeResult,
17 },
18};
19
20#[derive(Clone)]
21pub struct DatadogEventsRequest {
22 pub body: Bytes,
23 pub metadata: Metadata,
24 request_metadata: RequestMetadata,
25}
26
27impl Finalizable for DatadogEventsRequest {
28 fn take_finalizers(&mut self) -> EventFinalizers {
29 std::mem::take(&mut self.metadata.finalizers)
30 }
31}
32
33impl ByteSizeOf for DatadogEventsRequest {
34 fn allocated_bytes(&self) -> usize {
35 self.body.allocated_bytes() + self.metadata.finalizers.allocated_bytes()
36 }
37}
38
39impl ElementCount for DatadogEventsRequest {
40 fn element_count(&self) -> usize {
41 1
43 }
44}
45
46impl MetaDescriptive for DatadogEventsRequest {
47 fn get_metadata(&self) -> &RequestMetadata {
48 &self.request_metadata
49 }
50
51 fn metadata_mut(&mut self) -> &mut RequestMetadata {
52 &mut self.request_metadata
53 }
54}
55
56#[derive(Clone)]
57pub struct Metadata {
58 pub finalizers: EventFinalizers,
59 pub api_key: Option<Arc<str>>,
60}
61
62pub struct DatadogEventsRequestBuilder {
63 encoder: (Transformer, Encoder<()>),
64}
65
66impl Default for DatadogEventsRequestBuilder {
67 fn default() -> Self {
68 Self::new()
69 }
70}
71
72impl DatadogEventsRequestBuilder {
73 pub fn new() -> DatadogEventsRequestBuilder {
74 DatadogEventsRequestBuilder { encoder: encoder() }
75 }
76}
77
78impl RequestBuilder<Event> for DatadogEventsRequestBuilder {
79 type Metadata = Metadata;
80 type Events = Event;
81 type Encoder = (Transformer, Encoder<()>);
82 type Payload = Bytes;
83 type Request = DatadogEventsRequest;
84 type Error = io::Error;
85
86 fn compression(&self) -> Compression {
87 Compression::None
88 }
89
90 fn encoder(&self) -> &Self::Encoder {
91 &self.encoder
92 }
93
94 fn split_input(&self, event: Event) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
95 let builder = RequestMetadataBuilder::from_event(&event);
96
97 let mut log = event.into_log();
98 let metadata = Metadata {
99 finalizers: log.take_finalizers(),
100 api_key: log.metadata_mut().datadog_api_key(),
101 };
102
103 (metadata, builder, Event::from(log))
104 }
105
106 fn build_request(
107 &self,
108 metadata: Self::Metadata,
109 request_metadata: RequestMetadata,
110 payload: EncodeResult<Self::Payload>,
111 ) -> Self::Request {
112 DatadogEventsRequest {
113 body: payload.into_payload(),
114 metadata,
115 request_metadata,
116 }
117 }
118}
119
120fn encoder() -> (Transformer, Encoder<()>) {
121 let only_fields = Some(
124 [
125 "aggregation_key",
126 "alert_type",
127 "date_happened",
128 "device_name",
129 "host",
130 "priority",
131 "related_event_id",
132 "source_type_name",
133 "tags",
134 "text",
135 "title",
136 ]
137 .iter()
138 .map(|field| ConfigValuePath::try_from((*field).to_string()).unwrap())
139 .collect(),
140 );
141 let timestamp_format = Some(TimestampFormat::Unix);
143
144 (
145 Transformer::new(only_fields, None, timestamp_format)
146 .expect("transformer configuration must be valid"),
147 Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
148 )
149}