vector/sinks/amqp/
request_builder.rs1use crate::sinks::prelude::*;
5use bytes::Bytes;
6use lapin::BasicProperties;
7use std::io;
8
9use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent};
10
11pub(super) struct AmqpMetadata {
12 exchange: String,
13 routing_key: String,
14 properties: BasicProperties,
15 finalizers: EventFinalizers,
16}
17
18pub(super) struct AmqpRequestBuilder {
22 pub(super) encoder: AmqpEncoder,
23}
24
25impl RequestBuilder<AmqpEvent> for AmqpRequestBuilder {
26 type Metadata = AmqpMetadata;
27 type Events = Event;
28 type Encoder = AmqpEncoder;
29 type Payload = Bytes;
30 type Request = AmqpRequest;
31 type Error = io::Error;
32
33 fn compression(&self) -> Compression {
34 Compression::None
35 }
36
37 fn encoder(&self) -> &Self::Encoder {
38 &self.encoder
39 }
40
41 fn split_input(
42 &self,
43 mut input: AmqpEvent,
44 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
45 let builder = RequestMetadataBuilder::from_event(&input.event);
46
47 let metadata = AmqpMetadata {
48 exchange: input.exchange,
49 routing_key: input.routing_key,
50 properties: input.properties,
51 finalizers: input.event.take_finalizers(),
52 };
53
54 (metadata, builder, input.event)
55 }
56
57 fn build_request(
58 &self,
59 amqp_metadata: Self::Metadata,
60 metadata: RequestMetadata,
61 payload: EncodeResult<Self::Payload>,
62 ) -> Self::Request {
63 let body = payload.into_payload();
64 AmqpRequest::new(
65 body,
66 amqp_metadata.exchange,
67 amqp_metadata.routing_key,
68 amqp_metadata.properties,
69 amqp_metadata.finalizers,
70 metadata,
71 )
72 }
73}