// vector/sinks/kafka/sink.rs
use std::time::Duration;
2
3use rdkafka::{
4 error::KafkaError,
5 producer::{BaseProducer, FutureProducer, Producer},
6 ClientConfig,
7};
8use snafu::{ResultExt, Snafu};
9use tower::limit::RateLimit;
10use tracing::Span;
11use vrl::path::OwnedTargetPath;
12
13use super::config::KafkaSinkConfig;
14use crate::{
15 config::SinkHealthcheckOptions,
16 kafka::KafkaStatisticsContext,
17 sinks::{
18 kafka::{request_builder::KafkaRequestBuilder, service::KafkaService},
19 prelude::*,
20 },
21};
22
/// Errors that can occur while constructing the Kafka sink.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub(super) enum BuildError {
    /// The underlying rdkafka producer could not be created from the
    /// supplied `ClientConfig`.
    #[snafu(display("creating kafka producer failed: {}", source))]
    KafkaCreateFailed { source: KafkaError },
}
29
/// Stream sink that encodes events and delivers them to Kafka through a
/// rate-limited producer service.
pub struct KafkaSink {
    // Event transformer derived from the encoding config (see `new`).
    transformer: Transformer,
    // Serializer wrapper used to encode each event's payload.
    encoder: Encoder<()>,
    // Kafka producer service wrapped in a tower rate limiter
    // (`rate_limit_num` requests per `rate_limit_duration_secs`).
    service: RateLimit<KafkaService>,
    // Topic template rendered per event; events whose render fails are
    // dropped in `run_inner`.
    topic: Template,
    // Optional event path handed to the request builder — presumably the
    // source of the Kafka message key; semantics live in KafkaRequestBuilder.
    key_field: Option<OwnedTargetPath>,
    // Optional event path handed to the request builder — presumably the
    // source of the Kafka message headers; semantics live in KafkaRequestBuilder.
    headers_key: Option<OwnedTargetPath>,
}
38
39pub(crate) fn create_producer(
40 client_config: ClientConfig,
41) -> crate::Result<FutureProducer<KafkaStatisticsContext>> {
42 let producer = client_config
43 .create_with_context(KafkaStatisticsContext {
44 expose_lag_metrics: false,
45 span: Span::current(),
46 })
47 .context(KafkaCreateFailedSnafu)?;
48 Ok(producer)
49}
50
impl KafkaSink {
    /// Builds a `KafkaSink` from its config: creates the rdkafka producer,
    /// the event encoder, and the rate-limited producer service.
    ///
    /// # Errors
    /// Fails if the rdkafka client config is invalid, the producer cannot
    /// be created, or the encoding serializer cannot be built.
    pub(crate) fn new(config: KafkaSinkConfig) -> crate::Result<Self> {
        let producer_config = config.to_rdkafka()?;
        let producer = create_producer(producer_config)?;
        let transformer = config.encoding.transformer();
        let serializer = config.encoding.build()?;
        let encoder = Encoder::<()>::new(serializer);

        Ok(KafkaSink {
            headers_key: config.headers_key.map(|key| key.0),
            transformer,
            encoder,
            // Throttle produce requests to `rate_limit_num` per
            // `rate_limit_duration_secs`.
            service: ServiceBuilder::new()
                .rate_limit(
                    config.rate_limit_num,
                    Duration::from_secs(config.rate_limit_duration_secs),
                )
                .service(KafkaService::new(producer)),
            topic: config.topic,
            key_field: config.key_field.map(|key| key.0),
        })
    }

    /// Drives the sink: renders the topic per event, builds Kafka requests,
    /// and runs them through the rate-limited service until `input` ends.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let request_builder = KafkaRequestBuilder {
            key_field: self.key_field,
            headers_key: self.headers_key,
            encoder: (self.transformer, self.encoder),
        };

        input
            .filter_map(|event| {
                // Render the destination topic for this event. Events whose
                // template fails to render are dropped (drop_event: true) and
                // the failure is reported via `TemplateRenderingError`.
                future::ready(
                    self.topic
                        .render_string(&event)
                        .map_err(|error| {
                            emit!(TemplateRenderingError {
                                field: None,
                                drop_event: true,
                                error,
                            });
                        })
                        .ok()
                        .map(|topic| (topic, event)),
                )
            })
            .request_builder(default_request_builder_concurrency_limit(), request_builder)
            .filter_map(|request| async {
                // Drop requests that failed to build, emitting the error.
                match request {
                    Err(error) => {
                        emit!(SinkRequestBuildError { error });
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(self.service)
            .protocol("kafka")
            .run()
            .await
    }
}
114
115pub(crate) async fn healthcheck(
116 config: KafkaSinkConfig,
117 healthcheck_options: SinkHealthcheckOptions,
118) -> crate::Result<()> {
119 trace!("Healthcheck started.");
120 let client_config = config.to_rdkafka().unwrap();
121 let topic: Option<String> = match config.healthcheck_topic {
122 Some(topic) => Some(topic),
123 _ => match config.topic.render_string(&LogEvent::from_str_legacy("")) {
124 Ok(topic) => Some(topic),
125 Err(error) => {
126 warn!(
127 message = "Could not generate topic for healthcheck.",
128 %error,
129 );
130 None
131 }
132 },
133 };
134
135 tokio::task::spawn_blocking(move || {
136 let producer: BaseProducer = client_config.create().unwrap();
137 let topic = topic.as_ref().map(|topic| &topic[..]);
138
139 producer
140 .client()
141 .fetch_metadata(topic, healthcheck_options.timeout)
142 .map(|_| ())
143 })
144 .await??;
145 trace!("Healthcheck completed.");
146 Ok(())
147}
148
#[async_trait]
impl StreamSink<Event> for KafkaSink {
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        // Delegate to the inherent driver loop.
        self.run_inner(input).await
    }
}