// vector/sinks/azure_blob/config.rs

use std::sync::Arc;

use azure_storage_blobs::prelude::*;
use tower::ServiceBuilder;
use vector_lib::{
    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
    configurable::configurable_component,
    sensitive_string::SensitiveString,
};

use super::request_builder::AzureBlobRequestOptions;
use crate::{
    Result,
    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
    sinks::{
        Healthcheck, VectorSink,
        azure_common::{
            self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
        },
        util::{
            BatchConfig, BulkSizeBasedDefaultBatchSettings, Compression, ServiceBuilderExt,
            TowerRequestConfig, partitioner::KeyPartitioner, service::TowerRequestConfigDefaults,
        },
    },
    template::Template,
};

#[derive(Clone, Copy, Debug)]
pub struct AzureBlobTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
    const RATE_LIMIT_NUM: u64 = 250;
}

/// Configuration for the `azure_blob` sink.
#[configurable_component(sink(
    "azure_blob",
    "Store your observability data in Azure Blob Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AzureBlobSinkConfig {
    /// The Azure Blob Storage Account connection string.
    #[configurable(metadata(
        docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
    ))]
    #[configurable(metadata(
        docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
    ))]
    pub connection_string: SensitiveString,

    /// The Azure Blob Storage Account container name.
    #[configurable(metadata(docs::examples = "my-logs"))]
    pub(super) container_name: String,

    /// A prefix to apply to all blob keys.
    ///
    /// Supports template syntax, so keys can be partitioned by time (`strftime` specifiers)
    /// or by event fields.
    #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
    #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
    #[configurable(metadata(
        docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
    ))]
    #[serde(default = "default_blob_prefix")]
    pub blob_prefix: Template,

    /// The `strftime` format used for the timestamp appended to the blob filename.
    ///
    /// Defaults to `%s`, the number of seconds since the Unix epoch.
    #[configurable(metadata(docs::syntax_override = "strftime"))]
    pub blob_time_format: Option<String>,

    /// Whether to append a UUID to each blob's filename.
    ///
    /// Defaults to `true`.
    pub blob_append_uuid: Option<bool>,

    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(super) acknowledgements: AcknowledgementsConfig,
}
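
// Illustrative only (not part of the original file): a minimal TOML configuration
// for this sink, using the field names defined by the struct above and assuming
// the standard top-level `type`/`inputs` keys plus the `encoding`/`framing` keys
// contributed by the flattened `EncodingConfigWithFraming`. Fields left out fall
// back to their serde defaults (gzip compression, default batching, and so on).
//
//     [sinks.my_azure_blob]
//     type = "azure_blob"
//     inputs = ["my_source"]
//     connection_string = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=...;EndpointSuffix=core.windows.net"
//     container_name = "my-logs"
//     blob_prefix = "date/%F/"
//     encoding.codec = "json"
//     framing.method = "newline_delimited"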

pub fn default_blob_prefix() -> Template {
    Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
}

impl GenerateConfig for AzureBlobSinkConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            connection_string: String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into(),
            container_name: String::from("logs"),
            blob_prefix: default_blob_prefix(),
            blob_time_format: Some(String::from("%s")),
            blob_append_uuid: Some(true),
            encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
            compression: Compression::gzip_default(),
            batch: BatchConfig::default(),
            request: TowerRequestConfig::default(),
            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "azure_blob")]
impl SinkConfig for AzureBlobSinkConfig {
    async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
        let client = azure_common::config::build_client(
            self.connection_string.clone().into(),
            self.container_name.clone(),
        )?;

        let healthcheck = azure_common::config::build_healthcheck(
            self.container_name.clone(),
            Arc::clone(&client),
        )?;
        let sink = self.build_processor(client)?;
        Ok((sink, healthcheck))
    }

    fn input(&self) -> Input {
        // Only log events are accepted, constrained further by what the configured
        // serializer supports.
        Input::new(self.encoding.config().1.input_type() & DataType::Log)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
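
// Taken together, these defaults mean blob keys start with a `blob/%F/` prefix
// rendered via strftime, followed by a filename built from a `%s` timestamp and a
// UUID. The exact key composition lives in the request builder rather than in this
// file, so treat this note as an assumption about that code.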

impl AzureBlobSinkConfig {
    pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
        // Wrap the Azure Blob service in the retry and rate-limiting middleware
        // derived from the request settings.
        let request_limits = self.request.into_settings();
        let service = ServiceBuilder::new()
            .settings(request_limits, AzureBlobRetryLogic)
            .service(AzureBlobService::new(client));

        let batcher_settings = self.batch.into_batcher_settings()?;

        // Fall back to the sink defaults when the blob naming options are unset.
        let blob_time_format = self
            .blob_time_format
            .as_ref()
            .cloned()
            .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
        let blob_append_uuid = self
            .blob_append_uuid
            .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);

        let transformer = self.encoding.transformer();
        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        let request_options = AzureBlobRequestOptions {
            container_name: self.container_name.clone(),
            blob_time_format,
            blob_append_uuid,
            encoder: (transformer, encoder),
            compression: self.compression,
        };

        let sink = AzureBlobSink::new(
            service,
            request_options,
            self.key_partitioner()?,
            batcher_settings,
        );

        Ok(VectorSink::from_event_streamsink(sink))
    }

    /// Blobs are partitioned by the rendered `blob_prefix` template.
    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
        Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
    }
}
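
// A minimal sanity-test sketch (not part of the original file). It assumes the
// example returned by `generate_config` can be deserialized back into
// `AzureBlobSinkConfig` via `toml::Value::try_into`.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn generate_config_roundtrips() {
        let config: AzureBlobSinkConfig = AzureBlobSinkConfig::generate_config()
            .try_into()
            .expect("generated config should deserialize");
        assert_eq!(config.container_name, "logs");
        assert_eq!(config.blob_append_uuid, Some(true));
    }
}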