vector/sinks/amqp/
sink.rs

//! The `AMQP` sink implementation: wires together the main stream that takes each
//! event and sends it to `AMQP`.
3use crate::sinks::prelude::*;
4use lapin::BasicProperties;
5use serde::Serialize;
6
7use super::channel::AmqpSinkChannels;
8use super::{
9    config::{AmqpPropertiesConfig, AmqpSinkConfig},
10    encoder::AmqpEncoder,
11    request_builder::AmqpRequestBuilder,
12    service::AmqpService,
13    BuildError,
14};
15
16/// Stores the event together with the rendered exchange and routing_key values.
17/// This is passed into the `RequestBuilder` which then splits it out into the event
18/// and metadata containing the exchange and routing_key.
19/// This event needs to be created prior to building the request so we can filter out
20/// any events that error whilst rendering the templates.
21#[derive(Serialize)]
22pub(super) struct AmqpEvent {
23    pub(super) event: Event,
24    pub(super) exchange: String,
25    pub(super) routing_key: String,
26    pub(super) properties: BasicProperties,
27}
28
29pub(super) struct AmqpSink {
30    pub(super) channels: AmqpSinkChannels,
31    exchange: Template,
32    routing_key: Option<Template>,
33    properties: Option<AmqpPropertiesConfig>,
34    transformer: Transformer,
35    encoder: crate::codecs::Encoder<()>,
36}
37
38impl AmqpSink {
39    pub(super) async fn new(config: AmqpSinkConfig) -> crate::Result<Self> {
40        let channels = super::channel::new_channel_pool(&config)
41            .map_err(|e| BuildError::AmqpCreateFailed { source: e })?;
42
43        let transformer = config.encoding.transformer();
44        let serializer = config.encoding.build()?;
45        let encoder = crate::codecs::Encoder::<()>::new(serializer);
46
47        Ok(AmqpSink {
48            channels,
49            exchange: config.exchange,
50            routing_key: config.routing_key,
51            properties: config.properties,
52            transformer,
53            encoder,
54        })
55    }
56
57    /// Transforms an event into an `AMQP` event by rendering the required template fields.
58    /// Returns None if there is an error whilst rendering.
59    fn make_amqp_event(&self, event: Event) -> Option<AmqpEvent> {
60        let exchange = self
61            .exchange
62            .render_string(&event)
63            .map_err(|missing_keys| {
64                emit!(TemplateRenderingError {
65                    error: missing_keys,
66                    field: Some("exchange"),
67                    drop_event: true,
68                })
69            })
70            .ok()?;
71
72        let routing_key = match &self.routing_key {
73            None => String::new(),
74            Some(key) => key
75                .render_string(&event)
76                .map_err(|missing_keys| {
77                    emit!(TemplateRenderingError {
78                        error: missing_keys,
79                        field: Some("routing_key"),
80                        drop_event: true,
81                    })
82                })
83                .ok()?,
84        };
85
86        let properties = match &self.properties {
87            None => BasicProperties::default(),
88            Some(prop) => prop.build(&event)?,
89        };
90
91        Some(AmqpEvent {
92            event,
93            exchange,
94            routing_key,
95            properties,
96        })
97    }
98
99    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
100        let request_builder = AmqpRequestBuilder {
101            encoder: AmqpEncoder {
102                encoder: self.encoder.clone(),
103                transformer: self.transformer.clone(),
104            },
105        };
106        let service = ServiceBuilder::new().service(AmqpService {
107            channels: self.channels.clone(),
108        });
109
110        input
111            .filter_map(|event| std::future::ready(self.make_amqp_event(event)))
112            .request_builder(default_request_builder_concurrency_limit(), request_builder)
113            .filter_map(|request| async move {
114                match request {
115                    Err(e) => {
116                        error!("Failed to build AMQP request: {:?}.", e);
117                        None
118                    }
119                    Ok(req) => Some(req),
120                }
121            })
122            .into_driver(service)
123            .protocol("amqp_0_9_1")
124            .run()
125            .await
126    }
127}
128
129#[async_trait]
130impl StreamSink<Event> for AmqpSink {
131    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
132        self.run_inner(input).await
133    }
134}