vector/sinks/amqp/
sink.rs1use crate::sinks::prelude::*;
4use lapin::BasicProperties;
5use serde::Serialize;
6
7use super::channel::AmqpSinkChannels;
8use super::{
9 config::{AmqpPropertiesConfig, AmqpSinkConfig},
10 encoder::AmqpEncoder,
11 request_builder::AmqpRequestBuilder,
12 service::AmqpService,
13 BuildError,
14};
15
16#[derive(Serialize)]
22pub(super) struct AmqpEvent {
23 pub(super) event: Event,
24 pub(super) exchange: String,
25 pub(super) routing_key: String,
26 pub(super) properties: BasicProperties,
27}
28
29pub(super) struct AmqpSink {
30 pub(super) channels: AmqpSinkChannels,
31 exchange: Template,
32 routing_key: Option<Template>,
33 properties: Option<AmqpPropertiesConfig>,
34 transformer: Transformer,
35 encoder: crate::codecs::Encoder<()>,
36}
37
38impl AmqpSink {
39 pub(super) async fn new(config: AmqpSinkConfig) -> crate::Result<Self> {
40 let channels = super::channel::new_channel_pool(&config)
41 .map_err(|e| BuildError::AmqpCreateFailed { source: e })?;
42
43 let transformer = config.encoding.transformer();
44 let serializer = config.encoding.build()?;
45 let encoder = crate::codecs::Encoder::<()>::new(serializer);
46
47 Ok(AmqpSink {
48 channels,
49 exchange: config.exchange,
50 routing_key: config.routing_key,
51 properties: config.properties,
52 transformer,
53 encoder,
54 })
55 }
56
57 fn make_amqp_event(&self, event: Event) -> Option<AmqpEvent> {
60 let exchange = self
61 .exchange
62 .render_string(&event)
63 .map_err(|missing_keys| {
64 emit!(TemplateRenderingError {
65 error: missing_keys,
66 field: Some("exchange"),
67 drop_event: true,
68 })
69 })
70 .ok()?;
71
72 let routing_key = match &self.routing_key {
73 None => String::new(),
74 Some(key) => key
75 .render_string(&event)
76 .map_err(|missing_keys| {
77 emit!(TemplateRenderingError {
78 error: missing_keys,
79 field: Some("routing_key"),
80 drop_event: true,
81 })
82 })
83 .ok()?,
84 };
85
86 let properties = match &self.properties {
87 None => BasicProperties::default(),
88 Some(prop) => prop.build(&event)?,
89 };
90
91 Some(AmqpEvent {
92 event,
93 exchange,
94 routing_key,
95 properties,
96 })
97 }
98
99 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
100 let request_builder = AmqpRequestBuilder {
101 encoder: AmqpEncoder {
102 encoder: self.encoder.clone(),
103 transformer: self.transformer.clone(),
104 },
105 };
106 let service = ServiceBuilder::new().service(AmqpService {
107 channels: self.channels.clone(),
108 });
109
110 input
111 .filter_map(|event| std::future::ready(self.make_amqp_event(event)))
112 .request_builder(default_request_builder_concurrency_limit(), request_builder)
113 .filter_map(|request| async move {
114 match request {
115 Err(e) => {
116 error!("Failed to build AMQP request: {:?}.", e);
117 None
118 }
119 Ok(req) => Some(req),
120 }
121 })
122 .into_driver(service)
123 .protocol("amqp_0_9_1")
124 .run()
125 .await
126 }
127}
128
129#[async_trait]
130impl StreamSink<Event> for AmqpSink {
131 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
132 self.run_inner(input).await
133 }
134}