1#![allow(clippy::print_stdout)] #[cfg(feature = "kafka-integration-tests")]
4#[cfg(test)]
5mod integration_test {
6 use std::{collections::HashMap, future::ready, thread, time::Duration};
7
8 use bytes::Bytes;
9 use futures::{StreamExt, stream};
10 use rdkafka::{
11 Message, Offset, TopicPartitionList,
12 consumer::{BaseConsumer, Consumer},
13 message::Headers,
14 };
15 use vector_lib::{
16 codecs::{JsonSerializerConfig, TextSerializerConfig},
17 config::{Tags, Telemetry, init_telemetry},
18 event::{BatchNotifier, BatchStatus},
19 lookup::lookup_v2::ConfigTargetPath,
20 };
21
22 use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
23 use crate::{
24 event::{ObjectMap, TraceEvent, Value},
25 kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
26 sinks::prelude::*,
27 test_util::{
28 components::{
29 DATA_VOLUME_SINK_TAGS, SINK_TAGS, assert_data_volume_sink_compliance,
30 assert_sink_compliance,
31 },
32 map_event_batch_stream, random_lines_with_stream, random_string, wait_for,
33 },
34 tls::{TEST_PEM_INTERMEDIATE_CA_PATH, TlsConfig, TlsEnableableConfig},
35 };
36
/// Returns the Kafka broker host name used by these tests.
///
/// Reads the `KAFKA_HOST` environment variable and falls back to `localhost` whenever the
/// variable cannot be read.
fn kafka_host() -> String {
    match std::env::var("KAFKA_HOST") {
        Ok(host) => host,
        Err(_) => String::from("localhost"),
    }
}
40
41 fn kafka_address(port: u16) -> String {
42 format!("{}:{}", kafka_host(), port)
43 }
44
45 #[tokio::test]
46 async fn healthcheck() {
47 crate::test_util::trace_init();
48
49 let topic = format!("test-{}", random_string(10));
50
51 let config = KafkaSinkConfig {
52 bootstrap_servers: kafka_address(9091),
53 topic: Template::try_from(topic.clone()).unwrap(),
54 healthcheck_topic: None,
55 key_field: None,
56 encoding: TextSerializerConfig::default().into(),
57 batch: BatchConfig::default(),
58 compression: KafkaCompression::None,
59 auth: KafkaAuthConfig::default(),
60 socket_timeout_ms: Duration::from_millis(60000),
61 message_timeout_ms: Duration::from_millis(300000),
62 rate_limit_duration_secs: 1,
63 rate_limit_num: i64::MAX as u64,
64 librdkafka_options: HashMap::new(),
65 headers_key: None,
66 acknowledgements: Default::default(),
67 };
68 self::sink::healthcheck(config, Default::default())
69 .await
70 .unwrap();
71 }
72
73 #[tokio::test]
74 async fn healthcheck_topic() {
75 crate::test_util::trace_init();
76
77 let topic = format!("{{ {} }}", random_string(10));
78
79 let config = KafkaSinkConfig {
80 bootstrap_servers: kafka_address(9091),
81 topic: Template::try_from(topic.clone()).unwrap(),
82 healthcheck_topic: Some(String::from("topic-1234")),
83 key_field: None,
84 encoding: TextSerializerConfig::default().into(),
85 batch: BatchConfig::default(),
86 compression: KafkaCompression::None,
87 auth: KafkaAuthConfig::default(),
88 socket_timeout_ms: Duration::from_millis(60000),
89 message_timeout_ms: Duration::from_millis(300000),
90 rate_limit_duration_secs: 1,
91 rate_limit_num: i64::MAX as u64,
92 librdkafka_options: HashMap::new(),
93 headers_key: None,
94 acknowledgements: Default::default(),
95 };
96 self::sink::healthcheck(config, Default::default())
97 .await
98 .unwrap();
99 }
100
101 #[tokio::test]
102 async fn kafka_happy_path_plaintext() {
103 crate::test_util::trace_init();
104 kafka_happy_path(
105 kafka_address(9091),
106 None,
107 None,
108 KafkaCompression::None,
109 true,
110 )
111 .await;
112 kafka_happy_path(
113 kafka_address(9091),
114 None,
115 None,
116 KafkaCompression::None,
117 false,
118 )
119 .await;
120 }
121
122 #[tokio::test]
123 async fn kafka_happy_path_gzip() {
124 crate::test_util::trace_init();
125 kafka_happy_path(
126 kafka_address(9091),
127 None,
128 None,
129 KafkaCompression::Gzip,
130 false,
131 )
132 .await;
133 }
134
135 #[tokio::test]
136 async fn kafka_happy_path_lz4() {
137 crate::test_util::trace_init();
138 kafka_happy_path(
139 kafka_address(9091),
140 None,
141 None,
142 KafkaCompression::Lz4,
143 false,
144 )
145 .await;
146 }
147
148 #[tokio::test]
149 async fn kafka_happy_path_snappy() {
150 crate::test_util::trace_init();
151 kafka_happy_path(
152 kafka_address(9091),
153 None,
154 None,
155 KafkaCompression::Snappy,
156 false,
157 )
158 .await;
159 }
160
161 #[tokio::test]
162 async fn kafka_happy_path_zstd() {
163 crate::test_util::trace_init();
164 kafka_happy_path(
165 kafka_address(9091),
166 None,
167 None,
168 KafkaCompression::Zstd,
169 false,
170 )
171 .await;
172 }
173
174 async fn kafka_batch_options_overrides(
175 batch: BatchConfig<NoDefaultsBatchSettings>,
176 librdkafka_options: HashMap<String, String>,
177 ) -> crate::Result<KafkaSink> {
178 let topic = format!("test-{}", random_string(10));
179 let config = KafkaSinkConfig {
180 bootstrap_servers: kafka_address(9091),
181 topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
182 compression: KafkaCompression::None,
183 healthcheck_topic: None,
184 encoding: TextSerializerConfig::default().into(),
185 key_field: None,
186 auth: KafkaAuthConfig {
187 sasl: None,
188 tls: None,
189 },
190 socket_timeout_ms: Duration::from_millis(60000),
191 message_timeout_ms: Duration::from_millis(300000),
192 rate_limit_duration_secs: 1,
193 rate_limit_num: i64::MAX as u64,
194 batch,
195 librdkafka_options,
196 headers_key: None,
197 acknowledgements: Default::default(),
198 };
199 config.clone().to_rdkafka()?;
200 self::sink::healthcheck(config.clone(), Default::default()).await?;
201 KafkaSink::new(config)
202 }
203
204 #[tokio::test]
205 async fn kafka_batch_options_max_bytes_errors_on_double_set() {
206 crate::test_util::trace_init();
207 let mut batch = BatchConfig::default();
208 batch.max_bytes = Some(1000);
209
210 assert!(
211 kafka_batch_options_overrides(
212 batch,
213 indexmap::indexmap! {
214 "batch.size".to_string() => 1.to_string(),
215 }
216 .into_iter()
217 .collect()
218 )
219 .await
220 .is_err()
221 )
222 }
223
224 #[tokio::test]
225 async fn kafka_batch_options_actually_sets() {
226 crate::test_util::trace_init();
227 let mut batch = BatchConfig::default();
228 batch.max_events = Some(10);
229 batch.timeout_secs = Some(2.0);
230
231 kafka_batch_options_overrides(batch, indexmap::indexmap! {}.into_iter().collect())
232 .await
233 .unwrap();
234 }
235
236 #[tokio::test]
237 async fn kafka_batch_options_max_events_errors_on_double_set() {
238 crate::test_util::trace_init();
239 let mut batch = BatchConfig::default();
240 batch.max_events = Some(10);
241
242 assert!(
243 kafka_batch_options_overrides(
244 batch,
245 indexmap::indexmap! {
246 "batch.num.messages".to_string() => 1.to_string(),
247 }
248 .into_iter()
249 .collect()
250 )
251 .await
252 .is_err()
253 )
254 }
255
256 #[tokio::test]
257 async fn kafka_batch_options_timeout_secs_errors_on_double_set() {
258 crate::test_util::trace_init();
259 let mut batch = BatchConfig::default();
260 batch.timeout_secs = Some(10.0);
261
262 assert!(
263 kafka_batch_options_overrides(
264 batch,
265 indexmap::indexmap! {
266 "queue.buffering.max.ms".to_string() => 1.to_string(),
267 }
268 .into_iter()
269 .collect()
270 )
271 .await
272 .is_err()
273 )
274 }
275
276 #[tokio::test]
277 async fn kafka_happy_path_tls() {
278 crate::test_util::trace_init();
279 let mut options = TlsConfig::test_config();
280 options.ca_file = Some(TEST_PEM_INTERMEDIATE_CA_PATH.into());
283 kafka_happy_path(
284 kafka_address(9092),
285 None,
286 Some(TlsEnableableConfig {
287 enabled: Some(true),
288 options: TlsConfig::test_config(),
289 }),
290 KafkaCompression::None,
291 false,
292 )
293 .await;
294 }
295
296 #[tokio::test]
297 async fn kafka_happy_path_sasl() {
298 crate::test_util::trace_init();
299 kafka_happy_path(
300 kafka_address(9093),
301 Some(KafkaSaslConfig {
302 enabled: Some(true),
303 username: Some("admin".to_string()),
304 password: Some("admin".to_string().into()),
305 mechanism: Some("PLAIN".to_owned()),
306 }),
307 None,
308 KafkaCompression::None,
309 false,
310 )
311 .await;
312 }
313
314 #[tokio::test]
315 async fn kafka_happy_path_trace_events() {
316 crate::test_util::trace_init();
317
318 assert_sink_compliance(&SINK_TAGS, async move {
319 let topic_prefix = format!("test-trace-{}", random_string(10));
320 let topic = format!("{}-{}", topic_prefix, chrono::Utc::now().format("%Y%m%d"));
321 let key_field = ConfigTargetPath::try_from("trace_key".to_string()).unwrap();
322 let headers_key = ConfigTargetPath::try_from("trace_headers".to_string()).unwrap();
323 let trace_key = "trace-partition-key";
324 let header_key = "trace-header-key";
325 let header_value = "trace-header-value";
326
327 let config = KafkaSinkConfig {
328 bootstrap_servers: kafka_address(9091),
329 topic: Template::try_from(format!("{topic_prefix}-%Y%m%d")).unwrap(),
330 healthcheck_topic: None,
331 key_field: Some(key_field.clone()),
332 encoding: JsonSerializerConfig::default().into(),
333 batch: BatchConfig::default(),
334 compression: KafkaCompression::None,
335 auth: KafkaAuthConfig::default(),
336 socket_timeout_ms: Duration::from_millis(60000),
337 message_timeout_ms: Duration::from_millis(300000),
338 rate_limit_duration_secs: 1,
339 rate_limit_num: i64::MAX as u64,
340 librdkafka_options: HashMap::new(),
341 headers_key: Some(headers_key.clone()),
342 acknowledgements: Default::default(),
343 };
344
345 let num_events = 100;
346 let mut expected_messages = Vec::with_capacity(num_events);
347
348 let (batch, receiver) = BatchNotifier::new_with_receiver();
349 let mut events = Vec::with_capacity(num_events);
350 for i in 0..num_events {
351 let message = format!("trace-message-{i}");
352 expected_messages.push(message.clone());
353
354 let mut trace = TraceEvent::default();
355 trace.insert("message", message);
356 trace.insert("trace_key", trace_key);
357 trace.insert("timestamp", chrono::Utc::now());
358
359 let mut trace_headers = ObjectMap::new();
360 trace_headers.insert(header_key.into(), Value::Bytes(Bytes::from(header_value)));
361 trace.insert(&headers_key, trace_headers);
362
363 events.push(Event::Trace(trace.with_batch_notifier(&batch)));
364 }
365
366 let sink = KafkaSink::new(config).unwrap();
367 let sink = VectorSink::from_event_streamsink(sink);
368 let stream = map_event_batch_stream(stream::iter(events), Some(batch));
369 sink.run(stream).await.unwrap();
370 assert_eq!(receiver.await, BatchStatus::Delivered);
371
372 let mut client_config = rdkafka::ClientConfig::new();
374 client_config.set("bootstrap.servers", kafka_address(9091).as_str());
375 client_config.set("group.id", random_string(10));
376 client_config.set("enable.partition.eof", "true");
377 let mut tpl = TopicPartitionList::new();
378 tpl.add_partition(&topic, 0)
379 .set_offset(Offset::Beginning)
380 .unwrap();
381 let consumer: BaseConsumer = client_config.create().unwrap();
382 consumer.assign(&tpl).unwrap();
383
384 wait_for(
385 || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
386 Ok((_low, high)) => ready(high >= num_events as i64),
387 Err(err) => {
388 println!("retrying due to error fetching watermarks: {err}");
389 ready(false)
390 }
391 },
392 )
393 .await;
394
395 let (low, high) = consumer
396 .fetch_watermarks(&topic, 0, Duration::from_secs(3))
397 .unwrap();
398 assert_eq!((0, num_events as i64), (low, high));
399
400 let mut failures = 0;
401 let mut observed_messages = Vec::new();
402 while failures < 100 {
403 match consumer.poll(Duration::from_secs(3)) {
404 Some(Ok(msg)) => {
405 let payload: &str = msg.payload_view().unwrap().unwrap();
406 let payload_json: serde_json::Value =
407 serde_json::from_str(payload).unwrap();
408 observed_messages
409 .push(payload_json["message"].as_str().unwrap().to_owned());
410
411 let key = msg.key().unwrap();
412 assert_eq!(key, trace_key.as_bytes());
413
414 let timestamp = msg.timestamp().to_millis();
415 assert!(timestamp.is_some());
416
417 let header = msg.headers().unwrap().get(0);
418 assert_eq!(header.key, header_key);
419 assert_eq!(header.value.unwrap(), header_value.as_bytes());
420 }
421 None if observed_messages.len() >= num_events => break,
422 _ => {
423 failures += 1;
424 thread::sleep(Duration::from_millis(50));
425 }
426 }
427 }
428
429 assert_eq!(observed_messages.len(), num_events);
430 assert_eq!(observed_messages, expected_messages);
431 })
432 .await;
433 }
434
435 async fn kafka_happy_path(
436 server: String,
437 sasl: Option<KafkaSaslConfig>,
438 tls: Option<TlsEnableableConfig>,
439 compression: KafkaCompression,
440 test_telemetry_tags: bool,
441 ) {
442 if test_telemetry_tags {
443 init_telemetry(
446 Telemetry {
447 tags: Tags {
448 emit_service: true,
449 emit_source: true,
450 },
451 },
452 true,
453 );
454 }
455
456 let topic = format!("test-{}", random_string(10));
457 let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
458 let kafka_auth = KafkaAuthConfig { sasl, tls };
459 let config = KafkaSinkConfig {
460 bootstrap_servers: server.clone(),
461 topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
462 healthcheck_topic: None,
463 key_field: None,
464 encoding: TextSerializerConfig::default().into(),
465 batch: BatchConfig::default(),
466 compression,
467 auth: kafka_auth.clone(),
468 socket_timeout_ms: Duration::from_millis(60000),
469 message_timeout_ms: Duration::from_millis(300000),
470 rate_limit_duration_secs: 1,
471 rate_limit_num: i64::MAX as u64,
472 librdkafka_options: HashMap::new(),
473 headers_key: Some(headers_key.clone()),
474 acknowledgements: Default::default(),
475 };
476 let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d"));
477 println!("Topic name generated in test: {topic:?}");
478
479 let num_events = 1000;
480 let (batch, mut receiver) = BatchNotifier::new_with_receiver();
481 let (input, events) = random_lines_with_stream(100, num_events, Some(batch));
482
483 let header_1_key = "header-1-key";
484 let header_1_value = "header-1-value";
485 let input_events = events.map(move |mut events| {
486 let headers_key = headers_key.clone();
487 let mut header_values = ObjectMap::new();
488 header_values.insert(
489 header_1_key.into(),
490 Value::Bytes(Bytes::from(header_1_value)),
491 );
492 events.iter_logs_mut().for_each(move |log| {
493 log.insert(&headers_key, header_values.clone());
494 });
495 events
496 });
497
498 if test_telemetry_tags {
499 assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move {
500 let sink = KafkaSink::new(config).unwrap();
501 let sink = VectorSink::from_event_streamsink(sink);
502 sink.run(input_events).await
503 })
504 .await
505 .expect("Running sink failed");
506 } else {
507 assert_sink_compliance(&SINK_TAGS, async move {
508 let sink = KafkaSink::new(config).unwrap();
509 let sink = VectorSink::from_event_streamsink(sink);
510 sink.run(input_events).await
511 })
512 .await
513 .expect("Running sink failed");
514 }
515 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
516
517 let mut client_config = rdkafka::ClientConfig::new();
519 client_config.set("bootstrap.servers", server.as_str());
520 client_config.set("group.id", random_string(10));
521 client_config.set("enable.partition.eof", "true");
522 kafka_auth.apply(&mut client_config).unwrap();
523
524 let mut tpl = TopicPartitionList::new();
525 tpl.add_partition(&topic, 0)
526 .set_offset(Offset::Beginning)
527 .unwrap();
528
529 let consumer: BaseConsumer = client_config.create().unwrap();
530 consumer.assign(&tpl).unwrap();
531
532 wait_for(
534 || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
535 Ok((_low, high)) => ready(high > 0),
536 Err(err) => {
537 println!("retrying due to error fetching watermarks: {err}");
538 ready(false)
539 }
540 },
541 )
542 .await;
543
544 let (low, high) = consumer
546 .fetch_watermarks(&topic, 0, Duration::from_secs(3))
547 .unwrap();
548 assert_eq!((0, num_events as i64), (low, high));
549
550 let mut failures = 0;
552 let mut out = Vec::new();
553 while failures < 100 {
554 match consumer.poll(Duration::from_secs(3)) {
555 Some(Ok(msg)) => {
556 let s: &str = msg.payload_view().unwrap().unwrap();
557 out.push(s.to_owned());
558 let header = msg.headers().unwrap().get(0);
559 assert_eq!(header.key, header_1_key);
560 assert_eq!(header.value.unwrap(), header_1_value.as_bytes());
561 }
562 None if out.len() >= input.len() => break,
563 _ => {
564 failures += 1;
565 thread::sleep(Duration::from_millis(50));
566 }
567 }
568 }
569
570 assert_eq!(out.len(), input.len());
571 assert_eq!(out, input);
572 }
573}