vector/sinks/opentelemetry/
mod.rs

1use crate::codecs::{EncodingConfigWithFraming, Transformer};
2use crate::config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext};
3use crate::sinks::http::config::{HttpMethod, HttpSinkConfig};
4use crate::sinks::{Healthcheck, VectorSink};
5use indoc::indoc;
6use vector_config::component::GenerateConfig;
7use vector_lib::codecs::encoding::{FramingConfig, SerializerConfig};
8use vector_lib::codecs::JsonSerializerConfig;
9use vector_lib::configurable::configurable_component;
10
11/// Configuration for the `OpenTelemetry` sink.
12#[configurable_component(sink("opentelemetry", "Deliver OTLP data over HTTP."))]
13#[derive(Clone, Debug, Default)]
14pub struct OpenTelemetryConfig {
15    /// Protocol configuration
16    #[configurable(derived)]
17    protocol: Protocol,
18}
19
20/// The protocol used to send data to OpenTelemetry.
21/// Currently only HTTP is supported, but we plan to support gRPC.
22/// The proto definitions are defined [here](https://github.com/vectordotdev/vector/blob/master/lib/opentelemetry-proto/src/proto/opentelemetry-proto/opentelemetry/proto/README.md).
23#[configurable_component]
24#[derive(Clone, Debug)]
25#[serde(rename_all = "snake_case", tag = "type")]
26#[configurable(metadata(docs::enum_tag_description = "The communication protocol."))]
27pub enum Protocol {
28    /// Send data over HTTP.
29    Http(HttpSinkConfig),
30}
31
32impl Default for Protocol {
33    fn default() -> Self {
34        Protocol::Http(HttpSinkConfig {
35            encoding: EncodingConfigWithFraming::new(
36                Some(FramingConfig::NewlineDelimited),
37                SerializerConfig::Json(JsonSerializerConfig::default()),
38                Transformer::default(),
39            ),
40            uri: Default::default(),
41            method: HttpMethod::Post,
42            auth: Default::default(),
43            headers: Default::default(),
44            compression: Default::default(),
45            payload_prefix: Default::default(),
46            payload_suffix: Default::default(),
47            batch: Default::default(),
48            request: Default::default(),
49            tls: Default::default(),
50            acknowledgements: Default::default(),
51        })
52    }
53}
54
55impl GenerateConfig for OpenTelemetryConfig {
56    fn generate_config() -> toml::Value {
57        toml::from_str(indoc! {r#"
58            [protocol]
59            type = "http"
60            uri = "http://localhost:5318/v1/logs"
61            encoding.codec = "json"
62        "#})
63        .unwrap()
64    }
65}
66
67#[async_trait::async_trait]
68#[typetag::serde(name = "opentelemetry")]
69impl SinkConfig for OpenTelemetryConfig {
70    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
71        match &self.protocol {
72            Protocol::Http(config) => config.build(cx).await,
73        }
74    }
75
76    fn input(&self) -> Input {
77        match &self.protocol {
78            Protocol::Http(config) => config.input(),
79        }
80    }
81
82    fn acknowledgements(&self) -> &AcknowledgementsConfig {
83        match self.protocol {
84            Protocol::Http(ref config) => config.acknowledgements(),
85        }
86    }
87}
88
89#[cfg(test)]
90mod test {
91    #[test]
92    fn generate_config() {
93        crate::test_util::test_generate_config::<super::OpenTelemetryConfig>();
94    }
95}