vector/sinks/azure_blob/
config.rs

1use std::sync::Arc;
2
3use azure_storage_blob::BlobContainerClient;
4use tower::ServiceBuilder;
5use vector_lib::{
6    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
7    configurable::configurable_component,
8    sensitive_string::SensitiveString,
9};
10
11use super::request_builder::AzureBlobRequestOptions;
12use crate::{
13    Result,
14    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
15    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
16    sinks::{
17        Healthcheck, VectorSink,
18        azure_common::{
19            self, config::AzureAuthentication, config::AzureBlobRetryLogic,
20            config::AzureBlobTlsConfig, service::AzureBlobService, sink::AzureBlobSink,
21        },
22        util::{
23            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
24            TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
25        },
26    },
27    template::Template,
28};
29
/// Marker type carrying the default Tower request settings for the `azure_blob` sink.
///
/// It is used as the type parameter of `TowerRequestConfig` in `AzureBlobSinkConfig::request`.
#[derive(Clone, Copy, Debug)]
pub struct AzureBlobTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
    // Only `RATE_LIMIT_NUM` is set here; any other item of the trait keeps whatever
    // default the trait itself provides.
    const RATE_LIMIT_NUM: u64 = 250;
}
36
/// Configuration for the `azure_blob` sink.
#[configurable_component(sink(
    "azure_blob",
    "Store your observability data in Azure Blob Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AzureBlobSinkConfig {
    #[configurable(derived)]
    #[serde(default)]
    pub auth: Option<AzureAuthentication>,

    /// The Azure Blob Storage Account connection string.
    ///
    /// Authentication with an access key or a shared access signature (SAS)
    /// is supported. If using a non-account SAS, healthchecks will fail and
    /// will need to be disabled by setting `healthcheck.enabled` to `false`
    /// for this sink.
    ///
    /// When generating an account SAS, the following are the minimum required option
    /// settings for Vector to access blob storage and pass a health check.
    ///
    /// | Option                 | Value              |
    /// | ---------------------- | ------------------ |
    /// | Allowed services       | Blob               |
    /// | Allowed resource types | Container & Object |
    /// | Allowed permissions    | Read & Create      |
    ///
    /// Exactly one of `connection_string`, `account_name`, or `blob_endpoint` must be set.
    #[configurable(metadata(
        docs::warnings = "Access keys and SAS tokens can be used to gain unauthorized access to Azure Blob Storage \
        resources. Numerous security breaches have occurred due to leaked connection strings. It is important to keep \
        connection strings secure and not expose them in logs, error messages, or version control systems."
    ))]
    #[configurable(metadata(
        docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
    ))]
    #[configurable(metadata(
        docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
    ))]
    #[configurable(metadata(docs::examples = "AccountName=mylogstorage"))]
    pub connection_string: Option<SensitiveString>,

    /// The Azure Blob Storage Account name.
    ///
    /// Use this in place of `connection_string` when authenticating with an Azure credential.
    /// It cannot be combined with `connection_string` or `blob_endpoint`, and it requires
    /// `auth` to be configured.
    #[configurable(metadata(docs::examples = "mylogstorage"))]
    pub(super) account_name: Option<String>,

    /// The Azure Blob Storage endpoint.
    ///
    /// Use this in place of `connection_string` when authenticating with an Azure credential.
    /// It cannot be combined with `connection_string` or `account_name`, and it requires
    /// `auth` to be configured. A trailing `/` is added automatically when missing.
    #[configurable(metadata(docs::examples = "https://mylogstorage.blob.core.windows.net/"))]
    pub(super) blob_endpoint: Option<String>,

    /// The Azure Blob Storage Account container name.
    #[configurable(metadata(docs::examples = "my-logs"))]
    pub(super) container_name: String,

    /// A prefix to apply to all blob keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating a blob key that
    /// stores blobs under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
    #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
    #[configurable(metadata(
        docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
    ))]
    #[serde(default = "default_blob_prefix")]
    pub blob_prefix: Template,

    /// The timestamp format for the time component of the blob key.
    ///
    /// By default, blob keys are appended with a timestamp that reflects when the blobs are sent to
    /// Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining
    /// the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
    ///
    /// This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
    /// 20:34:46 GMT+0000, with the `blob_time_format` being set to `%s`, which renders
    /// timestamps in seconds since the Unix epoch.
    ///
    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
    /// languages.
    ///
    /// When set to an empty string, no timestamp is appended to the blob prefix.
    ///
    /// Defaults to `%s` when not set.
    ///
    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
    #[configurable(metadata(docs::syntax_override = "strftime"))]
    pub blob_time_format: Option<String>,

    /// Whether or not to append a UUID v4 token to the end of the blob key.
    ///
    /// The UUID is appended to the timestamp portion of the blob key, such that if the blob key
    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
    /// in a blob key that looks like
    /// `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
    ///
    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
    /// blob keys must be unique.
    ///
    /// Defaults to `true` when not set.
    pub blob_append_uuid: Option<bool>,

    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// All compression algorithms use the default compression level unless otherwise specified.
    ///
    /// Some cloud storage API clients and browsers handle decompression transparently, so
    /// depending on how they are accessed, files may not always appear to be compressed.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(super) acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub tls: Option<AzureBlobTlsConfig>,
}
171
172pub fn default_blob_prefix() -> Template {
173    Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
174}
175
176impl GenerateConfig for AzureBlobSinkConfig {
177    fn generate_config() -> toml::Value {
178        toml::Value::try_from(Self {
179            auth: None,
180            connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
181            account_name: None,
182            blob_endpoint: None,
183            container_name: String::from("logs"),
184            blob_prefix: default_blob_prefix(),
185            blob_time_format: Some(String::from("%s")),
186            blob_append_uuid: Some(true),
187            encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
188            compression: Compression::gzip_default(),
189            batch: BatchConfig::default(),
190            request: TowerRequestConfig::default(),
191            acknowledgements: Default::default(),
192            tls: None,
193        })
194        .unwrap()
195    }
196}
197
198#[async_trait::async_trait]
199#[typetag::serde(name = "azure_blob")]
200impl SinkConfig for AzureBlobSinkConfig {
201    async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
202        let connection_string: String = match (
203            &self.connection_string,
204            &self.account_name,
205            &self.blob_endpoint,
206        ) {
207            (Some(connstr), None, None) => connstr.inner().into(),
208            (None, Some(account_name), None) => {
209                if self.auth.is_none() {
210                    return Err(
211                        "`auth` configuration must be provided when using `account_name`".into(),
212                    );
213                }
214                format!("AccountName={}", account_name)
215            }
216            (None, None, Some(blob_endpoint)) => {
217                if self.auth.is_none() {
218                    return Err(
219                        "`auth` configuration must be provided when using `blob_endpoint`".into(),
220                    );
221                }
222                // BlobEndpoint must always end in a trailing slash
223                let blob_endpoint = if blob_endpoint.ends_with('/') {
224                    blob_endpoint.clone()
225                } else {
226                    format!("{}/", blob_endpoint)
227                };
228                format!("BlobEndpoint={}", blob_endpoint)
229            }
230            (None, None, None) => {
231                return Err("One of `connection_string`, `account_name`, or `blob_endpoint` must be provided".into());
232            }
233            (Some(_), Some(_), _) => {
234                return Err("Cannot provide both `connection_string` and `account_name`".into());
235            }
236            (Some(_), _, Some(_)) => {
237                return Err("Cannot provide both `connection_string` and `blob_endpoint`".into());
238            }
239            (_, Some(_), Some(_)) => {
240                return Err("Cannot provide both `account_name` and `blob_endpoint`".into());
241            }
242        };
243
244        let client = azure_common::config::build_client(
245            self.auth.clone(),
246            connection_string.clone(),
247            self.container_name.clone(),
248            cx.proxy(),
249            self.tls.clone(),
250        )
251        .await?;
252
253        let healthcheck = azure_common::config::build_healthcheck(
254            self.container_name.clone(),
255            Arc::clone(&client),
256        )?;
257        let sink = self.build_processor(client)?;
258        Ok((sink, healthcheck))
259    }
260
261    fn input(&self) -> Input {
262        Input::new(self.encoding.config().1.input_type() & DataType::Log)
263    }
264
265    fn acknowledgements(&self) -> &AcknowledgementsConfig {
266        &self.acknowledgements
267    }
268}
269
/// Template source used for `blob_prefix` when the option is not set
/// (parsed by `default_blob_prefix`).
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
/// Time format used for the blob key when `blob_time_format` is not set.
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
/// Whether a UUID is appended to the blob key when `blob_append_uuid` is not set.
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
273
274impl AzureBlobSinkConfig {
275    pub fn build_processor(&self, client: Arc<BlobContainerClient>) -> crate::Result<VectorSink> {
276        let request_limits = self.request.into_settings();
277        let service = ServiceBuilder::new()
278            .settings(request_limits, AzureBlobRetryLogic)
279            .service(AzureBlobService::new(client));
280
281        // Configure our partitioning/batching.
282        let batcher_settings = self.batch.into_batcher_settings()?;
283
284        let blob_time_format = self
285            .blob_time_format
286            .as_ref()
287            .cloned()
288            .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
289        let blob_append_uuid = self
290            .blob_append_uuid
291            .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
292
293        let transformer = self.encoding.transformer();
294        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
295        let encoder = Encoder::<Framer>::new(framer, serializer);
296
297        let request_options = AzureBlobRequestOptions {
298            container_name: self.container_name.clone(),
299            blob_time_format,
300            blob_append_uuid,
301            encoder: (transformer, encoder),
302            compression: self.compression,
303        };
304
305        let sink = AzureBlobSink::new(
306            service,
307            request_options,
308            self.key_partitioner()?,
309            batcher_settings,
310        );
311
312        Ok(VectorSink::from_event_streamsink(sink))
313    }
314
315    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
316        Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
317    }
318}