vector/sinks/kafka/config.rs

use std::{collections::HashMap, time::Duration};

use futures::FutureExt;
use rdkafka::ClientConfig;
use serde_with::serde_as;
use vector_lib::{
    codecs::JsonSerializerConfig, configurable::configurable_component,
    lookup::lookup_v2::ConfigTargetPath,
};
use vrl::value::Kind;

use crate::{
    kafka::{KafkaAuthConfig, KafkaCompression},
    serde::json::to_string,
    sinks::{
        kafka::sink::{KafkaSink, healthcheck},
        prelude::*,
    },
};

/// Configuration for the `kafka` sink.
#[serde_as]
#[configurable_component(sink(
    "kafka",
    "Publish observability event data to Apache Kafka topics."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct KafkaSinkConfig {
    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its
    /// connection to the cluster, allowing discovery of all the other hosts in the cluster.
    ///
    /// Must be in the form of `host:port`, and comma-separated.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    pub bootstrap_servers: String,

    /// The Kafka topic name to write events to.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "topic-1234",
        docs::examples = "logs-{{unit}}-%Y-%m-%d"
    ))]
    pub topic: Template,

    /// The topic name to use for the healthcheck. If omitted, `topic` is used.
    /// This option helps prevent healthcheck warnings when `topic` is templated.
    ///
    /// It is ignored when the healthcheck is disabled.
    pub healthcheck_topic: Option<String>,

    /// The log field name or tag key to use for the topic key.
    ///
    /// If the field does not exist in the log or in the tags, a blank value is used. If
    /// unspecified, the key is not sent.
    ///
    /// Kafka uses a hash of the key to choose the partition or uses round-robin if the record has
    /// no key.
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::examples = "user_id"))]
    #[configurable(metadata(docs::examples = ".my_topic"))]
    #[configurable(metadata(docs::examples = "%my_topic"))]
    pub key_field: Option<ConfigTargetPath>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    // These batching options will **not** override librdkafka_options values.
    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub batch: BatchConfig<NoDefaultsBatchSettings>,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default)]
    pub compression: KafkaCompression,

    #[configurable(derived)]
    #[serde(flatten)]
    pub auth: KafkaAuthConfig,

    /// Default timeout, in milliseconds, for network requests.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    pub socket_timeout_ms: Duration,

    /// Local message timeout, in milliseconds.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 150000, docs::examples = 450000))]
    #[serde(default = "default_message_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Message Timeout"))]
    #[configurable(metadata(docs::advanced))]
    pub message_timeout_ms: Duration,

    /// The time window used for the `rate_limit_num` option.
    #[configurable(metadata(docs::type_unit = "seconds"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Duration"))]
    #[serde(default = "default_rate_limit_duration_secs")]
    pub rate_limit_duration_secs: u64,

    /// The maximum number of requests allowed within the `rate_limit_duration_secs` time window.
    #[configurable(metadata(docs::type_unit = "requests"))]
    #[configurable(metadata(docs::human_name = "Rate Limit Number"))]
    #[serde(default = "default_rate_limit_num")]
    pub rate_limit_num: u64,

    /// A map of advanced options to pass directly to the underlying `librdkafka` client.
    ///
    /// For more information on configuration options, see [Configuration properties][config_props_docs].
    ///
    /// [config_props_docs]: https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    #[serde(default)]
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    pub librdkafka_options: HashMap<String, String>,

    /// The log field name to use for the Kafka headers.
    ///
    /// If omitted, no headers are written.
    #[configurable(metadata(docs::advanced))]
    #[serde(alias = "headers_field")] // accidentally released as `headers_field` in 0.18
    #[configurable(metadata(docs::examples = "headers"))]
    pub headers_key: Option<ConfigTargetPath>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000) // default in librdkafka
}

const fn default_message_timeout_ms() -> Duration {
    Duration::from_millis(300000) // default in librdkafka
}

const fn default_rate_limit_duration_secs() -> u64 {
    1
}

const fn default_rate_limit_num() -> u64 {
    i64::MAX as u64 // i64 avoids TOML deserialize issue
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}

impl KafkaSinkConfig {
    pub(crate) fn to_rdkafka(&self) -> crate::Result<ClientConfig> {
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", &self.bootstrap_servers)
            .set(
                "socket.timeout.ms",
                self.socket_timeout_ms.as_millis().to_string(),
            )
            .set("statistics.interval.ms", "1000");

        self.auth.apply(&mut client_config)?;

        // All batch options are producer only.
        client_config
            .set("compression.codec", to_string(self.compression))
            .set(
                "message.timeout.ms",
                self.message_timeout_ms.as_millis().to_string(),
            );

        if let Some(value) = self.batch.timeout_secs {
            // Delay in milliseconds to wait for messages in the producer queue to accumulate before
            // constructing message batches (MessageSets) to transmit to brokers. A higher value
            // allows larger and more effective (less overhead, improved compression) batches of
            // messages to accumulate at the expense of increased message delivery latency.
            // Type: float
            let key = "queue.buffering.max.ms";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!("Batching setting `batch.timeout_secs` sets `librdkafka_options.{key}={value}`. \
                                    The config already sets this as `librdkafka_options.queue.buffering.max.ms={val}`. \
                                    Please delete one.").into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "timeout_secs",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, (value * 1000.0).round().to_string());
        }
        if let Some(value) = self.batch.max_events {
            // Maximum number of messages batched in one MessageSet. The total MessageSet size is
            // also limited by batch.size and message.max.bytes.
            // Type: integer
            let key = "batch.num.messages";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!("Batching setting `batch.max_events` sets `librdkafka_options.{key}={value}`. \
                                    The config already sets this as `librdkafka_options.batch.num.messages={val}`. \
                                    Please delete one.").into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_events",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }
        if let Some(value) = self.batch.max_bytes {
            // Maximum size (in bytes) of all messages batched in one MessageSet, including protocol
            // framing overhead. This limit is applied after the first message has been added to the
            // batch, regardless of the first message's size; this is to ensure that messages that
            // exceed batch.size are produced. The total MessageSet size is also limited by
            // batch.num.messages and message.max.bytes.
            // Type: integer
            let key = "batch.size";
            if let Some(val) = self.librdkafka_options.get(key) {
                return Err(format!("Batching setting `batch.max_bytes` sets `librdkafka_options.{key}={value}`. \
                                    The config already sets this as `librdkafka_options.batch.size={val}`. \
                                    Please delete one.").into());
            }
            debug!(
                librdkafka_option = key,
                batch_option = "max_bytes",
                value,
                "Applying batch option as librdkafka option."
            );
            client_config.set(key, value.to_string());
        }

        for (key, value) in self.librdkafka_options.iter() {
            debug!(option = %key, value = %value, "Setting librdkafka option.");
            client_config.set(key.as_str(), value.as_str());
        }

        Ok(client_config)
    }
}

impl GenerateConfig for KafkaSinkConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".to_owned(),
            topic: Template::try_from("topic-1234".to_owned()).unwrap(),
            healthcheck_topic: None,
            key_field: Some(ConfigTargetPath::try_from("user_id".to_owned()).unwrap()),
            encoding: JsonSerializerConfig::default().into(),
            batch: Default::default(),
            compression: KafkaCompression::None,
            auth: Default::default(),
            socket_timeout_ms: default_socket_timeout_ms(),
            message_timeout_ms: default_message_timeout_ms(),
            rate_limit_duration_secs: default_rate_limit_duration_secs(),
            rate_limit_num: default_rate_limit_num(),
            librdkafka_options: Default::default(),
            headers_key: None,
            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SinkConfig for KafkaSinkConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let sink = KafkaSink::new(self.clone())?;
        let hc = healthcheck(self.clone(), cx.healthcheck.clone()).boxed();
        Ok((VectorSink::from_event_streamsink(sink), hc))
    }

    fn input(&self) -> Input {
        let requirements = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());

        Input::new(self.encoding.config().input_type() & (DataType::Log | DataType::Metric))
            .with_schema_requirement(requirements)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config() {
        KafkaSinkConfig::generate_config();
    }
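
    // Sketch: a minimal TOML config (placeholder broker and topic values) should
    // deserialize, and `to_rdkafka` should carry `bootstrap_servers` through to the
    // underlying rdkafka `ClientConfig`.
    #[test]
    fn to_rdkafka_applies_bootstrap_servers() {
        let config: KafkaSinkConfig = toml::from_str(
            r#"
            bootstrap_servers = "127.0.0.1:9092"
            topic = "logs"
            encoding.codec = "json"
            "#,
        )
        .unwrap();

        let client = config.to_rdkafka().unwrap();
        assert_eq!(client.get("bootstrap.servers"), Some("127.0.0.1:9092"));
    }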
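
    // Sketch of the conflict check in `to_rdkafka`: setting `batch.timeout_secs`
    // together with the equivalent librdkafka option `queue.buffering.max.ms`
    // should be rejected with an error. Broker and topic values are placeholders.
    #[test]
    fn batch_option_conflicts_with_librdkafka_option() {
        let config: KafkaSinkConfig = toml::from_str(
            r#"
            bootstrap_servers = "127.0.0.1:9092"
            topic = "logs"
            encoding.codec = "json"
            batch.timeout_secs = 1.0

            [librdkafka_options]
            "queue.buffering.max.ms" = "2000"
            "#,
        )
        .unwrap();

        assert!(config.to_rdkafka().is_err());
    }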
}