vector/sinks/datadog/events/
sink.rs

1use std::fmt;
2
3use vector_lib::lookup::event_path;
4
5use crate::{
6    internal_events::{ParserMissingFieldError, DROP_EVENT},
7    sinks::{
8        datadog::events::request_builder::{DatadogEventsRequest, DatadogEventsRequestBuilder},
9        prelude::*,
10    },
11};
12
13pub struct DatadogEventsSink<S> {
14    pub(super) service: S,
15}
16
17impl<S> DatadogEventsSink<S>
18where
19    S: Service<DatadogEventsRequest> + Send + 'static,
20    S::Future: Send + 'static,
21    S::Response: DriverResponse + Send + 'static,
22    S::Error: fmt::Debug + Into<crate::Error> + Send,
23{
24    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
25        input
26            .filter_map(ensure_required_fields)
27            .request_builder(
28                default_request_builder_concurrency_limit(),
29                DatadogEventsRequestBuilder::new(),
30            )
31            .filter_map(|request| async move {
32                match request {
33                    Err(error) => {
34                        emit!(SinkRequestBuildError { error });
35                        None
36                    }
37                    Ok(req) => Some(req),
38                }
39            })
40            .into_driver(self.service)
41            .run()
42            .await
43    }
44}
45
46async fn ensure_required_fields(event: Event) -> Option<Event> {
47    let mut log = event.into_log();
48
49    if !log.contains(event_path!("title")) {
50        emit!(ParserMissingFieldError::<DROP_EVENT> { field: "title" });
51        return None;
52    }
53
54    if !log.contains(event_path!("text")) {
55        let message_path = log
56            .message_path()
57            .expect("message is required (make sure the \"message\" semantic meaning is set)")
58            .clone();
59        log.rename_key(&message_path, event_path!("text"));
60    }
61
62    if !log.contains(event_path!("host")) {
63        if let Some(host_path) = log.host_path().cloned().as_ref() {
64            log.rename_key(host_path, event_path!("host"));
65        }
66    }
67
68    if !log.contains(event_path!("date_happened")) {
69        if let Some(timestamp_path) = log.timestamp_path().cloned().as_ref() {
70            log.rename_key(timestamp_path, event_path!("date_happened"));
71        }
72    }
73
74    if !log.contains(event_path!("source_type_name")) {
75        if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
76            log.rename_key(source_type_path, event_path!("source_type_name"));
77        }
78    }
79
80    Some(Event::from(log))
81}
82
83#[async_trait]
84impl<S> StreamSink<Event> for DatadogEventsSink<S>
85where
86    S: Service<DatadogEventsRequest> + Send + 'static,
87    S::Future: Send + 'static,
88    S::Response: DriverResponse + Send + 'static,
89    S::Error: fmt::Debug + Into<crate::Error> + Send,
90{
91    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
92        self.run(input).await
93    }
94}