vector/sinks/azure_common/
config.rs

1use std::sync::Arc;
2
3use azure_core::{error::HttpError, RetryOptions};
4use azure_identity::{AutoRefreshingTokenCredential, DefaultAzureCredential};
5use azure_storage::{prelude::*, CloudLocation, ConnectionString};
6use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*};
7use bytes::Bytes;
8use futures::FutureExt;
9use http::StatusCode;
10use snafu::Snafu;
11use vector_lib::stream::DriverResponse;
12use vector_lib::{
13    json_size::JsonSize,
14    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
15};
16
17use crate::{
18    event::{EventFinalizers, EventStatus, Finalizable},
19    sinks::{util::retries::RetryLogic, Healthcheck},
20};
21
/// A request to write one blob to Azure Blob Storage: the payload bytes, its content
/// headers, and the metadata used for finalization and request accounting.
#[derive(Debug, Clone)]
pub struct AzureBlobRequest {
    /// Bytes to upload as the blob body.
    pub blob_data: Bytes,
    /// Optional content encoding for the payload (presumably sent as the blob's
    /// `Content-Encoding`; confirm against the request builder / service).
    pub content_encoding: Option<&'static str>,
    /// Content type of the payload (presumably sent as the blob's `Content-Type`).
    pub content_type: &'static str,
    /// Per-request metadata; its `finalizers` are moved out by `take_finalizers`.
    pub metadata: AzureBlobMetadata,
    /// Request-level metadata exposed through the `MetaDescriptive` impl.
    pub request_metadata: RequestMetadata,
}
30
31impl Finalizable for AzureBlobRequest {
32    fn take_finalizers(&mut self) -> EventFinalizers {
33        std::mem::take(&mut self.metadata.finalizers)
34    }
35}
36
37impl MetaDescriptive for AzureBlobRequest {
38    fn get_metadata(&self) -> &RequestMetadata {
39        &self.request_metadata
40    }
41
42    fn metadata_mut(&mut self) -> &mut RequestMetadata {
43        &mut self.request_metadata
44    }
45}
46
/// Metadata carried by an [`AzureBlobRequest`] (its `metadata` field).
#[derive(Clone, Debug)]
pub struct AzureBlobMetadata {
    /// Partition key for this request (presumably used to derive the blob name — confirm
    /// in the request builder).
    pub partition_key: String,
    /// Number of events in the request (assumed from the name — TODO confirm).
    pub count: usize,
    /// Size of the events as a `JsonSize` (presumably their JSON-encoded size — confirm).
    pub byte_size: JsonSize,
    /// Event finalizers; moved out by `AzureBlobRequest::take_finalizers`.
    pub finalizers: EventFinalizers,
}
54
/// [`RetryLogic`] for Azure Blob uploads: classifies which [`HttpError`]s are retried.
#[derive(Debug, Clone)]
pub struct AzureBlobRetryLogic;
57
58impl RetryLogic for AzureBlobRetryLogic {
59    type Error = HttpError;
60    type Request = AzureBlobRequest;
61    type Response = AzureBlobResponse;
62
63    fn is_retriable_error(&self, error: &Self::Error) -> bool {
64        error.status().is_server_error()
65            || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::<u16>::into(error.status())
66    }
67}
68
/// Response for a completed blob upload, together with the sizes reported to the driver.
#[derive(Debug)]
pub struct AzureBlobResponse {
    /// Raw SDK response from the Put Block Blob call.
    pub inner: PutBlockBlobResponse,
    /// Grouped event count/byte-size totals; returned by `DriverResponse::events_sent`.
    pub events_byte_size: GroupedCountByteSize,
    /// Number of bytes sent; returned (wrapped in `Some`) by `DriverResponse::bytes_sent`.
    pub byte_size: usize,
}
75
76impl DriverResponse for AzureBlobResponse {
77    fn event_status(&self) -> EventStatus {
78        EventStatus::Delivered
79    }
80
81    fn events_sent(&self) -> &GroupedCountByteSize {
82        &self.events_byte_size
83    }
84
85    fn bytes_sent(&self) -> Option<usize> {
86        Some(self.byte_size)
87    }
88}
89
/// Failures produced by the healthcheck future built in `build_healthcheck`.
#[derive(Debug, Snafu)]
pub enum HealthcheckError {
    /// The container properties request was rejected with HTTP 403 Forbidden.
    // NOTE(review): this message mentions a connection string even when the client was
    // built from `storage_account` + token credentials; consider credential-neutral wording.
    #[snafu(display("Invalid connection string specified"))]
    InvalidCredentials,
    /// The container properties request returned HTTP 404 Not Found; `container` is the
    /// configured container name.
    #[snafu(display("Container: {:?} not found", container))]
    UnknownContainer { container: String },
    /// Any other HTTP status returned by the container properties request.
    #[snafu(display("Unknown status code: {}", status))]
    Unknown { status: StatusCode },
}
99
100pub fn build_healthcheck(
101    container_name: String,
102    client: Arc<ContainerClient>,
103) -> crate::Result<Healthcheck> {
104    let healthcheck = async move {
105        let response = client.get_properties().into_future().await;
106
107        let resp: crate::Result<()> = match response {
108            Ok(_) => Ok(()),
109            Err(reason) => Err(match reason.downcast_ref::<HttpError>() {
110                Some(err) => match StatusCode::from_u16(err.status().into()) {
111                    Ok(StatusCode::FORBIDDEN) => Box::new(HealthcheckError::InvalidCredentials),
112                    Ok(StatusCode::NOT_FOUND) => Box::new(HealthcheckError::UnknownContainer {
113                        container: container_name,
114                    }),
115                    Ok(status) => Box::new(HealthcheckError::Unknown { status }),
116                    Err(_) => "unknown status code".into(),
117                },
118                _ => reason.into(),
119            }),
120        };
121        resp
122    };
123
124    Ok(healthcheck.boxed())
125}
126
127pub fn build_client(
128    connection_string: Option<String>,
129    storage_account: Option<String>,
130    container_name: String,
131    endpoint: Option<String>,
132) -> crate::Result<Arc<ContainerClient>> {
133    let client;
134    match (connection_string, storage_account) {
135        (Some(connection_string_p), None) => {
136            let connection_string = ConnectionString::new(&connection_string_p)?;
137
138            client = match connection_string.blob_endpoint {
139                // When the blob_endpoint is provided, we use the Custom CloudLocation since it is
140                // required to contain the full URI to the blob storage API endpoint, this means
141                // that account_name is not required to exist in the connection_string since
142                // account_name is only used with the default CloudLocation in the Azure SDK to
143                // generate the storage API endpoint
144                Some(uri) => ClientBuilder::with_location(
145                    CloudLocation::Custom {
146                        uri: uri.to_string(),
147                    },
148                    connection_string.storage_credentials()?,
149                ),
150                // Without a valid blob_endpoint in the connection_string, assume we are in Azure
151                // Commercial (AzureCloud location) and create a default Blob Storage Client that
152                // builds the API endpoint location using the account_name as input
153                None => ClientBuilder::new(
154                    connection_string
155                        .account_name
156                        .ok_or("Account name missing in connection string")?,
157                    connection_string.storage_credentials()?,
158                ),
159            }
160            .retry(RetryOptions::none())
161            .container_client(container_name);
162        }
163        (None, Some(storage_account_p)) => {
164            let creds = std::sync::Arc::new(DefaultAzureCredential::default());
165            let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds));
166            let storage_credentials = StorageCredentials::token_credential(auto_creds);
167
168            client = match endpoint {
169                // If a blob_endpoint is provided in the configuration, use it with a Custom
170                // CloudLocation, to allow overriding the blob storage API endpoint
171                Some(endpoint) => ClientBuilder::with_location(
172                    CloudLocation::Custom { uri: endpoint },
173                    storage_credentials,
174                ),
175                // Use the storage_account configuration parameter and assume we are in Azure
176                // Commercial (AzureCloud location) and build the blob storage API endpoint using
177                // the storage_account as input.
178                None => ClientBuilder::new(storage_account_p, storage_credentials),
179            }
180            .retry(RetryOptions::none())
181            .container_client(container_name);
182        }
183        (None, None) => {
184            return Err("Either `connection_string` or `storage_account` has to be provided".into())
185        }
186        (Some(_), Some(_)) => {
187            return Err(
188                "`connection_string` and `storage_account` can't be provided at the same time"
189                    .into(),
190            )
191        }
192    }
193    Ok(std::sync::Arc::new(client))
194}