// vector/sinks/azure_logs_ingestion/config.rs

1use std::sync::Arc;
2
3use azure_core::credentials::{TokenCredential, TokenRequestOptions};
4use azure_core::http::ClientMethodOptions;
5use azure_core::{Error, error::ErrorKind};
6
7use azure_identity::{
8    AzureCliCredential, ClientAssertion, ClientAssertionCredential, ClientSecretCredential,
9    ManagedIdentityCredential, ManagedIdentityCredentialOptions, UserAssignedId,
10    WorkloadIdentityCredential,
11};
12use vector_lib::{configurable::configurable_component, schema, sensitive_string::SensitiveString};
13use vrl::value::Kind;
14
15use crate::{
16    http::{HttpClient, get_http_scheme_from_uri},
17    sinks::{
18        prelude::*,
19        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
20    },
21};
22
23use super::{
24    service::{AzureLogsIngestionResponse, AzureLogsIngestionService},
25    sink::AzureLogsIngestionSink,
26};
27
// NOTE(review): Azure's published Logs Ingestion API limits list 1 MB per API call; confirm
// that a 30 MiB cap is intended here (it is applied to the batch `max_bytes` setting).
/// Largest request body, in bytes, this sink allows a batch to grow to (30 MiB).
const MAX_BATCH_SIZE: usize = 30 << 20;
30
31pub(super) fn default_scope() -> String {
32    "https://monitor.azure.com/.default".into()
33}
34
35pub(super) fn default_timestamp_field() -> String {
36    "TimeGenerated".into()
37}
38
// NOTE(review): under `configurable_component`, the `///` comments on this type and its
// fields appear to double as the generated configuration docs, so edits to them are
// user-facing. Implementation notes below use plain `//` comments for that reason.
/// Configuration for the `azure_logs_ingestion` sink.
#[configurable_component(sink(
    "azure_logs_ingestion",
    "Publish log events to the Azure Monitor Logs Ingestion API."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AzureLogsIngestionConfig {
    // Parsed into a `UriSerde` when the sink is built; an unparsable value fails the build.
    /// The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace.
    ///
    /// [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(
        docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
    ))]
    pub endpoint: String,

    // Forwarded unchanged to `AzureLogsIngestionService::new`.
    /// The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint.
    ///
    /// [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))]
    pub dcr_immutable_id: String,

    // Forwarded unchanged to `AzureLogsIngestionService::new`.
    /// The [Stream name][stream_name] for the Data collection rule.
    ///
    /// [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "Custom-MyTable"))]
    pub stream_name: String,

    // Falls back to `AzureAuthentication::default()` when `auth` is omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub auth: AzureAuthentication,

    // Falls back to `default_scope()` when omitted.
    /// [Token scope][token_scope] for dedicated Azure regions.
    ///
    /// [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))]
    #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))]
    #[serde(default = "default_scope")]
    pub(super) token_scope: String,

    // Falls back to `default_timestamp_field()` when omitted.
    /// The destination field (column) for the timestamp.
    ///
    /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source.
    /// Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime` (ASIM) [std_columns].
    ///
    /// [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated
    #[configurable(metadata(docs::examples = "EventStartTime"))]
    #[configurable(metadata(docs::examples = "Timestamp"))]
    #[serde(default = "default_timestamp_field")]
    pub timestamp_field: String,

    // Left out of serialized output while it equals the default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // `MAX_BATCH_SIZE` is applied on top of these settings as an upper bound on max bytes.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional; passed as-is to `TlsSettings::from_options` when building the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Presumably accepts either a bare bool or a full table (per `bool_or_struct`) — confirm.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
113
114impl Default for AzureLogsIngestionConfig {
115    fn default() -> Self {
116        Self {
117            endpoint: Default::default(),
118            dcr_immutable_id: Default::default(),
119            stream_name: Default::default(),
120            auth: Default::default(),
121            token_scope: default_scope(),
122            timestamp_field: default_timestamp_field(),
123            encoding: Default::default(),
124            batch: Default::default(),
125            request: Default::default(),
126            tls: None,
127            acknowledgements: Default::default(),
128        }
129    }
130}
131
132/// Configuration of the authentication strategy for interacting with Azure services.
133#[configurable_component]
134#[derive(Clone, Debug, Derivative, Eq, PartialEq)]
135#[derivative(Default)]
136#[serde(deny_unknown_fields, untagged)]
137pub enum AzureAuthentication {
138    /// Use client credentials
139    #[derivative(Default)]
140    ClientSecretCredential {
141        /// The [Azure Tenant ID][azure_tenant_id].
142        ///
143        /// [azure_tenant_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
144        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
145        azure_tenant_id: String,
146
147        /// The [Azure Client ID][azure_client_id].
148        ///
149        /// [azure_client_id]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
150        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
151        azure_client_id: String,
152
153        /// The [Azure Client Secret][azure_client_secret].
154        ///
155        /// [azure_client_secret]: https://learn.microsoft.com/entra/identity-platform/howto-create-service-principal-portal
156        #[configurable(metadata(docs::examples = "00-00~000000-0000000~0000000000000000000"))]
157        azure_client_secret: SensitiveString,
158    },
159
160    /// Use credentials from environment variables
161    #[configurable(metadata(docs::enum_tag_description = "The kind of Azure credential to use."))]
162    Specific(SpecificAzureCredential),
163}
164
// Internally tagged by `azure_credential_kind` with snake_cased variant names, so the tag
// values are `azure_cli`, `managed_identity`, `managed_identity_client_assertion`, and
// `workload_identity`.
/// Specific Azure credential types.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
#[serde(
    tag = "azure_credential_kind",
    rename_all = "snake_case",
    deny_unknown_fields
)]
pub enum SpecificAzureCredential {
    // Not available when compiling for wasm32.
    /// Use Azure CLI credentials
    #[cfg(not(target_arch = "wasm32"))]
    AzureCli {},

    /// Use Managed Identity credentials
    ManagedIdentity {
        // When omitted, no user-assigned ID is set on the managed-identity options.
        /// The User Assigned Managed Identity (Client ID) to use.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user_assigned_managed_identity_id: Option<String>,
    },

    // A managed-identity token is used as the client assertion for the target
    // tenant/client pair (see `SpecificAzureCredential::credential`).
    /// Use Managed Identity with Client Assertion credentials
    ManagedIdentityClientAssertion {
        /// The User Assigned Managed Identity (Client ID) to use for the managed identity.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        #[serde(default, skip_serializing_if = "Option::is_none")]
        user_assigned_managed_identity_id: Option<String>,

        /// The target Tenant ID to use.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        client_assertion_tenant_id: String,

        /// The target Client ID to use.
        #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
        client_assertion_client_id: String,
    },

    // Built with default options (`WorkloadIdentityCredential::new(None)`).
    /// Use Workload Identity credentials
    WorkloadIdentity {},
}
205
/// A [`ClientAssertion`] backed by another token credential: the assertion it produces is
/// the secret of a token obtained from `credential` for `scope`.
#[derive(Debug)]
struct ManagedIdentityClientAssertion {
    /// Credential that mints the assertion token (a managed identity credential in this file).
    credential: Arc<dyn TokenCredential>,
    /// Scope requested from `credential` when producing the assertion.
    scope: String,
}
211
212#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
213#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
214impl ClientAssertion for ManagedIdentityClientAssertion {
215    async fn secret(&self, options: Option<ClientMethodOptions<'_>>) -> azure_core::Result<String> {
216        Ok(self
217            .credential
218            .get_token(
219                &[&self.scope],
220                Some(TokenRequestOptions {
221                    method_options: options.unwrap_or_default(),
222                }),
223            )
224            .await?
225            .token
226            .secret()
227            .to_string())
228    }
229}
230
231impl AzureAuthentication {
232    /// Returns the provider for the credentials based on the authentication mechanism chosen.
233    pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
234        match self {
235            Self::ClientSecretCredential {
236                azure_tenant_id,
237                azure_client_id,
238                azure_client_secret,
239            } => {
240                if azure_tenant_id.is_empty() {
241                    return Err(Error::with_message(ErrorKind::Credential,
242                        "`auth.azure_tenant_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
243                    ));
244                }
245                if azure_client_id.is_empty() {
246                    return Err(Error::with_message(ErrorKind::Credential,
247                        "`auth.azure_client_id` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
248                    ));
249                }
250                if azure_client_secret.inner().is_empty() {
251                    return Err(Error::with_message(ErrorKind::Credential,
252                        "`auth.azure_client_secret` is blank; either use `auth.azure_credential_kind`, or provide tenant ID, client ID, and secret.".to_string()
253                    ));
254                }
255                let secret: String = azure_client_secret.inner().into();
256                let credential: Arc<dyn TokenCredential> = ClientSecretCredential::new(
257                    &azure_tenant_id.clone(),
258                    azure_client_id.clone(),
259                    secret.into(),
260                    None,
261                )?;
262                Ok(credential)
263            }
264
265            Self::Specific(specific) => specific.credential().await,
266        }
267    }
268}
269
270impl SpecificAzureCredential {
271    /// Returns the provider for the credentials based on the specific credential type.
272    pub async fn credential(&self) -> azure_core::Result<Arc<dyn TokenCredential>> {
273        let credential: Arc<dyn TokenCredential> = match self {
274            #[cfg(not(target_arch = "wasm32"))]
275            Self::AzureCli {} => AzureCliCredential::new(None)?,
276
277            Self::ManagedIdentity {
278                user_assigned_managed_identity_id,
279            } => {
280                let mut options = ManagedIdentityCredentialOptions::default();
281                if let Some(id) = user_assigned_managed_identity_id {
282                    options.user_assigned_id = Some(UserAssignedId::ClientId(id.clone()));
283                }
284                ManagedIdentityCredential::new(Some(options))?
285            }
286
287            Self::ManagedIdentityClientAssertion {
288                user_assigned_managed_identity_id,
289                client_assertion_tenant_id,
290                client_assertion_client_id,
291            } => {
292                let mut options = ManagedIdentityCredentialOptions::default();
293                if let Some(id) = user_assigned_managed_identity_id {
294                    options.user_assigned_id = Some(UserAssignedId::ClientId(id.clone()));
295                }
296                let msi: Arc<dyn TokenCredential> = ManagedIdentityCredential::new(Some(options))?;
297                let assertion = ManagedIdentityClientAssertion {
298                    credential: msi,
299                    // Future: make this configurable for sovereign clouds? (no way to test...)
300                    scope: "api://AzureADTokenExchange/.default".to_string(),
301                };
302
303                ClientAssertionCredential::new(
304                    client_assertion_tenant_id.clone(),
305                    client_assertion_client_id.clone(),
306                    assertion,
307                    None,
308                )?
309            }
310
311            Self::WorkloadIdentity {} => WorkloadIdentityCredential::new(None)?,
312        };
313        Ok(credential)
314    }
315}
316
317impl AzureLogsIngestionConfig {
318    #[allow(clippy::too_many_arguments)]
319    pub(super) async fn build_inner(
320        &self,
321        cx: SinkContext,
322        endpoint: UriSerde,
323        dcr_immutable_id: String,
324        stream_name: String,
325        credential: Arc<dyn TokenCredential>,
326        token_scope: String,
327        timestamp_field: String,
328    ) -> crate::Result<(VectorSink, Healthcheck)> {
329        let endpoint = endpoint.with_default_parts().uri;
330        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
331
332        let batch_settings = self
333            .batch
334            .validate()?
335            .limit_max_bytes(MAX_BATCH_SIZE)?
336            .into_batcher_settings()?;
337
338        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
339        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
340
341        let service = AzureLogsIngestionService::new(
342            client,
343            endpoint,
344            dcr_immutable_id,
345            stream_name,
346            credential,
347            token_scope,
348        )?;
349        let healthcheck = service.healthcheck();
350
351        let retry_logic =
352            HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status);
353        let request_settings = self.request.into_settings();
354        let service = ServiceBuilder::new()
355            .settings(request_settings, retry_logic)
356            .service(service);
357
358        let sink = AzureLogsIngestionSink::new(
359            batch_settings,
360            self.encoding.clone(),
361            service,
362            timestamp_field,
363            protocol,
364        );
365
366        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
367    }
368}
369
// Presumably derives this sink's generated example configuration from
// `AzureLogsIngestionConfig::default()` (per the macro name) — confirm against the macro.
impl_generate_config_from_default!(AzureLogsIngestionConfig);
371
372#[async_trait::async_trait]
373#[typetag::serde(name = "azure_logs_ingestion")]
374impl SinkConfig for AzureLogsIngestionConfig {
375    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
376        let endpoint: UriSerde = self.endpoint.parse()?;
377
378        let credential: Arc<dyn TokenCredential> = self.auth.credential().await?;
379
380        self.build_inner(
381            cx,
382            endpoint,
383            self.dcr_immutable_id.clone(),
384            self.stream_name.clone(),
385            credential,
386            self.token_scope.clone(),
387            self.timestamp_field.clone(),
388        )
389        .await
390    }
391
392    fn input(&self) -> Input {
393        let requirements =
394            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
395
396        Input::log().with_schema_requirement(requirements)
397    }
398
399    fn acknowledgements(&self) -> &AcknowledgementsConfig {
400        &self.acknowledgements
401    }
402}