vector/sinks/azure_common/
service.rs1use std::{
2 result::Result as StdResult,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use azure_storage_blobs::prelude::*;
8use futures::future::BoxFuture;
9use tower::Service;
10use tracing::Instrument;
11
12use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse};
13
/// A [`Service`] that sends [`AzureBlobRequest`]s through a shared Azure
/// Storage container client.
///
/// The client is held behind an [`Arc`], so cloning this type only clones
/// the handle.
#[derive(Clone)]
pub struct AzureBlobService {
    /// Shared handle to the container client used to obtain per-blob clients.
    client: Arc<ContainerClient>,
}
18
19impl AzureBlobService {
20 pub const fn new(client: Arc<ContainerClient>) -> AzureBlobService {
21 AzureBlobService { client }
22 }
23}
24
25impl Service<AzureBlobRequest> for AzureBlobService {
26 type Response = AzureBlobResponse;
27 type Error = Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>;
28 type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
29
30 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
32 Poll::Ready(Ok(()))
33 }
34
35 fn call(&mut self, request: AzureBlobRequest) -> Self::Future {
37 let this = self.clone();
38
39 Box::pin(async move {
40 let client = this
41 .client
42 .blob_client(request.metadata.partition_key.as_str());
43 let byte_size = request.blob_data.len();
44 let blob = client
45 .put_block_blob(request.blob_data)
46 .content_type(request.content_type);
47 let blob = match request.content_encoding {
48 Some(encoding) => blob.content_encoding(encoding),
49 None => blob,
50 };
51
52 let result = blob
53 .into_future()
54 .instrument(info_span!("request").or_current())
55 .await
56 .map_err(|err| err.into());
57
58 result.map(|inner| AzureBlobResponse {
59 inner,
60 events_byte_size: request
61 .request_metadata
62 .into_events_estimated_json_encoded_byte_size(),
63 byte_size,
64 })
65 })
66 }
67}