1#![allow(missing_docs)]
2use std::path::{Path, PathBuf};
3
4use rdkafka::{consumer::ConsumerContext, ClientConfig, ClientContext, Statistics};
5use snafu::Snafu;
6use tracing::Span;
7use vector_lib::configurable::configurable_component;
8use vector_lib::sensitive_string::SensitiveString;
9
10use crate::{
11 internal_events::KafkaStatisticsReceived, tls::TlsEnableableConfig, tls::PEM_START_MARKER,
12};
13
14#[derive(Debug, Snafu)]
15enum KafkaError {
16 #[snafu(display("invalid path: {:?}", path))]
17 InvalidPath { path: PathBuf },
18}
19
20#[configurable_component]
22#[derive(Clone, Copy, Debug, Derivative)]
23#[derivative(Default)]
24#[serde(rename_all = "lowercase")]
25pub enum KafkaCompression {
26 #[derivative(Default)]
28 None,
29
30 Gzip,
32
33 Snappy,
35
36 Lz4,
38
39 Zstd,
41}
42
43#[configurable_component]
45#[derive(Clone, Debug, Default)]
46pub struct KafkaAuthConfig {
47 #[configurable(derived)]
48 pub(crate) sasl: Option<KafkaSaslConfig>,
49
50 #[configurable(derived)]
51 #[configurable(metadata(docs::advanced))]
52 pub(crate) tls: Option<TlsEnableableConfig>,
53}
54
55#[configurable_component]
57#[derive(Clone, Debug, Default)]
58pub struct KafkaSaslConfig {
59 pub(crate) enabled: Option<bool>,
71
72 #[configurable(metadata(docs::examples = "username"))]
74 pub(crate) username: Option<String>,
75
76 #[configurable(metadata(docs::examples = "password"))]
78 pub(crate) password: Option<SensitiveString>,
79
80 #[configurable(metadata(docs::examples = "SCRAM-SHA-256"))]
82 #[configurable(metadata(docs::examples = "SCRAM-SHA-512"))]
83 pub(crate) mechanism: Option<String>,
84}
85
86impl KafkaAuthConfig {
87 pub(crate) fn apply(&self, client: &mut ClientConfig) -> crate::Result<()> {
88 let sasl_enabled = self.sasl.as_ref().and_then(|s| s.enabled).unwrap_or(false);
89 let tls_enabled = self.tls.as_ref().and_then(|s| s.enabled).unwrap_or(false);
90
91 let protocol = match (sasl_enabled, tls_enabled) {
92 (false, false) => "plaintext",
93 (false, true) => "ssl",
94 (true, false) => "sasl_plaintext",
95 (true, true) => "sasl_ssl",
96 };
97 client.set("security.protocol", protocol);
98
99 if sasl_enabled {
100 let sasl = self.sasl.as_ref().unwrap();
101 if let Some(username) = &sasl.username {
102 client.set("sasl.username", username.as_str());
103 }
104 if let Some(password) = &sasl.password {
105 client.set("sasl.password", password.inner());
106 }
107 if let Some(mechanism) = &sasl.mechanism {
108 client.set("sasl.mechanism", mechanism);
109 }
110 }
111
112 if tls_enabled {
113 let tls = self.tls.as_ref().unwrap();
114
115 if let Some(verify_certificate) = &tls.options.verify_certificate {
116 client.set(
117 "enable.ssl.certificate.verification",
118 verify_certificate.to_string(),
119 );
120 }
121
122 if let Some(verify_hostname) = &tls.options.verify_hostname {
123 client.set(
124 "ssl.endpoint.identification.algorithm",
125 if *verify_hostname { "https" } else { "none" },
126 );
127 }
128
129 if let Some(path) = &tls.options.ca_file {
130 let text = pathbuf_to_string(path)?;
131 if text.contains(PEM_START_MARKER) {
132 client.set("ssl.ca.pem", text);
133 } else {
134 client.set("ssl.ca.location", text);
135 }
136 }
137
138 if let Some(path) = &tls.options.crt_file {
139 let text = pathbuf_to_string(path)?;
140 if text.contains(PEM_START_MARKER) {
141 client.set("ssl.certificate.pem", text);
142 } else {
143 client.set("ssl.certificate.location", text);
144 }
145 }
146
147 if let Some(path) = &tls.options.key_file {
148 let text = pathbuf_to_string(path)?;
149 if text.contains(PEM_START_MARKER) {
150 client.set("ssl.key.pem", text);
151 } else {
152 client.set("ssl.key.location", text);
153 }
154 }
155
156 if let Some(pass) = &tls.options.key_pass {
157 client.set("ssl.key.password", pass);
158 }
159 }
160
161 Ok(())
162 }
163}
164
165fn pathbuf_to_string(path: &Path) -> crate::Result<&str> {
166 path.to_str()
167 .ok_or_else(|| KafkaError::InvalidPath { path: path.into() }.into())
168}
169
170pub(crate) struct KafkaStatisticsContext {
171 pub(crate) expose_lag_metrics: bool,
172 pub span: Span,
173}
174
175impl ClientContext for KafkaStatisticsContext {
176 fn stats(&self, statistics: Statistics) {
177 let _entered = self.span.enter();
180 emit!(KafkaStatisticsReceived {
181 statistics: &statistics,
182 expose_lag_metrics: self.expose_lag_metrics,
183 });
184 }
185}
186
187impl ConsumerContext for KafkaStatisticsContext {}