vector/sinks/azure_common/
service.rs

1use std::{
2    result::Result as StdResult,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use azure_storage_blobs::prelude::*;
8use futures::future::BoxFuture;
9use tower::Service;
10use tracing::Instrument;
11
12use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse};
13
14#[derive(Clone)]
15pub struct AzureBlobService {
16    client: Arc<ContainerClient>,
17}
18
19impl AzureBlobService {
20    pub const fn new(client: Arc<ContainerClient>) -> AzureBlobService {
21        AzureBlobService { client }
22    }
23}
24
25impl Service<AzureBlobRequest> for AzureBlobService {
26    type Response = AzureBlobResponse;
27    type Error = Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>;
28    type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
29
30    // Emission of an internal event in case of errors is handled upstream by the caller.
31    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
32        Poll::Ready(Ok(()))
33    }
34
35    // Emission of internal events for errors and dropped events is handled upstream by the caller.
36    fn call(&mut self, request: AzureBlobRequest) -> Self::Future {
37        let this = self.clone();
38
39        Box::pin(async move {
40            let client = this
41                .client
42                .blob_client(request.metadata.partition_key.as_str());
43            let byte_size = request.blob_data.len();
44            let blob = client
45                .put_block_blob(request.blob_data)
46                .content_type(request.content_type);
47            let blob = match request.content_encoding {
48                Some(encoding) => blob.content_encoding(encoding),
49                None => blob,
50            };
51
52            let result = blob
53                .into_future()
54                .instrument(info_span!("request").or_current())
55                .await
56                .map_err(|err| err.into());
57
58            result.map(|inner| AzureBlobResponse {
59                inner,
60                events_byte_size: request
61                    .request_metadata
62                    .into_events_estimated_json_encoded_byte_size(),
63                byte_size,
64            })
65        })
66    }
67}