vector/sinks/amqp/
service.rs

1//! The main tower service that takes the request created by the request builder
2//! and sends it to `AMQP`.
3use crate::sinks::prelude::*;
4use bytes::Bytes;
5use futures::future::BoxFuture;
6use lapin::{options::BasicPublishOptions, BasicProperties};
7use snafu::Snafu;
8use std::task::{Context, Poll};
9
10use super::channel::AmqpSinkChannels;
11
/// The request contains the data to send to `AMQP` together
/// with the information needed to route the message.
pub(super) struct AmqpRequest {
    /// Message payload; its bytes are handed to `basic_publish` and its
    /// length is what the service reports as the number of bytes sent.
    body: Bytes,
    /// Exchange argument passed to `basic_publish`.
    exchange: String,
    /// Routing key argument passed to `basic_publish`.
    routing_key: String,
    /// Message properties passed along with the publish call.
    properties: BasicProperties,
    /// Finalizers of the events in this request; handed out (and replaced by
    /// the default value) through `Finalizable::take_finalizers`.
    finalizers: EventFinalizers,
    /// Request metadata, exposed through `MetaDescriptive` and consumed by the
    /// service to build the per-event size report of the response.
    metadata: RequestMetadata,
}
22
23impl AmqpRequest {
24    pub(super) const fn new(
25        body: Bytes,
26        exchange: String,
27        routing_key: String,
28        properties: BasicProperties,
29        finalizers: EventFinalizers,
30        metadata: RequestMetadata,
31    ) -> Self {
32        Self {
33            body,
34            exchange,
35            routing_key,
36            properties,
37            finalizers,
38            metadata,
39        }
40    }
41}
42
43impl Finalizable for AmqpRequest {
44    fn take_finalizers(&mut self) -> EventFinalizers {
45        std::mem::take(&mut self.finalizers)
46    }
47}
48
49impl MetaDescriptive for AmqpRequest {
50    fn get_metadata(&self) -> &RequestMetadata {
51        &self.metadata
52    }
53
54    fn metadata_mut(&mut self) -> &mut RequestMetadata {
55        &mut self.metadata
56    }
57}
58
/// A successful response from `AMQP`.
pub(super) struct AmqpResponse {
    /// Length in bytes of the request body that was published.
    byte_size: usize,
    /// Grouped estimated JSON-encoded event sizes, taken from the request
    /// metadata.
    json_size: GroupedCountByteSize,
}
64
65impl DriverResponse for AmqpResponse {
66    fn event_status(&self) -> EventStatus {
67        EventStatus::Delivered
68    }
69
70    fn events_sent(&self) -> &GroupedCountByteSize {
71        &self.json_size
72    }
73
74    fn bytes_sent(&self) -> Option<usize> {
75        Some(self.byte_size)
76    }
77}
78
/// The tower service that handles the actual sending of data to `AMQP`.
pub(super) struct AmqpService {
    /// Source of channels used for publishing; it is cloned for every call
    /// and a channel is obtained from the clone via `get()`.
    pub(super) channels: AmqpSinkChannels,
}
83
/// Errors reported by the `AMQP` sink.
#[derive(Debug, Snafu)]
pub enum AmqpError {
    /// Waiting for the broker's confirmation of a published message failed.
    #[snafu(display("Failed retrieving Acknowledgement: {}", error))]
    AcknowledgementFailed { error: lapin::Error },

    /// The publish call itself returned an error.
    #[snafu(display("Failed AMQP request: {}", error))]
    DeliveryFailed { error: lapin::Error },

    /// The broker answered the publish with a negative acknowledgement.
    #[snafu(display("Received Negative Acknowledgement from AMQP broker."))]
    Nack,

    /// Opening a channel failed.
    // NOTE(review): not constructed in this file — presumably raised by the
    // channel/connection setup code; confirm against the caller.
    #[snafu(display("Failed to open AMQP channel: {}", error))]
    ConnectFailed { error: vector_common::Error },

    /// The channel is in a state that does not allow writing.
    // NOTE(review): not constructed in this file — presumably raised by the
    // channel management code; confirm against the caller.
    #[snafu(display("Channel is not writeable: {:?}", state))]
    ChannelClosed { state: lapin::ChannelState },

    /// Obtaining a channel from the channel pool failed.
    #[snafu(display("Channel pool error: {}", error))]
    PoolError { error: vector_common::Error },
}
104
105impl Service<AmqpRequest> for AmqpService {
106    type Response = AmqpResponse;
107
108    type Error = AmqpError;
109
110    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
111
112    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113        Poll::Ready(Ok(()))
114    }
115
116    fn call(&mut self, req: AmqpRequest) -> Self::Future {
117        let channel = self.channels.clone();
118
119        Box::pin(async move {
120            let channel = channel.get().await.map_err(|error| AmqpError::PoolError {
121                error: Box::new(error),
122            })?;
123
124            let byte_size = req.body.len();
125            let fut = channel
126                .basic_publish(
127                    &req.exchange,
128                    &req.routing_key,
129                    BasicPublishOptions::default(),
130                    req.body.as_ref(),
131                    req.properties,
132                )
133                .await;
134
135            match fut {
136                Ok(result) => match result.await {
137                    Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack),
138                    Err(error) => Err(AmqpError::AcknowledgementFailed { error }),
139                    Ok(_) => Ok(AmqpResponse {
140                        json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
141                        byte_size,
142                    }),
143                },
144                Err(error) => Err(AmqpError::DeliveryFailed { error }),
145            }
146        })
147    }
148}