#![allow(clippy::print_stdout)]

#[cfg(feature = "kafka-integration-tests")]
#[cfg(test)]
mod integration_test {
    use std::{collections::HashMap, future::ready, thread, time::Duration};

    use bytes::Bytes;
    use futures::StreamExt;
    use rdkafka::{
        Message, Offset, TopicPartitionList,
        consumer::{BaseConsumer, Consumer},
        message::Headers,
    };
    use vector_lib::{
        codecs::TextSerializerConfig,
        config::{Tags, Telemetry, init_telemetry},
        event::{BatchNotifier, BatchStatus},
        lookup::lookup_v2::ConfigTargetPath,
    };

    use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
    use crate::{
        event::{ObjectMap, Value},
        kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
        sinks::prelude::*,
        test_util::{
            components::{
                DATA_VOLUME_SINK_TAGS, SINK_TAGS, assert_data_volume_sink_compliance,
                assert_sink_compliance,
            },
            random_lines_with_stream, random_string, wait_for,
        },
        tls::{TEST_PEM_INTERMEDIATE_CA_PATH, TlsConfig, TlsEnableableConfig},
    };

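    /// Returns the Kafka broker host, honoring the `KAFKA_HOST` environment
    /// variable and falling back to `localhost` for local test runs.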
    fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }

    fn kafka_address(port: u16) -> String {
        format!("{}:{}", kafka_host(), port)
    }

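    /// The healthcheck should succeed against the plaintext listener when the
    /// topic is a plain (non-templated) name.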
    #[tokio::test]
    async fn healthcheck() {
        crate::test_util::trace_init();

        let topic = format!("test-{}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

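    /// With a templated topic that cannot be rendered without an event, the
    /// healthcheck should publish to the configured `healthcheck_topic` instead.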
    #[tokio::test]
    async fn healthcheck_topic() {
        crate::test_util::trace_init();

        let topic = format!("{{{{ {} }}}}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: Some(String::from("topic-1234")),
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

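    /// End-to-end delivery over the plaintext listener; runs once with
    /// data-volume telemetry tags asserted and once without.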
    #[tokio::test]
    async fn kafka_happy_path_plaintext() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            true,
        )
        .await;
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

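    // The variants below exercise the same end-to-end path with each
    // compression codec librdkafka supports.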
    #[tokio::test]
    async fn kafka_happy_path_gzip() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Gzip,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_lz4() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Lz4,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_snappy() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Snappy,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_zstd() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Zstd,
            false,
        )
        .await;
    }

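    /// Builds a sink whose batch settings and raw `librdkafka_options` may
    /// conflict, returning the error (if any) from config translation,
    /// healthcheck, or sink construction.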
    async fn kafka_batch_options_overrides(
        batch: BatchConfig<NoDefaultsBatchSettings>,
        librdkafka_options: HashMap<String, String>,
    ) -> crate::Result<KafkaSink> {
        let topic = format!("test-{}", random_string(10));
        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            compression: KafkaCompression::None,
            healthcheck_topic: None,
            encoding: TextSerializerConfig::default().into(),
            key_field: None,
            auth: KafkaAuthConfig {
                sasl: None,
                tls: None,
            },
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            batch,
            librdkafka_options,
            headers_key: None,
            acknowledgements: Default::default(),
        };
        config.clone().to_rdkafka()?;
        self::sink::healthcheck(config.clone(), Default::default()).await?;
        KafkaSink::new(config)
    }

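    // Each batch setting maps onto a librdkafka property (`max_bytes` ->
    // `batch.size`, `max_events` -> `batch.num.messages`, `timeout_secs` ->
    // `queue.buffering.max.ms`); setting both sides must be rejected.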
    #[tokio::test]
    async fn kafka_batch_options_max_bytes_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_bytes = Some(1000);

        assert!(
            kafka_batch_options_overrides(
                batch,
                indexmap::indexmap! {
                    "batch.size".to_string() => 1.to_string(),
                }
                .into_iter()
                .collect()
            )
            .await
            .is_err()
        )
    }

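    /// With no conflicting raw options, the batch settings alone should be
    /// accepted and applied.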
    #[tokio::test]
    async fn kafka_batch_options_actually_sets() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);
        batch.timeout_secs = Some(2.0);

        kafka_batch_options_overrides(batch, indexmap::indexmap! {}.into_iter().collect())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn kafka_batch_options_max_events_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);

        assert!(
            kafka_batch_options_overrides(
                batch,
                indexmap::indexmap! {
                    "batch.num.messages".to_string() => 1.to_string(),
                }
                .into_iter()
                .collect()
            )
            .await
            .is_err()
        )
    }

    #[tokio::test]
    async fn kafka_batch_options_timeout_secs_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.timeout_secs = Some(10.0);

        assert!(
            kafka_batch_options_overrides(
                batch,
                indexmap::indexmap! {
                    "queue.buffering.max.ms".to_string() => 1.to_string(),
                }
                .into_iter()
                .collect()
            )
            .await
            .is_err()
        )
    }

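    /// End-to-end delivery over the TLS listener on port 9092.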
    #[tokio::test]
    async fn kafka_happy_path_tls() {
        crate::test_util::trace_init();
        let mut options = TlsConfig::test_config();
        // Trust the intermediate CA directly rather than relying on the
        // default certificate chain.
        options.ca_file = Some(TEST_PEM_INTERMEDIATE_CA_PATH.into());

        kafka_happy_path(
            kafka_address(9092),
            None,
            Some(TlsEnableableConfig {
                enabled: Some(true),
                options,
            }),
            KafkaCompression::None,
            false,
        )
        .await;
    }

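    /// End-to-end delivery over the SASL/PLAIN listener on port 9093.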
    #[tokio::test]
    async fn kafka_happy_path_sasl() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9093),
            Some(KafkaSaslConfig {
                enabled: Some(true),
                username: Some("admin".to_string()),
                password: Some("admin".to_string().into()),
                mechanism: Some("PLAIN".to_owned()),
            }),
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

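    /// Shared happy-path body: runs the sink against a live broker, then
    /// consumes the topic back and verifies every payload and header arrived.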
    async fn kafka_happy_path(
        server: String,
        sasl: Option<KafkaSaslConfig>,
        tls: Option<TlsEnableableConfig>,
        compression: KafkaCompression,
        test_telemetry_tags: bool,
    ) {
        if test_telemetry_tags {
            // We need to configure Vector to emit the service and source tags.
            // The default is to not emit these.
            init_telemetry(
                Telemetry {
                    tags: Tags {
                        emit_service: true,
                        emit_source: true,
                    },
                },
                true,
            );
        }

        let topic = format!("test-{}", random_string(10));
        let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
        let kafka_auth = KafkaAuthConfig { sasl, tls };
        let config = KafkaSinkConfig {
            bootstrap_servers: server.clone(),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression,
            auth: kafka_auth.clone(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: Some(headers_key.clone()),
            acknowledgements: Default::default(),
        };
        let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d"));
        println!("Topic name generated in test: {topic:?}");

        let num_events = 1000;
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_lines_with_stream(100, num_events, Some(batch));

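        // Attach a fixed header to every log so the consumer side can verify
        // that headers round-trip through the sink.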
        let header_1_key = "header-1-key";
        let header_1_value = "header-1-value";
        let input_events = events.map(move |mut events| {
            let headers_key = headers_key.clone();
            let mut header_values = ObjectMap::new();
            header_values.insert(
                header_1_key.into(),
                Value::Bytes(Bytes::from(header_1_value)),
            );
            events.iter_logs_mut().for_each(move |log| {
                log.insert(&headers_key, header_values.clone());
            });
            events
        });

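        // Run the sink inside the component-compliance harness; the
        // data-volume variant additionally asserts the service/source tags.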
        if test_telemetry_tags {
            assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        } else {
            assert_sink_compliance(&SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        }
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

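        // Read everything back from the beginning of the topic.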
        let mut client_config = rdkafka::ClientConfig::new();
        client_config.set("bootstrap.servers", server.as_str());
        client_config.set("group.id", random_string(10));
        client_config.set("enable.partition.eof", "true");
        kafka_auth.apply(&mut client_config).unwrap();

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(&topic, 0)
            .set_offset(Offset::Beginning)
            .unwrap();

        let consumer: BaseConsumer = client_config.create().unwrap();
        consumer.assign(&tpl).unwrap();

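        // Wait for the produced messages to become visible in the topic.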
        wait_for(
            || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
                Ok((_low, high)) => ready(high > 0),
                Err(err) => {
                    println!("retrying due to error fetching watermarks: {err}");
                    ready(false)
                }
            },
        )
        .await;

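        // Check that the topic contains exactly the number of events we sent.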
        let (low, high) = consumer
            .fetch_watermarks(&topic, 0, Duration::from_secs(3))
            .unwrap();
        assert_eq!((0, num_events as i64), (low, high));

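        // Poll in a loop with a bounded retry count so a missing message fails
        // the test instead of hanging.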
        let mut failures = 0;
        let mut out = Vec::new();
        while failures < 100 {
            match consumer.poll(Duration::from_secs(3)) {
                Some(Ok(msg)) => {
                    let s: &str = msg.payload_view().unwrap().unwrap();
                    out.push(s.to_owned());
                    let header = msg.headers().unwrap().get(0);
                    assert_eq!(header.key, header_1_key);
                    assert_eq!(header.value.unwrap(), header_1_value.as_bytes());
                }
                None if out.len() >= input.len() => break,
                _ => {
                    failures += 1;
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }

        assert_eq!(out.len(), input.len());
        assert_eq!(out, input);
    }
}