use std::{
    collections::{HashMap, HashSet},
    io::Cursor,
    pin::Pin,
    sync::{
        Arc, OnceLock, Weak,
        mpsc::{SyncSender, sync_channel},
    },
    time::Duration,
};

use async_stream::stream;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{Stream, StreamExt};
use futures_util::future::OptionFuture;
use rdkafka::{
    ClientConfig, ClientContext, Statistics, TopicPartitionList,
    consumer::{
        BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer,
        stream_consumer::StreamPartitionQueue,
    },
    error::KafkaError,
    message::{BorrowedMessage, Headers as _, Message},
    types::RDKafkaErrorCode,
};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::{
    runtime::Handle,
    sync::{
        mpsc::{self, UnboundedReceiver, UnboundedSender},
        oneshot,
    },
    task::JoinSet,
    time::Sleep,
};
use tracing::{Instrument, Span};
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::{
        DecoderFramedRead, StreamDecodingError,
        decoding::{DeserializerConfig, FramingConfig},
    },
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    finalizer::OrderedFinalizer,
    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
};
use vrl::value::{Kind, ObjectMap, kind::Collection};

use crate::{
    SourceSender,
    codecs::{Decoder, DecodingConfig},
    config::{
        LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
        log_schema,
    },
    event::{BatchNotifier, BatchStatus, Event, Value},
    internal_events::{
        KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
        StreamClosedError,
    },
    kafka,
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    shutdown::ShutdownSignal,
};

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("The drain_timeout_ms ({}) must be less than or equal to session_timeout_ms ({})", value, session_timeout_ms.as_millis()))]
    InvalidDrainTimeout {
        value: u64,
        session_timeout_ms: Duration,
    },
    #[snafu(display("Could not create Kafka consumer: {}", source))]
    CreateError { source: rdkafka::error::KafkaError },
    #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
    SubscribeError { source: rdkafka::error::KafkaError },
}

/// Metrics configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct Metrics {
    /// Expose topic lag metrics for all topics and partitions.
    pub topic_lag_metric: bool,
}

/// Configuration for the `kafka` source.
#[serde_as]
#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct KafkaSourceConfig {
    /// A comma-separated list of Kafka bootstrap servers, in `host:port` form,
    /// used to discover the other hosts in the cluster.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    bootstrap_servers: String,

    /// The Kafka topics names to read events from.
    ///
    /// Topic names that start with `^` are interpreted as regular expressions
    /// and matched against all available topics.
    #[configurable(metadata(
        docs::examples = "^(prefix1|prefix2)-.+",
        docs::examples = "topic-1",
        docs::examples = "topic-2"
    ))]
    topics: Vec<String>,

    /// The consumer group name used to consume events from Kafka.
    #[configurable(metadata(docs::examples = "consumer-group-name"))]
    group_id: String,

    /// If offsets for the consumer group do not exist, set them using this
    /// strategy. See the `librdkafka` documentation for the `auto.offset.reset`
    /// option for further clarification.
    #[serde(default = "default_auto_offset_reset")]
    #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
    auto_offset_reset: String,

    /// The Kafka session timeout.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_session_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Session Timeout"))]
    session_timeout_ms: Duration,

    /// Timeout (in milliseconds) to drain pending acknowledgements during
    /// shutdown or a Kafka consumer group rebalance.
    ///
    /// Must be less than or equal to `session_timeout_ms`; defaults to half of
    /// `session_timeout_ms` when unset.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Drain Timeout"))]
    drain_timeout_ms: Option<u64>,

    /// Timeout for network requests.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    socket_timeout_ms: Duration,

    /// Maximum time the broker may wait to fill the fetch response.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_fetch_wait_max_ms")]
    #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
    fetch_wait_max_ms: Duration,

    /// The frequency at which consumer offsets are committed (written) to
    /// offset storage.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_commit_interval_ms")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::human_name = "Commit Interval"))]
    commit_interval_ms: Duration,

    /// Overrides the name of the log field used to add the message key to
    /// each event.
    #[serde(default = "default_key_field")]
    #[configurable(metadata(docs::examples = "message_key"))]
    key_field: OptionalValuePath,

    /// Overrides the name of the log field used to add the topic to each event.
    #[serde(default = "default_topic_key")]
    #[configurable(metadata(docs::examples = "topic"))]
    topic_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the partition to each
    /// event.
    #[serde(default = "default_partition_key")]
    #[configurable(metadata(docs::examples = "partition"))]
    partition_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the offset to each event.
    #[serde(default = "default_offset_key")]
    #[configurable(metadata(docs::examples = "offset"))]
    offset_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the headers to each event.
    #[serde(default = "default_headers_key")]
    #[configurable(metadata(docs::examples = "headers"))]
    headers_key: OptionalValuePath,

    /// Advanced options set directly on the underlying `librdkafka` client.
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    librdkafka_options: Option<HashMap<String, String>>,

    #[serde(flatten)]
    auth: kafka::KafkaAuthConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    metrics: Metrics,
}

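// A minimal sketch of configuring this source, using only fields defined on
// `KafkaSourceConfig` above; the component name `in_kafka` is illustrative:
//
//   [sources.in_kafka]
//   type = "kafka"
//   bootstrap_servers = "10.14.22.123:9092"
//   group_id = "consumer-group-name"
//   topics = ["topic-1", "topic-2"]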
impl KafkaSourceConfig {
    fn keys(&self) -> Keys {
        Keys::from(log_schema(), self)
    }
}

const fn default_session_timeout_ms() -> Duration {
    Duration::from_millis(10000)
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000)
}

const fn default_fetch_wait_max_ms() -> Duration {
    Duration::from_millis(100)
}

const fn default_commit_interval_ms() -> Duration {
    Duration::from_millis(5000)
}

fn default_auto_offset_reset() -> String {
    "largest".into()
}

fn default_key_field() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("message_key"))
}

fn default_topic_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("topic"))
}

fn default_partition_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("partition"))
}

fn default_offset_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("offset"))
}

fn default_headers_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("headers"))
}

const fn example_auto_offset_reset_values() -> [&'static str; 7] {
    [
        "smallest",
        "earliest",
        "beginning",
        "largest",
        "latest",
        "end",
        "error",
    ]
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}

impl_generate_config_from_default!(KafkaSourceConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SourceConfig for KafkaSourceConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

        if let Some(d) = self.drain_timeout_ms {
            snafu::ensure!(
                Duration::from_millis(d) <= self.session_timeout_ms,
                InvalidDrainTimeoutSnafu {
                    value: d,
                    session_timeout_ms: self.session_timeout_ms
                }
            );
        }

        let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;

        Ok(Box::pin(kafka_source(
            self.clone(),
            consumer,
            callback_rx,
            decoder,
            cx.out,
            cx.shutdown,
            false,
            log_namespace,
        )))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let keys = self.keys();

        let schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                keys.timestamp.map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_source_metadata(
                Self::NAME,
                keys.topic.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("topic"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.partition.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("partition"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.offset.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("offset"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.headers.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("headers"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.key_field.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("message_key"),
                Kind::bytes(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

#[allow(clippy::too_many_arguments)]
async fn kafka_source(
    config: KafkaSourceConfig,
    consumer: StreamConsumer<KafkaSourceContext>,
    callback_rx: UnboundedReceiver<KafkaCallback>,
    decoder: Decoder,
    out: SourceSender,
    shutdown: ShutdownSignal,
    eof: bool,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    let span = info_span!("kafka_source");
    let consumer = Arc::new(consumer);

    consumer
        .context()
        .consumer
        .set(Arc::downgrade(&consumer))
        .expect("Error setting up consumer context.");

    // When exiting on EOF is enabled, the coordination task uses this channel
    // to tell the client driver once every partition has reached EOF.
    let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();

    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
    if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
        error!("{}", e);
        return Err(());
    }

    let coordination_task = {
        let span = span.clone();
        let consumer = Arc::clone(&consumer);
        let drain_timeout_ms = config
            .drain_timeout_ms
            .map_or(config.session_timeout_ms / 2, Duration::from_millis);
        let consumer_state =
            ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
        tokio::spawn(async move {
            coordinate_kafka_callbacks(
                consumer,
                callback_rx,
                consumer_state,
                drain_timeout_ms,
                eof_tx,
            )
            .await;
        })
    };

    let client_task = {
        let consumer = Arc::clone(&consumer);
        tokio::task::spawn_blocking(move || {
            let _enter = span.enter();
            drive_kafka_consumer(consumer, shutdown, eof_rx);
        })
    };

    _ = tokio::join!(client_task, coordination_task);
    consumer.context().commit_consumer_state();

    Ok(())
}

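// The consumer state machine below coordinates per-partition consumer tasks
// with rebalance and shutdown callbacks from the rdkafka client:
//
// - `Consuming`: normal operation; each assigned partition has a task decoding
//   messages and storing offsets as acknowledgements arrive.
// - `Draining`: entered when partitions are revoked or the source is shutting
//   down; affected tasks get until a deadline to process pending
//   acknowledgements before being aborted.
// - `Complete`: a shutdown-initiated drain has finished.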
struct ConsumerStateInner<S> {
    config: KafkaSourceConfig,
    decoder: Decoder,
    out: SourceSender,
    log_namespace: LogNamespace,
    consumer_state: S,
}

struct Consuming {
    span: Span,
}

struct Draining {
    /// The rendezvous channel sender used to signal the rdkafka callback
    /// thread as partitions finish draining.
    signal: SyncSender<()>,

    /// The set of revoked partitions whose consumer tasks are expected to
    /// finish draining.
    expect_drain: HashSet<TopicPartition>,

    /// Whether this drain is part of a full shutdown, as opposed to a
    /// rebalance-triggered partition revocation.
    shutdown: bool,

    span: Span,
}

type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;

enum ConsumerState {
    Consuming(ConsumerStateInner<Consuming>),
    Draining(ConsumerStateInner<Draining>),
    Complete,
}

impl Draining {
    fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
        Self {
            signal,
            shutdown,
            expect_drain: HashSet::new(),
            span,
        }
    }

    fn is_complete(&self) -> bool {
        self.expect_drain.is_empty()
    }
}

impl<C> ConsumerStateInner<C> {
    fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (None.into(), ConsumerState::Complete)
    }
}

impl ConsumerStateInner<Consuming> {
    const fn new(
        config: KafkaSourceConfig,
        decoder: Decoder,
        out: SourceSender,
        log_namespace: LogNamespace,
        span: Span,
    ) -> Self {
        Self {
            config,
            decoder,
            out,
            log_namespace,
            consumer_state: Consuming { span },
        }
    }

    /// Spawn a task on the given JoinSet to consume the partition queue,
    /// returning a oneshot sender that signals the task to stop consuming and
    /// drain pending acknowledgements, along with an abort handle for the task.
    fn consume_partition(
        &self,
        join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
        tp: TopicPartition,
        consumer: Arc<StreamConsumer<KafkaSourceContext>>,
        p: StreamPartitionQueue<KafkaSourceContext>,
        acknowledgements: bool,
        exit_eof: bool,
    ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
        let keys = self.config.keys();
        let decoder = self.decoder.clone();
        let log_namespace = self.log_namespace;
        let mut out = self.out.clone();

        let (end_tx, mut end_signal) = oneshot::channel::<()>();

        let handle = join_set.spawn(async move {
            let mut messages = p.stream();
            let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

            // The finalizer is kept in an Option so it can be dropped on
            // demand; dropping it closes the acknowledgement stream once all
            // pending entries have been processed.
            let mut finalizer = Some(finalizer);

            let mut status = PartitionConsumerStatus::NormalExit;

            loop {
                tokio::select!(
                    // Handle the end signal and pending acknowledgements ahead
                    // of new messages, so drains make progress even while
                    // messages keep arriving.
                    biased;

                    // The `finalizer.is_some()` guard prevents polling the end
                    // signal again after it has already fired.
                    _ = &mut end_signal, if finalizer.is_some() => {
                        // Drop the finalizer to begin the drain; the ack
                        // stream terminates once all pending acks are handled.
                        finalizer.take();
                    },

                    ack = ack_stream.next() => match ack {
                        Some((status, entry)) => {
                            if status == BatchStatus::Delivered
                                && let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
                                emit!(KafkaOffsetUpdateError { error });
                            }
                        }
                        None if finalizer.is_none() => {
                            debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
                            break
                        }
                        None => {
                            debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
                        }
                    },

                    message = messages.next(), if finalizer.is_some() => match message {
                        None => unreachable!("MessageStream never returns Ready(None)"),
                        Some(Err(error)) => match error {
                            rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
                                debug!("EOF for partition {}.", partition);
                                status = PartitionConsumerStatus::PartitionEOF;
                                finalizer.take();
                            },
                            _ => emit!(KafkaReadError { error }),
                        },
                        Some(Ok(msg)) => {
                            emit!(KafkaBytesReceived {
                                byte_size: msg.payload_len(),
                                protocol: "tcp",
                                topic: msg.topic(),
                                partition: msg.partition(),
                            });
                            parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
                        }
                    },
                )
            }
            (tp, status)
        }.instrument(self.consumer_state.span.clone()));
        (end_tx, handle)
    }

    /// Consume self, returning a Draining state along with a deadline future
    /// that fires when the drain timeout elapses.
    fn begin_drain(
        self,
        max_drain_ms: Duration,
        sig: SyncSender<()>,
        shutdown: bool,
    ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
        let deadline = Box::pin(tokio::time::sleep(max_drain_ms));

        let draining = ConsumerStateInner {
            config: self.config,
            decoder: self.decoder,
            out: self.out,
            log_namespace: self.log_namespace,
            consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
        };

        (Some(deadline).into(), draining)
    }

    pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Consuming(self))
    }
}

impl ConsumerStateInner<Draining> {
    /// Mark the given partition as revoked: signal its consumer task to stop
    /// consuming and drain, and add it to the set of partitions expected to
    /// finish draining.
    fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
        // The task may already have exited, in which case this send fails;
        // its completion is still observed through the JoinSet.
        _ = end_signal.send(());
        self.consumer_state.expect_drain.insert(tp);
    }

    /// Mark the given partition as fully drained: signal the rendezvous
    /// channel held by the rdkafka callback, and remove the partition from
    /// the expected set.
    fn partition_drained(&mut self, tp: TopicPartition) {
        // A failed send means the callback is no longer waiting on the
        // rendezvous channel, which is fine.
        _ = self.consumer_state.signal.send(());
        self.consumer_state.expect_drain.remove(&tp);
    }

    fn is_drain_complete(&self) -> bool {
        self.consumer_state.is_complete()
    }

    /// Finish the drain: transition to Complete if the drain was triggered by
    /// a shutdown, otherwise return to Consuming.
    fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        if self.consumer_state.shutdown {
            self.complete(deadline)
        } else {
            (
                None.into(),
                ConsumerState::Consuming(ConsumerStateInner {
                    config: self.config,
                    decoder: self.decoder,
                    out: self.out,
                    log_namespace: self.log_namespace,
                    consumer_state: Consuming {
                        span: self.consumer_state.span,
                    },
                }),
            )
        }
    }

    pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Draining(self))
    }
}

async fn coordinate_kafka_callbacks(
    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
    mut callbacks: UnboundedReceiver<KafkaCallback>,
    consumer_state: ConsumerStateInner<Consuming>,
    max_drain_ms: Duration,
    mut eof: Option<oneshot::Sender<()>>,
) {
    let mut drain_deadline: OptionFuture<_> = None.into();
    let mut consumer_state = ConsumerState::Consuming(consumer_state);

    // A oneshot channel for each partition being consumed, used to signal the
    // task to stop consuming and drain pending acknowledgements.
    let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();

    // The set of partition consumer tasks. Each consumes one partition queue
    // and processes its acknowledgement stream, and fully drains that stream
    // after receiving an end signal.
    let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
        Default::default();

    // Abort handles, used to stop any consumer task that outlives a drain deadline.
    let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();

    let exit_eof = eof.is_some();

    while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
        tokio::select! {
            Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
                debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
                // If this task ended on its own, its end signal will still be present; clean it up.
                end_signals.remove(&finished_partition);
                abort_handles.remove(&finished_partition);

                (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
                    ConsumerState::Draining(mut state) => {
                        state.partition_drained(finished_partition);

                        if state.is_drain_complete() {
                            debug!("All expected partitions have drained.");
                            state.finish_drain(drain_deadline)
                        } else {
                            state.keep_draining(drain_deadline)
                        }
                    },
                    ConsumerState::Consuming(state) => {
                        // A task finishing outside a drain is expected only
                        // when tasks are configured to exit at EOF.
                        if !exit_eof {
                            debug!("Partition consumer task finished, while not in draining mode.");
                        }
                        state.keep_consuming(drain_deadline)
                    },
                };

                // PartitionConsumerStatus distinguishes a task that was signaled to end
                // from one that exited because it reached partition EOF. When the last
                // EOF task finishes, signal the client driver task to shut down the
                // main consumer too.
                if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
                    debug!("All partitions have exited or reached EOF.");
                    let _ = eof.take().map(|e| e.send(()));
                }
            },
            Some(callback) = callbacks.recv() => match callback {
                KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
                    ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
                    ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
                    ConsumerState::Consuming(ref consumer_state) => {
                        let acks = consumer.context().acknowledgements;
                        for tp in assigned_partitions.drain(0..) {
                            let topic = tp.0.as_str();
                            let partition = tp.1;
                            if let Some(pq) = consumer.split_partition_queue(topic, partition) {
                                debug!("Consuming partition {}:{}.", &tp.0, tp.1);
                                let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
                                abort_handles.insert(tp.clone(), handle);
                                end_signals.insert(tp, end_tx);
                            } else {
                                warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
                            }
                        }
                        // Unblock the librdkafka callback thread now that all
                        // partition queues are set up.
                        drop(done);
                    }
                },
                KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
                    // A rebalance should not begin while a previous drain is
                    // still in progress, but guard against it regardless.
                    ConsumerState::Draining(d) => {
                        warn!("Kafka client is already draining revoked partitions.");
                        d.keep_draining(drain_deadline)
                    },
                    ConsumerState::Consuming(state) => {
                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);

                        for tp in revoked_partitions.drain(0..) {
                            if let Some(end) = end_signals.remove(&tp) {
                                debug!("Revoking partition {}:{}", &tp.0, tp.1);
                                state.revoke_partition(tp, end);
                            } else {
                                debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                            }
                        }

                        state.keep_draining(deadline)
                    }
                },
                KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Shutdown received after completion."),
                    // Shutting down during a rebalance: keep draining the
                    // already-revoked partitions, but stop accepting callbacks.
                    ConsumerState::Draining(state) => {
                        error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
                        callbacks.close();
                        state.keep_draining(drain_deadline)
                    },
                    ConsumerState::Consuming(state) => {
                        callbacks.close();
                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
                        if let Ok(tpl) = consumer.assignment() {
                            // With no assigned partitions there is nothing to
                            // drain; returning drops the rendezvous sender,
                            // which unblocks the shutdown callback.
                            if tpl.capacity() == 0 {
                                return;
                            }
                            tpl.elements()
                                .iter()
                                .for_each(|el| {
                                    let tp: TopicPartition = (el.topic().into(), el.partition());
                                    if let Some(end) = end_signals.remove(&tp) {
                                        debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
                                        state.revoke_partition(tp, end);
                                    } else {
                                        debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                                    }
                                });
                        }
                        // If shutdown began with no partition consumer tasks
                        // running, the drain is already complete.
                        if state.is_drain_complete() {
                            state.finish_drain(deadline)
                        } else {
                            state.keep_draining(deadline)
                        }
                    }
                },
            },

            Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
                ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
                ConsumerState::Consuming(state) => {
                    warn!("A drain deadline fired outside of draining mode.");
                    state.keep_consuming(None.into())
                },
                ConsumerState::Draining(mut draining) => {
                    debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
                    for tp in draining.consumer_state.expect_drain.drain() {
                        if let Some(handle) = abort_handles.remove(&tp) {
                            handle.abort();
                        }
                    }
                    draining.finish_drain(drain_deadline)
                }
            },
        }
    }
}

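// The main consumer is driven on a dedicated blocking thread. Polling the
// consumer's stream is what services librdkafka callbacks (rebalances,
// statistics); actual messages never arrive here, because they are consumed
// from the per-partition queues split off in `coordinate_kafka_callbacks`.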
fn drive_kafka_consumer(
    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
    mut shutdown: ShutdownSignal,
    eof: Option<oneshot::Receiver<()>>,
) {
    Handle::current().block_on(async move {
        let mut eof: OptionFuture<_> = eof.into();
        let mut stream = consumer.stream();
        loop {
            tokio::select! {
                _ = &mut shutdown => {
                    consumer.context().shutdown();
                    break
                },

                Some(_) = &mut eof => {
                    consumer.context().shutdown();
                    break
                },

                // Polling the consumer stream serves client callbacks, such as
                // rebalance notifications; messages themselves arrive on the
                // split-off partition queues, never here.
                message = stream.next() => match message {
                    None => unreachable!("MessageStream never returns Ready(None)"),
                    Some(Err(error)) => emit!(KafkaReadError { error }),
                    Some(Ok(_msg)) => {
                        unreachable!("Messages are consumed in dedicated tasks for each partition.")
                    }
                },
            }
        }
    });
}

async fn parse_message(
    msg: BorrowedMessage<'_>,
    decoder: Decoder,
    keys: &'_ Keys,
    out: &mut SourceSender,
    acknowledgements: bool,
    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
    log_namespace: LogNamespace,
) {
    if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let mut stream = stream.map(|event| {
            // Events are only tied to the batch notifier when
            // acknowledgements are enabled.
            if acknowledgements {
                event.with_batch_notifier(&batch)
            } else {
                event
            }
        });
        match out.send_event_stream(&mut stream).await {
            Err(_) => {
                emit!(StreamClosedError { count });
            }
            Ok(_) => {
                // Drop the stream explicitly to release its borrow of `msg`
                // before `msg` is moved into the finalizer entry.
                drop(stream);
                if let Some(f) = finalizer.as_ref() {
                    f.add(msg.into(), receiver)
                }
            }
        }
    }
}

fn parse_stream<'a>(
    msg: &BorrowedMessage<'a>,
    decoder: Decoder,
    keys: &'a Keys,
    log_namespace: LogNamespace,
) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
    let payload = msg.payload()?; // skip messages with empty payload

    let rmsg = ReceivedMessage::from(msg);

    let payload = Cursor::new(Bytes::copy_from_slice(payload));

    let mut stream = DecoderFramedRead::with_capacity(payload, decoder, msg.payload_len());
    let (count, _) = stream.size_hint();
    let stream = stream! {
        while let Some(result) = stream.next().await {
            match result {
                Ok((events, _byte_size)) => {
                    emit!(KafkaEventsReceived {
                        count: events.len(),
                        byte_size: events.estimated_json_encoded_size_of(),
                        topic: &rmsg.topic,
                        partition: rmsg.partition,
                    });
                    for mut event in events {
                        rmsg.apply(keys, &mut event, log_namespace);
                        yield event;
                    }
                },
                Err(error) => {
                    // The decoder is responsible for emitting the error; only
                    // stop on errors we cannot continue past.
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    }
    .boxed();
    Some((count, stream))
}

#[derive(Clone, Debug)]
struct Keys {
    timestamp: Option<OwnedValuePath>,
    key_field: Option<OwnedValuePath>,
    topic: Option<OwnedValuePath>,
    partition: Option<OwnedValuePath>,
    offset: Option<OwnedValuePath>,
    headers: Option<OwnedValuePath>,
}

impl Keys {
    fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
        Self {
            timestamp: schema.timestamp_key().cloned(),
            key_field: config.key_field.path.clone(),
            topic: config.topic_key.path.clone(),
            partition: config.partition_key.path.clone(),
            offset: config.offset_key.path.clone(),
            headers: config.headers_key.path.clone(),
        }
    }
}

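// `ReceivedMessage` copies everything needed to enrich events out of the
// `BorrowedMessage`, so the decoded event stream in `parse_stream` does not
// have to hold a borrow of the message itself.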
struct ReceivedMessage {
    timestamp: Option<DateTime<Utc>>,
    key: Value,
    headers: ObjectMap,
    topic: String,
    partition: i32,
    offset: i64,
}

impl ReceivedMessage {
    fn from(msg: &BorrowedMessage<'_>) -> Self {
        let timestamp = msg
            .timestamp()
            .to_millis()
            .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());

        let key = msg
            .key()
            .map(|key| Value::from(Bytes::from(key.to_owned())))
            .unwrap_or(Value::Null);

        let mut headers_map = ObjectMap::new();
        if let Some(headers) = msg.headers() {
            for header in headers.iter() {
                if let Some(value) = header.value {
                    headers_map.insert(
                        header.key.into(),
                        Value::from(Bytes::from(value.to_owned())),
                    );
                }
            }
        }

        Self {
            timestamp,
            key,
            headers: headers_map,
            topic: msg.topic().to_string(),
            partition: msg.partition(),
            offset: msg.offset(),
        }
    }

    fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
        if let Event::Log(log) = event {
            match log_namespace {
                LogNamespace::Vector => {
                    log_namespace.insert_standard_vector_source_metadata(
                        log,
                        KafkaSourceConfig::NAME,
                        Utc::now(),
                    );
                }
                LogNamespace::Legacy => {
                    if let Some(source_type_key) = log_schema().source_type_key_target_path() {
                        log.insert(source_type_key, KafkaSourceConfig::NAME);
                    }
                }
            }

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.key_field.as_ref().map(LegacyKey::Overwrite),
                path!("message_key"),
                self.key.clone(),
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.timestamp.as_ref().map(LegacyKey::Overwrite),
                path!("timestamp"),
                self.timestamp,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.topic.as_ref().map(LegacyKey::Overwrite),
                path!("topic"),
                self.topic.clone(),
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.partition.as_ref().map(LegacyKey::Overwrite),
                path!("partition"),
                self.partition,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.offset.as_ref().map(LegacyKey::Overwrite),
                path!("offset"),
                self.offset,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.headers.as_ref().map(LegacyKey::Overwrite),
                path!("headers"),
                self.headers.clone(),
            );
        }
    }
}

#[derive(Debug, Eq, PartialEq, Hash)]
struct FinalizerEntry {
    topic: String,
    partition: i32,
    offset: i64,
}

impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
    fn from(msg: BorrowedMessage<'a>) -> Self {
        Self {
            topic: msg.topic().into(),
            partition: msg.partition(),
            offset: msg.offset(),
        }
    }
}

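// Build the rdkafka `StreamConsumer`, wiring rebalance and shutdown callbacks
// through an unbounded channel to the coordination task. The defaults set
// below can be overridden by the user via `librdkafka_options`, which is
// applied last.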
fn create_consumer(
    config: &KafkaSourceConfig,
    acknowledgements: bool,
) -> crate::Result<(
    StreamConsumer<KafkaSourceContext>,
    UnboundedReceiver<KafkaCallback>,
)> {
    let mut client_config = ClientConfig::new();
    client_config
        .set("group.id", &config.group_id)
        .set("bootstrap.servers", &config.bootstrap_servers)
        .set("auto.offset.reset", &config.auto_offset_reset)
        .set(
            "session.timeout.ms",
            config.session_timeout_ms.as_millis().to_string(),
        )
        .set(
            "socket.timeout.ms",
            config.socket_timeout_ms.as_millis().to_string(),
        )
        .set(
            "fetch.wait.max.ms",
            config.fetch_wait_max_ms.as_millis().to_string(),
        )
        .set("enable.partition.eof", "false")
        .set("enable.auto.commit", "true")
        .set(
            "auto.commit.interval.ms",
            config.commit_interval_ms.as_millis().to_string(),
        )
        .set("enable.auto.offset.store", "false")
        .set("statistics.interval.ms", "1000")
        .set("client.id", "vector");

    config.auth.apply(&mut client_config)?;

    if let Some(librdkafka_options) = &config.librdkafka_options {
        for (key, value) in librdkafka_options {
            client_config.set(key.as_str(), value.as_str());
        }
    }

    let (callbacks, callback_rx) = mpsc::unbounded_channel();
    let consumer = client_config
        .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
            config.metrics.topic_lag_metric,
            acknowledgements,
            callbacks,
            Span::current(),
        ))
        .context(CreateSnafu)?;

    Ok((consumer, callback_rx))
}

type TopicPartition = (String, i32);

/// Distinguishes a partition consumer task that exited normally (for example
/// after being signaled to drain) from one that stopped because it reached
/// the end of its partition while running in EOF mode.
#[derive(PartialEq)]
enum PartitionConsumerStatus {
    NormalExit,
    PartitionEOF,
}

enum KafkaCallback {
    PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
    PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
    ShuttingDown(SyncSender<()>),
}

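// `KafkaSourceContext` bridges librdkafka's callback threads and the async
// coordination task: each `KafkaCallback` carries a rendezvous
// (`sync_channel(0)`) sender, and the callback thread blocks on the receiving
// end until the coordination task finishes reacting and drops the sender, so
// rebalance handling stays synchronous from librdkafka's point of view.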
struct KafkaSourceContext {
    acknowledgements: bool,
    stats: kafka::KafkaStatisticsContext,

    /// Channel used to forward rebalance and shutdown callbacks to the
    /// coordination task.
    callbacks: UnboundedSender<KafkaCallback>,

    /// A weak reference to the consumer, used to commit offsets during
    /// shutdown and rebalance callbacks.
    consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
}

impl KafkaSourceContext {
    fn new(
        expose_lag_metrics: bool,
        acknowledgements: bool,
        callbacks: UnboundedSender<KafkaCallback>,
        span: Span,
    ) -> Self {
        Self {
            stats: kafka::KafkaStatisticsContext {
                expose_lag_metrics,
                span,
            },
            acknowledgements,
            consumer: OnceLock::default(),
            callbacks,
        }
    }

    /// Emit a ShuttingDown callback and block this (client driver) thread
    /// until the coordination task finishes draining, committing consumer
    /// state each time a partition completes.
    fn shutdown(&self) {
        let (send, rendezvous) = sync_channel(0);
        if self
            .callbacks
            .send(KafkaCallback::ShuttingDown(send))
            .is_ok()
        {
            while rendezvous.recv().is_ok() {
                self.commit_consumer_state();
            }
        }
    }

    /// Emit a PartitionsAssigned callback and block until the partition
    /// consumer tasks have been spawned, i.e. until the coordination task
    /// drops the rendezvous sender.
    fn consume_partitions(&self, tpl: &TopicPartitionList) {
        // An empty assignment needs no partition consumers.
        if tpl.capacity() == 0 {
            return;
        }
        let (send, rendezvous) = sync_channel(0);
        let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned(
            tpl.elements()
                .iter()
                .map(|tp| (tp.topic().into(), tp.partition()))
                .collect(),
            send,
        ));

        while rendezvous.recv().is_ok() {
            // Wait for the coordination task to set up the partition queues.
        }
    }

    /// Emit a PartitionsRevoked callback and block until the drain phase is
    /// complete, committing consumer state as each partition finishes.
    fn revoke_partitions(&self, tpl: &TopicPartitionList) {
        let (send, rendezvous) = sync_channel(0);
        let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked(
            tpl.elements()
                .iter()
                .map(|tp| (tp.topic().into(), tp.partition()))
                .collect(),
            send,
        ));

        while rendezvous.recv().is_ok() {
            self.commit_consumer_state();
        }
    }

    fn commit_consumer_state(&self) {
        if let Some(consumer) = self
            .consumer
            .get()
            .expect("Consumer reference was not initialized.")
            .upgrade()
        {
            match consumer.commit_consumer_state(CommitMode::Sync) {
                Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
                    // NoOffset means there is nothing new to commit; not an error.
                }
                Err(error) => emit!(KafkaOffsetUpdateError { error }),
            }
        }
    }
}

impl ClientContext for KafkaSourceContext {
    fn stats(&self, statistics: Statistics) {
        self.stats.stats(statistics)
    }
}

impl ConsumerContext for KafkaSourceContext {
    fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
        match rebalance {
            Rebalance::Assign(tpl) => self.consume_partitions(tpl),

            Rebalance::Revoke(tpl) => {
                self.revoke_partitions(tpl);
                self.commit_consumer_state();
            }

            Rebalance::Error(message) => {
                error!("Error during Kafka consumer group rebalance: {}.", message);
            }
        }
    }
}

#[cfg(test)]
mod test {
    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};

    use super::*;

    pub fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }

    pub fn kafka_port() -> u16 {
        let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into());
        port.parse().expect("Invalid port number")
    }

    pub fn kafka_address() -> String {
        format!("{}:{}", kafka_host(), kafka_port())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<KafkaSourceConfig>();
    }

    pub(super) fn make_config(
        topic: &str,
        group: &str,
        log_namespace: LogNamespace,
        librdkafka_options: Option<HashMap<String, String>>,
    ) -> KafkaSourceConfig {
        KafkaSourceConfig {
            bootstrap_servers: kafka_address(),
            topics: vec![topic.into()],
            group_id: group.into(),
            auto_offset_reset: "beginning".into(),
            session_timeout_ms: Duration::from_millis(6000),
            commit_interval_ms: Duration::from_millis(1),
            librdkafka_options,
            key_field: default_key_field(),
            topic_key: default_topic_key(),
            partition_key: default_partition_key(),
            offset_key: default_offset_key(),
            headers_key: default_headers_key(),
            socket_timeout_ms: Duration::from_millis(60000),
            fetch_wait_max_ms: Duration::from_millis(100),
            log_namespace: Some(log_namespace == LogNamespace::Vector),
            ..Default::default()
        }
    }

    #[test]
    fn test_output_schema_definition_vector_namespace() {
        let definitions = make_config("topic", "group", LogNamespace::Vector, None)
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_metadata_field(
                        &owned_value_path!("kafka", "timestamp"),
                        Kind::timestamp(),
                        Some("timestamp")
                    )
                    .with_metadata_field(
                        &owned_value_path!("kafka", "message_key"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
                    .with_metadata_field(
                        &owned_value_path!("kafka", "partition"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
                    .with_metadata_field(
                        &owned_value_path!("kafka", "headers"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "ingest_timestamp"),
                        Kind::timestamp(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "source_type"),
                        Kind::bytes(),
                        None
                    )
            )
        )
    }

    #[test]
    fn test_output_schema_definition_legacy_namespace() {
        let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
                    .unknown_fields(Kind::undefined())
                    .with_event_field(
                        &owned_value_path!("message"),
                        Kind::bytes(),
                        Some("message")
                    )
                    .with_event_field(
                        &owned_value_path!("timestamp"),
                        Kind::timestamp(),
                        Some("timestamp")
                    )
                    .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
                    .with_event_field(
                        &owned_value_path!("headers"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                        None
                    )
                    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
            )
        )
    }

    #[tokio::test]
    async fn consumer_create_ok() {
        let config = make_config("topic", "group", LogNamespace::Legacy, None);
        assert!(create_consumer(&config, true).is_ok());
    }

    #[tokio::test]
    async fn consumer_create_incorrect_auto_offset_reset() {
        let config = KafkaSourceConfig {
            auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
            ..make_config("topic", "group", LogNamespace::Legacy, None)
        };
        assert!(create_consumer(&config, true).is_err());
    }
}

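// The tests below require a running Kafka broker, reachable at
// `KAFKA_HOST`:`KAFKA_PORT` (defaulting to localhost:9091), and are gated
// behind the `kafka-integration-tests` feature. Several are tunable through
// `KAFKA_SEND_COUNT`, `KAFKA_EXPECT_COUNT`, and the other environment
// variables read in the individual tests.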
1534#[cfg(feature = "kafka-integration-tests")]
1535#[cfg(test)]
1536mod integration_test {
1537 use std::time::Duration;
1538
1539 use chrono::{DateTime, SubsecRound, Utc};
1540 use futures::Stream;
1541 use futures_util::stream::FuturesUnordered;
1542 use rdkafka::{
1543 Offset, TopicPartitionList,
1544 admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
1545 client::DefaultClientContext,
1546 config::{ClientConfig, FromClientConfig},
1547 consumer::BaseConsumer,
1548 message::{Header, OwnedHeaders},
1549 producer::{FutureProducer, FutureRecord},
1550 util::Timeout,
1551 };
1552 use stream_cancel::{Trigger, Tripwire};
1553 use tokio::time::sleep;
1554 use vector_lib::event::EventStatus;
1555 use vrl::{event_path, value};
1556
1557 use super::{test::*, *};
1558 use crate::{
1559 SourceSender,
1560 event::{EventArray, EventContainer},
1561 shutdown::ShutdownSignal,
1562 test_util::{collect_n, components::assert_source_compliance, random_string},
1563 };
1564
1565 const KEY: &str = "my key";
1566 const TEXT: &str = "my message";
1567 const HEADER_KEY: &str = "my header";
1568 const HEADER_VALUE: &str = "my header value";
1569
1570 fn kafka_test_topic() -> String {
1571 std::env::var("KAFKA_TEST_TOPIC")
1572 .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
1573 }
1574 fn kafka_max_bytes() -> String {
1575 std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
1576 }
1577
1578 fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
1579 let mut client = ClientConfig::new();
1580 client.set("bootstrap.servers", kafka_address());
1581 client.set("produce.offset.report", "true");
1582 client.set("message.timeout.ms", "5000");
1583 client.set("auto.commit.interval.ms", "1");
1584 if let Some(group) = group {
1585 client.set("group.id", group);
1586 }
1587 client.create().expect("Producer creation error")
1588 }
1589
    async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
        let now = Utc::now();
        let timestamp = now.timestamp_millis();

        let producer: &FutureProducer = &client_config(None);
        let topic_name = topic.as_ref();

        create_topic(topic_name, partitions).await;

        (0..count)
            .map(|i| async move {
                let text = format!("{TEXT} {i:03}");
                let key = format!("{KEY} {i}");
                let record = FutureRecord::to(topic_name)
                    .payload(&text)
                    .key(&key)
                    .timestamp(timestamp)
                    .headers(OwnedHeaders::new().insert(Header {
                        key: HEADER_KEY,
                        value: Some(HEADER_VALUE),
                    }));
                if let Err(error) = producer.send(record, Timeout::Never).await {
                    panic!("Cannot send event to Kafka: {error:?}");
                }
            })
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<_>>()
            .await;

        now
    }

    async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
        let topic = kafka_test_topic();
        let group_id = format!("test-group-{}", random_string(10));

        let sent_at = send_events(topic.clone(), partitions, count).await;

        (topic, group_id, sent_at)
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        send_receive(true, |_| false, 10, LogNamespace::Vector).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        send_receive(false, |_| false, 10, LogNamespace::Vector).await;
    }

    #[tokio::test]
    async fn handles_one_negative_acknowledgement() {
        send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn handles_one_negative_acknowledgement_vector_namespace() {
        send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
    }

    #[tokio::test]
    async fn handles_permanent_negative_acknowledgement() {
        send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn handles_permanent_negative_acknowledgement_vector_namespace() {
        send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
    }

    async fn send_receive(
        acknowledgements: bool,
        error_at: impl Fn(usize) -> bool,
        receive_count: usize,
        log_namespace: LogNamespace,
    ) {
        const SEND_COUNT: usize = 10;

        let topic = format!("test-topic-{}", random_string(10));
        let group_id = format!("test-group-{}", random_string(10));
        let config = make_config(&topic, &group_id, log_namespace, None);

        let now = send_events(topic.clone(), 1, 10).await;

        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
            let (tx, rx) = SourceSender::new_test_errors(error_at);
            let (trigger_shutdown, shutdown_done) =
                spawn_kafka(tx, config, acknowledgements, false, log_namespace);
            let events = collect_n(rx, SEND_COUNT).await;
            // Yield once so the source can process acknowledgements for the
            // collected events before shutdown is triggered.
            tokio::task::yield_now().await;
            drop(trigger_shutdown);
            shutdown_done.await;

            events
        })
        .await;

        let offset = fetch_tpl_offset(&group_id, &topic, 0);
        assert_eq!(offset, Offset::from_raw(receive_count as i64));

        assert_eq!(events.len(), SEND_COUNT);
        for (i, event) in events.into_iter().enumerate() {
            if let LogNamespace::Legacy = log_namespace {
                assert_eq!(
                    event.as_log()[log_schema().message_key().unwrap().to_string()],
                    format!("{TEXT} {i:03}").into()
                );
                assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
                assert_eq!(
                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                    "kafka".into()
                );
                assert_eq!(
                    event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
                    now.trunc_subsecs(3).into()
                );
                assert_eq!(event.as_log()["topic"], topic.clone().into());
                assert!(event.as_log().contains("partition"));
                assert!(event.as_log().contains("offset"));
                let mut expected_headers = ObjectMap::new();
                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
                assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
            } else {
                let meta = event.as_log().metadata().value();

                assert_eq!(
                    meta.get(path!("vector", "source_type")).unwrap(),
                    &value!(KafkaSourceConfig::NAME)
                );
                assert!(
                    meta.get(path!("vector", "ingest_timestamp"))
                        .unwrap()
                        .is_timestamp()
                );

                assert_eq!(
                    event.as_log().value(),
                    &value!(format!("{} {:03}", TEXT, i))
                );
                assert_eq!(
                    meta.get(path!("kafka", "message_key")).unwrap(),
                    &value!(format!("{} {}", KEY, i))
                );

                assert_eq!(
                    meta.get(path!("kafka", "timestamp")).unwrap(),
                    &value!(now.trunc_subsecs(3))
                );
                assert_eq!(
                    meta.get(path!("kafka", "topic")).unwrap(),
                    &value!(topic.clone())
                );
                assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer());
                assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer());

                let mut expected_headers = ObjectMap::new();
                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
                assert_eq!(
                    meta.get(path!("kafka", "headers")).unwrap(),
                    &Value::from(expected_headers)
                );
            }
        }
    }

    fn make_rand_config() -> (String, String, KafkaSourceConfig) {
        let topic = format!("test-topic-{}", random_string(10));
        let group_id = format!("test-group-{}", random_string(10));
        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
        (topic, group_id, config)
    }

    fn delay_pipeline(
        id: usize,
        delay: Duration,
        status: EventStatus,
    ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
        let (pipe, recv) = SourceSender::new_test_sender_with_options(100, None);
        let recv = recv.into_stream();
        let recv = recv.then(move |item| async move {
            let mut events = item.events;
            events.iter_logs_mut().for_each(|log| {
                log.insert(event_path!("pipeline_id"), id.to_string());
            });
            sleep(delay).await;
            events.iter_events_mut().for_each(|mut event| {
                let metadata = event.metadata_mut();
                metadata.update_status(status);
                metadata.update_sources();
            });
            events
        });
        (pipe, Box::pin(recv))
    }

    fn spawn_kafka(
        out: SourceSender,
        config: KafkaSourceConfig,
        acknowledgements: bool,
        eof: bool,
        log_namespace: LogNamespace,
    ) -> (Trigger, Tripwire) {
        let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();

        let decoder = DecodingConfig::new(
            config.framing.clone(),
            config.decoding.clone(),
            log_namespace,
        )
        .build()
        .unwrap();

        let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();

        tokio::spawn(kafka_source(
            config,
            consumer,
            callback_rx,
            decoder,
            out,
            shutdown,
            eof,
            log_namespace,
        ));
        (trigger_shutdown, shutdown_done)
    }

    fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
        let client: BaseConsumer = client_config(Some(group_id));
        client.subscribe(&[topic]).expect("Subscribing failed");

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(topic, partition);
        client
            .committed_offsets(tpl, Duration::from_secs(1))
            .expect("Getting committed offsets failed")
            .find_partition(topic, partition)
            .expect("Missing topic/partition")
            .offset()
    }

    async fn create_topic(topic: &str, partitions: i32) {
        let client: AdminClient<DefaultClientContext> = client_config(None);
        let topic_results = client
            .create_topics(
                [&NewTopic {
                    name: topic,
                    num_partitions: partitions,
                    replication: TopicReplication::Fixed(1),
                    config: vec![],
                }],
                &AdminOptions::default(),
            )
            .await
            .expect("create_topics failed");

        for result in topic_results {
            if let Err((topic, err)) = result
                && err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists
            {
                panic!("Creating a topic failed: {:?}", (topic, err))
            }
        }
    }

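    // This test is timing-sensitive: it relies on a consumer group rebalance
    // occurring while the first consumer is still mid-stream, which is
    // presumably why it is `#[ignore]`d by default.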
    #[ignore]
    #[tokio::test]
    async fn handles_rebalance() {
        const NEVENTS: usize = 200;
        const DELAY: u64 = 100;

        let (topic, group_id, config) = make_rand_config();
        create_topic(&topic, 2).await;

        let _send_start = send_events(topic.clone(), 1, NEVENTS).await;

        let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
        let (trigger_shutdown1, shutdown_done1) =
            spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
        let events1 = tokio::spawn(collect_n(rx1, NEVENTS));

        sleep(Duration::from_secs(1)).await;

        let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
        let (trigger_shutdown2, shutdown_done2) =
            spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
        let events2 = tokio::spawn(collect_n(rx2, NEVENTS));

        sleep(Duration::from_secs(5)).await;

        drop(trigger_shutdown1);
        let events1 = events1.await.unwrap();
        shutdown_done1.await;

        sleep(Duration::from_secs(5)).await;

        drop(trigger_shutdown2);
        let events2 = events2.await.unwrap();
        shutdown_done2.await;

        sleep(Duration::from_secs(1)).await;

        assert!(!events1.is_empty());
        assert!(!events2.is_empty());

        match fetch_tpl_offset(&group_id, &topic, 0) {
            Offset::Offset(offset) => {
                assert!((offset as isize - events1.len() as isize).abs() <= 1)
            }
            o => panic!("Invalid offset for partition 0 {o:?}"),
        }

        match fetch_tpl_offset(&group_id, &topic, 1) {
            Offset::Offset(offset) => {
                assert!((offset as isize - events2.len() as isize).abs() <= 1)
            }
            o => panic!("Invalid offset for partition 1 {o:?}"),
        }

        let mut all_events = events1
            .into_iter()
            .chain(events2.into_iter())
            .flat_map(map_logs)
            .collect::<Vec<String>>();
        all_events.sort();
    }

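    // Sends a batch of messages, shuts the first consumer down partway
    // through, then has a second consumer (running to EOF) pick up the rest.
    // If acknowledgements are drained correctly at shutdown, the two consumers
    // together see each message exactly once.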
    #[tokio::test]
    async fn drains_acknowledgements_at_shutdown() {
        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
            .unwrap_or_else(|_| "125000".into())
            .parse()
            .expect("Number of messages to send to kafka.");
        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
            .unwrap_or_else(|_| format!("{send_count}"))
            .parse()
            .expect("Number of messages to expect consumers to process.");
        let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
            .unwrap_or_else(|_| "2000".into())
            .parse()
            .expect("Number of milliseconds before shutting down first consumer.");

        let (topic, group_id, _) = send_to_test_topic(1, send_count).await;

        let mut opts = HashMap::new();
        // Enable "partition EOF" so the second consumer can exit when it
        // reaches the end of the partition.
        opts.insert("enable.partition.eof".into(), "true".into());
        // Keep fetches small, so the first consumer still has messages in
        // flight when it is shut down.
        opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
        let events1 = {
            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
            let (tx, rx) = SourceSender::new_test_errors(|_| false);
            let (trigger_shutdown, shutdown_done) =
                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
            let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
                sleep(Duration::from_millis(delay_ms)).await;
                drop(trigger_shutdown);
            });
            shutdown_done.await;
            events
        };

        debug!("Consumer group.id: {}", &group_id);
        debug!(
            "First consumer read {} of {} messages.",
            events1.len(),
            expect_count
        );

        let events2 = {
            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
            let (tx, rx) = SourceSender::new_test_errors(|_| false);
            let (trigger_shutdown, shutdown_done) =
                spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
            let events = rx.collect::<Vec<Event>>().await;
            drop(trigger_shutdown);
            shutdown_done.await;
            events
        };

        debug!(
            "Second consumer read {} of {} messages.",
            events2.len(),
            expect_count
        );

        let total = events1.len() + events2.len();
        assert_ne!(
            events1.len(),
            0,
            "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
        );
        assert_ne!(
            events2.len(),
            0,
            "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?)"
        );
        assert_eq!(total, expect_count);
    }

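    // Same principle as the shutdown test above, but acknowledgements must be
    // drained across rebalances instead: three consumers join a six-partition
    // topic at staggered times, and a final consumer then verifies that no
    // messages were left unconsumed.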
    async fn consume_with_rebalance(rebalance_strategy: String) {
        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
            .unwrap_or_else(|_| "125000".into())
            .parse()
            .expect("Number of messages to send to kafka.");
        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
            .unwrap_or_else(|_| format!("{send_count}"))
            .parse()
            .expect("Number of messages to expect consumers to process.");
        let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
            .unwrap_or_else(|_| "2000".into())
            .parse()
            .expect("Number of milliseconds to wait before starting each additional consumer.");

        let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
        debug!("Topic: {}", &topic);
        debug!("Consumer group.id: {}", &group_id);

        let mut kafka_options = HashMap::new();
        // Enable "partition EOF" so each consumer can exit when it reaches the
        // end of its assigned partitions.
        kafka_options.insert("enable.partition.eof".into(), "true".into());
        // Keep fetches small, so messages are still in flight when rebalances occur.
        kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
        kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
        let config1 = make_config(
            &topic,
            &group_id,
            LogNamespace::Legacy,
            Some(kafka_options.clone()),
        );
        let config2 = config1.clone();
        let config3 = config1.clone();
        let config4 = config1.clone();

        let (events1, events2, events3) = tokio::join!(
            async move {
                let (tx, rx) = SourceSender::new_test_errors(|_| false);
                let (_trigger_shutdown, _shutdown_done) =
                    spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);

                rx.collect::<Vec<Event>>().await
            },
            async move {
                sleep(Duration::from_millis(delay_ms)).await;
                let (tx, rx) = SourceSender::new_test_errors(|_| false);
                let (_trigger_shutdown, _shutdown_done) =
                    spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);

                rx.collect::<Vec<Event>>().await
            },
            async move {
                sleep(Duration::from_millis(delay_ms * 2)).await;
                let (tx, rx) = SourceSender::new_test_errors(|_| false);
                let (_trigger_shutdown, _shutdown_done) =
                    spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);

                rx.collect::<Vec<Event>>().await
            }
        );

        let unconsumed = async move {
            let (tx, rx) = SourceSender::new_test_errors(|_| false);
            let (_trigger_shutdown, _shutdown_done) =
                spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);

            rx.collect::<Vec<Event>>().await
        }
        .await;

        debug!(
            "First consumer read {} of {} messages.",
            events1.len(),
            expect_count
        );
        debug!(
            "Second consumer read {} of {} messages.",
            events2.len(),
            expect_count
        );
        debug!(
            "Third consumer read {} of {} messages.",
            events3.len(),
            expect_count
        );

        let total = events1.len() + events2.len() + events3.len();
        assert_ne!(
            events1.len(),
            0,
            "First batch of events should be non-zero (increase delay?)"
        );
        assert_ne!(
            events2.len(),
            0,
            "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?)"
        );
        assert_ne!(
            events3.len(),
            0,
            "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?)"
        );
        assert_eq!(
            unconsumed.len(),
            0,
            "The first set of consumers should consume and ack all messages."
        );
        assert_eq!(total, expect_count);
    }

    #[tokio::test]
    async fn drains_acknowledgements_during_rebalance_default_assignments() {
        // The default, eager assignment strategies revoke the full assignment
        // during a rebalance.
        consume_with_rebalance("range,roundrobin".into()).await;
    }

    #[tokio::test]
    async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
        // The cooperative strategy moves only the reassigned partitions,
        // rather than revoking everything.
        consume_with_rebalance("cooperative-sticky".into()).await;
    }

    fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
        events.into_events().map(|event| {
            let log = event.into_log();
            format!(
                "{} {} {} {}",
                log["message"].to_string_lossy(),
                log["topic"].to_string_lossy(),
                log["partition"].to_string_lossy(),
                log["offset"].to_string_lossy(),
            )
        })
    }
}