vector/sinks/opentelemetry/
mod.rs

1use indoc::indoc;
2use vector_config::component::GenerateConfig;
3use vector_lib::{
4    codecs::{
5        JsonSerializerConfig,
6        encoding::{FramingConfig, SerializerConfig},
7    },
8    configurable::configurable_component,
9};
10
11use crate::{
12    codecs::{EncodingConfigWithFraming, Transformer},
13    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
14    sinks::{
15        Healthcheck, VectorSink,
16        http::config::{HttpMethod, HttpSinkConfig},
17    },
18};
19
20/// Configuration for the `OpenTelemetry` sink.
21#[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))]
22#[derive(Clone, Debug, Default)]
23pub struct OpenTelemetryConfig {
24    /// Protocol configuration
25    #[configurable(derived)]
26    protocol: Protocol,
27}
28
29/// The protocol used to send data to OpenTelemetry.
30/// Currently only HTTP is supported, but we plan to support gRPC.
31/// The proto definitions are defined [here](https://github.com/vectordotdev/vector/blob/master/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/README.md).
32#[configurable_component]
33#[derive(Clone, Debug)]
34#[serde(rename_all = "snake_case", tag = "type")]
35#[configurable(metadata(docs::enum_tag_description = "The communication protocol."))]
36pub enum Protocol {
37    /// Send data over HTTP.
38    Http(HttpSinkConfig),
39}
40
41impl Default for Protocol {
42    fn default() -> Self {
43        Protocol::Http(HttpSinkConfig {
44            encoding: EncodingConfigWithFraming::new(
45                Some(FramingConfig::NewlineDelimited),
46                SerializerConfig::Json(JsonSerializerConfig::default()),
47                Transformer::default(),
48            ),
49            uri: Default::default(),
50            method: HttpMethod::Post,
51            auth: Default::default(),
52            headers: Default::default(),
53            compression: Default::default(),
54            payload_prefix: Default::default(),
55            payload_suffix: Default::default(),
56            batch: Default::default(),
57            request: Default::default(),
58            tls: Default::default(),
59            acknowledgements: Default::default(),
60        })
61    }
62}
63
64impl GenerateConfig for OpenTelemetryConfig {
65    fn generate_config() -> toml::Value {
66        toml::from_str(indoc! {r#"
67            [protocol]
68            type = "http"
69            uri = "http://localhost:5318/v1/logs"
70            encoding.codec = "json"
71        "#})
72        .unwrap()
73    }
74}
75
76#[async_trait::async_trait]
77#[typetag::serde(name = "opentelemetry")]
78impl SinkConfig for OpenTelemetryConfig {
79    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
80        match &self.protocol {
81            Protocol::Http(config) => config.build(cx).await,
82        }
83    }
84
85    fn input(&self) -> Input {
86        match &self.protocol {
87            Protocol::Http(config) => config.input(),
88        }
89    }
90
91    fn acknowledgements(&self) -> &AcknowledgementsConfig {
92        match self.protocol {
93            Protocol::Http(ref config) => config.acknowledgements(),
94        }
95    }
96}
97
98#[cfg(test)]
99mod test {
100    #[test]
101    fn generate_config() {
102        crate::test_util::test_generate_config::<super::OpenTelemetryConfig>();
103    }
104}