vector/sinks/kafka/tests.rs

#![allow(clippy::print_stdout)] // tests

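// Integration tests for the Kafka sink. They expect brokers reachable via the
// `KAFKA_HOST` env var (default "localhost"): plaintext on port 9091, TLS on 9092, and
// SASL/PLAIN on 9093, matching the addresses used below. The module only compiles with
// the `kafka-integration-tests` feature enabled, e.g. something like
// `cargo test --lib --features kafka-integration-tests sinks::kafka` (the exact
// invocation depends on the workspace's test tooling).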
#[cfg(feature = "kafka-integration-tests")]
#[cfg(test)]
mod integration_test {
    use std::{collections::HashMap, future::ready, thread, time::Duration};

    use bytes::Bytes;
    use futures::StreamExt;
    use rdkafka::{
        consumer::{BaseConsumer, Consumer},
        message::Headers,
        Message, Offset, TopicPartitionList,
    };
    use vector_lib::codecs::TextSerializerConfig;
    use vector_lib::lookup::lookup_v2::ConfigTargetPath;
    use vector_lib::{
        config::{init_telemetry, Tags, Telemetry},
        event::{BatchNotifier, BatchStatus},
    };

    use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
    use crate::{
        event::{ObjectMap, Value},
        kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
        sinks::prelude::*,
        test_util::{
            components::{
                assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
                SINK_TAGS,
            },
            random_lines_with_stream, random_string, wait_for,
        },
        tls::{TlsConfig, TlsEnableableConfig, TEST_PEM_INTERMEDIATE_CA_PATH},
    };

    fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }

    fn kafka_address(port: u16) -> String {
        format!("{}:{}", kafka_host(), port)
    }

    #[tokio::test]
    async fn healthcheck() {
        crate::test_util::trace_init();

        let topic = format!("test-{}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

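    // A topic template like `{ field }` cannot be rendered during the healthcheck, since
    // there is no event to render it against; `healthcheck_topic` supplies a concrete
    // topic name for the probe instead.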
    #[tokio::test]
    async fn healthcheck_topic() {
        crate::test_util::trace_init();

        let topic = format!("{{ {} }}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: Some(String::from("topic-1234")),
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn kafka_happy_path_plaintext() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            true,
        )
        .await;
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_gzip() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Gzip,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_lz4() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Lz4,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_snappy() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Snappy,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_zstd() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Zstd,
            false,
        )
        .await;
    }

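    // Builds a sink config that sets batching behavior both through Vector's `batch`
    // options and through raw `librdkafka_options`, then checks that the config
    // translates to an rdkafka client config and passes a healthcheck. Translation is
    // expected to fail when the same underlying librdkafka setting arrives via both paths.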
    async fn kafka_batch_options_overrides(
        batch: BatchConfig<NoDefaultsBatchSettings>,
        librdkafka_options: HashMap<String, String>,
    ) -> crate::Result<KafkaSink> {
        let topic = format!("test-{}", random_string(10));
        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            compression: KafkaCompression::None,
            healthcheck_topic: None,
            encoding: TextSerializerConfig::default().into(),
            key_field: None,
            auth: KafkaAuthConfig {
                sasl: None,
                tls: None,
            },
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            batch,
            librdkafka_options,
            headers_key: None,
            acknowledgements: Default::default(),
        };
        config.clone().to_rdkafka()?;
        self::sink::healthcheck(config.clone(), Default::default()).await?;
        KafkaSink::new(config)
    }

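    // The `*_errors_on_double_set` tests below each pair a batch option with the
    // librdkafka setting it maps to: `max_bytes` vs `batch.size`, `max_events` vs
    // `batch.num.messages`, and `timeout_secs` vs `queue.buffering.max.ms`.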
    #[tokio::test]
    async fn kafka_batch_options_max_bytes_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_bytes = Some(1000);

        assert!(kafka_batch_options_overrides(
            batch,
            indexmap::indexmap! {
                "batch.size".to_string() => 1.to_string(),
            }
            .into_iter()
            .collect()
        )
        .await
        .is_err())
    }

    #[tokio::test]
    async fn kafka_batch_options_actually_sets() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);
        batch.timeout_secs = Some(2.0);

        kafka_batch_options_overrides(batch, indexmap::indexmap! {}.into_iter().collect())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn kafka_batch_options_max_events_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);

        assert!(kafka_batch_options_overrides(
            batch,
            indexmap::indexmap! {
                "batch.num.messages".to_string() => 1.to_string(),
            }
            .into_iter()
            .collect()
        )
        .await
        .is_err())
    }

    #[tokio::test]
    async fn kafka_batch_options_timeout_secs_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.timeout_secs = Some(10.0);

        assert!(kafka_batch_options_overrides(
            batch,
            indexmap::indexmap! {
                "queue.buffering.max.ms".to_string() => 1.to_string(),
            }
            .into_iter()
            .collect()
        )
        .await
        .is_err())
    }

    #[tokio::test]
    async fn kafka_happy_path_tls() {
        crate::test_util::trace_init();
        let mut options = TlsConfig::test_config();
        // Kafka couldn't be made to load and return a full certificate chain; it only
        // returns the leaf certificate, so verify against the intermediate CA directly.
        options.ca_file = Some(TEST_PEM_INTERMEDIATE_CA_PATH.into());
        kafka_happy_path(
            kafka_address(9092),
            None,
            Some(TlsEnableableConfig {
                enabled: Some(true),
                options,
            }),
            KafkaCompression::None,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_sasl() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9093),
            Some(KafkaSaslConfig {
                enabled: Some(true),
                username: Some("admin".to_string()),
                password: Some("admin".to_string().into()),
                mechanism: Some("PLAIN".to_owned()),
            }),
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

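    // End-to-end happy path: produce `num_events` random lines through the sink (with a
    // per-event header attached), wait for the batch to be acknowledged, then consume the
    // topic from the beginning and verify every payload and header round-tripped.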
    async fn kafka_happy_path(
        server: String,
        sasl: Option<KafkaSaslConfig>,
        tls: Option<TlsEnableableConfig>,
        compression: KafkaCompression,
        test_telemetry_tags: bool,
    ) {
        if test_telemetry_tags {
            // We need to configure Vector to emit the service and source tags.
            // The default is to not emit these.
            init_telemetry(
                Telemetry {
                    tags: Tags {
                        emit_service: true,
                        emit_source: true,
                    },
                },
                true,
            );
        }

        let topic = format!("test-{}", random_string(10));
        let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
        let kafka_auth = KafkaAuthConfig { sasl, tls };
        let config = KafkaSinkConfig {
            bootstrap_servers: server.clone(),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression,
            auth: kafka_auth.clone(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: Some(headers_key.clone()),
            acknowledgements: Default::default(),
        };
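        // The `%Y%m%d` suffix in the topic template is rendered per event (strftime
        // against the event timestamp), so recompute the rendered name here to know
        // which topic to consume from.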
        let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d"));
        println!("Topic name generated in test: {topic:?}");

        let num_events = 1000;
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_lines_with_stream(100, num_events, Some(batch));

        let header_1_key = "header-1-key";
        let header_1_value = "header-1-value";
        let input_events = events.map(move |mut events| {
            let headers_key = headers_key.clone();
            let mut header_values = ObjectMap::new();
            header_values.insert(
                header_1_key.into(),
                Value::Bytes(Bytes::from(header_1_value)),
            );
            events.iter_logs_mut().for_each(move |log| {
                log.insert(&headers_key, header_values.clone());
            });
            events
        });

        if test_telemetry_tags {
            assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        } else {
            assert_sink_compliance(&SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        }
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
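        // `Delivered` means Kafka acknowledged every record in the batch, not merely
        // that the sink accepted the events.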

        // read back everything from the beginning
        let mut client_config = rdkafka::ClientConfig::new();
        client_config.set("bootstrap.servers", server.as_str());
        client_config.set("group.id", random_string(10));
        client_config.set("enable.partition.eof", "true");
        kafka_auth.apply(&mut client_config).unwrap();

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(&topic, 0)
            .set_offset(Offset::Beginning)
            .unwrap();

        let consumer: BaseConsumer = client_config.create().unwrap();
        consumer.assign(&tpl).unwrap();

        // wait for messages to show up
        wait_for(
            || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
                Ok((_low, high)) => ready(high > 0),
                Err(err) => {
                    println!("retrying due to error fetching watermarks: {err}");
                    ready(false)
                }
            },
        )
        .await;

        // check we have the expected number of messages in the topic
        let (low, high) = consumer
            .fetch_watermarks(&topic, 0, Duration::from_secs(3))
            .unwrap();
        assert_eq!((0, num_events as i64), (low, high));

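        // `poll` returns `None` on timeout; treat that as terminal only once every
        // expected message has arrived, and count anything else (including the
        // partition-EOF events enabled above) as a retryable failure, up to 100 times.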
        // loop instead of iter so we can set a timeout
        let mut failures = 0;
        let mut out = Vec::new();
        while failures < 100 {
            match consumer.poll(Duration::from_secs(3)) {
                Some(Ok(msg)) => {
                    let s: &str = msg.payload_view().unwrap().unwrap();
                    out.push(s.to_owned());
                    let header = msg.headers().unwrap().get(0);
                    assert_eq!(header.key, header_1_key);
                    assert_eq!(header.value.unwrap(), header_1_value.as_bytes());
                }
                None if out.len() >= input.len() => break,
                _ => {
                    failures += 1;
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }

        assert_eq!(out.len(), input.len());
        assert_eq!(out, input);
    }
}