vector/sinks/datadog/events/
config.rs

1use indoc::indoc;
2use tower::ServiceBuilder;
3use vector_lib::config::proxy::ProxyConfig;
4use vector_lib::configurable::configurable_component;
5use vector_lib::schema;
6use vrl::value::Kind;
7
8use super::{
9    service::{DatadogEventsResponse, DatadogEventsService},
10    sink::DatadogEventsSink,
11};
12use crate::{
13    common::datadog,
14    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15    http::HttpClient,
16    sinks::{
17        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
18        util::{http::HttpStatusRetryLogic, ServiceBuilderExt, TowerRequestConfig},
19        Healthcheck, VectorSink,
20    },
21    tls::MaybeTlsSettings,
22};
23
24/// Configuration for the `datadog_events` sink.
25#[configurable_component(sink(
26    "datadog_events",
27    "Publish observability events to the Datadog Events API."
28))]
29#[derive(Clone, Debug, Default)]
30#[serde(deny_unknown_fields)]
31pub struct DatadogEventsConfig {
32    #[serde(flatten)]
33    pub dd_common: LocalDatadogCommonConfig,
34
35    #[configurable(derived)]
36    #[serde(default)]
37    pub request: TowerRequestConfig,
38}
39
40impl GenerateConfig for DatadogEventsConfig {
41    fn generate_config() -> toml::Value {
42        toml::from_str(indoc! {r#"
43            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
44        "#})
45        .unwrap()
46    }
47}
48
49impl DatadogEventsConfig {
50    fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
51        let tls = MaybeTlsSettings::from_config(self.dd_common.tls.as_ref(), false)?;
52        let client = HttpClient::new(tls, proxy)?;
53        Ok(client)
54    }
55
56    fn build_sink(
57        &self,
58        dd_common: &DatadogCommonConfig,
59        client: HttpClient,
60    ) -> crate::Result<VectorSink> {
61        let service = DatadogEventsService::new(
62            dd_common.get_api_endpoint("/api/v1/events")?,
63            dd_common.default_api_key.clone(),
64            client,
65        );
66
67        let request_opts = self.request;
68        let request_settings = request_opts.into_settings();
69        let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status);
70
71        let service = ServiceBuilder::new()
72            .settings(request_settings, retry_logic)
73            .service(service);
74
75        let sink = DatadogEventsSink { service };
76
77        Ok(VectorSink::from_event_streamsink(sink))
78    }
79}
80
81#[async_trait::async_trait]
82#[typetag::serde(name = "datadog_events")]
83impl SinkConfig for DatadogEventsConfig {
84    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
85        let client = self.build_client(cx.proxy())?;
86        let global = cx.extra_context.get_or_default::<datadog::Options>();
87        let dd_common = self.dd_common.with_globals(global)?;
88        let healthcheck = dd_common.build_healthcheck(client.clone())?;
89        let sink = self.build_sink(&dd_common, client)?;
90
91        Ok((sink, healthcheck))
92    }
93
94    fn input(&self) -> Input {
95        let requirement = schema::Requirement::empty()
96            .required_meaning("message", Kind::bytes())
97            .optional_meaning("host", Kind::bytes())
98            .optional_meaning("timestamp", Kind::timestamp());
99
100        Input::log().with_schema_requirement(requirement)
101    }
102
103    fn acknowledgements(&self) -> &AcknowledgementsConfig {
104        &self.dd_common.acknowledgements
105    }
106}
107
#[cfg(test)]
mod tests {
    use super::DatadogEventsConfig;

    /// Runs the crate's shared `test_generate_config` helper against this
    /// sink's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DatadogEventsConfig>();
    }
}