1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
//! The core of the `AMQP` sink: wires together the main stream that takes each
//! event and sends it to `AMQP`.
use crate::sinks::prelude::*;
use lapin::{options::ConfirmSelectOptions, BasicProperties};
use serde::Serialize;
use std::sync::Arc;

use super::{
    config::{AmqpPropertiesConfig, AmqpSinkConfig},
    encoder::AmqpEncoder,
    request_builder::AmqpRequestBuilder,
    service::AmqpService,
    BuildError,
};

/// An event bundled with the values rendered for it: the exchange, the routing key
/// and the message properties.
///
/// This is passed into the `RequestBuilder` which then splits it out into the event
/// and metadata containing the exchange and routing_key.
/// It has to be created before the request is built so that events whose templates
/// fail to render can be filtered out of the stream first.
#[derive(Serialize)]
pub(super) struct AmqpEvent {
    /// The event being sent.
    pub(super) event: Event,
    /// The exchange, rendered from the sink's exchange template for this event.
    pub(super) exchange: String,
    /// The routing key rendered for this event; an empty string when the sink has no
    /// routing key template.
    pub(super) routing_key: String,
    /// The message properties: built from the sink's properties configuration when
    /// one is set, `BasicProperties::default()` otherwise.
    pub(super) properties: BasicProperties,
}

/// The `AMQP` sink: holds the channel plus everything needed to turn an incoming
/// event into a request (templates, properties, transformer and encoder).
pub(super) struct AmqpSink {
    /// The channel obtained when the sink is created, with confirmations enabled.
    /// Kept in an `Arc` so it can be shared with the `AmqpService`.
    pub(super) channel: Arc<lapin::Channel>,
    /// Template rendered against each event to produce the exchange.
    exchange: Template,
    /// Optional template for the routing key; when `None`, an empty routing key is used.
    routing_key: Option<Template>,
    /// Optional properties configuration; when `None`, `BasicProperties::default()`
    /// is used for every event.
    properties: Option<AmqpPropertiesConfig>,
    /// Transformer taken from the encoding configuration; cloned into the `AmqpEncoder`.
    transformer: Transformer,
    /// Encoder wrapping the serializer built from the encoding configuration; cloned
    /// into the `AmqpEncoder`.
    encoder: crate::codecs::Encoder<()>,
}

impl AmqpSink {
    pub(super) async fn new(config: AmqpSinkConfig) -> crate::Result<Self> {
        let (_, channel) = config
            .connection
            .connect()
            .await
            .map_err(|e| BuildError::AmqpCreateFailed { source: e })?;

        // Enable confirmations on the channel.
        channel
            .confirm_select(ConfirmSelectOptions::default())
            .await
            .map_err(|e| BuildError::AmqpCreateFailed {
                source: Box::new(e),
            })?;

        let transformer = config.encoding.transformer();
        let serializer = config.encoding.build()?;
        let encoder = crate::codecs::Encoder::<()>::new(serializer);

        Ok(AmqpSink {
            channel: Arc::new(channel),
            exchange: config.exchange,
            routing_key: config.routing_key,
            properties: config.properties,
            transformer,
            encoder,
        })
    }

    /// Transforms an event into an `AMQP` event by rendering the required template fields.
    /// Returns None if there is an error whilst rendering.
    fn make_amqp_event(&self, event: Event) -> Option<AmqpEvent> {
        let exchange = self
            .exchange
            .render_string(&event)
            .map_err(|missing_keys| {
                emit!(TemplateRenderingError {
                    error: missing_keys,
                    field: Some("exchange"),
                    drop_event: true,
                })
            })
            .ok()?;

        let routing_key = match &self.routing_key {
            None => String::new(),
            Some(key) => key
                .render_string(&event)
                .map_err(|missing_keys| {
                    emit!(TemplateRenderingError {
                        error: missing_keys,
                        field: Some("routing_key"),
                        drop_event: true,
                    })
                })
                .ok()?,
        };

        let properties = match &self.properties {
            None => BasicProperties::default(),
            Some(prop) => prop.build(),
        };

        Some(AmqpEvent {
            event,
            exchange,
            routing_key,
            properties,
        })
    }

    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let request_builder = AmqpRequestBuilder {
            encoder: AmqpEncoder {
                encoder: self.encoder.clone(),
                transformer: self.transformer.clone(),
            },
        };
        let service = ServiceBuilder::new().service(AmqpService {
            channel: Arc::clone(&self.channel),
        });

        input
            .filter_map(|event| std::future::ready(self.make_amqp_event(event)))
            .request_builder(default_request_builder_concurrency_limit(), request_builder)
            .filter_map(|request| async move {
                match request {
                    Err(e) => {
                        error!("Failed to build AMQP request: {:?}.", e);
                        None
                    }
                    Ok(req) => Some(req),
                }
            })
            .into_driver(service)
            .protocol("amqp_0_9_1")
            .run()
            .await
    }
}

#[async_trait]
impl StreamSink<Event> for AmqpSink {
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}