vector/sinks/aws_s_s/sns/
client.rs

1use super::{Client, SendMessageEntry, SendMessageResponse};
2use aws_sdk_sns::operation::publish::PublishError;
3use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
4use futures::TryFutureExt;
5use tracing::Instrument;
6
7#[derive(Clone, Debug)]
8pub(super) struct SnsMessagePublisher {
9    client: aws_sdk_sns::Client,
10    topic_arn: String,
11}
12
13impl SnsMessagePublisher {
14    pub(super) const fn new(client: aws_sdk_sns::Client, topic_arn: String) -> Self {
15        Self { client, topic_arn }
16    }
17}
18
19impl Client<PublishError> for SnsMessagePublisher {
20    async fn send_message(
21        &self,
22        entry: SendMessageEntry,
23        byte_size: usize,
24    ) -> Result<SendMessageResponse, SdkError<PublishError, HttpResponse>> {
25        self.client
26            .publish()
27            .message(entry.message_body)
28            .set_message_group_id(entry.message_group_id)
29            .set_message_deduplication_id(entry.message_deduplication_id)
30            .topic_arn(self.topic_arn.clone())
31            .send()
32            .map_ok(|_| SendMessageResponse {
33                byte_size,
34                json_byte_size: entry
35                    .metadata
36                    .events_estimated_json_encoded_byte_size()
37                    .clone(),
38            })
39            .instrument(info_span!("request").or_current())
40            .await
41    }
42}