vector/sinks/azure_common/
config.rs1use std::sync::Arc;
2
3use azure_core::error::Error as AzureCoreError;
4
5use crate::sinks::azure_common::connection_string::{Auth, ParsedConnectionString};
6use crate::sinks::azure_common::shared_key_policy::SharedKeyAuthorizationPolicy;
7use azure_core::http::Url;
8use azure_storage_blob::{BlobContainerClient, BlobContainerClientOptions};
9
10use azure_core::http::StatusCode;
11use bytes::Bytes;
12use futures::FutureExt;
13use snafu::Snafu;
14use vector_lib::{
15 json_size::JsonSize,
16 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
17 stream::DriverResponse,
18};
19
20use crate::{
21 event::{EventFinalizers, EventStatus, Finalizable},
22 sinks::{Healthcheck, util::retries::RetryLogic},
23};
24
25#[derive(Debug, Clone)]
26pub struct AzureBlobRequest {
27 pub blob_data: Bytes,
28 pub content_encoding: Option<&'static str>,
29 pub content_type: &'static str,
30 pub metadata: AzureBlobMetadata,
31 pub request_metadata: RequestMetadata,
32}
33
34impl Finalizable for AzureBlobRequest {
35 fn take_finalizers(&mut self) -> EventFinalizers {
36 std::mem::take(&mut self.metadata.finalizers)
37 }
38}
39
40impl MetaDescriptive for AzureBlobRequest {
41 fn get_metadata(&self) -> &RequestMetadata {
42 &self.request_metadata
43 }
44
45 fn metadata_mut(&mut self) -> &mut RequestMetadata {
46 &mut self.request_metadata
47 }
48}
49
50#[derive(Clone, Debug)]
51pub struct AzureBlobMetadata {
52 pub partition_key: String,
53 pub count: usize,
54 pub byte_size: JsonSize,
55 pub finalizers: EventFinalizers,
56}
57
/// Stateless retry policy for Azure Blob requests; the decision logic lives
/// in its `RetryLogic` implementation.
#[derive(Clone, Debug)]
pub struct AzureBlobRetryLogic;
60
61impl RetryLogic for AzureBlobRetryLogic {
62 type Error = AzureCoreError;
63 type Request = AzureBlobRequest;
64 type Response = AzureBlobResponse;
65
66 fn is_retriable_error(&self, error: &Self::Error) -> bool {
67 match error.http_status() {
68 Some(code) => code.is_server_error() || code == StatusCode::TooManyRequests,
69 None => false,
70 }
71 }
72}
73
74#[derive(Debug)]
75pub struct AzureBlobResponse {
76 pub events_byte_size: GroupedCountByteSize,
77 pub byte_size: usize,
78}
79
80impl DriverResponse for AzureBlobResponse {
81 fn event_status(&self) -> EventStatus {
82 EventStatus::Delivered
83 }
84
85 fn events_sent(&self) -> &GroupedCountByteSize {
86 &self.events_byte_size
87 }
88
89 fn bytes_sent(&self) -> Option<usize> {
90 Some(self.byte_size)
91 }
92}
93
94#[derive(Debug, Snafu)]
95pub enum HealthcheckError {
96 #[snafu(display("Invalid connection string specified"))]
97 InvalidCredentials,
98 #[snafu(display("Container: {:?} not found", container))]
99 UnknownContainer { container: String },
100 #[snafu(display("Unknown status code: {}", status))]
101 Unknown { status: StatusCode },
102}
103
104pub fn build_healthcheck(
105 container_name: String,
106 client: Arc<BlobContainerClient>,
107) -> crate::Result<Healthcheck> {
108 let healthcheck = async move {
109 let resp: crate::Result<()> = match client.get_properties(None).await {
110 Ok(_) => Ok(()),
111 Err(error) => {
112 let code = error.http_status();
113 Err(match code {
114 Some(StatusCode::Forbidden) => Box::new(HealthcheckError::InvalidCredentials),
115 Some(StatusCode::NotFound) => Box::new(HealthcheckError::UnknownContainer {
116 container: container_name,
117 }),
118 Some(status) => Box::new(HealthcheckError::Unknown { status }),
119 None => "unknown status code".into(),
120 })
121 }
122 };
123 resp
124 };
125
126 Ok(healthcheck.boxed())
127}
128
129pub fn build_client(
130 connection_string: String,
131 container_name: String,
132 proxy: &crate::config::ProxyConfig,
133) -> crate::Result<Arc<BlobContainerClient>> {
134 let parsed = ParsedConnectionString::parse(&connection_string)
136 .map_err(|e| format!("Invalid connection string: {e}"))?;
137 let container_url = parsed
139 .container_url(&container_name)
140 .map_err(|e| format!("Failed to build container URL: {e}"))?;
141 let url = Url::parse(&container_url).map_err(|e| format!("Invalid container URL: {e}"))?;
142
143 let mut options = BlobContainerClientOptions::default();
145 match parsed.auth() {
146 Auth::Sas { .. } | Auth::None => {
147 }
149 Auth::SharedKey {
150 account_name,
151 account_key,
152 } => {
153 let policy = SharedKeyAuthorizationPolicy::new(
154 account_name,
155 account_key,
156 String::from("2025-11-05"),
158 )
159 .map_err(|e| format!("Failed to create SharedKey policy: {e}"))?;
160 options
161 .client_options
162 .per_call_policies
163 .push(Arc::new(policy));
164 }
165 }
166
167 let mut reqwest_builder = reqwest_12::ClientBuilder::new();
169 let bypass_proxy = {
170 let host = url.host_str().unwrap_or("");
171 let port = url.port();
172 proxy.no_proxy.matches(host)
173 || port
174 .map(|p| proxy.no_proxy.matches(&format!("{}:{}", host, p)))
175 .unwrap_or(false)
176 };
177 if bypass_proxy || !proxy.enabled {
178 reqwest_builder = reqwest_builder.no_proxy();
180 } else {
181 if let Some(http) = &proxy.http {
182 let p = reqwest_12::Proxy::http(http)
183 .map_err(|e| format!("Invalid HTTP proxy URL: {e}"))?;
184 reqwest_builder = reqwest_builder.proxy(p);
186 }
187 if let Some(https) = &proxy.https {
188 let p = reqwest_12::Proxy::https(https)
189 .map_err(|e| format!("Invalid HTTPS proxy URL: {e}"))?;
190 reqwest_builder = reqwest_builder.proxy(p);
192 }
193 }
194 options.client_options.transport = Some(azure_core::http::Transport::new(std::sync::Arc::new(
195 reqwest_builder
196 .build()
197 .map_err(|e| format!("Failed to build reqwest client: {e}"))?,
198 )));
199 let client =
200 BlobContainerClient::from_url(url, None, Some(options)).map_err(|e| format!("{e}"))?;
201 Ok(Arc::new(client))
202}