vector/sinks/aws_s_s/sns/
client.rs1use super::{Client, SendMessageEntry, SendMessageResponse};
2use aws_sdk_sns::operation::publish::PublishError;
3use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError};
4use futures::TryFutureExt;
5use tracing::Instrument;
6
7#[derive(Clone, Debug)]
8pub(super) struct SnsMessagePublisher {
9 client: aws_sdk_sns::Client,
10 topic_arn: String,
11}
12
13impl SnsMessagePublisher {
14 pub(super) const fn new(client: aws_sdk_sns::Client, topic_arn: String) -> Self {
15 Self { client, topic_arn }
16 }
17}
18
19impl Client<PublishError> for SnsMessagePublisher {
20 async fn send_message(
21 &self,
22 entry: SendMessageEntry,
23 byte_size: usize,
24 ) -> Result<SendMessageResponse, SdkError<PublishError, HttpResponse>> {
25 self.client
26 .publish()
27 .message(entry.message_body)
28 .set_message_group_id(entry.message_group_id)
29 .set_message_deduplication_id(entry.message_deduplication_id)
30 .topic_arn(self.topic_arn.clone())
31 .send()
32 .map_ok(|_| SendMessageResponse {
33 byte_size,
34 json_byte_size: entry
35 .metadata
36 .events_estimated_json_encoded_byte_size()
37 .clone(),
38 })
39 .instrument(info_span!("request").or_current())
40 .await
41 }
42}