#![allow(missing_docs)]
use std::path::{Path, PathBuf};

use rdkafka::{ClientConfig, ClientContext, Statistics, consumer::ConsumerContext};
use snafu::Snafu;
use tracing::Span;
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};

use crate::{
    internal_events::KafkaStatisticsReceived,
    tls::{PEM_START_MARKER, TlsEnableableConfig},
};
13
14#[derive(Debug, Snafu)]
15enum KafkaError {
16 #[snafu(display("invalid path: {:?}", path))]
17 InvalidPath { path: PathBuf },
18}
19
20#[configurable_component]
22#[derive(Clone, Copy, Debug, Default)]
23#[serde(rename_all = "lowercase")]
24pub enum KafkaCompression {
25 #[default]
27 None,
28
29 Gzip,
31
32 Snappy,
34
35 Lz4,
37
38 Zstd,
40}
41
42#[configurable_component]
44#[derive(Clone, Debug, Default)]
45pub struct KafkaAuthConfig {
46 #[configurable(derived)]
47 pub(crate) sasl: Option<KafkaSaslConfig>,
48
49 #[configurable(derived)]
50 #[configurable(metadata(docs::advanced))]
51 pub(crate) tls: Option<TlsEnableableConfig>,
52}
53
54#[configurable_component]
56#[derive(Clone, Debug, Default)]
57pub struct KafkaSaslConfig {
58 pub(crate) enabled: Option<bool>,
70
71 #[configurable(metadata(docs::examples = "username"))]
73 pub(crate) username: Option<String>,
74
75 #[configurable(metadata(docs::examples = "password"))]
77 pub(crate) password: Option<SensitiveString>,
78
79 #[configurable(metadata(docs::examples = "SCRAM-SHA-256"))]
81 #[configurable(metadata(docs::examples = "SCRAM-SHA-512"))]
82 pub(crate) mechanism: Option<String>,
83}
84
85impl KafkaAuthConfig {
86 pub(crate) fn apply(&self, client: &mut ClientConfig) -> crate::Result<()> {
87 let sasl_enabled = self.sasl.as_ref().and_then(|s| s.enabled).unwrap_or(false);
88 let tls_enabled = self.tls.as_ref().and_then(|s| s.enabled).unwrap_or(false);
89
90 let protocol = match (sasl_enabled, tls_enabled) {
91 (false, false) => "plaintext",
92 (false, true) => "ssl",
93 (true, false) => "sasl_plaintext",
94 (true, true) => "sasl_ssl",
95 };
96 client.set("security.protocol", protocol);
97
98 if sasl_enabled {
99 let sasl = self.sasl.as_ref().unwrap();
100 if let Some(username) = &sasl.username {
101 client.set("sasl.username", username.as_str());
102 }
103 if let Some(password) = &sasl.password {
104 client.set("sasl.password", password.inner());
105 }
106 if let Some(mechanism) = &sasl.mechanism {
107 client.set("sasl.mechanism", mechanism);
108 }
109 }
110
111 if tls_enabled {
112 let tls = self.tls.as_ref().unwrap();
113
114 if let Some(verify_certificate) = &tls.options.verify_certificate {
115 client.set(
116 "enable.ssl.certificate.verification",
117 verify_certificate.to_string(),
118 );
119 }
120
121 if let Some(verify_hostname) = &tls.options.verify_hostname {
122 client.set(
123 "ssl.endpoint.identification.algorithm",
124 if *verify_hostname { "https" } else { "none" },
125 );
126 }
127
128 if let Some(path) = &tls.options.ca_file {
129 let text = pathbuf_to_string(path)?;
130 if text.contains(PEM_START_MARKER) {
131 client.set("ssl.ca.pem", text);
132 } else {
133 client.set("ssl.ca.location", text);
134 }
135 }
136
137 if let Some(path) = &tls.options.crt_file {
138 let text = pathbuf_to_string(path)?;
139 if text.contains(PEM_START_MARKER) {
140 client.set("ssl.certificate.pem", text);
141 } else {
142 client.set("ssl.certificate.location", text);
143 }
144 }
145
146 if let Some(path) = &tls.options.key_file {
147 let text = pathbuf_to_string(path)?;
148 if text.contains(PEM_START_MARKER) {
149 client.set("ssl.key.pem", text);
150 } else {
151 client.set("ssl.key.location", text);
152 }
153 }
154
155 if let Some(pass) = &tls.options.key_pass {
156 client.set("ssl.key.password", pass);
157 }
158 }
159
160 Ok(())
161 }
162}
163
164fn pathbuf_to_string(path: &Path) -> crate::Result<&str> {
165 path.to_str()
166 .ok_or_else(|| KafkaError::InvalidPath { path: path.into() }.into())
167}
168
169pub(crate) struct KafkaStatisticsContext {
170 pub(crate) expose_lag_metrics: bool,
171 pub span: Span,
172}
173
174impl ClientContext for KafkaStatisticsContext {
175 fn stats(&self, statistics: Statistics) {
176 let _entered = self.span.enter();
179 emit!(KafkaStatisticsReceived {
180 statistics: &statistics,
181 expose_lag_metrics: self.expose_lag_metrics,
182 });
183 }
184}
185
186impl ConsumerContext for KafkaStatisticsContext {}