vector/sinks/azure_logs_ingestion/
config.rs1use std::sync::Arc;
2
3use azure_core::credentials::TokenCredential;
4
5use vector_lib::{configurable::configurable_component, schema};
6use vrl::value::Kind;
7
8use crate::{
9 http::{HttpClient, get_http_scheme_from_uri},
10 sinks::{
11 azure_common::config::AzureAuthentication,
12 prelude::*,
13 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
14 },
15};
16
17use super::{
18 service::{AzureLogsIngestionResponse, AzureLogsIngestionService},
19 sink::AzureLogsIngestionSink,
20};
21
/// Upper bound, in bytes, that `build_inner` applies to the configured batch
/// size via `limit_max_bytes` (30 MiB).
// NOTE(review): Azure's published Logs Ingestion API limits appear to cap a
// single call well below 30 MiB — confirm this ceiling is intentional.
const MAX_BATCH_SIZE: usize = 30 << 20;
24
25pub(super) fn default_scope() -> String {
26 "https://monitor.azure.com/.default".into()
27}
28
29pub(super) fn default_timestamp_field() -> String {
30 "TimeGenerated".into()
31}
32
33#[configurable_component(sink(
35 "azure_logs_ingestion",
36 "Publish log events to the Azure Monitor Logs Ingestion API."
37))]
38#[derive(Clone, Debug)]
39#[serde(deny_unknown_fields)]
40pub struct AzureLogsIngestionConfig {
41 #[configurable(metadata(
45 docs::examples = "https://my-dce-5kyl.eastus-1.ingest.monitor.azure.com"
46 ))]
47 pub endpoint: String,
48
49 #[configurable(metadata(docs::examples = "dcr-000a00a000a00000a000000aa000a0aa"))]
53 pub dcr_immutable_id: String,
54
55 #[configurable(metadata(docs::examples = "Custom-MyTable"))]
59 pub stream_name: String,
60
61 #[configurable(derived)]
62 pub auth: AzureAuthentication,
63
64 #[configurable(metadata(docs::examples = "https://monitor.azure.us/.default"))]
68 #[configurable(metadata(docs::examples = "https://monitor.azure.cn/.default"))]
69 #[serde(default = "default_scope")]
70 pub(super) token_scope: String,
71
72 #[configurable(metadata(docs::examples = "EventStartTime"))]
79 #[configurable(metadata(docs::examples = "Timestamp"))]
80 #[serde(default = "default_timestamp_field")]
81 pub timestamp_field: String,
82
83 #[configurable(derived)]
84 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
85 pub encoding: Transformer,
86
87 #[configurable(derived)]
88 #[serde(default)]
89 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
90
91 #[configurable(derived)]
92 #[serde(default)]
93 pub request: TowerRequestConfig,
94
95 #[configurable(derived)]
96 pub tls: Option<TlsConfig>,
97
98 #[configurable(derived)]
99 #[serde(
100 default,
101 deserialize_with = "crate::serde::bool_or_struct",
102 skip_serializing_if = "crate::serde::is_default"
103 )]
104 pub acknowledgements: AcknowledgementsConfig,
105}
106
107impl Default for AzureLogsIngestionConfig {
108 fn default() -> Self {
109 Self {
110 endpoint: Default::default(),
111 dcr_immutable_id: Default::default(),
112 stream_name: Default::default(),
113 auth: Default::default(),
114 token_scope: default_scope(),
115 timestamp_field: default_timestamp_field(),
116 encoding: Default::default(),
117 batch: Default::default(),
118 request: Default::default(),
119 tls: None,
120 acknowledgements: Default::default(),
121 }
122 }
123}
124
125impl AzureLogsIngestionConfig {
126 #[allow(clippy::too_many_arguments)]
127 pub(super) async fn build_inner(
128 &self,
129 cx: SinkContext,
130 endpoint: UriSerde,
131 dcr_immutable_id: String,
132 stream_name: String,
133 credential: Arc<dyn TokenCredential>,
134 token_scope: String,
135 timestamp_field: String,
136 ) -> crate::Result<(VectorSink, Healthcheck)> {
137 let endpoint = endpoint.with_default_parts().uri;
138 let protocol = get_http_scheme_from_uri(&endpoint).to_string();
139
140 let batch_settings = self
141 .batch
142 .validate()?
143 .limit_max_bytes(MAX_BATCH_SIZE)?
144 .into_batcher_settings()?;
145
146 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
147 let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
148
149 let service = AzureLogsIngestionService::new(
150 client,
151 endpoint,
152 dcr_immutable_id,
153 stream_name,
154 credential,
155 token_scope,
156 )?;
157 let healthcheck = service.healthcheck();
158
159 let retry_logic =
160 HttpStatusRetryLogic::new(|res: &AzureLogsIngestionResponse| res.http_status);
161 let request_settings = self.request.into_settings();
162 let service = ServiceBuilder::new()
163 .settings(request_settings, retry_logic)
164 .service(service);
165
166 let sink = AzureLogsIngestionSink::new(
167 batch_settings,
168 self.encoding.clone(),
169 service,
170 timestamp_field,
171 protocol,
172 );
173
174 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
175 }
176}
177
// Invokes the project's `impl_generate_config_from_default!` macro for this
// configuration type.
// NOTE(review): presumably this builds the generated example config from the
// `Default` impl — confirm against the macro definition.
impl_generate_config_from_default!(AzureLogsIngestionConfig);
179
180#[async_trait::async_trait]
181#[typetag::serde(name = "azure_logs_ingestion")]
182impl SinkConfig for AzureLogsIngestionConfig {
183 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
184 let endpoint: UriSerde = self.endpoint.parse()?;
185
186 let credential: Arc<dyn TokenCredential> = self.auth.credential().await?;
187
188 self.build_inner(
189 cx,
190 endpoint,
191 self.dcr_immutable_id.clone(),
192 self.stream_name.clone(),
193 credential,
194 self.token_scope.clone(),
195 self.timestamp_field.clone(),
196 )
197 .await
198 }
199
200 fn input(&self) -> Input {
201 let requirements =
202 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
203
204 Input::log().with_schema_requirement(requirements)
205 }
206
207 fn acknowledgements(&self) -> &AcknowledgementsConfig {
208 &self.acknowledgements
209 }
210}