vector/sinks/azure_blob/
config.rs1use std::sync::Arc;
2
3use azure_storage_blobs::prelude::*;
4use tower::ServiceBuilder;
5use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
6use vector_lib::configurable::configurable_component;
7use vector_lib::sensitive_string::SensitiveString;
8
9use super::request_builder::AzureBlobRequestOptions;
10use crate::sinks::util::service::TowerRequestConfigDefaults;
11use crate::{
12 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
13 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
14 sinks::{
15 azure_common::{
16 self, config::AzureBlobRetryLogic, service::AzureBlobService, sink::AzureBlobSink,
17 },
18 util::{
19 partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
20 Compression, ServiceBuilderExt, TowerRequestConfig,
21 },
22 Healthcheck, VectorSink,
23 },
24 template::Template,
25 Result,
26};
27
28#[derive(Clone, Copy, Debug)]
29pub struct AzureBlobTowerRequestConfigDefaults;
30
31impl TowerRequestConfigDefaults for AzureBlobTowerRequestConfigDefaults {
32 const RATE_LIMIT_NUM: u64 = 250;
33}
34
35#[configurable_component(sink(
37 "azure_blob",
38 "Store your observability data in Azure Blob Storage."
39))]
40#[derive(Clone, Debug)]
41#[serde(deny_unknown_fields)]
42pub struct AzureBlobSinkConfig {
43 #[configurable(metadata(
60 docs::examples = "DefaultEndpointsProtocol=https;AccountName=mylogstorage;AccountKey=storageaccountkeybase64encoded;EndpointSuffix=core.windows.net"
61 ))]
62 #[configurable(metadata(
63 docs::examples = "BlobEndpoint=https://mylogstorage.blob.core.windows.net/;SharedAccessSignature=generatedsastoken"
64 ))]
65 pub connection_string: Option<SensitiveString>,
66
67 #[configurable(metadata(docs::examples = "mylogstorage"))]
81 pub storage_account: Option<String>,
82
83 #[configurable(metadata(docs::examples = "https://test.blob.core.usgovcloudapi.net/"))]
93 #[configurable(metadata(docs::examples = "https://test.blob.core.windows.net/"))]
94 pub endpoint: Option<String>,
95
96 #[configurable(metadata(docs::examples = "my-logs"))]
98 pub(super) container_name: String,
99
100 #[configurable(metadata(docs::examples = "date/%F/hour/%H/"))]
106 #[configurable(metadata(docs::examples = "year=%Y/month=%m/day=%d/"))]
107 #[configurable(metadata(
108 docs::examples = "kubernetes/{{ metadata.cluster }}/{{ metadata.application_name }}/"
109 ))]
110 #[serde(default = "default_blob_prefix")]
111 pub blob_prefix: Template,
112
113 #[configurable(metadata(docs::syntax_override = "strftime"))]
130 pub blob_time_format: Option<String>,
131
132 pub blob_append_uuid: Option<bool>,
142
143 #[serde(flatten)]
144 pub encoding: EncodingConfigWithFraming,
145
146 #[configurable(derived)]
153 #[serde(default = "Compression::gzip_default")]
154 pub compression: Compression,
155
156 #[configurable(derived)]
157 #[serde(default)]
158 pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,
159
160 #[configurable(derived)]
161 #[serde(default)]
162 pub request: TowerRequestConfig<AzureBlobTowerRequestConfigDefaults>,
163
164 #[configurable(derived)]
165 #[serde(
166 default,
167 deserialize_with = "crate::serde::bool_or_struct",
168 skip_serializing_if = "crate::serde::is_default"
169 )]
170 pub(super) acknowledgements: AcknowledgementsConfig,
171}
172
173pub fn default_blob_prefix() -> Template {
174 Template::try_from(DEFAULT_KEY_PREFIX).unwrap()
175}
176
177impl GenerateConfig for AzureBlobSinkConfig {
178 fn generate_config() -> toml::Value {
179 toml::Value::try_from(Self {
180 connection_string: Some(String::from("DefaultEndpointsProtocol=https;AccountName=some-account-name;AccountKey=some-account-key;").into()),
181 storage_account: Some(String::from("some-account-name")),
182 container_name: String::from("logs"),
183 endpoint: None,
184 blob_prefix: default_blob_prefix(),
185 blob_time_format: Some(String::from("%s")),
186 blob_append_uuid: Some(true),
187 encoding: (Some(NewlineDelimitedEncoderConfig::new()), JsonSerializerConfig::default()).into(),
188 compression: Compression::gzip_default(),
189 batch: BatchConfig::default(),
190 request: TowerRequestConfig::default(),
191 acknowledgements: Default::default(),
192 })
193 .unwrap()
194 }
195}
196
197#[async_trait::async_trait]
198#[typetag::serde(name = "azure_blob")]
199impl SinkConfig for AzureBlobSinkConfig {
200 async fn build(&self, _cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
201 let client = azure_common::config::build_client(
202 self.connection_string
203 .as_ref()
204 .map(|v| v.inner().to_string()),
205 self.storage_account.as_ref().map(|v| v.to_string()),
206 self.container_name.clone(),
207 self.endpoint.clone(),
208 )?;
209
210 let healthcheck = azure_common::config::build_healthcheck(
211 self.container_name.clone(),
212 Arc::clone(&client),
213 )?;
214 let sink = self.build_processor(client)?;
215 Ok((sink, healthcheck))
216 }
217
218 fn input(&self) -> Input {
219 Input::new(self.encoding.config().1.input_type() & DataType::Log)
220 }
221
222 fn acknowledgements(&self) -> &AcknowledgementsConfig {
223 &self.acknowledgements
224 }
225}
226
/// Template source for the default `blob_prefix` (used by `default_blob_prefix`).
const DEFAULT_KEY_PREFIX: &str = "blob/%F/";
/// `strftime` format used by `build_processor` when `blob_time_format` is unset.
const DEFAULT_FILENAME_TIME_FORMAT: &str = "%s";
/// Value used by `build_processor` when `blob_append_uuid` is unset.
const DEFAULT_FILENAME_APPEND_UUID: bool = true;
230
231impl AzureBlobSinkConfig {
232 pub fn build_processor(&self, client: Arc<ContainerClient>) -> crate::Result<VectorSink> {
233 let request_limits = self.request.into_settings();
234 let service = ServiceBuilder::new()
235 .settings(request_limits, AzureBlobRetryLogic)
236 .service(AzureBlobService::new(client));
237
238 let batcher_settings = self.batch.into_batcher_settings()?;
240
241 let blob_time_format = self
242 .blob_time_format
243 .as_ref()
244 .cloned()
245 .unwrap_or_else(|| DEFAULT_FILENAME_TIME_FORMAT.into());
246 let blob_append_uuid = self
247 .blob_append_uuid
248 .unwrap_or(DEFAULT_FILENAME_APPEND_UUID);
249
250 let transformer = self.encoding.transformer();
251 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
252 let encoder = Encoder::<Framer>::new(framer, serializer);
253
254 let request_options = AzureBlobRequestOptions {
255 container_name: self.container_name.clone(),
256 blob_time_format,
257 blob_append_uuid,
258 encoder: (transformer, encoder),
259 compression: self.compression,
260 };
261
262 let sink = AzureBlobSink::new(
263 service,
264 request_options,
265 self.key_partitioner()?,
266 batcher_settings,
267 );
268
269 Ok(VectorSink::from_event_streamsink(sink))
270 }
271
272 pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
273 Ok(KeyPartitioner::new(self.blob_prefix.clone(), None))
274 }
275}