vector/sinks/azure_blob/
config.rs

1use std::sync::Arc;
2
3use azure_storage_blobs::prelude::*;
4use tower::ServiceBuilder;
5use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
6use vector_lib::configurable::configurable_component;
7use vector_lib::sensitive_string::SensitiveString;
8
9use super::request_builder::AzureBlobRequestOptions;
10use crate::sinks::util::service::TowerRequestConfigDefaults;
11use crate::{
12    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
13    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
14    sinks::{
15        azure_common::{
16            self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
17        },
18        util::{
19            partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
20            Compression, ServiceBuilderExt, TowerRequestConfig,
21        },
22        Healthcheck, VectorSink,
23    },
24    template::Template,
25    Result,
26};
27
28#[derive(Clone, Copy, Debug)]
29pub struct AzureBlobTowerRequestConfigDefaults;
30
31impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
32    const RATE_LIMIT_NUM: u64 = 250;
33}
34
35/// Configuration for the `azure_blob` sink.
36#[configurable_component(sink(
37    "azure_blob",
38    "Store your observability data in Azure Blob Storage."
39))]
40#[derive(Clone, Debug)]
41#[serde(deny_unknown_fields)]
42pub struct AzureBlobSinkConfig {
43    /// The Azure Blob Storage Account connection string.
44    ///
45    /// Authentication with an access key or shared access signature (SAS)
46    /// are supported authentication methods. If using a non-account SAS,
47    /// healthchecks will fail and will need to be disabled by setting
48    /// `healthcheck.enabled` to `false` for this sink
49    ///
50    /// When generating an account SAS, the following are the minimum required option
51    /// settings for Vector to access blob storage and pass a health check.
52    /// | Option                 | Value              |
53    /// | ---------------------- | ------------------ |
54    /// | Allowed services       | Blob               |
55    /// | Allowed resource types | Container & Object |
56    /// | Allowed permissions    | Read & Create      |
57    ///
58    /// Either `storage_account`, or this field, must be specified.
59    #[configurable(metadata(
60        docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
61    ))]
62    #[configurable(metadata(
63        docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
64    ))]
65    pub connection_string: Option<SensitiveString>,
66
67    /// The Azure Blob Storage Account name.
68    ///
69    /// Attempts to load credentials for the account in the following ways, in order:
70    ///
71    /// - read from environment variables ([more information][env_cred_docs])
72    /// - looks for a [Managed Identity][managed_ident_docs]
73    /// - uses the `az` CLI tool to get an access token ([more information][az_cli_docs])
74    ///
75    /// Either `connection_string`, or this field, must be specified.
76    ///
77    /// [env_cred_docs]: https://docs.rs/azure_identity/latest/azure_identity/struct.EnvironmentCredential.html
78    /// [managed_ident_docs]: https://docs.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/overview
79    /// [az_cli_docs]: https://docs.microsoft.com/en-us/cli/azure/account?view=azure-cli-latest#az-account-get-access-token
80    #[configurable(metadata(docs::examples = "mylogstorage"))]
81    pub storage_account: Option<String>,
82
83    /// The Azure Blob Storage Endpoint URL.
84    ///
85    /// This is used to override the default blob storage endpoint URL in cases where you are using
86    /// credentials read from the environment/managed identities or access tokens without using an
87    /// explicit connection_string (which already explicitly supports overriding the blob endpoint
88    /// URL).
89    ///
90    /// This may only be used with `storage_account` and is ignored when used with
91    /// `connection_string`.
92    #[configurable(metadata(docs::examples = "https://test.blob.core.usgovcloudapi.net/"))]
93    #[configurable(metadata(docs::examples = "https://test.blob.core.windows.net/"))]
94    pub endpoint: Option<String>,
95
96    /// The Azure Blob Storage Account container name.
97    #[configurable(metadata(docs::examples = "my-logs"))]
98    pub(super) container_name: String,
99
100    /// A prefix to apply to all blob keys.
101    ///
102    /// Prefixes are useful for partitioning objects, such as by creating a blob key that
103    /// stores blobs under a particular directory. If using a prefix for this purpose, it must end
104    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
105    #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
106    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
107    #[configurable(metadata(
108        docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
109    ))]
110    #[serde(default = "default_blob_prefix")]
111    pub blob_prefix: Template,
112
113    /// The timestamp format for the time component of the blob key.
114    ///
115    /// By default, blob keys are appended with a timestamp that reflects when the blob are sent to
116    /// Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining
117    /// the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
118    ///
119    /// This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
120    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
121    /// timestamps in seconds since the Unix epoch.
122    ///
123    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
124    /// languages.
125    ///
126    /// When set to an empty string, no timestamp is appended to the blob prefix.
127    ///
128    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
129    #[configurable(metadata(docs::syntax_override = "strftime"))]
130    pub blob_time_format: Option<String>,
131
132    /// Whether or not to append a UUID v4 token to the end of the blob key.
133    ///
134    /// The UUID is appended to the timestamp portion of the object key, such that if the blob key
135    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
136    /// in an blob key that looks like
137    /// `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
138    ///
139    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
140    /// blob keys must be unique.
141    pub blob_append_uuid: Option<bool>,
142
143    #[serde(flatten)]
144    pub encoding: EncodingConfigWithFraming,
145
146    /// Compression configuration.
147    ///
148    /// All compression algorithms use the default compression level unless otherwise specified.
149    ///
150    /// Some cloud storage API clients and browsers handle decompression transparently, so
151    /// depending on how they are accessed, files may not always appear to be compressed.
152    #[configurable(derived)]
153    #[serde(default = "Compression::gzip_default")]
154    pub compression: Compression,
155
156    #[configurable(derived)]
157    #[serde(default)]
158    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
159
160    #[configurable(derived)]
161    #[serde(default)]
162    pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,
163
164    #[configurable(derived)]
165    #[serde(
166        default,
167        deserialize_with = "crate::serde::bool_or_struct",
168        skip_serializing_if = "crate::serde::is_default"
169    )]
170    pub(super) acknowledgements: AcknowledgementsConfig,
171}
172
173pub fn default_blob_prefix() -> Template {
174    Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
175}
176
177impl GenerateConfig for AzureBlobSinkConfig {
178    fn generate_config() -> toml::Value {
179        toml::Value::try_from(Self {
180            connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
181            storage_account: Some(String::from("some-account-name")),
182            container_name: String::from("logs"),
183            endpoint: None,
184            blob_prefix: default_blob_prefix(),
185            blob_time_format: Some(String::from("%s")),
186            blob_append_uuid: Some(true),
187            encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
188            compression: Compression::gzip_default(),
189            batch: BatchConfig::default(),
190            request: TowerRequestConfig::default(),
191            acknowledgements: Default::default(),
192        })
193        .unwrap()
194    }
195}
196
197#[async_trait::async_trait]
198#[typetag::serde(name = "azure_blob")]
199impl SinkConfig for AzureBlobSinkConfig {
200    async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
201        let client = azure_common::config::build_client(
202            self.connection_string
203                .as_ref()
204                .map(|v| v.inner().to_string()),
205            self.storage_account.as_ref().map(|v| v.to_string()),
206            self.container_name.clone(),
207            self.endpoint.clone(),
208        )?;
209
210        let healthcheck = azure_common::config::build_healthcheck(
211            self.container_name.clone(),
212            Arc::clone(&client),
213        )?;
214        let sink = self.build_processor(client)?;
215        Ok((sink, healthcheck))
216    }
217
218    fn input(&self) -> Input {
219        Input::new(self.encoding.config().1.input_type() & DataType::Log)
220    }
221
222    fn acknowledgements(&self) -> &AcknowledgementsConfig {
223        &self.acknowledgements
224    }
225}
226
/// Blob key prefix template returned by `default_blob_prefix` when `blob_prefix` is not set.
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
/// Time format `build_processor` falls back to when `blob_time_format` is not set.
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
/// UUID-append setting `build_processor` falls back to when `blob_append_uuid` is not set.
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
230
231impl AzureBlobSinkConfig {
232    pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
233        let request_limits = self.request.into_settings();
234        let service = ServiceBuilder::new()
235            .settings(request_limits, AzureBlobRetryLogic)
236            .service(AzureBlobService::new(client));
237
238        // Configure our partitioning/batching.
239        let batcher_settings = self.batch.into_batcher_settings()?;
240
241        let blob_time_format = self
242            .blob_time_format
243            .as_ref()
244            .cloned()
245            .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
246        let blob_append_uuid = self
247            .blob_append_uuid
248            .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
249
250        let transformer = self.encoding.transformer();
251        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
252        let encoder = Encoder::<Framer>::new(framer, serializer);
253
254        let request_options = AzureBlobRequestOptions {
255            container_name: self.container_name.clone(),
256            blob_time_format,
257            blob_append_uuid,
258            encoder: (transformer, encoder),
259            compression: self.compression,
260        };
261
262        let sink = AzureBlobSink::new(
263            service,
264            request_options,
265            self.key_partitioner()?,
266            batcher_settings,
267        );
268
269        Ok(VectorSink::from_event_streamsink(sink))
270    }
271
272    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
273        Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
274    }
275}