vector/sinks/azure_common/
config.rs

1use std::sync::Arc;
2
3use azure_core::error::Error as AzureCoreError;
4
5use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString};
6use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy;
7use azure_core::http::Url;
8use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
9
10use azure_core::http::StatusCode;
11use bytes::Bytes;
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::{
15    json_size::JsonSize,
16    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
17    stream::DriverResponse,
18};
19
20use crate::{
21    event::{EventFinalizers, EventStatus, Finalizable},
22    sinks::{Healthcheck, util::retries::RetryLogic},
23};
24
25#[derive(Debug, Clone)]
26pub struct AzureBlobRequest {
27    pub blob_data: Bytes,
28    pub content_encoding: Option<&'static str>,
29    pub content_type: &'static str,
30    pub metadata: AzureBlobMetadata,
31    pub request_metadata: RequestMetadata,
32}
33
34impl Finalizable for AzureBlobRequest {
35    fn take_finalizers(&mut self) -> EventFinalizers {
36        std::mem::take(&mut self.metadata.finalizers)
37    }
38}
39
40impl MetaDescriptive for AzureBlobRequest {
41    fn get_metadata(&self) -> &RequestMetadata {
42        &self.request_metadata
43    }
44
45    fn metadata_mut(&mut self) -> &mut RequestMetadata {
46        &mut self.request_metadata
47    }
48}
49
50#[derive(Clone, Debug)]
51pub struct AzureBlobMetadata {
52    pub partition_key: String,
53    pub count: usize,
54    pub byte_size: JsonSize,
55    pub finalizers: EventFinalizers,
56}
57
/// Stateless marker type that carries the retry classification for Azure Blob
/// requests (see its `RetryLogic` implementation).
#[derive(Clone, Debug)]
pub struct AzureBlobRetryLogic;
60
61impl RetryLogic for AzureBlobRetryLogic {
62    type Error = AzureCoreError;
63    type Request = AzureBlobRequest;
64    type Response = AzureBlobResponse;
65
66    fn is_retriable_error(&self, error: &Self::Error) -> bool {
67        match error.http_status() {
68            Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
69            None => false,
70        }
71    }
72}
73
74#[derive(Debug)]
75pub struct AzureBlobResponse {
76    pub events_byte_size: GroupedCountByteSize,
77    pub byte_size: usize,
78}
79
80impl DriverResponse for AzureBlobResponse {
81    fn event_status(&self) -> EventStatus {
82        EventStatus::Delivered
83    }
84
85    fn events_sent(&self) -> &GroupedCountByteSize {
86        &self.events_byte_size
87    }
88
89    fn bytes_sent(&self) -> Option<usize> {
90        Some(self.byte_size)
91    }
92}
93
94#[derive(Debug, Snafu)]
95pub enum HealthcheckError {
96    #[snafu(display("Invalid connection string specified"))]
97    InvalidCredentials,
98    #[snafu(display("Container: {:?} not found", container))]
99    UnknownContainer { container: String },
100    #[snafu(display("Unknown status code: {}", status))]
101    Unknown { status: StatusCode },
102}
103
104pub fn build_healthcheck(
105    container_name: String,
106    client: Arc<BlobContainerClient>,
107) -> crate::Result<Healthcheck> {
108    let healthcheck = async move {
109        let resp: crate::Result<()> = match client.get_properties(None).await {
110            Ok(_) => Ok(()),
111            Err(error) => {
112                let code = error.http_status();
113                Err(match code {
114                    Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
115                    Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
116                        container: container_name,
117                    }),
118                    Some(status) => Box::new(HealthcheckError::Unknown { status }),
119                    None => "unknown status code".into(),
120                })
121            }
122        };
123        resp
124    };
125
126    Ok(healthcheck.boxed())
127}
128
129pub fn build_client(
130    connection_string: String,
131    container_name: String,
132    proxy: &crate::config::ProxyConfig,
133) -> crate::Result<Arc<BlobContainerClient>> {
134    // Parse connection string without legacy SDK
135    let parsed = ParsedConnectionString::parse(&connection_string)
136        .map_err(|e| format!("Invalid connection string: {e}"))?;
137    // Compose container URL (SAS appended if present)
138    let container_url = parsed
139        .container_url(&container_name)
140        .map_err(|e| format!("Failed to build container URL: {e}"))?;
141    let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
142
143    // Prepare options; attach Shared Key policy if needed
144    let mut options = BlobContainerClientOptions::default();
145    match parsed.auth() {
146        Auth::Sas { .. } | Auth::None => {
147            // No extra policy; SAS is in the URL already (or anonymous)
148        }
149        Auth::SharedKey {
150            account_name,
151            account_key,
152        } => {
153            let policy = SharedKeyAuthorizationPolicy::new(
154                account_name,
155                account_key,
156                // Use an Azurite-supported storage service version
157                String::from("2025-11-05"),
158            )
159            .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
160            options
161                .client_options
162                .per_call_policies
163                .push(Arc::new(policy));
164        }
165    }
166
167    // Use reqwest v0.12 since Azure SDK only implements HttpClient for reqwest::Client v0.12
168    let mut reqwest_builder = reqwest_12::ClientBuilder::new();
169    let bypass_proxy = {
170        let host = url.host_str().unwrap_or("");
171        let port = url.port();
172        proxy.no_proxy.matches(host)
173            || port
174                .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
175                .unwrap_or(false)
176    };
177    if bypass_proxy || !proxy.enabled {
178        // Ensure no proxy (and disable any potential system proxy auto-detection)
179        reqwest_builder = reqwest_builder.no_proxy();
180    } else {
181        if let Some(http) = &proxy.http {
182            let p = reqwest_12::Proxy::http(http)
183                .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
184            // If credentials are embedded in the proxy URL, reqwest will handle them.
185            reqwest_builder = reqwest_builder.proxy(p);
186        }
187        if let Some(https) = &proxy.https {
188            let p = reqwest_12::Proxy::https(https)
189                .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
190            // If credentials are embedded in the proxy URL, reqwest will handle them.
191            reqwest_builder = reqwest_builder.proxy(p);
192        }
193    }
194    options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
195        reqwest_builder
196            .build()
197            .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
198    )));
199    let client =
200        BlobContainerClient::from_url(url, None, Some(options)).map_err(|e| format!("{e}"))?;
201    Ok(Arc::new(client))
202}