1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
use std::task::{Context, Poll};

use crate::sinks::prelude::*;
use bytes::Bytes;
use futures::future::BoxFuture;
use rumqttc::{AsyncClient, ClientError};
use snafu::Snafu;

use super::config::MqttQoS;

/// Response returned by the MQTT service after a successful publish.
pub(super) struct MqttResponse {
    /// Length in bytes of the request body that was published.
    byte_size: usize,
    /// Estimated JSON-encoded size of the events, taken from the request metadata.
    json_size: GroupedCountByteSize,
}

impl DriverResponse for MqttResponse {
    /// Reports every response as delivered; a response is only built once
    /// the publish call has returned `Ok`.
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    /// Size of the published body in bytes. Always `Some`.
    fn bytes_sent(&self) -> Option<usize> {
        Some(self.byte_size)
    }

    /// Grouped estimated JSON-encoded size of the events carried by the request.
    fn events_sent(&self) -> &GroupedCountByteSize {
        &self.json_size
    }
}

/// A single publish request handed to the MQTT service.
pub(super) struct MqttRequest {
    /// Payload passed to the client's `publish` call.
    pub(super) body: Bytes,
    /// Topic passed to the client's `publish` call.
    pub(super) topic: String,
    /// Finalizers of the events contained in this request.
    pub(super) finalizers: EventFinalizers,
    /// Request metadata; the source of the response's estimated JSON byte size.
    pub(super) metadata: RequestMetadata,
}

impl Finalizable for MqttRequest {
    /// Moves the finalizers out of the request, leaving the default value
    /// in their place.
    fn take_finalizers(&mut self) -> EventFinalizers {
        let slot = &mut self.finalizers;
        std::mem::take(slot)
    }
}

impl MetaDescriptive for MqttRequest {
    /// Mutable access to the request metadata.
    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.metadata
    }

    /// Shared access to the request metadata.
    fn get_metadata(&self) -> &RequestMetadata {
        &self.metadata
    }
}

/// Service that publishes requests through an MQTT client.
pub(super) struct MqttService {
    /// Client used to publish; it is cloned for every call.
    pub(super) client: AsyncClient,
    /// Configured quality of service, converted with `.into()` for each publish.
    pub(super) quality_of_service: MqttQoS,
    /// Value passed as the `retain` argument of every publish.
    pub(super) retain: bool,
}

#[derive(Debug, Snafu)]
pub(super) enum MqttError {
    #[snafu(display("error"))]
    Error { error: ClientError },
}

impl Service<MqttRequest> for MqttService {
    type Response = MqttResponse;
    type Error = MqttError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: MqttRequest) -> Self::Future {
        let quality_of_service = self.quality_of_service;
        let retain = self.retain;
        let client = self.client.clone();

        Box::pin(async move {
            let byte_size = req.body.len();

            let res = client
                .publish(&req.topic, quality_of_service.into(), retain, req.body)
                .await;
            match res {
                Ok(()) => Ok(MqttResponse {
                    byte_size,
                    json_size: req.metadata.into_events_estimated_json_encoded_byte_size(),
                }),
                Err(error) => Err(MqttError::Error { error }),
            }
        })
    }
}