vector/sources/kafka.rs

1use std::{
2    collections::{HashMap, HashSet},
3    io::Cursor,
4    pin::Pin,
5    sync::{
6        mpsc::{sync_channel, SyncSender},
7        Arc, OnceLock, Weak,
8    },
9    time::Duration,
10};
11
12use async_stream::stream;
13use bytes::Bytes;
14use chrono::{DateTime, TimeZone, Utc};
15use futures::{Stream, StreamExt};
16use futures_util::future::OptionFuture;
17use rdkafka::{
18    consumer::{
19        stream_consumer::StreamPartitionQueue, BaseConsumer, CommitMode, Consumer, ConsumerContext,
20        Rebalance, StreamConsumer,
21    },
22    error::KafkaError,
23    message::{BorrowedMessage, Headers as _, Message},
24    types::RDKafkaErrorCode,
25    ClientConfig, ClientContext, Statistics, TopicPartitionList,
26};
27use serde_with::serde_as;
28use snafu::{ResultExt, Snafu};
29use tokio::{
30    runtime::Handle,
31    sync::{
32        mpsc::{self, UnboundedReceiver, UnboundedSender},
33        oneshot,
34    },
35    task::JoinSet,
36    time::Sleep,
37};
38use tokio_util::codec::FramedRead;
39use tracing::{Instrument, Span};
40use vector_lib::codecs::{
41    decoding::{DeserializerConfig, FramingConfig},
42    StreamDecodingError,
43};
44use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
45
46use vector_lib::configurable::configurable_component;
47use vector_lib::finalizer::OrderedFinalizer;
48use vector_lib::{
49    config::{LegacyKey, LogNamespace},
50    EstimatedJsonEncodedSizeOf,
51};
52use vrl::value::{kind::Collection, Kind, ObjectMap};
53
54use crate::{
55    codecs::{Decoder, DecodingConfig},
56    config::{
57        log_schema, LogSchema, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
58        SourceOutput,
59    },
60    event::{BatchNotifier, BatchStatus, Event, Value},
61    internal_events::{
62        KafkaBytesReceived, KafkaEventsReceived, KafkaOffsetUpdateError, KafkaReadError,
63        StreamClosedError,
64    },
65    kafka,
66    serde::{bool_or_struct, default_decoding, default_framing_message_based},
67    shutdown::ShutdownSignal,
68    SourceSender,
69};
70
71#[derive(Debug, Snafu)]
72enum BuildError {
73    #[snafu(display("The drain_timeout_ms ({}) must be less than session_timeout_ms ({})", value, session_timeout_ms.as_millis()))]
74    InvalidDrainTimeout {
75        value: u64,
76        session_timeout_ms: Duration,
77    },
78    #[snafu(display("Could not create Kafka consumer: {}", source))]
79    CreateError { source: rdkafka::error::KafkaError },
80    #[snafu(display("Could not subscribe to Kafka topics: {}", source))]
81    SubscribeError { source: rdkafka::error::KafkaError },
82}
83
84/// Metrics (beta) configuration.
85#[configurable_component]
86#[derive(Clone, Debug, Default)]
87struct Metrics {
88    /// Expose topic lag metrics for all topics and partitions. Metric names are `kafka_consumer_lag`.
89    pub topic_lag_metric: bool,
90}
91
92/// Configuration for the `kafka` source.
93#[serde_as]
94#[configurable_component(source("kafka", "Collect logs from Apache Kafka."))]
95#[derive(Clone, Debug, Derivative)]
96#[derivative(Default)]
97#[serde(deny_unknown_fields)]
98pub struct KafkaSourceConfig {
99    /// A comma-separated list of Kafka bootstrap servers.
100    ///
101    /// These are the servers in a Kafka cluster that a client should use to bootstrap its connection to the cluster,
102    /// allowing discovery of all the other hosts in the cluster.
103    ///
104    /// Must be in the form of `host:port`, and comma-separated.
105    #[configurable(metadata(docs::examples = "10.14.22.123:9092,10.14.23.332:9092"))]
106    bootstrap_servers: String,
107
108    /// The Kafka topics names to read events from.
109    ///
110    /// Regular expression syntax is supported if the topic begins with `^`.
111    #[configurable(metadata(
112        docs::examples = "^(prefix1|prefix2)-.+",
113        docs::examples = "topic-1",
114        docs::examples = "topic-2"
115    ))]
116    topics: Vec<String>,
117
118    /// The consumer group name to be used to consume events from Kafka.
119    #[configurable(metadata(docs::examples = "consumer-group-name"))]
120    group_id: String,
121
122    /// If offsets for consumer group do not exist, set them using this strategy.
123    ///
124    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for the `auto.offset.reset` option for further clarification.
125    #[serde(default = "default_auto_offset_reset")]
126    #[configurable(metadata(docs::examples = "example_auto_offset_reset_values()"))]
127    auto_offset_reset: String,
128
129    /// The Kafka session timeout.
130    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
131    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
132    #[configurable(metadata(docs::advanced))]
133    #[serde(default = "default_session_timeout_ms")]
134    #[configurable(metadata(docs::human_name = "Session Timeout"))]
135    session_timeout_ms: Duration,
136
137    /// Timeout to drain pending acknowledgements during shutdown or a Kafka
138    /// consumer group rebalance.
139    ///
140    /// When Vector shuts down or the Kafka consumer group revokes partitions from this
141    /// consumer, wait a maximum of `drain_timeout_ms` for the source to
142    /// process pending acknowledgements. Must be less than `session_timeout_ms`
143    /// to ensure the consumer is not excluded from the group during a rebalance.
144    ///
145    /// Default value is half of `session_timeout_ms`.
146    #[serde(skip_serializing_if = "Option::is_none")]
147    #[configurable(metadata(docs::examples = 2500, docs::examples = 5000))]
148    #[configurable(metadata(docs::advanced))]
149    #[configurable(metadata(docs::human_name = "Drain Timeout"))]
150    drain_timeout_ms: Option<u64>,
151
152    /// Timeout for network requests.
153    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
154    #[configurable(metadata(docs::examples = 30000, docs::examples = 60000))]
155    #[configurable(metadata(docs::advanced))]
156    #[serde(default = "default_socket_timeout_ms")]
157    #[configurable(metadata(docs::human_name = "Socket Timeout"))]
158    socket_timeout_ms: Duration,
159
160    /// Maximum time the broker may wait to fill the response.
161    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
162    #[configurable(metadata(docs::examples = 50, docs::examples = 100))]
163    #[configurable(metadata(docs::advanced))]
164    #[serde(default = "default_fetch_wait_max_ms")]
165    #[configurable(metadata(docs::human_name = "Max Fetch Wait Time"))]
166    fetch_wait_max_ms: Duration,
167
168    /// The frequency that the consumer offsets are committed (written) to offset storage.
169    #[serde_as(as = "serde_with::DurationMilliSeconds<u64>")]
170    #[serde(default = "default_commit_interval_ms")]
171    #[configurable(metadata(docs::examples = 5000, docs::examples = 10000))]
172    #[configurable(metadata(docs::human_name = "Commit Interval"))]
173    commit_interval_ms: Duration,
174
175    /// Overrides the name of the log field used to add the message key to each event.
176    ///
177    /// The value is the message key of the Kafka message itself.
178    ///
179    /// By default, `"message_key"` is used.
180    #[serde(default = "default_key_field")]
181    #[configurable(metadata(docs::examples = "message_key"))]
182    key_field: OptionalValuePath,
183
184    /// Overrides the name of the log field used to add the topic to each event.
185    ///
186    /// The value is the topic from which the Kafka message was consumed from.
187    ///
188    /// By default, `"topic"` is used.
189    #[serde(default = "default_topic_key")]
190    #[configurable(metadata(docs::examples = "topic"))]
191    topic_key: OptionalValuePath,
192
193    /// Overrides the name of the log field used to add the partition to each event.
194    ///
195    /// The value is the partition from which the Kafka message was consumed from.
196    ///
197    /// By default, `"partition"` is used.
198    #[serde(default = "default_partition_key")]
199    #[configurable(metadata(docs::examples = "partition"))]
200    partition_key: OptionalValuePath,
201
202    /// Overrides the name of the log field used to add the offset to each event.
203    ///
204    /// The value is the offset of the Kafka message itself.
205    ///
206    /// By default, `"offset"` is used.
207    #[serde(default = "default_offset_key")]
208    #[configurable(metadata(docs::examples = "offset"))]
209    offset_key: OptionalValuePath,
210
211    /// Overrides the name of the log field used to add the headers to each event.
212    ///
213    /// The value is the headers of the Kafka message itself.
214    ///
215    /// By default, `"headers"` is used.
216    #[serde(default = "default_headers_key")]
217    #[configurable(metadata(docs::examples = "headers"))]
218    headers_key: OptionalValuePath,
219
220    /// Advanced options set directly on the underlying `librdkafka` client.
221    ///
222    /// See the [librdkafka documentation](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md) for details.
223    #[configurable(metadata(docs::examples = "example_librdkafka_options()"))]
224    #[configurable(metadata(docs::advanced))]
225    #[configurable(metadata(
226        docs::additional_props_description = "A librdkafka configuration option."
227    ))]
228    librdkafka_options: Option<HashMap<String, String>>,
229
230    #[serde(flatten)]
231    auth: kafka::KafkaAuthConfig,
232
233    #[configurable(derived)]
234    #[configurable(metadata(docs::advanced))]
235    #[serde(default = "default_framing_message_based")]
236    #[derivative(Default(value = "default_framing_message_based()"))]
237    framing: FramingConfig,
238
239    #[configurable(derived)]
240    #[serde(default = "default_decoding")]
241    #[derivative(Default(value = "default_decoding()"))]
242    decoding: DeserializerConfig,
243
244    #[configurable(derived)]
245    #[serde(default, deserialize_with = "bool_or_struct")]
246    acknowledgements: SourceAcknowledgementsConfig,
247
248    /// The namespace to use for logs. This overrides the global setting.
249    #[configurable(metadata(docs::hidden))]
250    #[serde(default)]
251    log_namespace: Option<bool>,
252
253    #[configurable(derived)]
254    #[serde(default)]
255    metrics: Metrics,
256}
257
258impl KafkaSourceConfig {
259    fn keys(&self) -> Keys {
260        Keys::from(log_schema(), self)
261    }
262}
263
/// Default `session_timeout_ms`: 10 seconds (default in librdkafka).
const fn default_session_timeout_ms() -> Duration {
    Duration::from_secs(10)
}

/// Default `socket_timeout_ms`: 60 seconds (default in librdkafka).
const fn default_socket_timeout_ms() -> Duration {
    Duration::from_secs(60)
}

/// Default `fetch_wait_max_ms`: 100 milliseconds (default in librdkafka).
const fn default_fetch_wait_max_ms() -> Duration {
    Duration::from_millis(100)
}

/// Default `commit_interval_ms`: 5 seconds.
const fn default_commit_interval_ms() -> Duration {
    Duration::from_secs(5)
}
279
/// Default `auto_offset_reset` strategy (default in librdkafka).
fn default_auto_offset_reset() -> String {
    String::from("largest")
}
283
284fn default_key_field() -> OptionalValuePath {
285    OptionalValuePath::from(owned_value_path!("message_key"))
286}
287
288fn default_topic_key() -> OptionalValuePath {
289    OptionalValuePath::from(owned_value_path!("topic"))
290}
291
292fn default_partition_key() -> OptionalValuePath {
293    OptionalValuePath::from(owned_value_path!("partition"))
294}
295
296fn default_offset_key() -> OptionalValuePath {
297    OptionalValuePath::from(owned_value_path!("offset"))
298}
299
300fn default_headers_key() -> OptionalValuePath {
301    OptionalValuePath::from(owned_value_path!("headers"))
302}
303
/// Example values for `auto_offset_reset`, surfaced in the generated docs via
/// the field's `docs::examples` metadata.
const fn example_auto_offset_reset_values() -> [&'static str; 7] {
    const EXAMPLES: [&str; 7] = [
        "smallest", "earliest", "beginning", "largest", "latest", "end", "error",
    ];
    EXAMPLES
}
315
/// Example `librdkafka_options` map, surfaced in the generated docs via the
/// field's `docs::examples` metadata.
fn example_librdkafka_options() -> HashMap<String, String> {
    [
        ("client.id", "${ENV_VAR}"),
        ("fetch.error.backoff.ms", "1000"),
        ("socket.send.buffer.bytes", "100"),
    ]
    .into_iter()
    .map(|(option, value)| (option.to_owned(), value.to_owned()))
    .collect()
}
323
324impl_generate_config_from_default!(KafkaSourceConfig);
325
326#[async_trait::async_trait]
327#[typetag::serde(name = "kafka")]
328impl SourceConfig for KafkaSourceConfig {
329    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
330        let log_namespace = cx.log_namespace(self.log_namespace);
331
332        let decoder =
333            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
334                .build()?;
335        let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
336
337        if let Some(d) = self.drain_timeout_ms {
338            snafu::ensure!(
339                Duration::from_millis(d) <= self.session_timeout_ms,
340                InvalidDrainTimeoutSnafu {
341                    value: d,
342                    session_timeout_ms: self.session_timeout_ms
343                }
344            );
345        }
346
347        let (consumer, callback_rx) = create_consumer(self, acknowledgements)?;
348
349        Ok(Box::pin(kafka_source(
350            self.clone(),
351            consumer,
352            callback_rx,
353            decoder,
354            cx.out,
355            cx.shutdown,
356            false,
357            log_namespace,
358        )))
359    }
360
361    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
362        let log_namespace = global_log_namespace.merge(self.log_namespace);
363        let keys = self.keys();
364
365        let schema_definition = self
366            .decoding
367            .schema_definition(log_namespace)
368            .with_standard_vector_source_metadata()
369            .with_source_metadata(
370                Self::NAME,
371                keys.timestamp.map(LegacyKey::Overwrite),
372                &owned_value_path!("timestamp"),
373                Kind::timestamp(),
374                Some("timestamp"),
375            )
376            .with_source_metadata(
377                Self::NAME,
378                keys.topic.clone().map(LegacyKey::Overwrite),
379                &owned_value_path!("topic"),
380                Kind::bytes(),
381                None,
382            )
383            .with_source_metadata(
384                Self::NAME,
385                keys.partition.clone().map(LegacyKey::Overwrite),
386                &owned_value_path!("partition"),
387                Kind::bytes(),
388                None,
389            )
390            .with_source_metadata(
391                Self::NAME,
392                keys.offset.clone().map(LegacyKey::Overwrite),
393                &owned_value_path!("offset"),
394                Kind::bytes(),
395                None,
396            )
397            .with_source_metadata(
398                Self::NAME,
399                keys.headers.clone().map(LegacyKey::Overwrite),
400                &owned_value_path!("headers"),
401                Kind::object(Collection::empty().with_unknown(Kind::bytes())),
402                None,
403            )
404            .with_source_metadata(
405                Self::NAME,
406                keys.key_field.clone().map(LegacyKey::Overwrite),
407                &owned_value_path!("message_key"),
408                Kind::bytes(),
409                None,
410            );
411
412        vec![SourceOutput::new_maybe_logs(
413            self.decoding.output_type(),
414            schema_definition,
415        )]
416    }
417
418    fn can_acknowledge(&self) -> bool {
419        true
420    }
421}
422
423#[allow(clippy::too_many_arguments)]
424async fn kafka_source(
425    config: KafkaSourceConfig,
426    consumer: StreamConsumer<KafkaSourceContext>,
427    callback_rx: UnboundedReceiver<KafkaCallback>,
428    decoder: Decoder,
429    out: SourceSender,
430    shutdown: ShutdownSignal,
431    eof: bool,
432    log_namespace: LogNamespace,
433) -> Result<(), ()> {
434    let span = info_span!("kafka_source");
435    let consumer = Arc::new(consumer);
436
437    consumer
438        .context()
439        .consumer
440        .set(Arc::downgrade(&consumer))
441        .expect("Error setting up consumer context.");
442
443    // EOF signal allowing the coordination task to tell the kafka client task when all partitions have reached EOF
444    let (eof_tx, eof_rx) = eof.then(oneshot::channel::<()>).unzip();
445
446    let topics: Vec<&str> = config.topics.iter().map(|s| s.as_str()).collect();
447    if let Err(e) = consumer.subscribe(&topics).context(SubscribeSnafu) {
448        error!("{}", e);
449        return Err(());
450    }
451
452    let coordination_task = {
453        let span = span.clone();
454        let consumer = Arc::clone(&consumer);
455        let drain_timeout_ms = config
456            .drain_timeout_ms
457            .map_or(config.session_timeout_ms / 2, Duration::from_millis);
458        let consumer_state =
459            ConsumerStateInner::<Consuming>::new(config, decoder, out, log_namespace, span);
460        tokio::spawn(async move {
461            coordinate_kafka_callbacks(
462                consumer,
463                callback_rx,
464                consumer_state,
465                drain_timeout_ms,
466                eof_tx,
467            )
468            .await;
469        })
470    };
471
472    let client_task = {
473        let consumer = Arc::clone(&consumer);
474        tokio::task::spawn_blocking(move || {
475            let _enter = span.enter();
476            drive_kafka_consumer(consumer, shutdown, eof_rx);
477        })
478    };
479
480    _ = tokio::join!(client_task, coordination_task);
481    consumer.context().commit_consumer_state();
482
483    Ok(())
484}
485
486/// ConsumerStateInner implements a small struct/enum-based state machine.
487///
488/// With a ConsumerStateInner<Consuming>, the client is able to spawn new tasks
489/// when partitions are assigned. When a shutdown signal is received, or
490/// partitions are being revoked, the Consuming state is traded for a Draining
491/// state (and associated drain deadline future) via the `begin_drain` method
492///
493/// A ConsumerStateInner<Draining> keeps track of partitions that are expected
494/// to complete, and also owns the signal that, when dropped, indicates to the
495/// client driver task that it is safe to proceed with the rebalance or shutdown.
496/// When draining is complete, or the deadline is reached, Draining is traded in for
497/// either a Consuming (after a revoke) or Complete (in the case of shutdown) state,
498/// via the `finish_drain` method.
499///
500/// A ConsumerStateInner<Complete> is the final state, reached after a shutdown
501/// signal is received. This can not be traded for another state, and the
502/// coordination task should exit when this state is reached.
503struct ConsumerStateInner<S> {
504    config: KafkaSourceConfig,
505    decoder: Decoder,
506    out: SourceSender,
507    log_namespace: LogNamespace,
508    consumer_state: S,
509}
510struct Consuming {
511    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
512    span: Span,
513}
514struct Draining {
515    /// The rendezvous channel sender from the revoke or shutdown callback. Sending on this channel
516    /// indicates to the kafka client task that one or more partitions have been drained, while
517    /// closing this channel indicates that all expected partitions have drained, or the drain
518    /// timeout has been reached.
519    signal: SyncSender<()>,
520
521    /// The set of topic-partition tasks that are required to complete during
522    /// the draining phase, populated at the beginning of a rebalance or shutdown.
523    /// Partitions that are being revoked, but not being actively consumed
524    /// (e.g. due to the consumer task exiting early) should not be included.
525    /// The draining phase is considered complete when this set is empty.
526    expect_drain: HashSet<TopicPartition>,
527
528    /// Whether the client is shutting down after draining. If set to true,
529    /// the `finish_drain` method will return a Complete state, otherwise
530    /// a Consuming state.
531    shutdown: bool,
532
533    /// The source's tracing Span used to instrument metrics emitted by consumer tasks
534    span: Span,
535}
536type OptionDeadline = OptionFuture<Pin<Box<Sleep>>>;
537enum ConsumerState {
538    Consuming(ConsumerStateInner<Consuming>),
539    Draining(ConsumerStateInner<Draining>),
540    Complete,
541}
542impl Draining {
543    fn new(signal: SyncSender<()>, shutdown: bool, span: Span) -> Self {
544        Self {
545            signal,
546            shutdown,
547            expect_drain: HashSet::new(),
548            span,
549        }
550    }
551
552    fn is_complete(&self) -> bool {
553        self.expect_drain.is_empty()
554    }
555}
556
557impl<C> ConsumerStateInner<C> {
558    fn complete(self, _deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
559        (None.into(), ConsumerState::Complete)
560    }
561}
562
563impl ConsumerStateInner<Consuming> {
564    const fn new(
565        config: KafkaSourceConfig,
566        decoder: Decoder,
567        out: SourceSender,
568        log_namespace: LogNamespace,
569        span: Span,
570    ) -> Self {
571        Self {
572            config,
573            decoder,
574            out,
575            log_namespace,
576            consumer_state: Consuming { span },
577        }
578    }
579
580    /// Spawn a task on the provided JoinSet to consume the kafka StreamPartitionQueue, and handle
581    /// acknowledgements for the messages consumed Returns a channel sender that can be used to
582    /// signal that the consumer should stop and drain pending acknowledgements, and an AbortHandle
583    /// that can be used to forcefully end the task.
584    fn consume_partition(
585        &self,
586        join_set: &mut JoinSet<(TopicPartition, PartitionConsumerStatus)>,
587        tp: TopicPartition,
588        consumer: Arc<StreamConsumer<KafkaSourceContext>>,
589        p: StreamPartitionQueue<KafkaSourceContext>,
590        acknowledgements: bool,
591        exit_eof: bool,
592    ) -> (oneshot::Sender<()>, tokio::task::AbortHandle) {
593        let keys = self.config.keys();
594        let decoder = self.decoder.clone();
595        let log_namespace = self.log_namespace;
596        let mut out = self.out.clone();
597
598        let (end_tx, mut end_signal) = oneshot::channel::<()>();
599
600        let handle = join_set.spawn(async move {
601            let mut messages = p.stream();
602            let (finalizer, mut ack_stream) = OrderedFinalizer::<FinalizerEntry>::new(None);
603
604            // finalizer is the entry point for new pending acknowledgements;
605            // when it is dropped, no new messages will be consumed, and the
606            // task will end when it reaches the end of ack_stream
607            let mut finalizer = Some(finalizer);
608
609            let mut status = PartitionConsumerStatus::NormalExit;
610
611            loop {
612                tokio::select!(
613                    // Make sure to handle the acknowledgement stream before new messages to prevent
614                    // unbounded memory growth caused by those acks being handled slower than
615                    // incoming messages when the load is high.
616                    biased;
617
618                    // is_some() checks prevent polling end_signal after it completes
619                    _ = &mut end_signal, if finalizer.is_some() => {
620                        finalizer.take();
621                    },
622
623                    ack = ack_stream.next() => match ack {
624                        Some((status, entry)) => {
625                            if status == BatchStatus::Delivered {
626                                if let Err(error) =  consumer.store_offset(&entry.topic, entry.partition, entry.offset) {
627                                    emit!(KafkaOffsetUpdateError { error });
628                                }
629                            }
630                        }
631                        None if finalizer.is_none() => {
632                            debug!("Acknowledgement stream complete for partition {}:{}.", &tp.0, tp.1);
633                            break
634                        }
635                        None => {
636                            debug!("Acknowledgement stream empty for {}:{}", &tp.0, tp.1);
637                        }
638                    },
639
640                    message = messages.next(), if finalizer.is_some() => match message {
641                        None => unreachable!("MessageStream never calls Ready(None)"),
642                        Some(Err(error)) => match error {
643                            rdkafka::error::KafkaError::PartitionEOF(partition) if exit_eof => {
644                                debug!("EOF for partition {}.", partition);
645                                status = PartitionConsumerStatus::PartitionEOF;
646                                finalizer.take();
647                            },
648                            _ => emit!(KafkaReadError { error }),
649                        },
650                        Some(Ok(msg)) => {
651                            emit!(KafkaBytesReceived {
652                                byte_size: msg.payload_len(),
653                                protocol: "tcp",
654                                topic: msg.topic(),
655                                partition: msg.partition(),
656                            });
657                            parse_message(msg, decoder.clone(), &keys, &mut out, acknowledgements, &finalizer, log_namespace).await;
658                        }
659                    },
660                )
661            }
662            (tp, status)
663        }.instrument(self.consumer_state.span.clone()));
664        (end_tx, handle)
665    }
666
667    /// Consume self, and return a "Draining" ConsumerState, along with a Future
668    /// representing a drain deadline, based on max_drain_ms
669    fn begin_drain(
670        self,
671        max_drain_ms: Duration,
672        sig: SyncSender<()>,
673        shutdown: bool,
674    ) -> (OptionDeadline, ConsumerStateInner<Draining>) {
675        let deadline = Box::pin(tokio::time::sleep(max_drain_ms));
676
677        let draining = ConsumerStateInner {
678            config: self.config,
679            decoder: self.decoder,
680            out: self.out,
681            log_namespace: self.log_namespace,
682            consumer_state: Draining::new(sig, shutdown, self.consumer_state.span),
683        };
684
685        (Some(deadline).into(), draining)
686    }
687
688    pub const fn keep_consuming(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
689        (deadline, ConsumerState::Consuming(self))
690    }
691}
692
693impl ConsumerStateInner<Draining> {
694    /// Mark the given TopicPartition as being revoked, adding it to the set of
695    /// partitions expected to drain
696    fn revoke_partition(&mut self, tp: TopicPartition, end_signal: oneshot::Sender<()>) {
697        // Note that if this send() returns Err, it means the task has already
698        // ended, but the completion has not been processed yet (otherwise we wouldn't have access to the end_signal),
699        // so we should still add it to the "expect to drain" set
700        _ = end_signal.send(());
701        self.consumer_state.expect_drain.insert(tp);
702    }
703
704    /// Add the given TopicPartition to the set of known "drained" partitions,
705    /// i.e. the consumer has drained the acknowledgement channel. A signal is
706    /// sent on the signal channel, indicating to the client that offsets may be committed
707    fn partition_drained(&mut self, tp: TopicPartition) {
708        // This send() will only return Err if the receiver has already been disconnected (i.e. the
709        // kafka client task is no longer running)
710        _ = self.consumer_state.signal.send(());
711        self.consumer_state.expect_drain.remove(&tp);
712    }
713
714    /// Return true if all expected partitions have drained
715    fn is_drain_complete(&self) -> bool {
716        self.consumer_state.is_complete()
717    }
718
719    /// Finish partition drain mode. Consumes self and the drain deadline
720    /// future, and returns a "Consuming" or "Complete" ConsumerState
721    fn finish_drain(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
722        if self.consumer_state.shutdown {
723            self.complete(deadline)
724        } else {
725            (
726                None.into(),
727                ConsumerState::Consuming(ConsumerStateInner {
728                    config: self.config,
729                    decoder: self.decoder,
730                    out: self.out,
731                    log_namespace: self.log_namespace,
732                    consumer_state: Consuming {
733                        span: self.consumer_state.span,
734                    },
735                }),
736            )
737        }
738    }
739
740    pub const fn keep_draining(self, deadline: OptionDeadline) -> (OptionDeadline, ConsumerState) {
741        (deadline, ConsumerState::Draining(self))
742    }
743}
744
745async fn coordinate_kafka_callbacks(
746    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
747    mut callbacks: UnboundedReceiver<KafkaCallback>,
748    consumer_state: ConsumerStateInner<Consuming>,
749    max_drain_ms: Duration,
750    mut eof: Option<oneshot::Sender<()>>,
751) {
752    let mut drain_deadline: OptionFuture<_> = None.into();
753    let mut consumer_state = ConsumerState::Consuming(consumer_state);
754
755    // A oneshot channel is used for each consumed partition, so that we can
756    // signal to that task to stop consuming, drain pending acks, and exit
757    let mut end_signals: HashMap<TopicPartition, oneshot::Sender<()>> = HashMap::new();
758
759    // The set of consumer tasks, each consuming a specific partition. The task
760    // is both consuming the messages (passing them to the output stream) _and_
761    // processing the corresponding acknowledgement stream. A consumer task
762    // should completely drain its acknowledgement stream after receiving an end signal
763    let mut partition_consumers: JoinSet<(TopicPartition, PartitionConsumerStatus)> =
764        Default::default();
765
766    // Handles that will let us end any consumer task that exceeds a drain deadline
767    let mut abort_handles: HashMap<TopicPartition, tokio::task::AbortHandle> = HashMap::new();
768
769    let exit_eof = eof.is_some();
770
771    while let ConsumerState::Consuming(_) | ConsumerState::Draining(_) = consumer_state {
772        tokio::select! {
773            Some(Ok((finished_partition, status))) = partition_consumers.join_next(), if !partition_consumers.is_empty() => {
774                debug!("Partition consumer finished for {}:{}", &finished_partition.0, finished_partition.1);
775                // If this task ended on its own, the end_signal for it will still be in here.
776                end_signals.remove(&finished_partition);
777                abort_handles.remove(&finished_partition);
778
779                (drain_deadline, consumer_state) = match consumer_state {
780                    ConsumerState::Complete => unreachable!("Partition consumer finished after completion."),
781                    ConsumerState::Draining(mut state) => {
782                        state.partition_drained(finished_partition);
783
784                        if state.is_drain_complete() {
785                            debug!("All expected partitions have drained.");
786                            state.finish_drain(drain_deadline)
787                        } else {
788                            state.keep_draining(drain_deadline)
789                        }
790                    },
791                    ConsumerState::Consuming(state) => {
792                        // If we are here, it is likely because the consumer
793                        // tasks are set up to exit upon reaching the end of the
794                        // partition.
795                        if !exit_eof {
796                            debug!("Partition consumer task finished, while not in draining mode.");
797                        }
798                        state.keep_consuming(drain_deadline)
799                    },
800                };
801
802                // PartitionConsumerStatus differentiates between a task that exited after
803                // being signaled to end, and one that reached the end of its partition and
804                // was configured to exit. After the last such task ends, we signal the kafka
805                // driver task to shut down the main consumer too. Note this is only used in tests.
806                if exit_eof && status == PartitionConsumerStatus::PartitionEOF && partition_consumers.is_empty() {
807                    debug!("All partitions have exited or reached EOF.");
808                    let _ = eof.take().map(|e| e.send(()));
809                }
810            },
811            Some(callback) = callbacks.recv() => match callback {
812                KafkaCallback::PartitionsAssigned(mut assigned_partitions, done) => match consumer_state {
813                    ConsumerState::Complete => unreachable!("Partition assignment received after completion."),
814                    ConsumerState::Draining(_) => error!("Partition assignment received while draining revoked partitions, maybe an invalid assignment."),
815                    ConsumerState::Consuming(ref consumer_state) => {
816                        let acks = consumer.context().acknowledgements;
817                        for tp in assigned_partitions.drain(0..) {
818                            let topic = tp.0.as_str();
819                            let partition = tp.1;
820                            match consumer.split_partition_queue(topic, partition) { Some(pq) => {
821                                debug!("Consuming partition {}:{}.", &tp.0, tp.1);
822                                let (end_tx, handle) = consumer_state.consume_partition(&mut partition_consumers, tp.clone(), Arc::clone(&consumer), pq, acks, exit_eof);
823                                abort_handles.insert(tp.clone(), handle);
824                                end_signals.insert(tp, end_tx);
825                            } _ => {
826                                warn!("Failed to get queue for assigned partition {}:{}.", &tp.0, tp.1);
827                            }}
828                        }
829                        // ensure this is retained until all individual queues are set up
830                        drop(done);
831                    }
832                },
833                KafkaCallback::PartitionsRevoked(mut revoked_partitions, drain) => (drain_deadline, consumer_state) = match consumer_state {
834                    ConsumerState::Complete => unreachable!("Partitions revoked after completion."),
835                    ConsumerState::Draining(d) => {
836                        // NB: This would only happen if the task driving the kafka client (i.e. rebalance handlers)
837                        // is not handling shutdown signals, and a revoke happens during a shutdown drain; otherwise
838                        // this is unreachable code.
839                        warn!("Kafka client is already draining revoked partitions.");
840                        d.keep_draining(drain_deadline)
841                    },
842                    ConsumerState::Consuming(state) => {
843                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, false);
844
845                        for tp in revoked_partitions.drain(0..) {
846                            match end_signals.remove(&tp) { Some(end) => {
847                                debug!("Revoking partition {}:{}", &tp.0, tp.1);
848                                state.revoke_partition(tp, end);
849                            } _ => {
850                                debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
851                            }}
852                        }
853
854                        state.keep_draining(deadline)
855                    }
856                },
857                KafkaCallback::ShuttingDown(drain) => (drain_deadline, consumer_state) = match consumer_state {
858                    ConsumerState::Complete => unreachable!("Shutdown received after completion."),
859                    // Shutting down is just like a full assignment revoke, but we also close the
860                    // callback channels, since we don't expect additional assignments or rebalances
861                    ConsumerState::Draining(state) => {
862                        // NB: This would only happen if the task driving the kafka client is
863                        // not handling shutdown signals; otherwise this is unreachable code
864                        error!("Kafka client handled a shutdown signal while a rebalance was in progress.");
865                        callbacks.close();
866                        state.keep_draining(drain_deadline)
867                    },
868                    ConsumerState::Consuming(state) => {
869                        callbacks.close();
870                        let (deadline, mut state) = state.begin_drain(max_drain_ms, drain, true);
871                        if let Ok(tpl) = consumer.assignment() {
872                            // TODO  workaround for https://github.com/fede1024/rust-rdkafka/issues/681
873                            if tpl.capacity() == 0 {
874                                return;
875                            }
876                            tpl.elements()
877                                .iter()
878                                .for_each(|el| {
879
880                                let tp: TopicPartition = (el.topic().into(), el.partition());
881                                match end_signals.remove(&tp) { Some(end) => {
882                                    debug!("Shutting down and revoking partition {}:{}", &tp.0, tp.1);
883                                    state.revoke_partition(tp, end);
884                                } _ => {
885                                    debug!("Consumer task for partition {}:{} already finished.", &tp.0, tp.1);
886                                }}
887                            });
888                        }
889                        // If shutdown was initiated by partition EOF mode, the drain phase
890                        // will already be complete and would time out if not accounted for here
891                        if state.is_drain_complete() {
892                            state.finish_drain(deadline)
893                        } else {
894                            state.keep_draining(deadline)
895                        }
896                    }
897                },
898            },
899
900            Some(_) = &mut drain_deadline => (drain_deadline, consumer_state) = match consumer_state {
901                ConsumerState::Complete => unreachable!("Drain deadline received after completion."),
902                ConsumerState::Consuming(state) => {
903                    warn!("A drain deadline fired outside of draining mode.");
904                    state.keep_consuming(None.into())
905                },
906                ConsumerState::Draining(mut draining) => {
907                    debug!("Acknowledgement drain deadline reached. Dropping any pending ack streams for revoked partitions.");
908                    for tp in draining.consumer_state.expect_drain.drain() {
909                        if let Some(handle) = abort_handles.remove(&tp) {
910                            handle.abort();
911                        }
912                    }
913                    draining.finish_drain(drain_deadline)
914                }
915            },
916        }
917    }
918}
919
920fn drive_kafka_consumer(
921    consumer: Arc<StreamConsumer<KafkaSourceContext>>,
922    mut shutdown: ShutdownSignal,
923    eof: Option<oneshot::Receiver<()>>,
924) {
925    Handle::current().block_on(async move {
926        let mut eof: OptionFuture<_> = eof.into();
927        let mut stream = consumer.stream();
928        loop {
929            tokio::select! {
930                _ = &mut shutdown => {
931                    consumer.context().shutdown();
932                    break
933                },
934
935                Some(_) = &mut eof => {
936                    consumer.context().shutdown();
937                    break
938                },
939
940                // NB: messages are not received on this thread, however we poll
941                // the consumer to serve client callbacks, such as rebalance notifications
942                message = stream.next() => match message {
943                    None => unreachable!("MessageStream never returns Ready(None)"),
944                    Some(Err(error)) => emit!(KafkaReadError { error }),
945                    Some(Ok(_msg)) => {
946                        unreachable!("Messages are consumed in dedicated tasks for each partition.")
947                    }
948                },
949            }
950        }
951    });
952}
953
954async fn parse_message(
955    msg: BorrowedMessage<'_>,
956    decoder: Decoder,
957    keys: &'_ Keys,
958    out: &mut SourceSender,
959    acknowledgements: bool,
960    finalizer: &Option<OrderedFinalizer<FinalizerEntry>>,
961    log_namespace: LogNamespace,
962) {
963    if let Some((count, stream)) = parse_stream(&msg, decoder, keys, log_namespace) {
964        let (batch, receiver) = BatchNotifier::new_with_receiver();
965        let mut stream = stream.map(|event| {
966            // All acknowledgements flow through the normal Finalizer stream so
967            // that they can be handled in one place, but are only tied to the
968            // batch when acknowledgements are enabled
969            if acknowledgements {
970                event.with_batch_notifier(&batch)
971            } else {
972                event
973            }
974        });
975        match out.send_event_stream(&mut stream).await {
976            Err(_) => {
977                emit!(StreamClosedError { count });
978            }
979            Ok(_) => {
980                // Drop stream to avoid borrowing `msg`: "[...] borrow might be used
981                // here, when `stream` is dropped and runs the destructor [...]".
982                drop(stream);
983                if let Some(f) = finalizer.as_ref() {
984                    f.add(msg.into(), receiver)
985                }
986            }
987        }
988    }
989}
990
/// Turn the received message into a stream of parsed events.
///
/// Returns `None` when the message has no payload, so such messages are skipped.
/// Otherwise returns a count together with a boxed stream that decodes the payload
/// with `decoder`, emits `KafkaEventsReceived` for every decoded frame, and applies
/// the message metadata (per `keys` and `log_namespace`) to each event it yields.
/// Decoding stops at the first error that cannot be continued from.
///
/// NOTE(review): `count` is the lower bound of `FramedRead::size_hint()`. Unless
/// `FramedRead` overrides the default `Stream::size_hint`, this is always 0, so callers
/// reporting `count` (e.g. on a closed output) may under-report — confirm.
fn parse_stream<'a>(
    msg: &BorrowedMessage<'a>,
    decoder: Decoder,
    keys: &'a Keys,
    log_namespace: LogNamespace,
) -> Option<(usize, impl Stream<Item = Event> + 'a + use<'a>)> {
    let payload = msg.payload()?; // skip messages with empty payload

    // Capture the message metadata once; it is applied to every decoded event below.
    let rmsg = ReceivedMessage::from(msg);

    // Copy the payload into owned `Bytes`, so the returned stream does not borrow `msg`
    // (the return type only captures `'a`).
    let payload = Cursor::new(Bytes::copy_from_slice(payload));

    let mut stream = FramedRead::with_capacity(payload, decoder, msg.payload_len());
    let (count, _) = stream.size_hint();
    let stream = stream! {
        while let Some(result) = stream.next().await {
            match result {
                Ok((events, _byte_size)) => {
                    emit!(KafkaEventsReceived {
                        count: events.len(),
                        byte_size: events.estimated_json_encoded_size_of(),
                        topic: &rmsg.topic,
                        partition: rmsg.partition,
                    });
                    for mut event in events {
                        rmsg.apply(keys, &mut event, log_namespace);
                        yield event;
                    }
                },
                Err(error) => {
                    // Error is logged by `codecs::Decoder`, no further handling
                    // is needed here.
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    }
    .boxed();
    Some((count, stream))
}
1034
/// Destination paths for the Kafka metadata attached to each event.
///
/// Each set field is passed to `insert_source_metadata` as a `LegacyKey::Overwrite`;
/// `None` means no legacy key is supplied for that piece of metadata.
#[derive(Clone, Debug)]
struct Keys {
    /// Path for the Kafka message timestamp (taken from the log schema's timestamp key).
    timestamp: Option<OwnedValuePath>,
    /// Path for the Kafka message key.
    key_field: Option<OwnedValuePath>,
    /// Path for the topic name.
    topic: Option<OwnedValuePath>,
    /// Path for the partition number.
    partition: Option<OwnedValuePath>,
    /// Path for the message offset.
    offset: Option<OwnedValuePath>,
    /// Path for the map of message headers.
    headers: Option<OwnedValuePath>,
}
1044
1045impl Keys {
1046    fn from(schema: &LogSchema, config: &KafkaSourceConfig) -> Self {
1047        Self {
1048            timestamp: schema.timestamp_key().cloned(),
1049            key_field: config.key_field.path.clone(),
1050            topic: config.topic_key.path.clone(),
1051            partition: config.partition_key.path.clone(),
1052            offset: config.offset_key.path.clone(),
1053            headers: config.headers_key.path.clone(),
1054        }
1055    }
1056}
1057
/// Metadata copied out of a consumed Kafka message and applied to every event
/// decoded from it.
struct ReceivedMessage {
    /// Message timestamp, when the message carries one.
    timestamp: Option<DateTime<Utc>>,
    /// Message key as bytes, or `Value::Null` when the message has no key.
    key: Value,
    /// Headers that have a value, keyed by header name.
    headers: ObjectMap,
    topic: String,
    partition: i32,
    offset: i64,
}
1066
1067impl ReceivedMessage {
1068    fn from(msg: &BorrowedMessage<'_>) -> Self {
1069        // Extract timestamp from kafka message
1070        let timestamp = msg
1071            .timestamp()
1072            .to_millis()
1073            .and_then(|millis| Utc.timestamp_millis_opt(millis).latest());
1074
1075        let key = msg
1076            .key()
1077            .map(|key| Value::from(Bytes::from(key.to_owned())))
1078            .unwrap_or(Value::Null);
1079
1080        let mut headers_map = ObjectMap::new();
1081        if let Some(headers) = msg.headers() {
1082            for header in headers.iter() {
1083                if let Some(value) = header.value {
1084                    headers_map.insert(
1085                        header.key.into(),
1086                        Value::from(Bytes::from(value.to_owned())),
1087                    );
1088                }
1089            }
1090        }
1091
1092        Self {
1093            timestamp,
1094            key,
1095            headers: headers_map,
1096            topic: msg.topic().to_string(),
1097            partition: msg.partition(),
1098            offset: msg.offset(),
1099        }
1100    }
1101
1102    fn apply(&self, keys: &Keys, event: &mut Event, log_namespace: LogNamespace) {
1103        if let Event::Log(log) = event {
1104            match log_namespace {
1105                LogNamespace::Vector => {
1106                    // We'll only use this function in Vector namespaces because we don't want
1107                    // "timestamp" to be set automatically in legacy namespaces. In legacy
1108                    // namespaces, the "timestamp" field corresponds to the Kafka message, not the
1109                    // timestamp when the event was processed.
1110                    log_namespace.insert_standard_vector_source_metadata(
1111                        log,
1112                        KafkaSourceConfig::NAME,
1113                        Utc::now(),
1114                    );
1115                }
1116                LogNamespace::Legacy => {
1117                    if let Some(source_type_key) = log_schema().source_type_key_target_path() {
1118                        log.insert(source_type_key, KafkaSourceConfig::NAME);
1119                    }
1120                }
1121            }
1122
1123            log_namespace.insert_source_metadata(
1124                KafkaSourceConfig::NAME,
1125                log,
1126                keys.key_field.as_ref().map(LegacyKey::Overwrite),
1127                path!("message_key"),
1128                self.key.clone(),
1129            );
1130
1131            log_namespace.insert_source_metadata(
1132                KafkaSourceConfig::NAME,
1133                log,
1134                keys.timestamp.as_ref().map(LegacyKey::Overwrite),
1135                path!("timestamp"),
1136                self.timestamp,
1137            );
1138
1139            log_namespace.insert_source_metadata(
1140                KafkaSourceConfig::NAME,
1141                log,
1142                keys.topic.as_ref().map(LegacyKey::Overwrite),
1143                path!("topic"),
1144                self.topic.clone(),
1145            );
1146
1147            log_namespace.insert_source_metadata(
1148                KafkaSourceConfig::NAME,
1149                log,
1150                keys.partition.as_ref().map(LegacyKey::Overwrite),
1151                path!("partition"),
1152                self.partition,
1153            );
1154
1155            log_namespace.insert_source_metadata(
1156                KafkaSourceConfig::NAME,
1157                log,
1158                keys.offset.as_ref().map(LegacyKey::Overwrite),
1159                path!("offset"),
1160                self.offset,
1161            );
1162
1163            log_namespace.insert_source_metadata(
1164                KafkaSourceConfig::NAME,
1165                log,
1166                keys.headers.as_ref().map(LegacyKey::Overwrite),
1167                path!("headers"),
1168                self.headers.clone(),
1169            );
1170        }
1171    }
1172}
1173
/// Topic, partition and offset of a consumed message. An entry is registered with the
/// `OrderedFinalizer` after the message's events have been sent downstream.
#[derive(Debug, Eq, PartialEq, Hash)]
struct FinalizerEntry {
    topic: String,
    partition: i32,
    offset: i64,
}
1180
1181impl<'a> From<BorrowedMessage<'a>> for FinalizerEntry {
1182    fn from(msg: BorrowedMessage<'a>) -> Self {
1183        Self {
1184            topic: msg.topic().into(),
1185            partition: msg.partition(),
1186            offset: msg.offset(),
1187        }
1188    }
1189}
1190
1191fn create_consumer(
1192    config: &KafkaSourceConfig,
1193    acknowledgements: bool,
1194) -> crate::Result<(
1195    StreamConsumer<KafkaSourceContext>,
1196    UnboundedReceiver<KafkaCallback>,
1197)> {
1198    let mut client_config = ClientConfig::new();
1199    client_config
1200        .set("group.id", &config.group_id)
1201        .set("bootstrap.servers", &config.bootstrap_servers)
1202        .set("auto.offset.reset", &config.auto_offset_reset)
1203        .set(
1204            "session.timeout.ms",
1205            config.session_timeout_ms.as_millis().to_string(),
1206        )
1207        .set(
1208            "socket.timeout.ms",
1209            config.socket_timeout_ms.as_millis().to_string(),
1210        )
1211        .set(
1212            "fetch.wait.max.ms",
1213            config.fetch_wait_max_ms.as_millis().to_string(),
1214        )
1215        .set("enable.partition.eof", "false")
1216        .set("enable.auto.commit", "true")
1217        .set(
1218            "auto.commit.interval.ms",
1219            config.commit_interval_ms.as_millis().to_string(),
1220        )
1221        .set("enable.auto.offset.store", "false")
1222        .set("statistics.interval.ms", "1000")
1223        .set("client.id", "vector");
1224
1225    config.auth.apply(&mut client_config)?;
1226
1227    if let Some(librdkafka_options) = &config.librdkafka_options {
1228        for (key, value) in librdkafka_options {
1229            client_config.set(key.as_str(), value.as_str());
1230        }
1231    }
1232
1233    let (callbacks, callback_rx) = mpsc::unbounded_channel();
1234    let consumer = client_config
1235        .create_with_context::<_, StreamConsumer<_>>(KafkaSourceContext::new(
1236            config.metrics.topic_lag_metric,
1237            acknowledgements,
1238            callbacks,
1239            Span::current(),
1240        ))
1241        .context(CreateSnafu)?;
1242
1243    Ok((consumer, callback_rx))
1244}
1245
/// A `(topic, partition)` pair identifying a single Kafka partition.
type TopicPartition = (String, i32);
1247
/// Status returned by partition consumer tasks, allowing the coordination task
/// to differentiate between a consumer exiting normally (after receiving an end
/// signal) and exiting when it reaches the end of a partition.
#[derive(PartialEq)]
enum PartitionConsumerStatus {
    /// The task exited after being signaled to end.
    NormalExit,
    /// The task reached the end of its partition and was configured to exit there
    /// (the exit-at-EOF mode, which is only used in tests).
    PartitionEOF,
}
1256
/// Events sent from the Kafka client context (rebalance and shutdown handlers) to the
/// coordination task. Each carries a rendezvous `SyncSender`, and the client side blocks
/// until that sender is dropped.
enum KafkaCallback {
    /// Newly assigned partitions. The sender is held until a consumer task has been
    /// set up (or attempted) for each of them.
    PartitionsAssigned(Vec<TopicPartition>, SyncSender<()>),
    /// Partitions being revoked. Every signal sent on the sender makes the client side
    /// commit the consumer state; dropping it ends the client's wait.
    PartitionsRevoked(Vec<TopicPartition>, SyncSender<()>),
    /// The consumer is shutting down. This is handled like a revoke of the full
    /// assignment, with the same commit-per-signal rendezvous.
    ShuttingDown(SyncSender<()>),
}
1262
/// librdkafka client/consumer context for the Kafka source. It forwards statistics and
/// turns rebalance and shutdown events into `KafkaCallback`s for the coordination task.
struct KafkaSourceContext {
    /// Whether acknowledgements are enabled for this source; handed to each partition
    /// consumer task.
    acknowledgements: bool,
    /// Statistics handler (optionally exposing consumer lag metrics).
    stats: kafka::KafkaStatisticsContext,

    /// A callback channel used to coordinate between the main consumer task and the acknowledgement task
    callbacks: UnboundedSender<KafkaCallback>,

    /// A weak reference to the consumer, so that we can commit offsets during a rebalance operation.
    /// Must be initialized before `commit_consumer_state` runs, which panics otherwise.
    consumer: OnceLock<Weak<StreamConsumer<KafkaSourceContext>>>,
}
1273
1274impl KafkaSourceContext {
1275    fn new(
1276        expose_lag_metrics: bool,
1277        acknowledgements: bool,
1278        callbacks: UnboundedSender<KafkaCallback>,
1279        span: Span,
1280    ) -> Self {
1281        Self {
1282            stats: kafka::KafkaStatisticsContext {
1283                expose_lag_metrics,
1284                span,
1285            },
1286            acknowledgements,
1287            consumer: OnceLock::default(),
1288            callbacks,
1289        }
1290    }
1291
1292    fn shutdown(&self) {
1293        let (send, rendezvous) = sync_channel(0);
1294        if self
1295            .callbacks
1296            .send(KafkaCallback::ShuttingDown(send))
1297            .is_ok()
1298        {
1299            while rendezvous.recv().is_ok() {
1300                self.commit_consumer_state();
1301            }
1302        }
1303    }
1304
1305    /// Emit a PartitionsAssigned callback with the topic-partitions to be consumed,
1306    /// and block until confirmation is received that a stream and consumer for
1307    /// each topic-partition has been set up. This function blocks until the
1308    /// rendezvous channel sender is dropped by the callback handler.
1309    fn consume_partitions(&self, tpl: &TopicPartitionList) {
1310        // TODO  workaround for https://github.com/fede1024/rust-rdkafka/issues/681
1311        if tpl.capacity() == 0 {
1312            return;
1313        }
1314        let (send, rendezvous) = sync_channel(0);
1315        let _ = self.callbacks.send(KafkaCallback::PartitionsAssigned(
1316            tpl.elements()
1317                .iter()
1318                .map(|tp| (tp.topic().into(), tp.partition()))
1319                .collect(),
1320            send,
1321        ));
1322
1323        while rendezvous.recv().is_ok() {
1324            // no-op: wait for partition assignment handler to complete
1325        }
1326    }
1327
1328    /// Emit a PartitionsRevoked callback and block until confirmation is
1329    /// received that acknowledgements have been processed for each of them.
1330    /// The rendezvous channel used in the callback can send multiple times to
1331    /// signal individual partitions completing. This function blocks until the
1332    /// sender is dropped by the callback handler.
1333    fn revoke_partitions(&self, tpl: &TopicPartitionList) {
1334        let (send, rendezvous) = sync_channel(0);
1335        let _ = self.callbacks.send(KafkaCallback::PartitionsRevoked(
1336            tpl.elements()
1337                .iter()
1338                .map(|tp| (tp.topic().into(), tp.partition()))
1339                .collect(),
1340            send,
1341        ));
1342
1343        while rendezvous.recv().is_ok() {
1344            self.commit_consumer_state();
1345        }
1346    }
1347
1348    fn commit_consumer_state(&self) {
1349        if let Some(consumer) = self
1350            .consumer
1351            .get()
1352            .expect("Consumer reference was not initialized.")
1353            .upgrade()
1354        {
1355            match consumer.commit_consumer_state(CommitMode::Sync) {
1356                Ok(_) | Err(KafkaError::ConsumerCommit(RDKafkaErrorCode::NoOffset)) => {
1357                    /* Success, or nothing to do - yay \0/ */
1358                }
1359                Err(error) => emit!(KafkaOffsetUpdateError { error }),
1360            }
1361        }
1362    }
1363}
1364
1365impl ClientContext for KafkaSourceContext {
1366    fn stats(&self, statistics: Statistics) {
1367        self.stats.stats(statistics)
1368    }
1369}
1370
1371impl ConsumerContext for KafkaSourceContext {
1372    fn pre_rebalance(&self, _base_consumer: &BaseConsumer<Self>, rebalance: &Rebalance) {
1373        match rebalance {
1374            Rebalance::Assign(tpl) => self.consume_partitions(tpl),
1375
1376            Rebalance::Revoke(tpl) => {
1377                self.revoke_partitions(tpl);
1378                self.commit_consumer_state();
1379            }
1380
1381            Rebalance::Error(message) => {
1382                error!("Error during Kafka consumer group rebalance: {}.", message);
1383            }
1384        }
1385    }
1386}
1387
#[cfg(test)]
mod test {
    use vector_lib::lookup::OwnedTargetPath;
    use vector_lib::schema::Definition;

    use super::*;

    /// Kafka broker host for tests, from `KAFKA_HOST` (default `localhost`).
    pub fn kafka_host() -> String {
        std::env::var("KAFKA_HOST").unwrap_or_else(|_| "localhost".into())
    }
    /// Kafka broker port for tests, from `KAFKA_PORT` (default `9091`); panics if it
    /// does not parse as a port number.
    pub fn kafka_port() -> u16 {
        let port = std::env::var("KAFKA_PORT").unwrap_or_else(|_| "9091".into());
        port.parse().expect("Invalid port number")
    }

    /// `host:port` bootstrap address built from `kafka_host` and `kafka_port`.
    pub fn kafka_address() -> String {
        format!("{}:{}", kafka_host(), kafka_port())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<KafkaSourceConfig>();
    }

    /// Build a source config for `topic`/`group` against the test broker. It uses fixed
    /// test timeouts and a 1ms commit interval; all other fields take their defaults.
    pub(super) fn make_config(
        topic: &str,
        group: &str,
        log_namespace: LogNamespace,
        librdkafka_options: Option<HashMap<String, String>>,
    ) -> KafkaSourceConfig {
        KafkaSourceConfig {
            bootstrap_servers: kafka_address(),
            topics: vec![topic.into()],
            group_id: group.into(),
            auto_offset_reset: "beginning".into(),
            session_timeout_ms: Duration::from_millis(6000),
            commit_interval_ms: Duration::from_millis(1),
            librdkafka_options,
            key_field: default_key_field(),
            topic_key: default_topic_key(),
            partition_key: default_partition_key(),
            offset_key: default_offset_key(),
            headers_key: default_headers_key(),
            socket_timeout_ms: Duration::from_millis(60000),
            fetch_wait_max_ms: Duration::from_millis(100),
            log_namespace: Some(log_namespace == LogNamespace::Vector),
            ..Default::default()
        }
    }

    // Schema in the Vector namespace: the payload is the event root, and the Kafka
    // metadata lives under the `kafka` metadata prefix.
    // NOTE(review): `partition` and `offset` are asserted as `Kind::bytes()`, while
    // `ReceivedMessage::apply` inserts them as integers (i32/i64) — confirm the schema
    // definition is intended.
    #[test]
    fn test_output_schema_definition_vector_namespace() {
        let definitions = make_config("topic", "group", LogNamespace::Vector, None)
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                    .with_meaning(OwnedTargetPath::event_root(), "message")
                    .with_metadata_field(
                        &owned_value_path!("kafka", "timestamp"),
                        Kind::timestamp(),
                        Some("timestamp")
                    )
                    .with_metadata_field(
                        &owned_value_path!("kafka", "message_key"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("kafka", "topic"), Kind::bytes(), None)
                    .with_metadata_field(
                        &owned_value_path!("kafka", "partition"),
                        Kind::bytes(),
                        None
                    )
                    .with_metadata_field(&owned_value_path!("kafka", "offset"), Kind::bytes(), None)
                    .with_metadata_field(
                        &owned_value_path!("kafka", "headers"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "ingest_timestamp"),
                        Kind::timestamp(),
                        None
                    )
                    .with_metadata_field(
                        &owned_value_path!("vector", "source_type"),
                        Kind::bytes(),
                        None
                    )
            )
        )
    }

    // Schema in the legacy namespace: the Kafka metadata is written as top-level event
    // fields next to `message`.
    #[test]
    fn test_output_schema_definition_legacy_namespace() {
        let definitions = make_config("topic", "group", LogNamespace::Legacy, None)
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        assert_eq!(
            definitions,
            Some(
                Definition::new_with_default_metadata(Kind::json(), [LogNamespace::Legacy])
                    .unknown_fields(Kind::undefined())
                    .with_event_field(
                        &owned_value_path!("message"),
                        Kind::bytes(),
                        Some("message")
                    )
                    .with_event_field(
                        &owned_value_path!("timestamp"),
                        Kind::timestamp(),
                        Some("timestamp")
                    )
                    .with_event_field(&owned_value_path!("message_key"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("topic"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("partition"), Kind::bytes(), None)
                    .with_event_field(&owned_value_path!("offset"), Kind::bytes(), None)
                    .with_event_field(
                        &owned_value_path!("headers"),
                        Kind::object(Collection::empty().with_unknown(Kind::bytes())),
                        None
                    )
                    .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
            )
        )
    }

    // Creating the consumer only builds the client configuration; no broker is needed.
    #[tokio::test]
    async fn consumer_create_ok() {
        let config = make_config("topic", "group", LogNamespace::Legacy, None);
        assert!(create_consumer(&config, true).is_ok());
    }

    // An invalid `auto.offset.reset` value is rejected when the client is created.
    #[tokio::test]
    async fn consumer_create_incorrect_auto_offset_reset() {
        let config = KafkaSourceConfig {
            auto_offset_reset: "incorrect-auto-offset-reset".to_string(),
            ..make_config("topic", "group", LogNamespace::Legacy, None)
        };
        assert!(create_consumer(&config, true).is_err());
    }
}
1537
1538#[cfg(feature = "kafka-integration-tests")]
1539#[cfg(test)]
1540mod integration_test {
1541    use std::time::Duration;
1542
1543    use chrono::{DateTime, SubsecRound, Utc};
1544    use futures::Stream;
1545    use futures_util::stream::FuturesUnordered;
1546    use rdkafka::{
1547        admin::{AdminClient, AdminOptions, NewTopic, TopicReplication},
1548        client::DefaultClientContext,
1549        config::{ClientConfig, FromClientConfig},
1550        consumer::BaseConsumer,
1551        message::{Header, OwnedHeaders},
1552        producer::{FutureProducer, FutureRecord},
1553        util::Timeout,
1554        Offset, TopicPartitionList,
1555    };
1556    use stream_cancel::{Trigger, Tripwire};
1557    use tokio::time::sleep;
1558    use vector_lib::event::EventStatus;
1559    use vrl::{event_path, value};
1560
1561    use super::{test::*, *};
1562    use crate::{
1563        event::{EventArray, EventContainer},
1564        shutdown::ShutdownSignal,
1565        test_util::{collect_n, components::assert_source_compliance, random_string},
1566        SourceSender,
1567    };
1568
    // Key and payload prefixes used by `send_events`; each record appends its index
    // (`"{KEY} {i}"`, `"{TEXT} {i:03}"`), and `send_receive` asserts on the same strings.
    const KEY: &str = "my key";
    const TEXT: &str = "my message";
    // The single header attached to every record produced by `send_events`.
    const HEADER_KEY: &str = "my header";
    const HEADER_VALUE: &str = "my header value";
1573
1574    fn kafka_test_topic() -> String {
1575        std::env::var("KAFKA_TEST_TOPIC")
1576            .unwrap_or_else(|_| format!("test-topic-{}", random_string(10)))
1577    }
1578    fn kafka_max_bytes() -> String {
1579        std::env::var("KAFKA_MAX_BYTES").unwrap_or_else(|_| "1024".into())
1580    }
1581
1582    fn client_config<T: FromClientConfig>(group: Option<&str>) -> T {
1583        let mut client = ClientConfig::new();
1584        client.set("bootstrap.servers", kafka_address());
1585        client.set("produce.offset.report", "true");
1586        client.set("message.timeout.ms", "5000");
1587        client.set("auto.commit.interval.ms", "1");
1588        if let Some(group) = group {
1589            client.set("group.id", group);
1590        }
1591        client.create().expect("Producer creation error")
1592    }
1593
1594    async fn send_events(topic: String, partitions: i32, count: usize) -> DateTime<Utc> {
1595        let now = Utc::now();
1596        let timestamp = now.timestamp_millis();
1597
1598        let producer: &FutureProducer = &client_config(None);
1599        let topic_name = topic.as_ref();
1600
1601        create_topic(topic_name, partitions).await;
1602
1603        (0..count)
1604            .map(|i| async move {
1605                let text = format!("{TEXT} {i:03}");
1606                let key = format!("{KEY} {i}");
1607                let record = FutureRecord::to(topic_name)
1608                    .payload(&text)
1609                    .key(&key)
1610                    .timestamp(timestamp)
1611                    .headers(OwnedHeaders::new().insert(Header {
1612                        key: HEADER_KEY,
1613                        value: Some(HEADER_VALUE),
1614                    }));
1615                if let Err(error) = producer.send(record, Timeout::Never).await {
1616                    panic!("Cannot send event to Kafka: {error:?}");
1617                }
1618            })
1619            .collect::<FuturesUnordered<_>>()
1620            .collect::<Vec<_>>()
1621            .await;
1622
1623        now
1624    }
1625
1626    async fn send_to_test_topic(partitions: i32, count: usize) -> (String, String, DateTime<Utc>) {
1627        let topic = kafka_test_topic();
1628        let group_id = format!("test-group-{}", random_string(10));
1629
1630        let sent_at = send_events(topic.clone(), partitions, count).await;
1631
1632        (topic, group_id, sent_at)
1633    }
1634
1635    #[tokio::test]
1636    async fn consumes_event_with_acknowledgements() {
1637        send_receive(true, |_| false, 10, LogNamespace::Legacy).await;
1638    }
1639
1640    #[tokio::test]
1641    async fn consumes_event_with_acknowledgements_vector_namespace() {
1642        send_receive(true, |_| false, 10, LogNamespace::Vector).await;
1643    }
1644
1645    #[tokio::test]
1646    async fn consumes_event_without_acknowledgements() {
1647        send_receive(false, |_| false, 10, LogNamespace::Legacy).await;
1648    }
1649
1650    #[tokio::test]
1651    async fn consumes_event_without_acknowledgements_vector_namespace() {
1652        send_receive(false, |_| false, 10, LogNamespace::Vector).await;
1653    }
1654
1655    #[tokio::test]
1656    async fn handles_one_negative_acknowledgement() {
1657        send_receive(true, |n| n == 2, 10, LogNamespace::Legacy).await;
1658    }
1659
1660    #[tokio::test]
1661    async fn handles_one_negative_acknowledgement_vector_namespace() {
1662        send_receive(true, |n| n == 2, 10, LogNamespace::Vector).await;
1663    }
1664
1665    #[tokio::test]
1666    async fn handles_permanent_negative_acknowledgement() {
1667        send_receive(true, |n| n >= 2, 2, LogNamespace::Legacy).await;
1668    }
1669
1670    #[tokio::test]
1671    async fn handles_permanent_negative_acknowledgement_vector_namespace() {
1672        send_receive(true, |n| n >= 2, 2, LogNamespace::Vector).await;
1673    }
1674
1675    async fn send_receive(
1676        acknowledgements: bool,
1677        error_at: impl Fn(usize) -> bool,
1678        receive_count: usize,
1679        log_namespace: LogNamespace,
1680    ) {
1681        const SEND_COUNT: usize = 10;
1682
1683        let topic = format!("test-topic-{}", random_string(10));
1684        let group_id = format!("test-group-{}", random_string(10));
1685        let config = make_config(&topic, &group_id, log_namespace, None);
1686
1687        let now = send_events(topic.clone(), 1, 10).await;
1688
1689        let events = assert_source_compliance(&["protocol", "topic", "partition"], async move {
1690            let (tx, rx) = SourceSender::new_test_errors(error_at);
1691            let (trigger_shutdown, shutdown_done) =
1692                spawn_kafka(tx, config, acknowledgements, false, log_namespace);
1693            let events = collect_n(rx, SEND_COUNT).await;
1694            // Yield to the finalization task to let it collect the
1695            // batch status receivers before signalling the shutdown.
1696            tokio::task::yield_now().await;
1697            drop(trigger_shutdown);
1698            shutdown_done.await;
1699
1700            events
1701        })
1702        .await;
1703
1704        let offset = fetch_tpl_offset(&group_id, &topic, 0);
1705        assert_eq!(offset, Offset::from_raw(receive_count as i64));
1706
1707        assert_eq!(events.len(), SEND_COUNT);
1708        for (i, event) in events.into_iter().enumerate() {
1709            if let LogNamespace::Legacy = log_namespace {
1710                assert_eq!(
1711                    event.as_log()[log_schema().message_key().unwrap().to_string()],
1712                    format!("{TEXT} {i:03}").into()
1713                );
1714                assert_eq!(event.as_log()["message_key"], format!("{KEY} {i}").into());
1715                assert_eq!(
1716                    event.as_log()[log_schema().source_type_key().unwrap().to_string()],
1717                    "kafka".into()
1718                );
1719                assert_eq!(
1720                    event.as_log()[log_schema().timestamp_key().unwrap().to_string()],
1721                    now.trunc_subsecs(3).into()
1722                );
1723                assert_eq!(event.as_log()["topic"], topic.clone().into());
1724                assert!(event.as_log().contains("partition"));
1725                assert!(event.as_log().contains("offset"));
1726                let mut expected_headers = ObjectMap::new();
1727                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1728                assert_eq!(event.as_log()["headers"], Value::from(expected_headers));
1729            } else {
1730                let meta = event.as_log().metadata().value();
1731
1732                assert_eq!(
1733                    meta.get(path!("vector", "source_type")).unwrap(),
1734                    &value!(KafkaSourceConfig::NAME)
1735                );
1736                assert!(meta
1737                    .get(path!("vector", "ingest_timestamp"))
1738                    .unwrap()
1739                    .is_timestamp());
1740
1741                assert_eq!(
1742                    event.as_log().value(),
1743                    &value!(format!("{} {:03}", TEXT, i))
1744                );
1745                assert_eq!(
1746                    meta.get(path!("kafka", "message_key")).unwrap(),
1747                    &value!(format!("{} {}", KEY, i))
1748                );
1749
1750                assert_eq!(
1751                    meta.get(path!("kafka", "timestamp")).unwrap(),
1752                    &value!(now.trunc_subsecs(3))
1753                );
1754                assert_eq!(
1755                    meta.get(path!("kafka", "topic")).unwrap(),
1756                    &value!(topic.clone())
1757                );
1758                assert!(meta.get(path!("kafka", "partition")).unwrap().is_integer(),);
1759                assert!(meta.get(path!("kafka", "offset")).unwrap().is_integer(),);
1760
1761                let mut expected_headers = ObjectMap::new();
1762                expected_headers.insert(HEADER_KEY.into(), Value::from(HEADER_VALUE));
1763                assert_eq!(
1764                    meta.get(path!("kafka", "headers")).unwrap(),
1765                    &Value::from(expected_headers)
1766                );
1767            }
1768        }
1769    }
1770
1771    fn make_rand_config() -> (String, String, KafkaSourceConfig) {
1772        let topic = format!("test-topic-{}", random_string(10));
1773        let group_id = format!("test-group-{}", random_string(10));
1774        let config = make_config(&topic, &group_id, LogNamespace::Legacy, None);
1775        (topic, group_id, config)
1776    }
1777
1778    fn delay_pipeline(
1779        id: usize,
1780        delay: Duration,
1781        status: EventStatus,
1782    ) -> (SourceSender, impl Stream<Item = EventArray> + Unpin) {
1783        let (pipe, recv) = SourceSender::new_test_sender_with_buffer(100);
1784        let recv = recv.into_stream();
1785        let recv = recv.then(move |item| async move {
1786            let mut events = item.events;
1787            events.iter_logs_mut().for_each(|log| {
1788                log.insert(event_path!("pipeline_id"), id.to_string());
1789            });
1790            sleep(delay).await;
1791            events.iter_events_mut().for_each(|mut event| {
1792                let metadata = event.metadata_mut();
1793                metadata.update_status(status);
1794                metadata.update_sources();
1795            });
1796            events
1797        });
1798        (pipe, Box::pin(recv))
1799    }
1800
1801    fn spawn_kafka(
1802        out: SourceSender,
1803        config: KafkaSourceConfig,
1804        acknowledgements: bool,
1805        eof: bool,
1806        log_namespace: LogNamespace,
1807    ) -> (Trigger, Tripwire) {
1808        let (trigger_shutdown, shutdown, shutdown_done) = ShutdownSignal::new_wired();
1809
1810        let decoder = DecodingConfig::new(
1811            config.framing.clone(),
1812            config.decoding.clone(),
1813            log_namespace,
1814        )
1815        .build()
1816        .unwrap();
1817
1818        let (consumer, callback_rx) = create_consumer(&config, acknowledgements).unwrap();
1819
1820        tokio::spawn(kafka_source(
1821            config,
1822            consumer,
1823            callback_rx,
1824            decoder,
1825            out,
1826            shutdown,
1827            eof,
1828            log_namespace,
1829        ));
1830        (trigger_shutdown, shutdown_done)
1831    }
1832
1833    fn fetch_tpl_offset(group_id: &str, topic: &str, partition: i32) -> Offset {
1834        let client: BaseConsumer = client_config(Some(group_id));
1835        client.subscribe(&[topic]).expect("Subscribing failed");
1836
1837        let mut tpl = TopicPartitionList::new();
1838        tpl.add_partition(topic, partition);
1839        client
1840            .committed_offsets(tpl, Duration::from_secs(1))
1841            .expect("Getting committed offsets failed")
1842            .find_partition(topic, partition)
1843            .expect("Missing topic/partition")
1844            .offset()
1845    }
1846
1847    async fn create_topic(topic: &str, partitions: i32) {
1848        let client: AdminClient<DefaultClientContext> = client_config(None);
1849        let topic_results = client
1850            .create_topics(
1851                [&NewTopic {
1852                    name: topic,
1853                    num_partitions: partitions,
1854                    replication: TopicReplication::Fixed(1),
1855                    config: vec![],
1856                }],
1857                &AdminOptions::default(),
1858            )
1859            .await
1860            .expect("create_topics failed");
1861
1862        for result in topic_results {
1863            if let Err((topic, err)) = result {
1864                if err != rdkafka::types::RDKafkaErrorCode::TopicAlreadyExists {
1865                    panic!("Creating a topic failed: {:?}", (topic, err))
1866                }
1867            }
1868        }
1869    }
1870
1871    // Failure timeline:
1872    // - Topic exists on multiple partitions
1873    // - Consumer A connects to topic, is assigned both partitions
1874    // - Consumer A receives some messages
1875    // - Consumer B connects to topic
1876    // - Consumer A has one partition revoked (rebalance)
1877    // - Consumer B is assigned a partition
1878    // - Consumer A stores an order on the revoked partition
1879    // - Consumer B skips receiving messages?
1880    #[ignore]
1881    #[tokio::test]
1882    async fn handles_rebalance() {
1883        // The test plan here is to:
1884        // - Set up one source instance, feeding into a pipeline that delays acks.
1885        // - Wait a bit, and set up a second source instance. This should cause a rebalance.
1886        // - Wait further until all events will have been pulled down.
1887        // - Verify that all events are captured by the two sources, and that offsets are set right, etc.
1888
1889        // However this test, as written, does not actually cause the
1890        // conditions required to test this. We have had external
1891        // validation that the sink behaves properly on rebalance
1892        // events.  This test also requires the insertion of a small
1893        // delay into the source to guarantee the timing, which is not
1894        // suitable for production code.
1895
1896        const NEVENTS: usize = 200;
1897        const DELAY: u64 = 100;
1898
1899        let (topic, group_id, config) = make_rand_config();
1900        create_topic(&topic, 2).await;
1901
1902        let _send_start = send_events(topic.clone(), 1, NEVENTS).await;
1903
1904        let (tx, rx1) = delay_pipeline(1, Duration::from_millis(200), EventStatus::Delivered);
1905        let (trigger_shutdown1, shutdown_done1) =
1906            spawn_kafka(tx, config.clone(), true, false, LogNamespace::Legacy);
1907        let events1 = tokio::spawn(collect_n(rx1, NEVENTS));
1908
1909        sleep(Duration::from_secs(1)).await;
1910
1911        let (tx, rx2) = delay_pipeline(2, Duration::from_millis(DELAY), EventStatus::Delivered);
1912        let (trigger_shutdown2, shutdown_done2) =
1913            spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1914        let events2 = tokio::spawn(collect_n(rx2, NEVENTS));
1915
1916        sleep(Duration::from_secs(5)).await;
1917
1918        drop(trigger_shutdown1);
1919        let events1 = events1.await.unwrap();
1920        shutdown_done1.await;
1921
1922        sleep(Duration::from_secs(5)).await;
1923
1924        drop(trigger_shutdown2);
1925        let events2 = events2.await.unwrap();
1926        shutdown_done2.await;
1927
1928        sleep(Duration::from_secs(1)).await;
1929
1930        assert!(!events1.is_empty());
1931        assert!(!events2.is_empty());
1932
1933        match fetch_tpl_offset(&group_id, &topic, 0) {
1934            Offset::Offset(offset) => {
1935                assert!((offset as isize - events1.len() as isize).abs() <= 1)
1936            }
1937            o => panic!("Invalid offset for partition 0 {o:?}"),
1938        }
1939
1940        match fetch_tpl_offset(&group_id, &topic, 1) {
1941            Offset::Offset(offset) => {
1942                assert!((offset as isize - events2.len() as isize).abs() <= 1)
1943            }
1944            o => panic!("Invalid offset for partition 0 {o:?}"),
1945        }
1946
1947        let mut all_events = events1
1948            .into_iter()
1949            .chain(events2.into_iter())
1950            .flat_map(map_logs)
1951            .collect::<Vec<String>>();
1952        all_events.sort();
1953
1954        // Assert they are all in sequential order and no dupes, TODO
1955    }
1956
1957    #[tokio::test]
1958    async fn drains_acknowledgements_at_shutdown() {
1959        // 1. Send N events (if running against a pre-populated kafka topic, use send_count=0 and expect_count=expected number of messages; otherwise just set send_count)
1960        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
1961            .unwrap_or_else(|_| "125000".into())
1962            .parse()
1963            .expect("Number of messages to send to kafka.");
1964        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
1965            .unwrap_or_else(|_| format!("{send_count}"))
1966            .parse()
1967            .expect("Number of messages to expect consumers to process.");
1968        let delay_ms: u64 = std::env::var("KAFKA_SHUTDOWN_DELAY")
1969            .unwrap_or_else(|_| "2000".into())
1970            .parse()
1971            .expect("Number of milliseconds before shutting down first consumer.");
1972
1973        let (topic, group_id, _) = send_to_test_topic(1, send_count).await;
1974
1975        // 2. Run the kafka source to read some of the events
1976        // 3. Send a shutdown signal (at some point before all events are read)
1977        let mut opts = HashMap::new();
1978        // Set options to get partition EOF notifications, and fetch data in small/configurable size chunks
1979        opts.insert("enable.partition.eof".into(), "true".into());
1980        opts.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
1981        let events1 = {
1982            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts.clone()));
1983            let (tx, rx) = SourceSender::new_test_errors(|_| false);
1984            let (trigger_shutdown, shutdown_done) =
1985                spawn_kafka(tx, config, true, false, LogNamespace::Legacy);
1986            let (events, _) = tokio::join!(rx.collect::<Vec<Event>>(), async move {
1987                sleep(Duration::from_millis(delay_ms)).await;
1988                drop(trigger_shutdown);
1989            });
1990            shutdown_done.await;
1991            events
1992        };
1993
1994        debug!("Consumer group.id: {}", &group_id);
1995        debug!(
1996            "First consumer read {} of {} messages.",
1997            events1.len(),
1998            expect_count
1999        );
2000
2001        // 4. Run the kafka source again to finish reading the events
2002        let events2 = {
2003            let config = make_config(&topic, &group_id, LogNamespace::Legacy, Some(opts));
2004            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2005            let (trigger_shutdown, shutdown_done) =
2006                spawn_kafka(tx, config, true, true, LogNamespace::Legacy);
2007            let events = rx.collect::<Vec<Event>>().await;
2008            drop(trigger_shutdown);
2009            shutdown_done.await;
2010            events
2011        };
2012
2013        debug!(
2014            "Second consumer read {} of {} messages.",
2015            events2.len(),
2016            expect_count
2017        );
2018
2019        // 5. Total number of events processed should equal the number sent
2020        let total = events1.len() + events2.len();
2021        assert_ne!(
2022            events1.len(),
2023            0,
2024            "First batch of events should be non-zero (increase KAFKA_SHUTDOWN_DELAY?)"
2025        );
2026        assert_ne!(events2.len(), 0, "Second batch of events should be non-zero (decrease KAFKA_SHUTDOWN_DELAY or increase KAFKA_SEND_COUNT?) ");
2027        assert_eq!(total, expect_count);
2028    }
2029
2030    async fn consume_with_rebalance(rebalance_strategy: String) {
2031        // 1. Send N events (if running against a pre-populated kafka topic, use send_count=0 and expect_count=expected number of messages; otherwise just set send_count)
2032        let send_count: usize = std::env::var("KAFKA_SEND_COUNT")
2033            .unwrap_or_else(|_| "125000".into())
2034            .parse()
2035            .expect("Number of messages to send to kafka.");
2036        let expect_count: usize = std::env::var("KAFKA_EXPECT_COUNT")
2037            .unwrap_or_else(|_| format!("{send_count}"))
2038            .parse()
2039            .expect("Number of messages to expect consumers to process.");
2040        let delay_ms: u64 = std::env::var("KAFKA_CONSUMER_DELAY")
2041            .unwrap_or_else(|_| "2000".into())
2042            .parse()
2043            .expect("Number of milliseconds before shutting down first consumer.");
2044
2045        let (topic, group_id, _) = send_to_test_topic(6, send_count).await;
2046        debug!("Topic: {}", &topic);
2047        debug!("Consumer group.id: {}", &group_id);
2048
2049        // 2. Run the kafka source to read some of the events
2050        // 3. Start 2nd & 3rd consumers using the same group.id, triggering rebalance events
2051        let mut kafka_options = HashMap::new();
2052        kafka_options.insert("enable.partition.eof".into(), "true".into());
2053        kafka_options.insert("fetch.message.max.bytes".into(), kafka_max_bytes());
2054        kafka_options.insert("partition.assignment.strategy".into(), rebalance_strategy);
2055        let config1 = make_config(
2056            &topic,
2057            &group_id,
2058            LogNamespace::Legacy,
2059            Some(kafka_options.clone()),
2060        );
2061        let config2 = config1.clone();
2062        let config3 = config1.clone();
2063        let config4 = config1.clone();
2064
2065        let (events1, events2, events3) = tokio::join!(
2066            async move {
2067                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2068                let (_trigger_shutdown, _shutdown_done) =
2069                    spawn_kafka(tx, config1, true, true, LogNamespace::Legacy);
2070
2071                rx.collect::<Vec<Event>>().await
2072            },
2073            async move {
2074                sleep(Duration::from_millis(delay_ms)).await;
2075                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2076                let (_trigger_shutdown, _shutdown_done) =
2077                    spawn_kafka(tx, config2, true, true, LogNamespace::Legacy);
2078
2079                rx.collect::<Vec<Event>>().await
2080            },
2081            async move {
2082                sleep(Duration::from_millis(delay_ms * 2)).await;
2083                let (tx, rx) = SourceSender::new_test_errors(|_| false);
2084                let (_trigger_shutdown, _shutdown_done) =
2085                    spawn_kafka(tx, config3, true, true, LogNamespace::Legacy);
2086
2087                rx.collect::<Vec<Event>>().await
2088            }
2089        );
2090
2091        let unconsumed = async move {
2092            let (tx, rx) = SourceSender::new_test_errors(|_| false);
2093            let (_trigger_shutdown, _shutdown_done) =
2094                spawn_kafka(tx, config4, true, true, LogNamespace::Legacy);
2095
2096            rx.collect::<Vec<Event>>().await
2097        }
2098        .await;
2099
2100        debug!(
2101            "First consumer read {} of {} messages.",
2102            events1.len(),
2103            expect_count
2104        );
2105
2106        debug!(
2107            "Second consumer read {} of {} messages.",
2108            events2.len(),
2109            expect_count
2110        );
2111        debug!(
2112            "Third consumer read {} of {} messages.",
2113            events3.len(),
2114            expect_count
2115        );
2116
2117        // 5. Total number of events processed should equal the number sent
2118        let total = events1.len() + events2.len() + events3.len();
2119        assert_ne!(
2120            events1.len(),
2121            0,
2122            "First batch of events should be non-zero (increase delay?)"
2123        );
2124        assert_ne!(
2125            events2.len(),
2126            0,
2127            "Second batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2128        );
2129        assert_ne!(
2130            events3.len(),
2131            0,
2132            "Third batch of events should be non-zero (decrease delay or increase KAFKA_SEND_COUNT?) "
2133        );
2134        assert_eq!(
2135            unconsumed.len(),
2136            0,
2137            "The first set of consumers should consume and ack all messages."
2138        );
2139        assert_eq!(total, expect_count);
2140    }
2141
2142    #[tokio::test]
2143    async fn drains_acknowledgements_during_rebalance_default_assignments() {
2144        // the default, eager rebalance strategies generally result in more revocations
2145        consume_with_rebalance("range,roundrobin".into()).await;
2146    }
2147    #[tokio::test]
2148    async fn drains_acknowledgements_during_rebalance_sticky_assignments() {
2149        // Cooperative rebalance strategies generally result in fewer revokes,
2150        // as only reassigned partitions are revoked
2151        consume_with_rebalance("cooperative-sticky".into()).await;
2152    }
2153
2154    fn map_logs(events: EventArray) -> impl Iterator<Item = String> {
2155        events.into_events().map(|event| {
2156            let log = event.into_log();
2157            format!(
2158                "{} {} {} {}",
2159                log["message"].to_string_lossy(),
2160                log["topic"].to_string_lossy(),
2161                log["partition"].to_string_lossy(),
2162                log["offset"].to_string_lossy(),
2163            )
2164        })
2165    }
2166}