vector/sinks/amqp/
service.rs1use crate::sinks::prelude::*;
4use bytes::Bytes;
5use futures::future::BoxFuture;
6use lapin::{options::BasicPublishOptions, BasicProperties};
7use snafu::Snafu;
8use std::task::{Context, Poll};
9
10use super::channel::AmqpSinkChannels;
11
12pub(super) struct AmqpRequest {
15 body: Bytes,
16 exchange: String,
17 routing_key: String,
18 properties: BasicProperties,
19 finalizers: EventFinalizers,
20 metadata: RequestMetadata,
21}
22
23impl AmqpRequest {
24 pub(super) const fn new(
25 body: Bytes,
26 exchange: String,
27 routing_key: String,
28 properties: BasicProperties,
29 finalizers: EventFinalizers,
30 metadata: RequestMetadata,
31 ) -> Self {
32 Self {
33 body,
34 exchange,
35 routing_key,
36 properties,
37 finalizers,
38 metadata,
39 }
40 }
41}
42
43impl Finalizable for AmqpRequest {
44 fn take_finalizers(&mut self) -> EventFinalizers {
45 std::mem::take(&mut self.finalizers)
46 }
47}
48
49impl MetaDescriptive for AmqpRequest {
50 fn get_metadata(&self) -> &RequestMetadata {
51 &self.metadata
52 }
53
54 fn metadata_mut(&mut self) -> &mut RequestMetadata {
55 &mut self.metadata
56 }
57}
58
59pub(super) struct AmqpResponse {
61 byte_size: usize,
62 json_size: GroupedCountByteSize,
63}
64
65impl DriverResponse for AmqpResponse {
66 fn event_status(&self) -> EventStatus {
67 EventStatus::Delivered
68 }
69
70 fn events_sent(&self) -> &GroupedCountByteSize {
71 &self.json_size
72 }
73
74 fn bytes_sent(&self) -> Option<usize> {
75 Some(self.byte_size)
76 }
77}
78
79pub(super) struct AmqpService {
81 pub(super) channels: AmqpSinkChannels,
82}
83
84#[derive(Debug, Snafu)]
85pub enum AmqpError {
86 #[snafu(display("Failed retrieving Acknowledgement: {}", error))]
87 AcknowledgementFailed { error: lapin::Error },
88
89 #[snafu(display("Failed AMQP request: {}", error))]
90 DeliveryFailed { error: lapin::Error },
91
92 #[snafu(display("Received Negative Acknowledgement from AMQP broker."))]
93 Nack,
94
95 #[snafu(display("Failed to open AMQP channel: {}", error))]
96 ConnectFailed { error: vector_common::Error },
97
98 #[snafu(display("Channel is not writeable: {:?}", state))]
99 ChannelClosed { state: lapin::ChannelState },
100
101 #[snafu(display("Channel pool error: {}", error))]
102 PoolError { error: vector_common::Error },
103}
104
105impl Service<AmqpRequest> for AmqpService {
106 type Response = AmqpResponse;
107
108 type Error = AmqpError;
109
110 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
111
112 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
113 Poll::Ready(Ok(()))
114 }
115
116 fn call(&mut self, req: AmqpRequest) -> Self::Future {
117 let channel = self.channels.clone();
118
119 Box::pin(async move {
120 let channel = channel.get().await.map_err(|error| AmqpError::PoolError {
121 error: Box::new(error),
122 })?;
123
124 let byte_size = req.body.len();
125 let fut = channel
126 .basic_publish(
127 &req.exchange,
128 &req.routing_key,
129 BasicPublishOptions::default(),
130 req.body.as_ref(),
131 req.properties,
132 )
133 .await;
134
135 match fut {
136 Ok(result) => match result.await {
137 Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack),
138 Err(error) => Err(AmqpError::AcknowledgementFailed { error }),
139 Ok(_) => Ok(AmqpResponse {
140 json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
141 byte_size,
142 }),
143 },
144 Err(error) => Err(AmqpError::DeliveryFailed { error }),
145 }
146 })
147 }
148}