vector/sources/kafka.rs

use std::{
    collections::{HashMap, HashSet},
    io::Cursor,
    pin::Pin,
    sync::{
        Arc, OnceLock, Weak,
        mpsc::{SyncSender, sync_channel},
    },
    time::Duration,
};

use async_stream::stream;
use bytes::Bytes;
use chrono::{DateTime, TimeZone, Utc};
use futures::{Stream, StreamExt};
use futures_util::future::OptionFuture;
use rdkafka::{
    ClientConfig, ClientContext, Statistics, TopicPartitionList,
    consumer::{
        BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer,
        stream_consumer::StreamPartitionQueue,
    },
    error::KafkaError,
    message::{BorrowedMessage, Headers as _, Message},
    types::RDKafkaErrorCode,
};
use serde_with::serde_as;
use snafu::{ResultExt, Snafu};
use tokio::{
    runtime::Handle,
    sync::{
        mpsc::{self, UnboundedReceiver, UnboundedSender},
        oneshot,
    },
    task::JoinSet,
    time::Sleep,
};
use tracing::{Instrument, Span};
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    codecs::{
        DecoderFramedRead, StreamDecodingError,
        decoding::{DeserializerConfig, FramingConfig},
    },
    config::{LegacyKey, LogNamespace},
    configurable::configurable_component,
    finalizer::OrderedFinalizer,
    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
};
use vrl::value::{Kind, ObjectMap, kind::Collection};

use crate::{
    SourceSender,
    codecs::{Decoder, DecodingConfig},
    config::{
        LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
        log_schema,
    },
    event::{BatchNotifier, BatchStatus, Event, Value},
    internal_events::{
        KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
        StreamClosedError,
    },
    kafka,
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    shutdown::ShutdownSignal,
};

#[derive(Debug, Snafu)]
enum BuildError {
    #[snafu(display("The drain_timeout_ms ({}) must be less than or equal to session_timeout_ms ({})", value, session_timeout_ms.as_millis()))]
    InvalidDrainTimeout {
        value: u64,
        session_timeout_ms: Duration,
    },
    #[snafu(display("Could not create Kafka consumer: {}", source))]
    CreateError { source: rdkafka::error::KafkaError },
    #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
    SubscribeError { source: rdkafka::error::KafkaError },
}

/// Metrics (beta) configuration.
#[configurable_component]
#[derive(Clone, Debug, Default)]
struct Metrics {
    /// Expose topic lag metrics for all topics and partitions. The metric name is `kafka_consumer_lag`.
    pub topic_lag_metric: bool,
}

/// Configuration for the `kafka` source.
#[serde_as]
#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct KafkaSourceConfig {
    /// A comma-separated list of Kafka bootstrap servers.
    ///
    /// These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster,
    /// allowing discovery of all the other hosts in the cluster.
    ///
    /// Must be in the form of `host:port`, and comma-separated.
    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
    bootstrap_servers: String,

    /// The Kafka topic names to read events from.
    ///
    /// Regular expression syntax is supported if the topic begins with `^`.
    #[configurable(metadata(
        docs::examples = "^(prefix1|prefix2)-.+",
        docs::examples = "topic-1",
        docs::examples = "topic-2"
    ))]
    topics: Vec<String>,

    /// The consumer group name to be used to consume events from Kafka.
    #[configurable(metadata(docs::examples = "consumer-group-name"))]
    group_id: String,

    /// If offsets for the consumer group do not exist, set them using this strategy.
    ///
    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the `auto.offset.reset` option for further clarification.
    #[serde(default = "default_auto_offset_reset")]
    #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
    auto_offset_reset: String,

    /// The Kafka session timeout.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_session_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Session Timeout"))]
    session_timeout_ms: Duration,

    /// Timeout to drain pending acknowledgements during shutdown or a Kafka
    /// consumer group rebalance.
    ///
    /// When Vector shuts down or the Kafka consumer group revokes partitions from this
    /// consumer, wait a maximum of `drain_timeout_ms` for the source to
    /// process pending acknowledgements. Must be less than or equal to `session_timeout_ms`
    /// to ensure the consumer is not excluded from the group during a rebalance.
    ///
    /// The default value is half of `session_timeout_ms`.
    #[serde(skip_serializing_if = "Option::is_none")]
    #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(docs::human_name = "Drain Timeout"))]
    drain_timeout_ms: Option<u64>,

    /// Timeout for network requests.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_socket_timeout_ms")]
    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
    socket_timeout_ms: Duration,

    /// Maximum time the broker may wait to fill the response.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_fetch_wait_max_ms")]
    #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
    fetch_wait_max_ms: Duration,

    /// The frequency at which consumer offsets are committed (written) to offset storage.
    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
    #[serde(default = "default_commit_interval_ms")]
    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
    #[configurable(metadata(docs::human_name = "Commit Interval"))]
    commit_interval_ms: Duration,

    /// Overrides the name of the log field used to add the message key to each event.
    ///
    /// The value is the message key of the Kafka message itself.
    ///
    /// By default, `"message_key"` is used.
    #[serde(default = "default_key_field")]
    #[configurable(metadata(docs::examples = "message_key"))]
    key_field: OptionalValuePath,

    /// Overrides the name of the log field used to add the topic to each event.
    ///
    /// The value is the topic from which the Kafka message was consumed.
    ///
    /// By default, `"topic"` is used.
    #[serde(default = "default_topic_key")]
    #[configurable(metadata(docs::examples = "topic"))]
    topic_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the partition to each event.
    ///
    /// The value is the partition from which the Kafka message was consumed.
    ///
    /// By default, `"partition"` is used.
    #[serde(default = "default_partition_key")]
    #[configurable(metadata(docs::examples = "partition"))]
    partition_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the offset to each event.
    ///
    /// The value is the offset of the Kafka message itself.
    ///
    /// By default, `"offset"` is used.
    #[serde(default = "default_offset_key")]
    #[configurable(metadata(docs::examples = "offset"))]
    offset_key: OptionalValuePath,

    /// Overrides the name of the log field used to add the headers to each event.
    ///
    /// The value is the headers of the Kafka message itself.
    ///
    /// By default, `"headers"` is used.
    #[serde(default = "default_headers_key")]
    #[configurable(metadata(docs::examples = "headers"))]
    headers_key: OptionalValuePath,

    /// Advanced options set directly on the underlying `librdkafka` client.
    ///
    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for details.
    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
    #[configurable(metadata(docs::advanced))]
    #[configurable(metadata(
        docs::additional_props_description = "A librdkafka configuration option."
    ))]
    librdkafka_options: Option<HashMap<String, String>>,

    #[serde(flatten)]
    auth: kafka::KafkaAuthConfig,

    #[configurable(derived)]
    #[configurable(metadata(docs::advanced))]
    #[serde(default = "default_framing_message_based")]
    #[derivative(Default(value = "default_framing_message_based()"))]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    #[derivative(Default(value = "default_decoding()"))]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    metrics: Metrics,
}
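// A minimal sketch of how this source might be configured, assuming Vector's
// usual YAML configuration format (values taken from the field examples above):
//
//   sources:
//     my_kafka:
//       type: kafka
//       bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092"
//       group_id: "consumer-group-name"
//       topics: ["^(prefix1|prefix2)-.+"]
//       auto_offset_reset: "earliest"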

impl KafkaSourceConfig {
    fn keys(&self) -> Keys {
        Keys::from(log_schema(), self)
    }
}

const fn default_session_timeout_ms() -> Duration {
    Duration::from_millis(10000) // default in librdkafka
}

const fn default_socket_timeout_ms() -> Duration {
    Duration::from_millis(60000) // default in librdkafka
}

const fn default_fetch_wait_max_ms() -> Duration {
    Duration::from_millis(100) // default in librdkafka
}

const fn default_commit_interval_ms() -> Duration {
    Duration::from_millis(5000)
}

fn default_auto_offset_reset() -> String {
    "largest".into() // default in librdkafka
}

fn default_key_field() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("message_key"))
}

fn default_topic_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("topic"))
}

fn default_partition_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("partition"))
}

fn default_offset_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("offset"))
}

fn default_headers_key() -> OptionalValuePath {
    OptionalValuePath::from(owned_value_path!("headers"))
}

const fn example_auto_offset_reset_values() -> [&'static str; 7] {
    [
        "smallest",
        "earliest",
        "beginning",
        "largest",
        "latest",
        "end",
        "error",
    ]
}

fn example_librdkafka_options() -> HashMap<String, String> {
    HashMap::<_, _>::from_iter([
        ("client.id".to_string(), "${ENV_VAR}".to_string()),
        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
    ])
}

impl_generate_config_from_default!(KafkaSourceConfig);

#[async_trait::async_trait]
#[typetag::serde(name = "kafka")]
impl SourceConfig for KafkaSourceConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;
        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);

        if let Some(d) = self.drain_timeout_ms {
            snafu::ensure!(
                Duration::from_millis(d) <= self.session_timeout_ms,
                InvalidDrainTimeoutSnafu {
                    value: d,
                    session_timeout_ms: self.session_timeout_ms
                }
            );
        }

        let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;

        Ok(Box::pin(kafka_source(
            self.clone(),
            consumer,
            callback_rx,
            decoder,
            cx.out,
            cx.shutdown,
            false,
            log_namespace,
        )))
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let log_namespace = global_log_namespace.merge(self.log_namespace);
        let keys = self.keys();

        let schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                Self::NAME,
                keys.timestamp.map(LegacyKey::Overwrite),
                &owned_value_path!("timestamp"),
                Kind::timestamp(),
                Some("timestamp"),
            )
            .with_source_metadata(
                Self::NAME,
                keys.topic.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("topic"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.partition.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("partition"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.offset.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("offset"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.headers.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("headers"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                None,
            )
            .with_source_metadata(
                Self::NAME,
                keys.key_field.clone().map(LegacyKey::Overwrite),
                &owned_value_path!("message_key"),
                Kind::bytes(),
                None,
            );

        vec![SourceOutput::new_maybe_logs(
            self.decoding.output_type(),
            schema_definition,
        )]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

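/// Builds and runs the source: subscribes the consumer to the configured
/// topics, then spawns two cooperating tasks. A coordination task
/// (`coordinate_kafka_callbacks`) reacts to assignment, revocation, and
/// shutdown callbacks and manages one consumer task per partition, while a
/// blocking task (`drive_kafka_consumer`) polls the main consumer to service
/// client callbacks. Both tasks are joined before the final offset commit.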
#[allow(clippy::too_many_arguments)]
async fn kafka_source(
    config: KafkaSourceConfig,
    consumer: StreamConsumer<KafkaSourceContext>,
    callback_rx: UnboundedReceiver<KafkaCallback>,
    decoder: Decoder,
    out: SourceSender,
    shutdown: ShutdownSignal,
    eof: bool,
    log_namespace: LogNamespace,
) -> Result<(), ()> {
    let span = info_span!("kafka_source");
    let consumer = Arc::new(consumer);

    consumer
        .context()
        .consumer
        .set(Arc::downgrade(&consumer))
        .expect("Error setting up consumer context.");

    // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF
    let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();

    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
    if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
        error!("{}", e);
        return Err(());
    }

    let coordination_task = {
        let span = span.clone();
        let consumer = Arc::clone(&consumer);
        let drain_timeout_ms = config
            .drain_timeout_ms
            .map_or(config.session_timeout_ms / 2, Duration::from_millis);
        let consumer_state =
            ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
        tokio::spawn(async move {
            coordinate_kafka_callbacks(
                consumer,
                callback_rx,
                consumer_state,
                drain_timeout_ms,
                eof_tx,
            )
            .await;
        })
    };

    let client_task = {
        let consumer = Arc::clone(&consumer);
        tokio::task::spawn_blocking(move || {
            let _enter = span.enter();
            drive_kafka_consumer(consumer, shutdown, eof_rx);
        })
    };

    _ = tokio::join!(client_task, coordination_task);
    consumer.context().commit_consumer_state();

    Ok(())
}

/// ConsumerStateInner implements a small struct/enum-based state machine.
///
/// With a ConsumerStateInner<Consuming>, the client is able to spawn new tasks
/// when partitions are assigned. When a shutdown signal is received, or
/// partitions are being revoked, the Consuming state is traded for a Draining
/// state (and associated drain deadline future) via the `begin_drain` method.
///
/// A ConsumerStateInner<Draining> keeps track of partitions that are expected
/// to complete, and also owns the signal that, when dropped, indicates to the
/// client driver task that it is safe to proceed with the rebalance or shutdown.
/// When draining is complete, or the deadline is reached, Draining is traded in for
/// either a Consuming (after a revoke) or Complete (in the case of shutdown) state,
/// via the `finish_drain` method.
///
/// A ConsumerStateInner<Complete> is the final state, reached after a shutdown
/// signal is received. This cannot be traded for another state, and the
/// coordination task should exit when this state is reached.
struct ConsumerStateInner<S> {
    config: KafkaSourceConfig,
    decoder: Decoder,
    out: SourceSender,
    log_namespace: LogNamespace,
    consumer_state: S,
}
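// The state transitions described above, in sketch form:
//
//   Consuming --begin_drain--> Draining --finish_drain--> Consuming (rebalance)
//                                      \--finish_drain--> Complete  (shutdown)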
struct Consuming {
    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
    span: Span,
}
struct Draining {
    /// The rendezvous channel sender from the revoke or shutdown callback. Sending on this channel
    /// indicates to the kafka client task that one or more partitions have been drained, while
    /// closing this channel indicates that all expected partitions have drained, or the drain
    /// timeout has been reached.
    signal: SyncSender<()>,

    /// The set of topic-partition tasks that are required to complete during
    /// the draining phase, populated at the beginning of a rebalance or shutdown.
    /// Partitions that are being revoked, but not being actively consumed
    /// (e.g. due to the consumer task exiting early) should not be included.
    /// The draining phase is considered complete when this set is empty.
    expect_drain: HashSet<TopicPartition>,

    /// Whether the client is shutting down after draining. If set to true,
    /// the `finish_drain` method will return a Complete state, otherwise
    /// a Consuming state.
    shutdown: bool,

    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
    span: Span,
}
type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
enum ConsumerState {
    Consuming(ConsumerStateInner<Consuming>),
    Draining(ConsumerStateInner<Draining>),
    Complete,
}
impl Draining {
    fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
        Self {
            signal,
            shutdown,
            expect_drain: HashSet::new(),
            span,
        }
    }

    fn is_complete(&self) -> bool {
        self.expect_drain.is_empty()
    }
}

impl<C> ConsumerStateInner<C> {
    fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (None.into(), ConsumerState::Complete)
    }
}

impl ConsumerStateInner<Consuming> {
    const fn new(
        config: KafkaSourceConfig,
        decoder: Decoder,
        out: SourceSender,
        log_namespace: LogNamespace,
        span: Span,
    ) -> Self {
        Self {
            config,
            decoder,
            out,
            log_namespace,
            consumer_state: Consuming { span },
        }
    }

    /// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle
    /// acknowledgements for the messages consumed. Returns a channel sender that can be used to
    /// signal that the consumer should stop and drain pending acknowledgements, and an AbortHandle
    /// that can be used to forcefully end the task.
    fn consume_partition(
        &self,
        join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
        tp: TopicPartition,
        consumer: Arc<StreamConsumer<KafkaSourceContext>>,
        p: StreamPartitionQueue<KafkaSourceContext>,
        acknowledgements: bool,
        exit_eof: bool,
    ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
        let keys = self.config.keys();
        let decoder = self.decoder.clone();
        let log_namespace = self.log_namespace;
        let mut out = self.out.clone();

        let (end_tx, mut end_signal) = oneshot::channel::<()>();

        let handle = join_set.spawn(async move {
            let mut messages = p.stream();
            let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);

            // finalizer is the entry point for new pending acknowledgements;
            // when it is dropped, no new messages will be consumed, and the
            // task will end when it reaches the end of ack_stream
            let mut finalizer = Some(finalizer);

            let mut status = PartitionConsumerStatus::NormalExit;

            loop {
                tokio::select!(
                    // Make sure to handle the acknowledgement stream before new messages to prevent
                    // unbounded memory growth caused by those acks being handled slower than
                    // incoming messages when the load is high.
                    biased;

                    // is_some() checks prevent polling end_signal after it completes
                    _ = &mut end_signal, if finalizer.is_some() => {
                        finalizer.take();
                    },

                    ack = ack_stream.next() => match ack {
                        Some((status, entry)) => {
                            if status == BatchStatus::Delivered
                                && let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset)
                            {
                                emit!(KafkaOffsetUpdateError { error });
                            }
                        }
                        None if finalizer.is_none() => {
                            debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
                            break
                        }
                        None => {
                            debug!("Acknowledgement stream empty for {}:{}.", &tp.0, tp.1);
                        }
                    },

                    message = messages.next(), if finalizer.is_some() => match message {
                        None => unreachable!("MessageStream never returns Ready(None)"),
                        Some(Err(error)) => match error {
                            rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
                                debug!("EOF for partition {}.", partition);
                                status = PartitionConsumerStatus::PartitionEOF;
                                finalizer.take();
                            },
                            _ => emit!(KafkaReadError { error }),
                        },
                        Some(Ok(msg)) => {
                            emit!(KafkaBytesReceived {
                                byte_size: msg.payload_len(),
                                protocol: "tcp",
                                topic: msg.topic(),
                                partition: msg.partition(),
                            });
                            parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
                        }
                    },
                )
            }
            (tp, status)
        }.instrument(self.consumer_state.span.clone()));
        (end_tx, handle)
    }

    /// Consume self, and return a "Draining" ConsumerState, along with a Future
    /// representing a drain deadline, based on max_drain_ms.
    fn begin_drain(
        self,
        max_drain_ms: Duration,
        sig: SyncSender<()>,
        shutdown: bool,
    ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
        let deadline = Box::pin(tokio::time::sleep(max_drain_ms));

        let draining = ConsumerStateInner {
            config: self.config,
            decoder: self.decoder,
            out: self.out,
            log_namespace: self.log_namespace,
            consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
        };

        (Some(deadline).into(), draining)
    }

    pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Consuming(self))
    }
}

impl ConsumerStateInner<Draining> {
    /// Mark the given TopicPartition as being revoked, adding it to the set of
    /// partitions expected to drain.
    fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
        // Note that if this send() returns Err, it means the task has already
        // ended, but the completion has not been processed yet (otherwise we
        // wouldn't have access to the end_signal), so we should still add it
        // to the "expect to drain" set
        _ = end_signal.send(());
        self.consumer_state.expect_drain.insert(tp);
    }

    /// Add the given TopicPartition to the set of known "drained" partitions,
    /// i.e. the consumer has drained the acknowledgement channel. A signal is
    /// sent on the signal channel, indicating to the client that offsets may be committed.
    fn partition_drained(&mut self, tp: TopicPartition) {
        // This send() will only return Err if the receiver has already been disconnected (i.e. the
        // kafka client task is no longer running)
        _ = self.consumer_state.signal.send(());
        self.consumer_state.expect_drain.remove(&tp);
    }

    /// Return true if all expected partitions have drained.
    fn is_drain_complete(&self) -> bool {
        self.consumer_state.is_complete()
    }

    /// Finish partition drain mode. Consumes self and the drain deadline
    /// future, and returns a "Consuming" or "Complete" ConsumerState.
    fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        if self.consumer_state.shutdown {
            self.complete(deadline)
        } else {
            (
                None.into(),
                ConsumerState::Consuming(ConsumerStateInner {
                    config: self.config,
                    decoder: self.decoder,
                    out: self.out,
                    log_namespace: self.log_namespace,
                    consumer_state: Consuming {
                        span: self.consumer_state.span,
                    },
                }),
            )
        }
    }

    pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
        (deadline, ConsumerState::Draining(self))
    }
}

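/// Couples the callback channel fed by the rdkafka client context with the
/// per-partition consumer tasks. On assignment, a consumer task is spawned per
/// partition; on revocation or shutdown, each affected task is signaled to
/// drain pending acknowledgements within `max_drain_ms`, after which any
/// remaining tasks are aborted.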
async fn coordinate_kafka_callbacks(
    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
    mut callbacks: UnboundedReceiver<KafkaCallback>,
    consumer_state: ConsumerStateInner<Consuming>,
    max_drain_ms: Duration,
    mut eof: Option<oneshot::Sender<()>>,
) {
    let mut drain_deadline: OptionFuture<_> = None.into();
    let mut consumer_state = ConsumerState::Consuming(consumer_state);

    // A oneshot channel is used for each consumed partition, so that we can
    // signal to that task to stop consuming, drain pending acks, and exit
    let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();

    // The set of consumer tasks, each consuming a specific partition. The task
    // is both consuming the messages (passing them to the output stream) _and_
    // processing the corresponding acknowledgement stream. A consumer task
    // should completely drain its acknowledgement stream after receiving an end signal
    let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
        Default::default();

    // Handles that will let us end any consumer task that exceeds a drain deadline
    let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();

    let exit_eof = eof.is_some();

    while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
        tokio::select! {
            Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
                debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
                // If this task ended on its own, the end_signal for it will still be in here.
                end_signals.remove(&finished_partition);
                abort_handles.remove(&finished_partition);

                (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
                    ConsumerState::Draining(mut state) => {
                        state.partition_drained(finished_partition);

                        if state.is_drain_complete() {
                            debug!("All expected partitions have drained.");
                            state.finish_drain(drain_deadline)
                        } else {
                            state.keep_draining(drain_deadline)
                        }
                    },
                    ConsumerState::Consuming(state) => {
                        // If we are here, it is likely because the consumer
                        // tasks are set up to exit upon reaching the end of the
                        // partition.
                        if !exit_eof {
                            debug!("Partition consumer task finished, while not in draining mode.");
                        }
                        state.keep_consuming(drain_deadline)
                    },
                };

                // PartitionConsumerStatus differentiates between a task that exited after
                // being signaled to end, and one that reached the end of its partition and
                // was configured to exit. After the last such task ends, we signal the kafka
                // driver task to shut down the main consumer too. Note this is only used in tests.
                if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
                    debug!("All partitions have exited or reached EOF.");
                    let _ = eof.take().map(|e| e.send(()));
                }
            },
            Some(callback) = callbacks.recv() => match callback {
                KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
                    ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
                    ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
                    ConsumerState::Consuming(ref consumer_state) => {
                        let acks = consumer.context().acknowledgements;
                        for tp in assigned_partitions.drain(0..) {
                            let topic = tp.0.as_str();
                            let partition = tp.1;
                            if let Some(pq) = consumer.split_partition_queue(topic, partition) {
                                debug!("Consuming partition {}:{}.", &tp.0, tp.1);
                                let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
                                abort_handles.insert(tp.clone(), handle);
                                end_signals.insert(tp, end_tx);
                            } else {
                                warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
                            }
                        }
                        // ensure this is retained until all individual queues are set up
                        drop(done);
                    }
                },
                KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
                    ConsumerState::Draining(d) => {
                        // NB: This would only happen if the task driving the kafka client (i.e. rebalance handlers)
                        // is not handling shutdown signals, and a revoke happens during a shutdown drain; otherwise
                        // this is unreachable code.
                        warn!("Kafka client is already draining revoked partitions.");
                        d.keep_draining(drain_deadline)
                    },
                    ConsumerState::Consuming(state) => {
                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);

                        for tp in revoked_partitions.drain(0..) {
                            if let Some(end) = end_signals.remove(&tp) {
                                debug!("Revoking partition {}:{}", &tp.0, tp.1);
                                state.revoke_partition(tp, end);
                            } else {
                                debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                            }
                        }

                        state.keep_draining(deadline)
                    }
                },
                KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
                    ConsumerState::Complete => unreachable!("Shutdown received after completion."),
                    // Shutting down is just like a full assignment revoke, but we also close the
                    // callback channels, since we don't expect additional assignments or rebalances
                    ConsumerState::Draining(state) => {
                        // NB: This would only happen if the task driving the kafka client is
                        // not handling shutdown signals; otherwise this is unreachable code
                        error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
                        callbacks.close();
                        state.keep_draining(drain_deadline)
                    },
                    ConsumerState::Consuming(state) => {
                        callbacks.close();
                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
                        if let Ok(tpl) = consumer.assignment() {
                            // TODO: workaround for https://github.com/fede1024/rust-rdkafka/issues/681
                            if tpl.capacity() == 0 {
                                return;
                            }
                            tpl.elements().iter().for_each(|el| {
                                let tp: TopicPartition = (el.topic().into(), el.partition());
                                if let Some(end) = end_signals.remove(&tp) {
                                    debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
                                    state.revoke_partition(tp, end);
                                } else {
                                    debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                                }
                            });
                        }
                        // If shutdown was initiated by partition EOF mode, the drain phase
                        // will already be complete and would time out if not accounted for here
                        if state.is_drain_complete() {
                            state.finish_drain(deadline)
                        } else {
                            state.keep_draining(deadline)
                        }
                    }
                },
            },

            Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
                ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
                ConsumerState::Consuming(state) => {
                    warn!("A drain deadline fired outside of draining mode.");
                    state.keep_consuming(None.into())
                },
                ConsumerState::Draining(mut draining) => {
                    debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
                    for tp in draining.consumer_state.expect_drain.drain() {
                        if let Some(handle) = abort_handles.remove(&tp) {
                            handle.abort();
                        }
                    }
                    draining.finish_drain(drain_deadline)
                }
            },
        }
    }
}

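/// Polls the main rdkafka consumer on a blocking thread until shutdown (or
/// EOF, in tests). No messages are expected on this stream, since partitions
/// are consumed through split queues; polling here only serves client
/// callbacks such as rebalance notifications.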
fn drive_kafka_consumer(
    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
    mut shutdown: ShutdownSignal,
    eof: Option<oneshot::Receiver<()>>,
) {
    Handle::current().block_on(async move {
        let mut eof: OptionFuture<_> = eof.into();
        let mut stream = consumer.stream();
        loop {
            tokio::select! {
                _ = &mut shutdown => {
                    consumer.context().shutdown();
                    break
                },

                Some(_) = &mut eof => {
                    consumer.context().shutdown();
                    break
                },

                // NB: messages are not received on this thread, however we poll
                // the consumer to serve client callbacks, such as rebalance notifications
                message = stream.next() => match message {
                    None => unreachable!("MessageStream never returns Ready(None)"),
                    Some(Err(error)) => emit!(KafkaReadError { error }),
                    Some(Ok(_msg)) => {
                        unreachable!("Messages are consumed in dedicated tasks for each partition.")
                    }
                },
            }
        }
    });
}

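/// Decodes a single Kafka message into zero or more events and forwards them
/// to the source output. On success, the message's offset entry is registered
/// with the finalizer (when present) so it is stored once the batch is
/// acknowledged.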
async fn parse_message(
    msg: BorrowedMessage<'_>,
    decoder: Decoder,
    keys: &'_ Keys,
    out: &mut SourceSender,
    acknowledgements: bool,
    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
    log_namespace: LogNamespace,
) {
    if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
        let (batch, receiver) = BatchNotifier::new_with_receiver();
        let mut stream = stream.map(|event| {
            // All acknowledgements flow through the normal Finalizer stream so
            // that they can be handled in one place, but are only tied to the
            // batch when acknowledgements are enabled
            if acknowledgements {
                event.with_batch_notifier(&batch)
            } else {
                event
            }
        });
        match out.send_event_stream(&mut stream).await {
            Err(_) => {
                emit!(StreamClosedError { count });
            }
            Ok(_) => {
                // Drop stream to avoid borrowing `msg`: "[...] borrow might be used
                // here, when `stream` is dropped and runs the destructor [...]".
                drop(stream);
                if let Some(f) = finalizer.as_ref() {
                    f.add(msg.into(), receiver)
                }
            }
        }
    }
}

// Turn the received message into a stream of parsed events.
fn parse_stream<'a>(
    msg: &BorrowedMessage<'a>,
    decoder: Decoder,
    keys: &'a Keys,
    log_namespace: LogNamespace,
) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
    let payload = msg.payload()?; // skip messages with empty payload

    let rmsg = ReceivedMessage::from(msg);

    let payload = Cursor::new(Bytes::copy_from_slice(payload));

    let mut stream = DecoderFramedRead::with_capacity(payload, decoder, msg.payload_len());
    let (count, _) = stream.size_hint();
    let stream = stream! {
        while let Some(result) = stream.next().await {
            match result {
                Ok((events, _byte_size)) => {
                    emit!(KafkaEventsReceived {
                        count: events.len(),
                        byte_size: events.estimated_json_encoded_size_of(),
                        topic: &rmsg.topic,
                        partition: rmsg.partition,
                    });
                    for mut event in events {
                        rmsg.apply(keys, &mut event, log_namespace);
                        yield event;
                    }
                },
                Err(error) => {
                    // Error is logged by `codecs::Decoder`, no further handling
                    // is needed here.
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    }
    .boxed();
    Some((count, stream))
}

#[derive(Clone, Debug)]
struct Keys {
    timestamp: Option<OwnedValuePath>,
    key_field: Option<OwnedValuePath>,
    topic: Option<OwnedValuePath>,
    partition: Option<OwnedValuePath>,
    offset: Option<OwnedValuePath>,
    headers: Option<OwnedValuePath>,
}

impl Keys {
    fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
        Self {
            timestamp: schema.timestamp_key().cloned(),
            key_field: config.key_field.path.clone(),
            topic: config.topic_key.path.clone(),
            partition: config.partition_key.path.clone(),
            offset: config.offset_key.path.clone(),
            headers: config.headers_key.path.clone(),
        }
    }
}

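/// Owned copies of a Kafka message's metadata, extracted up front so they can
/// be applied to each decoded event without holding the borrowed message.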
struct ReceivedMessage {
    timestamp: Option<DateTime<Utc>>,
    key: Value,
    headers: ObjectMap,
    topic: String,
    partition: i32,
    offset: i64,
}

impl ReceivedMessage {
    fn from(msg: &BorrowedMessage<'_>) -> Self {
        // Extract timestamp from kafka message
        let timestamp = msg
            .timestamp()
            .to_millis()
            .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());

        let key = msg
            .key()
            .map(|key| Value::from(Bytes::from(key.to_owned())))
            .unwrap_or(Value::Null);

        let mut headers_map = ObjectMap::new();
        if let Some(headers) = msg.headers() {
            for header in headers.iter() {
                if let Some(value) = header.value {
                    headers_map.insert(
                        header.key.into(),
                        Value::from(Bytes::from(value.to_owned())),
                    );
                }
            }
        }

        Self {
            timestamp,
            key,
            headers: headers_map,
            topic: msg.topic().to_string(),
            partition: msg.partition(),
            offset: msg.offset(),
        }
    }

    fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
        if let Event::Log(log) = event {
            match log_namespace {
                LogNamespace::Vector => {
                    // We'll only use this function in Vector namespaces because we don't want
                    // "timestamp" to be set automatically in legacy namespaces. In legacy
                    // namespaces, the "timestamp" field corresponds to the Kafka message, not the
                    // timestamp when the event was processed.
                    log_namespace.insert_standard_vector_source_metadata(
                        log,
                        KafkaSourceConfig::NAME,
                        Utc::now(),
                    );
                }
                LogNamespace::Legacy => {
                    if let Some(source_type_key) = log_schema().source_type_key_target_path() {
                        log.insert(source_type_key, KafkaSourceConfig::NAME);
                    }
                }
            }

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.key_field.as_ref().map(LegacyKey::Overwrite),
                path!("message_key"),
                self.key.clone(),
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.timestamp.as_ref().map(LegacyKey::Overwrite),
                path!("timestamp"),
                self.timestamp,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.topic.as_ref().map(LegacyKey::Overwrite),
                path!("topic"),
                self.topic.clone(),
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.partition.as_ref().map(LegacyKey::Overwrite),
                path!("partition"),
                self.partition,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.offset.as_ref().map(LegacyKey::Overwrite),
                path!("offset"),
                self.offset,
            );

            log_namespace.insert_source_metadata(
                KafkaSourceConfig::NAME,
                log,
                keys.headers.as_ref().map(LegacyKey::Overwrite),
                path!("headers"),
                self.headers.clone(),
            );
        }
    }
}

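/// Offset bookkeeping for a single message, held by the `OrderedFinalizer`
/// and used to store the offset on the consumer once delivery is acknowledged.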
#[derive(Debug, Eq, PartialEq, Hash)]
struct FinalizerEntry {
    topic: String,
    partition: i32,
    offset: i64,
}

impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
    fn from(msg: BorrowedMessage<'a>) -> Self {
        Self {
            topic: msg.topic().into(),
            partition: msg.partition(),
            offset: msg.offset(),
        }
    }
}

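/// Builds the `StreamConsumer` along with the callback channel that connects
/// its context to the coordination task. User-provided `librdkafka_options`
/// are applied after the defaults set here, so they can override any of them.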
fn create_consumer(
    config: &KafkaSourceConfig,
    acknowledgements: bool,
) -> crate::Result<(
    StreamConsumer<KafkaSourceContext>,
    UnboundedReceiver<KafkaCallback>,
)> {
    let mut client_config = ClientConfig::new();
    client_config
        .set("group.id", &config.group_id)
        .set("bootstrap.servers", &config.bootstrap_servers)
        .set("auto.offset.reset", &config.auto_offset_reset)
        .set(
            "session.timeout.ms",
            config.session_timeout_ms.as_millis().to_string(),
        )
        .set(
            "socket.timeout.ms",
            config.socket_timeout_ms.as_millis().to_string(),
        )
        .set(
            "fetch.wait.max.ms",
            config.fetch_wait_max_ms.as_millis().to_string(),
        )
        .set("enable.partition.eof", "false")
        .set("enable.auto.commit", "true")
        .set(
            "auto.commit.interval.ms",
            config.commit_interval_ms.as_millis().to_string(),
        )
        .set("enable.auto.offset.store", "false")
        .set("statistics.interval.ms", "1000")
        .set("client.id", "vector");

    config.auth.apply(&mut client_config)?;

    if let Some(librdkafka_options) = &config.librdkafka_options {
        for (key, value) in librdkafka_options {
            client_config.set(key.as_str(), value.as_str());
        }
    }

    let (callbacks, callback_rx) = mpsc::unbounded_channel();
    let consumer = client_config
        .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
            config.metrics.topic_lag_metric,
            acknowledgements,
            callbacks,
            Span::current(),
        ))
        .context(CreateSnafu)?;

    Ok((consumer, callback_rx))
}

type TopicPartition = (String, i32);

/// Status returned by partition consumer tasks, allowing the coordination task
/// to differentiate between a consumer exiting normally (after receiving an end
/// signal) and exiting when it reaches the end of a partition.
#[derive(PartialEq)]
enum PartitionConsumerStatus {
    NormalExit,
    PartitionEOF,
}

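/// Callbacks sent from the rdkafka client context to the coordination task.
/// Each variant carries a rendezvous channel sender; the client side blocks on
/// the paired receiver until the coordination task drops the sender, confirming
/// the callback has been fully handled.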
enum KafkaCallback {
    PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
    PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
    ShuttingDown(SyncSender<()>),
}

struct KafkaSourceContext {
    acknowledgements: bool,
    stats: kafka::KafkaStatisticsContext,

    /// A callback channel used to coordinate between the main consumer task and the acknowledgement task
    callbacks: UnboundedSender<KafkaCallback>,

    /// A weak reference to the consumer, so that we can commit offsets during a rebalance operation
    consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
}

impl KafkaSourceContext {
    fn new(
        expose_lag_metrics: bool,
        acknowledgements: bool,
        callbacks: UnboundedSender<KafkaCallback>,
        span: Span,
    ) -> Self {
        Self {
            stats: kafka::KafkaStatisticsContext {
                expose_lag_metrics,
                span,
            },
            acknowledgements,
            consumer: OnceLock::default(),
            callbacks,
        }
    }

    fn shutdown(&self) {
        let (send, rendezvous) = sync_channel(0);
        if self
            .callbacks
            .send(KafkaCallback::ShuttingDown(send))
            .is_ok()
        {
            while rendezvous.recv().is_ok() {
                self.commit_consumer_state();
            }
        }
    }

    /// Emit a PartitionsAssigned callback with the topic-partitions to be consumed,
    /// and block until confirmation is received that a stream and consumer for
    /// each topic-partition has been set up. This function blocks until the
    /// rendezvous channel sender is dropped by the callback handler.
    fn consume_partitions(&self, tpl: &TopicPartitionList) {
        // TODO: workaround for https://github.com/fede1024/rust-rdkafka/issues/681
        if tpl.capacity() == 0 {
            return;
        }
        let (send, rendezvous) = sync_channel(0);
        let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned(
            tpl.elements()
                .iter()
                .map(|tp| (tp.topic().into(), tp.partition()))
                .collect(),
            send,
        ));

        while rendezvous.recv().is_ok() {
            // no-op: wait for partition assignment handler to complete
        }
    }

    /// Emit a PartitionsRevoked callback and block until confirmation is
    /// received that acknowledgements have been processed for each of them.
    /// The rendezvous channel used in the callback can send multiple times to
    /// signal individual partitions completing. This function blocks until the
    /// sender is dropped by the callback handler.
    fn revoke_partitions(&self, tpl: &TopicPartitionList) {
        let (send, rendezvous) = sync_channel(0);
        let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked(
            tpl.elements()
                .iter()
                .map(|tp| (tp.topic().into(), tp.partition()))
                .collect(),
            send,
        ));

        while rendezvous.recv().is_ok() {
            self.commit_consumer_state();
        }
    }

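    /// Synchronously commit stored offsets through the weak consumer reference
    /// set up in `kafka_source`. A `NoOffset` result (nothing new to commit) is
    /// treated as success.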
    fn commit_consumer_state(&self) {
        if let Some(consumer) = self
            .consumer
            .get()
            .expect("Consumer reference was not initialized.")
            .upgrade()
        {
            match consumer.commit_consumer_state(CommitMode::Sync) {
                Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
                    /* Success, or nothing to do - yay \0/ */
                }
                Err(error) => emit!(KafkaOffsetUpdateError { error }),
            }
        }
    }
}

impl ClientContext for KafkaSourceContext {
    fn stats(&self, statistics: Statistics) {
        self.stats.stats(statistics)
    }
}

impl ConsumerContext for KafkaSourceContext {
    fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
        match rebalance {
            Rebalance::Assign(tpl) => self.consume_partitions(tpl),

            Rebalance::Revoke(tpl) => {
                self.revoke_partitions(tpl);
                self.commit_consumer_state();
            }

            Rebalance::Error(message) => {
                error!("Error during Kafka consumer group rebalance: {}.", message);
            }
        }
    }
}

1385#[cfg(test)]
1386mod test {
1387    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};
1388
1389    use super::*;
1390
1391    pub fn kafka_host() -> String {
1392        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
1393    }
1394    pub fn kafka_port() -> u16 {
1395        let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into());
1396        port.parse().expect("Invalid port number")
1397    }
1398
1399    pub fn kafka_address() -> String {
1400        format!("{}:{}", kafka_host(), kafka_port())
1401    }
1402
1403    #[test]
1404    fn generate_config() {
1405        crate::test_util::test_generate_config::<KafkaSourceConfig>();
1406    }
1407
1408    pub(super) fn make_config(
1409        topic: &str,
1410        group: &str,
1411        log_namespace: LogNamespace,
1412        librdkafka_options: Option<HashMap<String, String>>,
1413    ) -> KafkaSourceConfig {
1414        KafkaSourceConfig {
1415            bootstrap_servers: kafka_address(),
1416            topics: vec![topic.into()],
1417            group_id: group.into(),
1418            auto_offset_reset: "beginning".into(),
1419            session_timeout_ms: Duration::from_millis(6000),
1420            commit_interval_ms: Duration::from_millis(1),
1421            librdkafka_options,
1422            key_field: default_key_field(),
1423            topic_key: default_topic_key(),
1424            partition_key: default_partition_key(),
1425            offset_key: default_offset_key(),
1426            headers_key: default_headers_key(),
1427            socket_timeout_ms: Duration::from_millis(60000),
1428            fetch_wait_max_ms: Duration::from_millis(100),
1429            log_namespace: Some(log_namespace == LogNamespace::Vector),
1430            ..Default::default()
1431        }
1432    }
1433
1434    #[test]
1435    fn test_output_schema_definition_vector_namespace() {
1436        let definitions = make_config("topic", "group", LogNamespace::Vector, None)
1437            .outputs(LogNamespace::Vector)
1438            .remove(0)
1439            .schema_definition(true);
1440
1441        assert_eq!(
1442            definitions,
1443            Some(
1444                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1445                    .with_meaning(OwnedTargetPath::event_root(), "message")
1446                    .with_metadata_field(
1447                        &owned_value_path!("kafka", "timestamp"),
1448                        Kind::timestamp(),
1449                        Some("timestamp")
1450                    )
1451                    .with_metadata_field(
1452                        &owned_value_path!("kafka", "message_key"),
1453                        Kind::bytes(),
1454                        None
1455                    )
1456                    .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
1457                    .with_metadata_field(
1458                        &owned_value_path!("kafka", "partition"),
1459                        Kind::bytes(),
1460                        None
1461                    )
1462                    .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
1463                    .with_metadata_field(
1464                        &owned_value_path!("kafka", "headers"),
1465                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
1466                        None
1467                    )
1468                    .with_metadata_field(
1469                        &owned_value_path!("vector", "ingest_timestamp"),
1470                        Kind::timestamp(),
1471                        None
1472                    )
1473                    .with_metadata_field(
1474                        &owned_value_path!("vector", "source_type"),
1475                        Kind::bytes(),
1476                        None
1477                    )
1478            )
1479        )
1480    }
1481
1482    #[test]
1483    fn test_output_schema_definition_legacy_namespace() {
1484        let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
1485            .outputs(LogNamespace::Legacy)
1486            .remove(0)
1487            .schema_definition(true);
1488
1489        assert_eq!(
1490            definitions,
1491            Some(
1492                Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
1493                    .unknown_fields(Kind::undefined())
1494                    .with_event_field(
1495                        &owned_value_path!("message"),
1496                        Kind::bytes(),
1497                        Some("message")
1498                    )
1499                    .with_event_field(
1500                        &owned_value_path!("timestamp"),
1501                        Kind::timestamp(),
1502                        Some("timestamp")
1503                    )
1504                    .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
1505                    .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
1506                    .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
1507                    .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
1508                    .with_event_field(
1509                        &owned_value_path!("headers"),
1510                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
1511                        None
1512                    )
1513                    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1514            )
1515        )
1516    }
1517
1518    #[tokio::test]
1519    async fn consumer_create_ok() {
1520        let config = make_config("topic", "group", LogNamespace::Legacy, None);
1521        assert!(create_consumer(&config, true).is_ok());
1522    }
1523
1524    #[tokio::test]
1525    async fn consumer_create_incorrect_auto_offset_reset() {
1526        let config = KafkaSourceConfig {
1527            auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
1528            ..make_config("topic", "group", LogNamespace::Legacy, None)
1529        };
1530        assert!(create_consumer(&config, true).is_err());
1531    }
1532}
1533
1534#[cfg(feature = "kafka-integration-tests")]
1535#[cfg(test)]
1536mod integration_test {
1537    use std::time::Duration;
1538
1539    use chrono::{DateTime, SubsecRound, Utc};
1540    use futures::Stream;
1541    use futures_util::stream::FuturesUnordered;
1542    use rdkafka::{
1543        Offset, TopicPartitionList,
1544        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
1545        client::DefaultClientContext,
1546        config::{ClientConfig, FromClientConfig},
1547        consumer::BaseConsumer,
1548        message::{Header, OwnedHeaders},
1549        producer::{FutureProducer, FutureRecord},
1550        util::Timeout,
1551    };
1552    use stream_cancel::{Trigger, Tripwire};
1553    use tokio::time::sleep;
1554    use vector_lib::event::EventStatus;
1555    use vrl::{event_path, value};
1556
1557    use super::{test::*, *};
1558    use crate::{
1559        SourceSender,
1560        event::{EventArray, EventContainer},
1561        shutdown::ShutdownSignal,
1562        test_util::{collect_n, components::assert_source_compliance, random_string},
1563    };
1564
1565    const KEY: &str = "my key";
1566    const TEXT: &str = "my message";
1567    const HEADER_KEY: &str = "my header";
1568    const HEADER_VALUE: &str = "my header value";
1569
1570    fn kafka_test_topic() -> String {
1571        std::env::var("KAFKA_TEST_TOPIC")
1572            .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
1573    }
1574    fn kafka_max_bytes() -> String {
1575        std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
1576    }
1577
1578    fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
1579        let mut client = ClientConfig::new();
1580        client.set("bootstrap.servers", kafka_address());
1581        client.set("produce.offset.report", "true");
1582        client.set("message.timeout.ms", "5000");
1583        client.set("auto.commit.interval.ms", "1");
1584        if let Some(group) = group {
1585            client.set("group.id", group);
1586        }
1587        client.create().expect("Client creation error")
1588    }
1589
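    /// Create `topic` with the given number of partitions and produce `count`
    /// records to it, each carrying a numbered payload and key, a fixed header,
    /// and the same timestamp. Returns the time at which the records were sent.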
1590    async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
1591        let now = Utc::now();
1592        let timestamp = now.timestamp_millis();
1593
1594        let producer: &FutureProducer = &client_config(None);
1595        let topic_name = topic.as_ref();
1596
1597        create_topic(topic_name, partitions).await;
1598
1599        (0..count)
1600            .map(|i| async move {
1601                let text = format!("{TEXT} {i:03}");
1602                let key = format!("{KEY} {i}");
1603                let record = FutureRecord::to(topic_name)
1604                    .payload(&text)
1605                    .key(&key)
1606                    .timestamp(timestamp)
1607                    .headers(OwnedHeaders::new().insert(Header {
1608                        key: HEADER_KEY,
1609                        value: Some(HEADER_VALUE),
1610                    }));
1611                if let Err(error) = producer.send(record, Timeout::Never).await {
1612                    panic!("Cannot send event to Kafka: {error:?}");
1613                }
1614            })
1615            .collect::<FuturesUnordered<_>>()
1616            .collect::<Vec<_>>()
1617            .await;
1618
1619        now
1620    }
1621
1622    async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
1623        let topic = kafka_test_topic();
1624        let group_id = format!("test-group-{}", random_string(10));
1625
1626        let sent_at = send_events(topic.clone(), partitions, count).await;
1627
1628        (topic, group_id, sent_at)
1629    }
1630
1631    #[tokio::test]
1632    async fn consumes_event_with_acknowledgements() {
1633        send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
1634    }
1635
1636    #[tokio::test]
1637    async fn consumes_event_with_acknowledgements_vector_namespace() {
1638        send_receive(true, |_| false, 10, LogNamespace::Vector).await;
1639    }
1640
1641    #[tokio::test]
1642    async fn consumes_event_without_acknowledgements() {
1643        send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
1644    }
1645
1646    #[tokio::test]
1647    async fn consumes_event_without_acknowledgements_vector_namespace() {
1648        send_receive(false, |_| false, 10, LogNamespace::Vector).await;
1649    }
1650
1651    #[tokio::test]
1652    async fn handles_one_negative_acknowledgement() {
1653        send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
1654    }
1655
1656    #[tokio::test]
1657    async fn handles_one_negative_acknowledgement_vector_namespace() {
1658        send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
1659    }
1660
1661    #[tokio::test]
1662    async fn handles_permanent_negative_acknowledgement() {
1663        send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
1664    }
1665
1666    #[tokio::test]
1667    async fn handles_permanent_negative_acknowledgement_vector_namespace() {
1668        send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
1669    }
1670
1671    async fn send_receive(
1672        acknowledgements: bool,
1673        error_at: impl Fn(usize) -> bool,
1674        receive_count: usize,
1675        log_namespace: LogNamespace,
1676    ) {
1677        const SEND_COUNT: usize = 10;
1678
1679        let topic = format!("test-topic-{}", random_string(10));
1680        let group_id = format!("test-group-{}", random_string(10));
1681        let config = make_config(&topic, &group_id, log_namespace, None);
1682
1683        let now = send_events(topic.clone(), 1, SEND_COUNT).await;
1684
1685        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
1686            let (tx, rx) = SourceSender::new_test_errors(error_at);
1687            let (trigger_shutdown, shutdown_done) =
1688                spawn_kafka(tx, config, acknowledgements, false, log_namespace);
1689            let events = collect_n(rx, SEND_COUNT).await;
1690            // Yield to the finalization task to let it collect the
1691            // batch status receivers before signalling the shutdown.
1692            tokio::task::yield_now().await;
1693            drop(trigger_shutdown);
1694            shutdown_done.await;
1695
1696            events
1697        })
1698        .await;
1699
1700        let offset = fetch_tpl_offset(&group_id, &topic, 0);
1701        assert_eq!(offset, Offset::from_raw(receive_count as i64));
1702
1703        assert_eq!(events.len(), SEND_COUNT);
1704        for (i, event) in events.into_iter().enumerate() {
1705            if let LogNamespace::Legacy = log_namespace {
1706                assert_eq!(
1707                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1708                    format!("{TEXT} {i:03}").into()
1709                );
1710                assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
1711                assert_eq!(
1712                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1713                    "kafka".into()
1714                );
1715                assert_eq!(
1716                    event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
1717                    now.trunc_subsecs(3).into()
1718                );
1719                assert_eq!(event.as_log()["topic"], topic.clone().into());
1720                assert!(event.as_log().contains("partition"));
1721                assert!(event.as_log().contains("offset"));
1722                let mut expected_headers = ObjectMap::new();
1723                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1724                assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
1725            } else {
1726                let meta = event.as_log().metadata().value();
1727
1728                assert_eq!(
1729                    meta.get(path!("vector", "source_type")).unwrap(),
1730                    &value!(KafkaSourceConfig::NAME)
1731                );
1732                assert!(
1733                    meta.get(path!("vector", "ingest_timestamp"))
1734                        .unwrap()
1735                        .is_timestamp()
1736                );
1737
1738                assert_eq!(
1739                    event.as_log().value(),
1740                    &value!(format!("{} {:03}", TEXT, i))
1741                );
1742                assert_eq!(
1743                    meta.get(path!("kafka", "message_key")).unwrap(),
1744                    &value!(format!("{} {}", KEY, i))
1745                );
1746
1747                assert_eq!(
1748                    meta.get(path!("kafka", "timestamp")).unwrap(),
1749                    &value!(now.trunc_subsecs(3))
1750                );
1751                assert_eq!(
1752                    meta.get(path!("kafka", "topic")).unwrap(),
1753                    &value!(topic.clone())
1754                );
1755                assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer());
1756                assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer());
1757
1758                let mut expected_headers = ObjectMap::new();
1759                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1760                assert_eq!(
1761                    meta.get(path!("kafka", "headers")).unwrap(),
1762                    &Value::from(expected_headers)
1763                );
1764            }
1765        }
1766    }
1767
1768    fn make_rand_config() -> (String, String, KafkaSourceConfig) {
1769        let topic = format!("test-topic-{}", random_string(10));
1770        let group_id = format!("test-group-{}", random_string(10));
1771        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
1772        (topic, group_id, config)
1773    }
1774
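    /// Build a test pipeline that tags each log with the given pipeline `id`,
    /// sleeps for `delay`, and then finalizes every event with `status`. The
    /// delay keeps acknowledgements outstanding long enough to overlap with a
    /// consumer group rebalance in the `handles_rebalance` test.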
1775    fn delay_pipeline(
1776        id: usize,
1777        delay: Duration,
1778        status: EventStatus,
1779    ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
1780        let (pipe, recv) = SourceSender::new_test_sender_with_options(100, None);
1781        let recv = recv.into_stream();
1782        let recv = recv.then(move |item| async move {
1783            let mut events = item.events;
1784            events.iter_logs_mut().for_each(|log| {
1785                log.insert(event_path!("pipeline_id"), id.to_string());
1786            });
1787            sleep(delay).await;
1788            events.iter_events_mut().for_each(|mut event| {
1789                let metadata = event.metadata_mut();
1790                metadata.update_status(status);
1791                metadata.update_sources();
1792            });
1793            events
1794        });
1795        (pipe, Box::pin(recv))
1796    }
1797
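    /// Build a decoder and consumer from `config` and spawn the kafka source as a
    /// background task. Returns a trigger that starts shutdown when dropped and a
    /// tripwire that resolves once the source has finished shutting down.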
1798    fn spawn_kafka(
1799        out: SourceSender,
1800        config: KafkaSourceConfig,
1801        acknowledgements: bool,
1802        eof: bool,
1803        log_namespace: LogNamespace,
1804    ) -> (Trigger, Tripwire) {
1805        let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
1806
1807        let decoder = DecodingConfig::new(
1808            config.framing.clone(),
1809            config.decoding.clone(),
1810            log_namespace,
1811        )
1812        .build()
1813        .unwrap();
1814
1815        let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();
1816
1817        tokio::spawn(kafka_source(
1818            config,
1819            consumer,
1820            callback_rx,
1821            decoder,
1822            out,
1823            shutdown,
1824            eof,
1825            log_namespace,
1826        ));
1827        (trigger_shutdown, shutdown_done)
1828    }
1829
1830    fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
1831        let client: BaseConsumer = client_config(Some(group_id));
1832        client.subscribe(&[topic]).expect("Subscribing failed");
1833
1834        let mut tpl = TopicPartitionList::new();
1835        tpl.add_partition(topic, partition);
1836        client
1837            .committed_offsets(tpl, Duration::from_secs(1))
1838            .expect("Getting committed offsets failed")
1839            .find_partition(topic, partition)
1840            .expect("Missing topic/partition")
1841            .offset()
1842    }
1843
1844    async fn create_topic(topic: &str, partitions: i32) {
1845        let client: AdminClient<DefaultClientContext> = client_config(None);
1846        let topic_results = client
1847            .create_topics(
1848                [&NewTopic {
1849                    name: topic,
1850                    num_partitions: partitions,
1851                    replication: TopicReplication::Fixed(1),
1852                    config: vec![],
1853                }],
1854                &AdminOptions::default(),
1855            )
1856            .await
1857            .expect("create_topics failed");
1858
1859        for result in topic_results {
1860            if let Err((topic, err)) = result
1861                && err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists
1862            {
1863                panic!("Creating a topic failed: {:?}", (topic, err))
1864            }
1865        }
1866    }
1867
1868    // Failure timeline:
1869    // - Topic exists on multiple partitions
1870    // - Consumer A connects to topic, is assigned both partitions
1871    // - Consumer A receives some messages
1872    // - Consumer B connects to topic
1873    // - Consumer A has one partition revoked (rebalance)
1874    // - Consumer B is assigned a partition
1875    // - Consumer A stores an offset on the revoked partition
1876    // - Consumer B skips receiving messages?
1877    #[ignore]
1878    #[tokio::test]
1879    async fn handles_rebalance() {
1880        // The test plan here is to:
1881        // - Set up one source instance, feeding into a pipeline that delays acks.
1882        // - Wait a bit, and set up a second source instance. This should cause a rebalance.
1883        // - Wait further until all events will have been pulled down.
1884        // - Verify that all events are captured by the two sources, and that offsets are set right, etc.
1885
1886        // However, this test, as written, does not actually cause the
1887        // conditions required to test this. We have had external
1888        // validation that the source behaves properly on rebalance
1889        // events. This test also requires the insertion of a small
1890        // delay into the source to guarantee the timing, which is not
1891        // suitable for production code.
1892
1893        const NEVENTS: usize = 200;
1894        const DELAY: u64 = 100;
1895
1896        let (topic, group_id, config) = make_rand_config();
1897        create_topic(&topic, 2).await;
1898
1899        let _send_start = send_events(topic.clone(), 1, NEVENTS).await;
1900
1901        let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
1902        let (trigger_shutdown1, shutdown_done1) =
1903            spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
1904        let events1 = tokio::spawn(collect_n(rx1, NEVENTS));
1905
1906        sleep(Duration::from_secs(1)).await;
1907
1908        let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
1909        let (trigger_shutdown2, shutdown_done2) =
1910            spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1911        let events2 = tokio::spawn(collect_n(rx2, NEVENTS));
1912
1913        sleep(Duration::from_secs(5)).await;
1914
1915        drop(trigger_shutdown1);
1916        let events1 = events1.await.unwrap();
1917        shutdown_done1.await;
1918
1919        sleep(Duration::from_secs(5)).await;
1920
1921        drop(trigger_shutdown2);
1922        let events2 = events2.await.unwrap();
1923        shutdown_done2.await;
1924
1925        sleep(Duration::from_secs(1)).await;
1926
1927        assert!(!events1.is_empty());
1928        assert!(!events2.is_empty());
1929
1930        match fetch_tpl_offset(&group_id, &topic, 0) {
1931            Offset::Offset(offset) => {
1932                assert!((offset as isize - events1.len() as isize).abs() <= 1)
1933            }
1934            o => panic!("Invalid offset for partition 0 {o:?}"),
1935        }
1936
1937        match fetch_tpl_offset(&group_id, &topic, 1) {
1938            Offset::Offset(offset) => {
1939                assert!((offset as isize - events2.len() as isize).abs() <= 1)
1940            }
1941            o => panic!("Invalid offset for partition 1 {o:?}"),
1942        }
1943
1944        let mut all_events = events1
1945            .into_iter()
1946            .chain(events2.into_iter())
1947            .flat_map(map_logs)
1948            .collect::<Vec<String>>();
1949        all_events.sort();
1950
1951        // TODO: assert they are all in sequential order with no duplicates.
1952    }
1953
1954    #[tokio::test]
1955    async fn drains_acknowledgements_at_shutdown() {
1956        // 1. Send N events. If running against a pre-populated Kafka topic, set KAFKA_SEND_COUNT=0 and KAFKA_EXPECT_COUNT to the expected number of messages; otherwise only KAFKA_SEND_COUNT is needed.
1957        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
1958            .unwrap_or_else(|_| "125000".into())
1959            .parse()
1960            .expect("Number of messages to send to kafka.");
1961        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
1962            .unwrap_or_else(|_| format!("{send_count}"))
1963            .parse()
1964            .expect("Number of messages to expect consumers to process.");
1965        let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
1966            .unwrap_or_else(|_| "2000".into())
1967            .parse()
1968            .expect("Number of milliseconds before shutting down first consumer.");
1969
1970        let (topic, group_id, _) = send_to_test_topic(1, send_count).await;
1971
1972        // 2. Run the kafka source to read some of the events
1973        // 3. Send a shutdown signal (at some point before all events are read)
1974        let mut opts = HashMap::new();
1975        // Set options to get partition EOF notifications, and fetch data in small/configurable size chunks
1976        opts.insert("enable.partition.eof".into(), "true".into());
1977        opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
1978        let events1 = {
1979            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
1980            let (tx, rx) = SourceSender::new_test_errors(|_| false);
1981            let (trigger_shutdown, shutdown_done) =
1982                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1983            let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
1984                sleep(Duration::from_millis(delay_ms)).await;
1985                drop(trigger_shutdown);
1986            });
1987            shutdown_done.await;
1988            events
1989        };
1990
1991        debug!("Consumer group.id: {}", &group_id);
1992        debug!(
1993            "First consumer read {} of {} messages.",
1994            events1.len(),
1995            expect_count
1996        );
1997
1998        // 4. Run the kafka source again to finish reading the events
1999        let events2 = {
2000            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
2001            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2002            let (trigger_shutdown, shutdown_done) =
2003                spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
2004            let events = rx.collect::<Vec<Event>>().await;
2005            drop(trigger_shutdown);
2006            shutdown_done.await;
2007            events
2008        };
2009
2010        debug!(
2011            "Second consumer read {} of {} messages.",
2012            events2.len(),
2013            expect_count
2014        );
2015
2016        // 5. Total number of events processed should equal the number sent
2017        let total = events1.len() + events2.len();
2018        assert_ne!(
2019            events1.len(),
2020            0,
2021            "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
2022        );
2023        assert_ne!(
2024            events2.len(),
2025            0,
2026            "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?) "
2027        );
2028        assert_eq!(total, expect_count);
2029    }
2030
2031    async fn consume_with_rebalance(rebalance_strategy: String) {
2032        // 1. Send N events. If running against a pre-populated Kafka topic, set KAFKA_SEND_COUNT=0 and KAFKA_EXPECT_COUNT to the expected number of messages; otherwise only KAFKA_SEND_COUNT is needed.
2033        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
2034            .unwrap_or_else(|_| "125000".into())
2035            .parse()
2036            .expect("Number of messages to send to kafka.");
2037        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
2038            .unwrap_or_else(|_| format!("{send_count}"))
2039            .parse()
2040            .expect("Number of messages to expect consumers to process.");
2041        let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
2042            .unwrap_or_else(|_| "2000".into())
2043            .parse()
2044            .expect("Number of milliseconds before shutting down first consumer.");
2045
2046        let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
2047        debug!("Topic: {}", &topic);
2048        debug!("Consumer group.id: {}", &group_id);
2049
2050        // 2. Run the kafka source to read some of the events
2051        // 3. Start 2nd & 3rd consumers using the same group.id, triggering rebalance events
2052        let mut kafka_options = HashMap::new();
2053        kafka_options.insert("enable.partition.eof".into(), "true".into());
2054        kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
2055        kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
2056        let config1 = make_config(
2057            &topic,
2058            &group_id,
2059            LogNamespace::Legacy,
2060            Some(kafka_options.clone()),
2061        );
2062        let config2 = config1.clone();
2063        let config3 = config1.clone();
2064        let config4 = config1.clone();
2065
2066        let (events1, events2, events3) = tokio::join!(
2067            async move {
2068                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2069                let (_trigger_shutdown, _shutdown_done) =
2070                    spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);
2071
2072                rx.collect::<Vec<Event>>().await
2073            },
2074            async move {
2075                sleep(Duration::from_millis(delay_ms)).await;
2076                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2077                let (_trigger_shutdown, _shutdown_done) =
2078                    spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);
2079
2080                rx.collect::<Vec<Event>>().await
2081            },
2082            async move {
2083                sleep(Duration::from_millis(delay_ms * 2)).await;
2084                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2085                let (_trigger_shutdown, _shutdown_done) =
2086                    spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);
2087
2088                rx.collect::<Vec<Event>>().await
2089            }
2090        );
2091
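        // 4. Run a final consumer with the same group.id; it should find nothing
        //    left to consume if the first three consumers acknowledged everything.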
2092        let unconsumed = async move {
2093            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2094            let (_trigger_shutdown, _shutdown_done) =
2095                spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);
2096
2097            rx.collect::<Vec<Event>>().await
2098        }
2099        .await;
2100
2101        debug!(
2102            "First consumer read {} of {} messages.",
2103            events1.len(),
2104            expect_count
2105        );
2106
2107        debug!(
2108            "Second consumer read {} of {} messages.",
2109            events2.len(),
2110            expect_count
2111        );
2112        debug!(
2113            "Third consumer read {} of {} messages.",
2114            events3.len(),
2115            expect_count
2116        );
2117
2118        // 5. Total number of events processed should equal the number sent
2119        let total = events1.len() + events2.len() + events3.len();
2120        assert_ne!(
2121            events1.len(),
2122            0,
2123            "First batch of events should be non-zero (increase delay?)"
2124        );
2125        assert_ne!(
2126            events2.len(),
2127            0,
2128            "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2129        );
2130        assert_ne!(
2131            events3.len(),
2132            0,
2133            "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2134        );
2135        assert_eq!(
2136            unconsumed.len(),
2137            0,
2138            "The first set of consumers should consume and ack all messages."
2139        );
2140        assert_eq!(total, expect_count);
2141    }
2142
2143    #[tokio::test]
2144    async fn drains_acknowledgements_during_rebalance_default_assignments() {
2145        // The default (eager) rebalance strategies generally result in more revocations.
2146        consume_with_rebalance("range,roundrobin".into()).await;
2147    }
2148    #[tokio::test]
2149    async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
2150        // Cooperative rebalance strategies generally result in fewer revokes,
2151        // as only reassigned partitions are revoked
2152        consume_with_rebalance("cooperative-sticky".into()).await;
2153    }
2154
2155    fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
2156        events.into_events().map(|event| {
2157            let log = event.into_log();
2158            format!(
2159                "{} {} {} {}",
2160                log["message"].to_string_lossy(),
2161                log["topic"].to_string_lossy(),
2162                log["partition"].to_string_lossy(),
2163                log["offset"].to_string_lossy(),
2164            )
2165        })
2166    }
2167}