vector/sinks/azure_common/
config.rs1use std::sync::Arc;
2
3use azure_core::{error::HttpError, RetryOptions};
4use azure_identity::{AutoRefreshingTokenCredential, DefaultAzureCredential};
5use azure_storage::{prelude::*, CloudLocation, ConnectionString};
6use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*};
7use bytes::Bytes;
8use futures::FutureExt;
9use http::StatusCode;
10use snafu::Snafu;
11use vector_lib::stream::DriverResponse;
12use vector_lib::{
13 json_size::JsonSize,
14 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
15};
16
17use crate::{
18 event::{EventFinalizers, EventStatus, Finalizable},
19 sinks::{util::retries::RetryLogic, Healthcheck},
20};
21
22#[derive(Debug, Clone)]
23pub struct AzureBlobRequest {
24 pub blob_data: Bytes,
25 pub content_encoding: Option<&'static str>,
26 pub content_type: &'static str,
27 pub metadata: AzureBlobMetadata,
28 pub request_metadata: RequestMetadata,
29}
30
31impl Finalizable for AzureBlobRequest {
32 fn take_finalizers(&mut self) -> EventFinalizers {
33 std::mem::take(&mut self.metadata.finalizers)
34 }
35}
36
37impl MetaDescriptive for AzureBlobRequest {
38 fn get_metadata(&self) -> &RequestMetadata {
39 &self.request_metadata
40 }
41
42 fn metadata_mut(&mut self) -> &mut RequestMetadata {
43 &mut self.request_metadata
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct AzureBlobMetadata {
49 pub partition_key: String,
50 pub count: usize,
51 pub byte_size: JsonSize,
52 pub finalizers: EventFinalizers,
53}
54
55#[derive(Debug, Clone)]
56pub struct AzureBlobRetryLogic;
57
58impl RetryLogic for AzureBlobRetryLogic {
59 type Error = HttpError;
60 type Request = AzureBlobRequest;
61 type Response = AzureBlobResponse;
62
63 fn is_retriable_error(&self, error: &Self::Error) -> bool {
64 error.status().is_server_error()
65 || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::<u16>::into(error.status())
66 }
67}
68
69#[derive(Debug)]
70pub struct AzureBlobResponse {
71 pub inner: PutBlockBlobResponse,
72 pub events_byte_size: GroupedCountByteSize,
73 pub byte_size: usize,
74}
75
76impl DriverResponse for AzureBlobResponse {
77 fn event_status(&self) -> EventStatus {
78 EventStatus::Delivered
79 }
80
81 fn events_sent(&self) -> &GroupedCountByteSize {
82 &self.events_byte_size
83 }
84
85 fn bytes_sent(&self) -> Option<usize> {
86 Some(self.byte_size)
87 }
88}
89
90#[derive(Debug, Snafu)]
91pub enum HealthcheckError {
92 #[snafu(display("Invalid connection string specified"))]
93 InvalidCredentials,
94 #[snafu(display("Container: {:?} not found", container))]
95 UnknownContainer { container: String },
96 #[snafu(display("Unknown status code: {}", status))]
97 Unknown { status: StatusCode },
98}
99
100pub fn build_healthcheck(
101 container_name: String,
102 client: Arc<ContainerClient>,
103) -> crate::Result<Healthcheck> {
104 let healthcheck = async move {
105 let response = client.get_properties().into_future().await;
106
107 let resp: crate::Result<()> = match response {
108 Ok(_) => Ok(()),
109 Err(reason) => Err(match reason.downcast_ref::<HttpError>() {
110 Some(err) => match StatusCode::from_u16(err.status().into()) {
111 Ok(StatusCode::FORBIDDEN) => Box::new(HealthcheckError::InvalidCredentials),
112 Ok(StatusCode::NOT_FOUND) => Box::new(HealthcheckError::UnknownContainer {
113 container: container_name,
114 }),
115 Ok(status) => Box::new(HealthcheckError::Unknown { status }),
116 Err(_) => "unknown status code".into(),
117 },
118 _ => reason.into(),
119 }),
120 };
121 resp
122 };
123
124 Ok(healthcheck.boxed())
125}
126
127pub fn build_client(
128 connection_string: Option<String>,
129 storage_account: Option<String>,
130 container_name: String,
131 endpoint: Option<String>,
132) -> crate::Result<Arc<ContainerClient>> {
133 let client;
134 match (connection_string, storage_account) {
135 (Some(connection_string_p), None) => {
136 let connection_string = ConnectionString::new(&connection_string_p)?;
137
138 client = match connection_string.blob_endpoint {
139 Some(uri) => ClientBuilder::with_location(
145 CloudLocation::Custom {
146 uri: uri.to_string(),
147 },
148 connection_string.storage_credentials()?,
149 ),
150 None => ClientBuilder::new(
154 connection_string
155 .account_name
156 .ok_or("Account name missing in connection string")?,
157 connection_string.storage_credentials()?,
158 ),
159 }
160 .retry(RetryOptions::none())
161 .container_client(container_name);
162 }
163 (None, Some(storage_account_p)) => {
164 let creds = std::sync::Arc::new(DefaultAzureCredential::default());
165 let auto_creds = std::sync::Arc::new(AutoRefreshingTokenCredential::new(creds));
166 let storage_credentials = StorageCredentials::token_credential(auto_creds);
167
168 client = match endpoint {
169 Some(endpoint) => ClientBuilder::with_location(
172 CloudLocation::Custom { uri: endpoint },
173 storage_credentials,
174 ),
175 None => ClientBuilder::new(storage_account_p, storage_credentials),
179 }
180 .retry(RetryOptions::none())
181 .container_client(container_name);
182 }
183 (None, None) => {
184 return Err("Either `connection_string` or `storage_account` has to be provided".into())
185 }
186 (Some(_), Some(_)) => {
187 return Err(
188 "`connection_string` and `storage_account` can't be provided at the same time"
189 .into(),
190 )
191 }
192 }
193 Ok(std::sync::Arc::new(client))
194}