vector/sinks/mqtt/
request_builder.rs

1use std::io;
2
3use bytes::{Bytes, BytesMut};
4use tokio_util::codec::Encoder as _;
5
6use super::{service::MqttRequest, sink::MqttEvent};
7use crate::sinks::prelude::*;
8
9pub(super) struct MqttMetadata {
10    topic: String,
11    finalizers: EventFinalizers,
12}
13
14pub(super) struct MqttEncoder {
15    pub(super) encoder: crate::codecs::Encoder<()>,
16    pub(super) transformer: crate::codecs::Transformer,
17}
18
19impl encoding::Encoder<Event> for MqttEncoder {
20    fn encode_input(
21        &self,
22        mut input: Event,
23        writer: &mut dyn io::Write,
24    ) -> io::Result<(usize, GroupedCountByteSize)> {
25        let mut body = BytesMut::new();
26        self.transformer.transform(&mut input);
27
28        let mut byte_size = telemetry().create_request_count_byte_size();
29        byte_size.add_event(&input, input.estimated_json_encoded_size_of());
30
31        let mut encoder = self.encoder.clone();
32        encoder
33            .encode(input, &mut body)
34            .map_err(|_| io::Error::other("unable to encode"))?;
35
36        let body = body.freeze();
37        write_all(writer, 1, body.as_ref())?;
38
39        Ok((body.len(), byte_size))
40    }
41}
42
43pub(super) struct MqttRequestBuilder {
44    pub(super) encoder: MqttEncoder,
45}
46
47impl RequestBuilder<MqttEvent> for MqttRequestBuilder {
48    type Metadata = MqttMetadata;
49    type Events = Event;
50    type Encoder = MqttEncoder;
51    type Payload = Bytes;
52    type Request = MqttRequest;
53    type Error = io::Error;
54
55    fn compression(&self) -> Compression {
56        Compression::None
57    }
58
59    fn encoder(&self) -> &Self::Encoder {
60        &self.encoder
61    }
62
63    fn split_input(
64        &self,
65        mut input: MqttEvent,
66    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
67        let builder = RequestMetadataBuilder::from_event(&input.event);
68
69        let metadata = MqttMetadata {
70            topic: input.topic,
71            finalizers: input.event.take_finalizers(),
72        };
73
74        (metadata, builder, input.event)
75    }
76
77    fn build_request(
78        &self,
79        mqtt_metadata: Self::Metadata,
80        metadata: RequestMetadata,
81        payload: EncodeResult<Self::Payload>,
82    ) -> Self::Request {
83        let body = payload.into_payload();
84        MqttRequest {
85            body,
86            topic: mqtt_metadata.topic,
87            finalizers: mqtt_metadata.finalizers,
88            metadata,
89        }
90    }
91}