vector/sinks/azure_blob/
config.rs

1use std::sync::Arc;
2
3use azure_storage_blobs::prelude::*;
4use tower::ServiceBuilder;
5use vector_lib::{
6    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
7    configurable::configurable_component,
8    sensitive_string::SensitiveString,
9};
10
11use super::request_builder::AzureBlobRequestOptions;
12use crate::{
13    Result,
14    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
15    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
16    sinks::{
17        Healthcheck, VectorSink,
18        azure_common::{
19            self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
20        },
21        util::{
22            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
23            TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
24        },
25    },
26    template::Template,
27};
28
29#[derive(Clone, Copy, Debug)]
30pub struct AzureBlobTowerRequestConfigDefaults;
31
32impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
33    const RATE_LIMIT_NUM: u64 = 250;
34}
35
36/// Configuration for the `azure_blob` sink.
37#[configurable_component(sink(
38    "azure_blob",
39    "Store your observability data in Azure Blob Storage."
40))]
41#[derive(Clone, Debug)]
42#[serde(deny_unknown_fields)]
43pub struct AzureBlobSinkConfig {
44    /// The Azure Blob Storage Account connection string.
45    ///
46    /// Authentication with an access key or shared access signature (SAS)
47    /// are supported authentication methods. If using a non-account SAS,
48    /// healthchecks will fail and will need to be disabled by setting
49    /// `healthcheck.enabled` to `false` for this sink
50    ///
51    /// When generating an account SAS, the following are the minimum required option
52    /// settings for Vector to access blob storage and pass a health check.
53    /// | Option                 | Value              |
54    /// | ---------------------- | ------------------ |
55    /// | Allowed services       | Blob               |
56    /// | Allowed resource types | Container & Object |
57    /// | Allowed permissions    | Read & Create      |
58    #[configurable(metadata(
59        docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
60    ))]
61    #[configurable(metadata(
62        docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
63    ))]
64    pub connection_string: SensitiveString,
65
66    /// The Azure Blob Storage Account container name.
67    #[configurable(metadata(docs::examples = "my-logs"))]
68    pub(super) container_name: String,
69
70    /// A prefix to apply to all blob keys.
71    ///
72    /// Prefixes are useful for partitioning objects, such as by creating a blob key that
73    /// stores blobs under a particular directory. If using a prefix for this purpose, it must end
74    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
75    #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
76    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
77    #[configurable(metadata(
78        docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
79    ))]
80    #[serde(default = "default_blob_prefix")]
81    pub blob_prefix: Template,
82
83    /// The timestamp format for the time component of the blob key.
84    ///
85    /// By default, blob keys are appended with a timestamp that reflects when the blob are sent to
86    /// Azure Blob Storage, such that the resulting blob key is functionally equivalent to joining
87    /// the blob prefix with the formatted timestamp, such as `date=2022-07-18/1658176486`.
88    ///
89    /// This would represent a `blob_prefix` set to `date=%F/` and the timestamp of Mon Jul 18 2022
90    /// 20:34:44 GMT+0000, with the `filename_time_format` being set to `%s`, which renders
91    /// timestamps in seconds since the Unix epoch.
92    ///
93    /// Supports the common [`strftime`][chrono_strftime_specifiers] specifiers found in most
94    /// languages.
95    ///
96    /// When set to an empty string, no timestamp is appended to the blob prefix.
97    ///
98    /// [chrono_strftime_specifiers]: https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers
99    #[configurable(metadata(docs::syntax_override = "strftime"))]
100    pub blob_time_format: Option<String>,
101
102    /// Whether or not to append a UUID v4 token to the end of the blob key.
103    ///
104    /// The UUID is appended to the timestamp portion of the object key, such that if the blob key
105    /// generated is `date=2022-07-18/1658176486`, setting this field to `true` results
106    /// in an blob key that looks like
107    /// `date=2022-07-18/1658176486-30f6652c-71da-4f9f-800d-a1189c47c547`.
108    ///
109    /// This ensures there are no name collisions, and can be useful in high-volume workloads where
110    /// blob keys must be unique.
111    pub blob_append_uuid: Option<bool>,
112
113    #[serde(flatten)]
114    pub encoding: EncodingConfigWithFraming,
115
116    /// Compression configuration.
117    ///
118    /// All compression algorithms use the default compression level unless otherwise specified.
119    ///
120    /// Some cloud storage API clients and browsers handle decompression transparently, so
121    /// depending on how they are accessed, files may not always appear to be compressed.
122    #[configurable(derived)]
123    #[serde(default = "Compression::gzip_default")]
124    pub compression: Compression,
125
126    #[configurable(derived)]
127    #[serde(default)]
128    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
129
130    #[configurable(derived)]
131    #[serde(default)]
132    pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,
133
134    #[configurable(derived)]
135    #[serde(
136        default,
137        deserialize_with = "crate::serde::bool_or_struct",
138        skip_serializing_if = "crate::serde::is_default"
139    )]
140    pub(super) acknowledgements: AcknowledgementsConfig,
141}
142
143pub fn default_blob_prefix() -> Template {
144    Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
145}
146
147impl GenerateConfig for AzureBlobSinkConfig {
148    fn generate_config() -> toml::Value {
149        toml::Value::try_from(Self {
150            connection_string: String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into(),
151            container_name: String::from("logs"),
152            blob_prefix: default_blob_prefix(),
153            blob_time_format: Some(String::from("%s")),
154            blob_append_uuid: Some(true),
155            encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
156            compression: Compression::gzip_default(),
157            batch: BatchConfig::default(),
158            request: TowerRequestConfig::default(),
159            acknowledgements: Default::default(),
160        })
161        .unwrap()
162    }
163}
164
165#[async_trait::async_trait]
166#[typetag::serde(name = "azure_blob")]
167impl SinkConfig for AzureBlobSinkConfig {
168    async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
169        let client = azure_common::config::build_client(
170            self.connection_string.clone().into(),
171            self.container_name.clone(),
172        )?;
173
174        let healthcheck = azure_common::config::build_healthcheck(
175            self.container_name.clone(),
176            Arc::clone(&client),
177        )?;
178        let sink = self.build_processor(client)?;
179        Ok((sink, healthcheck))
180    }
181
182    fn input(&self) -> Input {
183        Input::new(self.encoding.config().1.input_type() & DataType::Log)
184    }
185
186    fn acknowledgements(&self) -> &AcknowledgementsConfig {
187        &self.acknowledgements
188    }
189}
190
/// Template source used for `blob_prefix` when the option is not configured.
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";

/// Time format used for the blob key when `blob_time_format` is not configured.
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";

/// Whether a UUID is appended to the blob key when `blob_append_uuid` is not configured.
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
194
195impl AzureBlobSinkConfig {
196    pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
197        let request_limits = self.request.into_settings();
198        let service = ServiceBuilder::new()
199            .settings(request_limits, AzureBlobRetryLogic)
200            .service(AzureBlobService::new(client));
201
202        // Configure our partitioning/batching.
203        let batcher_settings = self.batch.into_batcher_settings()?;
204
205        let blob_time_format = self
206            .blob_time_format
207            .as_ref()
208            .cloned()
209            .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
210        let blob_append_uuid = self
211            .blob_append_uuid
212            .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
213
214        let transformer = self.encoding.transformer();
215        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
216        let encoder = Encoder::<Framer>::new(framer, serializer);
217
218        let request_options = AzureBlobRequestOptions {
219            container_name: self.container_name.clone(),
220            blob_time_format,
221            blob_append_uuid,
222            encoder: (transformer, encoder),
223            compression: self.compression,
224        };
225
226        let sink = AzureBlobSink::new(
227            service,
228            request_options,
229            self.key_partitioner()?,
230            batcher_settings,
231        );
232
233        Ok(VectorSink::from_event_streamsink(sink))
234    }
235
236    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
237        Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
238    }
239}