vector/sinks/kafka/tests.rs

#![allow(clippy::print_stdout)] // tests

#[cfg(feature = "kafka-integration-tests")]
#[cfg(test)]
mod integration_test {
    use std::{collections::HashMap, future::ready, thread, time::Duration};

    use bytes::Bytes;
    use futures::StreamExt;
    use rdkafka::{
        Message, Offset, TopicPartitionList,
        consumer::{BaseConsumer, Consumer},
        message::Headers,
    };
    use vector_lib::{
        codecs::TextSerializerConfig,
        config::{Tags, Telemetry, init_telemetry},
        event::{BatchNotifier, BatchStatus},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
    use crate::{
        event::{ObjectMap, Value},
        kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
        sinks::prelude::*,
        test_util::{
            components::{
                DATA_VOLUME_SINK_TAGS, SINK_TAGS, assert_data_volume_sink_compliance,
                assert_sink_compliance,
            },
            random_lines_with_stream, random_string, wait_for,
        },
        tls::{TEST_PEM_INTERMEDIATE_CA_PATH, TlsConfig, TlsEnableableConfig},
    };

    fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }

    fn kafka_address(port: u16) -> String {
        format!("{}:{}", kafka_host(), port)
    }

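    // The tests below assume brokers reachable through the helpers above: plaintext on
    // 9091, TLS on 9092, and SASL/PLAIN on 9093. To point them at another host, override
    // `KAFKA_HOST`, e.g. (a sketch; ports are whatever your broker setup exposes):
    //
    //   KAFKA_HOST=kafka.internal cargo test --features kafka-integration-tests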
    #[tokio::test]
    async fn healthcheck() {
        crate::test_util::trace_init();

        let topic = format!("test-{}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

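    // Unlike `healthcheck` above, this exercises `healthcheck_topic`: the `topic` here is
    // a dynamic template that cannot be rendered without an event, so the healthcheck has
    // to fall back to the statically configured topic instead.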
    #[tokio::test]
    async fn healthcheck_topic() {
        crate::test_util::trace_init();

        let topic = format!("{{{{ {} }}}}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: Some(String::from("topic-1234")),
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

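    // The plaintext variant runs twice: once asserting the data-volume telemetry tags
    // (the `true` flag) and once with the default sink compliance assertions.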
    #[tokio::test]
    async fn kafka_happy_path_plaintext() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            true,
        )
        .await;
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_gzip() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Gzip,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_lz4() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Lz4,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_snappy() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Snappy,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_zstd() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Zstd,
            false,
        )
        .await;
    }

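    // Shared setup for the batch-option tests below: builds a config with the given batch
    // settings and raw `librdkafka_options`, then walks the same validation path as a real
    // sink (rdkafka config conversion, healthcheck, sink construction), returning the
    // first error encountered.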
    async fn kafka_batch_options_overrides(
        batch: BatchConfig<NoDefaultsBatchSettings>,
        librdkafka_options: HashMap<String, String>,
    ) -> crate::Result<KafkaSink> {
        let topic = format!("test-{}", random_string(10));
        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            compression: KafkaCompression::None,
            healthcheck_topic: None,
            encoding: TextSerializerConfig::default().into(),
            key_field: None,
            auth: KafkaAuthConfig {
                sasl: None,
                tls: None,
            },
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            batch,
            librdkafka_options,
            headers_key: None,
            acknowledgements: Default::default(),
        };
        // Converting to an rdkafka client config is where conflicting settings
        // (native batch options vs. raw `librdkafka_options`) are rejected.
        config.clone().to_rdkafka()?;
        self::sink::healthcheck(config.clone(), Default::default()).await?;
        KafkaSink::new(config)
    }

    #[tokio::test]
    async fn kafka_batch_options_max_bytes_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_bytes = Some(1000);

        assert!(
            kafka_batch_options_overrides(
                batch,
                indexmap::indexmap! {
                    "batch.size".to_string() => 1.to_string(),
                }
                .into_iter()
                .collect()
            )
            .await
            .is_err()
        )
    }

    #[tokio::test]
    async fn kafka_batch_options_actually_sets() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);
        batch.timeout_secs = Some(2.0);

        kafka_batch_options_overrides(batch, indexmap::indexmap! {}.into_iter().collect())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn kafka_batch_options_max_events_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);

        assert!(
            kafka_batch_options_overrides(
                batch,
                indexmap::indexmap! {
                    "batch.num.messages".to_string() => 1.to_string(),
                }
                .into_iter()
                .collect()
            )
            .await
            .is_err()
        )
    }

    #[tokio::test]
    async fn kafka_batch_options_timeout_secs_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.timeout_secs = Some(10.0);

        assert!(
            kafka_batch_options_overrides(
                batch,
                indexmap::indexmap! {
                    "queue.buffering.max.ms".to_string() => 1.to_string(),
                }
                .into_iter()
                .collect()
            )
            .await
            .is_err()
        )
    }

    #[tokio::test]
    async fn kafka_happy_path_tls() {
        crate::test_util::trace_init();
        let mut options = TlsConfig::test_config();
        // couldn't get Kafka to load and return a certificate chain, it only returns the leaf
        // certificate
        options.ca_file = Some(TEST_PEM_INTERMEDIATE_CA_PATH.into());
        kafka_happy_path(
            kafka_address(9092),
            None,
            Some(TlsEnableableConfig {
                enabled: Some(true),
                options,
            }),
            KafkaCompression::None,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_sasl() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9093),
            Some(KafkaSaslConfig {
                enabled: Some(true),
                username: Some("admin".to_string()),
                password: Some("admin".to_string().into()),
                mechanism: Some("PLAIN".to_owned()),
            }),
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

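    // End-to-end round trip shared by the happy-path tests: run the sink against a live
    // broker, then consume the topic from the beginning and check that every payload and
    // header arrived intact.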
    async fn kafka_happy_path(
        server: String,
        sasl: Option<KafkaSaslConfig>,
        tls: Option<TlsEnableableConfig>,
        compression: KafkaCompression,
        test_telemetry_tags: bool,
    ) {
        if test_telemetry_tags {
            // We need to configure Vector to emit the service and source tags.
            // The default is to not emit these.
            init_telemetry(
                Telemetry {
                    tags: Tags {
                        emit_service: true,
                        emit_source: true,
                    },
                },
                true,
            );
        }

        let topic = format!("test-{}", random_string(10));
        let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
        let kafka_auth = KafkaAuthConfig { sasl, tls };
        let config = KafkaSinkConfig {
            bootstrap_servers: server.clone(),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression,
            auth: kafka_auth.clone(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: Some(headers_key.clone()),
            acknowledgements: Default::default(),
        };
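        // The topic template ends in strftime specifiers, so the sink renders a dated
        // topic name; rebuild the same name here to know which topic to read back.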
        let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d"));
        println!("Topic name generated in test: {topic:?}");

        let num_events = 1000;
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_lines_with_stream(100, num_events, Some(batch));

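        // Attach a fixed header to every log event under `headers_key`; the sink is
        // expected to copy it onto each produced Kafka message.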
        let header_1_key = "header-1-key";
        let header_1_value = "header-1-value";
        let input_events = events.map(move |mut events| {
            let headers_key = headers_key.clone();
            let mut header_values = ObjectMap::new();
            header_values.insert(
                header_1_key.into(),
                Value::Bytes(Bytes::from(header_1_value)),
            );
            events.iter_logs_mut().for_each(move |log| {
                log.insert(&headers_key, header_values.clone());
            });
            events
        });

        if test_telemetry_tags {
            assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        } else {
            assert_sink_compliance(&SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        }
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        // read back everything from the beginning
        let mut client_config = rdkafka::ClientConfig::new();
        client_config.set("bootstrap.servers", server.as_str());
        client_config.set("group.id", random_string(10));
        // Report an explicit PartitionEOF result from `poll` once the consumer reaches
        // the end of the partition, rather than blocking until the poll timeout.
        client_config.set("enable.partition.eof", "true");
        kafka_auth.apply(&mut client_config).unwrap();

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(&topic, 0)
            .set_offset(Offset::Beginning)
            .unwrap();

        let consumer: BaseConsumer = client_config.create().unwrap();
        consumer.assign(&tpl).unwrap();

        // wait for messages to show up
        wait_for(
            || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
                Ok((_low, high)) => ready(high > 0),
                Err(err) => {
                    println!("retrying due to error fetching watermarks: {err}");
                    ready(false)
                }
            },
        )
        .await;

        // check we have the expected number of messages in the topic
        let (low, high) = consumer
            .fetch_watermarks(&topic, 0, Duration::from_secs(3))
            .unwrap();
        assert_eq!((0, num_events as i64), (low, high));

        // loop instead of iter so we can set a timeout
        let mut failures = 0;
        let mut out = Vec::new();
        while failures < 100 {
            match consumer.poll(Duration::from_secs(3)) {
                Some(Ok(msg)) => {
                    let s: &str = msg.payload_view().unwrap().unwrap();
                    out.push(s.to_owned());
                    let header = msg.headers().unwrap().get(0);
                    assert_eq!(header.key, header_1_key);
                    assert_eq!(header.value.unwrap(), header_1_value.as_bytes());
                }
                // `poll` returned nothing before the timeout; stop once everything
                // has been read back.
                None if out.len() >= input.len() => break,
                _ => {
                    failures += 1;
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }

        assert_eq!(out.len(), input.len());
        assert_eq!(out, input);
    }
}