vector/sinks/opentelemetry/
mod.rs

1use indoc::indoc;
2use vector_config::component::GenerateConfig;
3use vector_lib::{
4    codecs::{
5        JsonSerializerConfig,
6        encoding::{FramingConfig, SerializerConfig},
7    },
8    configurable::configurable_component,
9};
10
11use crate::{
12    codecs::{EncodingConfigWithFraming, Transformer},
13    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
14    sinks::{
15        Healthcheck, VectorSink,
16        http::config::{HttpMethod, HttpSinkConfig},
17    },
18};
19
20/// Configuration for the `OpenTelemetry` sink.
21#[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))]
22#[derive(Clone, Debug, Default)]
23pub struct OpenTelemetryConfig {
24    /// Protocol configuration
25    #[configurable(derived)]
26    protocol: Protocol,
27}
28
29/// The protocol used to send data to OpenTelemetry.
30/// Currently only HTTP is supported, but we plan to support gRPC.
31/// The proto definitions are defined [here](https://github.com/vectordotdev/vector/blob/master/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/README.md).
32#[configurable_component]
33#[derive(Clone, Debug)]
34#[serde(rename_all = "snake_case", tag = "type")]
35#[configurable(metadata(docs::enum_tag_description = "The communication protocol."))]
36pub enum Protocol {
37    /// Send data over HTTP.
38    Http(HttpSinkConfig),
39}
40
41impl Default for Protocol {
42    fn default() -> Self {
43        Protocol::Http(HttpSinkConfig {
44            encoding: EncodingConfigWithFraming::new(
45                Some(FramingConfig::NewlineDelimited),
46                SerializerConfig::Json(JsonSerializerConfig::default()),
47                Transformer::default(),
48            ),
49            uri: Default::default(),
50            method: HttpMethod::Post,
51            auth: Default::default(),
52            compression: Default::default(),
53            payload_prefix: Default::default(),
54            payload_suffix: Default::default(),
55            batch: Default::default(),
56            request: Default::default(),
57            tls: Default::default(),
58            acknowledgements: Default::default(),
59        })
60    }
61}
62
63impl GenerateConfig for OpenTelemetryConfig {
64    fn generate_config() -> toml::Value {
65        toml::from_str(indoc! {r#"
66            [protocol]
67            type = "http"
68            uri = "http://localhost:5318/v1/logs"
69            encoding.codec = "json"
70        "#})
71        .unwrap()
72    }
73}
74
75#[async_trait::async_trait]
76#[typetag::serde(name = "opentelemetry")]
77impl SinkConfig for OpenTelemetryConfig {
78    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
79        match &self.protocol {
80            Protocol::Http(config) => config.build(cx).await,
81        }
82    }
83
84    fn input(&self) -> Input {
85        match &self.protocol {
86            Protocol::Http(config) => config.input(),
87        }
88    }
89
90    fn acknowledgements(&self) -> &AcknowledgementsConfig {
91        match self.protocol {
92            Protocol::Http(ref config) => config.acknowledgements(),
93        }
94    }
95}
96
#[cfg(test)]
mod test {
    use super::OpenTelemetryConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared `test_generate_config` check against this sink's
    /// generated example configuration.
    #[test]
    fn generate_config() {
        test_generate_config::<OpenTelemetryConfig>();
    }
}