use std::{collections::HashMap, time::Duration};

use futures::FutureExt;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_lib::{
    codecs::JsonSerializerConfig, configurable::configurable_component,
    lookup::lookup_v2::ConfigTargetPath,
};
use vrl::value::Kind;

use crate::{
    kafka::{KafkaAuthConfig, KafkaCompression},
    serde::json::to_string,
    sinks::{
        kafka::sink::{KafkaSink, healthcheck},
        prelude::*,
    },
};

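/// Configuration for the `kafka` sink.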
#[serde_as]
#[configurable_component(sink(
    "kafka",
    "Publish observability event data to Apache Kafka topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct KafkaSinkConfig {
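    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its
    /// connection to the cluster, allowing discovery of all the other hosts in the cluster.
    ///
    /// Must be in the form of `host:port`, and comma-separated.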
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    pub bootstrap_servers: String,

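    /// The Kafka topic name to write events to.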
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "topic-1234",
        docs::examples = "logs-{{unit}}-%Y-%m-%d"
    ))]
    pub topic: Template,

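    /// The topic name to use for the healthcheck. If omitted, `topic` is used.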
    pub healthcheck_topic: Option<String>,

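    /// The log field name or tag key to use for the topic key.
    ///
    /// If the field does not exist in the log or in the tags, a blank value will be used. If
    /// unspecified, the key is not sent.
    ///
    /// Kafka uses a hash of the key to choose the partition or uses round-robin if the record has
    /// no key.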
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "user_id"))]
    #[configurable(metadata(docs::examples = ".my_topic"))]
    #[configurable(metadata(docs::examples = "%my_topic"))]
    pub key_field: Option<ConfigTargetPath>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub batch: BatchConfig<NoDefaultsBatchSettings>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub compression: KafkaCompression,

    #[configurable(derived)]
    #[serde(flatten)]
    pub auth: KafkaAuthConfig,

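    /// Default timeout, in milliseconds, for network requests.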
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    pub socket_timeout_ms: Duration,

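    /// Local message timeout, in milliseconds.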
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 150000, docs::examples = 450000))]
    #[serde(default = "default_message_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Message Timeout"))]
    #[configurable(metadata(docs::advanced))]
    pub message_timeout_ms: Duration,

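    /// The time window used for the `rate_limit_num` option.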
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
    #[serde(default = "default_rate_limit_duration_secs")]
    pub rate_limit_duration_secs: u64,

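    /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.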
    #[configurable(metadata(docs::type_unit = "requests"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Number"))]
    #[serde(default = "default_rate_limit_num")]
    pub rate_limit_num: u64,

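    /// A map of advanced options to pass directly to the underlying `librdkafka` client.
    ///
    /// See the `librdkafka` documentation for the list of supported configuration properties.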
    #[serde(default)]
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    pub librdkafka_options: HashMap<String, String>,

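    /// The log field name to use for the Kafka headers.
    ///
    /// If omitted, no headers are written.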
    #[configurable(metadata(docs::advanced))]
    #[serde(alias = "headers_field")]
    #[configurable(metadata(docs::examples = "headers"))]
    pub headers_key: Option<ConfigTargetPath>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000)
}

const fn default_message_timeout_ms() -> Duration {
    Duration::from_millis(300000)
}

const fn default_rate_limit_duration_secs() -> u64 {
    1
}

const fn default_rate_limit_num() -> u64 {
    i64::MAX as u64
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}
impl KafkaSinkConfig {
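    /// Builds the `rdkafka` [`ClientConfig`] for this sink, merging Vector-level
    /// settings (batching, compression, timeouts) with raw `librdkafka_options`,
    /// and rejecting any option that is set through both paths.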
    pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &self.bootstrap_servers)
            .set(
                "socket.timeout.ms",
                self.socket_timeout_ms.as_millis().to_string(),
            )
            .set("statistics.interval.ms", "1000");

        self.auth.apply(&mut client_config)?;

        client_config
            .set("compression.codec", to_string(self.compression))
            .set(
                "message.timeout.ms",
                self.message_timeout_ms.as_millis().to_string(),
            );

        if let Some(value) = self.batch.timeout_secs {
            let key = "queue.buffering.max.ms";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!(
                    "Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`. \
                     The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`. \
                     Please delete one."
                )
                .into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "timeout_secs",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, (value * 1000.0).round().to_string());
        }
        if let Some(value) = self.batch.max_events {
            let key = "batch.num.messages";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!(
                    "Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`. \
                     The config already sets this as `librdkafka_options.batch.num.messages={val}`. \
                     Please delete one."
                )
                .into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_events",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }
        if let Some(value) = self.batch.max_bytes {
            let key = "batch.size";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!(
                    "Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`. \
                     The config already sets this as `librdkafka_options.batch.size={val}`. \
                     Please delete one."
                )
                .into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_bytes",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }

        for (key, value) in self.librdkafka_options.iter() {
            debug!(option = %key, value = %value, "Setting librdkafka option.");
            client_config.set(key.as_str(), value.as_str());
        }

        Ok(client_config)
    }
}

impl GenerateConfig for KafkaSinkConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(),
            topic: Template::try_from("topic-1234".to_owned()).unwrap(),
            healthcheck_topic: None,
            key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()),
            encoding: JsonSerializerConfig::default().into(),
            batch: Default::default(),
            compression: KafkaCompression::None,
            auth: Default::default(),
            socket_timeout_ms: default_socket_timeout_ms(),
            message_timeout_ms: default_message_timeout_ms(),
            rate_limit_duration_secs: default_rate_limit_duration_secs(),
            rate_limit_num: default_rate_limit_num(),
            librdkafka_options: Default::default(),
            headers_key: None,
            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SinkConfig for KafkaSinkConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let sink = KafkaSink::new(self.clone())?;
        let hc = healthcheck(self.clone(), cx.healthcheck.clone()).boxed();
        Ok((VectorSink::from_event_streamsink(sink), hc))
    }

    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
            .with_schema_requirement(requirements)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        KafkaSinkConfig::generate_config();
    }
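
    // A minimal sketch of the conflict check in `to_rdkafka`: `batch.max_events`
    // maps onto librdkafka's `batch.num.messages`, so configuring both should be
    // rejected. Assumes `BatchConfig`'s fields are assignable here, mirroring how
    // `to_rdkafka` reads them directly.
    #[test]
    fn to_rdkafka_rejects_conflicting_batch_options() {
        let mut batch = BatchConfig::<NoDefaultsBatchSettings>::default();
        batch.max_events = Some(10);

        let config = KafkaSinkConfig {
            bootstrap_servers: "localhost:9092".to_owned(),
            topic: Template::try_from("topic".to_owned()).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: JsonSerializerConfig::default().into(),
            batch,
            compression: KafkaCompression::None,
            auth: Default::default(),
            socket_timeout_ms: default_socket_timeout_ms(),
            message_timeout_ms: default_message_timeout_ms(),
            rate_limit_duration_secs: default_rate_limit_duration_secs(),
            rate_limit_num: default_rate_limit_num(),
            librdkafka_options: HashMap::<_, _>::from_iter([(
                "batch.num.messages".to_string(),
                "100".to_string(),
            )]),
            headers_key: None,
            acknowledgements: Default::default(),
        };

        assert!(config.to_rdkafka().is_err());
    }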
308}