vector/sinks/kafka/config.rs

use std::{collections::HashMap, time::Duration};

use futures::FutureExt;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::lookup_v2::ConfigTargetPath;
use vrl::value::Kind;

use crate::{
    kafka::{KafkaAuthConfig, KafkaCompression},
    serde::json::to_string,
    sinks::{
        kafka::sink::{healthcheck, KafkaSink},
        prelude::*,
    },
};

/// Configuration for the `kafka` sink.
#[serde_as]
#[configurable_component(sink(
    "kafka",
    "Publish observability event data to Apache Kafka topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct KafkaSinkConfig {
    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its
    /// connection to the cluster, allowing discovery of all the other hosts in the cluster.
    ///
    /// Must be in the form of `host:port`, and comma-separated.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    pub bootstrap_servers: String,

    /// The Kafka topic name to write events to.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "topic-1234",
        docs::examples = "logs-{{unit}}-%Y-%m-%d"
    ))]
    pub topic: Template,

    /// The topic name to use for the healthcheck. If omitted, `topic` is used.
    /// This option helps prevent healthcheck warnings when `topic` is templated.
    ///
    /// It is ignored when the healthcheck is disabled.
    pub healthcheck_topic: Option<String>,

    /// The log field name or tag key to use for the topic key.
    ///
    /// If the field does not exist in the log or in the tags, a blank value is used. If
    /// unspecified, the key is not sent.
    ///
    /// Kafka uses a hash of the key to choose the partition or uses round-robin if the record has
    /// no key.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "user_id"))]
    #[configurable(metadata(docs::examples = ".my_topic"))]
    #[configurable(metadata(docs::examples = "%my_topic"))]
    pub key_field: Option<ConfigTargetPath>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // These batching options will **not** override librdkafka_options values.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub batch: BatchConfig<NoDefaultsBatchSettings>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub compression: KafkaCompression,

    #[configurable(derived)]
    #[serde(flatten)]
    pub auth: KafkaAuthConfig,

    /// Default timeout, in milliseconds, for network requests.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    pub socket_timeout_ms: Duration,

    /// Local message timeout, in milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 150000, docs::examples = 450000))]
    #[serde(default = "default_message_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Message Timeout"))]
    #[configurable(metadata(docs::advanced))]
    pub message_timeout_ms: Duration,

    /// The time window used for the `rate_limit_num` option.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
    #[serde(default = "default_rate_limit_duration_secs")]
    pub rate_limit_duration_secs: u64,

    /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.
    #[configurable(metadata(docs::type_unit = "requests"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Number"))]
    #[serde(default = "default_rate_limit_num")]
    pub rate_limit_num: u64,

    /// A map of advanced options to pass directly to the underlying `librdkafka` client.
    ///
    /// For more information on configuration options, see [Configuration properties][config_props_docs].
    ///
    /// [config_props_docs]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    #[serde(default)]
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    pub librdkafka_options: HashMap<String, String>,

    /// The log field name to use for the Kafka headers.
    ///
    /// If omitted, no headers are written.
    #[configurable(metadata(docs::advanced))]
    #[serde(alias = "headers_field")] // accidentally released as `headers_field` in 0.18
    #[configurable(metadata(docs::examples = "headers"))]
    pub headers_key: Option<ConfigTargetPath>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000) // default in librdkafka
}

const fn default_message_timeout_ms() -> Duration {
    Duration::from_millis(300000) // default in librdkafka
}

const fn default_rate_limit_duration_secs() -> u64 {
    1
}

const fn default_rate_limit_num() -> u64 {
    i64::MAX as u64 // i64 avoids TOML deserialize issue
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}
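
// For reference, a user-facing configuration for this sink might look like the TOML
// below. This is an illustrative sketch only: the sink name, input name, and broker
// addresses are placeholders, and the `librdkafka_options` entries mirror
// `example_librdkafka_options()` above rather than recommended values.
//
//     [sinks.my_kafka]
//     type = "kafka"
//     inputs = ["my_source"]
//     bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
//     topic = "topic-1234"
//     compression = "gzip"
//     encoding.codec = "json"
//     librdkafka_options."client.id" = "${ENV_VAR}"
//     librdkafka_options."fetch.error.backoff.ms" = "1000"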

impl KafkaSinkConfig {
    pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &self.bootstrap_servers)
            .set(
                "socket.timeout.ms",
                self.socket_timeout_ms.as_millis().to_string(),
            )
            .set("statistics.interval.ms", "1000");

        self.auth.apply(&mut client_config)?;

        // All batch options are producer only.
        client_config
            .set("compression.codec", to_string(self.compression))
            .set(
                "message.timeout.ms",
                self.message_timeout_ms.as_millis().to_string(),
            );

        if let Some(value) = self.batch.timeout_secs {
            // Delay in milliseconds to wait for messages in the producer queue to accumulate before
            // constructing message batches (MessageSets) to transmit to brokers. A higher value
            // allows larger and more effective (less overhead, improved compression) batches of
            // messages to accumulate at the expense of increased message delivery latency.
            // Type: float
            let key = "queue.buffering.max.ms";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`. \
                                    The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`. \
                                    Please delete one.").into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "timeout_secs",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, (value * 1000.0).round().to_string());
        }
        if let Some(value) = self.batch.max_events {
            // Maximum number of messages batched in one MessageSet. The total MessageSet size is
            // also limited by batch.size and message.max.bytes.
            // Type: integer
            let key = "batch.num.messages";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`. \
                                    The config already sets this as `librdkafka_options.batch.num.messages={val}`. \
                                    Please delete one.").into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_events",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }
        if let Some(value) = self.batch.max_bytes {
            // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
            // framing overhead. This limit is applied after the first message has been added to the
            // batch, regardless of the first message's size; this ensures that messages exceeding
            // batch.size are still produced. The total MessageSet size is also limited by
            // batch.num.messages and message.max.bytes.
            // Type: integer
            let key = "batch.size";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`. \
                                    The config already sets this as `librdkafka_options.batch.size={val}`. \
                                    Please delete one.").into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_bytes",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }

        for (key, value) in self.librdkafka_options.iter() {
            debug!(option = %key, value = %value, "Setting librdkafka option.");
            client_config.set(key.as_str(), value.as_str());
        }

        Ok(client_config)
    }
}
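
// Sketch of how the `ClientConfig` produced above is typically consumed. This is
// illustrative only: the actual producer construction presumably lives in the sink
// implementation (`sink.rs`, via `KafkaSink::new`), which may attach additional
// context such as a statistics callback when creating the producer.
//
//     let producer: rdkafka::producer::FutureProducer = config.to_rdkafka()?.create()?;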

impl GenerateConfig for KafkaSinkConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(),
            topic: Template::try_from("topic-1234".to_owned()).unwrap(),
            healthcheck_topic: None,
            key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()),
            encoding: JsonSerializerConfig::default().into(),
            batch: Default::default(),
            compression: KafkaCompression::None,
            auth: Default::default(),
            socket_timeout_ms: default_socket_timeout_ms(),
            message_timeout_ms: default_message_timeout_ms(),
            rate_limit_duration_secs: default_rate_limit_duration_secs(),
            rate_limit_num: default_rate_limit_num(),
            librdkafka_options: Default::default(),
            headers_key: None,
            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SinkConfig for KafkaSinkConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let sink = KafkaSink::new(self.clone())?;
        let hc = healthcheck(self.clone(), cx.healthcheck.clone()).boxed();
        Ok((VectorSink::from_event_streamsink(sink), hc))
    }

    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
            .with_schema_requirement(requirements)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        KafkaSinkConfig::generate_config();
    }
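
    // Hedged sketch of a regression test for the batch-option/`librdkafka_options`
    // conflict handling in `to_rdkafka`. It assumes that the TOML emitted by
    // `generate_config` round-trips back into `KafkaSinkConfig`, and that `BatchConfig`
    // exposes a public `timeout_secs: Option<f64>` field (as the reads in `to_rdkafka`
    // suggest); adjust if either assumption does not hold.
    #[test]
    fn to_rdkafka_rejects_conflicting_batch_options() {
        let mut config: KafkaSinkConfig = KafkaSinkConfig::generate_config()
            .try_into()
            .expect("default config should deserialize");

        // The default config maps cleanly onto librdkafka options.
        assert!(config.to_rdkafka().is_ok());

        // Setting the same property through both `batch.timeout_secs` and
        // `librdkafka_options` is ambiguous and should be rejected.
        config.batch.timeout_secs = Some(1.0);
        config
            .librdkafka_options
            .insert("queue.buffering.max.ms".to_string(), "500".to_string());
        assert!(config.to_rdkafka().is_err());
    }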
}