// vector/sinks/kafka/sink.rs

1use std::time::Duration;
2
3use rdkafka::{
4    error::KafkaError,
5    producer::{BaseProducer, FutureProducer, Producer},
6    ClientConfig,
7};
8use snafu::{ResultExt, Snafu};
9use tower::limit::RateLimit;
10use tracing::Span;
11use vrl::path::OwnedTargetPath;
12
13use super::config::KafkaSinkConfig;
14use crate::{
15    config::SinkHealthcheckOptions,
16    kafka::KafkaStatisticsContext,
17    sinks::{
18        kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
19        prelude::*,
20    },
21};
22
/// Errors that can occur while building the Kafka sink's producer.
///
/// `visibility(pub(crate))` makes the snafu-generated context selectors (such as
/// `KafkaCreateFailedSnafu`) usable throughout the crate.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub(super) enum BuildError {
    /// rdkafka failed to create a producer from the supplied client configuration.
    #[snafu(display("creating kafka producer failed: {}", source))]
    KafkaCreateFailed { source: KafkaError },
}
29
/// Sink that routes events to Kafka topics through a rate-limited [`KafkaService`].
///
/// Constructed by `KafkaSink::new` from a [`KafkaSinkConfig`]; events are consumed through
/// the `StreamSink` implementation.
pub struct KafkaSink {
    /// Applied to each event before it is encoded (built from the encoding config).
    transformer: Transformer,
    /// Serializes events into record payloads.
    encoder: Encoder<()>,
    /// Kafka producer service wrapped in a `tower` rate limiter.
    service: RateLimit<KafkaService>,
    /// Template rendered per event to choose the destination topic; events whose topic
    /// fails to render are dropped.
    topic: Template,
    /// Optional event path handed to the request builder; presumably the source of the
    /// record key — confirm in `KafkaRequestBuilder`.
    key_field: Option<OwnedTargetPath>,
    /// Optional event path handed to the request builder; presumably the source of record
    /// headers — confirm in `KafkaRequestBuilder`.
    headers_key: Option<OwnedTargetPath>,
}
38
39pub(crate) fn create_producer(
40    client_config: ClientConfig,
41) -> crate::Result<FutureProducer<KafkaStatisticsContext>> {
42    let producer = client_config
43        .create_with_context(KafkaStatisticsContext {
44            expose_lag_metrics: false,
45            span: Span::current(),
46        })
47        .context(KafkaCreateFailedSnafu)?;
48    Ok(producer)
49}
50
51impl KafkaSink {
52    pub(crate) fn new(config: KafkaSinkConfig) -> crate::Result<Self> {
53        let producer_config = config.to_rdkafka()?;
54        let producer = create_producer(producer_config)?;
55        let transformer = config.encoding.transformer();
56        let serializer = config.encoding.build()?;
57        let encoder = Encoder::<()>::new(serializer);
58
59        Ok(KafkaSink {
60            headers_key: config.headers_key.map(|key| key.0),
61            transformer,
62            encoder,
63            service: ServiceBuilder::new()
64                .rate_limit(
65                    config.rate_limit_num,
66                    Duration::from_secs(config.rate_limit_duration_secs),
67                )
68                .service(KafkaService::new(producer)),
69            topic: config.topic,
70            key_field: config.key_field.map(|key| key.0),
71        })
72    }
73
74    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
75        let request_builder = KafkaRequestBuilder {
76            key_field: self.key_field,
77            headers_key: self.headers_key,
78            encoder: (self.transformer, self.encoder),
79        };
80
81        input
82            .filter_map(|event| {
83                // Compute the topic.
84                future::ready(
85                    self.topic
86                        .render_string(&event)
87                        .map_err(|error| {
88                            emit!(TemplateRenderingError {
89                                field: None,
90                                drop_event: true,
91                                error,
92                            });
93                        })
94                        .ok()
95                        .map(|topic| (topic, event)),
96                )
97            })
98            .request_builder(default_request_builder_concurrency_limit(), request_builder)
99            .filter_map(|request| async {
100                match request {
101                    Err(error) => {
102                        emit!(SinkRequestBuildError { error });
103                        None
104                    }
105                    Ok(req) => Some(req),
106                }
107            })
108            .into_driver(self.service)
109            .protocol("kafka")
110            .run()
111            .await
112    }
113}
114
115pub(crate) async fn healthcheck(
116    config: KafkaSinkConfig,
117    healthcheck_options: SinkHealthcheckOptions,
118) -> crate::Result<()> {
119    trace!("Healthcheck started.");
120    let client_config = config.to_rdkafka().unwrap();
121    let topic: Option<String> = match config.healthcheck_topic {
122        Some(topic) => Some(topic),
123        _ => match config.topic.render_string(&LogEvent::from_str_legacy("")) {
124            Ok(topic) => Some(topic),
125            Err(error) => {
126                warn!(
127                    message = "Could not generate topic for healthcheck.",
128                    %error,
129                );
130                None
131            }
132        },
133    };
134
135    tokio::task::spawn_blocking(move || {
136        let producer: BaseProducer = client_config.create().unwrap();
137        let topic = topic.as_ref().map(|topic| &topic[..]);
138
139        producer
140            .client()
141            .fetch_metadata(topic, healthcheck_options.timeout)
142            .map(|_| ())
143    })
144    .await??;
145    trace!("Healthcheck completed.");
146    Ok(())
147}
148
149#[async_trait]
150impl StreamSink<Event> for KafkaSink {
151    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
152        self.run_inner(input).await
153    }
154}