#![allow(clippy::print_stdout)]

#[cfg(feature = "kafka-integration-tests")]
#[cfg(test)]
mod integration_test {
    use std::{collections::HashMap, future::ready, thread, time::Duration};

    use bytes::Bytes;
    use futures::StreamExt;
    use rdkafka::{
        consumer::{BaseConsumer, Consumer},
        message::Headers,
        Message, Offset, TopicPartitionList,
    };
    use vector_lib::codecs::TextSerializerConfig;
    use vector_lib::lookup::lookup_v2::ConfigTargetPath;
    use vector_lib::{
        config::{init_telemetry, Tags, Telemetry},
        event::{BatchNotifier, BatchStatus},
    };

    use super::super::{config::KafkaSinkConfig, sink::KafkaSink, *};
    use crate::{
        event::{ObjectMap, Value},
        kafka::{KafkaAuthConfig, KafkaCompression, KafkaSaslConfig},
        sinks::prelude::*,
        test_util::{
            components::{
                assert_data_volume_sink_compliance, assert_sink_compliance, DATA_VOLUME_SINK_TAGS,
                SINK_TAGS,
            },
            random_lines_with_stream, random_string, wait_for,
        },
        tls::{TlsConfig, TlsEnableableConfig, TEST_PEM_INTERMEDIATE_CA_PATH},
    };

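    // The broker host is taken from `KAFKA_HOST` so the suite can run against a
    // local broker or the integration-test containers.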
    fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }

    fn kafka_address(port: u16) -> String {
        format!("{}:{}", kafka_host(), port)
    }

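    // With no `healthcheck_topic` set, the healthcheck probes the configured
    // topic, which here is a static template that needs no event to render.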
    #[tokio::test]
    async fn healthcheck() {
        crate::test_util::trace_init();

        let topic = format!("test-{}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

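    // When the topic template cannot be rendered without an event, a concrete
    // `healthcheck_topic` must be supplied for the healthcheck to probe.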
    #[tokio::test]
    async fn healthcheck_topic() {
        crate::test_util::trace_init();

        let topic = format!("{{ {} }}", random_string(10));

        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(topic.clone()).unwrap(),
            healthcheck_topic: Some(String::from("topic-1234")),
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression: KafkaCompression::None,
            auth: KafkaAuthConfig::default(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: None,
            acknowledgements: Default::default(),
        };
        self::sink::healthcheck(config, Default::default())
            .await
            .unwrap();
    }

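    // Happy-path runs against the plaintext listener on port 9091, once per
    // compression codec. The plaintext case also runs once with telemetry-tag
    // assertions enabled.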
    #[tokio::test]
    async fn kafka_happy_path_plaintext() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            true,
        )
        .await;
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_gzip() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Gzip,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_lz4() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Lz4,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_snappy() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Snappy,
            false,
        )
        .await;
    }

    #[tokio::test]
    async fn kafka_happy_path_zstd() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9091),
            None,
            None,
            KafkaCompression::Zstd,
            false,
        )
        .await;
    }

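    // Builds a sink whose batching can be configured both through `batch` and
    // through raw `librdkafka_options`, so the tests below can check how the two
    // interact during config validation.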
    async fn kafka_batch_options_overrides(
        batch: BatchConfig<NoDefaultsBatchSettings>,
        librdkafka_options: HashMap<String, String>,
    ) -> crate::Result<KafkaSink> {
        let topic = format!("test-{}", random_string(10));
        let config = KafkaSinkConfig {
            bootstrap_servers: kafka_address(9091),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            compression: KafkaCompression::None,
            healthcheck_topic: None,
            encoding: TextSerializerConfig::default().into(),
            key_field: None,
            auth: KafkaAuthConfig {
                sasl: None,
                tls: None,
            },
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            batch,
            librdkafka_options,
            headers_key: None,
            acknowledgements: Default::default(),
        };
        config.clone().to_rdkafka()?;
        self::sink::healthcheck(config.clone(), Default::default()).await?;
        KafkaSink::new(config)
    }

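    // `batch.max_bytes` corresponds to librdkafka's `batch.size`; setting both
    // must fail validation. The sibling tests cover `batch.num.messages`,
    // `queue.buffering.max.ms`, and the non-conflicting case.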
    #[tokio::test]
    async fn kafka_batch_options_max_bytes_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_bytes = Some(1000);

        assert!(kafka_batch_options_overrides(
            batch,
            indexmap::indexmap! {
                "batch.size".to_string() => 1.to_string(),
            }
            .into_iter()
            .collect()
        )
        .await
        .is_err())
    }

    #[tokio::test]
    async fn kafka_batch_options_actually_sets() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);
        batch.timeout_secs = Some(2.0);

        kafka_batch_options_overrides(batch, indexmap::indexmap! {}.into_iter().collect())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn kafka_batch_options_max_events_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.max_events = Some(10);

        assert!(kafka_batch_options_overrides(
            batch,
            indexmap::indexmap! {
                "batch.num.messages".to_string() => 1.to_string(),
            }
            .into_iter()
            .collect()
        )
        .await
        .is_err())
    }

    #[tokio::test]
    async fn kafka_batch_options_timeout_secs_errors_on_double_set() {
        crate::test_util::trace_init();
        let mut batch = BatchConfig::default();
        batch.timeout_secs = Some(10.0);

        assert!(kafka_batch_options_overrides(
            batch,
            indexmap::indexmap! {
                "queue.buffering.max.ms".to_string() => 1.to_string(),
            }
            .into_iter()
            .collect()
        )
        .await
        .is_err())
    }

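    // The TLS listener is exposed on port 9092; verification uses the
    // intermediate test CA in place of the default test certificate settings.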
    #[tokio::test]
    async fn kafka_happy_path_tls() {
        crate::test_util::trace_init();
        let mut options = TlsConfig::test_config();
        options.ca_file = Some(TEST_PEM_INTERMEDIATE_CA_PATH.into());
        kafka_happy_path(
            kafka_address(9092),
            None,
            Some(TlsEnableableConfig {
                enabled: Some(true),
                options,
            }),
            KafkaCompression::None,
            false,
        )
        .await;
    }

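    // The SASL/PLAIN listener is exposed on port 9093.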
    #[tokio::test]
    async fn kafka_happy_path_sasl() {
        crate::test_util::trace_init();
        kafka_happy_path(
            kafka_address(9093),
            Some(KafkaSaslConfig {
                enabled: Some(true),
                username: Some("admin".to_string()),
                password: Some("admin".to_string().into()),
                mechanism: Some("PLAIN".to_owned()),
            }),
            None,
            KafkaCompression::None,
            false,
        )
        .await;
    }

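    // End-to-end check: runs the sink over a stream of random lines, then reads
    // the topic back with a raw rdkafka consumer and verifies payloads, headers,
    // and delivery acknowledgements.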
    async fn kafka_happy_path(
        server: String,
        sasl: Option<KafkaSaslConfig>,
        tls: Option<TlsEnableableConfig>,
        compression: KafkaCompression,
        test_telemetry_tags: bool,
    ) {
        if test_telemetry_tags {
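            // The data-volume compliance assertions expect the `service` and
            // `source` telemetry tags, which are not emitted by default.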
            init_telemetry(
                Telemetry {
                    tags: Tags {
                        emit_service: true,
                        emit_source: true,
                    },
                },
                true,
            );
        }

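        // The configured topic is a dated template; the same date suffix is
        // rendered below so the consumer reads from the topic the sink wrote to.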
        let topic = format!("test-{}", random_string(10));
        let headers_key = ConfigTargetPath::try_from("headers_key".to_string()).unwrap();
        let kafka_auth = KafkaAuthConfig { sasl, tls };
        let config = KafkaSinkConfig {
            bootstrap_servers: server.clone(),
            topic: Template::try_from(format!("{topic}-%Y%m%d")).unwrap(),
            healthcheck_topic: None,
            key_field: None,
            encoding: TextSerializerConfig::default().into(),
            batch: BatchConfig::default(),
            compression,
            auth: kafka_auth.clone(),
            socket_timeout_ms: Duration::from_millis(60000),
            message_timeout_ms: Duration::from_millis(300000),
            rate_limit_duration_secs: 1,
            rate_limit_num: i64::MAX as u64,
            librdkafka_options: HashMap::new(),
            headers_key: Some(headers_key.clone()),
            acknowledgements: Default::default(),
        };
        let topic = format!("{}-{}", topic, chrono::Utc::now().format("%Y%m%d"));
        println!("Topic name generated in test: {topic:?}");

        let num_events = 1000;
        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_lines_with_stream(100, num_events, Some(batch));

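        // Attach one header to every event so the consumer can verify headers
        // survive the round trip.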
        let header_1_key = "header-1-key";
        let header_1_value = "header-1-value";
        let input_events = events.map(move |mut events| {
            let headers_key = headers_key.clone();
            let mut header_values = ObjectMap::new();
            header_values.insert(
                header_1_key.into(),
                Value::Bytes(Bytes::from(header_1_value)),
            );
            events.iter_logs_mut().for_each(move |log| {
                log.insert(&headers_key, header_values.clone());
            });
            events
        });

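        // Run the sink inside the matching compliance harness, which also asserts
        // the expected internal telemetry.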
        if test_telemetry_tags {
            assert_data_volume_sink_compliance(&DATA_VOLUME_SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        } else {
            assert_sink_compliance(&SINK_TAGS, async move {
                let sink = KafkaSink::new(config).unwrap();
                let sink = VectorSink::from_event_streamsink(sink);
                sink.run(input_events).await
            })
            .await
            .expect("Running sink failed");
        }
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

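        // Read everything back from the beginning of the topic's only partition.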
        let mut client_config = rdkafka::ClientConfig::new();
        client_config.set("bootstrap.servers", server.as_str());
        client_config.set("group.id", random_string(10));
        client_config.set("enable.partition.eof", "true");
        kafka_auth.apply(&mut client_config).unwrap();

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(&topic, 0)
            .set_offset(Offset::Beginning)
            .unwrap();

        let consumer: BaseConsumer = client_config.create().unwrap();
        consumer.assign(&tpl).unwrap();

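        // Wait until the high watermark confirms that messages have landed.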
        wait_for(
            || match consumer.fetch_watermarks(&topic, 0, Duration::from_secs(3)) {
                Ok((_low, high)) => ready(high > 0),
                Err(err) => {
                    println!("retrying due to error fetching watermarks: {err}");
                    ready(false)
                }
            },
        )
        .await;

        let (low, high) = consumer
            .fetch_watermarks(&topic, 0, Duration::from_secs(3))
            .unwrap();
        assert_eq!((0, num_events as i64), (low, high));

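        // Poll until every line has been read back, tolerating up to 100
        // transient errors or timeouts.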
        let mut failures = 0;
        let mut out = Vec::new();
        while failures < 100 {
            match consumer.poll(Duration::from_secs(3)) {
                Some(Ok(msg)) => {
                    let s: &str = msg.payload_view().unwrap().unwrap();
                    out.push(s.to_owned());
                    let header = msg.headers().unwrap().get(0);
                    assert_eq!(header.key, header_1_key);
                    assert_eq!(header.value.unwrap(), header_1_value.as_bytes());
                }
                None if out.len() >= input.len() => break,
                _ => {
                    failures += 1;
                    thread::sleep(Duration::from_millis(50));
                }
            }
        }

        assert_eq!(out.len(), input.len());
        assert_eq!(out, input);
    }
}