vector/sinks/kafka/
tests.rs

1#![allow(clippy::print_stdout)] // tests
2
3#[cfg(feature = "kafka-integration-tests")]
4#[cfg(test)]
5mod integration_test {
6    use std::{collections::HashMap, future::ready, thread, time::Duration};
7
8    use bytes::Bytes;
9    use futures::{StreamExt, stream};
10    use rdkafka::{
11        Message, Offset, TopicPartitionList,
12        consumer::{BaseConsumer, Consumer},
13        message::Headers,
14    };
15    use vector_lib::{
16        codecs::{JsonSerializerConfig, TextSerializerConfig},
17        config::{Tags, Telemetry, init_telemetry},
18        event::{BatchNotifier, BatchStatus},
19        lookup::lookup_v2::ConfigTargetPath,
20    };
21
22    use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
23    use crate::{
24        event::{ObjectMap, TraceEvent, Value},
25        kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
26        sinks::prelude::*,
27        test_util::{
28            components::{
29                DATA_VOLUME_SINK_TAGS, SINK_TAGS, assert_data_volume_sink_compliance,
30                assert_sink_compliance,
31            },
32            map_event_batch_stream, random_lines_with_stream, random_string, wait_for,
33        },
34        tls::{TEST_PEM_INTERMEDIATE_CA_PATH, TlsConfig, TlsEnableableConfig},
35    };
36
37    fn kafka_host() -> String {
38        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
39    }
40
41    fn kafka_address(port: u16) -> String {
42        format!("{}:{}", kafka_host(), port)
43    }
44
45    #[tokio::test]
46    async fn healthcheck() {
47        crate::test_util::trace_init();
48
49        let topic = format!("test-{}", random_string(10));
50
51        let config = KafkaSinkConfig {
52            bootstrap_servers: kafka_address(9091),
53            topic: Template::try_from(topic.clone()).unwrap(),
54            healthcheck_topic: None,
55            key_field: None,
56            encoding: TextSerializerConfig::default().into(),
57            batch: BatchConfig::default(),
58            compression: KafkaCompression::None,
59            auth: KafkaAuthConfig::default(),
60            socket_timeout_ms: Duration::from_millis(60000),
61            message_timeout_ms: Duration::from_millis(300000),
62            rate_limit_duration_secs: 1,
63            rate_limit_num: i64::MAX as u64,
64            librdkafka_options: HashMap::new(),
65            headers_key: None,
66            acknowledgements: Default::default(),
67        };
68        self::sink::healthcheck(config, Default::default())
69            .await
70            .unwrap();
71    }
72
73    #[tokio::test]
74    async fn healthcheck_topic() {
75        crate::test_util::trace_init();
76
77        let topic = format!("{{ {} }}", random_string(10));
78
79        let config = KafkaSinkConfig {
80            bootstrap_servers: kafka_address(9091),
81            topic: Template::try_from(topic.clone()).unwrap(),
82            healthcheck_topic: Some(String::from("topic-1234")),
83            key_field: None,
84            encoding: TextSerializerConfig::default().into(),
85            batch: BatchConfig::default(),
86            compression: KafkaCompression::None,
87            auth: KafkaAuthConfig::default(),
88            socket_timeout_ms: Duration::from_millis(60000),
89            message_timeout_ms: Duration::from_millis(300000),
90            rate_limit_duration_secs: 1,
91            rate_limit_num: i64::MAX as u64,
92            librdkafka_options: HashMap::new(),
93            headers_key: None,
94            acknowledgements: Default::default(),
95        };
96        self::sink::healthcheck(config, Default::default())
97            .await
98            .unwrap();
99    }
100
101    #[tokio::test]
102    async fn kafka_happy_path_plaintext() {
103        crate::test_util::trace_init();
104        kafka_happy_path(
105            kafka_address(9091),
106            None,
107            None,
108            KafkaCompression::None,
109            true,
110        )
111        .await;
112        kafka_happy_path(
113            kafka_address(9091),
114            None,
115            None,
116            KafkaCompression::None,
117            false,
118        )
119        .await;
120    }
121
122    #[tokio::test]
123    async fn kafka_happy_path_gzip() {
124        crate::test_util::trace_init();
125        kafka_happy_path(
126            kafka_address(9091),
127            None,
128            None,
129            KafkaCompression::Gzip,
130            false,
131        )
132        .await;
133    }
134
135    #[tokio::test]
136    async fn kafka_happy_path_lz4() {
137        crate::test_util::trace_init();
138        kafka_happy_path(
139            kafka_address(9091),
140            None,
141            None,
142            KafkaCompression::Lz4,
143            false,
144        )
145        .await;
146    }
147
148    #[tokio::test]
149    async fn kafka_happy_path_snappy() {
150        crate::test_util::trace_init();
151        kafka_happy_path(
152            kafka_address(9091),
153            None,
154            None,
155            KafkaCompression::Snappy,
156            false,
157        )
158        .await;
159    }
160
161    #[tokio::test]
162    async fn kafka_happy_path_zstd() {
163        crate::test_util::trace_init();
164        kafka_happy_path(
165            kafka_address(9091),
166            None,
167            None,
168            KafkaCompression::Zstd,
169            false,
170        )
171        .await;
172    }
173
174    async fn kafka_batch_options_overrides(
175        batch: BatchConfig<NoDefaultsBatchSettings>,
176        librdkafka_options: HashMap<String, String>,
177    ) -> crate::Result<KafkaSink> {
178        let topic = format!("test-{}", random_string(10));
179        let config = KafkaSinkConfig {
180            bootstrap_servers: kafka_address(9091),
181            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
182            compression: KafkaCompression::None,
183            healthcheck_topic: None,
184            encoding: TextSerializerConfig::default().into(),
185            key_field: None,
186            auth: KafkaAuthConfig {
187                sasl: None,
188                tls: None,
189            },
190            socket_timeout_ms: Duration::from_millis(60000),
191            message_timeout_ms: Duration::from_millis(300000),
192            rate_limit_duration_secs: 1,
193            rate_limit_num: i64::MAX as u64,
194            batch,
195            librdkafka_options,
196            headers_key: None,
197            acknowledgements: Default::default(),
198        };
199        config.clone().to_rdkafka()?;
200        self::sink::healthcheck(config.clone(), Default::default()).await?;
201        KafkaSink::new(config)
202    }
203
204    #[tokio::test]
205    async fn kafka_batch_options_max_bytes_errors_on_double_set() {
206        crate::test_util::trace_init();
207        let mut batch = BatchConfig::default();
208        batch.max_bytes = Some(1000);
209
210        assert!(
211            kafka_batch_options_overrides(
212                batch,
213                indexmap::indexmap! {
214                    "batch.size".to_string() => 1.to_string(),
215                }
216                .into_iter()
217                .collect()
218            )
219            .await
220            .is_err()
221        )
222    }
223
224    #[tokio::test]
225    async fn kafka_batch_options_actually_sets() {
226        crate::test_util::trace_init();
227        let mut batch = BatchConfig::default();
228        batch.max_events = Some(10);
229        batch.timeout_secs = Some(2.0);
230
231        kafka_batch_options_overrides(batch, indexmap::indexmap! {}.into_iter().collect())
232            .await
233            .unwrap();
234    }
235
236    #[tokio::test]
237    async fn kafka_batch_options_max_events_errors_on_double_set() {
238        crate::test_util::trace_init();
239        let mut batch = BatchConfig::default();
240        batch.max_events = Some(10);
241
242        assert!(
243            kafka_batch_options_overrides(
244                batch,
245                indexmap::indexmap! {
246                    "batch.num.messages".to_string() => 1.to_string(),
247                }
248                .into_iter()
249                .collect()
250            )
251            .await
252            .is_err()
253        )
254    }
255
256    #[tokio::test]
257    async fn kafka_batch_options_timeout_secs_errors_on_double_set() {
258        crate::test_util::trace_init();
259        let mut batch = BatchConfig::default();
260        batch.timeout_secs = Some(10.0);
261
262        assert!(
263            kafka_batch_options_overrides(
264                batch,
265                indexmap::indexmap! {
266                    "queue.buffering.max.ms".to_string() => 1.to_string(),
267                }
268                .into_iter()
269                .collect()
270            )
271            .await
272            .is_err()
273        )
274    }
275
276    #[tokio::test]
277    async fn kafka_happy_path_tls() {
278        crate::test_util::trace_init();
279        let mut options = TlsConfig::test_config();
280        // couldn't get Kafka to load and return a certificate chain, it only returns the leaf
281        // certificate
282        options.ca_file = Some(TEST_PEM_INTERMEDIATE_CA_PATH.into());
283        kafka_happy_path(
284            kafka_address(9092),
285            None,
286            Some(TlsEnableableConfig {
287                enabled: Some(true),
288                options: TlsConfig::test_config(),
289            }),
290            KafkaCompression::None,
291            false,
292        )
293        .await;
294    }
295
296    #[tokio::test]
297    async fn kafka_happy_path_sasl() {
298        crate::test_util::trace_init();
299        kafka_happy_path(
300            kafka_address(9093),
301            Some(KafkaSaslConfig {
302                enabled: Some(true),
303                username: Some("admin".to_string()),
304                password: Some("admin".to_string().into()),
305                mechanism: Some("PLAIN".to_owned()),
306            }),
307            None,
308            KafkaCompression::None,
309            false,
310        )
311        .await;
312    }
313
314    #[tokio::test]
315    async fn kafka_happy_path_trace_events() {
316        crate::test_util::trace_init();
317
318        assert_sink_compliance(&SINK_TAGS, async move {
319            let topic_prefix = format!("test-trace-{}", random_string(10));
320            let topic = format!("{}-{}", topic_prefix, chrono::Utc::now().format("%Y%m%d"));
321            let key_field = ConfigTargetPath::try_from("trace_key".to_string()).unwrap();
322            let headers_key = ConfigTargetPath::try_from("trace_headers".to_string()).unwrap();
323            let trace_key = "trace-partition-key";
324            let header_key = "trace-header-key";
325            let header_value = "trace-header-value";
326
327            let config = KafkaSinkConfig {
328                bootstrap_servers: kafka_address(9091),
329                topic: Template::try_from(format!("{topic_prefix}-%Y%m%d")).unwrap(),
330                healthcheck_topic: None,
331                key_field: Some(key_field.clone()),
332                encoding: JsonSerializerConfig::default().into(),
333                batch: BatchConfig::default(),
334                compression: KafkaCompression::None,
335                auth: KafkaAuthConfig::default(),
336                socket_timeout_ms: Duration::from_millis(60000),
337                message_timeout_ms: Duration::from_millis(300000),
338                rate_limit_duration_secs: 1,
339                rate_limit_num: i64::MAX as u64,
340                librdkafka_options: HashMap::new(),
341                headers_key: Some(headers_key.clone()),
342                acknowledgements: Default::default(),
343            };
344
345            let num_events = 100;
346            let mut expected_messages = Vec::with_capacity(num_events);
347
348            let (batch, receiver) = BatchNotifier::new_with_receiver();
349            let mut events = Vec::with_capacity(num_events);
350            for i in 0..num_events {
351                let message = format!("trace-message-{i}");
352                expected_messages.push(message.clone());
353
354                let mut trace = TraceEvent::default();
355                trace.insert("message", message);
356                trace.insert("trace_key", trace_key);
357                trace.insert("timestamp", chrono::Utc::now());
358
359                let mut trace_headers = ObjectMap::new();
360                trace_headers.insert(header_key.into(), Value::Bytes(Bytes::from(header_value)));
361                trace.insert(&headers_key, trace_headers);
362
363                events.push(Event::Trace(trace.with_batch_notifier(&batch)));
364            }
365
366            let sink = KafkaSink::new(config).unwrap();
367            let sink = VectorSink::from_event_streamsink(sink);
368            let stream = map_event_batch_stream(stream::iter(events), Some(batch));
369            sink.run(stream).await.unwrap();
370            assert_eq!(receiver.await, BatchStatus::Delivered);
371
372            // Read back everything from the beginning.
373            let mut client_config = rdkafka::ClientConfig::new();
374            client_config.set("bootstrap.servers", kafka_address(9091).as_str());
375            client_config.set("group.id", random_string(10));
376            client_config.set("enable.partition.eof", "true");
377            let mut tpl = TopicPartitionList::new();
378            tpl.add_partition(&topic, 0)
379                .set_offset(Offset::Beginning)
380                .unwrap();
381            let consumer: BaseConsumer = client_config.create().unwrap();
382            consumer.assign(&tpl).unwrap();
383
384            wait_for(
385                || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
386                    Ok((_low, high)) => ready(high >= num_events as i64),
387                    Err(err) => {
388                        println!("retrying due to error fetching watermarks: {err}");
389                        ready(false)
390                    }
391                },
392            )
393            .await;
394
395            let (low, high) = consumer
396                .fetch_watermarks(&topic, 0, Duration::from_secs(3))
397                .unwrap();
398            assert_eq!((0, num_events as i64), (low, high));
399
400            let mut failures = 0;
401            let mut observed_messages = Vec::new();
402            while failures < 100 {
403                match consumer.poll(Duration::from_secs(3)) {
404                    Some(Ok(msg)) => {
405                        let payload: &str = msg.payload_view().unwrap().unwrap();
406                        let payload_json: serde_json::Value =
407                            serde_json::from_str(payload).unwrap();
408                        observed_messages
409                            .push(payload_json["message"].as_str().unwrap().to_owned());
410
411                        let key = msg.key().unwrap();
412                        assert_eq!(key, trace_key.as_bytes());
413
414                        let timestamp = msg.timestamp().to_millis();
415                        assert!(timestamp.is_some());
416
417                        let header = msg.headers().unwrap().get(0);
418                        assert_eq!(header.key, header_key);
419                        assert_eq!(header.value.unwrap(), header_value.as_bytes());
420                    }
421                    None if observed_messages.len() >= num_events => break,
422                    _ => {
423                        failures += 1;
424                        thread::sleep(Duration::from_millis(50));
425                    }
426                }
427            }
428
429            assert_eq!(observed_messages.len(), num_events);
430            assert_eq!(observed_messages, expected_messages);
431        })
432        .await;
433    }
434
435    async fn kafka_happy_path(
436        server: String,
437        sasl: Option<KafkaSaslConfig>,
438        tls: Option<TlsEnableableConfig>,
439        compression: KafkaCompression,
440        test_telemetry_tags: bool,
441    ) {
442        if test_telemetry_tags {
443            // We need to configure Vector to emit the service and source tags.
444            // The default is to not emit these.
445            init_telemetry(
446                Telemetry {
447                    tags: Tags {
448                        emit_service: true,
449                        emit_source: true,
450                    },
451                },
452                true,
453            );
454        }
455
456        let topic = format!("test-{}", random_string(10));
457        let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
458        let kafka_auth = KafkaAuthConfig { sasl, tls };
459        let config = KafkaSinkConfig {
460            bootstrap_servers: server.clone(),
461            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
462            healthcheck_topic: None,
463            key_field: None,
464            encoding: TextSerializerConfig::default().into(),
465            batch: BatchConfig::default(),
466            compression,
467            auth: kafka_auth.clone(),
468            socket_timeout_ms: Duration::from_millis(60000),
469            message_timeout_ms: Duration::from_millis(300000),
470            rate_limit_duration_secs: 1,
471            rate_limit_num: i64::MAX as u64,
472            librdkafka_options: HashMap::new(),
473            headers_key: Some(headers_key.clone()),
474            acknowledgements: Default::default(),
475        };
476        let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d"));
477        println!("Topic name generated in test: {topic:?}");
478
479        let num_events = 1000;
480        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
481        let (input, events) = random_lines_with_stream(100, num_events, Some(batch));
482
483        let header_1_key = "header-1-key";
484        let header_1_value = "header-1-value";
485        let input_events = events.map(move |mut events| {
486            let headers_key = headers_key.clone();
487            let mut header_values = ObjectMap::new();
488            header_values.insert(
489                header_1_key.into(),
490                Value::Bytes(Bytes::from(header_1_value)),
491            );
492            events.iter_logs_mut().for_each(move |log| {
493                log.insert(&headers_key, header_values.clone());
494            });
495            events
496        });
497
498        if test_telemetry_tags {
499            assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move {
500                let sink = KafkaSink::new(config).unwrap();
501                let sink = VectorSink::from_event_streamsink(sink);
502                sink.run(input_events).await
503            })
504            .await
505            .expect("Running sink failed");
506        } else {
507            assert_sink_compliance(&SINK_TAGS, async move {
508                let sink = KafkaSink::new(config).unwrap();
509                let sink = VectorSink::from_event_streamsink(sink);
510                sink.run(input_events).await
511            })
512            .await
513            .expect("Running sink failed");
514        }
515        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
516
517        // read back everything from the beginning
518        let mut client_config = rdkafka::ClientConfig::new();
519        client_config.set("bootstrap.servers", server.as_str());
520        client_config.set("group.id", random_string(10));
521        client_config.set("enable.partition.eof", "true");
522        kafka_auth.apply(&mut client_config).unwrap();
523
524        let mut tpl = TopicPartitionList::new();
525        tpl.add_partition(&topic, 0)
526            .set_offset(Offset::Beginning)
527            .unwrap();
528
529        let consumer: BaseConsumer = client_config.create().unwrap();
530        consumer.assign(&tpl).unwrap();
531
532        // wait for messages to show up
533        wait_for(
534            || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
535                Ok((_low, high)) => ready(high > 0),
536                Err(err) => {
537                    println!("retrying due to error fetching watermarks: {err}");
538                    ready(false)
539                }
540            },
541        )
542        .await;
543
544        // check we have the expected number of messages in the topic
545        let (low, high) = consumer
546            .fetch_watermarks(&topic, 0, Duration::from_secs(3))
547            .unwrap();
548        assert_eq!((0, num_events as i64), (low, high));
549
550        // loop instead of iter so we can set a timeout
551        let mut failures = 0;
552        let mut out = Vec::new();
553        while failures < 100 {
554            match consumer.poll(Duration::from_secs(3)) {
555                Some(Ok(msg)) => {
556                    let s: &str = msg.payload_view().unwrap().unwrap();
557                    out.push(s.to_owned());
558                    let header = msg.headers().unwrap().get(0);
559                    assert_eq!(header.key, header_1_key);
560                    assert_eq!(header.value.unwrap(), header_1_value.as_bytes());
561                }
562                None if out.len() >= input.len() => break,
563                _ => {
564                    failures += 1;
565                    thread::sleep(Duration::from_millis(50));
566                }
567            }
568        }
569
570        assert_eq!(out.len(), input.len());
571        assert_eq!(out, input);
572    }
573}