vector/sinks/aws_s_s/sns/
client.rs

1use aws_sdk_sns::operation::publish::PublishError;
2use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
3use futures::TryFutureExt;
4use tracing::Instrument;
5
6use super::{Client, SendMessageEntry, SendMessageResponse};
7
/// Publisher that sends messages to a single AWS SNS topic.
///
/// Pairs an SNS SDK client with the ARN of the topic every message is
/// published to.
#[derive(Clone, Debug)]
pub(super) struct SnsMessagePublisher {
    /// SDK client used to issue the publish requests.
    client: aws_sdk_sns::Client,
    /// ARN of the destination topic, attached to every publish request.
    topic_arn: String,
}
13
14impl SnsMessagePublisher {
15    pub(super) const fn new(client: aws_sdk_sns::Client, topic_arn: String) -> Self {
16        Self { client, topic_arn }
17    }
18}
19
20impl Client<PublishError> for SnsMessagePublisher {
21    async fn send_message(
22        &self,
23        entry: SendMessageEntry,
24        byte_size: usize,
25    ) -> Result<SendMessageResponse, SdkError<PublishError, HttpResponse>> {
26        self.client
27            .publish()
28            .message(entry.message_body)
29            .set_message_group_id(entry.message_group_id)
30            .set_message_deduplication_id(entry.message_deduplication_id)
31            .topic_arn(self.topic_arn.clone())
32            .send()
33            .map_ok(|_| SendMessageResponse {
34                byte_size,
35                json_byte_size: entry
36                    .metadata
37                    .events_estimated_json_encoded_byte_size()
38                    .clone(),
39            })
40            .instrument(info_span!("request").or_current())
41            .await
42    }
43}