use std::{
    collections::{HashMap, HashSet},
    io::Cursor,
    pin::Pin,
    sync::{
        mpsc::{sync_channel, SyncSender},
        Arc, OnceLock, Weak,
    },
    time::Duration,
};

use async_stream::stream;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{Stream, StreamExt};
use futures_util::future::OptionFuture;
use rdkafka::{
    consumer::{
        stream_consumer::StreamPartitionQueue, BaseConsumer, CommitMode, Consumer, ConsumerContext,
        Rebalance, StreamConsumer,
    },
    error::KafkaError,
    message::{BorrowedMessage, Headers as _, Message},
    types::RDKafkaErrorCode,
    ClientConfig, ClientContext, Statistics, TopicPartitionList,
};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::{
    runtime::Handle,
    sync::{
        mpsc::{self, UnboundedReceiver, UnboundedSender},
        oneshot,
    },
    task::JoinSet,
    time::Sleep,
};
use tokio_util::codec::FramedRead;
use tracing::{Instrument, Span};
use vector_lib::codecs::{
    decoding::{DeserializerConfig, FramingConfig},
    StreamDecodingError,
};
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};

use vector_lib::configurable::configurable_component;
use vector_lib::finalizer::OrderedFinalizer;
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    EstimatedJsonEncodedSizeOf,
};
use vrl::value::{kind::Collection, Kind, ObjectMap};

use crate::{
    codecs::{Decoder, DecodingConfig},
    config::{
        log_schema, LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput,
    },
    event::{BatchNotifier, BatchStatus, Event, Value},
    internal_events::{
        KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
        StreamClosedError,
    },
    kafka,
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    shutdown::ShutdownSignal,
    SourceSender,
};

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display(
        "The drain_timeout_ms ({}) must be less than or equal to session_timeout_ms ({})",
        value,
        session_timeout_ms.as_millis()
    ))]
    InvalidDrainTimeout {
        value: u64,
        session_timeout_ms: Duration,
    },
    #[snafu(display("Could not create Kafka consumer: {}", source))]
    CreateError { source: rdkafka::error::KafkaError },
    #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
    SubscribeError { source: rdkafka::error::KafkaError },
}

/// Metrics configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct Metrics {
    /// Expose topic lag metrics for all topics and partitions. Metric names are `kafka_consumer_lag`.
    pub topic_lag_metric: bool,
}

/// Configuration for the `kafka` source.
#[serde_as]
#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct KafkaSourceConfig {
    /// A comma-separated list of Kafka bootstrap servers, each in the form of `host:port`.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    bootstrap_servers: String,

    /// The Kafka topics names to read events from.
    ///
    /// Regular expression syntax is supported if the topic begins with `^`.
    #[configurable(metadata(
        docs::examples = "^(prefix1|prefix2)-.+",
        docs::examples = "topic-1",
        docs::examples = "topic-2"
    ))]
    topics: Vec<String>,

    /// The consumer group name to be used to consume events from Kafka.
    #[configurable(metadata(docs::examples = "consumer-group-name"))]
    group_id: String,

    /// If offsets for the consumer group do not exist, set them using this strategy.
    ///
    /// See the `librdkafka` documentation for the `auto.offset.reset` option for further clarification.
    #[serde(default = "default_auto_offset_reset")]
    #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
    auto_offset_reset: String,

    /// The Kafka session timeout.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_session_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Session Timeout"))]
    session_timeout_ms: Duration,

    /// Timeout for draining pending acknowledgements during shutdown or a Kafka
    /// consumer group rebalance.
    ///
    /// Must not exceed `session_timeout_ms`. Defaults to half of `session_timeout_ms`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Drain Timeout"))]
    drain_timeout_ms: Option<u64>,

    /// Timeout for network requests.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    socket_timeout_ms: Duration,

    /// Maximum time the broker may wait to fill the fetch response.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_fetch_wait_max_ms")]
    #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
    fetch_wait_max_ms: Duration,

    /// The frequency that the consumer offsets are committed (written) to offset storage.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_commit_interval_ms")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::human_name = "Commit Interval"))]
    commit_interval_ms: Duration,

    /// Overrides the name of the log field used to add the message key to each event.
    ///
    /// By default, `"message_key"` is used.
    #[serde(default = "default_key_field")]
    #[configurable(metadata(docs::examples = "message_key"))]
    key_field: OptionalValuePath,

    /// Overrides the name of the log field used to add the topic to each event.
    ///
    /// By default, `"topic"` is used.
    #[serde(default = "default_topic_key")]
    #[configurable(metadata(docs::examples = "topic"))]
    topic_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the partition to each event.
    ///
    /// By default, `"partition"` is used.
    #[serde(default = "default_partition_key")]
    #[configurable(metadata(docs::examples = "partition"))]
    partition_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the offset to each event.
    ///
    /// By default, `"offset"` is used.
    #[serde(default = "default_offset_key")]
    #[configurable(metadata(docs::examples = "offset"))]
    offset_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the headers to each event.
    ///
    /// By default, `"headers"` is used.
    #[serde(default = "default_headers_key")]
    #[configurable(metadata(docs::examples = "headers"))]
    headers_key: OptionalValuePath,

    /// Advanced options set directly on the underlying `librdkafka` client.
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    librdkafka_options: Option<HashMap<String, String>>,

    #[serde(flatten)]
    auth: kafka::KafkaAuthConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    metrics: Metrics,
}

impl KafkaSourceConfig {
    fn keys(&self) -> Keys {
        Keys::from(log_schema(), self)
    }
}

const fn default_session_timeout_ms() -> Duration {
    Duration::from_millis(10000)
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000)
}

const fn default_fetch_wait_max_ms() -> Duration {
    Duration::from_millis(100)
}

const fn default_commit_interval_ms() -> Duration {
    Duration::from_millis(5000)
}

fn default_auto_offset_reset() -> String {
    "largest".into()
}

fn default_key_field() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("message_key"))
}

fn default_topic_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("topic"))
}

fn default_partition_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("partition"))
}

fn default_offset_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("offset"))
}

fn default_headers_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("headers"))
}

const fn example_auto_offset_reset_values() -> [&'static str; 7] {
    [
        "smallest",
        "earliest",
        "beginning",
        "largest",
        "latest",
        "end",
        "error",
    ]
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}
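
// A hedged configuration sketch (not compiled, for orientation only): a
// minimal `kafka` source entry as it might appear in a Vector config file.
// `bootstrap_servers`, `topics`, and `group_id` are the only required fields;
// everything else falls back to the defaults defined above. The values are
// taken from the `docs::examples` metadata on the struct.
//
//     [sources.in]
//     type = "kafka"
//     bootstrap_servers = "10.14.22.123:9092,10.14.23.332:9092"
//     group_id = "consumer-group-name"
//     topics = ["topic-1", "topic-2"]
//     auto_offset_reset = "earliest"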

impl_generate_config_from_default!(KafkaSourceConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SourceConfig for KafkaSourceConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

        if let Some(d) = self.drain_timeout_ms {
            snafu::ensure!(
                Duration::from_millis(d) <= self.session_timeout_ms,
                InvalidDrainTimeoutSnafu {
                    value: d,
                    session_timeout_ms: self.session_timeout_ms
                }
            );
        }

        let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;

        Ok(Box::pin(kafka_source(
            self.clone(),
            consumer,
            callback_rx,
            decoder,
            cx.out,
            cx.shutdown,
            false,
            log_namespace,
        )))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let keys = self.keys();

        let schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                keys.timestamp.map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_source_metadata(
                Self::NAME,
                keys.topic.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("topic"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.partition.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("partition"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.offset.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("offset"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.headers.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("headers"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.key_field.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("message_key"),
                Kind::bytes(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

#[allow(clippy::too_many_arguments)]
async fn kafka_source(
    config: KafkaSourceConfig,
    consumer: StreamConsumer<KafkaSourceContext>,
    callback_rx: UnboundedReceiver<KafkaCallback>,
    decoder: Decoder,
    out: SourceSender,
    shutdown: ShutdownSignal,
    eof: bool,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    let span = info_span!("kafka_source");
    let consumer = Arc::new(consumer);

    consumer
        .context()
        .consumer
        .set(Arc::downgrade(&consumer))
        .expect("Error setting up consumer context.");

    // EOF signal allowing the coordination task to tell the client driver task
    // when all partitions have reached EOF.
    let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();

    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
    if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
        error!("{}", e);
        return Err(());
    }

    let coordination_task = {
        let span = span.clone();
        let consumer = Arc::clone(&consumer);
        let drain_timeout_ms = config
            .drain_timeout_ms
            .map_or(config.session_timeout_ms / 2, Duration::from_millis);
        let consumer_state =
            ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
        tokio::spawn(async move {
            coordinate_kafka_callbacks(
                consumer,
                callback_rx,
                consumer_state,
                drain_timeout_ms,
                eof_tx,
            )
            .await;
        })
    };

    let client_task = {
        let consumer = Arc::clone(&consumer);
        tokio::task::spawn_blocking(move || {
            let _enter = span.enter();
            drive_kafka_consumer(consumer, shutdown, eof_rx);
        })
    };

    _ = tokio::join!(client_task, coordination_task);
    consumer.context().commit_consumer_state();

    Ok(())
}

struct ConsumerStateInner<S> {
    config: KafkaSourceConfig,
    decoder: Decoder,
    out: SourceSender,
    log_namespace: LogNamespace,
    consumer_state: S,
}

struct Consuming {
    span: Span,
}

struct Draining {
    /// Rendezvous channel back to the rdkafka callback thread that initiated this drain.
    signal: SyncSender<()>,

    /// Partitions that have been revoked but whose consumer tasks have not yet finished draining.
    expect_drain: HashSet<TopicPartition>,

    /// Whether this drain is in service of a full shutdown (true) or a consumer group rebalance (false).
    shutdown: bool,

    span: Span,
}

type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
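
/// The consumer lifecycle driven by `coordinate_kafka_callbacks`: the source
/// starts out `Consuming`; a rebalance or shutdown moves it into `Draining`
/// while revoked partitions flush pending acknowledgements; a finished drain
/// returns to `Consuming` (after a rebalance) or ends in `Complete` (shutdown).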
enum ConsumerState {
    Consuming(ConsumerStateInner<Consuming>),
    Draining(ConsumerStateInner<Draining>),
    Complete,
}

impl Draining {
    fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
        Self {
            signal,
            shutdown,
            expect_drain: HashSet::new(),
            span,
        }
    }

    fn is_complete(&self) -> bool {
        self.expect_drain.is_empty()
    }
}

impl<C> ConsumerStateInner<C> {
    fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (None.into(), ConsumerState::Complete)
    }
}

impl ConsumerStateInner<Consuming> {
    const fn new(
        config: KafkaSourceConfig,
        decoder: Decoder,
        out: SourceSender,
        log_namespace: LogNamespace,
        span: Span,
    ) -> Self {
        Self {
            config,
            decoder,
            out,
            log_namespace,
            consumer_state: Consuming { span },
        }
    }

    /// Spawn a task on the given JoinSet that consumes the kafka
    /// StreamPartitionQueue and handles acknowledgements for the consumed messages.
    fn consume_partition(
        &self,
        join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
        tp: TopicPartition,
        consumer: Arc<StreamConsumer<KafkaSourceContext>>,
        p: StreamPartitionQueue<KafkaSourceContext>,
        acknowledgements: bool,
        exit_eof: bool,
    ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
        let keys = self.config.keys();
        let decoder = self.decoder.clone();
        let log_namespace = self.log_namespace;
        let mut out = self.out.clone();

        let (end_tx, mut end_signal) = oneshot::channel::<()>();

        let handle = join_set.spawn(async move {
            let mut messages = p.stream();
            let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

            // Hold the finalizer in an Option so it can be dropped to signal that
            // message consumption is done and only acknowledgements remain.
            let mut finalizer = Some(finalizer);

            let mut status = PartitionConsumerStatus::NormalExit;

            loop {
                tokio::select!(
                    // Handle the acknowledgement stream before new messages to avoid
                    // unbounded growth when acks are processed slower than messages arrive.
                    biased;

                    // The is_some() guard prevents polling end_signal after it completes.
                    _ = &mut end_signal, if finalizer.is_some() => {
                        finalizer.take();
                    },

                    ack = ack_stream.next() => match ack {
                        Some((status, entry)) => {
                            if status == BatchStatus::Delivered {
                                if let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
                                    emit!(KafkaOffsetUpdateError { error });
                                }
                            }
                        }
                        None if finalizer.is_none() => {
                            debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
                            break
                        }
                        None => {
                            debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
                        }
                    },

                    message = messages.next(), if finalizer.is_some() => match message {
                        None => unreachable!("MessageStream never calls Ready(None)"),
                        Some(Err(error)) => match error {
                            rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
                                debug!("EOF for partition {}.", partition);
                                status = PartitionConsumerStatus::PartitionEOF;
                                finalizer.take();
                            },
                            _ => emit!(KafkaReadError { error }),
                        },
                        Some(Ok(msg)) => {
                            emit!(KafkaBytesReceived {
                                byte_size: msg.payload_len(),
                                protocol: "tcp",
                                topic: msg.topic(),
                                partition: msg.partition(),
                            });
                            parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
                        }
                    },
                )
            }
            (tp, status)
        }.instrument(self.consumer_state.span.clone()));
        (end_tx, handle)
    }

    /// Consume self, returning a "Draining" ConsumerState along with a future
    /// representing the drain deadline, based on max_drain_ms.
    fn begin_drain(
        self,
        max_drain_ms: Duration,
        sig: SyncSender<()>,
        shutdown: bool,
    ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
        let deadline = Box::pin(tokio::time::sleep(max_drain_ms));

        let draining = ConsumerStateInner {
            config: self.config,
            decoder: self.decoder,
            out: self.out,
            log_namespace: self.log_namespace,
            consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
        };

        (Some(deadline).into(), draining)
    }

    pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Consuming(self))
    }
}

impl ConsumerStateInner<Draining> {
    /// Mark the given TopicPartition as revoked and signal its consumer task to
    /// stop consuming new messages. If the send fails, the task has already ended.
    fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
        _ = end_signal.send(());
        self.consumer_state.expect_drain.insert(tp);
    }

    /// Mark the given TopicPartition as fully drained and notify the rdkafka
    /// callback thread that initiated the drain.
    fn partition_drained(&mut self, tp: TopicPartition) {
        _ = self.consumer_state.signal.send(());
        self.consumer_state.expect_drain.remove(&tp);
    }

    fn is_drain_complete(&self) -> bool {
        self.consumer_state.is_complete()
    }

    /// Finish drain mode: return to "Consuming" after a rebalance, or move to
    /// "Complete" when shutting down.
    fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        if self.consumer_state.shutdown {
            self.complete(deadline)
        } else {
            (
                None.into(),
                ConsumerState::Consuming(ConsumerStateInner {
                    config: self.config,
                    decoder: self.decoder,
                    out: self.out,
                    log_namespace: self.log_namespace,
                    consumer_state: Consuming {
                        span: self.consumer_state.span,
                    },
                }),
            )
        }
    }

    pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Draining(self))
    }
}
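
/// Coordinates callbacks from the rdkafka client threads: spawns one consumer
/// task per assigned partition, signals those tasks to wind down when
/// partitions are revoked or the source is shutting down, and tracks drain
/// progress so offsets are committed before a rebalance or shutdown proceeds.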
async fn coordinate_kafka_callbacks(
    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
    mut callbacks: UnboundedReceiver<KafkaCallback>,
    consumer_state: ConsumerStateInner<Consuming>,
    max_drain_ms: Duration,
    mut eof: Option<oneshot::Sender<()>>,
) {
    let mut drain_deadline: OptionFuture<_> = None.into();
    let mut consumer_state = ConsumerState::Consuming(consumer_state);

    // One oneshot sender per consumed partition, used to signal the consumer task
    // to stop consuming new messages and drain pending acknowledgements.
    let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();

    // The set of consumer tasks, one per assigned partition, each passing messages
    // to the output stream and processing the corresponding acknowledgement stream.
    let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
        Default::default();

    // Abort handles for ending any consumer task that exceeds a drain deadline.
    let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();

    let exit_eof = eof.is_some();

    while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
        tokio::select! {
            Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
                debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
                // If the task ended on its own, its end signal will still be in this map.
                end_signals.remove(&finished_partition);
                abort_handles.remove(&finished_partition);

                (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
                    ConsumerState::Draining(mut state) => {
                        state.partition_drained(finished_partition);

                        if state.is_drain_complete() {
                            debug!("All expected partitions have drained.");
                            state.finish_drain(drain_deadline)
                        } else {
                            state.keep_draining(drain_deadline)
                        }
                    },
                    ConsumerState::Consuming(state) => {
                        if !exit_eof {
                            debug!("Partition consumer task finished, while not in draining mode.");
                        }
                        state.keep_consuming(drain_deadline)
                    },
                };

                // Once the last remaining task reaches EOF, signal the client driver
                // task to shut down the main consumer too.
                if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
                    debug!("All partitions have exited or reached EOF.");
                    let _ = eof.take().map(|e| e.send(()));
                }
            },
            Some(callback) = callbacks.recv() => match callback {
                KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
                    ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
                    ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
                    ConsumerState::Consuming(ref consumer_state) => {
                        let acks = consumer.context().acknowledgements;
                        for tp in assigned_partitions.drain(0..) {
                            let topic = tp.0.as_str();
                            let partition = tp.1;
                            match consumer.split_partition_queue(topic, partition) {
                                Some(pq) => {
                                    debug!("Consuming partition {}:{}.", &tp.0, tp.1);
                                    let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
                                    abort_handles.insert(tp.clone(), handle);
                                    end_signals.insert(tp, end_tx);
                                }
                                None => {
                                    warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
                                }
                            }
                        }
                        // Unblock the rdkafka callback thread once all partition queues are set up.
                        drop(done);
                    }
                },
                KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
                    ConsumerState::Draining(d) => {
                        warn!("Kafka client is already draining revoked partitions.");
                        d.keep_draining(drain_deadline)
                    },
                    ConsumerState::Consuming(state) => {
                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);

                        for tp in revoked_partitions.drain(0..) {
                            match end_signals.remove(&tp) {
                                Some(end) => {
                                    debug!("Revoking partition {}:{}", &tp.0, tp.1);
                                    state.revoke_partition(tp, end);
                                }
                                None => {
                                    debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                                }
                            }
                        }

                        state.keep_draining(deadline)
                    }
                },
                KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Shutdown received after completion."),
                    ConsumerState::Draining(state) => {
                        error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
                        callbacks.close();
                        state.keep_draining(drain_deadline)
                    },
                    ConsumerState::Consuming(state) => {
                        callbacks.close();
                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
                        if let Ok(tpl) = consumer.assignment() {
                            // A zero-capacity TPL means nothing is assigned, so there
                            // is nothing to revoke or drain.
                            if tpl.capacity() == 0 {
                                return;
                            }
                            tpl.elements()
                                .iter()
                                .for_each(|el| {
                                    let tp: TopicPartition = (el.topic().into(), el.partition());
                                    match end_signals.remove(&tp) {
                                        Some(end) => {
                                            debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
                                            state.revoke_partition(tp, end);
                                        }
                                        None => {
                                            debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                                        }
                                    }
                                });
                        }
                        if state.is_drain_complete() {
                            state.finish_drain(deadline)
                        } else {
                            state.keep_draining(deadline)
                        }
                    }
                },
            },

            Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
                ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
                ConsumerState::Consuming(state) => {
                    warn!("A drain deadline fired outside of draining mode.");
                    state.keep_consuming(None.into())
                },
                ConsumerState::Draining(mut draining) => {
                    debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
                    for tp in draining.consumer_state.expect_drain.drain() {
                        if let Some(handle) = abort_handles.remove(&tp) {
                            handle.abort();
                        }
                    }
                    draining.finish_drain(drain_deadline)
                }
            },
        }
    }
}

fn drive_kafka_consumer(
    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
    mut shutdown: ShutdownSignal,
    eof: Option<oneshot::Receiver<()>>,
) {
    Handle::current().block_on(async move {
        let mut eof: OptionFuture<_> = eof.into();
        let mut stream = consumer.stream();
        loop {
            tokio::select! {
                _ = &mut shutdown => {
                    consumer.context().shutdown();
                    break
                },

                Some(_) = &mut eof => {
                    consumer.context().shutdown();
                    break
                },

                // NB: messages are not received on this stream; polling the consumer
                // here serves client callbacks, such as rebalance notifications.
                message = stream.next() => match message {
                    None => unreachable!("MessageStream never returns Ready(None)"),
                    Some(Err(error)) => emit!(KafkaReadError { error }),
                    Some(Ok(_msg)) => {
                        unreachable!("Messages are consumed in dedicated tasks for each partition.")
                    }
                },
            }
        }
    });
}

async fn parse_message(
    msg: BorrowedMessage<'_>,
    decoder: Decoder,
    keys: &'_ Keys,
    out: &mut SourceSender,
    acknowledgements: bool,
    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
    log_namespace: LogNamespace,
) {
    if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let mut stream = stream.map(|event| {
            // Events are only tied to the batch notifier when acknowledgements
            // are enabled; otherwise they pass through unchanged.
            if acknowledgements {
                event.with_batch_notifier(&batch)
            } else {
                event
            }
        });
        match out.send_event_stream(&mut stream).await {
            Err(_) => {
                emit!(StreamClosedError { count });
            }
            Ok(_) => {
                // Drop the stream explicitly so that its borrow of `msg` ends
                // before `msg` is consumed by the finalizer below.
                drop(stream);
                if let Some(f) = finalizer.as_ref() {
                    f.add(msg.into(), receiver)
                }
            }
        }
    }
}

// Turn the received message into a stream of parsed events.
fn parse_stream<'a>(
    msg: &BorrowedMessage<'a>,
    decoder: Decoder,
    keys: &'a Keys,
    log_namespace: LogNamespace,
) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
    let payload = msg.payload()?; // skip messages with empty payload

    let rmsg = ReceivedMessage::from(msg);

    let payload = Cursor::new(Bytes::copy_from_slice(payload));

    let mut stream = FramedRead::with_capacity(payload, decoder, msg.payload_len());
    let (count, _) = stream.size_hint();
    let stream = stream! {
        while let Some(result) = stream.next().await {
            match result {
                Ok((events, _byte_size)) => {
                    emit!(KafkaEventsReceived {
                        count: events.len(),
                        byte_size: events.estimated_json_encoded_size_of(),
                        topic: &rmsg.topic,
                        partition: rmsg.partition,
                    });
                    for mut event in events {
                        rmsg.apply(keys, &mut event, log_namespace);
                        yield event;
                    }
                },
                Err(error) => {
                    // The error is logged by the decoder; no further handling
                    // is needed here beyond deciding whether to continue.
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    }
    .boxed();
    Some((count, stream))
}

#[derive(Clone, Debug)]
struct Keys {
    timestamp: Option<OwnedValuePath>,
    key_field: Option<OwnedValuePath>,
    topic: Option<OwnedValuePath>,
    partition: Option<OwnedValuePath>,
    offset: Option<OwnedValuePath>,
    headers: Option<OwnedValuePath>,
}

impl Keys {
    fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
        Self {
            timestamp: schema.timestamp_key().cloned(),
            key_field: config.key_field.path.clone(),
            topic: config.topic_key.path.clone(),
            partition: config.partition_key.path.clone(),
            offset: config.offset_key.path.clone(),
            headers: config.headers_key.path.clone(),
        }
    }
}

struct ReceivedMessage {
    timestamp: Option<DateTime<Utc>>,
    key: Value,
    headers: ObjectMap,
    topic: String,
    partition: i32,
    offset: i64,
}

impl ReceivedMessage {
    fn from(msg: &BorrowedMessage<'_>) -> Self {
        // Extract the timestamp from the Kafka message itself.
        let timestamp = msg
            .timestamp()
            .to_millis()
            .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());

        let key = msg
            .key()
            .map(|key| Value::from(Bytes::from(key.to_owned())))
            .unwrap_or(Value::Null);

        let mut headers_map = ObjectMap::new();
        if let Some(headers) = msg.headers() {
            for header in headers.iter() {
                if let Some(value) = header.value {
                    headers_map.insert(
                        header.key.into(),
                        Value::from(Bytes::from(value.to_owned())),
                    );
                }
            }
        }

        Self {
            timestamp,
            key,
            headers: headers_map,
            topic: msg.topic().to_string(),
            partition: msg.partition(),
            offset: msg.offset(),
        }
    }

    fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
        if let Event::Log(log) = event {
            match log_namespace {
                LogNamespace::Vector => {
                    // In the Vector namespace, the standard source metadata (including
                    // the ingest timestamp) lives in event metadata; the Kafka message
                    // timestamp is inserted separately below.
                    log_namespace.insert_standard_vector_source_metadata(
                        log,
                        KafkaSourceConfig::NAME,
                        Utc::now(),
                    );
                }
                LogNamespace::Legacy => {
                    if let Some(source_type_key) = log_schema().source_type_key_target_path() {
                        log.insert(source_type_key, KafkaSourceConfig::NAME);
                    }
                }
            }

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.key_field.as_ref().map(LegacyKey::Overwrite),
                path!("message_key"),
                self.key.clone(),
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.timestamp.as_ref().map(LegacyKey::Overwrite),
                path!("timestamp"),
                self.timestamp,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.topic.as_ref().map(LegacyKey::Overwrite),
                path!("topic"),
                self.topic.clone(),
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.partition.as_ref().map(LegacyKey::Overwrite),
                path!("partition"),
                self.partition,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.offset.as_ref().map(LegacyKey::Overwrite),
                path!("offset"),
                self.offset,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.headers.as_ref().map(LegacyKey::Overwrite),
                path!("headers"),
                self.headers.clone(),
            );
        }
    }
}
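
/// Offset bookkeeping for a consumed message: once the events decoded from the
/// message are acknowledged, the entry's offset is stored on the consumer to be
/// picked up by the next automatic commit.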
#[derive(Debug, Eq, PartialEq, Hash)]
struct FinalizerEntry {
    topic: String,
    partition: i32,
    offset: i64,
}

impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
    fn from(msg: BorrowedMessage<'a>) -> Self {
        Self {
            topic: msg.topic().into(),
            partition: msg.partition(),
            offset: msg.offset(),
        }
    }
}

fn create_consumer(
    config: &KafkaSourceConfig,
    acknowledgements: bool,
) -> crate::Result<(
    StreamConsumer<KafkaSourceContext>,
    UnboundedReceiver<KafkaCallback>,
)> {
    let mut client_config = ClientConfig::new();
    client_config
        .set("group.id", &config.group_id)
        .set("bootstrap.servers", &config.bootstrap_servers)
        .set("auto.offset.reset", &config.auto_offset_reset)
        .set(
            "session.timeout.ms",
            config.session_timeout_ms.as_millis().to_string(),
        )
        .set(
            "socket.timeout.ms",
            config.socket_timeout_ms.as_millis().to_string(),
        )
        .set(
            "fetch.wait.max.ms",
            config.fetch_wait_max_ms.as_millis().to_string(),
        )
        .set("enable.partition.eof", "false")
        .set("enable.auto.commit", "true")
        .set(
            "auto.commit.interval.ms",
            config.commit_interval_ms.as_millis().to_string(),
        )
        .set("enable.auto.offset.store", "false")
        .set("statistics.interval.ms", "1000")
        .set("client.id", "vector");

    config.auth.apply(&mut client_config)?;

    if let Some(librdkafka_options) = &config.librdkafka_options {
        for (key, value) in librdkafka_options {
            client_config.set(key.as_str(), value.as_str());
        }
    }

    let (callbacks, callback_rx) = mpsc::unbounded_channel();
    let consumer = client_config
        .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
            config.metrics.topic_lag_metric,
            acknowledgements,
            callbacks,
            Span::current(),
        ))
        .context(CreateSnafu)?;

    Ok((consumer, callback_rx))
}

type TopicPartition = (String, i32);

/// Status of a partition consumer task, distinguishing a task that exited after
/// being signaled to end from one that stopped because it reached partition EOF.
#[derive(PartialEq)]
enum PartitionConsumerStatus {
    NormalExit,
    PartitionEOF,
}
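
/// Messages sent from rdkafka callback threads to the coordination task. Each
/// variant carries a rendezvous `SyncSender`: the blocking callback thread
/// waits on the paired receiver until the coordination task has finished
/// handling the callback and drops the sender, which keeps a rebalance or
/// shutdown from proceeding while acknowledgements are still being drained.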
enum KafkaCallback {
    PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
    PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
    ShuttingDown(SyncSender<()>),
}

struct KafkaSourceContext {
    acknowledgements: bool,
    stats: kafka::KafkaStatisticsContext,

    /// Sends callback events to the source's coordination task.
    callbacks: UnboundedSender<KafkaCallback>,

    /// A weak reference to the consumer, so that rebalance callbacks can commit offsets.
    consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
}

impl KafkaSourceContext {
    fn new(
        expose_lag_metrics: bool,
        acknowledgements: bool,
        callbacks: UnboundedSender<KafkaCallback>,
        span: Span,
    ) -> Self {
        Self {
            stats: kafka::KafkaStatisticsContext {
                expose_lag_metrics,
                span,
            },
            acknowledgements,
            consumer: OnceLock::default(),
            callbacks,
        }
    }

    fn shutdown(&self) {
        let (send, rendezvous) = sync_channel(0);
        if self
            .callbacks
            .send(KafkaCallback::ShuttingDown(send))
            .is_ok()
        {
            while rendezvous.recv().is_ok() {
                self.commit_consumer_state();
            }
        }
    }

    /// Emit a PartitionsAssigned callback with the topic-partitions to be consumed,
    /// and block until the coordination task is ready to receive messages for them.
    fn consume_partitions(&self, tpl: &TopicPartitionList) {
        // A zero-capacity TPL indicates an empty assignment; nothing to set up.
        if tpl.capacity() == 0 {
            return;
        }
        let (send, rendezvous) = sync_channel(0);
        let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned(
            tpl.elements()
                .iter()
                .map(|tp| (tp.topic().into(), tp.partition()))
                .collect(),
            send,
        ));

        // Block this callback thread until the coordination task has set up the
        // partition consumers and dropped its end of the channel.
        while rendezvous.recv().is_ok() {}
    }

    /// Emit a PartitionsRevoked callback and block until each revoked partition
    /// has drained, or the coordination task gives up and drops the sender.
    fn revoke_partitions(&self, tpl: &TopicPartitionList) {
        let (send, rendezvous) = sync_channel(0);
        let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked(
            tpl.elements()
                .iter()
                .map(|tp| (tp.topic().into(), tp.partition()))
                .collect(),
            send,
        ));

        while rendezvous.recv().is_ok() {
            self.commit_consumer_state();
        }
    }

    fn commit_consumer_state(&self) {
        if let Some(consumer) = self
            .consumer
            .get()
            .expect("Consumer reference was not initialized.")
            .upgrade()
        {
            match consumer.commit_consumer_state(CommitMode::Sync) {
                Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
                    // Success, or nothing to commit; either way, nothing to do.
                }
                Err(error) => emit!(KafkaOffsetUpdateError { error }),
            }
        }
    }
}

impl ClientContext for KafkaSourceContext {
    fn stats(&self, statistics: Statistics) {
        self.stats.stats(statistics)
    }
}

impl ConsumerContext for KafkaSourceContext {
    fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
        match rebalance {
            Rebalance::Assign(tpl) => self.consume_partitions(tpl),

            Rebalance::Revoke(tpl) => {
                self.revoke_partitions(tpl);
                self.commit_consumer_state();
            }

            Rebalance::Error(message) => {
                error!("Error during Kafka consumer group rebalance: {}.", message);
            }
        }
    }
}

#[cfg(test)]
mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vector_lib::schema::Definition;

    use super::*;

    pub fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }

    pub fn kafka_port() -> u16 {
        let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into());
        port.parse().expect("Invalid port number")
    }

    pub fn kafka_address() -> String {
        format!("{}:{}", kafka_host(), kafka_port())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<KafkaSourceConfig>();
    }

    pub(super) fn make_config(
        topic: &str,
        group: &str,
        log_namespace: LogNamespace,
        librdkafka_options: Option<HashMap<String, String>>,
    ) -> KafkaSourceConfig {
        KafkaSourceConfig {
            bootstrap_servers: kafka_address(),
            topics: vec![topic.into()],
            group_id: group.into(),
            auto_offset_reset: "beginning".into(),
            session_timeout_ms: Duration::from_millis(6000),
            commit_interval_ms: Duration::from_millis(1),
            librdkafka_options,
            key_field: default_key_field(),
            topic_key: default_topic_key(),
            partition_key: default_partition_key(),
            offset_key: default_offset_key(),
            headers_key: default_headers_key(),
            socket_timeout_ms: Duration::from_millis(60000),
            fetch_wait_max_ms: Duration::from_millis(100),
            log_namespace: Some(log_namespace == LogNamespace::Vector),
            ..Default::default()
        }
    }
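
    // A hedged sketch of a unit test (not in the original suite): checks that
    // `keys()` mirrors the metadata paths configured via the defaults above.
    #[test]
    fn keys_reflect_config_paths() {
        let config = make_config("topic", "group", LogNamespace::Legacy, None);
        let keys = config.keys();
        assert_eq!(keys.key_field, Some(owned_value_path!("message_key")));
        assert_eq!(keys.topic, Some(owned_value_path!("topic")));
        assert_eq!(keys.partition, Some(owned_value_path!("partition")));
        assert_eq!(keys.offset, Some(owned_value_path!("offset")));
        assert_eq!(keys.headers, Some(owned_value_path!("headers")));
    }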

    #[test]
    fn test_output_schema_definition_vector_namespace() {
        let definitions = make_config("topic", "group", LogNamespace::Vector, None)
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_metadata_field(
                        &owned_value_path!("kafka", "timestamp"),
                        Kind::timestamp(),
                        Some("timestamp")
                    )
                    .with_metadata_field(
                        &owned_value_path!("kafka", "message_key"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
                    .with_metadata_field(
                        &owned_value_path!("kafka", "partition"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
                    .with_metadata_field(
                        &owned_value_path!("kafka", "headers"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "ingest_timestamp"),
                        Kind::timestamp(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "source_type"),
                        Kind::bytes(),
                        None
                    )
            )
        )
    }

    #[test]
    fn test_output_schema_definition_legacy_namespace() {
        let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
                    .unknown_fields(Kind::undefined())
                    .with_event_field(
                        &owned_value_path!("message"),
                        Kind::bytes(),
                        Some("message")
                    )
                    .with_event_field(
                        &owned_value_path!("timestamp"),
                        Kind::timestamp(),
                        Some("timestamp")
                    )
                    .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
                    .with_event_field(
                        &owned_value_path!("headers"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                        None
                    )
                    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
            )
        )
    }

    #[tokio::test]
    async fn consumer_create_ok() {
        let config = make_config("topic", "group", LogNamespace::Legacy, None);
        assert!(create_consumer(&config, true).is_ok());
    }

    #[tokio::test]
    async fn consumer_create_incorrect_auto_offset_reset() {
        let config = KafkaSourceConfig {
            auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
            ..make_config("topic", "group", LogNamespace::Legacy, None)
        };
        assert!(create_consumer(&config, true).is_err());
    }
}

#[cfg(feature = "kafka-integration-tests")]
#[cfg(test)]
mod integration_test {
    use std::time::Duration;

    use chrono::{DateTime, SubsecRound, Utc};
    use futures::Stream;
    use futures_util::stream::FuturesUnordered;
    use rdkafka::{
        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
        client::DefaultClientContext,
        config::{ClientConfig, FromClientConfig},
        consumer::BaseConsumer,
        message::{Header, OwnedHeaders},
        producer::{FutureProducer, FutureRecord},
        util::Timeout,
        Offset, TopicPartitionList,
    };
    use stream_cancel::{Trigger, Tripwire};
    use tokio::time::sleep;
    use vector_lib::event::EventStatus;
    use vrl::{event_path, value};

    use super::{test::*, *};
    use crate::{
        event::{EventArray, EventContainer},
        shutdown::ShutdownSignal,
        test_util::{collect_n, components::assert_source_compliance, random_string},
        SourceSender,
    };

    const KEY: &str = "my key";
    const TEXT: &str = "my message";
    const HEADER_KEY: &str = "my header";
    const HEADER_VALUE: &str = "my header value";

    fn kafka_test_topic() -> String {
        std::env::var("KAFKA_TEST_TOPIC")
            .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
    }

    fn kafka_max_bytes() -> String {
        std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
    }

    fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
        let mut client = ClientConfig::new();
        client.set("bootstrap.servers", kafka_address());
        client.set("produce.offset.report", "true");
        client.set("message.timeout.ms", "5000");
        client.set("auto.commit.interval.ms", "1");
        if let Some(group) = group {
            client.set("group.id", group);
        }
        client.create().expect("Producer creation error")
    }

    async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
        let now = Utc::now();
        let timestamp = now.timestamp_millis();

        let producer: &FutureProducer = &client_config(None);
        let topic_name = topic.as_ref();

        create_topic(topic_name, partitions).await;

        (0..count)
            .map(|i| async move {
                let text = format!("{TEXT} {i:03}");
                let key = format!("{KEY} {i}");
                let record = FutureRecord::to(topic_name)
                    .payload(&text)
                    .key(&key)
                    .timestamp(timestamp)
                    .headers(OwnedHeaders::new().insert(Header {
                        key: HEADER_KEY,
                        value: Some(HEADER_VALUE),
                    }));
                if let Err(error) = producer.send(record, Timeout::Never).await {
                    panic!("Cannot send event to Kafka: {error:?}");
                }
            })
            .collect::<FuturesUnordered<_>>()
            .collect::<Vec<_>>()
            .await;

        now
    }

    async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
        let topic = kafka_test_topic();
        let group_id = format!("test-group-{}", random_string(10));

        let sent_at = send_events(topic.clone(), partitions, count).await;

        (topic, group_id, sent_at)
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements() {
        send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn consumes_event_with_acknowledgements_vector_namespace() {
        send_receive(true, |_| false, 10, LogNamespace::Vector).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements() {
        send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn consumes_event_without_acknowledgements_vector_namespace() {
        send_receive(false, |_| false, 10, LogNamespace::Vector).await;
    }

    #[tokio::test]
    async fn handles_one_negative_acknowledgement() {
        send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn handles_one_negative_acknowledgement_vector_namespace() {
        send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
    }

    #[tokio::test]
    async fn handles_permanent_negative_acknowledgement() {
        send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
    }

    #[tokio::test]
    async fn handles_permanent_negative_acknowledgement_vector_namespace() {
        send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
    }

    async fn send_receive(
        acknowledgements: bool,
        error_at: impl Fn(usize) -> bool,
        receive_count: usize,
        log_namespace: LogNamespace,
    ) {
        const SEND_COUNT: usize = 10;

        let topic = format!("test-topic-{}", random_string(10));
        let group_id = format!("test-group-{}", random_string(10));
        let config = make_config(&topic, &group_id, log_namespace, None);

        let now = send_events(topic.clone(), 1, 10).await;

        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
            let (tx, rx) = SourceSender::new_test_errors(error_at);
            let (trigger_shutdown, shutdown_done) =
                spawn_kafka(tx, config, acknowledgements, false, log_namespace);
            let events = collect_n(rx, SEND_COUNT).await;
            // Give the source a chance to process pending acknowledgements
            // before triggering the shutdown.
            tokio::task::yield_now().await;
            drop(trigger_shutdown);
            shutdown_done.await;

            events
        })
        .await;

        let offset = fetch_tpl_offset(&group_id, &topic, 0);
        assert_eq!(offset, Offset::from_raw(receive_count as i64));

        assert_eq!(events.len(), SEND_COUNT);
        for (i, event) in events.into_iter().enumerate() {
            if let LogNamespace::Legacy = log_namespace {
                assert_eq!(
                    event.as_log()[log_schema().message_key().unwrap().to_string()],
                    format!("{TEXT} {i:03}").into()
                );
                assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
                assert_eq!(
                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
                    "kafka".into()
                );
                assert_eq!(
                    event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
                    now.trunc_subsecs(3).into()
                );
                assert_eq!(event.as_log()["topic"], topic.clone().into());
                assert!(event.as_log().contains("partition"));
                assert!(event.as_log().contains("offset"));
                let mut expected_headers = ObjectMap::new();
                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
                assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
            } else {
                let meta = event.as_log().metadata().value();

                assert_eq!(
                    meta.get(path!("vector", "source_type")).unwrap(),
                    &value!(KafkaSourceConfig::NAME)
                );
                assert!(meta
                    .get(path!("vector", "ingest_timestamp"))
                    .unwrap()
                    .is_timestamp());

                assert_eq!(
                    event.as_log().value(),
                    &value!(format!("{} {:03}", TEXT, i))
                );
                assert_eq!(
                    meta.get(path!("kafka", "message_key")).unwrap(),
                    &value!(format!("{} {}", KEY, i))
                );

                assert_eq!(
                    meta.get(path!("kafka", "timestamp")).unwrap(),
                    &value!(now.trunc_subsecs(3))
                );
                assert_eq!(
                    meta.get(path!("kafka", "topic")).unwrap(),
                    &value!(topic.clone())
                );
                assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer());
                assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer());

                let mut expected_headers = ObjectMap::new();
                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
                assert_eq!(
                    meta.get(path!("kafka", "headers")).unwrap(),
                    &Value::from(expected_headers)
                );
            }
        }
    }

    fn make_rand_config() -> (String, String, KafkaSourceConfig) {
        let topic = format!("test-topic-{}", random_string(10));
        let group_id = format!("test-group-{}", random_string(10));
        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
        (topic, group_id, config)
    }

    fn delay_pipeline(
        id: usize,
        delay: Duration,
        status: EventStatus,
    ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
        let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100);
        let recv = recv.into_stream();
        let recv = recv.then(move |item| async move {
            let mut events = item.events;
            events.iter_logs_mut().for_each(|log| {
                log.insert(event_path!("pipeline_id"), id.to_string());
            });
            sleep(delay).await;
            events.iter_events_mut().for_each(|mut event| {
                let metadata = event.metadata_mut();
                metadata.update_status(status);
                metadata.update_sources();
            });
            events
        });
        (pipe, Box::pin(recv))
    }

    fn spawn_kafka(
        out: SourceSender,
        config: KafkaSourceConfig,
        acknowledgements: bool,
        eof: bool,
        log_namespace: LogNamespace,
    ) -> (Trigger, Tripwire) {
        let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();

        let decoder = DecodingConfig::new(
            config.framing.clone(),
            config.decoding.clone(),
            log_namespace,
        )
        .build()
        .unwrap();

        let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();

        tokio::spawn(kafka_source(
            config,
            consumer,
            callback_rx,
            decoder,
            out,
            shutdown,
            eof,
            log_namespace,
        ));
        (trigger_shutdown, shutdown_done)
    }

    fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
        let client: BaseConsumer = client_config(Some(group_id));
        client.subscribe(&[topic]).expect("Subscribing failed");

        let mut tpl = TopicPartitionList::new();
        tpl.add_partition(topic, partition);
        client
            .committed_offsets(tpl, Duration::from_secs(1))
            .expect("Getting committed offsets failed")
            .find_partition(topic, partition)
            .expect("Missing topic/partition")
            .offset()
    }

    async fn create_topic(topic: &str, partitions: i32) {
        let client: AdminClient<DefaultClientContext> = client_config(None);
        let topic_results = client
            .create_topics(
                [&NewTopic {
                    name: topic,
                    num_partitions: partitions,
                    replication: TopicReplication::Fixed(1),
                    config: vec![],
                }],
                &AdminOptions::default(),
            )
            .await
            .expect("create_topics failed");

        for result in topic_results {
            if let Err((topic, err)) = result {
                if err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists {
                    panic!("Creating a topic failed: {:?}", (topic, err))
                }
            }
        }
    }

    #[ignore]
    #[tokio::test]
    async fn handles_rebalance() {
        const NEVENTS: usize = 200;
        const DELAY: u64 = 100;

        let (topic, group_id, config) = make_rand_config();
        create_topic(&topic, 2).await;

        let _send_start = send_events(topic.clone(), 1, NEVENTS).await;

        let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
        let (trigger_shutdown1, shutdown_done1) =
            spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
        let events1 = tokio::spawn(collect_n(rx1, NEVENTS));

        sleep(Duration::from_secs(1)).await;

        let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
        let (trigger_shutdown2, shutdown_done2) =
            spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
        let events2 = tokio::spawn(collect_n(rx2, NEVENTS));

        sleep(Duration::from_secs(5)).await;

        drop(trigger_shutdown1);
        let events1 = events1.await.unwrap();
        shutdown_done1.await;

        sleep(Duration::from_secs(5)).await;

        drop(trigger_shutdown2);
        let events2 = events2.await.unwrap();
        shutdown_done2.await;

        sleep(Duration::from_secs(1)).await;

        assert!(!events1.is_empty());
        assert!(!events2.is_empty());

        match fetch_tpl_offset(&group_id, &topic, 0) {
            Offset::Offset(offset) => {
                assert!((offset as isize - events1.len() as isize).abs() <= 1)
            }
            o => panic!("Invalid offset for partition 0 {o:?}"),
        }

        match fetch_tpl_offset(&group_id, &topic, 1) {
            Offset::Offset(offset) => {
                assert!((offset as isize - events2.len() as isize).abs() <= 1)
            }
            o => panic!("Invalid offset for partition 1 {o:?}"),
        }

        let mut all_events = events1
            .into_iter()
            .chain(events2.into_iter())
            .flat_map(map_logs)
            .collect::<Vec<String>>();
        all_events.sort();
    }

    #[tokio::test]
    async fn drains_acknowledgements_at_shutdown() {
        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
            .unwrap_or_else(|_| "125000".into())
            .parse()
            .expect("Number of messages to send to kafka.");
        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
            .unwrap_or_else(|_| format!("{send_count}"))
            .parse()
            .expect("Number of messages to expect consumers to process.");
        let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
            .unwrap_or_else(|_| "2000".into())
            .parse()
            .expect("Number of milliseconds before shutting down first consumer.");

        let (topic, group_id, _) = send_to_test_topic(1, send_count).await;

        let mut opts = HashMap::new();
        opts.insert("enable.partition.eof".into(), "true".into());
        opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
        let events1 = {
            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
            let (tx, rx) = SourceSender::new_test_errors(|_| false);
            let (trigger_shutdown, shutdown_done) =
                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
            let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
                sleep(Duration::from_millis(delay_ms)).await;
                drop(trigger_shutdown);
            });
            shutdown_done.await;
            events
        };

        debug!("Consumer group.id: {}", &group_id);
        debug!(
            "First consumer read {} of {} messages.",
            events1.len(),
            expect_count
        );

        let events2 = {
            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
            let (tx, rx) = SourceSender::new_test_errors(|_| false);
            let (trigger_shutdown, shutdown_done) =
                spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
            let events = rx.collect::<Vec<Event>>().await;
            drop(trigger_shutdown);
            shutdown_done.await;
            events
        };

        debug!(
            "Second consumer read {} of {} messages.",
            events2.len(),
            expect_count
        );

        let total = events1.len() + events2.len();
        assert_ne!(
            events1.len(),
            0,
            "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
        );
        assert_ne!(
            events2.len(),
            0,
            "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?)"
        );
        assert_eq!(total, expect_count);
    }

    async fn consume_with_rebalance(rebalance_strategy: String) {
        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
            .unwrap_or_else(|_| "125000".into())
            .parse()
            .expect("Number of messages to send to kafka.");
        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
            .unwrap_or_else(|_| format!("{send_count}"))
            .parse()
            .expect("Number of messages to expect consumers to process.");
        let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
            .unwrap_or_else(|_| "2000".into())
            .parse()
            .expect("Number of milliseconds before shutting down first consumer.");

        let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
        debug!("Topic: {}", &topic);
        debug!("Consumer group.id: {}", &group_id);

        let mut kafka_options = HashMap::new();
        kafka_options.insert("enable.partition.eof".into(), "true".into());
        kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
        kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
        let config1 = make_config(
            &topic,
            &group_id,
            LogNamespace::Legacy,
            Some(kafka_options.clone()),
        );
        let config2 = config1.clone();
        let config3 = config1.clone();
        let config4 = config1.clone();

        let (events1, events2, events3) = tokio::join!(
            async move {
                let (tx, rx) = SourceSender::new_test_errors(|_| false);
                let (_trigger_shutdown, _shutdown_done) =
                    spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);

                rx.collect::<Vec<Event>>().await
            },
            async move {
                sleep(Duration::from_millis(delay_ms)).await;
                let (tx, rx) = SourceSender::new_test_errors(|_| false);
                let (_trigger_shutdown, _shutdown_done) =
                    spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);

                rx.collect::<Vec<Event>>().await
            },
            async move {
                sleep(Duration::from_millis(delay_ms * 2)).await;
                let (tx, rx) = SourceSender::new_test_errors(|_| false);
                let (_trigger_shutdown, _shutdown_done) =
                    spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);

                rx.collect::<Vec<Event>>().await
            }
        );

        let unconsumed = async move {
            let (tx, rx) = SourceSender::new_test_errors(|_| false);
            let (_trigger_shutdown, _shutdown_done) =
                spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);

            rx.collect::<Vec<Event>>().await
        }
        .await;

        debug!(
            "First consumer read {} of {} messages.",
            events1.len(),
            expect_count
        );

        debug!(
            "Second consumer read {} of {} messages.",
            events2.len(),
            expect_count
        );
        debug!(
            "Third consumer read {} of {} messages.",
            events3.len(),
            expect_count
        );

        let total = events1.len() + events2.len() + events3.len();
        assert_ne!(
            events1.len(),
            0,
            "First batch of events should be non-zero (increase delay?)"
        );
        assert_ne!(
            events2.len(),
            0,
            "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?)"
        );
        assert_ne!(
            events3.len(),
            0,
            "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?)"
        );
        assert_eq!(
            unconsumed.len(),
            0,
            "The first set of consumers should consume and ack all messages."
        );
        assert_eq!(total, expect_count);
    }

    #[tokio::test]
    async fn drains_acknowledgements_during_rebalance_default_assignments() {
        consume_with_rebalance("range,roundrobin".into()).await;
    }

    #[tokio::test]
    async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
        consume_with_rebalance("cooperative-sticky".into()).await;
    }

    fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
        events.into_events().map(|event| {
            let log = event.into_log();
            format!(
                "{} {} {} {}",
                log["message"].to_string_lossy(),
                log["topic"].to_string_lossy(),
                log["partition"].to_string_lossy(),
                log["offset"].to_string_lossy(),
            )
        })
    }
}