vector/sinks/datadog/events/
config.rs

1use indoc::indoc;
2use tower::ServiceBuilder;
3use vector_lib::{config::proxy::ProxyConfig, configurable::configurable_component, schema};
4use vrl::value::Kind;
5
6use super::{
7    service::{DatadogEventsResponse, DatadogEventsService},
8    sink::DatadogEventsSink,
9};
10use crate::{
11    common::datadog,
12    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
13    http::HttpClient,
14    sinks::{
15        Healthcheck, VectorSink,
16        datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
17        util::{ServiceBuilderExt, TowerRequestConfig, http::HttpStatusRetryLogic},
18    },
19    tls::MaybeTlsSettings,
20};
21
22/// Configuration for the `datadog_events` sink.
23#[configurable_component(sink(
24    "datadog_events",
25    "Publish observability events to the Datadog Events API."
26))]
27#[derive(Clone, Debug, Default)]
28#[serde(deny_unknown_fields)]
29pub struct DatadogEventsConfig {
30    #[serde(flatten)]
31    pub dd_common: LocalDatadogCommonConfig,
32
33    #[configurable(derived)]
34    #[serde(default)]
35    pub request: TowerRequestConfig,
36}
37
38impl GenerateConfig for DatadogEventsConfig {
39    fn generate_config() -> toml::Value {
40        toml::from_str(indoc! {r#"
41            default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
42        "#})
43        .unwrap()
44    }
45}
46
47impl DatadogEventsConfig {
48    fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
49        let tls = MaybeTlsSettings::from_config(self.dd_common.tls.as_ref(), false)?;
50        let client = HttpClient::new(tls, proxy)?;
51        Ok(client)
52    }
53
54    fn build_sink(
55        &self,
56        dd_common: &DatadogCommonConfig,
57        client: HttpClient,
58    ) -> crate::Result<VectorSink> {
59        let service = DatadogEventsService::new(
60            dd_common.get_api_endpoint("/api/v1/events")?,
61            dd_common.default_api_key.clone(),
62            client,
63        );
64
65        let request_opts = self.request;
66        let request_settings = request_opts.into_settings();
67        let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status);
68
69        let service = ServiceBuilder::new()
70            .settings(request_settings, retry_logic)
71            .service(service);
72
73        let sink = DatadogEventsSink { service };
74
75        Ok(VectorSink::from_event_streamsink(sink))
76    }
77}
78
79#[async_trait::async_trait]
80#[typetag::serde(name = "datadog_events")]
81impl SinkConfig for DatadogEventsConfig {
82    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
83        let client = self.build_client(cx.proxy())?;
84        let global = cx.extra_context.get_or_default::<datadog::Options>();
85        let dd_common = self.dd_common.with_globals(global)?;
86        let healthcheck = dd_common.build_healthcheck(client.clone())?;
87        let sink = self.build_sink(&dd_common, client)?;
88
89        Ok((sink, healthcheck))
90    }
91
92    fn input(&self) -> Input {
93        let requirement = schema::Requirement::empty()
94            .required_meaning("message", Kind::bytes())
95            .optional_meaning("host", Kind::bytes())
96            .optional_meaning("timestamp", Kind::timestamp());
97
98        Input::log().with_schema_requirement(requirement)
99    }
100
101    fn acknowledgements(&self) -> &AcknowledgementsConfig {
102        &self.dd_common.acknowledgements
103    }
104}
105
106#[cfg(test)]
107mod tests {
108    use crate::sinks::datadog::events::config::DatadogEventsConfig;
109
110    #[test]
111    fn generate_config() {
112        crate::test_util::test_generate_config::<DatadogEventsConfig>();
113    }
114}