// vector/sinks/mqtt/request_builder.rs
use std::io;
2
3use bytes::{Bytes, BytesMut};
4use tokio_util::codec::Encoder as _;
5
6use crate::sinks::prelude::*;
7
8use super::{service::MqttRequest, sink::MqttEvent};
9
10pub(super) struct MqttMetadata {
11 topic: String,
12 finalizers: EventFinalizers,
13}
14
15pub(super) struct MqttEncoder {
16 pub(super) encoder: crate::codecs::Encoder<()>,
17 pub(super) transformer: crate::codecs::Transformer,
18}
19
20impl encoding::Encoder<Event> for MqttEncoder {
21 fn encode_input(
22 &self,
23 mut input: Event,
24 writer: &mut dyn io::Write,
25 ) -> io::Result<(usize, GroupedCountByteSize)> {
26 let mut body = BytesMut::new();
27 self.transformer.transform(&mut input);
28
29 let mut byte_size = telemetry().create_request_count_byte_size();
30 byte_size.add_event(&input, input.estimated_json_encoded_size_of());
31
32 let mut encoder = self.encoder.clone();
33 encoder
34 .encode(input, &mut body)
35 .map_err(|_| io::Error::other("unable to encode"))?;
36
37 let body = body.freeze();
38 write_all(writer, 1, body.as_ref())?;
39
40 Ok((body.len(), byte_size))
41 }
42}
43
44pub(super) struct MqttRequestBuilder {
45 pub(super) encoder: MqttEncoder,
46}
47
48impl RequestBuilder<MqttEvent> for MqttRequestBuilder {
49 type Metadata = MqttMetadata;
50 type Events = Event;
51 type Encoder = MqttEncoder;
52 type Payload = Bytes;
53 type Request = MqttRequest;
54 type Error = io::Error;
55
56 fn compression(&self) -> Compression {
57 Compression::None
58 }
59
60 fn encoder(&self) -> &Self::Encoder {
61 &self.encoder
62 }
63
64 fn split_input(
65 &self,
66 mut input: MqttEvent,
67 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
68 let builder = RequestMetadataBuilder::from_event(&input.event);
69
70 let metadata = MqttMetadata {
71 topic: input.topic,
72 finalizers: input.event.take_finalizers(),
73 };
74
75 (metadata, builder, input.event)
76 }
77
78 fn build_request(
79 &self,
80 mqtt_metadata: Self::Metadata,
81 metadata: RequestMetadata,
82 payload: EncodeResult<Self::Payload>,
83 ) -> Self::Request {
84 let body = payload.into_payload();
85 MqttRequest {
86 body,
87 topic: mqtt_metadata.topic,
88 finalizers: mqtt_metadata.finalizers,
89 metadata,
90 }
91 }
92}