vector/sources/kafka.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::Cursor,
4    pin::Pin,
5    sync::{
6        Arc, OnceLock, Weak,
7        mpsc::{SyncSender, sync_channel},
8    },
9    time::Duration,
10};
11
12use async_stream::stream;
13use bytes::Bytes;
14use chrono::{DateTime, TimeZone, Utc};
15use futures::{Stream, StreamExt};
16use futures_util::future::OptionFuture;
17use rdkafka::{
18    ClientConfig, ClientContext, Statistics, TopicPartitionList,
19    consumer::{
20        BaseConsumer, CommitMode, Consumer, ConsumerContext, Rebalance, StreamConsumer,
21        stream_consumer::StreamPartitionQueue,
22    },
23    error::KafkaError,
24    message::{BorrowedMessage, Headers as _, Message},
25    types::RDKafkaErrorCode,
26};
27use serde_with::serde_as;
28use snafu::{ResultExt, Snafu};
29use tokio::{
30    runtime::Handle,
31    sync::{
32        mpsc::{self, UnboundedReceiver, UnboundedSender},
33        oneshot,
34    },
35    task::JoinSet,
36    time::Sleep,
37};
38use tokio_util::codec::FramedRead;
39use tracing::{Instrument, Span};
40use vector_lib::{
41    EstimatedJsonEncodedSizeOf,
42    codecs::{
43        StreamDecodingError,
44        decoding::{DeserializerConfig, FramingConfig},
45    },
46    config::{LegacyKey, LogNamespace},
47    configurable::configurable_component,
48    finalizer::OrderedFinalizer,
49    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
50};
51use vrl::value::{Kind, ObjectMap, kind::Collection};
52
53use crate::{
54    SourceSender,
55    codecs::{Decoder, DecodingConfig},
56    config::{
57        LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext, SourceOutput,
58        log_schema,
59    },
60    event::{BatchNotifier, BatchStatus, Event, Value},
61    internal_events::{
62        KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
63        StreamClosedError,
64    },
65    kafka,
66    serde::{bool_or_struct, default_decoding, default_framing_message_based},
67    shutdown::ShutdownSignal,
68};
69
70#[derive(Debug, Snafu)]
71enum BuildError {
72    #[snafu(display("The drain_timeout_ms ({}) must be less than session_timeout_ms ({})", value, session_timeout_ms.as_millis()))]
73    InvalidDrainTimeout {
74        value: u64,
75        session_timeout_ms: Duration,
76    },
77    #[snafu(display("Could not create Kafka consumer: {}", source))]
78    CreateError { source: rdkafka::error::KafkaError },
79    #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
80    SubscribeError { source: rdkafka::error::KafkaError },
81}
82
83/// Metrics (beta) configuration.
84#[configurable_component]
85#[derive(Clone, Debug, Default)]
86struct Metrics {
87    /// Expose topic lag metrics for all topics and partitions. Metric names are `kafka_consumer_lag`.
88    pub topic_lag_metric: bool,
89}
90
91/// Configuration for the `kafka` source.
92#[serde_as]
93#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
94#[derive(Clone, Debug, Derivative)]
95#[derivative(Default)]
96#[serde(deny_unknown_fields)]
97pub struct KafkaSourceConfig {
98    /// A comma-separated list of Kafka bootstrap servers.
99    ///
100    /// These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster,
101    /// allowing discovery of all the other hosts in the cluster.
102    ///
103    /// Must be in the form of `host:port`, and comma-separated.
104    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
105    bootstrap_servers: String,
106
    /// The Kafka topic names to read events from.
108    ///
109    /// Regular expression syntax is supported if the topic begins with `^`.
110    #[configurable(metadata(
111        docs::examples = "^(prefix1|prefix2)-.+",
112        docs::examples = "topic-1",
113        docs::examples = "topic-2"
114    ))]
115    topics: Vec<String>,
116
117    /// The consumer group name to be used to consume events from Kafka.
118    #[configurable(metadata(docs::examples = "consumer-group-name"))]
119    group_id: String,
120
    /// If offsets for the consumer group do not exist, set them using this strategy.
122    ///
123    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the `auto.offset.reset` option for further clarification.
124    #[serde(default = "default_auto_offset_reset")]
125    #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
126    auto_offset_reset: String,
127
128    /// The Kafka session timeout.
129    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
130    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
131    #[configurable(metadata(docs::advanced))]
132    #[serde(default = "default_session_timeout_ms")]
133    #[configurable(metadata(docs::human_name = "Session Timeout"))]
134    session_timeout_ms: Duration,
135
136    /// Timeout to drain pending acknowledgements during shutdown or a Kafka
137    /// consumer group rebalance.
138    ///
139    /// When Vector shuts down or the Kafka consumer group revokes partitions from this
140    /// consumer, wait a maximum of `drain_timeout_ms` for the source to
141    /// process pending acknowledgements. Must be less than `session_timeout_ms`
142    /// to ensure the consumer is not excluded from the group during a rebalance.
143    ///
144    /// Default value is half of `session_timeout_ms`.
145    #[serde(skip_serializing_if = "Option::is_none")]
146    #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
147    #[configurable(metadata(docs::advanced))]
148    #[configurable(metadata(docs::human_name = "Drain Timeout"))]
149    drain_timeout_ms: Option<u64>,
150
151    /// Timeout for network requests.
152    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
153    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
154    #[configurable(metadata(docs::advanced))]
155    #[serde(default = "default_socket_timeout_ms")]
156    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
157    socket_timeout_ms: Duration,
158
159    /// Maximum time the broker may wait to fill the response.
160    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
161    #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
162    #[configurable(metadata(docs::advanced))]
163    #[serde(default = "default_fetch_wait_max_ms")]
164    #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
165    fetch_wait_max_ms: Duration,
166
167    /// The frequency that the consumer offsets are committed (written) to offset storage.
168    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
169    #[serde(default = "default_commit_interval_ms")]
170    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
171    #[configurable(metadata(docs::human_name = "Commit Interval"))]
172    commit_interval_ms: Duration,
173
174    /// Overrides the name of the log field used to add the message key to each event.
175    ///
176    /// The value is the message key of the Kafka message itself.
177    ///
178    /// By default, `"message_key"` is used.
179    #[serde(default = "default_key_field")]
180    #[configurable(metadata(docs::examples = "message_key"))]
181    key_field: OptionalValuePath,
182
183    /// Overrides the name of the log field used to add the topic to each event.
184    ///
    /// The value is the topic from which the Kafka message was consumed.
186    ///
187    /// By default, `"topic"` is used.
188    #[serde(default = "default_topic_key")]
189    #[configurable(metadata(docs::examples = "topic"))]
190    topic_key: OptionalValuePath,
191
192    /// Overrides the name of the log field used to add the partition to each event.
193    ///
    /// The value is the partition from which the Kafka message was consumed.
195    ///
196    /// By default, `"partition"` is used.
197    #[serde(default = "default_partition_key")]
198    #[configurable(metadata(docs::examples = "partition"))]
199    partition_key: OptionalValuePath,
200
201    /// Overrides the name of the log field used to add the offset to each event.
202    ///
203    /// The value is the offset of the Kafka message itself.
204    ///
205    /// By default, `"offset"` is used.
206    #[serde(default = "default_offset_key")]
207    #[configurable(metadata(docs::examples = "offset"))]
208    offset_key: OptionalValuePath,
209
210    /// Overrides the name of the log field used to add the headers to each event.
211    ///
212    /// The value is the headers of the Kafka message itself.
213    ///
214    /// By default, `"headers"` is used.
215    #[serde(default = "default_headers_key")]
216    #[configurable(metadata(docs::examples = "headers"))]
217    headers_key: OptionalValuePath,
218
219    /// Advanced options set directly on the underlying `librdkafka` client.
220    ///
221    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for details.
222    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
223    #[configurable(metadata(docs::advanced))]
224    #[configurable(metadata(
225        docs::additional_props_description = "A librdkafka configuration option."
226    ))]
227    librdkafka_options: Option<HashMap<String, String>>,
228
229    #[serde(flatten)]
230    auth: kafka::KafkaAuthConfig,
231
232    #[configurable(derived)]
233    #[configurable(metadata(docs::advanced))]
234    #[serde(default = "default_framing_message_based")]
235    #[derivative(Default(value = "default_framing_message_based()"))]
236    framing: FramingConfig,
237
238    #[configurable(derived)]
239    #[serde(default = "default_decoding")]
240    #[derivative(Default(value = "default_decoding()"))]
241    decoding: DeserializerConfig,
242
243    #[configurable(derived)]
244    #[serde(default, deserialize_with = "bool_or_struct")]
245    acknowledgements: SourceAcknowledgementsConfig,
246
247    /// The namespace to use for logs. This overrides the global setting.
248    #[configurable(metadata(docs::hidden))]
249    #[serde(default)]
250    log_namespace: Option<bool>,
251
252    #[configurable(derived)]
253    #[serde(default)]
254    metrics: Metrics,
255}
256
257impl KafkaSourceConfig {
258    fn keys(&self) -> Keys {
259        Keys::from(log_schema(), self)
260    }
261}
262
263const fn default_session_timeout_ms() -> Duration {
264    Duration::from_millis(10000) // default in librdkafka
265}
266
267const fn default_socket_timeout_ms() -> Duration {
268    Duration::from_millis(60000) // default in librdkafka
269}
270
271const fn default_fetch_wait_max_ms() -> Duration {
272    Duration::from_millis(100) // default in librdkafka
273}
274
275const fn default_commit_interval_ms() -> Duration {
276    Duration::from_millis(5000)
277}
278
279fn default_auto_offset_reset() -> String {
280    "largest".into() // default in librdkafka
281}
282
283fn default_key_field() -> OptionalValuePath {
284    OptionalValuePath::from(owned_value_path!("message_key"))
285}
286
287fn default_topic_key() -> OptionalValuePath {
288    OptionalValuePath::from(owned_value_path!("topic"))
289}
290
291fn default_partition_key() -> OptionalValuePath {
292    OptionalValuePath::from(owned_value_path!("partition"))
293}
294
295fn default_offset_key() -> OptionalValuePath {
296    OptionalValuePath::from(owned_value_path!("offset"))
297}
298
299fn default_headers_key() -> OptionalValuePath {
300    OptionalValuePath::from(owned_value_path!("headers"))
301}
302
303const fn example_auto_offset_reset_values() -> [&'static str; 7] {
304    [
305        "smallest",
306        "earliest",
307        "beginning",
308        "largest",
309        "latest",
310        "end",
311        "error",
312    ]
313}
314
315fn example_librdkafka_options() -> HashMap<String, String> {
316    HashMap::<_, _>::from_iter([
317        ("client.id".to_string(), "${ENV_VAR}".to_string()),
318        ("fetch.error.backoff.ms".to_string(), "1000".to_string()),
319        ("socket.send.buffer.bytes".to_string(), "100".to_string()),
320    ])
321}
322
323impl_generate_config_from_default!(KafkaSourceConfig);
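
// Illustrative only (not part of the original source): a minimal sketch of building this
// configuration in Rust, relying on the `Default` implementation derived above. The
// addresses, topic names, and group id are made-up example values; a topic beginning
// with `^` is treated as a regular expression, as documented on the `topics` field.
//
//     let config = KafkaSourceConfig {
//         bootstrap_servers: "10.14.22.123:9092,10.14.23.332:9092".into(),
//         topics: vec!["topic-1".into(), "^(prefix1|prefix2)-.+".into()],
//         group_id: "consumer-group-name".into(),
//         ..Default::default()
//     };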
324
325#[async_trait::async_trait]
326#[typetag::serde(name = "kafka")]
327impl SourceConfig for KafkaSourceConfig {
328    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
329        let log_namespace = cx.log_namespace(self.log_namespace);
330
331        let decoder =
332            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
333                .build()?;
334        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
335
336        if let Some(d) = self.drain_timeout_ms {
337            snafu::ensure!(
338                Duration::from_millis(d) <= self.session_timeout_ms,
339                InvalidDrainTimeoutSnafu {
340                    value: d,
341                    session_timeout_ms: self.session_timeout_ms
342                }
343            );
344        }
345
346        let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;
347
348        Ok(Box::pin(kafka_source(
349            self.clone(),
350            consumer,
351            callback_rx,
352            decoder,
353            cx.out,
354            cx.shutdown,
355            false,
356            log_namespace,
357        )))
358    }
359
360    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
361        let log_namespace = global_log_namespace.merge(self.log_namespace);
362        let keys = self.keys();
363
364        let schema_definition = self
365            .decoding
366            .schema_definition(log_namespace)
367            .with_standard_vector_source_metadata()
368            .with_source_metadata(
369                Self::NAME,
370                keys.timestamp.map(LegacyKey::Overwrite),
371                &owned_value_path!("timestamp"),
372                Kind::timestamp(),
373                Some("timestamp"),
374            )
375            .with_source_metadata(
376                Self::NAME,
377                keys.topic.clone().map(LegacyKey::Overwrite),
378                &owned_value_path!("topic"),
379                Kind::bytes(),
380                None,
381            )
382            .with_source_metadata(
383                Self::NAME,
384                keys.partition.clone().map(LegacyKey::Overwrite),
385                &owned_value_path!("partition"),
386                Kind::bytes(),
387                None,
388            )
389            .with_source_metadata(
390                Self::NAME,
391                keys.offset.clone().map(LegacyKey::Overwrite),
392                &owned_value_path!("offset"),
393                Kind::bytes(),
394                None,
395            )
396            .with_source_metadata(
397                Self::NAME,
398                keys.headers.clone().map(LegacyKey::Overwrite),
399                &owned_value_path!("headers"),
400                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
401                None,
402            )
403            .with_source_metadata(
404                Self::NAME,
405                keys.key_field.clone().map(LegacyKey::Overwrite),
406                &owned_value_path!("message_key"),
407                Kind::bytes(),
408                None,
409            );
410
411        vec![SourceOutput::new_maybe_logs(
412            self.decoding.output_type(),
413            schema_definition,
414        )]
415    }
416
417    fn can_acknowledge(&self) -> bool {
418        true
419    }
420}
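
// Illustrative only (not part of the original source): with the default `*_key` settings
// and the legacy log namespace, events produced by this source carry metadata shaped
// roughly like the following (values are made up; see `ReceivedMessage::apply` below):
//
//     message      => the decoded Kafka payload
//     message_key  => "my-key"
//     topic        => "topic-1"
//     partition    => 0
//     offset       => 42
//     headers      => { "trace-id": "abc123" }
//     timestamp    => the Kafka message timestamp
//     source_type  => "kafka"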
421
422#[allow(clippy::too_many_arguments)]
423async fn kafka_source(
424    config: KafkaSourceConfig,
425    consumer: StreamConsumer<KafkaSourceContext>,
426    callback_rx: UnboundedReceiver<KafkaCallback>,
427    decoder: Decoder,
428    out: SourceSender,
429    shutdown: ShutdownSignal,
430    eof: bool,
431    log_namespace: LogNamespace,
432) -> Result<(), ()> {
433    let span = info_span!("kafka_source");
434    let consumer = Arc::new(consumer);
435
436    consumer
437        .context()
438        .consumer
439        .set(Arc::downgrade(&consumer))
440        .expect("Error setting up consumer context.");
441
442    // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF
443    let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();
444
445    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
446    if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
447        error!("{}", e);
448        return Err(());
449    }
450
451    let coordination_task = {
452        let span = span.clone();
453        let consumer = Arc::clone(&consumer);
454        let drain_timeout_ms = config
455            .drain_timeout_ms
456            .map_or(config.session_timeout_ms / 2, Duration::from_millis);
457        let consumer_state =
458            ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
459        tokio::spawn(async move {
460            coordinate_kafka_callbacks(
461                consumer,
462                callback_rx,
463                consumer_state,
464                drain_timeout_ms,
465                eof_tx,
466            )
467            .await;
468        })
469    };
470
471    let client_task = {
472        let consumer = Arc::clone(&consumer);
473        tokio::task::spawn_blocking(move || {
474            let _enter = span.enter();
475            drive_kafka_consumer(consumer, shutdown, eof_rx);
476        })
477    };
478
479    _ = tokio::join!(client_task, coordination_task);
480    consumer.context().commit_consumer_state();
481
482    Ok(())
483}
484
485/// ConsumerStateInner implements a small struct/enum-based state machine.
486///
487/// With a ConsumerStateInner<Consuming>, the client is able to spawn new tasks
488/// when partitions are assigned. When a shutdown signal is received, or
489/// partitions are being revoked, the Consuming state is traded for a Draining
/// state (and associated drain deadline future) via the `begin_drain` method.
491///
492/// A ConsumerStateInner<Draining> keeps track of partitions that are expected
493/// to complete, and also owns the signal that, when dropped, indicates to the
494/// client driver task that it is safe to proceed with the rebalance or shutdown.
495/// When draining is complete, or the deadline is reached, Draining is traded in for
496/// either a Consuming (after a revoke) or Complete (in the case of shutdown) state,
497/// via the `finish_drain` method.
498///
499/// A ConsumerStateInner<Complete> is the final state, reached after a shutdown
/// signal is received. This cannot be traded for another state, and the
501/// coordination task should exit when this state is reached.
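///
/// A rough sketch of the transitions described above (illustrative only):
///
/// ```text
/// Consuming --begin_drain(shutdown: false)--> Draining --finish_drain--> Consuming
/// Consuming --begin_drain(shutdown: true)---> Draining --finish_drain--> Complete
/// ```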
502struct ConsumerStateInner<S> {
503    config: KafkaSourceConfig,
504    decoder: Decoder,
505    out: SourceSender,
506    log_namespace: LogNamespace,
507    consumer_state: S,
508}
509struct Consuming {
510    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
511    span: Span,
512}
513struct Draining {
514    /// The rendezvous channel sender from the revoke or shutdown callback. Sending on this channel
515    /// indicates to the kafka client task that one or more partitions have been drained, while
516    /// closing this channel indicates that all expected partitions have drained, or the drain
517    /// timeout has been reached.
518    signal: SyncSender<()>,
519
520    /// The set of topic-partition tasks that are required to complete during
521    /// the draining phase, populated at the beginning of a rebalance or shutdown.
522    /// Partitions that are being revoked, but not being actively consumed
523    /// (e.g. due to the consumer task exiting early) should not be included.
524    /// The draining phase is considered complete when this set is empty.
525    expect_drain: HashSet<TopicPartition>,
526
527    /// Whether the client is shutting down after draining. If set to true,
528    /// the `finish_drain` method will return a Complete state, otherwise
529    /// a Consuming state.
530    shutdown: bool,
531
532    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
533    span: Span,
534}
535type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
536enum ConsumerState {
537    Consuming(ConsumerStateInner<Consuming>),
538    Draining(ConsumerStateInner<Draining>),
539    Complete,
540}
541impl Draining {
542    fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
543        Self {
544            signal,
545            shutdown,
546            expect_drain: HashSet::new(),
547            span,
548        }
549    }
550
551    fn is_complete(&self) -> bool {
552        self.expect_drain.is_empty()
553    }
554}
555
556impl<C> ConsumerStateInner<C> {
557    fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
558        (None.into(), ConsumerState::Complete)
559    }
560}
561
562impl ConsumerStateInner<Consuming> {
563    const fn new(
564        config: KafkaSourceConfig,
565        decoder: Decoder,
566        out: SourceSender,
567        log_namespace: LogNamespace,
568        span: Span,
569    ) -> Self {
570        Self {
571            config,
572            decoder,
573            out,
574            log_namespace,
575            consumer_state: Consuming { span },
576        }
577    }
578
579    /// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle
    /// acknowledgements for the messages consumed. Returns a channel sender that can be used to
581    /// signal that the consumer should stop and drain pending acknowledgements, and an AbortHandle
582    /// that can be used to forcefully end the task.
583    fn consume_partition(
584        &self,
585        join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
586        tp: TopicPartition,
587        consumer: Arc<StreamConsumer<KafkaSourceContext>>,
588        p: StreamPartitionQueue<KafkaSourceContext>,
589        acknowledgements: bool,
590        exit_eof: bool,
591    ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
592        let keys = self.config.keys();
593        let decoder = self.decoder.clone();
594        let log_namespace = self.log_namespace;
595        let mut out = self.out.clone();
596
597        let (end_tx, mut end_signal) = oneshot::channel::<()>();
598
599        let handle = join_set.spawn(async move {
600            let mut messages = p.stream();
601            let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
602
603            // finalizer is the entry point for new pending acknowledgements;
604            // when it is dropped, no new messages will be consumed, and the
605            // task will end when it reaches the end of ack_stream
606            let mut finalizer = Some(finalizer);
607
608            let mut status = PartitionConsumerStatus::NormalExit;
609
610            loop {
611                tokio::select!(
612                    // Make sure to handle the acknowledgement stream before new messages to prevent
613                    // unbounded memory growth caused by those acks being handled slower than
614                    // incoming messages when the load is high.
615                    biased;
616
617                    // is_some() checks prevent polling end_signal after it completes
618                    _ = &mut end_signal, if finalizer.is_some() => {
619                        finalizer.take();
620                    },
621
622                    ack = ack_stream.next() => match ack {
623                        Some((status, entry)) => {
624                            if status == BatchStatus::Delivered
                                && let Err(error) = consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
626                                    emit!(KafkaOffsetUpdateError { error });
627                                }
628                        }
629                        None if finalizer.is_none() => {
630                            debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
631                            break
632                        }
633                        None => {
634                            debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
635                        }
636                    },
637
638                    message = messages.next(), if finalizer.is_some() => match message {
                        None => unreachable!("MessageStream never returns Ready(None)"),
640                        Some(Err(error)) => match error {
641                            rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
642                                debug!("EOF for partition {}.", partition);
643                                status = PartitionConsumerStatus::PartitionEOF;
644                                finalizer.take();
645                            },
646                            _ => emit!(KafkaReadError { error }),
647                        },
648                        Some(Ok(msg)) => {
649                            emit!(KafkaBytesReceived {
650                                byte_size: msg.payload_len(),
651                                protocol: "tcp",
652                                topic: msg.topic(),
653                                partition: msg.partition(),
654                            });
655                            parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
656                        }
657                    },
658                )
659            }
660            (tp, status)
661        }.instrument(self.consumer_state.span.clone()));
662        (end_tx, handle)
663    }
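
    // A self-contained sketch (illustrative only, not part of the original source) of the
    // end-signal pattern used by `consume_partition` above: the spawned task runs until the
    // oneshot fires, then finishes its remaining work and returns.
    //
    //     use tokio::{sync::oneshot, task::JoinSet};
    //
    //     async fn demo() {
    //         let mut tasks: JoinSet<&'static str> = JoinSet::new();
    //         let (end_tx, end_rx) = oneshot::channel::<()>();
    //         tasks.spawn(async move {
    //             // ... consume messages until told to stop ...
    //             let _ = end_rx.await;
    //             // ... drain pending acknowledgements here ...
    //             "drained"
    //         });
    //         let _ = end_tx.send(());
    //         assert_eq!(tasks.join_next().await.unwrap().unwrap(), "drained");
    //     }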
664
665    /// Consume self, and return a "Draining" ConsumerState, along with a Future
666    /// representing a drain deadline, based on max_drain_ms
667    fn begin_drain(
668        self,
669        max_drain_ms: Duration,
670        sig: SyncSender<()>,
671        shutdown: bool,
672    ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
673        let deadline = Box::pin(tokio::time::sleep(max_drain_ms));
674
675        let draining = ConsumerStateInner {
676            config: self.config,
677            decoder: self.decoder,
678            out: self.out,
679            log_namespace: self.log_namespace,
680            consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
681        };
682
683        (Some(deadline).into(), draining)
684    }
685
686    pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
687        (deadline, ConsumerState::Consuming(self))
688    }
689}
690
691impl ConsumerStateInner<Draining> {
692    /// Mark the given TopicPartition as being revoked, adding it to the set of
693    /// partitions expected to drain
694    fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
695        // Note that if this send() returns Err, it means the task has already
696        // ended, but the completion has not been processed yet (otherwise we wouldn't have access to the end_signal),
697        // so we should still add it to the "expect to drain" set
698        _ = end_signal.send(());
699        self.consumer_state.expect_drain.insert(tp);
700    }
701
702    /// Add the given TopicPartition to the set of known "drained" partitions,
703    /// i.e. the consumer has drained the acknowledgement channel. A signal is
704    /// sent on the signal channel, indicating to the client that offsets may be committed
705    fn partition_drained(&mut self, tp: TopicPartition) {
706        // This send() will only return Err if the receiver has already been disconnected (i.e. the
707        // kafka client task is no longer running)
708        _ = self.consumer_state.signal.send(());
709        self.consumer_state.expect_drain.remove(&tp);
710    }
711
712    /// Return true if all expected partitions have drained
713    fn is_drain_complete(&self) -> bool {
714        self.consumer_state.is_complete()
715    }
716
717    /// Finish partition drain mode. Consumes self and the drain deadline
718    /// future, and returns a "Consuming" or "Complete" ConsumerState
719    fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
720        if self.consumer_state.shutdown {
721            self.complete(deadline)
722        } else {
723            (
724                None.into(),
725                ConsumerState::Consuming(ConsumerStateInner {
726                    config: self.config,
727                    decoder: self.decoder,
728                    out: self.out,
729                    log_namespace: self.log_namespace,
730                    consumer_state: Consuming {
731                        span: self.consumer_state.span,
732                    },
733                }),
734            )
735        }
736    }
737
738    pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
739        (deadline, ConsumerState::Draining(self))
740    }
741}
742
743async fn coordinate_kafka_callbacks(
744    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
745    mut callbacks: UnboundedReceiver<KafkaCallback>,
746    consumer_state: ConsumerStateInner<Consuming>,
747    max_drain_ms: Duration,
748    mut eof: Option<oneshot::Sender<()>>,
749) {
750    let mut drain_deadline: OptionFuture<_> = None.into();
751    let mut consumer_state = ConsumerState::Consuming(consumer_state);
752
753    // A oneshot channel is used for each consumed partition, so that we can
754    // signal to that task to stop consuming, drain pending acks, and exit
755    let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();
756
757    // The set of consumer tasks, each consuming a specific partition. The task
758    // is both consuming the messages (passing them to the output stream) _and_
759    // processing the corresponding acknowledgement stream. A consumer task
760    // should completely drain its acknowledgement stream after receiving an end signal
761    let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
762        Default::default();
763
764    // Handles that will let us end any consumer task that exceeds a drain deadline
765    let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();
766
767    let exit_eof = eof.is_some();
768
769    while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
770        tokio::select! {
771            Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
772                debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
773                // If this task ended on its own, the end_signal for it will still be in here.
774                end_signals.remove(&finished_partition);
775                abort_handles.remove(&finished_partition);
776
777                (drain_deadline, consumer_state) = match consumer_state {
778                    ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
779                    ConsumerState::Draining(mut state) => {
780                        state.partition_drained(finished_partition);
781
782                        if state.is_drain_complete() {
783                            debug!("All expected partitions have drained.");
784                            state.finish_drain(drain_deadline)
785                        } else {
786                            state.keep_draining(drain_deadline)
787                        }
788                    },
789                    ConsumerState::Consuming(state) => {
790                        // If we are here, it is likely because the consumer
791                        // tasks are set up to exit upon reaching the end of the
792                        // partition.
793                        if !exit_eof {
794                            debug!("Partition consumer task finished, while not in draining mode.");
795                        }
796                        state.keep_consuming(drain_deadline)
797                    },
798                };
799
800                // PartitionConsumerStatus differentiates between a task that exited after
801                // being signaled to end, and one that reached the end of its partition and
802                // was configured to exit. After the last such task ends, we signal the kafka
803                // driver task to shut down the main consumer too. Note this is only used in tests.
804                if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
805                    debug!("All partitions have exited or reached EOF.");
806                    let _ = eof.take().map(|e| e.send(()));
807                }
808            },
809            Some(callback) = callbacks.recv() => match callback {
810                KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
811                    ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
812                    ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
813                    ConsumerState::Consuming(ref consumer_state) => {
814                        let acks = consumer.context().acknowledgements;
815                        for tp in assigned_partitions.drain(0..) {
816                            let topic = tp.0.as_str();
817                            let partition = tp.1;
                            if let Some(pq) = consumer.split_partition_queue(topic, partition) {
                                debug!("Consuming partition {}:{}.", &tp.0, tp.1);
                                let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
                                abort_handles.insert(tp.clone(), handle);
                                end_signals.insert(tp, end_tx);
                            } else {
                                warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
                            }
826                        }
                        // ensure the `done` rendezvous sender is retained until all individual queues are set up
828                        drop(done);
829                    }
830                },
831                KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
832                    ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
833                    ConsumerState::Draining(d) => {
834                        // NB: This would only happen if the task driving the kafka client (i.e. rebalance handlers)
835                        // is not handling shutdown signals, and a revoke happens during a shutdown drain; otherwise
836                        // this is unreachable code.
837                        warn!("Kafka client is already draining revoked partitions.");
838                        d.keep_draining(drain_deadline)
839                    },
840                    ConsumerState::Consuming(state) => {
841                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);
842
843                        for tp in revoked_partitions.drain(0..) {
                            if let Some(end) = end_signals.remove(&tp) {
                                debug!("Revoking partition {}:{}", &tp.0, tp.1);
                                state.revoke_partition(tp, end);
                            } else {
                                debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                            }
850                        }
851
852                        state.keep_draining(deadline)
853                    }
854                },
855                KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
856                    ConsumerState::Complete => unreachable!("Shutdown received after completion."),
857                    // Shutting down is just like a full assignment revoke, but we also close the
858                    // callback channels, since we don't expect additional assignments or rebalances
859                    ConsumerState::Draining(state) => {
860                        // NB: This would only happen if the task driving the kafka client is
861                        // not handling shutdown signals; otherwise this is unreachable code
862                        error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
863                        callbacks.close();
864                        state.keep_draining(drain_deadline)
865                    },
866                    ConsumerState::Consuming(state) => {
867                        callbacks.close();
868                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
869                        if let Ok(tpl) = consumer.assignment() {
870                            // TODO  workaround for https://github.com/fede1024/rust-rdkafka/issues/681
871                            if tpl.capacity() == 0 {
872                                return;
873                            }
                            tpl.elements().iter().for_each(|el| {
                                let tp: TopicPartition = (el.topic().into(), el.partition());
                                if let Some(end) = end_signals.remove(&tp) {
                                    debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
                                    state.revoke_partition(tp, end);
                                } else {
                                    debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
                                }
                            });
886                        }
887                        // If shutdown was initiated by partition EOF mode, the drain phase
888                        // will already be complete and would time out if not accounted for here
889                        if state.is_drain_complete() {
890                            state.finish_drain(deadline)
891                        } else {
892                            state.keep_draining(deadline)
893                        }
894                    }
895                },
896            },
897
898            Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
899                ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
900                ConsumerState::Consuming(state) => {
901                    warn!("A drain deadline fired outside of draining mode.");
902                    state.keep_consuming(None.into())
903                },
904                ConsumerState::Draining(mut draining) => {
905                    debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
906                    for tp in draining.consumer_state.expect_drain.drain() {
907                        if let Some(handle) = abort_handles.remove(&tp) {
908                            handle.abort();
909                        }
910                    }
911                    draining.finish_drain(drain_deadline)
912                }
913            },
914        }
915    }
916}
917
918fn drive_kafka_consumer(
919    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
920    mut shutdown: ShutdownSignal,
921    eof: Option<oneshot::Receiver<()>>,
922) {
923    Handle::current().block_on(async move {
924        let mut eof: OptionFuture<_> = eof.into();
925        let mut stream = consumer.stream();
926        loop {
927            tokio::select! {
928                _ = &mut shutdown => {
929                    consumer.context().shutdown();
930                    break
931                },
932
933                Some(_) = &mut eof => {
934                    consumer.context().shutdown();
935                    break
936                },
937
938                // NB: messages are not received on this thread, however we poll
939                // the consumer to serve client callbacks, such as rebalance notifications
940                message = stream.next() => match message {
941                    None => unreachable!("MessageStream never returns Ready(None)"),
942                    Some(Err(error)) => emit!(KafkaReadError { error }),
943                    Some(Ok(_msg)) => {
944                        unreachable!("Messages are consumed in dedicated tasks for each partition.")
945                    }
946                },
947            }
948        }
949    });
950}
951
952async fn parse_message(
953    msg: BorrowedMessage<'_>,
954    decoder: Decoder,
955    keys: &'_ Keys,
956    out: &mut SourceSender,
957    acknowledgements: bool,
958    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
959    log_namespace: LogNamespace,
960) {
961    if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
962        let (batch, receiver) = BatchNotifier::new_with_receiver();
963        let mut stream = stream.map(|event| {
964            // All acknowledgements flow through the normal Finalizer stream so
965            // that they can be handled in one place, but are only tied to the
966            // batch when acknowledgements are enabled
967            if acknowledgements {
968                event.with_batch_notifier(&batch)
969            } else {
970                event
971            }
972        });
973        match out.send_event_stream(&mut stream).await {
974            Err(_) => {
975                emit!(StreamClosedError { count });
976            }
977            Ok(_) => {
978                // Drop stream to avoid borrowing `msg`: "[...] borrow might be used
979                // here, when `stream` is dropped and runs the destructor [...]".
980                drop(stream);
981                if let Some(f) = finalizer.as_ref() {
982                    f.add(msg.into(), receiver)
983                }
984            }
985        }
986    }
987}
988
989// Turn the received message into a stream of parsed events.
990fn parse_stream<'a>(
991    msg: &BorrowedMessage<'a>,
992    decoder: Decoder,
993    keys: &'a Keys,
994    log_namespace: LogNamespace,
995) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
996    let payload = msg.payload()?; // skip messages with empty payload
997
998    let rmsg = ReceivedMessage::from(msg);
999
1000    let payload = Cursor::new(Bytes::copy_from_slice(payload));
1001
1002    let mut stream = FramedRead::with_capacity(payload, decoder, msg.payload_len());
1003    let (count, _) = stream.size_hint();
1004    let stream = stream! {
1005        while let Some(result) = stream.next().await {
1006            match result {
1007                Ok((events, _byte_size)) => {
1008                    emit!(KafkaEventsReceived {
1009                        count: events.len(),
1010                        byte_size: events.estimated_json_encoded_size_of(),
1011                        topic: &rmsg.topic,
1012                        partition: rmsg.partition,
1013                    });
1014                    for mut event in events {
1015                        rmsg.apply(keys, &mut event, log_namespace);
1016                        yield event;
1017                    }
1018                },
1019                Err(error) => {
1020                    // Error is logged by `codecs::Decoder`, no further handling
1021                    // is needed here.
1022                    if !error.can_continue() {
1023                        break;
1024                    }
1025                }
1026            }
1027        }
1028    }
1029    .boxed();
1030    Some((count, stream))
1031}
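
// A minimal, self-contained sketch (illustrative only) of the framing approach used in
// `parse_stream`: the payload is wrapped in a cursor and fed through a framed reader.
// `LinesCodec` stands in here for the source's configured `Decoder`.
//
//     use std::io::Cursor;
//     use futures::StreamExt;
//     use tokio_util::codec::{FramedRead, LinesCodec};
//
//     async fn demo() {
//         let payload = Cursor::new(b"first\nsecond\n".to_vec());
//         let mut frames = FramedRead::new(payload, LinesCodec::new());
//         while let Some(frame) = frames.next().await {
//             println!("{}", frame.expect("valid utf-8 line"));
//         }
//     }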
1032
1033#[derive(Clone, Debug)]
1034struct Keys {
1035    timestamp: Option<OwnedValuePath>,
1036    key_field: Option<OwnedValuePath>,
1037    topic: Option<OwnedValuePath>,
1038    partition: Option<OwnedValuePath>,
1039    offset: Option<OwnedValuePath>,
1040    headers: Option<OwnedValuePath>,
1041}
1042
1043impl Keys {
1044    fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
1045        Self {
1046            timestamp: schema.timestamp_key().cloned(),
1047            key_field: config.key_field.path.clone(),
1048            topic: config.topic_key.path.clone(),
1049            partition: config.partition_key.path.clone(),
1050            offset: config.offset_key.path.clone(),
1051            headers: config.headers_key.path.clone(),
1052        }
1053    }
1054}
1055
1056struct ReceivedMessage {
1057    timestamp: Option<DateTime<Utc>>,
1058    key: Value,
1059    headers: ObjectMap,
1060    topic: String,
1061    partition: i32,
1062    offset: i64,
1063}
1064
1065impl ReceivedMessage {
1066    fn from(msg: &BorrowedMessage<'_>) -> Self {
1067        // Extract timestamp from kafka message
1068        let timestamp = msg
1069            .timestamp()
1070            .to_millis()
1071            .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());
1072
1073        let key = msg
1074            .key()
1075            .map(|key| Value::from(Bytes::from(key.to_owned())))
1076            .unwrap_or(Value::Null);
1077
1078        let mut headers_map = ObjectMap::new();
1079        if let Some(headers) = msg.headers() {
1080            for header in headers.iter() {
1081                if let Some(value) = header.value {
1082                    headers_map.insert(
1083                        header.key.into(),
1084                        Value::from(Bytes::from(value.to_owned())),
1085                    );
1086                }
1087            }
1088        }
1089
1090        Self {
1091            timestamp,
1092            key,
1093            headers: headers_map,
1094            topic: msg.topic().to_string(),
1095            partition: msg.partition(),
1096            offset: msg.offset(),
1097        }
1098    }
1099
1100    fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
1101        if let Event::Log(log) = event {
1102            match log_namespace {
1103                LogNamespace::Vector => {
1104                    // We'll only use this function in Vector namespaces because we don't want
1105                    // "timestamp" to be set automatically in legacy namespaces. In legacy
1106                    // namespaces, the "timestamp" field corresponds to the Kafka message, not the
1107                    // timestamp when the event was processed.
1108                    log_namespace.insert_standard_vector_source_metadata(
1109                        log,
1110                        KafkaSourceConfig::NAME,
1111                        Utc::now(),
1112                    );
1113                }
1114                LogNamespace::Legacy => {
1115                    if let Some(source_type_key) = log_schema().source_type_key_target_path() {
1116                        log.insert(source_type_key, KafkaSourceConfig::NAME);
1117                    }
1118                }
1119            }
1120
1121            log_namespace.insert_source_metadata(
1122                KafkaSourceConfig::NAME,
1123                log,
1124                keys.key_field.as_ref().map(LegacyKey::Overwrite),
1125                path!("message_key"),
1126                self.key.clone(),
1127            );
1128
1129            log_namespace.insert_source_metadata(
1130                KafkaSourceConfig::NAME,
1131                log,
1132                keys.timestamp.as_ref().map(LegacyKey::Overwrite),
1133                path!("timestamp"),
1134                self.timestamp,
1135            );
1136
1137            log_namespace.insert_source_metadata(
1138                KafkaSourceConfig::NAME,
1139                log,
1140                keys.topic.as_ref().map(LegacyKey::Overwrite),
1141                path!("topic"),
1142                self.topic.clone(),
1143            );
1144
1145            log_namespace.insert_source_metadata(
1146                KafkaSourceConfig::NAME,
1147                log,
1148                keys.partition.as_ref().map(LegacyKey::Overwrite),
1149                path!("partition"),
1150                self.partition,
1151            );
1152
1153            log_namespace.insert_source_metadata(
1154                KafkaSourceConfig::NAME,
1155                log,
1156                keys.offset.as_ref().map(LegacyKey::Overwrite),
1157                path!("offset"),
1158                self.offset,
1159            );
1160
1161            log_namespace.insert_source_metadata(
1162                KafkaSourceConfig::NAME,
1163                log,
1164                keys.headers.as_ref().map(LegacyKey::Overwrite),
1165                path!("headers"),
1166                self.headers.clone(),
1167            );
1168        }
1169    }
1170}
1171
1172#[derive(Debug, Eq, PartialEq, Hash)]
1173struct FinalizerEntry {
1174    topic: String,
1175    partition: i32,
1176    offset: i64,
1177}
1178
1179impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
1180    fn from(msg: BorrowedMessage<'a>) -> Self {
1181        Self {
1182            topic: msg.topic().into(),
1183            partition: msg.partition(),
1184            offset: msg.offset(),
1185        }
1186    }
1187}
1188
1189fn create_consumer(
1190    config: &KafkaSourceConfig,
1191    acknowledgements: bool,
1192) -> crate::Result<(
1193    StreamConsumer<KafkaSourceContext>,
1194    UnboundedReceiver<KafkaCallback>,
1195)> {
1196    let mut client_config = ClientConfig::new();
1197    client_config
1198        .set("group.id", &config.group_id)
1199        .set("bootstrap.servers", &config.bootstrap_servers)
1200        .set("auto.offset.reset", &config.auto_offset_reset)
1201        .set(
1202            "session.timeout.ms",
1203            config.session_timeout_ms.as_millis().to_string(),
1204        )
1205        .set(
1206            "socket.timeout.ms",
1207            config.socket_timeout_ms.as_millis().to_string(),
1208        )
1209        .set(
1210            "fetch.wait.max.ms",
1211            config.fetch_wait_max_ms.as_millis().to_string(),
1212        )
1213        .set("enable.partition.eof", "false")
1214        .set("enable.auto.commit", "true")
1215        .set(
1216            "auto.commit.interval.ms",
1217            config.commit_interval_ms.as_millis().to_string(),
1218        )
1219        .set("enable.auto.offset.store", "false")
1220        .set("statistics.interval.ms", "1000")
1221        .set("client.id", "vector");
1222
1223    config.auth.apply(&mut client_config)?;
1224
1225    if let Some(librdkafka_options) = &config.librdkafka_options {
1226        for (key, value) in librdkafka_options {
1227            client_config.set(key.as_str(), value.as_str());
1228        }
1229    }
1230
1231    let (callbacks, callback_rx) = mpsc::unbounded_channel();
1232    let consumer = client_config
1233        .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
1234            config.metrics.topic_lag_metric,
1235            acknowledgements,
1236            callbacks,
1237            Span::current(),
1238        ))
1239        .context(CreateSnafu)?;
1240
1241    Ok((consumer, callback_rx))
1242}
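
// Note (illustrative only): `librdkafka_options` are applied after the defaults set in
// `create_consumer`, so user-supplied keys override them. A minimal sketch with made-up
// values:
//
//     use rdkafka::ClientConfig;
//
//     let mut client_config = ClientConfig::new();
//     client_config.set("client.id", "vector");
//     client_config.set("client.id", "my-pipeline"); // the later set() wins
//     assert_eq!(client_config.get("client.id"), Some("my-pipeline"));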
1243
1244type TopicPartition = (String, i32);
1245
1246/// Status returned by partition consumer tasks, allowing the coordination task
1247/// to differentiate between a consumer exiting normally (after receiving an end
1248/// signal) and exiting when it reaches the end of a partition
1249#[derive(PartialEq)]
1250enum PartitionConsumerStatus {
1251    NormalExit,
1252    PartitionEOF,
1253}
1254
1255enum KafkaCallback {
1256    PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
1257    PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
1258    ShuttingDown(SyncSender<()>),
1259}
1260
1261struct KafkaSourceContext {
1262    acknowledgements: bool,
1263    stats: kafka::KafkaStatisticsContext,
1264
1265    /// A callback channel used to coordinate between the main consumer task and the acknowledgement task
1266    callbacks: UnboundedSender<KafkaCallback>,
1267
1268    /// A weak reference to the consumer, so that we can commit offsets during a rebalance operation
1269    consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
1270}
1271
1272impl KafkaSourceContext {
1273    fn new(
1274        expose_lag_metrics: bool,
1275        acknowledgements: bool,
1276        callbacks: UnboundedSender<KafkaCallback>,
1277        span: Span,
1278    ) -> Self {
1279        Self {
1280            stats: kafka::KafkaStatisticsContext {
1281                expose_lag_metrics,
1282                span,
1283            },
1284            acknowledgements,
1285            consumer: OnceLock::default(),
1286            callbacks,
1287        }
1288    }
1289
1290    fn shutdown(&self) {
1291        let (send, rendezvous) = sync_channel(0);
1292        if self
1293            .callbacks
1294            .send(KafkaCallback::ShuttingDown(send))
1295            .is_ok()
1296        {
1297            while rendezvous.recv().is_ok() {
1298                self.commit_consumer_state();
1299            }
1300        }
1301    }
1302
1303    /// Emit a PartitionsAssigned callback with the topic-partitions to be consumed,
1304    /// and block until confirmation is received that a stream and consumer for
1305    /// each topic-partition has been set up. This function blocks until the
1306    /// rendezvous channel sender is dropped by the callback handler.
1307    fn consume_partitions(&self, tpl: &TopicPartitionList) {
1308        // TODO  workaround for https://github.com/fede1024/rust-rdkafka/issues/681
1309        if tpl.capacity() == 0 {
1310            return;
1311        }
1312        let (send, rendezvous) = sync_channel(0);
1313        let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned(
1314            tpl.elements()
1315                .iter()
1316                .map(|tp| (tp.topic().into(), tp.partition()))
1317                .collect(),
1318            send,
1319        ));
1320
1321        while rendezvous.recv().is_ok() {
1322            // no-op: wait for partition assignment handler to complete
1323        }
1324    }
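
    // A self-contained sketch (illustrative only) of the rendezvous pattern used here and in
    // `revoke_partitions`: the receiver loops until the handler drops the sender, and each
    // intermediate send gives the caller a chance to do per-partition work.
    //
    //     use std::{sync::mpsc::sync_channel, thread};
    //
    //     let (send, rendezvous) = sync_channel::<()>(0);
    //     let handler = thread::spawn(move || {
    //         send.send(()).ok(); // one unit of progress
    //         drop(send);         // dropping the sender ends the rendezvous
    //     });
    //     while rendezvous.recv().is_ok() {
    //         // e.g. commit consumer state for each completed partition
    //     }
    //     handler.join().unwrap();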
1325
1326    /// Emit a PartitionsRevoked callback and block until confirmation is
1327    /// received that acknowledgements have been processed for each of them.
1328    /// The rendezvous channel used in the callback can send multiple times to
1329    /// signal individual partitions completing. This function blocks until the
1330    /// sender is dropped by the callback handler.
1331    fn revoke_partitions(&self, tpl: &TopicPartitionList) {
1332        let (send, rendezvous) = sync_channel(0);
1333        let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked(
1334            tpl.elements()
1335                .iter()
1336                .map(|tp| (tp.topic().into(), tp.partition()))
1337                .collect(),
1338            send,
1339        ));
1340
1341        while rendezvous.recv().is_ok() {
1342            self.commit_consumer_state();
1343        }
1344    }
1345
1346    fn commit_consumer_state(&self) {
1347        if let Some(consumer) = self
1348            .consumer
1349            .get()
1350            .expect("Consumer reference was not initialized.")
1351            .upgrade()
1352        {
1353            match consumer.commit_consumer_state(CommitMode::Sync) {
1354                Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
1355                    /* Success, or nothing to do - yay \0/ */
1356                }
1357                Err(error) => emit!(KafkaOffsetUpdateError { error }),
1358            }
1359        }
1360    }
1361}
1362
1363impl ClientContext for KafkaSourceContext {
1364    fn stats(&self, statistics: Statistics) {
1365        self.stats.stats(statistics)
1366    }
1367}
1368
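// Hooks into librdkafka's rebalance callbacks: partition assignments block until
// per-partition streams are running, and revocations block until in-flight
// acknowledgements are drained and offsets committed.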
1369impl ConsumerContext for KafkaSourceContext {
1370    fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
1371        match rebalance {
1372            Rebalance::Assign(tpl) => self.consume_partitions(tpl),
1373
1374            Rebalance::Revoke(tpl) => {
1375                self.revoke_partitions(tpl);
1376                self.commit_consumer_state();
1377            }
1378
1379            Rebalance::Error(message) => {
1380                error!("Error during Kafka consumer group rebalance: {}.", message);
1381            }
1382        }
1383    }
1384}
1385
1386#[cfg(test)]
1387mod test {
1388    use vector_lib::{lookup::OwnedTargetPath, schema::Definition};
1389
1390    use super::*;
1391
1392    pub fn kafka_host() -> String {
1393        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
1394    }
1395    pub fn kafka_port() -> u16 {
1396        let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into());
1397        port.parse().expect("Invalid port number")
1398    }
1399
1400    pub fn kafka_address() -> String {
1401        format!("{}:{}", kafka_host(), kafka_port())
1402    }
1403
1404    #[test]
1405    fn generate_config() {
1406        crate::test_util::test_generate_config::<KafkaSourceConfig>();
1407    }
1408
1409    pub(super) fn make_config(
1410        topic: &str,
1411        group: &str,
1412        log_namespace: LogNamespace,
1413        librdkafka_options: Option<HashMap<String, String>>,
1414    ) -> KafkaSourceConfig {
1415        KafkaSourceConfig {
1416            bootstrap_servers: kafka_address(),
1417            topics: vec![topic.into()],
1418            group_id: group.into(),
1419            auto_offset_reset: "beginning".into(),
1420            session_timeout_ms: Duration::from_millis(6000),
1421            commit_interval_ms: Duration::from_millis(1),
1422            librdkafka_options,
1423            key_field: default_key_field(),
1424            topic_key: default_topic_key(),
1425            partition_key: default_partition_key(),
1426            offset_key: default_offset_key(),
1427            headers_key: default_headers_key(),
1428            socket_timeout_ms: Duration::from_millis(60000),
1429            fetch_wait_max_ms: Duration::from_millis(100),
1430            log_namespace: Some(log_namespace == LogNamespace::Vector),
1431            ..Default::default()
1432        }
1433    }
1434
1435    #[test]
1436    fn test_output_schema_definition_vector_namespace() {
1437        let definitions = make_config("topic", "group", LogNamespace::Vector, None)
1438            .outputs(LogNamespace::Vector)
1439            .remove(0)
1440            .schema_definition(true);
1441
1442        assert_eq!(
1443            definitions,
1444            Some(
1445                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
1446                    .with_meaning(OwnedTargetPath::event_root(), "message")
1447                    .with_metadata_field(
1448                        &owned_value_path!("kafka", "timestamp"),
1449                        Kind::timestamp(),
1450                        Some("timestamp")
1451                    )
1452                    .with_metadata_field(
1453                        &owned_value_path!("kafka", "message_key"),
1454                        Kind::bytes(),
1455                        None
1456                    )
1457                    .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
1458                    .with_metadata_field(
1459                        &owned_value_path!("kafka", "partition"),
1460                        Kind::bytes(),
1461                        None
1462                    )
1463                    .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
1464                    .with_metadata_field(
1465                        &owned_value_path!("kafka", "headers"),
1466                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
1467                        None
1468                    )
1469                    .with_metadata_field(
1470                        &owned_value_path!("vector", "ingest_timestamp"),
1471                        Kind::timestamp(),
1472                        None
1473                    )
1474                    .with_metadata_field(
1475                        &owned_value_path!("vector", "source_type"),
1476                        Kind::bytes(),
1477                        None
1478                    )
1479            )
1480        )
1481    }
1482
1483    #[test]
1484    fn test_output_schema_definition_legacy_namespace() {
1485        let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
1486            .outputs(LogNamespace::Legacy)
1487            .remove(0)
1488            .schema_definition(true);
1489
1490        assert_eq!(
1491            definitions,
1492            Some(
1493                Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
1494                    .unknown_fields(Kind::undefined())
1495                    .with_event_field(
1496                        &owned_value_path!("message"),
1497                        Kind::bytes(),
1498                        Some("message")
1499                    )
1500                    .with_event_field(
1501                        &owned_value_path!("timestamp"),
1502                        Kind::timestamp(),
1503                        Some("timestamp")
1504                    )
1505                    .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
1506                    .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
1507                    .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
1508                    .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
1509                    .with_event_field(
1510                        &owned_value_path!("headers"),
1511                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
1512                        None
1513                    )
1514                    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
1515            )
1516        )
1517    }
1518
1519    #[tokio::test]
1520    async fn consumer_create_ok() {
1521        let config = make_config("topic", "group", LogNamespace::Legacy, None);
1522        assert!(create_consumer(&config, true).is_ok());
1523    }
1524
1525    #[tokio::test]
1526    async fn consumer_create_incorrect_auto_offset_reset() {
1527        let config = KafkaSourceConfig {
1528            auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
1529            ..make_config("topic", "group", LogNamespace::Legacy, None)
1530        };
1531        assert!(create_consumer(&config, true).is_err());
1532    }
1533}
1534
1535#[cfg(feature = "kafka-integration-tests")]
1536#[cfg(test)]
1537mod integration_test {
1538    use std::time::Duration;
1539
1540    use chrono::{DateTime, SubsecRound, Utc};
1541    use futures::Stream;
1542    use futures_util::stream::FuturesUnordered;
1543    use rdkafka::{
1544        Offset, TopicPartitionList,
1545        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
1546        client::DefaultClientContext,
1547        config::{ClientConfig, FromClientConfig},
1548        consumer::BaseConsumer,
1549        message::{Header, OwnedHeaders},
1550        producer::{FutureProducer, FutureRecord},
1551        util::Timeout,
1552    };
1553    use stream_cancel::{Trigger, Tripwire};
1554    use tokio::time::sleep;
1555    use vector_lib::event::EventStatus;
1556    use vrl::{event_path, value};
1557
1558    use super::{test::*, *};
1559    use crate::{
1560        SourceSender,
1561        event::{EventArray, EventContainer},
1562        shutdown::ShutdownSignal,
1563        test_util::{collect_n, components::assert_source_compliance, random_string},
1564    };
1565
1566    const KEY: &str = "my key";
1567    const TEXT: &str = "my message";
1568    const HEADER_KEY: &str = "my header";
1569    const HEADER_VALUE: &str = "my header value";
1570
1571    fn kafka_test_topic() -> String {
1572        std::env::var("KAFKA_TEST_TOPIC")
1573            .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
1574    }
1575    fn kafka_max_bytes() -> String {
1576        std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
1577    }
1578
1579    fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
1580        let mut client = ClientConfig::new();
1581        client.set("bootstrap.servers", kafka_address());
1582        client.set("produce.offset.report", "true");
1583        client.set("message.timeout.ms", "5000");
1584        client.set("auto.commit.interval.ms", "1");
1585        if let Some(group) = group {
1586            client.set("group.id", group);
1587        }
1588        client.create().expect("Client creation error")
1589    }
1590
1591    async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
1592        let now = Utc::now();
1593        let timestamp = now.timestamp_millis();
1594
1595        let producer: &FutureProducer = &client_config(None);
1596        let topic_name = topic.as_ref();
1597
1598        create_topic(topic_name, partitions).await;
1599
1600        (0..count)
1601            .map(|i| async move {
1602                let text = format!("{TEXT} {i:03}");
1603                let key = format!("{KEY} {i}");
1604                let record = FutureRecord::to(topic_name)
1605                    .payload(&text)
1606                    .key(&key)
1607                    .timestamp(timestamp)
1608                    .headers(OwnedHeaders::new().insert(Header {
1609                        key: HEADER_KEY,
1610                        value: Some(HEADER_VALUE),
1611                    }));
1612                if let Err(error) = producer.send(record, Timeout::Never).await {
1613                    panic!("Cannot send event to Kafka: {error:?}");
1614                }
1615            })
1616            .collect::<FuturesUnordered<_>>()
1617            .collect::<Vec<_>>()
1618            .await;
1619
1620        now
1621    }
1622
1623    async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
1624        let topic = kafka_test_topic();
1625        let group_id = format!("test-group-{}", random_string(10));
1626
1627        let sent_at = send_events(topic.clone(), partitions, count).await;
1628
1629        (topic, group_id, sent_at)
1630    }
1631
1632    #[tokio::test]
1633    async fn consumes_event_with_acknowledgements() {
1634        send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
1635    }
1636
1637    #[tokio::test]
1638    async fn consumes_event_with_acknowledgements_vector_namespace() {
1639        send_receive(true, |_| false, 10, LogNamespace::Vector).await;
1640    }
1641
1642    #[tokio::test]
1643    async fn consumes_event_without_acknowledgements() {
1644        send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
1645    }
1646
1647    #[tokio::test]
1648    async fn consumes_event_without_acknowledgements_vector_namespace() {
1649        send_receive(false, |_| false, 10, LogNamespace::Vector).await;
1650    }
1651
1652    #[tokio::test]
1653    async fn handles_one_negative_acknowledgement() {
1654        send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
1655    }
1656
1657    #[tokio::test]
1658    async fn handles_one_negative_acknowledgement_vector_namespace() {
1659        send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
1660    }
1661
1662    #[tokio::test]
1663    async fn handles_permanent_negative_acknowledgement() {
1664        send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
1665    }
1666
1667    #[tokio::test]
1668    async fn handles_permanent_negative_acknowledgement_vector_namespace() {
1669        send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
1670    }
1671
1672    async fn send_receive(
1673        acknowledgements: bool,
1674        error_at: impl Fn(usize) -> bool,
1675        receive_count: usize,
1676        log_namespace: LogNamespace,
1677    ) {
1678        const SEND_COUNT: usize = 10;
1679
1680        let topic = format!("test-topic-{}", random_string(10));
1681        let group_id = format!("test-group-{}", random_string(10));
1682        let config = make_config(&topic, &group_id, log_namespace, None);
1683
1684        let now = send_events(topic.clone(), 1, 10).await;
1685
1686        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
1687            let (tx, rx) = SourceSender::new_test_errors(error_at);
1688            let (trigger_shutdown, shutdown_done) =
1689                spawn_kafka(tx, config, acknowledgements, false, log_namespace);
1690            let events = collect_n(rx, SEND_COUNT).await;
1691            // Yield to the finalization task to let it collect the
1692            // batch status receivers before signalling the shutdown.
1693            tokio::task::yield_now().await;
1694            drop(trigger_shutdown);
1695            shutdown_done.await;
1696
1697            events
1698        })
1699        .await;
1700
1701        let offset = fetch_tpl_offset(&group_id, &topic, 0);
1702        assert_eq!(offset, Offset::from_raw(receive_count as i64));
1703
1704        assert_eq!(events.len(), SEND_COUNT);
1705        for (i, event) in events.into_iter().enumerate() {
1706            if let LogNamespace::Legacy = log_namespace {
1707                assert_eq!(
1708                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1709                    format!("{TEXT} {i:03}").into()
1710                );
1711                assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
1712                assert_eq!(
1713                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1714                    "kafka".into()
1715                );
1716                assert_eq!(
1717                    event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
1718                    now.trunc_subsecs(3).into()
1719                );
1720                assert_eq!(event.as_log()["topic"], topic.clone().into());
1721                assert!(event.as_log().contains("partition"));
1722                assert!(event.as_log().contains("offset"));
1723                let mut expected_headers = ObjectMap::new();
1724                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1725                assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
1726            } else {
1727                let meta = event.as_log().metadata().value();
1728
1729                assert_eq!(
1730                    meta.get(path!("vector", "source_type")).unwrap(),
1731                    &value!(KafkaSourceConfig::NAME)
1732                );
1733                assert!(
1734                    meta.get(path!("vector", "ingest_timestamp"))
1735                        .unwrap()
1736                        .is_timestamp()
1737                );
1738
1739                assert_eq!(
1740                    event.as_log().value(),
1741                    &value!(format!("{} {:03}", TEXT, i))
1742                );
1743                assert_eq!(
1744                    meta.get(path!("kafka", "message_key")).unwrap(),
1745                    &value!(format!("{} {}", KEY, i))
1746                );
1747
1748                assert_eq!(
1749                    meta.get(path!("kafka", "timestamp")).unwrap(),
1750                    &value!(now.trunc_subsecs(3))
1751                );
1752                assert_eq!(
1753                    meta.get(path!("kafka", "topic")).unwrap(),
1754                    &value!(topic.clone())
1755                );
1756                assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer());
1757                assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer());
1758
1759                let mut expected_headers = ObjectMap::new();
1760                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1761                assert_eq!(
1762                    meta.get(path!("kafka", "headers")).unwrap(),
1763                    &Value::from(expected_headers)
1764                );
1765            }
1766        }
1767    }
1768
1769    fn make_rand_config() -> (String, String, KafkaSourceConfig) {
1770        let topic = format!("test-topic-{}", random_string(10));
1771        let group_id = format!("test-group-{}", random_string(10));
1772        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
1773        (topic, group_id, config)
1774    }
1775
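    /// Build a test pipeline that stamps each log with the given `id` as
    /// `pipeline_id`, waits `delay` per batch, and then updates every event's
    /// metadata with `status` before yielding the batch.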
1776    fn delay_pipeline(
1777        id: usize,
1778        delay: Duration,
1779        status: EventStatus,
1780    ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
1781        let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100);
1782        let recv = recv.into_stream();
1783        let recv = recv.then(move |item| async move {
1784            let mut events = item.events;
1785            events.iter_logs_mut().for_each(|log| {
1786                log.insert(event_path!("pipeline_id"), id.to_string());
1787            });
1788            sleep(delay).await;
1789            events.iter_events_mut().for_each(|mut event| {
1790                let metadata = event.metadata_mut();
1791                metadata.update_status(status);
1792                metadata.update_sources();
1793            });
1794            events
1795        });
1796        (pipe, Box::pin(recv))
1797    }
1798
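    /// Build a decoder and consumer from `config`, spawn `kafka_source` on the
    /// tokio runtime, and return the shutdown trigger plus a tripwire that fires
    /// once the source has fully shut down.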
1799    fn spawn_kafka(
1800        out: SourceSender,
1801        config: KafkaSourceConfig,
1802        acknowledgements: bool,
1803        eof: bool,
1804        log_namespace: LogNamespace,
1805    ) -> (Trigger, Tripwire) {
1806        let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
1807
1808        let decoder = DecodingConfig::new(
1809            config.framing.clone(),
1810            config.decoding.clone(),
1811            log_namespace,
1812        )
1813        .build()
1814        .unwrap();
1815
1816        let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();
1817
1818        tokio::spawn(kafka_source(
1819            config,
1820            consumer,
1821            callback_rx,
1822            decoder,
1823            out,
1824            shutdown,
1825            eof,
1826            log_namespace,
1827        ));
1828        (trigger_shutdown, shutdown_done)
1829    }
1830
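    /// Fetch the committed offset for a single topic/partition as seen by the
    /// given consumer group.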
1831    fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
1832        let client: BaseConsumer = client_config(Some(group_id));
1833        client.subscribe(&[topic]).expect("Subscribing failed");
1834
1835        let mut tpl = TopicPartitionList::new();
1836        tpl.add_partition(topic, partition);
1837        client
1838            .committed_offsets(tpl, Duration::from_secs(1))
1839            .expect("Getting committed offsets failed")
1840            .find_partition(topic, partition)
1841            .expect("Missing topic/partition")
1842            .offset()
1843    }
1844
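    /// Create `topic` with the requested number of partitions (single replica),
    /// tolerating the case where the topic already exists.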
1845    async fn create_topic(topic: &str, partitions: i32) {
1846        let client: AdminClient<DefaultClientContext> = client_config(None);
1847        let topic_results = client
1848            .create_topics(
1849                [&NewTopic {
1850                    name: topic,
1851                    num_partitions: partitions,
1852                    replication: TopicReplication::Fixed(1),
1853                    config: vec![],
1854                }],
1855                &AdminOptions::default(),
1856            )
1857            .await
1858            .expect("create_topics failed");
1859
1860        for result in topic_results {
1861            if let Err((topic, err)) = result
1862                && err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists
1863            {
1864                panic!("Creating a topic failed: {:?}", (topic, err))
1865            }
1866        }
1867    }
1868
1869    // Failure timeline:
1870    // - Topic exists on multiple partitions
1871    // - Consumer A connects to topic, is assigned both partitions
1872    // - Consumer A receives some messages
1873    // - Consumer B connects to topic
1874    // - Consumer A has one partition revoked (rebalance)
1875    // - Consumer B is assigned a partition
1876    // - Consumer A stores an offset on the revoked partition
1877    // - Consumer B skips receiving messages?
1878    #[ignore]
1879    #[tokio::test]
1880    async fn handles_rebalance() {
1881        // The test plan here is to:
1882        // - Set up one source instance, feeding into a pipeline that delays acks.
1883        // - Wait a bit, and set up a second source instance. This should cause a rebalance.
1884        // - Wait further until all events will have been pulled down.
1885        // - Verify that all events are captured by the two sources, and that offsets are set right, etc.
1886
1887        // However this test, as written, does not actually cause the
1888        // conditions required to test this. We have had external
1889        // validation that the sink behaves properly on rebalance
1890        // events.  This test also requires the insertion of a small
1891        // delay into the source to guarantee the timing, which is not
1892        // suitable for production code.
1893
1894        const NEVENTS: usize = 200;
1895        const DELAY: u64 = 100;
1896
1897        let (topic, group_id, config) = make_rand_config();
1898        create_topic(&topic, 2).await;
1899
1900        let _send_start = send_events(topic.clone(), 1, NEVENTS).await;
1901
1902        let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
1903        let (trigger_shutdown1, shutdown_done1) =
1904            spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
1905        let events1 = tokio::spawn(collect_n(rx1, NEVENTS));
1906
1907        sleep(Duration::from_secs(1)).await;
1908
1909        let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
1910        let (trigger_shutdown2, shutdown_done2) =
1911            spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1912        let events2 = tokio::spawn(collect_n(rx2, NEVENTS));
1913
1914        sleep(Duration::from_secs(5)).await;
1915
1916        drop(trigger_shutdown1);
1917        let events1 = events1.await.unwrap();
1918        shutdown_done1.await;
1919
1920        sleep(Duration::from_secs(5)).await;
1921
1922        drop(trigger_shutdown2);
1923        let events2 = events2.await.unwrap();
1924        shutdown_done2.await;
1925
1926        sleep(Duration::from_secs(1)).await;
1927
1928        assert!(!events1.is_empty());
1929        assert!(!events2.is_empty());
1930
1931        match fetch_tpl_offset(&group_id, &topic, 0) {
1932            Offset::Offset(offset) => {
1933                assert!((offset as isize - events1.len() as isize).abs() <= 1)
1934            }
1935            o => panic!("Invalid offset for partition 0 {o:?}"),
1936        }
1937
1938        match fetch_tpl_offset(&group_id, &topic, 1) {
1939            Offset::Offset(offset) => {
1940                assert!((offset as isize - events2.len() as isize).abs() <= 1)
1941            }
1942            o => panic!("Invalid offset for partition 1 {o:?}"),
1943        }
1944
1945        let mut all_events = events1
1946            .into_iter()
1947            .chain(events2.into_iter())
1948            .flat_map(map_logs)
1949            .collect::<Vec<String>>();
1950        all_events.sort();
1951
1952        // TODO: assert that all events are in sequential order with no duplicates
1953    }
1954
1955    #[tokio::test]
1956    async fn drains_acknowledgements_at_shutdown() {
1957        // 1. Send N events (when running against a pre-populated Kafka topic, set KAFKA_SEND_COUNT=0 and KAFKA_EXPECT_COUNT to the expected number of messages; otherwise only KAFKA_SEND_COUNT is needed)
1958        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
1959            .unwrap_or_else(|_| "125000".into())
1960            .parse()
1961            .expect("Number of messages to send to kafka.");
1962        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
1963            .unwrap_or_else(|_| format!("{send_count}"))
1964            .parse()
1965            .expect("Number of messages to expect consumers to process.");
1966        let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
1967            .unwrap_or_else(|_| "2000".into())
1968            .parse()
1969            .expect("Number of milliseconds before shutting down first consumer.");
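        // The KAFKA_SEND_COUNT / KAFKA_EXPECT_COUNT / KAFKA_SHUTDOWN_DELAY variables
        // above make this test tunable; e.g. a smaller KAFKA_SEND_COUNT with a shorter
        // KAFKA_SHUTDOWN_DELAY gives a faster local run (exact values are illustrative).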
1970
1971        let (topic, group_id, _) = send_to_test_topic(1, send_count).await;
1972
1973        // 2. Run the kafka source to read some of the events
1974        // 3. Send a shutdown signal (at some point before all events are read)
1975        let mut opts = HashMap::new();
1976        // Set options to get partition EOF notifications, and fetch data in small/configurable size chunks
1977        opts.insert("enable.partition.eof".into(), "true".into());
1978        opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
1979        let events1 = {
1980            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
1981            let (tx, rx) = SourceSender::new_test_errors(|_| false);
1982            let (trigger_shutdown, shutdown_done) =
1983                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1984            let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
1985                sleep(Duration::from_millis(delay_ms)).await;
1986                drop(trigger_shutdown);
1987            });
1988            shutdown_done.await;
1989            events
1990        };
1991
1992        debug!("Consumer group.id: {}", &group_id);
1993        debug!(
1994            "First consumer read {} of {} messages.",
1995            events1.len(),
1996            expect_count
1997        );
1998
1999        // 4. Run the kafka source again to finish reading the events
2000        let events2 = {
2001            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
2002            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2003            let (trigger_shutdown, shutdown_done) =
2004                spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
2005            let events = rx.collect::<Vec<Event>>().await;
2006            drop(trigger_shutdown);
2007            shutdown_done.await;
2008            events
2009        };
2010
2011        debug!(
2012            "Second consumer read {} of {} messages.",
2013            events2.len(),
2014            expect_count
2015        );
2016
2017        // 5. Total number of events processed should equal the number sent
2018        let total = events1.len() + events2.len();
2019        assert_ne!(
2020            events1.len(),
2021            0,
2022            "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
2023        );
2024        assert_ne!(
2025            events2.len(),
2026            0,
2027            "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?) "
2028        );
2029        assert_eq!(total, expect_count);
2030    }
2031
2032    async fn consume_with_rebalance(rebalance_strategy: String) {
2033        // 1. Send N events (when running against a pre-populated Kafka topic, set KAFKA_SEND_COUNT=0 and KAFKA_EXPECT_COUNT to the expected number of messages; otherwise only KAFKA_SEND_COUNT is needed)
2034        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
2035            .unwrap_or_else(|_| "125000".into())
2036            .parse()
2037            .expect("Number of messages to send to kafka.");
2038        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
2039            .unwrap_or_else(|_| format!("{send_count}"))
2040            .parse()
2041            .expect("Number of messages to expect consumers to process.");
2042        let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
2043            .unwrap_or_else(|_| "2000".into())
2044            .parse()
2045            .expect("Number of milliseconds before shutting down first consumer.");
2046
2047        let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
2048        debug!("Topic: {}", &topic);
2049        debug!("Consumer group.id: {}", &group_id);
2050
2051        // 2. Run the kafka source to read some of the events
2052        // 3. Start 2nd & 3rd consumers using the same group.id, triggering rebalance events
2053        let mut kafka_options = HashMap::new();
2054        kafka_options.insert("enable.partition.eof".into(), "true".into());
2055        kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
2056        kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
2057        let config1 = make_config(
2058            &topic,
2059            &group_id,
2060            LogNamespace::Legacy,
2061            Some(kafka_options.clone()),
2062        );
2063        let config2 = config1.clone();
2064        let config3 = config1.clone();
2065        let config4 = config1.clone();
2066
2067        let (events1, events2, events3) = tokio::join!(
2068            async move {
2069                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2070                let (_trigger_shutdown, _shutdown_done) =
2071                    spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);
2072
2073                rx.collect::<Vec<Event>>().await
2074            },
2075            async move {
2076                sleep(Duration::from_millis(delay_ms)).await;
2077                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2078                let (_trigger_shutdown, _shutdown_done) =
2079                    spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);
2080
2081                rx.collect::<Vec<Event>>().await
2082            },
2083            async move {
2084                sleep(Duration::from_millis(delay_ms * 2)).await;
2085                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2086                let (_trigger_shutdown, _shutdown_done) =
2087                    spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);
2088
2089                rx.collect::<Vec<Event>>().await
2090            }
2091        );
2092
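        // 4. Run one final consumer with the same group.id; it should find nothing left to consume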
2093        let unconsumed = async move {
2094            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2095            let (_trigger_shutdown, _shutdown_done) =
2096                spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);
2097
2098            rx.collect::<Vec<Event>>().await
2099        }
2100        .await;
2101
2102        debug!(
2103            "First consumer read {} of {} messages.",
2104            events1.len(),
2105            expect_count
2106        );
2107
2108        debug!(
2109            "Second consumer read {} of {} messages.",
2110            events2.len(),
2111            expect_count
2112        );
2113        debug!(
2114            "Third consumer read {} of {} messages.",
2115            events3.len(),
2116            expect_count
2117        );
2118
2119        // 5. Total number of events processed should equal the number sent
2120        let total = events1.len() + events2.len() + events3.len();
2121        assert_ne!(
2122            events1.len(),
2123            0,
2124            "First batch of events should be non-zero (increase delay?)"
2125        );
2126        assert_ne!(
2127            events2.len(),
2128            0,
2129            "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2130        );
2131        assert_ne!(
2132            events3.len(),
2133            0,
2134            "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2135        );
2136        assert_eq!(
2137            unconsumed.len(),
2138            0,
2139            "The first set of consumers should consume and ack all messages."
2140        );
2141        assert_eq!(total, expect_count);
2142    }
2143
2144    #[tokio::test]
2145    async fn drains_acknowledgements_during_rebalance_default_assignments() {
2146        // the default, eager rebalance strategies generally result in more revocations
2147        consume_with_rebalance("range,roundrobin".into()).await;
2148    }
2149    #[tokio::test]
2150    async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
2151        // Cooperative rebalance strategies generally result in fewer revokes,
2152        // as only reassigned partitions are revoked
2153        consume_with_rebalance("cooperative-sticky".into()).await;
2154    }
2155
2156    fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
2157        events.into_events().map(|event| {
2158            let log = event.into_log();
2159            format!(
2160                "{} {} {} {}",
2161                log["message"].to_string_lossy(),
2162                log["topic"].to_string_lossy(),
2163                log["partition"].to_string_lossy(),
2164                log["offset"].to_string_lossy(),
2165            )
2166        })
2167    }
2168}