vector/sinks/kafka/config.rs

use std::{collections::HashMap, time::Duration};

use futures::FutureExt;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;
use vrl::value::Kind;

use crate::{
    kafka::{KafkaAuthConfig, KafkaCompression},
    serde::json::to_string,
    sinks::{
        kafka::sink::{healthcheck, KafkaSink},
        prelude::*,
    },
};

#[serde_as]
#[configurable_component(sink(
    "kafka",
    "Publish observability event data to Apache Kafka topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct KafkaSinkConfig {
    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its
    /// connection to the cluster, allowing discovery of all the other hosts in the cluster.
    /// Must be in the form of `host:port`.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    pub bootstrap_servers: String,

    /// The Kafka topic name to write events to.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "topic-1234",
        docs::examples = "logs-{{unit}}-%Y-%m-%d"
    ))]
    pub topic: Template,

    /// The topic name to use for healthchecks. If omitted, the `topic` option is used.
    pub healthcheck_topic: Option<String>,

    /// The log field name or tag key to use for the topic key.
    ///
    /// If the field does not exist in the log or in the tags, a blank value is used. If
    /// unspecified, the key is not sent. Kafka uses a hash of the key to choose the partition,
    /// or uses round-robin if the record has no key.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "user_id"))]
    #[configurable(metadata(docs::examples = ".my_topic"))]
    #[configurable(metadata(docs::examples = "%my_topic"))]
    pub key_field: Option<ConfigTargetPath>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub batch: BatchConfig<NoDefaultsBatchSettings>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub compression: KafkaCompression,

    #[configurable(derived)]
    #[serde(flatten)]
    pub auth: KafkaAuthConfig,

    /// Default timeout, in milliseconds, for network requests.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    pub socket_timeout_ms: Duration,

    /// Local message timeout, in milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 150000, docs::examples = 450000))]
    #[serde(default = "default_message_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Message Timeout"))]
    #[configurable(metadata(docs::advanced))]
    pub message_timeout_ms: Duration,

    /// The time window used by the `rate_limit_num` option.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
    #[serde(default = "default_rate_limit_duration_secs")]
    pub rate_limit_duration_secs: u64,

    /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.
    #[configurable(metadata(docs::type_unit = "requests"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Number"))]
    #[serde(default = "default_rate_limit_num")]
    pub rate_limit_num: u64,

    /// A map of advanced options to pass directly to the underlying `librdkafka` client.
    ///
    /// For the full list of options, see librdkafka's [configuration documentation][config_docs].
    ///
    /// [config_docs]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    #[serde(default)]
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    pub librdkafka_options: HashMap<String, String>,

    /// The log field name to use for the Kafka headers. If omitted, no headers are written.
    #[configurable(metadata(docs::advanced))]
    #[serde(alias = "headers_field")]
    #[configurable(metadata(docs::examples = "headers"))]
    pub headers_key: Option<ConfigTargetPath>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000)
}

const fn default_message_timeout_ms() -> Duration {
    Duration::from_millis(300000)
}

const fn default_rate_limit_duration_secs() -> u64 {
    1
}

const fn default_rate_limit_num() -> u64 {
    i64::MAX as u64
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}

impl KafkaSinkConfig {
    pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &self.bootstrap_servers)
            .set(
                "socket.timeout.ms",
                self.socket_timeout_ms.as_millis().to_string(),
            )
            .set("statistics.interval.ms", "1000");

        self.auth.apply(&mut client_config)?;

        // The remaining options are producer-only settings.
        client_config
            .set("compression.codec", to_string(self.compression))
            .set(
                "message.timeout.ms",
                self.message_timeout_ms.as_millis().to_string(),
            );

        if let Some(value) = self.batch.timeout_secs {
            // Delegate batching to the producer: `batch.timeout_secs` maps onto librdkafka's
            // `queue.buffering.max.ms`, so reject configs that set both.
            let key = "queue.buffering.max.ms";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!(
                    "Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`. \
                     The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`. \
                     Please delete one."
                )
                .into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "timeout_secs",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, (value * 1000.0).round().to_string());
        }
        if let Some(value) = self.batch.max_events {
            // `batch.max_events` maps onto librdkafka's `batch.num.messages`.
            let key = "batch.num.messages";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!(
                    "Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`. \
                     The config already sets this as `librdkafka_options.batch.num.messages={val}`. \
                     Please delete one."
                )
                .into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_events",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }
        if let Some(value) = self.batch.max_bytes {
            // `batch.max_bytes` maps onto librdkafka's `batch.size`, with the same conflict
            // check as the other batch options.
            let key = "batch.size";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!(
                    "Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`. \
                     The config already sets this as `librdkafka_options.batch.size={val}`. \
                     Please delete one."
                )
                .into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_bytes",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }

        for (key, value) in self.librdkafka_options.iter() {
            debug!(option = %key, value = %value, "Setting librdkafka option.");
            client_config.set(key.as_str(), value.as_str());
        }

        Ok(client_config)
    }
}

impl GenerateConfig for KafkaSinkConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(),
            topic: Template::try_from("topic-1234".to_owned()).unwrap(),
            healthcheck_topic: None,
            key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()),
            encoding: JsonSerializerConfig::default().into(),
            batch: Default::default(),
            compression: KafkaCompression::None,
            auth: Default::default(),
            socket_timeout_ms: default_socket_timeout_ms(),
            message_timeout_ms: default_message_timeout_ms(),
            rate_limit_duration_secs: default_rate_limit_duration_secs(),
            rate_limit_num: default_rate_limit_num(),
            librdkafka_options: Default::default(),
            headers_key: None,
            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SinkConfig for KafkaSinkConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let sink = KafkaSink::new(self.clone())?;
        let hc = healthcheck(self.clone(), cx.healthcheck.clone()).boxed();
        Ok((VectorSink::from_event_streamsink(sink), hc))
    }

    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
            .with_schema_requirement(requirements)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        KafkaSinkConfig::generate_config();
    }
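
    // A sketch of a regression test for the conflict check in `to_rdkafka`: setting
    // `batch.timeout_secs` together with the equivalent librdkafka option should be
    // rejected. The TOML shape below is an assumption about the derived
    // deserializer, not taken from this file.
    #[test]
    fn batch_timeout_conflicts_with_librdkafka_option() {
        let config: KafkaSinkConfig = toml::from_str(
            r#"
            bootstrap_servers = "localhost:9092"
            topic = "topic"
            encoding.codec = "json"
            batch.timeout_secs = 2.0

            [librdkafka_options]
            "queue.buffering.max.ms" = "5000"
            "#,
        )
        .unwrap();
        assert!(config.to_rdkafka().is_err());
    }

    // Companion sketch: without a conflicting option, `batch.max_events` should be
    // forwarded to librdkafka's `batch.num.messages` (read back here via
    // `ClientConfig::get`).
    #[test]
    fn batch_max_events_maps_to_librdkafka() {
        let config: KafkaSinkConfig = toml::from_str(
            r#"
            bootstrap_servers = "localhost:9092"
            topic = "topic"
            encoding.codec = "json"
            batch.max_events = 100
            "#,
        )
        .unwrap();
        let client = config.to_rdkafka().expect("config should convert");
        assert_eq!(client.get("batch.num.messages"), Some("100"));
    }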
}