1use std::{
2 collections::{HashMap, HashSet},
3 io::Cursor,
4 pin::Pin,
5 sync::{
6 Arc, OnceLock, Weak,
7 mpsc::{SyncSender, sync_channel},
8 },
9 time::Duration,
10};
11
12use async_stream::stream;
13use bytes::Bytes;
14use chrono::{DateTime, TimeZone, Utc};
15use futures::{Stream, StreamExt};
16use futures_util::future::OptionFuture;
17use rdkafka::{
18 ClientConfig, ClientContext, Statistics, TopicPartitionList,
19 consumer::{
20 BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer,
21 stream_consumer::StreamPartitionQueue,
22 },
23 error::KafkaError,
24 message::{BorrowedMessage, Headers as _, Message},
25 types::RDKafkaErrorCode,
26};
27use serde_with::serde_as;
28use snafu::{ResultExt, Snafu};
29use tokio::{
30 runtime::Handle,
31 sync::{
32 mpsc::{self, UnboundedReceiver, UnboundedSender},
33 oneshot,
34 },
35 task::JoinSet,
36 time::Sleep,
37};
38use tokio_util::codec::FramedRead;
39use tracing::{Instrument, Span};
40use vector_lib::{
41 EstimatedJsonEncodedSizeOf,
42 codecs::{
43 StreamDecodingError,
44 decoding::{DeserializerConfig, FramingConfig},
45 },
46 config::{LegacyKey, LogNamespace},
47 configurable::configurable_component,
48 finalizer::OrderedFinalizer,
49 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
50};
51use vrl::value::{Kind, ObjectMap, kind::Collection};
52
53use crate::{
54 SourceSender,
55 codecs::{Decoder, DecodingConfig},
56 config::{
57 LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
58 log_schema,
59 },
60 event::{BatchNotifier, BatchStatus, Event, Value},
61 internal_events::{
62 KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
63 StreamClosedError,
64 },
65 kafka,
66 serde::{bool_or_struct, default_decoding, default_framing_message_based},
67 shutdown::ShutdownSignal,
68};
69
70#[derive(Debug, Snafu)]
71enum BuildError {
72 #[snafu(display("The drain_timeout_ms ({}) must be less than session_timeout_ms ({})", value, session_timeout_ms.as_millis()))]
73 InvalidDrainTimeout {
74 value: u64,
75 session_timeout_ms: Duration,
76 },
77 #[snafu(display("Could not create Kafka consumer: {}", source))]
78 CreateError { source: rdkafka::error::KafkaError },
79 #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
80 SubscribeError { source: rdkafka::error::KafkaError },
81}
82
/// Metrics configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct Metrics {
    /// Expose topic lag metrics for all topics and partitions.
    pub topic_lag_metric: bool,
}
90
91#[serde_as]
93#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
94#[derive(Clone, Debug, Derivative)]
95#[derivative(Default)]
96#[serde(deny_unknown_fields)]
97pub struct KafkaSourceConfig {
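    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its
    /// connection to the cluster, allowing discovery of all the other hosts in the cluster.
    ///
    /// Must be in the form of `host:port`, and comma-separated.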
98 #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
105 bootstrap_servers: String,
106
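    /// The Kafka topics names to read events from.
    ///
    /// Regular expression syntax is supported if the topic begins with `^`.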
107 #[configurable(metadata(
111 docs::examples = "^(prefix1|prefix2)-.+",
112 docs::examples = "topic-1",
113 docs::examples = "topic-2"
114 ))]
115 topics: Vec<String>,
116
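    /// The consumer group name to be used to consume events from Kafka.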
117 #[configurable(metadata(docs::examples = "consumer-group-name"))]
119 group_id: String,
120
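    /// If offsets for the consumer group do not exist, set them using this strategy.
    ///
    /// See the `auto.offset.reset` option in the librdkafka documentation for details.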
121 #[serde(default = "default_auto_offset_reset")]
125 #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
126 auto_offset_reset: String,
127
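    /// The Kafka session timeout.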
128 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
130 #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
131 #[configurable(metadata(docs::advanced))]
132 #[serde(default = "default_session_timeout_ms")]
133 #[configurable(metadata(docs::human_name = "Session Timeout"))]
134 session_timeout_ms: Duration,
135
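    /// Timeout for draining pending acknowledgements during shutdown or a Kafka
    /// consumer group rebalance.
    ///
    /// Partition consumer tasks are given this much time to finish acknowledging
    /// in-flight events before offsets are committed and the partitions are released.
    /// Must not be larger than `session_timeout_ms`. When unset, it defaults to half
    /// of `session_timeout_ms`.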
136 #[serde(skip_serializing_if = "Option::is_none")]
146 #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
147 #[configurable(metadata(docs::advanced))]
148 #[configurable(metadata(docs::human_name = "Drain Timeout"))]
149 drain_timeout_ms: Option<u64>,
150
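    /// Timeout for network requests.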
151 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
153 #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
154 #[configurable(metadata(docs::advanced))]
155 #[serde(default = "default_socket_timeout_ms")]
156 #[configurable(metadata(docs::human_name = "Socket Timeout"))]
157 socket_timeout_ms: Duration,
158
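    /// Maximum time the broker may wait to fill the fetch response.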
159 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161 #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
162 #[configurable(metadata(docs::advanced))]
163 #[serde(default = "default_fetch_wait_max_ms")]
164 #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
165 fetch_wait_max_ms: Duration,
166
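    /// The frequency that the consumer offsets are committed (written) to offset storage.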
167 #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
169 #[serde(default = "default_commit_interval_ms")]
170 #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
171 #[configurable(metadata(docs::human_name = "Commit Interval"))]
172 commit_interval_ms: Duration,
173
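    /// Overrides the name of the log field used to add the message key to each event.
    ///
    /// The value is the raw key of the Kafka message itself.
    ///
    /// By default, `"message_key"` is used.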
174 #[serde(default = "default_key_field")]
180 #[configurable(metadata(docs::examples = "message_key"))]
181 key_field: OptionalValuePath,
182
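    /// Overrides the name of the log field used to add the topic to each event.
    ///
    /// The value is the topic from which the Kafka message was consumed.
    ///
    /// By default, `"topic"` is used.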
183 #[serde(default = "default_topic_key")]
189 #[configurable(metadata(docs::examples = "topic"))]
190 topic_key: OptionalValuePath,
191
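    /// Overrides the name of the log field used to add the partition to each event.
    ///
    /// The value is the partition from which the Kafka message was consumed.
    ///
    /// By default, `"partition"` is used.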
192 #[serde(default = "default_partition_key")]
198 #[configurable(metadata(docs::examples = "partition"))]
199 partition_key: OptionalValuePath,
200
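    /// Overrides the name of the log field used to add the offset to each event.
    ///
    /// The value is the offset of the Kafka message itself.
    ///
    /// By default, `"offset"` is used.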
201 #[serde(default = "default_offset_key")]
207 #[configurable(metadata(docs::examples = "offset"))]
208 offset_key: OptionalValuePath,
209
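    /// Overrides the name of the log field used to add the headers to each event.
    ///
    /// The value is the headers of the Kafka message itself.
    ///
    /// By default, `"headers"` is used.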
210 #[serde(default = "default_headers_key")]
216 #[configurable(metadata(docs::examples = "headers"))]
217 headers_key: OptionalValuePath,
218
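    /// Advanced options set directly on the underlying `librdkafka` client.
    ///
    /// See the librdkafka documentation for details.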
219 #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
223 #[configurable(metadata(docs::advanced))]
224 #[configurable(metadata(
225 docs::additional_props_description = "A librdkafka configuration option."
226 ))]
227 librdkafka_options: Option<HashMap<String, String>>,
228
229 #[serde(flatten)]
230 auth: kafka::KafkaAuthConfig,
231
232 #[configurable(derived)]
233 #[configurable(metadata(docs::advanced))]
234 #[serde(default = "default_framing_message_based")]
235 #[derivative(Default(value = "default_framing_message_based()"))]
236 framing: FramingConfig,
237
238 #[configurable(derived)]
239 #[serde(default = "default_decoding")]
240 #[derivative(Default(value = "default_decoding()"))]
241 decoding: DeserializerConfig,
242
243 #[configurable(derived)]
244 #[serde(default, deserialize_with = "bool_or_struct")]
245 acknowledgements: SourceAcknowledgementsConfig,
246
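    /// The namespace to use for logs. This overrides the global setting.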
247 #[configurable(metadata(docs::hidden))]
249 #[serde(default)]
250 log_namespace: Option<bool>,
251
252 #[configurable(derived)]
253 #[serde(default)]
254 metrics: Metrics,
255}
256
257impl KafkaSourceConfig {
258 fn keys(&self) -> Keys {
259 Keys::from(log_schema(), self)
260 }
261}
262
const fn default_session_timeout_ms() -> Duration {
    Duration::from_millis(10000)
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000)
}

const fn default_fetch_wait_max_ms() -> Duration {
    Duration::from_millis(100)
}

const fn default_commit_interval_ms() -> Duration {
    Duration::from_millis(5000)
}

fn default_auto_offset_reset() -> String {
    "largest".into()
}
282
283fn default_key_field() -> OptionalValuePath {
284 OptionalValuePath::from(owned_value_path!("message_key"))
285}
286
287fn default_topic_key() -> OptionalValuePath {
288 OptionalValuePath::from(owned_value_path!("topic"))
289}
290
291fn default_partition_key() -> OptionalValuePath {
292 OptionalValuePath::from(owned_value_path!("partition"))
293}
294
295fn default_offset_key() -> OptionalValuePath {
296 OptionalValuePath::from(owned_value_path!("offset"))
297}
298
299fn default_headers_key() -> OptionalValuePath {
300 OptionalValuePath::from(owned_value_path!("headers"))
301}
302
303const fn example_auto_offset_reset_values() -> [&'static str; 7] {
304 [
305 "smallest",
306 "earliest",
307 "beginning",
308 "largest",
309 "latest",
310 "end",
311 "error",
312 ]
313}
314
315fn example_librdkafka_options() -> HashMap<String, String> {
316 HashMap::<_, _>::from_iter([
317 ("client.id".to_string(), "${ENV_VAR}".to_string()),
318 ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
319 ("socket.send.buffer.bytes".to_string(), "100".to_string()),
320 ])
321}
322
323impl_generate_config_from_default!(KafkaSourceConfig);
324
325#[async_trait::async_trait]
326#[typetag::serde(name = "kafka")]
327impl SourceConfig for KafkaSourceConfig {
328 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
329 let log_namespace = cx.log_namespace(self.log_namespace);
330
331 let decoder =
332 DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
333 .build()?;
334 let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
335
336 if let Some(d) = self.drain_timeout_ms {
337 snafu::ensure!(
338 Duration::from_millis(d) <= self.session_timeout_ms,
339 InvalidDrainTimeoutSnafu {
340 value: d,
341 session_timeout_ms: self.session_timeout_ms
342 }
343 );
344 }
345
346 let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;
347
348 Ok(Box::pin(kafka_source(
349 self.clone(),
350 consumer,
351 callback_rx,
352 decoder,
353 cx.out,
354 cx.shutdown,
355 false,
356 log_namespace,
357 )))
358 }
359
360 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
361 let log_namespace = global_log_namespace.merge(self.log_namespace);
362 let keys = self.keys();
363
364 let schema_definition = self
365 .decoding
366 .schema_definition(log_namespace)
367 .with_standard_vector_source_metadata()
368 .with_source_metadata(
369 Self::NAME,
370 keys.timestamp.map(LegacyKey::Overwrite),
371 &owned_value_path!("timestamp"),
372 Kind::timestamp(),
373 Some("timestamp"),
374 )
375 .with_source_metadata(
376 Self::NAME,
377 keys.topic.clone().map(LegacyKey::Overwrite),
378 &owned_value_path!("topic"),
379 Kind::bytes(),
380 None,
381 )
382 .with_source_metadata(
383 Self::NAME,
384 keys.partition.clone().map(LegacyKey::Overwrite),
385 &owned_value_path!("partition"),
386 Kind::bytes(),
387 None,
388 )
389 .with_source_metadata(
390 Self::NAME,
391 keys.offset.clone().map(LegacyKey::Overwrite),
392 &owned_value_path!("offset"),
393 Kind::bytes(),
394 None,
395 )
396 .with_source_metadata(
397 Self::NAME,
398 keys.headers.clone().map(LegacyKey::Overwrite),
399 &owned_value_path!("headers"),
400 Kind::object(Collection::empty().with_unknown(Kind::bytes())),
401 None,
402 )
403 .with_source_metadata(
404 Self::NAME,
405 keys.key_field.clone().map(LegacyKey::Overwrite),
406 &owned_value_path!("message_key"),
407 Kind::bytes(),
408 None,
409 );
410
411 vec![SourceOutput::new_maybe_logs(
412 self.decoding.output_type(),
413 schema_definition,
414 )]
415 }
416
417 fn can_acknowledge(&self) -> bool {
418 true
419 }
420}
421
422#[allow(clippy::too_many_arguments)]
423async fn kafka_source(
424 config: KafkaSourceConfig,
425 consumer: StreamConsumer<KafkaSourceContext>,
426 callback_rx: UnboundedReceiver<KafkaCallback>,
427 decoder: Decoder,
428 out: SourceSender,
429 shutdown: ShutdownSignal,
430 eof: bool,
431 log_namespace: LogNamespace,
432) -> Result<(), ()> {
433 let span = info_span!("kafka_source");
434 let consumer = Arc::new(consumer);
435
436 consumer
437 .context()
438 .consumer
439 .set(Arc::downgrade(&consumer))
440 .expect("Error setting up consumer context.");
441
442 let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();
444
445 let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
446 if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
447 error!("{}", e);
448 return Err(());
449 }
450
451 let coordination_task = {
452 let span = span.clone();
453 let consumer = Arc::clone(&consumer);
454 let drain_timeout_ms = config
455 .drain_timeout_ms
456 .map_or(config.session_timeout_ms / 2, Duration::from_millis);
457 let consumer_state =
458 ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
459 tokio::spawn(async move {
460 coordinate_kafka_callbacks(
461 consumer,
462 callback_rx,
463 consumer_state,
464 drain_timeout_ms,
465 eof_tx,
466 )
467 .await;
468 })
469 };
470
471 let client_task = {
472 let consumer = Arc::clone(&consumer);
473 tokio::task::spawn_blocking(move || {
474 let _enter = span.enter();
475 drive_kafka_consumer(consumer, shutdown, eof_rx);
476 })
477 };
478
479 _ = tokio::join!(client_task, coordination_task);
480 consumer.context().commit_consumer_state();
481
482 Ok(())
483}
484
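/// A state machine for coordinating partition consumer tasks.
///
/// While `Consuming`, a consumer task is spawned for each newly assigned partition.
/// When partitions are revoked or a shutdown signal is received, the state is traded
/// for `Draining` (along with a drain deadline) via `begin_drain`.
///
/// `Draining` tracks the partitions that are expected to finish acknowledging
/// in-flight events, and owns the rendezvous signal that lets the client driver task
/// proceed with the rebalance or shutdown. Once draining completes, or the deadline
/// fires, `finish_drain` yields either a new `Consuming` state (rebalance) or the
/// terminal `Complete` state (shutdown).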
485struct ConsumerStateInner<S> {
503 config: KafkaSourceConfig,
504 decoder: Decoder,
505 out: SourceSender,
506 log_namespace: LogNamespace,
507 consumer_state: S,
508}
struct Consuming {
    span: Span,
}
struct Draining {
    /// The rendezvous channel sender from the revoke or shutdown callback. Sending on
    /// this channel indicates that a partition has drained; dropping it signals the
    /// client driver task that draining is finished.
    signal: SyncSender<()>,

    /// The set of partitions whose consumer tasks are expected to drain before the
    /// rebalance or shutdown proceeds.
    expect_drain: HashSet<TopicPartition>,

    /// Whether this drain was initiated by a shutdown; if so, `finish_drain` yields the
    /// terminal `Complete` state instead of returning to `Consuming`.
    shutdown: bool,

    span: Span,
}
535type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
536enum ConsumerState {
537 Consuming(ConsumerStateInner<Consuming>),
538 Draining(ConsumerStateInner<Draining>),
539 Complete,
540}
541impl Draining {
542 fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
543 Self {
544 signal,
545 shutdown,
546 expect_drain: HashSet::new(),
547 span,
548 }
549 }
550
551 fn is_complete(&self) -> bool {
552 self.expect_drain.is_empty()
553 }
554}
555
556impl<C> ConsumerStateInner<C> {
557 fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
558 (None.into(), ConsumerState::Complete)
559 }
560}
561
562impl ConsumerStateInner<Consuming> {
563 const fn new(
564 config: KafkaSourceConfig,
565 decoder: Decoder,
566 out: SourceSender,
567 log_namespace: LogNamespace,
568 span: Span,
569 ) -> Self {
570 Self {
571 config,
572 decoder,
573 out,
574 log_namespace,
575 consumer_state: Consuming { span },
576 }
577 }
578
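 /// Spawn a task on the given JoinSet to consume the partition queue, returning a
 /// oneshot sender used to signal the task to stop consuming new messages, along with
 /// an abort handle for the spawned task.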
579 fn consume_partition(
584 &self,
585 join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
586 tp: TopicPartition,
587 consumer: Arc<StreamConsumer<KafkaSourceContext>>,
588 p: StreamPartitionQueue<KafkaSourceContext>,
589 acknowledgements: bool,
590 exit_eof: bool,
591 ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
592 let keys = self.config.keys();
593 let decoder = self.decoder.clone();
594 let log_namespace = self.log_namespace;
595 let mut out = self.out.clone();
596
597 let (end_tx, mut end_signal) = oneshot::channel::<()>();
598
599 let handle = join_set.spawn(async move {
600 let mut messages = p.stream();
601 let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
602
603 let mut finalizer = Some(finalizer);
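 // `finalizer` is the entry point for new pending acknowledgements; dropping it (on
 // the end signal or partition EOF) stops new entries, so `ack_stream` terminates once
 // all pending acknowledgements have completed.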
607
608 let mut status = PartitionConsumerStatus::NormalExit;
609
610 loop {
611 tokio::select!(
612 biased;
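 // Poll in declaration order: the end signal first, then pending acknowledgements,
 // then new messages, so draining takes priority over consuming when this partition
 // is revoked or the source is shutting down.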
616
617 _ = &mut end_signal, if finalizer.is_some() => {
619 finalizer.take();
620 },
621
622 ack = ack_stream.next() => match ack {
623 Some((status, entry)) => {
624 if status == BatchStatus::Delivered
625 && let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
626 emit!(KafkaOffsetUpdateError { error });
627 }
628 }
629 None if finalizer.is_none() => {
630 debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
631 break
632 }
633 None => {
634 debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
635 }
636 },
637
638 message = messages.next(), if finalizer.is_some() => match message {
639 None => unreachable!("MessageStream never calls Ready(None)"),
640 Some(Err(error)) => match error {
641 rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
642 debug!("EOF for partition {}.", partition);
643 status = PartitionConsumerStatus::PartitionEOF;
644 finalizer.take();
645 },
646 _ => emit!(KafkaReadError { error }),
647 },
648 Some(Ok(msg)) => {
649 emit!(KafkaBytesReceived {
650 byte_size: msg.payload_len(),
651 protocol: "tcp",
652 topic: msg.topic(),
653 partition: msg.partition(),
654 });
655 parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
656 }
657 },
658 )
659 }
660 (tp, status)
661 }.instrument(self.consumer_state.span.clone()));
662 (end_tx, handle)
663 }
664
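 /// Consume self and return a Draining state, along with a deadline future that caps
 /// how long revoked partitions are given to finish acknowledging in-flight events.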
665 fn begin_drain(
668 self,
669 max_drain_ms: Duration,
670 sig: SyncSender<()>,
671 shutdown: bool,
672 ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
673 let deadline = Box::pin(tokio::time::sleep(max_drain_ms));
674
675 let draining = ConsumerStateInner {
676 config: self.config,
677 decoder: self.decoder,
678 out: self.out,
679 log_namespace: self.log_namespace,
680 consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
681 };
682
683 (Some(deadline).into(), draining)
684 }
685
686 pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
687 (deadline, ConsumerState::Consuming(self))
688 }
689}
690
691impl ConsumerStateInner<Draining> {
 /// Mark the given TopicPartition as revoked: signal its consumer task to stop
 /// consuming new messages and expect it to drain its pending acknowledgements before
 /// the rebalance or shutdown proceeds.
 fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
     _ = end_signal.send(());
     self.consumer_state.expect_drain.insert(tp);
 }

 /// Record that the consumer task for the given TopicPartition has finished draining,
 /// and notify the client driver task via the rendezvous channel.
 fn partition_drained(&mut self, tp: TopicPartition) {
     _ = self.consumer_state.signal.send(());
     self.consumer_state.expect_drain.remove(&tp);
 }
711
712 fn is_drain_complete(&self) -> bool {
714 self.consumer_state.is_complete()
715 }
716
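 /// Consume self and return a Complete state if this drain was part of a shutdown, or
 /// a fresh Consuming state if it was triggered by a rebalance.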
717 fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
720 if self.consumer_state.shutdown {
721 self.complete(deadline)
722 } else {
723 (
724 None.into(),
725 ConsumerState::Consuming(ConsumerStateInner {
726 config: self.config,
727 decoder: self.decoder,
728 out: self.out,
729 log_namespace: self.log_namespace,
730 consumer_state: Consuming {
731 span: self.consumer_state.span,
732 },
733 }),
734 )
735 }
736 }
737
738 pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
739 (deadline, ConsumerState::Draining(self))
740 }
741}
742
743async fn coordinate_kafka_callbacks(
744 consumer: Arc<StreamConsumer<KafkaSourceContext>>,
745 mut callbacks: UnboundedReceiver<KafkaCallback>,
746 consumer_state: ConsumerStateInner<Consuming>,
747 max_drain_ms: Duration,
748 mut eof: Option<oneshot::Sender<()>>,
749) {
750 let mut drain_deadline: OptionFuture<_> = None.into();
751 let mut consumer_state = ConsumerState::Consuming(consumer_state);
752
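 // A oneshot sender per partition consumer task, used to signal the task to stop
 // consuming new messages when its partition is revoked or on shutdown.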
753 let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();
756
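 // The set of spawned partition consumer tasks; each task resolves to its
 // topic-partition and an exit status when it completes.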
757 let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
762 Default::default();
763
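 // Abort handles for the spawned partition consumer tasks, used to drop pending
 // acknowledgement streams if the drain deadline is reached.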
764 let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();
766
767 let exit_eof = eof.is_some();
768
769 while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
770 tokio::select! {
771 Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
772 debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
773 end_signals.remove(&finished_partition);
775 abort_handles.remove(&finished_partition);
776
777 (drain_deadline, consumer_state) = match consumer_state {
778 ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
779 ConsumerState::Draining(mut state) => {
780 state.partition_drained(finished_partition);
781
782 if state.is_drain_complete() {
783 debug!("All expected partitions have drained.");
784 state.finish_drain(drain_deadline)
785 } else {
786 state.keep_draining(drain_deadline)
787 }
788 },
789 ConsumerState::Consuming(state) => {
790 if !exit_eof {
794 debug!("Partition consumer task finished, while not in draining mode.");
795 }
796 state.keep_consuming(drain_deadline)
797 },
798 };
799
800 if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
805 debug!("All partitions have exited or reached EOF.");
806 let _ = eof.take().map(|e| e.send(()));
807 }
808 },
809 Some(callback) = callbacks.recv() => match callback {
810 KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
811 ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
812 ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
813 ConsumerState::Consuming(ref consumer_state) => {
814 let acks = consumer.context().acknowledgements;
815 for tp in assigned_partitions.drain(0..) {
816 let topic = tp.0.as_str();
817 let partition = tp.1;
 match consumer.split_partition_queue(topic, partition) {
     Some(pq) => {
         debug!("Consuming partition {}:{}.", &tp.0, tp.1);
         let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
         abort_handles.insert(tp.clone(), handle);
         end_signals.insert(tp, end_tx);
     }
     None => {
         warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
     }
 }
826 }
827 drop(done);
829 }
830 },
831 KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
832 ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
833 ConsumerState::Draining(d) => {
834 warn!("Kafka client is already draining revoked partitions.");
838 d.keep_draining(drain_deadline)
839 },
840 ConsumerState::Consuming(state) => {
841 let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);
842
 for tp in revoked_partitions.drain(0..) {
     match end_signals.remove(&tp) {
         Some(end) => {
             debug!("Revoking partition {}:{}", &tp.0, tp.1);
             state.revoke_partition(tp, end);
         }
         None => {
             debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
         }
     }
850 }
851
852 state.keep_draining(deadline)
853 }
854 },
855 KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
856 ConsumerState::Complete => unreachable!("Shutdown received after completion."),
857 ConsumerState::Draining(state) => {
860 error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
863 callbacks.close();
864 state.keep_draining(drain_deadline)
865 },
866 ConsumerState::Consuming(state) => {
867 callbacks.close();
868 let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
869 if let Ok(tpl) = consumer.assignment() {
870 if tpl.capacity() == 0 {
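 // No partitions are assigned, so there is nothing to drain; exit the coordination
 // task. Dropping the drain signal here unblocks the client shutdown callback.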
872 return;
873 }
874 tpl.elements()
875 .iter()
876 .for_each(|el| {
877
878 let tp: TopicPartition = (el.topic().into(), el.partition());
 match end_signals.remove(&tp) {
     Some(end) => {
         debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
         state.revoke_partition(tp, end);
     }
     None => {
         debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
     }
 }
885 });
886 }
887 if state.is_drain_complete() {
890 state.finish_drain(deadline)
891 } else {
892 state.keep_draining(deadline)
893 }
894 }
895 },
896 },
897
898 Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
899 ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
900 ConsumerState::Consuming(state) => {
901 warn!("A drain deadline fired outside of draining mode.");
902 state.keep_consuming(None.into())
903 },
904 ConsumerState::Draining(mut draining) => {
905 debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
906 for tp in draining.consumer_state.expect_drain.drain() {
907 if let Some(handle) = abort_handles.remove(&tp) {
908 handle.abort();
909 }
910 }
911 draining.finish_drain(drain_deadline)
912 }
913 },
914 }
915 }
916}
917
918fn drive_kafka_consumer(
919 consumer: Arc<StreamConsumer<KafkaSourceContext>>,
920 mut shutdown: ShutdownSignal,
921 eof: Option<oneshot::Receiver<()>>,
922) {
923 Handle::current().block_on(async move {
924 let mut eof: OptionFuture<_> = eof.into();
925 let mut stream = consumer.stream();
926 loop {
927 tokio::select! {
928 _ = &mut shutdown => {
929 consumer.context().shutdown();
930 break
931 },
932
933 Some(_) = &mut eof => {
934 consumer.context().shutdown();
935 break
936 },
937
938 message = stream.next() => match message {
941 None => unreachable!("MessageStream never returns Ready(None)"),
942 Some(Err(error)) => emit!(KafkaReadError { error }),
943 Some(Ok(_msg)) => {
944 unreachable!("Messages are consumed in dedicated tasks for each partition.")
945 }
946 },
947 }
948 }
949 });
950}
951
952async fn parse_message(
953 msg: BorrowedMessage<'_>,
954 decoder: Decoder,
955 keys: &'_ Keys,
956 out: &mut SourceSender,
957 acknowledgements: bool,
958 finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
959 log_namespace: LogNamespace,
960) {
961 if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
962 let (batch, receiver) = BatchNotifier::new_with_receiver();
963 let mut stream = stream.map(|event| {
964 if acknowledgements {
968 event.with_batch_notifier(&batch)
969 } else {
970 event
971 }
972 });
973 match out.send_event_stream(&mut stream).await {
974 Err(_) => {
975 emit!(StreamClosedError { count });
976 }
977 Ok(_) => {
978 drop(stream);
981 if let Some(f) = finalizer.as_ref() {
982 f.add(msg.into(), receiver)
983 }
984 }
985 }
986 }
987}
988
989fn parse_stream<'a>(
991 msg: &BorrowedMessage<'a>,
992 decoder: Decoder,
993 keys: &'a Keys,
994 log_namespace: LogNamespace,
995) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
 // Skip messages with an empty payload.
 let payload = msg.payload()?;

 let rmsg = ReceivedMessage::from(msg);
999
1000 let payload = Cursor::new(Bytes::copy_from_slice(payload));
1001
1002 let mut stream = FramedRead::with_capacity(payload, decoder, msg.payload_len());
1003 let (count, _) = stream.size_hint();
1004 let stream = stream! {
1005 while let Some(result) = stream.next().await {
1006 match result {
1007 Ok((events, _byte_size)) => {
1008 emit!(KafkaEventsReceived {
1009 count: events.len(),
1010 byte_size: events.estimated_json_encoded_size_of(),
1011 topic: &rmsg.topic,
1012 partition: rmsg.partition,
1013 });
1014 for mut event in events {
1015 rmsg.apply(keys, &mut event, log_namespace);
1016 yield event;
1017 }
1018 },
1019 Err(error) => {
1020 if !error.can_continue() {
1023 break;
1024 }
1025 }
1026 }
1027 }
1028 }
1029 .boxed();
1030 Some((count, stream))
1031}
1032
1033#[derive(Clone, Debug)]
1034struct Keys {
1035 timestamp: Option<OwnedValuePath>,
1036 key_field: Option<OwnedValuePath>,
1037 topic: Option<OwnedValuePath>,
1038 partition: Option<OwnedValuePath>,
1039 offset: Option<OwnedValuePath>,
1040 headers: Option<OwnedValuePath>,
1041}
1042
1043impl Keys {
1044 fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
1045 Self {
1046 timestamp: schema.timestamp_key().cloned(),
1047 key_field: config.key_field.path.clone(),
1048 topic: config.topic_key.path.clone(),
1049 partition: config.partition_key.path.clone(),
1050 offset: config.offset_key.path.clone(),
1051 headers: config.headers_key.path.clone(),
1052 }
1053 }
1054}
1055
1056struct ReceivedMessage {
1057 timestamp: Option<DateTime<Utc>>,
1058 key: Value,
1059 headers: ObjectMap,
1060 topic: String,
1061 partition: i32,
1062 offset: i64,
1063}
1064
1065impl ReceivedMessage {
1066 fn from(msg: &BorrowedMessage<'_>) -> Self {
1067 let timestamp = msg
1069 .timestamp()
1070 .to_millis()
1071 .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());
1072
1073 let key = msg
1074 .key()
1075 .map(|key| Value::from(Bytes::from(key.to_owned())))
1076 .unwrap_or(Value::Null);
1077
1078 let mut headers_map = ObjectMap::new();
1079 if let Some(headers) = msg.headers() {
1080 for header in headers.iter() {
1081 if let Some(value) = header.value {
1082 headers_map.insert(
1083 header.key.into(),
1084 Value::from(Bytes::from(value.to_owned())),
1085 );
1086 }
1087 }
1088 }
1089
1090 Self {
1091 timestamp,
1092 key,
1093 headers: headers_map,
1094 topic: msg.topic().to_string(),
1095 partition: msg.partition(),
1096 offset: msg.offset(),
1097 }
1098 }
1099
1100 fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
1101 if let Event::Log(log) = event {
1102 match log_namespace {
1103 LogNamespace::Vector => {
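 // In the Vector namespace, only the standard source metadata (ingest timestamp and
 // source type) is inserted here; the Kafka message timestamp is added below under
 // the `kafka` metadata key. In the Legacy namespace, the message timestamp overwrites
 // the event's `timestamp` field instead.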
1104 log_namespace.insert_standard_vector_source_metadata(
1109 log,
1110 KafkaSourceConfig::NAME,
1111 Utc::now(),
1112 );
1113 }
1114 LogNamespace::Legacy => {
1115 if let Some(source_type_key) = log_schema().source_type_key_target_path() {
1116 log.insert(source_type_key, KafkaSourceConfig::NAME);
1117 }
1118 }
1119 }
1120
1121 log_namespace.insert_source_metadata(
1122 KafkaSourceConfig::NAME,
1123 log,
1124 keys.key_field.as_ref().map(LegacyKey::Overwrite),
1125 path!("message_key"),
1126 self.key.clone(),
1127 );
1128
1129 log_namespace.insert_source_metadata(
1130 KafkaSourceConfig::NAME,
1131 log,
1132 keys.timestamp.as_ref().map(LegacyKey::Overwrite),
1133 path!("timestamp"),
1134 self.timestamp,
1135 );
1136
1137 log_namespace.insert_source_metadata(
1138 KafkaSourceConfig::NAME,
1139 log,
1140 keys.topic.as_ref().map(LegacyKey::Overwrite),
1141 path!("topic"),
1142 self.topic.clone(),
1143 );
1144
1145 log_namespace.insert_source_metadata(
1146 KafkaSourceConfig::NAME,
1147 log,
1148 keys.partition.as_ref().map(LegacyKey::Overwrite),
1149 path!("partition"),
1150 self.partition,
1151 );
1152
1153 log_namespace.insert_source_metadata(
1154 KafkaSourceConfig::NAME,
1155 log,
1156 keys.offset.as_ref().map(LegacyKey::Overwrite),
1157 path!("offset"),
1158 self.offset,
1159 );
1160
1161 log_namespace.insert_source_metadata(
1162 KafkaSourceConfig::NAME,
1163 log,
1164 keys.headers.as_ref().map(LegacyKey::Overwrite),
1165 path!("headers"),
1166 self.headers.clone(),
1167 );
1168 }
1169 }
1170}
1171
1172#[derive(Debug, Eq, PartialEq, Hash)]
1173struct FinalizerEntry {
1174 topic: String,
1175 partition: i32,
1176 offset: i64,
1177}
1178
1179impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
1180 fn from(msg: BorrowedMessage<'a>) -> Self {
1181 Self {
1182 topic: msg.topic().into(),
1183 partition: msg.partition(),
1184 offset: msg.offset(),
1185 }
1186 }
1187}
1188
1189fn create_consumer(
1190 config: &KafkaSourceConfig,
1191 acknowledgements: bool,
1192) -> crate::Result<(
1193 StreamConsumer<KafkaSourceContext>,
1194 UnboundedReceiver<KafkaCallback>,
1195)> {
1196 let mut client_config = ClientConfig::new();
1197 client_config
1198 .set("group.id", &config.group_id)
1199 .set("bootstrap.servers", &config.bootstrap_servers)
1200 .set("auto.offset.reset", &config.auto_offset_reset)
1201 .set(
1202 "session.timeout.ms",
1203 config.session_timeout_ms.as_millis().to_string(),
1204 )
1205 .set(
1206 "socket.timeout.ms",
1207 config.socket_timeout_ms.as_millis().to_string(),
1208 )
1209 .set(
1210 "fetch.wait.max.ms",
1211 config.fetch_wait_max_ms.as_millis().to_string(),
1212 )
1213 .set("enable.partition.eof", "false")
1214 .set("enable.auto.commit", "true")
1215 .set(
1216 "auto.commit.interval.ms",
1217 config.commit_interval_ms.as_millis().to_string(),
1218 )
1219 .set("enable.auto.offset.store", "false")
1220 .set("statistics.interval.ms", "1000")
1221 .set("client.id", "vector");
1222
1223 config.auth.apply(&mut client_config)?;
1224
1225 if let Some(librdkafka_options) = &config.librdkafka_options {
1226 for (key, value) in librdkafka_options {
1227 client_config.set(key.as_str(), value.as_str());
1228 }
1229 }
1230
1231 let (callbacks, callback_rx) = mpsc::unbounded_channel();
1232 let consumer = client_config
1233 .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
1234 config.metrics.topic_lag_metric,
1235 acknowledgements,
1236 callbacks,
1237 Span::current(),
1238 ))
1239 .context(CreateSnafu)?;
1240
1241 Ok((consumer, callback_rx))
1242}
1243
1244type TopicPartition = (String, i32);
1245
1246#[derive(PartialEq)]
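/// Status of a partition consumer task, letting the coordination task distinguish a
/// task that exited because its partition reached EOF (when `enable.partition.eof` is
/// in use) from one that exited normally after revocation or shutdown.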
1250enum PartitionConsumerStatus {
1251 NormalExit,
1252 PartitionEOF,
1253}
1254
1255enum KafkaCallback {
1256 PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
1257 PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
1258 ShuttingDown(SyncSender<()>),
1259}
1260
1261struct KafkaSourceContext {
1262 acknowledgements: bool,
1263 stats: kafka::KafkaStatisticsContext,
1264
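 /// Sends rebalance and shutdown callbacks to the coordination task.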
1265 callbacks: UnboundedSender<KafkaCallback>,
1267
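 /// A weak reference back to the consumer, used to commit offsets from within
 /// rebalance and shutdown callbacks.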
1268 consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
1270}
1271
1272impl KafkaSourceContext {
1273 fn new(
1274 expose_lag_metrics: bool,
1275 acknowledgements: bool,
1276 callbacks: UnboundedSender<KafkaCallback>,
1277 span: Span,
1278 ) -> Self {
1279 Self {
1280 stats: kafka::KafkaStatisticsContext {
1281 expose_lag_metrics,
1282 span,
1283 },
1284 acknowledgements,
1285 consumer: OnceLock::default(),
1286 callbacks,
1287 }
1288 }
1289
1290 fn shutdown(&self) {
1291 let (send, rendezvous) = sync_channel(0);
1292 if self
1293 .callbacks
1294 .send(KafkaCallback::ShuttingDown(send))
1295 .is_ok()
1296 {
1297 while rendezvous.recv().is_ok() {
1298 self.commit_consumer_state();
1299 }
1300 }
1301 }
1302
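 /// Forward the newly assigned partitions to the coordination task and block the
 /// rebalance callback until consumer tasks have been spawned for them.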
1303 fn consume_partitions(&self, tpl: &TopicPartitionList) {
1308 if tpl.capacity() == 0 {
1310 return;
1311 }
1312 let (send, rendezvous) = sync_channel(0);
1313 let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned(
1314 tpl.elements()
1315 .iter()
1316 .map(|tp| (tp.topic().into(), tp.partition()))
1317 .collect(),
1318 send,
1319 ));
1320
 while rendezvous.recv().is_ok() {
     // Wait for the coordination task to spawn consumer tasks for the newly
     // assigned partitions before returning from the rebalance callback.
 }
1324 }
1325
1326 fn revoke_partitions(&self, tpl: &TopicPartitionList) {
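 // Forward the revoked partitions to the coordination task and block until their
 // consumer tasks have drained (or the drain deadline fires), committing offsets
 // after each drained partition so progress made during the drain is not lost.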
1332 let (send, rendezvous) = sync_channel(0);
1333 let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked(
1334 tpl.elements()
1335 .iter()
1336 .map(|tp| (tp.topic().into(), tp.partition()))
1337 .collect(),
1338 send,
1339 ));
1340
1341 while rendezvous.recv().is_ok() {
1342 self.commit_consumer_state();
1343 }
1344 }
1345
1346 fn commit_consumer_state(&self) {
1347 if let Some(consumer) = self
1348 .consumer
1349 .get()
1350 .expect("Consumer reference was not initialized.")
1351 .upgrade()
1352 {
1353 match consumer.commit_consumer_state(CommitMode::Sync) {
 Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
     // Success, or nothing new to commit.
 }
1357 Err(error) => emit!(KafkaOffsetUpdateError { error }),
1358 }
1359 }
1360 }
1361}
1362
1363impl ClientContext for KafkaSourceContext {
1364 fn stats(&self, statistics: Statistics) {
1365 self.stats.stats(statistics)
1366 }
1367}
1368
1369impl ConsumerContext for KafkaSourceContext {
1370 fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
1371 match rebalance {
1372 Rebalance::Assign(tpl) => self.consume_partitions(tpl),
1373
1374 Rebalance::Revoke(tpl) => {
1375 self.revoke_partitions(tpl);
1376 self.commit_consumer_state();
1377 }
1378
1379 Rebalance::Error(message) => {
1380 error!("Error during Kafka consumer group rebalance: {}.", message);
1381 }
1382 }
1383 }
1384}
1385
1386#[cfg(test)]
1387mod test {
1388 use vector_lib::{lookup::OwnedTargetPath, schema::Definition};
1389
1390 use super::*;
1391
1392 pub fn kafka_host() -> String {
1393 std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
1394 }
1395 pub fn kafka_port() -> u16 {
1396 let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into());
1397 port.parse().expect("Invalid port number")
1398 }
1399
1400 pub fn kafka_address() -> String {
1401 format!("{}:{}", kafka_host(), kafka_port())
1402 }
1403
1404 #[test]
1405 fn generate_config() {
1406 crate::test_util::test_generate_config::<KafkaSourceConfig>();
1407 }
1408
1409 pub(super) fn make_config(
1410 topic: &str,
1411 group: &str,
1412 log_namespace: LogNamespace,
1413 librdkafka_options: Option<HashMap<String, String>>,
1414 ) -> KafkaSourceConfig {
1415 KafkaSourceConfig {
1416 bootstrap_servers: kafka_address(),
1417 topics: vec![topic.into()],
1418 group_id: group.into(),
1419 auto_offset_reset: "beginning".into(),
1420 session_timeout_ms: Duration::from_millis(6000),
1421 commit_interval_ms: Duration::from_millis(1),
1422 librdkafka_options,
1423 key_field: default_key_field(),
1424 topic_key: default_topic_key(),
1425 partition_key: default_partition_key(),
1426 offset_key: default_offset_key(),
1427 headers_key: default_headers_key(),
1428 socket_timeout_ms: Duration::from_millis(60000),
1429 fetch_wait_max_ms: Duration::from_millis(100),
1430 log_namespace: Some(log_namespace == LogNamespace::Vector),
1431 ..Default::default()
1432 }
1433 }
1434
1435 #[test]
1436 fn test_output_schema_definition_vector_namespace() {
1437 let definitions = make_config("topic", "group", LogNamespace::Vector, None)
1438 .outputs(LogNamespace::Vector)
1439 .remove(0)
1440 .schema_definition(true);
1441
1442 assert_eq!(
1443 definitions,
1444 Some(
1445 Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1446 .with_meaning(OwnedTargetPath::event_root(), "message")
1447 .with_metadata_field(
1448 &owned_value_path!("kafka", "timestamp"),
1449 Kind::timestamp(),
1450 Some("timestamp")
1451 )
1452 .with_metadata_field(
1453 &owned_value_path!("kafka", "message_key"),
1454 Kind::bytes(),
1455 None
1456 )
1457 .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
1458 .with_metadata_field(
1459 &owned_value_path!("kafka", "partition"),
1460 Kind::bytes(),
1461 None
1462 )
1463 .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
1464 .with_metadata_field(
1465 &owned_value_path!("kafka", "headers"),
1466 Kind::object(Collection::empty().with_unknown(Kind::bytes())),
1467 None
1468 )
1469 .with_metadata_field(
1470 &owned_value_path!("vector", "ingest_timestamp"),
1471 Kind::timestamp(),
1472 None
1473 )
1474 .with_metadata_field(
1475 &owned_value_path!("vector", "source_type"),
1476 Kind::bytes(),
1477 None
1478 )
1479 )
1480 )
1481 }
1482
1483 #[test]
1484 fn test_output_schema_definition_legacy_namespace() {
1485 let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
1486 .outputs(LogNamespace::Legacy)
1487 .remove(0)
1488 .schema_definition(true);
1489
1490 assert_eq!(
1491 definitions,
1492 Some(
1493 Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
1494 .unknown_fields(Kind::undefined())
1495 .with_event_field(
1496 &owned_value_path!("message"),
1497 Kind::bytes(),
1498 Some("message")
1499 )
1500 .with_event_field(
1501 &owned_value_path!("timestamp"),
1502 Kind::timestamp(),
1503 Some("timestamp")
1504 )
1505 .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
1506 .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
1507 .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
1508 .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
1509 .with_event_field(
1510 &owned_value_path!("headers"),
1511 Kind::object(Collection::empty().with_unknown(Kind::bytes())),
1512 None
1513 )
1514 .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1515 )
1516 )
1517 }
1518
1519 #[tokio::test]
1520 async fn consumer_create_ok() {
1521 let config = make_config("topic", "group", LogNamespace::Legacy, None);
1522 assert!(create_consumer(&config, true).is_ok());
1523 }
1524
1525 #[tokio::test]
1526 async fn consumer_create_incorrect_auto_offset_reset() {
1527 let config = KafkaSourceConfig {
1528 auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
1529 ..make_config("topic", "group", LogNamespace::Legacy, None)
1530 };
1531 assert!(create_consumer(&config, true).is_err());
1532 }
1533}
1534
1535#[cfg(feature = "kafka-integration-tests")]
1536#[cfg(test)]
1537mod integration_test {
1538 use std::time::Duration;
1539
1540 use chrono::{DateTime, SubsecRound, Utc};
1541 use futures::Stream;
1542 use futures_util::stream::FuturesUnordered;
1543 use rdkafka::{
1544 Offset, TopicPartitionList,
1545 admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
1546 client::DefaultClientContext,
1547 config::{ClientConfig, FromClientConfig},
1548 consumer::BaseConsumer,
1549 message::{Header, OwnedHeaders},
1550 producer::{FutureProducer, FutureRecord},
1551 util::Timeout,
1552 };
1553 use stream_cancel::{Trigger, Tripwire};
1554 use tokio::time::sleep;
1555 use vector_lib::event::EventStatus;
1556 use vrl::{event_path, value};
1557
1558 use super::{test::*, *};
1559 use crate::{
1560 SourceSender,
1561 event::{EventArray, EventContainer},
1562 shutdown::ShutdownSignal,
1563 test_util::{collect_n, components::assert_source_compliance, random_string},
1564 };
1565
1566 const KEY: &str = "my key";
1567 const TEXT: &str = "my message";
1568 const HEADER_KEY: &str = "my header";
1569 const HEADER_VALUE: &str = "my header value";
1570
1571 fn kafka_test_topic() -> String {
1572 std::env::var("KAFKA_TEST_TOPIC")
1573 .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
1574 }
1575 fn kafka_max_bytes() -> String {
1576 std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
1577 }
1578
1579 fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
1580 let mut client = ClientConfig::new();
1581 client.set("bootstrap.servers", kafka_address());
1582 client.set("produce.offset.report", "true");
1583 client.set("message.timeout.ms", "5000");
1584 client.set("auto.commit.interval.ms", "1");
1585 if let Some(group) = group {
1586 client.set("group.id", group);
1587 }
1588 client.create().expect("Producer creation error")
1589 }
1590
1591 async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
1592 let now = Utc::now();
1593 let timestamp = now.timestamp_millis();
1594
1595 let producer: &FutureProducer = &client_config(None);
1596 let topic_name = topic.as_ref();
1597
1598 create_topic(topic_name, partitions).await;
1599
1600 (0..count)
1601 .map(|i| async move {
1602 let text = format!("{TEXT} {i:03}");
1603 let key = format!("{KEY} {i}");
1604 let record = FutureRecord::to(topic_name)
1605 .payload(&text)
1606 .key(&key)
1607 .timestamp(timestamp)
1608 .headers(OwnedHeaders::new().insert(Header {
1609 key: HEADER_KEY,
1610 value: Some(HEADER_VALUE),
1611 }));
1612 if let Err(error) = producer.send(record, Timeout::Never).await {
1613 panic!("Cannot send event to Kafka: {error:?}");
1614 }
1615 })
1616 .collect::<FuturesUnordered<_>>()
1617 .collect::<Vec<_>>()
1618 .await;
1619
1620 now
1621 }
1622
1623 async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
1624 let topic = kafka_test_topic();
1625 let group_id = format!("test-group-{}", random_string(10));
1626
1627 let sent_at = send_events(topic.clone(), partitions, count).await;
1628
1629 (topic, group_id, sent_at)
1630 }
1631
1632 #[tokio::test]
1633 async fn consumes_event_with_acknowledgements() {
1634 send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
1635 }
1636
1637 #[tokio::test]
1638 async fn consumes_event_with_acknowledgements_vector_namespace() {
1639 send_receive(true, |_| false, 10, LogNamespace::Vector).await;
1640 }
1641
1642 #[tokio::test]
1643 async fn consumes_event_without_acknowledgements() {
1644 send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
1645 }
1646
1647 #[tokio::test]
1648 async fn consumes_event_without_acknowledgements_vector_namespace() {
1649 send_receive(false, |_| false, 10, LogNamespace::Vector).await;
1650 }
1651
1652 #[tokio::test]
1653 async fn handles_one_negative_acknowledgement() {
1654 send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
1655 }
1656
1657 #[tokio::test]
1658 async fn handles_one_negative_acknowledgement_vector_namespace() {
1659 send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
1660 }
1661
1662 #[tokio::test]
1663 async fn handles_permanent_negative_acknowledgement() {
1664 send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
1665 }
1666
1667 #[tokio::test]
1668 async fn handles_permanent_negative_acknowledgement_vector_namespace() {
1669 send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
1670 }
1671
1672 async fn send_receive(
1673 acknowledgements: bool,
1674 error_at: impl Fn(usize) -> bool,
1675 receive_count: usize,
1676 log_namespace: LogNamespace,
1677 ) {
1678 const SEND_COUNT: usize = 10;
1679
1680 let topic = format!("test-topic-{}", random_string(10));
1681 let group_id = format!("test-group-{}", random_string(10));
1682 let config = make_config(&topic, &group_id, log_namespace, None);
1683
1684 let now = send_events(topic.clone(), 1, 10).await;
1685
1686 let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
1687 let (tx, rx) = SourceSender::new_test_errors(error_at);
1688 let (trigger_shutdown, shutdown_done) =
1689 spawn_kafka(tx, config, acknowledgements, false, log_namespace);
1690 let events = collect_n(rx, SEND_COUNT).await;
1691 tokio::task::yield_now().await;
1694 drop(trigger_shutdown);
1695 shutdown_done.await;
1696
1697 events
1698 })
1699 .await;
1700
1701 let offset = fetch_tpl_offset(&group_id, &topic, 0);
1702 assert_eq!(offset, Offset::from_raw(receive_count as i64));
1703
1704 assert_eq!(events.len(), SEND_COUNT);
1705 for (i, event) in events.into_iter().enumerate() {
1706 if let LogNamespace::Legacy = log_namespace {
1707 assert_eq!(
1708 event.as_log()[log_schema().message_key().unwrap().to_string()],
1709 format!("{TEXT} {i:03}").into()
1710 );
1711 assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
1712 assert_eq!(
1713 event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1714 "kafka".into()
1715 );
1716 assert_eq!(
1717 event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
1718 now.trunc_subsecs(3).into()
1719 );
1720 assert_eq!(event.as_log()["topic"], topic.clone().into());
1721 assert!(event.as_log().contains("partition"));
1722 assert!(event.as_log().contains("offset"));
1723 let mut expected_headers = ObjectMap::new();
1724 expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1725 assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
1726 } else {
1727 let meta = event.as_log().metadata().value();
1728
1729 assert_eq!(
1730 meta.get(path!("vector", "source_type")).unwrap(),
1731 &value!(KafkaSourceConfig::NAME)
1732 );
1733 assert!(
1734 meta.get(path!("vector", "ingest_timestamp"))
1735 .unwrap()
1736 .is_timestamp()
1737 );
1738
1739 assert_eq!(
1740 event.as_log().value(),
1741 &value!(format!("{} {:03}", TEXT, i))
1742 );
1743 assert_eq!(
1744 meta.get(path!("kafka", "message_key")).unwrap(),
1745 &value!(format!("{} {}", KEY, i))
1746 );
1747
1748 assert_eq!(
1749 meta.get(path!("kafka", "timestamp")).unwrap(),
1750 &value!(now.trunc_subsecs(3))
1751 );
1752 assert_eq!(
1753 meta.get(path!("kafka", "topic")).unwrap(),
1754 &value!(topic.clone())
1755 );
1756 assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer(),);
1757 assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer(),);
1758
1759 let mut expected_headers = ObjectMap::new();
1760 expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1761 assert_eq!(
1762 meta.get(path!("kafka", "headers")).unwrap(),
1763 &Value::from(expected_headers)
1764 );
1765 }
1766 }
1767 }
1768
1769 fn make_rand_config() -> (String, String, KafkaSourceConfig) {
1770 let topic = format!("test-topic-{}", random_string(10));
1771 let group_id = format!("test-group-{}", random_string(10));
1772 let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
1773 (topic, group_id, config)
1774 }
1775
1776 fn delay_pipeline(
1777 id: usize,
1778 delay: Duration,
1779 status: EventStatus,
1780 ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
1781 let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100);
1782 let recv = recv.into_stream();
1783 let recv = recv.then(move |item| async move {
1784 let mut events = item.events;
1785 events.iter_logs_mut().for_each(|log| {
1786 log.insert(event_path!("pipeline_id"), id.to_string());
1787 });
1788 sleep(delay).await;
1789 events.iter_events_mut().for_each(|mut event| {
1790 let metadata = event.metadata_mut();
1791 metadata.update_status(status);
1792 metadata.update_sources();
1793 });
1794 events
1795 });
1796 (pipe, Box::pin(recv))
1797 }
1798
1799 fn spawn_kafka(
1800 out: SourceSender,
1801 config: KafkaSourceConfig,
1802 acknowledgements: bool,
1803 eof: bool,
1804 log_namespace: LogNamespace,
1805 ) -> (Trigger, Tripwire) {
1806 let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
1807
1808 let decoder = DecodingConfig::new(
1809 config.framing.clone(),
1810 config.decoding.clone(),
1811 log_namespace,
1812 )
1813 .build()
1814 .unwrap();
1815
1816 let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();
1817
1818 tokio::spawn(kafka_source(
1819 config,
1820 consumer,
1821 callback_rx,
1822 decoder,
1823 out,
1824 shutdown,
1825 eof,
1826 log_namespace,
1827 ));
1828 (trigger_shutdown, shutdown_done)
1829 }
1830
1831 fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
1832 let client: BaseConsumer = client_config(Some(group_id));
1833 client.subscribe(&[topic]).expect("Subscribing failed");
1834
1835 let mut tpl = TopicPartitionList::new();
1836 tpl.add_partition(topic, partition);
1837 client
1838 .committed_offsets(tpl, Duration::from_secs(1))
1839 .expect("Getting committed offsets failed")
1840 .find_partition(topic, partition)
1841 .expect("Missing topic/partition")
1842 .offset()
1843 }
1844
1845 async fn create_topic(topic: &str, partitions: i32) {
1846 let client: AdminClient<DefaultClientContext> = client_config(None);
1847 let topic_results = client
1848 .create_topics(
1849 [&NewTopic {
1850 name: topic,
1851 num_partitions: partitions,
1852 replication: TopicReplication::Fixed(1),
1853 config: vec![],
1854 }],
1855 &AdminOptions::default(),
1856 )
1857 .await
1858 .expect("create_topics failed");
1859
1860 for result in topic_results {
1861 if let Err((topic, err)) = result
1862 && err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists
1863 {
1864 panic!("Creating a topic failed: {:?}", (topic, err))
1865 }
1866 }
1867 }
1868
1869 #[ignore]
1879 #[tokio::test]
1880 async fn handles_rebalance() {
1881 const NEVENTS: usize = 200;
1895 const DELAY: u64 = 100;
1896
1897 let (topic, group_id, config) = make_rand_config();
1898 create_topic(&topic, 2).await;
1899
1900 let _send_start = send_events(topic.clone(), 1, NEVENTS).await;
1901
1902 let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
1903 let (trigger_shutdown1, shutdown_done1) =
1904 spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
1905 let events1 = tokio::spawn(collect_n(rx1, NEVENTS));
1906
1907 sleep(Duration::from_secs(1)).await;
1908
1909 let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
1910 let (trigger_shutdown2, shutdown_done2) =
1911 spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1912 let events2 = tokio::spawn(collect_n(rx2, NEVENTS));
1913
1914 sleep(Duration::from_secs(5)).await;
1915
1916 drop(trigger_shutdown1);
1917 let events1 = events1.await.unwrap();
1918 shutdown_done1.await;
1919
1920 sleep(Duration::from_secs(5)).await;
1921
1922 drop(trigger_shutdown2);
1923 let events2 = events2.await.unwrap();
1924 shutdown_done2.await;
1925
1926 sleep(Duration::from_secs(1)).await;
1927
1928 assert!(!events1.is_empty());
1929 assert!(!events2.is_empty());
1930
1931 match fetch_tpl_offset(&group_id, &topic, 0) {
1932 Offset::Offset(offset) => {
1933 assert!((offset as isize - events1.len() as isize).abs() <= 1)
1934 }
1935 o => panic!("Invalid offset for partition 0 {o:?}"),
1936 }
1937
1938 match fetch_tpl_offset(&group_id, &topic, 1) {
1939 Offset::Offset(offset) => {
1940 assert!((offset as isize - events2.len() as isize).abs() <= 1)
1941 }
 o => panic!("Invalid offset for partition 1 {o:?}"),
1943 }
1944
1945 let mut all_events = events1
1946 .into_iter()
1947 .chain(events2.into_iter())
1948 .flat_map(map_logs)
1949 .collect::<Vec<String>>();
1950 all_events.sort();
1951
1952 }
1954
1955 #[tokio::test]
1956 async fn drains_acknowledgements_at_shutdown() {
1957 let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
1959 .unwrap_or_else(|_| "125000".into())
1960 .parse()
1961 .expect("Number of messages to send to kafka.");
1962 let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
1963 .unwrap_or_else(|_| format!("{send_count}"))
1964 .parse()
1965 .expect("Number of messages to expect consumers to process.");
1966 let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
1967 .unwrap_or_else(|_| "2000".into())
1968 .parse()
1969 .expect("Number of milliseconds before shutting down first consumer.");
1970
1971 let (topic, group_id, _) = send_to_test_topic(1, send_count).await;
1972
1973 let mut opts = HashMap::new();
1976 opts.insert("enable.partition.eof".into(), "true".into());
1978 opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
1979 let events1 = {
1980 let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
1981 let (tx, rx) = SourceSender::new_test_errors(|_| false);
1982 let (trigger_shutdown, shutdown_done) =
1983 spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1984 let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
1985 sleep(Duration::from_millis(delay_ms)).await;
1986 drop(trigger_shutdown);
1987 });
1988 shutdown_done.await;
1989 events
1990 };
1991
1992 debug!("Consumer group.id: {}", &group_id);
1993 debug!(
1994 "First consumer read {} of {} messages.",
1995 events1.len(),
1996 expect_count
1997 );
1998
1999 let events2 = {
2001 let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
2002 let (tx, rx) = SourceSender::new_test_errors(|_| false);
2003 let (trigger_shutdown, shutdown_done) =
2004 spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
2005 let events = rx.collect::<Vec<Event>>().await;
2006 drop(trigger_shutdown);
2007 shutdown_done.await;
2008 events
2009 };
2010
2011 debug!(
2012 "Second consumer read {} of {} messages.",
2013 events2.len(),
2014 expect_count
2015 );
2016
2017 let total = events1.len() + events2.len();
2019 assert_ne!(
2020 events1.len(),
2021 0,
2022 "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
2023 );
2024 assert_ne!(
2025 events2.len(),
2026 0,
2027 "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?) "
2028 );
2029 assert_eq!(total, expect_count);
2030 }
2031
2032 async fn consume_with_rebalance(rebalance_strategy: String) {
2033 let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
2035 .unwrap_or_else(|_| "125000".into())
2036 .parse()
2037 .expect("Number of messages to send to kafka.");
2038 let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
2039 .unwrap_or_else(|_| format!("{send_count}"))
2040 .parse()
2041 .expect("Number of messages to expect consumers to process.");
2042 let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
2043 .unwrap_or_else(|_| "2000".into())
2044 .parse()
2045 .expect("Number of milliseconds before shutting down first consumer.");
2046
2047 let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
2048 debug!("Topic: {}", &topic);
2049 debug!("Consumer group.id: {}", &group_id);
2050
2051 let mut kafka_options = HashMap::new();
2054 kafka_options.insert("enable.partition.eof".into(), "true".into());
2055 kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
2056 kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
2057 let config1 = make_config(
2058 &topic,
2059 &group_id,
2060 LogNamespace::Legacy,
2061 Some(kafka_options.clone()),
2062 );
2063 let config2 = config1.clone();
2064 let config3 = config1.clone();
2065 let config4 = config1.clone();
2066
2067 let (events1, events2, events3) = tokio::join!(
2068 async move {
2069 let (tx, rx) = SourceSender::new_test_errors(|_| false);
2070 let (_trigger_shutdown, _shutdown_done) =
2071 spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);
2072
2073 rx.collect::<Vec<Event>>().await
2074 },
2075 async move {
2076 sleep(Duration::from_millis(delay_ms)).await;
2077 let (tx, rx) = SourceSender::new_test_errors(|_| false);
2078 let (_trigger_shutdown, _shutdown_done) =
2079 spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);
2080
2081 rx.collect::<Vec<Event>>().await
2082 },
2083 async move {
2084 sleep(Duration::from_millis(delay_ms * 2)).await;
2085 let (tx, rx) = SourceSender::new_test_errors(|_| false);
2086 let (_trigger_shutdown, _shutdown_done) =
2087 spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);
2088
2089 rx.collect::<Vec<Event>>().await
2090 }
2091 );
2092
2093 let unconsumed = async move {
2094 let (tx, rx) = SourceSender::new_test_errors(|_| false);
2095 let (_trigger_shutdown, _shutdown_done) =
2096 spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);
2097
2098 rx.collect::<Vec<Event>>().await
2099 }
2100 .await;
2101
2102 debug!(
2103 "First consumer read {} of {} messages.",
2104 events1.len(),
2105 expect_count
2106 );
2107
2108 debug!(
2109 "Second consumer read {} of {} messages.",
2110 events2.len(),
2111 expect_count
2112 );
2113 debug!(
2114 "Third consumer read {} of {} messages.",
2115 events3.len(),
2116 expect_count
2117 );
2118
2119 let total = events1.len() + events2.len() + events3.len();
2121 assert_ne!(
2122 events1.len(),
2123 0,
2124 "First batch of events should be non-zero (increase delay?)"
2125 );
2126 assert_ne!(
2127 events2.len(),
2128 0,
2129 "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2130 );
2131 assert_ne!(
2132 events3.len(),
2133 0,
2134 "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2135 );
2136 assert_eq!(
2137 unconsumed.len(),
2138 0,
2139 "The first set of consumers should consume and ack all messages."
2140 );
2141 assert_eq!(total, expect_count);
2142 }
2143
2144 #[tokio::test]
2145 async fn drains_acknowledgements_during_rebalance_default_assignments() {
2146 consume_with_rebalance("range,roundrobin".into()).await;
2148 }
2149 #[tokio::test]
2150 async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
2151 consume_with_rebalance("cooperative-sticky".into()).await;
2154 }
2155
2156 fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
2157 events.into_events().map(|event| {
2158 let log = event.into_log();
2159 format!(
2160 "{} {} {} {}",
2161 log["message"].to_string_lossy(),
2162 log["topic"].to_string_lossy(),
2163 log["partition"].to_string_lossy(),
2164 log["offset"].to_string_lossy(),
2165 )
2166 })
2167 }
2168}