vector/sinks/azure_common/
config.rs

1use std::sync::Arc;
2
3use azure_core::error::HttpError;
4use azure_core_for_storage::RetryOptions;
5use azure_storage::{CloudLocation, ConnectionString};
6use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*};
7use bytes::Bytes;
8use futures::FutureExt;
9use http::StatusCode;
10use snafu::Snafu;
11use vector_lib::{
12    json_size::JsonSize,
13    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
14    stream::DriverResponse,
15};
16
17use crate::{
18    event::{EventFinalizers, EventStatus, Finalizable},
19    sinks::{Healthcheck, util::retries::RetryLogic},
20};
21
22#[derive(Debug, Clone)]
23pub struct AzureBlobRequest {
24    pub blob_data: Bytes,
25    pub content_encoding: Option<&'static str>,
26    pub content_type: &'static str,
27    pub metadata: AzureBlobMetadata,
28    pub request_metadata: RequestMetadata,
29}
30
31impl Finalizable for AzureBlobRequest {
32    fn take_finalizers(&mut self) -> EventFinalizers {
33        std::mem::take(&mut self.metadata.finalizers)
34    }
35}
36
37impl MetaDescriptive for AzureBlobRequest {
38    fn get_metadata(&self) -> &RequestMetadata {
39        &self.request_metadata
40    }
41
42    fn metadata_mut(&mut self) -> &mut RequestMetadata {
43        &mut self.request_metadata
44    }
45}
46
47#[derive(Clone, Debug)]
48pub struct AzureBlobMetadata {
49    pub partition_key: String,
50    pub count: usize,
51    pub byte_size: JsonSize,
52    pub finalizers: EventFinalizers,
53}
54
/// Stateless marker type that carries the retry policy for Azure Blob requests
/// through its `RetryLogic` implementation.
#[derive(Clone, Debug)]
pub struct AzureBlobRetryLogic;
57
58impl RetryLogic for AzureBlobRetryLogic {
59    type Error = HttpError;
60    type Request = AzureBlobRequest;
61    type Response = AzureBlobResponse;
62
63    fn is_retriable_error(&self, error: &Self::Error) -> bool {
64        error.status().is_server_error()
65            || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::<u16>::into(error.status())
66    }
67}
68
69#[derive(Debug)]
70pub struct AzureBlobResponse {
71    pub inner: PutBlockBlobResponse,
72    pub events_byte_size: GroupedCountByteSize,
73    pub byte_size: usize,
74}
75
76impl DriverResponse for AzureBlobResponse {
77    fn event_status(&self) -> EventStatus {
78        EventStatus::Delivered
79    }
80
81    fn events_sent(&self) -> &GroupedCountByteSize {
82        &self.events_byte_size
83    }
84
85    fn bytes_sent(&self) -> Option<usize> {
86        Some(self.byte_size)
87    }
88}
89
90#[derive(Debug, Snafu)]
91pub enum HealthcheckError {
92    #[snafu(display("Invalid connection string specified"))]
93    InvalidCredentials,
94    #[snafu(display("Container: {:?} not found", container))]
95    UnknownContainer { container: String },
96    #[snafu(display("Unknown status code: {}", status))]
97    Unknown { status: StatusCode },
98}
99
100pub fn build_healthcheck(
101    container_name: String,
102    client: Arc<ContainerClient>,
103) -> crate::Result<Healthcheck> {
104    let healthcheck = async move {
105        let response = client.get_properties().into_future().await;
106
107        let resp: crate::Result<()> = match response {
108            Ok(_) => Ok(()),
109            Err(error) => Err(match error.as_http_error() {
110                Some(err) => match StatusCode::from_u16(err.status().into()) {
111                    Ok(StatusCode::FORBIDDEN) => Box::new(HealthcheckError::InvalidCredentials),
112                    Ok(StatusCode::NOT_FOUND) => Box::new(HealthcheckError::UnknownContainer {
113                        container: container_name,
114                    }),
115                    Ok(status) => Box::new(HealthcheckError::Unknown { status }),
116                    Err(_) => "unknown status code".into(),
117                },
118                _ => error.into(),
119            }),
120        };
121        resp
122    };
123
124    Ok(healthcheck.boxed())
125}
126
127pub fn build_client(
128    connection_string: String,
129    container_name: String,
130) -> crate::Result<Arc<ContainerClient>> {
131    let client = {
132        let connection_string = ConnectionString::new(&connection_string)?;
133        let account_name = connection_string
134            .account_name
135            .ok_or("Account name missing in connection string")?;
136
137        match connection_string.blob_endpoint {
138            // When the blob_endpoint is provided, we use the Custom CloudLocation since it is
139            // required to contain the full URI to the blob storage API endpoint, this means
140            // that account_name is not required to exist in the connection_string since
141            // account_name is only used with the default CloudLocation in the Azure SDK to
142            // generate the storage API endpoint
143            Some(uri) => ClientBuilder::with_location(
144                CloudLocation::Custom {
145                    uri: uri.to_string(),
146                    account: account_name.to_string(),
147                },
148                connection_string.storage_credentials()?,
149            ),
150            // Without a valid blob_endpoint in the connection_string, assume we are in Azure
151            // Commercial (AzureCloud location) and create a default Blob Storage Client that
152            // builds the API endpoint location using the account_name as input
153            None => ClientBuilder::new(account_name, connection_string.storage_credentials()?),
154        }
155        .retry(RetryOptions::none())
156        .container_client(container_name)
157    };
158    Ok(Arc::new(client))
159}