vector/sinks/mqtt/
request_builder.rs

1use std::io;
2
3use bytes::{Bytes, BytesMut};
4use tokio_util::codec::Encoder as _;
5
6use crate::sinks::prelude::*;
7
8use super::{service::MqttRequest, sink::MqttEvent};
9
10pub(super) struct MqttMetadata {
11    topic: String,
12    finalizers: EventFinalizers,
13}
14
15pub(super) struct MqttEncoder {
16    pub(super) encoder: crate::codecs::Encoder<()>,
17    pub(super) transformer: crate::codecs::Transformer,
18}
19
20impl encoding::Encoder<Event> for MqttEncoder {
21    fn encode_input(
22        &self,
23        mut input: Event,
24        writer: &mut dyn io::Write,
25    ) -> io::Result<(usize, GroupedCountByteSize)> {
26        let mut body = BytesMut::new();
27        self.transformer.transform(&mut input);
28
29        let mut byte_size = telemetry().create_request_count_byte_size();
30        byte_size.add_event(&input, input.estimated_json_encoded_size_of());
31
32        let mut encoder = self.encoder.clone();
33        encoder
34            .encode(input, &mut body)
35            .map_err(|_| io::Error::other("unable to encode"))?;
36
37        let body = body.freeze();
38        write_all(writer, 1, body.as_ref())?;
39
40        Ok((body.len(), byte_size))
41    }
42}
43
44pub(super) struct MqttRequestBuilder {
45    pub(super) encoder: MqttEncoder,
46}
47
48impl RequestBuilder<MqttEvent> for MqttRequestBuilder {
49    type Metadata = MqttMetadata;
50    type Events = Event;
51    type Encoder = MqttEncoder;
52    type Payload = Bytes;
53    type Request = MqttRequest;
54    type Error = io::Error;
55
56    fn compression(&self) -> Compression {
57        Compression::None
58    }
59
60    fn encoder(&self) -> &Self::Encoder {
61        &self.encoder
62    }
63
64    fn split_input(
65        &self,
66        mut input: MqttEvent,
67    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
68        let builder = RequestMetadataBuilder::from_event(&input.event);
69
70        let metadata = MqttMetadata {
71            topic: input.topic,
72            finalizers: input.event.take_finalizers(),
73        };
74
75        (metadata, builder, input.event)
76    }
77
78    fn build_request(
79        &self,
80        mqtt_metadata: Self::Metadata,
81        metadata: RequestMetadata,
82        payload: EncodeResult<Self::Payload>,
83    ) -> Self::Request {
84        let body = payload.into_payload();
85        MqttRequest {
86            body,
87            topic: mqtt_metadata.topic,
88            finalizers: mqtt_metadata.finalizers,
89            metadata,
90        }
91    }
92}