vector/sinks/azure_common/
config.rs1use std::sync::Arc;
2
3use azure_core::error::HttpError;
4use azure_core_for_storage::RetryOptions;
5use azure_storage::{CloudLocation, ConnectionString};
6use azure_storage_blobs::{blob::operations::PutBlockBlobResponse, prelude::*};
7use bytes::Bytes;
8use futures::FutureExt;
9use http::StatusCode;
10use snafu::Snafu;
11use vector_lib::{
12 json_size::JsonSize,
13 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
14 stream::DriverResponse,
15};
16
17use crate::{
18 event::{EventFinalizers, EventStatus, Finalizable},
19 sinks::{Healthcheck, util::retries::RetryLogic},
20};
21
22#[derive(Debug, Clone)]
23pub struct AzureBlobRequest {
24 pub blob_data: Bytes,
25 pub content_encoding: Option<&'static str>,
26 pub content_type: &'static str,
27 pub metadata: AzureBlobMetadata,
28 pub request_metadata: RequestMetadata,
29}
30
31impl Finalizable for AzureBlobRequest {
32 fn take_finalizers(&mut self) -> EventFinalizers {
33 std::mem::take(&mut self.metadata.finalizers)
34 }
35}
36
37impl MetaDescriptive for AzureBlobRequest {
38 fn get_metadata(&self) -> &RequestMetadata {
39 &self.request_metadata
40 }
41
42 fn metadata_mut(&mut self) -> &mut RequestMetadata {
43 &mut self.request_metadata
44 }
45}
46
47#[derive(Clone, Debug)]
48pub struct AzureBlobMetadata {
49 pub partition_key: String,
50 pub count: usize,
51 pub byte_size: JsonSize,
52 pub finalizers: EventFinalizers,
53}
54
/// Stateless retry policy for Azure Blob Storage requests.
///
/// The policy itself lives in the `RetryLogic` implementation for this type.
#[derive(Clone, Debug)]
pub struct AzureBlobRetryLogic;
57
58impl RetryLogic for AzureBlobRetryLogic {
59 type Error = HttpError;
60 type Request = AzureBlobRequest;
61 type Response = AzureBlobResponse;
62
63 fn is_retriable_error(&self, error: &Self::Error) -> bool {
64 error.status().is_server_error()
65 || StatusCode::TOO_MANY_REQUESTS.as_u16() == Into::<u16>::into(error.status())
66 }
67}
68
69#[derive(Debug)]
70pub struct AzureBlobResponse {
71 pub inner: PutBlockBlobResponse,
72 pub events_byte_size: GroupedCountByteSize,
73 pub byte_size: usize,
74}
75
76impl DriverResponse for AzureBlobResponse {
77 fn event_status(&self) -> EventStatus {
78 EventStatus::Delivered
79 }
80
81 fn events_sent(&self) -> &GroupedCountByteSize {
82 &self.events_byte_size
83 }
84
85 fn bytes_sent(&self) -> Option<usize> {
86 Some(self.byte_size)
87 }
88}
89
90#[derive(Debug, Snafu)]
91pub enum HealthcheckError {
92 #[snafu(display("Invalid connection string specified"))]
93 InvalidCredentials,
94 #[snafu(display("Container: {:?} not found", container))]
95 UnknownContainer { container: String },
96 #[snafu(display("Unknown status code: {}", status))]
97 Unknown { status: StatusCode },
98}
99
100pub fn build_healthcheck(
101 container_name: String,
102 client: Arc<ContainerClient>,
103) -> crate::Result<Healthcheck> {
104 let healthcheck = async move {
105 let response = client.get_properties().into_future().await;
106
107 let resp: crate::Result<()> = match response {
108 Ok(_) => Ok(()),
109 Err(error) => Err(match error.as_http_error() {
110 Some(err) => match StatusCode::from_u16(err.status().into()) {
111 Ok(StatusCode::FORBIDDEN) => Box::new(HealthcheckError::InvalidCredentials),
112 Ok(StatusCode::NOT_FOUND) => Box::new(HealthcheckError::UnknownContainer {
113 container: container_name,
114 }),
115 Ok(status) => Box::new(HealthcheckError::Unknown { status }),
116 Err(_) => "unknown status code".into(),
117 },
118 _ => error.into(),
119 }),
120 };
121 resp
122 };
123
124 Ok(healthcheck.boxed())
125}
126
127pub fn build_client(
128 connection_string: String,
129 container_name: String,
130) -> crate::Result<Arc<ContainerClient>> {
131 let client = {
132 let connection_string = ConnectionString::new(&connection_string)?;
133 let account_name = connection_string
134 .account_name
135 .ok_or("Account name missing in connection string")?;
136
137 match connection_string.blob_endpoint {
138 Some(uri) => ClientBuilder::with_location(
144 CloudLocation::Custom {
145 uri: uri.to_string(),
146 account: account_name.to_string(),
147 },
148 connection_string.storage_credentials()?,
149 ),
150 None => ClientBuilder::new(account_name, connection_string.storage_credentials()?),
154 }
155 .retry(RetryOptions::none())
156 .container_client(container_name)
157 };
158 Ok(Arc::new(client))
159}