vector/sinks/azure_common/
service.rs

1use std::{
2    result::Result as StdResult,
3    sync::Arc,
4    task::{Context, Poll},
5};
6
7use azure_core::http::RequestContent;
8use azure_storage_blob::{BlobContainerClient, models::BlockBlobClientUploadOptions};
9use futures::future::BoxFuture;
10use tower::Service;
11use tracing::Instrument;
12
13use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse};
14
15#[derive(Clone)]
16pub struct AzureBlobService {
17    // Using the new azure_storage_blob container client.
18    client: Arc<BlobContainerClient>,
19}
20
21impl AzureBlobService {
22    pub const fn new(client: Arc<BlobContainerClient>) -> AzureBlobService {
23        AzureBlobService { client }
24    }
25}
26
27impl Service<AzureBlobRequest> for AzureBlobService {
28    type Response = AzureBlobResponse;
29    type Error = Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>;
30    type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
31
32    // Emission of an internal event in case of errors is handled upstream by the caller.
33    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
34        Poll::Ready(Ok(()))
35    }
36
37    // Emission of internal events for errors and dropped events is handled upstream by the caller.
38    fn call(&mut self, request: AzureBlobRequest) -> Self::Future {
39        let this = self.clone();
40
41        Box::pin(async move {
42            let blob_client = this
43                .client
44                .blob_client(request.metadata.partition_key.as_str());
45            let byte_size = request.blob_data.len();
46            let upload_options = BlockBlobClientUploadOptions {
47                blob_content_type: Some(request.content_type.to_string()),
48                blob_content_encoding: request.content_encoding.map(|e| e.to_string()),
49                ..Default::default()
50            };
51
52            let result = blob_client
53                .upload(
54                    RequestContent::from(request.blob_data.to_vec()),
55                    false,
56                    byte_size as u64,
57                    Some(upload_options),
58                )
59                .instrument(info_span!("request").or_current())
60                .await
61                .map_err(|err| err.into());
62
63            result.map(|_resp| AzureBlobResponse {
64                events_byte_size: request
65                    .request_metadata
66                    .into_events_estimated_json_encoded_byte_size(),
67                byte_size,
68            })
69        })
70    }
71}