vector/
kafka.rs

1#![allow(missing_docs)]
2use std::path::{Path, PathBuf};
3
4use rdkafka::{consumer::ConsumerContext, ClientConfig, ClientContext, Statistics};
5use snafu::Snafu;
6use tracing::Span;
7use vector_lib::configurable::configurable_component;
8use vector_lib::sensitive_string::SensitiveString;
9
10use crate::{
11    internal_events::KafkaStatisticsReceived, tls::TlsEnableableConfig, tls::PEM_START_MARKER,
12};
13
/// Errors raised while translating Kafka configuration into `rdkafka` settings.
#[derive(Debug, Snafu)]
enum KafkaError {
    /// A configured path could not be converted to a string (see `pathbuf_to_string`).
    #[snafu(display("invalid path: {:?}", path))]
    InvalidPath { path: PathBuf },
}
19
/// Supported compression types for Kafka.
// NOTE(review): `Derivative` is not imported in the visible part of this file; presumably it is
// brought into scope crate-wide — confirm before moving this type.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
// Variants are (de)serialized using their lowercase names.
#[serde(rename_all = "lowercase")]
pub enum KafkaCompression {
    /// No compression.
    // Marked as the `Default` variant.
    #[derivative(Default)]
    None,

    /// Gzip.
    Gzip,

    /// Snappy.
    Snappy,

    /// LZ4.
    Lz4,

    /// Zstandard.
    Zstd,
}
42
/// Kafka authentication configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct KafkaAuthConfig {
    // SASL settings. `KafkaAuthConfig::apply` only uses them when `sasl.enabled` is `Some(true)`.
    #[configurable(derived)]
    pub(crate) sasl: Option<KafkaSaslConfig>,

    // TLS settings. `KafkaAuthConfig::apply` only uses them when `tls.enabled` is `Some(true)`.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    pub(crate) tls: Option<TlsEnableableConfig>,
}
54
/// Configuration for SASL authentication when interacting with Kafka.
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub struct KafkaSaslConfig {
    /// Enables SASL authentication.
    ///
    /// Only `PLAIN`- and `SCRAM`-based mechanisms are supported when configuring SASL authentication using `sasl.*`. For
    /// other mechanisms, `librdkafka_options.*` must be used directly to configure other `librdkafka`-specific values.
    /// If using `sasl.kerberos.*` as an example, where `*` is `service.name`, `principal`, `kinit.cmd`, etc., then
    /// `librdkafka_options.*` as a result becomes `librdkafka_options.sasl.kerberos.service.name`,
    /// `librdkafka_options.sasl.kerberos.principal`, etc.
    ///
    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for details.
    ///
    /// SASL authentication is not supported on Windows.
    // `KafkaAuthConfig::apply` treats an unset value as `false`; the remaining fields are only
    // forwarded to the client when this is `Some(true)`.
    pub(crate) enabled: Option<bool>,

    /// The SASL username.
    #[configurable(metadata(docs::examples = "username"))]
    pub(crate) username: Option<String>,

    /// The SASL password.
    #[configurable(metadata(docs::examples = "password"))]
    pub(crate) password: Option<SensitiveString>,

    /// The SASL mechanism to use.
    #[configurable(metadata(docs::examples = "SCRAM-SHA-256"))]
    #[configurable(metadata(docs::examples = "SCRAM-SHA-512"))]
    pub(crate) mechanism: Option<String>,
}
85
86impl KafkaAuthConfig {
87    pub(crate) fn apply(&self, client: &mut ClientConfig) -> crate::Result<()> {
88        let sasl_enabled = self.sasl.as_ref().and_then(|s| s.enabled).unwrap_or(false);
89        let tls_enabled = self.tls.as_ref().and_then(|s| s.enabled).unwrap_or(false);
90
91        let protocol = match (sasl_enabled, tls_enabled) {
92            (false, false) => "plaintext",
93            (false, true) => "ssl",
94            (true, false) => "sasl_plaintext",
95            (true, true) => "sasl_ssl",
96        };
97        client.set("security.protocol", protocol);
98
99        if sasl_enabled {
100            let sasl = self.sasl.as_ref().unwrap();
101            if let Some(username) = &sasl.username {
102                client.set("sasl.username", username.as_str());
103            }
104            if let Some(password) = &sasl.password {
105                client.set("sasl.password", password.inner());
106            }
107            if let Some(mechanism) = &sasl.mechanism {
108                client.set("sasl.mechanism", mechanism);
109            }
110        }
111
112        if tls_enabled {
113            let tls = self.tls.as_ref().unwrap();
114
115            if let Some(verify_certificate) = &tls.options.verify_certificate {
116                client.set(
117                    "enable.ssl.certificate.verification",
118                    verify_certificate.to_string(),
119                );
120            }
121
122            if let Some(verify_hostname) = &tls.options.verify_hostname {
123                client.set(
124                    "ssl.endpoint.identification.algorithm",
125                    if *verify_hostname { "https" } else { "none" },
126                );
127            }
128
129            if let Some(path) = &tls.options.ca_file {
130                let text = pathbuf_to_string(path)?;
131                if text.contains(PEM_START_MARKER) {
132                    client.set("ssl.ca.pem", text);
133                } else {
134                    client.set("ssl.ca.location", text);
135                }
136            }
137
138            if let Some(path) = &tls.options.crt_file {
139                let text = pathbuf_to_string(path)?;
140                if text.contains(PEM_START_MARKER) {
141                    client.set("ssl.certificate.pem", text);
142                } else {
143                    client.set("ssl.certificate.location", text);
144                }
145            }
146
147            if let Some(path) = &tls.options.key_file {
148                let text = pathbuf_to_string(path)?;
149                if text.contains(PEM_START_MARKER) {
150                    client.set("ssl.key.pem", text);
151                } else {
152                    client.set("ssl.key.location", text);
153                }
154            }
155
156            if let Some(pass) = &tls.options.key_pass {
157                client.set("ssl.key.password", pass);
158            }
159        }
160
161        Ok(())
162    }
163}
164
165fn pathbuf_to_string(path: &Path) -> crate::Result<&str> {
166    path.to_str()
167        .ok_or_else(|| KafkaError::InvalidPath { path: path.into() }.into())
168}
169
/// Client context that turns `rdkafka` statistics callbacks into `KafkaStatisticsReceived`
/// internal events.
pub(crate) struct KafkaStatisticsContext {
    /// Forwarded unchanged to each emitted `KafkaStatisticsReceived` event.
    pub(crate) expose_lag_metrics: bool,
    /// Span entered for the duration of the statistics callback before the event is emitted.
    pub span: Span,
}
174
175impl ClientContext for KafkaStatisticsContext {
176    fn stats(&self, statistics: Statistics) {
177        // This callback get executed on a separate thread within the rdkafka library, so we need
178        // to propagate the span here to attach the component tags to the emitted events.
179        let _entered = self.span.enter();
180        emit!(KafkaStatisticsReceived {
181            statistics: &statistics,
182            expose_lag_metrics: self.expose_lag_metrics,
183        });
184    }
185}
186
// No `ConsumerContext` methods are overridden here; this impl only makes the context usable
// where a `ConsumerContext` is required.
impl ConsumerContext for KafkaStatisticsContext {}