vector/sinks/mqtt/
request_builder.rs1use std::io;
2
3use bytes::{Bytes, BytesMut};
4use tokio_util::codec::Encoder as _;
5
6use super::{service::MqttRequest, sink::MqttEvent};
7use crate::sinks::prelude::*;
8
9pub(super) struct MqttMetadata {
10 topic: String,
11 finalizers: EventFinalizers,
12}
13
14pub(super) struct MqttEncoder {
15 pub(super) encoder: crate::codecs::Encoder<()>,
16 pub(super) transformer: crate::codecs::Transformer,
17}
18
19impl encoding::Encoder<Event> for MqttEncoder {
20 fn encode_input(
21 &self,
22 mut input: Event,
23 writer: &mut dyn io::Write,
24 ) -> io::Result<(usize, GroupedCountByteSize)> {
25 let mut body = BytesMut::new();
26 self.transformer.transform(&mut input);
27
28 let mut byte_size = telemetry().create_request_count_byte_size();
29 byte_size.add_event(&input, input.estimated_json_encoded_size_of());
30
31 let mut encoder = self.encoder.clone();
32 encoder
33 .encode(input, &mut body)
34 .map_err(|_| io::Error::other("unable to encode"))?;
35
36 let body = body.freeze();
37 write_all(writer, 1, body.as_ref())?;
38
39 Ok((body.len(), byte_size))
40 }
41}
42
43pub(super) struct MqttRequestBuilder {
44 pub(super) encoder: MqttEncoder,
45}
46
47impl RequestBuilder<MqttEvent> for MqttRequestBuilder {
48 type Metadata = MqttMetadata;
49 type Events = Event;
50 type Encoder = MqttEncoder;
51 type Payload = Bytes;
52 type Request = MqttRequest;
53 type Error = io::Error;
54
55 fn compression(&self) -> Compression {
56 Compression::None
57 }
58
59 fn encoder(&self) -> &Self::Encoder {
60 &self.encoder
61 }
62
63 fn split_input(
64 &self,
65 mut input: MqttEvent,
66 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
67 let builder = RequestMetadataBuilder::from_event(&input.event);
68
69 let metadata = MqttMetadata {
70 topic: input.topic,
71 finalizers: input.event.take_finalizers(),
72 };
73
74 (metadata, builder, input.event)
75 }
76
77 fn build_request(
78 &self,
79 mqtt_metadata: Self::Metadata,
80 metadata: RequestMetadata,
81 payload: EncodeResult<Self::Payload>,
82 ) -> Self::Request {
83 let body = payload.into_payload();
84 MqttRequest {
85 body,
86 topic: mqtt_metadata.topic,
87 finalizers: mqtt_metadata.finalizers,
88 metadata,
89 }
90 }
91}