vector/sinks/azure_logs_ingestion/
config.rs1use std::sync::Arc;
2
3use azure_core::credentials::{TokenCredential, TokenRequestOptions};
4use azure_core::http::ClientMethodOptions;
5use azure_core::{Error, error::ErrorKind};
6
7use azure_identity::{
8 AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientSecretCredential,
9 ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId,
10 WorkloadIdentityCredential,
11};
12use vector_lib::{configurable::configurable_component, schema, sensitive_string::SensitiveString};
13use vrl::value::Kind;
14
15use crate::{
16 http::{HttpClient, get_http_scheme_from_uri},
17 sinks::{
18 prelude::*,
19 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
20 },
21};
22
23use super::{
24 service::{AzureLogsIngestionResponse, AzureLogsIngestionService},
25 sink::AzureLogsIngestionSink,
26};
27
/// Upper bound in bytes (30 MiB) applied to the configured batch size via `limit_max_bytes`
/// when the sink is built.
// NOTE(review): Azure's published service limits list 1 MB as the maximum size of a single
// Logs Ingestion API call — confirm that a 30 MiB cap is really intended here.
const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024;
30
31pub(super) fn default_scope() -> String {
32 "https://monitor.azure.com/.default".into()
33}
34
35pub(super) fn default_timestamp_field() -> String {
36 "TimeGenerated".into()
37}
38
39#[configurable_component(sink(
41 "azure_logs_ingestion",
42 "Publish log events to the Azure Monitor Logs Ingestion API."
43))]
44#[derive(Clone, Debug)]
45#[serde(deny_unknown_fields)]
46pub struct AzureLogsIngestionConfig {
47 #[configurable(metadata(
51 docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
52 ))]
53 pub endpoint: String,
54
55 #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))]
59 pub dcr_immutable_id: String,
60
61 #[configurable(metadata(docs::examples = "Custom-MyTable"))]
65 pub stream_name: String,
66
67 #[configurable(derived)]
68 #[serde(default)]
69 pub auth: AzureAuthentication,
70
71 #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))]
75 #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))]
76 #[serde(default = "default_scope")]
77 pub(super) token_scope: String,
78
79 #[configurable(metadata(docs::examples = "EventStartTime"))]
86 #[configurable(metadata(docs::examples = "Timestamp"))]
87 #[serde(default = "default_timestamp_field")]
88 pub timestamp_field: String,
89
90 #[configurable(derived)]
91 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
92 pub encoding: Transformer,
93
94 #[configurable(derived)]
95 #[serde(default)]
96 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
97
98 #[configurable(derived)]
99 #[serde(default)]
100 pub request: TowerRequestConfig,
101
102 #[configurable(derived)]
103 pub tls: Option<TlsConfig>,
104
105 #[configurable(derived)]
106 #[serde(
107 default,
108 deserialize_with = "crate::serde::bool_or_struct",
109 skip_serializing_if = "crate::serde::is_default"
110 )]
111 pub acknowledgements: AcknowledgementsConfig,
112}
113
114impl Default for AzureLogsIngestionConfig {
115 fn default() -> Self {
116 Self {
117 endpoint: Default::default(),
118 dcr_immutable_id: Default::default(),
119 stream_name: Default::default(),
120 auth: Default::default(),
121 token_scope: default_scope(),
122 timestamp_field: default_timestamp_field(),
123 encoding: Default::default(),
124 batch: Default::default(),
125 request: Default::default(),
126 tls: None,
127 acknowledgements: Default::default(),
128 }
129 }
130}
131
132#[configurable_component]
134#[derive(Clone, Debug, Derivative, Eq, PartialEq)]
135#[derivative(Default)]
136#[serde(deny_unknown_fields, untagged)]
137pub enum AzureAuthentication {
138 #[derivative(Default)]
140 ClientSecretCredential {
141 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
145 azure_tenant_id: String,
146
147 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
151 azure_client_id: String,
152
153 #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))]
157 azure_client_secret: SensitiveString,
158 },
159
160 #[configurable(metadata(docs::enum_tag_description = "The kind of Azure credential to use."))]
162 Specific(SpecificAzureCredential),
163}
164
165#[configurable_component]
167#[derive(Clone, Debug, Eq, PartialEq)]
168#[serde(
169 tag = "azure_credential_kind",
170 rename_all = "snake_case",
171 deny_unknown_fields
172)]
173pub enum SpecificAzureCredential {
174 #[cfg(not(target_arch = "wasm32"))]
176 AzureCli {},
177
178 ManagedIdentity {
180 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
182 #[serde(default, skip_serializing_if = "Option::is_none")]
183 user_assigned_managed_identity_id: Option<String>,
184 },
185
186 ManagedIdentityClientAssertion {
188 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
190 #[serde(default, skip_serializing_if = "Option::is_none")]
191 user_assigned_managed_identity_id: Option<String>,
192
193 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
195 client_assertion_tenant_id: String,
196
197 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
199 client_assertion_client_id: String,
200 },
201
202 WorkloadIdentity {},
204}
205
206#[derive(Debug)]
207struct ManagedIdentityClientAssertion {
208 credential: Arc<dyn TokenCredential>,
209 scope: String,
210}
211
212#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
213#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
214impl ClientAssertion for ManagedIdentityClientAssertion {
215 async fn secret(&self, options: Option<ClientMethodOptions<'_>>) -> azure_core::Result<String> {
216 Ok(self
217 .credential
218 .get_token(
219 &[&self.scope],
220 Some(TokenRequestOptions {
221 method_options: options.unwrap_or_default(),
222 }),
223 )
224 .await?
225 .token
226 .secret()
227 .to_string())
228 }
229}
230
231impl AzureAuthentication {
232 pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
234 match self {
235 Self::ClientSecretCredential {
236 azure_tenant_id,
237 azure_client_id,
238 azure_client_secret,
239 } => {
240 if azure_tenant_id.is_empty() {
241 return Err(Error::with_message(ErrorKind::Credential,
242 "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
243 ));
244 }
245 if azure_client_id.is_empty() {
246 return Err(Error::with_message(ErrorKind::Credential,
247 "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
248 ));
249 }
250 if azure_client_secret.inner().is_empty() {
251 return Err(Error::with_message(ErrorKind::Credential,
252 "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
253 ));
254 }
255 let secret: String = azure_client_secret.inner().into();
256 let credential: Arc<dyn TokenCredential> = ClientSecretCredential::new(
257 &azure_tenant_id.clone(),
258 azure_client_id.clone(),
259 secret.into(),
260 None,
261 )?;
262 Ok(credential)
263 }
264
265 Self::Specific(specific) => specific.credential().await,
266 }
267 }
268}
269
270impl SpecificAzureCredential {
271 pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
273 let credential: Arc<dyn TokenCredential> = match self {
274 #[cfg(not(target_arch = "wasm32"))]
275 Self::AzureCli {} => AzureCliCredential::new(None)?,
276
277 Self::ManagedIdentity {
278 user_assigned_managed_identity_id,
279 } => {
280 let mut options = ManagedIdentityCredentialOptions::default();
281 if let Some(id) = user_assigned_managed_identity_id {
282 options.user_assigned_id = Some(UserAssignedId::ClientId(id.clone()));
283 }
284 ManagedIdentityCredential::new(Some(options))?
285 }
286
287 Self::ManagedIdentityClientAssertion {
288 user_assigned_managed_identity_id,
289 client_assertion_tenant_id,
290 client_assertion_client_id,
291 } => {
292 let mut options = ManagedIdentityCredentialOptions::default();
293 if let Some(id) = user_assigned_managed_identity_id {
294 options.user_assigned_id = Some(UserAssignedId::ClientId(id.clone()));
295 }
296 let msi: Arc<dyn TokenCredential> = ManagedIdentityCredential::new(Some(options))?;
297 let assertion = ManagedIdentityClientAssertion {
298 credential: msi,
299 scope: "api://AzureADTokenExchange/.default".to_string(),
301 };
302
303 ClientAssertionCredential::new(
304 client_assertion_tenant_id.clone(),
305 client_assertion_client_id.clone(),
306 assertion,
307 None,
308 )?
309 }
310
311 Self::WorkloadIdentity {} => WorkloadIdentityCredential::new(None)?,
312 };
313 Ok(credential)
314 }
315}
316
317impl AzureLogsIngestionConfig {
318 #[allow(clippy::too_many_arguments)]
319 pub(super) async fn build_inner(
320 &self,
321 cx: SinkContext,
322 endpoint: UriSerde,
323 dcr_immutable_id: String,
324 stream_name: String,
325 credential: Arc<dyn TokenCredential>,
326 token_scope: String,
327 timestamp_field: String,
328 ) -> crate::Result<(VectorSink, Healthcheck)> {
329 let endpoint = endpoint.with_default_parts().uri;
330 let protocol = get_http_scheme_from_uri(&endpoint).to_string();
331
332 let batch_settings = self
333 .batch
334 .validate()?
335 .limit_max_bytes(MAX_BATCH_SIZE)?
336 .into_batcher_settings()?;
337
338 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
339 let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
340
341 let service = AzureLogsIngestionService::new(
342 client,
343 endpoint,
344 dcr_immutable_id,
345 stream_name,
346 credential,
347 token_scope,
348 )?;
349 let healthcheck = service.healthcheck();
350
351 let retry_logic =
352 HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status);
353 let request_settings = self.request.into_settings();
354 let service = ServiceBuilder::new()
355 .settings(request_settings, retry_logic)
356 .service(service);
357
358 let sink = AzureLogsIngestionSink::new(
359 batch_settings,
360 self.encoding.clone(),
361 service,
362 timestamp_field,
363 protocol,
364 );
365
366 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
367 }
368}
369
// Presumably ties config generation for this sink to its `Default` impl — confirm against
// the macro definition.
impl_generate_config_from_default!(AzureLogsIngestionConfig);
371
372#[async_trait::async_trait]
373#[typetag::serde(name = "azure_logs_ingestion")]
374impl SinkConfig for AzureLogsIngestionConfig {
375 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
376 let endpoint: UriSerde = self.endpoint.parse()?;
377
378 let credential: Arc<dyn TokenCredential> = self.auth.credential().await?;
379
380 self.build_inner(
381 cx,
382 endpoint,
383 self.dcr_immutable_id.clone(),
384 self.stream_name.clone(),
385 credential,
386 self.token_scope.clone(),
387 self.timestamp_field.clone(),
388 )
389 .await
390 }
391
392 fn input(&self) -> Input {
393 let requirements =
394 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
395
396 Input::log().with_schema_requirement(requirements)
397 }
398
399 fn acknowledgements(&self) -> &AcknowledgementsConfig {
400 &self.acknowledgements
401 }
402}