vector/sinks/amqp/
service.rs

1//! The main tower service that takes the request created by the request builder
2//! and sends it to `AMQP`.
3use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use futures::future::BoxFuture;
7use lapin::{BasicProperties, options::BasicPublishOptions};
8use snafu::Snafu;
9
10use super::channel::AmqpSinkChannels;
11use crate::sinks::prelude::*;
12
13/// The request contains the data to send to `AMQP` together
14/// with the information need to route the message.
15pub(super) struct AmqpRequest {
16    body: Bytes,
17    exchange: String,
18    routing_key: String,
19    properties: BasicProperties,
20    finalizers: EventFinalizers,
21    metadata: RequestMetadata,
22}
23
24impl AmqpRequest {
25    pub(super) const fn new(
26        body: Bytes,
27        exchange: String,
28        routing_key: String,
29        properties: BasicProperties,
30        finalizers: EventFinalizers,
31        metadata: RequestMetadata,
32    ) -> Self {
33        Self {
34            body,
35            exchange,
36            routing_key,
37            properties,
38            finalizers,
39            metadata,
40        }
41    }
42}
43
44impl Finalizable for AmqpRequest {
45    fn take_finalizers(&mut self) -> EventFinalizers {
46        std::mem::take(&mut self.finalizers)
47    }
48}
49
50impl MetaDescriptive for AmqpRequest {
51    fn get_metadata(&self) -> &RequestMetadata {
52        &self.metadata
53    }
54
55    fn metadata_mut(&mut self) -> &mut RequestMetadata {
56        &mut self.metadata
57    }
58}
59
60/// A successful response from `AMQP`.
61pub(super) struct AmqpResponse {
62    byte_size: usize,
63    json_size: GroupedCountByteSize,
64}
65
66impl DriverResponse for AmqpResponse {
67    fn event_status(&self) -> EventStatus {
68        EventStatus::Delivered
69    }
70
71    fn events_sent(&self) -> &GroupedCountByteSize {
72        &self.json_size
73    }
74
75    fn bytes_sent(&self) -> Option<usize> {
76        Some(self.byte_size)
77    }
78}
79
80/// The tower service that handles the actual sending of data to `AMQP`.
81pub(super) struct AmqpService {
82    pub(super) channels: AmqpSinkChannels,
83}
84
85#[derive(Debug, Snafu)]
86pub enum AmqpError {
87    #[snafu(display("Failed retrieving Acknowledgement: {}", error))]
88    AcknowledgementFailed { error: lapin::Error },
89
90    #[snafu(display("Failed AMQP request: {}", error))]
91    DeliveryFailed { error: lapin::Error },
92
93    #[snafu(display("Received Negative Acknowledgement from AMQP broker."))]
94    Nack,
95
96    #[snafu(display("Failed to open AMQP channel: {}", error))]
97    ConnectFailed { error: vector_common::Error },
98
99    #[snafu(display("Channel is not writeable: {:?}", state))]
100    ChannelClosed { state: lapin::ChannelState },
101
102    #[snafu(display("Channel pool error: {}", error))]
103    PoolError { error: vector_common::Error },
104}
105
106impl Service<AmqpRequest> for AmqpService {
107    type Response = AmqpResponse;
108
109    type Error = AmqpError;
110
111    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
112
113    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
114        Poll::Ready(Ok(()))
115    }
116
117    fn call(&mut self, req: AmqpRequest) -> Self::Future {
118        let channel = self.channels.clone();
119
120        Box::pin(async move {
121            let channel = channel.get().await.map_err(|error| AmqpError::PoolError {
122                error: Box::new(error),
123            })?;
124
125            let byte_size = req.body.len();
126            let fut = channel
127                .basic_publish(
128                    &req.exchange,
129                    &req.routing_key,
130                    BasicPublishOptions::default(),
131                    req.body.as_ref(),
132                    req.properties,
133                )
134                .await;
135
136            match fut {
137                Ok(result) => match result.await {
138                    Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack),
139                    Err(error) => Err(AmqpError::AcknowledgementFailed { error }),
140                    Ok(_) => Ok(AmqpResponse {
141                        json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
142                        byte_size,
143                    }),
144                },
145                Err(error) => Err(AmqpError::DeliveryFailed { error }),
146            }
147        })
148    }
149}