vector/sinks/datadog/events/
config.rs1use indoc::indoc;
2use tower::ServiceBuilder;
3use vector_lib::config::proxy::ProxyConfig;
4use vector_lib::configurable::configurable_component;
5use vector_lib::schema;
6use vrl::value::Kind;
7
8use super::{
9 service::{DatadogEventsResponse, DatadogEventsService},
10 sink::DatadogEventsSink,
11};
12use crate::{
13 common::datadog,
14 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
15 http::HttpClient,
16 sinks::{
17 datadog::{DatadogCommonConfig, LocalDatadogCommonConfig},
18 util::{http::HttpStatusRetryLogic, ServiceBuilderExt, TowerRequestConfig},
19 Healthcheck, VectorSink,
20 },
21 tls::MaybeTlsSettings,
22};
23
24#[configurable_component(sink(
26 "datadog_events",
27 "Publish observability events to the Datadog Events API."
28))]
29#[derive(Clone, Debug, Default)]
30#[serde(deny_unknown_fields)]
31pub struct DatadogEventsConfig {
32 #[serde(flatten)]
33 pub dd_common: LocalDatadogCommonConfig,
34
35 #[configurable(derived)]
36 #[serde(default)]
37 pub request: TowerRequestConfig,
38}
39
40impl GenerateConfig for DatadogEventsConfig {
41 fn generate_config() -> toml::Value {
42 toml::from_str(indoc! {r#"
43 default_api_key = "${DATADOG_API_KEY_ENV_VAR}"
44 "#})
45 .unwrap()
46 }
47}
48
49impl DatadogEventsConfig {
50 fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
51 let tls = MaybeTlsSettings::from_config(self.dd_common.tls.as_ref(), false)?;
52 let client = HttpClient::new(tls, proxy)?;
53 Ok(client)
54 }
55
56 fn build_sink(
57 &self,
58 dd_common: &DatadogCommonConfig,
59 client: HttpClient,
60 ) -> crate::Result<VectorSink> {
61 let service = DatadogEventsService::new(
62 dd_common.get_api_endpoint("/api/v1/events")?,
63 dd_common.default_api_key.clone(),
64 client,
65 );
66
67 let request_opts = self.request;
68 let request_settings = request_opts.into_settings();
69 let retry_logic = HttpStatusRetryLogic::new(|req: &DatadogEventsResponse| req.http_status);
70
71 let service = ServiceBuilder::new()
72 .settings(request_settings, retry_logic)
73 .service(service);
74
75 let sink = DatadogEventsSink { service };
76
77 Ok(VectorSink::from_event_streamsink(sink))
78 }
79}
80
81#[async_trait::async_trait]
82#[typetag::serde(name = "datadog_events")]
83impl SinkConfig for DatadogEventsConfig {
84 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
85 let client = self.build_client(cx.proxy())?;
86 let global = cx.extra_context.get_or_default::<datadog::Options>();
87 let dd_common = self.dd_common.with_globals(global)?;
88 let healthcheck = dd_common.build_healthcheck(client.clone())?;
89 let sink = self.build_sink(&dd_common, client)?;
90
91 Ok((sink, healthcheck))
92 }
93
94 fn input(&self) -> Input {
95 let requirement = schema::Requirement::empty()
96 .required_meaning("message", Kind::bytes())
97 .optional_meaning("host", Kind::bytes())
98 .optional_meaning("timestamp", Kind::timestamp());
99
100 Input::log().with_schema_requirement(requirement)
101 }
102
103 fn acknowledgements(&self) -> &AcknowledgementsConfig {
104 &self.dd_common.acknowledgements
105 }
106}
107
#[cfg(test)]
mod tests {
    use super::DatadogEventsConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared generated-config check against this sink's config type.
    #[test]
    fn generate_config() {
        test_generate_config::<DatadogEventsConfig>();
    }
}