vector/sinks/amqp/
service.rs1use std::task::{Context, Poll};
4
5use bytes::Bytes;
6use futures::future::BoxFuture;
7use lapin::{BasicProperties, options::BasicPublishOptions};
8use snafu::Snafu;
9
10use super::channel::AmqpSinkChannels;
11use crate::sinks::prelude::*;
12
13pub(super) struct AmqpRequest {
16 body: Bytes,
17 exchange: String,
18 routing_key: String,
19 properties: BasicProperties,
20 finalizers: EventFinalizers,
21 metadata: RequestMetadata,
22}
23
24impl AmqpRequest {
25 pub(super) const fn new(
26 body: Bytes,
27 exchange: String,
28 routing_key: String,
29 properties: BasicProperties,
30 finalizers: EventFinalizers,
31 metadata: RequestMetadata,
32 ) -> Self {
33 Self {
34 body,
35 exchange,
36 routing_key,
37 properties,
38 finalizers,
39 metadata,
40 }
41 }
42}
43
44impl Finalizable for AmqpRequest {
45 fn take_finalizers(&mut self) -> EventFinalizers {
46 std::mem::take(&mut self.finalizers)
47 }
48}
49
50impl MetaDescriptive for AmqpRequest {
51 fn get_metadata(&self) -> &RequestMetadata {
52 &self.metadata
53 }
54
55 fn metadata_mut(&mut self) -> &mut RequestMetadata {
56 &mut self.metadata
57 }
58}
59
60pub(super) struct AmqpResponse {
62 byte_size: usize,
63 json_size: GroupedCountByteSize,
64}
65
66impl DriverResponse for AmqpResponse {
67 fn event_status(&self) -> EventStatus {
68 EventStatus::Delivered
69 }
70
71 fn events_sent(&self) -> &GroupedCountByteSize {
72 &self.json_size
73 }
74
75 fn bytes_sent(&self) -> Option<usize> {
76 Some(self.byte_size)
77 }
78}
79
80pub(super) struct AmqpService {
82 pub(super) channels: AmqpSinkChannels,
83}
84
85#[derive(Debug, Snafu)]
86pub enum AmqpError {
87 #[snafu(display("Failed retrieving Acknowledgement: {}", error))]
88 AcknowledgementFailed { error: lapin::Error },
89
90 #[snafu(display("Failed AMQP request: {}", error))]
91 DeliveryFailed { error: lapin::Error },
92
93 #[snafu(display("Received Negative Acknowledgement from AMQP broker."))]
94 Nack,
95
96 #[snafu(display("Failed to open AMQP channel: {}", error))]
97 ConnectFailed { error: vector_common::Error },
98
99 #[snafu(display("Channel is not writeable: {:?}", state))]
100 ChannelClosed { state: lapin::ChannelState },
101
102 #[snafu(display("Channel pool error: {}", error))]
103 PoolError { error: vector_common::Error },
104}
105
106impl Service<AmqpRequest> for AmqpService {
107 type Response = AmqpResponse;
108
109 type Error = AmqpError;
110
111 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
112
113 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
114 Poll::Ready(Ok(()))
115 }
116
117 fn call(&mut self, req: AmqpRequest) -> Self::Future {
118 let channel = self.channels.clone();
119
120 Box::pin(async move {
121 let channel = channel.get().await.map_err(|error| AmqpError::PoolError {
122 error: Box::new(error),
123 })?;
124
125 let byte_size = req.body.len();
126 let fut = channel
127 .basic_publish(
128 &req.exchange,
129 &req.routing_key,
130 BasicPublishOptions::default(),
131 req.body.as_ref(),
132 req.properties,
133 )
134 .await;
135
136 match fut {
137 Ok(result) => match result.await {
138 Ok(lapin::publisher_confirm::Confirmation::Nack(_)) => Err(AmqpError::Nack),
139 Err(error) => Err(AmqpError::AcknowledgementFailed { error }),
140 Ok(_) => Ok(AmqpResponse {
141 json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
142 byte_size,
143 }),
144 },
145 Err(error) => Err(AmqpError::DeliveryFailed { error }),
146 }
147 })
148 }
149}