vector/sinks/datadog/events/
sink.rs1use std::fmt;
2
3use vector_lib::lookup::event_path;
4
5use crate::{
6 internal_events::{ParserMissingFieldError, DROP_EVENT},
7 sinks::{
8 datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder},
9 prelude::*,
10 },
11};
12
13pub struct DatadogEventsSink<S> {
14 pub(super) service: S,
15}
16
17impl<S> DatadogEventsSink<S>
18where
19 S: Service<DatadogEventsRequest> + Send + 'static,
20 S::Future: Send + 'static,
21 S::Response: DriverResponse + Send + 'static,
22 S::Error: fmt::Debug + Into<crate::Error> + Send,
23{
24 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
25 input
26 .filter_map(ensure_required_fields)
27 .request_builder(
28 default_request_builder_concurrency_limit(),
29 DatadogEventsRequestBuilder::new(),
30 )
31 .filter_map(|request| async move {
32 match request {
33 Err(error) => {
34 emit!(SinkRequestBuildError { error });
35 None
36 }
37 Ok(req) => Some(req),
38 }
39 })
40 .into_driver(self.service)
41 .run()
42 .await
43 }
44}
45
46async fn ensure_required_fields(event: Event) -> Option<Event> {
47 let mut log = event.into_log();
48
49 if !log.contains(event_path!("title")) {
50 emit!(ParserMissingFieldError::<DROP_EVENT> { field: "title" });
51 return None;
52 }
53
54 if !log.contains(event_path!("text")) {
55 let message_path = log
56 .message_path()
57 .expect("message is required (make sure the \"message\" semantic meaning is set)")
58 .clone();
59 log.rename_key(&message_path, event_path!("text"));
60 }
61
62 if !log.contains(event_path!("host")) {
63 if let Some(host_path) = log.host_path().cloned().as_ref() {
64 log.rename_key(host_path, event_path!("host"));
65 }
66 }
67
68 if !log.contains(event_path!("date_happened")) {
69 if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
70 log.rename_key(timestamp_path, event_path!("date_happened"));
71 }
72 }
73
74 if !log.contains(event_path!("source_type_name")) {
75 if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
76 log.rename_key(source_type_path, event_path!("source_type_name"));
77 }
78 }
79
80 Some(Event::from(log))
81}
82
83#[async_trait]
84impl<S> StreamSink<Event> for DatadogEventsSink<S>
85where
86 S: Service<DatadogEventsRequest> + Send + 'static,
87 S::Future: Send + 'static,
88 S::Response: DriverResponse + Send + 'static,
89 S::Error: fmt::Debug + Into<crate::Error> + Send,
90{
91 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
92 self.run(input).await
93 }
94}