vector/sinks/azure_monitor_logs/
config.rs1use openssl::{base64, pkey};
2use vector_lib::{
3 config::log_schema,
4 configurable::configurable_component,
5 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath},
6 schema,
7 sensitive_string::SensitiveString,
8};
9use vrl::value::Kind;
10
11use super::{
12 service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
13 sink::AzureMonitorLogsSink,
14};
15use crate::{
16 http::{HttpClient, get_http_scheme_from_uri},
17 sinks::{
18 prelude::*,
19 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
20 },
21};
22
23const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024;
25
26pub(super) fn default_host() -> String {
27 "ods.opinsights.azure.com".into()
28}
29
30#[configurable_component(sink(
32 "azure_monitor_logs",
33 "Publish log events to the Azure Monitor Logs service."
34))]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields)]
37pub struct AzureMonitorLogsConfig {
38 #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
42 #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
43 pub customer_id: String,
44
45 #[configurable(metadata(
49 docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
50 ))]
51 #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
52 pub shared_key: SensitiveString,
53
54 #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
60 #[configurable(metadata(docs::examples = "MyTableName"))]
61 #[configurable(metadata(docs::examples = "MyRecordType"))]
62 pub log_type: String,
63
64 #[configurable(metadata(
68 docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
69 ))]
70 #[configurable(metadata(
71 docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
72 ))]
73 pub azure_resource_id: Option<String>,
74
75 #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
79 #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
80 #[serde(default = "default_host")]
81 pub(super) host: String,
82
83 #[configurable(derived)]
84 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
85 pub encoding: Transformer,
86
87 #[configurable(derived)]
88 #[serde(default)]
89 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
90
91 #[configurable(derived)]
92 #[serde(default)]
93 pub request: TowerRequestConfig,
94
95 #[configurable(metadata(docs::examples = "time_generated"))]
104 pub time_generated_key: Option<OptionalValuePath>,
105
106 #[configurable(derived)]
107 pub tls: Option<TlsConfig>,
108
109 #[configurable(derived)]
110 #[serde(
111 default,
112 deserialize_with = "crate::serde::bool_or_struct",
113 skip_serializing_if = "crate::serde::is_default"
114 )]
115 pub acknowledgements: AcknowledgementsConfig,
116}
117
118impl Default for AzureMonitorLogsConfig {
119 fn default() -> Self {
120 Self {
121 customer_id: "my-customer-id".to_string(),
122 shared_key: Default::default(),
123 log_type: "MyRecordType".to_string(),
124 azure_resource_id: None,
125 host: default_host(),
126 encoding: Default::default(),
127 batch: Default::default(),
128 request: Default::default(),
129 time_generated_key: None,
130 tls: None,
131 acknowledgements: Default::default(),
132 }
133 }
134}
135
136impl AzureMonitorLogsConfig {
137 pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
138 if self.shared_key.inner().is_empty() {
139 return Err("shared_key cannot be an empty string".into());
140 }
141 let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
142 let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
143 Ok(shared_key)
144 }
145
146 fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
147 self.time_generated_key
148 .clone()
149 .and_then(|k| k.path)
150 .or_else(|| log_schema().timestamp_key().cloned())
151 }
152
153 pub(super) async fn build_inner(
154 &self,
155 cx: SinkContext,
156 endpoint: UriSerde,
157 ) -> crate::Result<(VectorSink, Healthcheck)> {
158 let endpoint = endpoint.with_default_parts().uri;
159 let protocol = get_http_scheme_from_uri(&endpoint).to_string();
160
161 let batch_settings = self
162 .batch
163 .validate()?
164 .limit_max_bytes(MAX_BATCH_SIZE)?
165 .into_batcher_settings()?;
166
167 let shared_key = self.build_shared_key()?;
168 let time_generated_key = self.get_time_generated_key();
169
170 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
171 let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
172
173 let service = AzureMonitorLogsService::new(
174 client,
175 endpoint,
176 self.customer_id.clone(),
177 self.azure_resource_id.as_deref(),
178 &self.log_type,
179 time_generated_key.clone(),
180 shared_key,
181 )?;
182 let healthcheck = service.healthcheck();
183
184 let retry_logic =
185 HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status);
186 let request_settings = self.request.into_settings();
187 let service = ServiceBuilder::new()
188 .settings(request_settings, retry_logic)
189 .service(service);
190
191 let sink = AzureMonitorLogsSink::new(
192 batch_settings,
193 self.encoding.clone(),
194 service,
195 time_generated_key,
196 protocol,
197 );
198
199 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
200 }
201}
202
203impl_generate_config_from_default!(AzureMonitorLogsConfig);
204
205#[async_trait::async_trait]
206#[typetag::serde(name = "azure_monitor_logs")]
207impl SinkConfig for AzureMonitorLogsConfig {
208 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
209 let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
210 self.build_inner(cx, endpoint).await
211 }
212
213 fn input(&self) -> Input {
214 let requirements =
215 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
216
217 Input::log().with_schema_requirement(requirements)
218 }
219
220 fn acknowledgements(&self) -> &AcknowledgementsConfig {
221 &self.acknowledgements
222 }
223}