vector/sinks/amqp/
request_builder.rs1use std::io;
5
6use bytes::Bytes;
7use lapin::BasicProperties;
8
9use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent};
10use crate::sinks::prelude::*;
11
12pub(super) struct AmqpMetadata {
13 exchange: String,
14 routing_key: String,
15 properties: BasicProperties,
16 finalizers: EventFinalizers,
17}
18
19pub(super) struct AmqpRequestBuilder {
23 pub(super) encoder: AmqpEncoder,
24}
25
26impl RequestBuilder<AmqpEvent> for AmqpRequestBuilder {
27 type Metadata = AmqpMetadata;
28 type Events = Event;
29 type Encoder = AmqpEncoder;
30 type Payload = Bytes;
31 type Request = AmqpRequest;
32 type Error = io::Error;
33
34 fn compression(&self) -> Compression {
35 Compression::None
36 }
37
38 fn encoder(&self) -> &Self::Encoder {
39 &self.encoder
40 }
41
42 fn split_input(
43 &self,
44 mut input: AmqpEvent,
45 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
46 let builder = RequestMetadataBuilder::from_event(&input.event);
47
48 let metadata = AmqpMetadata {
49 exchange: input.exchange,
50 routing_key: input.routing_key,
51 properties: input.properties,
52 finalizers: input.event.take_finalizers(),
53 };
54
55 (metadata, builder, input.event)
56 }
57
58 fn build_request(
59 &self,
60 amqp_metadata: Self::Metadata,
61 metadata: RequestMetadata,
62 payload: EncodeResult<Self::Payload>,
63 ) -> Self::Request {
64 let body = payload.into_payload();
65 AmqpRequest::new(
66 body,
67 amqp_metadata.exchange,
68 amqp_metadata.routing_key,
69 amqp_metadata.properties,
70 amqp_metadata.finalizers,
71 metadata,
72 )
73 }
74}