vector/
kafka.rs

1#![allow(missing_docs)]
2use std::path::{Path, PathBuf};
3
4use rdkafka::{ClientConfig, ClientContext, Statistics, consumer::ConsumerContext};
5use snafu::Snafu;
6use tracing::Span;
7use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
8
9use crate::{
10    internal_events::KafkaStatisticsReceived,
11    tls::{PEM_START_MARKER, TlsEnableableConfig},
12};
13
14#[derive(Debug, Snafu)]
15enum KafkaError {
16    #[snafu(display("invalid path: {:?}", path))]
17    InvalidPath { path: PathBuf },
18}
19
20/// Supported compression types for Kafka.
21#[configurable_component]
22#[derive(Clone, Copy, Debug, Default)]
23#[serde(rename_all = "lowercase")]
24pub enum KafkaCompression {
25    /// No compression.
26    #[default]
27    None,
28
29    /// Gzip.
30    Gzip,
31
32    /// Snappy.
33    Snappy,
34
35    /// LZ4.
36    Lz4,
37
38    /// Zstandard.
39    Zstd,
40}
41
42/// Kafka authentication configuration.
43#[configurable_component]
44#[derive(Clone, Debug, Default)]
45pub struct KafkaAuthConfig {
46    #[configurable(derived)]
47    pub(crate) sasl: Option<KafkaSaslConfig>,
48
49    #[configurable(derived)]
50    #[configurable(metadata(docs::advanced))]
51    pub(crate) tls: Option<TlsEnableableConfig>,
52}
53
54/// Configuration for SASL authentication when interacting with Kafka.
55#[configurable_component]
56#[derive(Clone, Debug, Default)]
57pub struct KafkaSaslConfig {
58    /// Enables SASL authentication.
59    ///
60    /// Only `PLAIN`- and `SCRAM`-based mechanisms are supported when configuring SASL authentication using `sasl.*`. For
61    /// other mechanisms, `librdkafka_options.*` must be used directly to configure other `librdkafka`-specific values.
62    /// If using `sasl.kerberos.*` as an example, where `*` is `service.name`, `principal`, `kinit.md`, etc., then
63    /// `librdkafka_options.*` as a result becomes `librdkafka_options.sasl.kerberos.service.name`,
64    /// `librdkafka_options.sasl.kerberos.principal`, etc.
65    ///
66    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for details.
67    ///
68    /// SASL authentication is not supported on Windows.
69    pub(crate) enabled: Option<bool>,
70
71    /// The SASL username.
72    #[configurable(metadata(docs::examples = "username"))]
73    pub(crate) username: Option<String>,
74
75    /// The SASL password.
76    #[configurable(metadata(docs::examples = "password"))]
77    pub(crate) password: Option<SensitiveString>,
78
79    /// The SASL mechanism to use.
80    #[configurable(metadata(docs::examples = "SCRAM-SHA-256"))]
81    #[configurable(metadata(docs::examples = "SCRAM-SHA-512"))]
82    pub(crate) mechanism: Option<String>,
83}
84
85impl KafkaAuthConfig {
86    pub(crate) fn apply(&self, client: &mut ClientConfig) -> crate::Result<()> {
87        let sasl_enabled = self.sasl.as_ref().and_then(|s| s.enabled).unwrap_or(false);
88        let tls_enabled = self.tls.as_ref().and_then(|s| s.enabled).unwrap_or(false);
89
90        let protocol = match (sasl_enabled, tls_enabled) {
91            (false, false) => "plaintext",
92            (false, true) => "ssl",
93            (true, false) => "sasl_plaintext",
94            (true, true) => "sasl_ssl",
95        };
96        client.set("security.protocol", protocol);
97
98        if sasl_enabled {
99            let sasl = self.sasl.as_ref().unwrap();
100            if let Some(username) = &sasl.username {
101                client.set("sasl.username", username.as_str());
102            }
103            if let Some(password) = &sasl.password {
104                client.set("sasl.password", password.inner());
105            }
106            if let Some(mechanism) = &sasl.mechanism {
107                client.set("sasl.mechanism", mechanism);
108            }
109        }
110
111        if tls_enabled {
112            let tls = self.tls.as_ref().unwrap();
113
114            if let Some(verify_certificate) = &tls.options.verify_certificate {
115                client.set(
116                    "enable.ssl.certificate.verification",
117                    verify_certificate.to_string(),
118                );
119            }
120
121            if let Some(verify_hostname) = &tls.options.verify_hostname {
122                client.set(
123                    "ssl.endpoint.identification.algorithm",
124                    if *verify_hostname { "https" } else { "none" },
125                );
126            }
127
128            if let Some(path) = &tls.options.ca_file {
129                let text = pathbuf_to_string(path)?;
130                if text.contains(PEM_START_MARKER) {
131                    client.set("ssl.ca.pem", text);
132                } else {
133                    client.set("ssl.ca.location", text);
134                }
135            }
136
137            if let Some(path) = &tls.options.crt_file {
138                let text = pathbuf_to_string(path)?;
139                if text.contains(PEM_START_MARKER) {
140                    client.set("ssl.certificate.pem", text);
141                } else {
142                    client.set("ssl.certificate.location", text);
143                }
144            }
145
146            if let Some(path) = &tls.options.key_file {
147                let text = pathbuf_to_string(path)?;
148                if text.contains(PEM_START_MARKER) {
149                    client.set("ssl.key.pem", text);
150                } else {
151                    client.set("ssl.key.location", text);
152                }
153            }
154
155            if let Some(pass) = &tls.options.key_pass {
156                client.set("ssl.key.password", pass);
157            }
158        }
159
160        Ok(())
161    }
162}
163
164fn pathbuf_to_string(path: &Path) -> crate::Result<&str> {
165    path.to_str()
166        .ok_or_else(|| KafkaError::InvalidPath { path: path.into() }.into())
167}
168
169pub(crate) struct KafkaStatisticsContext {
170    pub(crate) expose_lag_metrics: bool,
171    pub span: Span,
172}
173
174impl ClientContext for KafkaStatisticsContext {
175    fn stats(&self, statistics: Statistics) {
176        // This callback get executed on a separate thread within the rdkafka library, so we need
177        // to propagate the span here to attach the component tags to the emitted events.
178        let _entered = self.span.enter();
179        emit!(KafkaStatisticsReceived {
180            statistics: &statistics,
181            expose_lag_metrics: self.expose_lag_metrics,
182        });
183    }
184}
185
186impl ConsumerContext for KafkaStatisticsContext {}