1use std::sync::Arc;
2
3use azure_storage_blob::BlobContainerClient;
4use tower::ServiceBuilder;
5use vector_lib::{
6 codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
7 configurable::configurable_component,
8 sensitive_string::SensitiveString,
9};
10
11use super::request_builder::AzureBlobRequestOptions;
12use crate::{
13 Result,
14 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
15 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
16 sinks::{
17 Healthcheck, VectorSink,
18 azure_common::{
19 self, config::AzureAuthentication, config::AzureBlobRetryLogic,
20 config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink,
21 },
22 util::{
23 BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
24 TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
25 },
26 },
27 template::Template,
28};
29
30#[derive(Clone, Copy, Debug)]
31pub struct AzureBlobTowerRequestConfigDefaults;
32
33impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
34 const RATE_LIMIT_NUM: u64 = 250;
35}
36
37#[configurable_component(sink(
39 "azure_blob",
40 "Store your observability data in Azure Blob Storage."
41))]
42#[derive(Clone, Debug)]
43#[serde(deny_unknown_fields)]
44pub struct AzureBlobSinkConfig {
45 #[configurable(derived)]
46 #[serde(default)]
47 pub auth: Option<AzureAuthentication>,
48
49 #[configurable(metadata(
64 docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \
65 resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \
66 connection strings secure and not expose them in logs, error messages, or version control systems."
67 ))]
68 #[configurable(metadata(
69 docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
70 ))]
71 #[configurable(metadata(
72 docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
73 ))]
74 #[configurable(metadata(docs::examples = "AccountName=mylogstorage"))]
75 pub connection_string: Option<SensitiveString>,
76
77 #[configurable(metadata(docs::examples = "mylogstorage"))]
82 pub(super) account_name: Option<String>,
83
84 #[configurable(metadata(docs::examples = "https://mylogstorage.blob.core.windows.net/"))]
89 pub(super) blob_endpoint: Option<String>,
90
91 #[configurable(metadata(docs::examples = "my-logs"))]
93 pub(super) container_name: String,
94
95 #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
101 #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
102 #[configurable(metadata(
103 docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
104 ))]
105 #[serde(default = "default_blob_prefix")]
106 pub blob_prefix: Template,
107
108 #[configurable(metadata(docs::syntax_override = "strftime"))]
125 pub blob_time_format: Option<String>,
126
127 pub blob_append_uuid: Option<bool>,
137
138 #[serde(flatten)]
139 pub encoding: EncodingConfigWithFraming,
140
141 #[configurable(derived)]
148 #[serde(default = "Compression::gzip_default")]
149 pub compression: Compression,
150
151 #[configurable(derived)]
152 #[serde(default)]
153 pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
154
155 #[configurable(derived)]
156 #[serde(default)]
157 pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,
158
159 #[configurable(derived)]
160 #[serde(
161 default,
162 deserialize_with = "crate::serde::bool_or_struct",
163 skip_serializing_if = "crate::serde::is_default"
164 )]
165 pub(super) acknowledgements: AcknowledgementsConfig,
166
167 #[configurable(derived)]
168 #[serde(default)]
169 pub tls: Option<AzureBlobTlsConfig>,
170}
171
172pub fn default_blob_prefix() -> Template {
173 Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
174}
175
176impl GenerateConfig for AzureBlobSinkConfig {
177 fn generate_config() -> toml::Value {
178 toml::Value::try_from(Self {
179 auth: None,
180 connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
181 account_name: None,
182 blob_endpoint: None,
183 container_name: String::from("logs"),
184 blob_prefix: default_blob_prefix(),
185 blob_time_format: Some(String::from("%s")),
186 blob_append_uuid: Some(true),
187 encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
188 compression: Compression::gzip_default(),
189 batch: BatchConfig::default(),
190 request: TowerRequestConfig::default(),
191 acknowledgements: Default::default(),
192 tls: None,
193 })
194 .unwrap()
195 }
196}
197
198#[async_trait::async_trait]
199#[typetag::serde(name = "azure_blob")]
200impl SinkConfig for AzureBlobSinkConfig {
201 async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
202 let connection_string: String = match (
203 &self.connection_string,
204 &self.account_name,
205 &self.blob_endpoint,
206 ) {
207 (Some(connstr), None, None) => connstr.inner().into(),
208 (None, Some(account_name), None) => {
209 if self.auth.is_none() {
210 return Err(
211 "`auth` configuration must be provided when using `account_name`".into(),
212 );
213 }
214 format!("AccountName={}", account_name)
215 }
216 (None, None, Some(blob_endpoint)) => {
217 if self.auth.is_none() {
218 return Err(
219 "`auth` configuration must be provided when using `blob_endpoint`".into(),
220 );
221 }
222 let blob_endpoint = if blob_endpoint.ends_with('/') {
224 blob_endpoint.clone()
225 } else {
226 format!("{}/", blob_endpoint)
227 };
228 format!("BlobEndpoint={}", blob_endpoint)
229 }
230 (None, None, None) => {
231 return Err("One of `connection_string`, `account_name`, or `blob_endpoint` must be provided".into());
232 }
233 (Some(_), Some(_), _) => {
234 return Err("Cannot provide both `connection_string` and `account_name`".into());
235 }
236 (Some(_), _, Some(_)) => {
237 return Err("Cannot provide both `connection_string` and `blob_endpoint`".into());
238 }
239 (_, Some(_), Some(_)) => {
240 return Err("Cannot provide both `account_name` and `blob_endpoint`".into());
241 }
242 };
243
244 let client = azure_common::config::build_client(
245 self.auth.clone(),
246 connection_string.clone(),
247 self.container_name.clone(),
248 cx.proxy(),
249 self.tls.clone(),
250 )
251 .await?;
252
253 let healthcheck = azure_common::config::build_healthcheck(
254 self.container_name.clone(),
255 Arc::clone(&client),
256 )?;
257 let sink = self.build_processor(client)?;
258 Ok((sink, healthcheck))
259 }
260
261 fn input(&self) -> Input {
262 Input::new(self.encoding.config().1.input_type() & DataType::Log)
263 }
264
265 fn acknowledgements(&self) -> &AcknowledgementsConfig {
266 &self.acknowledgements
267 }
268}
269
270const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
271const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
272const DEFAULT_FILENAME_APPEND_UUID: bool = true;
273
274impl AzureBlobSinkConfig {
275 pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
276 let request_limits = self.request.into_settings();
277 let service = ServiceBuilder::new()
278 .settings(request_limits, AzureBlobRetryLogic)
279 .service(AzureBlobService::new(client));
280
281 let batcher_settings = self.batch.into_batcher_settings()?;
283
284 let blob_time_format = self
285 .blob_time_format
286 .as_ref()
287 .cloned()
288 .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
289 let blob_append_uuid = self
290 .blob_append_uuid
291 .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
292
293 let transformer = self.encoding.transformer();
294 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
295 let encoder = Encoder::<Framer>::new(framer, serializer);
296
297 let request_options = AzureBlobRequestOptions {
298 container_name: self.container_name.clone(),
299 blob_time_format,
300 blob_append_uuid,
301 encoder: (transformer, encoder),
302 compression: self.compression,
303 };
304
305 let sink = AzureBlobSink::new(
306 service,
307 request_options,
308 self.key_partitioner()?,
309 batcher_settings,
310 );
311
312 Ok(VectorSink::from_event_streamsink(sink))
313 }
314
315 pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
316 Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
317 }
318}