vector/sinks/azure_logs_ingestion/
config.rs

1use std::sync::Arc;
2
3use azure_core::credentials::TokenCredential;
4
5use vector_lib::{configurable::configurable_component, schema};
6use vrl::value::Kind;
7
8use crate::{
9    http::{HttpClient, get_http_scheme_from_uri},
10    sinks::{
11        azure_common::config::AzureAuthentication,
12        prelude::*,
13        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
14    },
15};
16
17use super::{
18    service::{AzureLogsIngestionResponse, AzureLogsIngestionService},
19    sink::AzureLogsIngestionSink,
20};
21
/// Max number of bytes in request body
// Used in `build_inner` as the ceiling passed to `limit_max_bytes` for the batch settings.
// NOTE(review): 30 MiB matches the legacy HTTP Data Collector API per-post limit; the Logs
// Ingestion API's published per-call limit appears to be 1 MB (Azure Monitor service
// limits) — confirm. Lowering this value alone may make the default batch `max_bytes`
// fail the limit check, so the batch defaults would need to change together with it.
const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024;
24
25pub(super) fn default_scope() -> String {
26    "https://monitor.azure.com/.default".into()
27}
28
29pub(super) fn default_timestamp_field() -> String {
30    "TimeGenerated".into()
31}
32
// NOTE(review): `configurable_component` presumably turns the `///` docs in this struct into
// the user-facing configuration reference; reviewer notes are kept as plain `//` comments
// so they stay out of that output.
/// Configuration for the `azure_logs_ingestion` sink.
#[configurable_component(sink(
    "azure_logs_ingestion",
    "Publish log events to the Azure Monitor Logs Ingestion API."
))]
#[derive(Clone, Debug)]
// Unknown keys in the sink's configuration table are rejected during deserialization.
#[serde(deny_unknown_fields)]
pub struct AzureLogsIngestionConfig {
    /// The [Data collection endpoint URI][endpoint] associated with the Log Analytics workspace.
    ///
    /// [endpoint]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(
        docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
    ))]
    // Stored as a raw string; `SinkConfig::build` parses it into a `UriSerde`.
    pub endpoint: String,

    /// The [Data collection rule immutable ID][dcr_immutable_id] for the Data collection endpoint.
    ///
    /// [dcr_immutable_id]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))]
    pub dcr_immutable_id: String,

    /// The [Stream name][stream_name] for the Data collection rule.
    ///
    /// [stream_name]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "Custom-MyTable"))]
    pub stream_name: String,

    // Resolved into an `Arc<dyn TokenCredential>` via `credential()` when the sink is built.
    #[configurable(derived)]
    pub auth: AzureAuthentication,

    /// [Token scope][token_scope] for dedicated Azure regions.
    ///
    /// [token_scope]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/logs-ingestion-api-overview
    #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))]
    #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))]
    // Falls back to `default_scope()` when omitted. NOTE(review): unlike its siblings this
    // field is `pub(super)` rather than `pub` — confirm that is intentional.
    #[serde(default = "default_scope")]
    pub(super) token_scope: String,

    /// The destination field (column) for the timestamp.
    ///
    /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used as the source.
    /// Most schemas use `TimeGenerated`, but some use `Timestamp` (legacy) or `EventStartTime` (ASIM) [std_columns].
    ///
    /// [std_columns]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated
    #[configurable(metadata(docs::examples = "EventStartTime"))]
    #[configurable(metadata(docs::examples = "Timestamp"))]
    // Falls back to `default_timestamp_field()` when omitted.
    #[serde(default = "default_timestamp_field")]
    pub timestamp_field: String,

    // Left out of serialized output while it still equals its default value.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    // Validated and capped at `MAX_BATCH_SIZE` bytes in `build_inner` before use.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    // Converted into tower request settings, combined with HTTP-status retry logic, in
    // `build_inner`.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Passed as `self.tls.as_ref()` to `TlsSettings::from_options` when the HTTP client is
    // built.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // Presumably accepts either a bare boolean or a full struct (per `bool_or_struct`) —
    // confirm against that helper. Omitted from serialized output while it is the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
106
107impl Default for AzureLogsIngestionConfig {
108    fn default() -> Self {
109        Self {
110            endpoint: Default::default(),
111            dcr_immutable_id: Default::default(),
112            stream_name: Default::default(),
113            auth: Default::default(),
114            token_scope: default_scope(),
115            timestamp_field: default_timestamp_field(),
116            encoding: Default::default(),
117            batch: Default::default(),
118            request: Default::default(),
119            tls: None,
120            acknowledgements: Default::default(),
121        }
122    }
123}
124
125impl AzureLogsIngestionConfig {
126    #[allow(clippy::too_many_arguments)]
127    pub(super) async fn build_inner(
128        &self,
129        cx: SinkContext,
130        endpoint: UriSerde,
131        dcr_immutable_id: String,
132        stream_name: String,
133        credential: Arc<dyn TokenCredential>,
134        token_scope: String,
135        timestamp_field: String,
136    ) -> crate::Result<(VectorSink, Healthcheck)> {
137        let endpoint = endpoint.with_default_parts().uri;
138        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
139
140        let batch_settings = self
141            .batch
142            .validate()?
143            .limit_max_bytes(MAX_BATCH_SIZE)?
144            .into_batcher_settings()?;
145
146        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
147        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
148
149        let service = AzureLogsIngestionService::new(
150            client,
151            endpoint,
152            dcr_immutable_id,
153            stream_name,
154            credential,
155            token_scope,
156        )?;
157        let healthcheck = service.healthcheck();
158
159        let retry_logic =
160            HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status);
161        let request_settings = self.request.into_settings();
162        let service = ServiceBuilder::new()
163            .settings(request_settings, retry_logic)
164            .service(service);
165
166        let sink = AzureLogsIngestionSink::new(
167            batch_settings,
168            self.encoding.clone(),
169            service,
170            timestamp_field,
171            protocol,
172        );
173
174        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
175    }
176}
177
// Presumably implements config generation in terms of `AzureLogsIngestionConfig::default()`
// (the `Default` impl in this file). The identifiers would then be generated empty.
// NOTE(review): confirm against the macro definition.
impl_generate_config_from_default!(AzureLogsIngestionConfig);
179
180#[async_trait::async_trait]
181#[typetag::serde(name = "azure_logs_ingestion")]
182impl SinkConfig for AzureLogsIngestionConfig {
183    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
184        let endpoint: UriSerde = self.endpoint.parse()?;
185
186        let credential: Arc<dyn TokenCredential> = self.auth.credential().await?;
187
188        self.build_inner(
189            cx,
190            endpoint,
191            self.dcr_immutable_id.clone(),
192            self.stream_name.clone(),
193            credential,
194            self.token_scope.clone(),
195            self.timestamp_field.clone(),
196        )
197        .await
198    }
199
200    fn input(&self) -> Input {
201        let requirements =
202            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
203
204        Input::log().with_schema_requirement(requirements)
205    }
206
207    fn acknowledgements(&self) -> &AcknowledgementsConfig {
208        &self.acknowledgements
209    }
210}