vector/sinks/azure_common/
service.rs1use std::{
2 result::Result as StdResult,
3 sync::Arc,
4 task::{Context, Poll},
5};
6
7use azure_core::http::RequestContent;
8use azure_storage_blob::{BlobContainerClient, models::BlockBlobClientUploadOptions};
9use futures::future::BoxFuture;
10use tower::Service;
11use tracing::Instrument;
12
13use crate::sinks::azure_common::config::{AzureBlobRequest, AzureBlobResponse};
14
15#[derive(Clone)]
16pub struct AzureBlobService {
17 client: Arc<BlobContainerClient>,
19}
20
21impl AzureBlobService {
22 pub const fn new(client: Arc<BlobContainerClient>) -> AzureBlobService {
23 AzureBlobService { client }
24 }
25}
26
27impl Service<AzureBlobRequest> for AzureBlobService {
28 type Response = AzureBlobResponse;
29 type Error = Box<dyn std::error::Error + std::marker::Send + std::marker::Sync>;
30 type Future = BoxFuture<'static, StdResult<Self::Response, Self::Error>>;
31
32 fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<StdResult<(), Self::Error>> {
34 Poll::Ready(Ok(()))
35 }
36
37 fn call(&mut self, request: AzureBlobRequest) -> Self::Future {
39 let this = self.clone();
40
41 Box::pin(async move {
42 let blob_client = this
43 .client
44 .blob_client(request.metadata.partition_key.as_str());
45 let byte_size = request.blob_data.len();
46 let upload_options = BlockBlobClientUploadOptions {
47 blob_content_type: Some(request.content_type.to_string()),
48 blob_content_encoding: request.content_encoding.map(|e| e.to_string()),
49 ..Default::default()
50 };
51
52 let result = blob_client
53 .upload(
54 RequestContent::from(request.blob_data.to_vec()),
55 false,
56 byte_size as u64,
57 Some(upload_options),
58 )
59 .instrument(info_span!("request").or_current())
60 .await
61 .map_err(|err| err.into());
62
63 result.map(|_resp| AzureBlobResponse {
64 events_byte_size: request
65 .request_metadata
66 .into_events_estimated_json_encoded_byte_size(),
67 byte_size,
68 })
69 })
70 }
71}