// vector/sinks/azure_monitor_logs/config.rs
use openssl::{base64, pkey};
use vector_lib::lookup::{lookup_v2::OptionalValuePath, OwnedValuePath};

use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::{config::log_schema, schema};
use vrl::value::Kind;

use crate::{
    http::{get_http_scheme_from_uri, HttpClient},
    sinks::{
        prelude::*,
        util::{http::HttpStatusRetryLogic, RealtimeSizeBasedDefaultBatchSettings, UriSerde},
    },
};

use super::{
    service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
    sink::AzureMonitorLogsSink,
};

/// Byte limit (30 MiB) that the configured batch size is checked against when
/// the sink is built.
const MAX_BATCH_SIZE: usize = 30 * 1024 * 1024;

25pub(super) fn default_host() -> String {
26 "ods.opinsights.azure.com".into()
27}
28
29#[configurable_component(sink(
31 "azure_monitor_logs",
32 "Publish log events to the Azure Monitor Logs service."
33))]
34#[derive(Clone, Debug)]
35#[serde(deny_unknown_fields)]
36pub struct AzureMonitorLogsConfig {
37 #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
41 #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
42 pub customer_id: String,
43
44 #[configurable(metadata(
48 docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
49 ))]
50 #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
51 pub shared_key: SensitiveString,
52
53 #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
59 #[configurable(metadata(docs::examples = "MyTableName"))]
60 #[configurable(metadata(docs::examples = "MyRecordType"))]
61 pub log_type: String,
62
63 #[configurable(metadata(
67 docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
68 ))]
69 #[configurable(metadata(
70 docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
71 ))]
72 pub azure_resource_id: Option<String>,
73
74 #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
78 #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
79 #[serde(default = "default_host")]
80 pub(super) host: String,
81
82 #[configurable(derived)]
83 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
84 pub encoding: Transformer,
85
86 #[configurable(derived)]
87 #[serde(default)]
88 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
89
90 #[configurable(derived)]
91 #[serde(default)]
92 pub request: TowerRequestConfig,
93
94 #[configurable(metadata(docs::examples = "time_generated"))]
103 pub time_generated_key: Option<OptionalValuePath>,
104
105 #[configurable(derived)]
106 pub tls: Option<TlsConfig>,
107
108 #[configurable(derived)]
109 #[serde(
110 default,
111 deserialize_with = "crate::serde::bool_or_struct",
112 skip_serializing_if = "crate::serde::is_default"
113 )]
114 pub acknowledgements: AcknowledgementsConfig,
115}
116
117impl Default for AzureMonitorLogsConfig {
118 fn default() -> Self {
119 Self {
120 customer_id: "my-customer-id".to_string(),
121 shared_key: Default::default(),
122 log_type: "MyRecordType".to_string(),
123 azure_resource_id: None,
124 host: default_host(),
125 encoding: Default::default(),
126 batch: Default::default(),
127 request: Default::default(),
128 time_generated_key: None,
129 tls: None,
130 acknowledgements: Default::default(),
131 }
132 }
133}
134
135impl AzureMonitorLogsConfig {
136 pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
137 if self.shared_key.inner().is_empty() {
138 return Err("shared_key cannot be an empty string".into());
139 }
140 let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
141 let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
142 Ok(shared_key)
143 }
144
145 fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
146 self.time_generated_key
147 .clone()
148 .and_then(|k| k.path)
149 .or_else(|| log_schema().timestamp_key().cloned())
150 }
151
152 pub(super) async fn build_inner(
153 &self,
154 cx: SinkContext,
155 endpoint: UriSerde,
156 ) -> crate::Result<(VectorSink, Healthcheck)> {
157 let endpoint = endpoint.with_default_parts().uri;
158 let protocol = get_http_scheme_from_uri(&endpoint).to_string();
159
160 let batch_settings = self
161 .batch
162 .validate()?
163 .limit_max_bytes(MAX_BATCH_SIZE)?
164 .into_batcher_settings()?;
165
166 let shared_key = self.build_shared_key()?;
167 let time_generated_key = self.get_time_generated_key();
168
169 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
170 let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
171
172 let service = AzureMonitorLogsService::new(
173 client,
174 endpoint,
175 self.customer_id.clone(),
176 self.azure_resource_id.as_deref(),
177 &self.log_type,
178 time_generated_key.clone(),
179 shared_key,
180 )?;
181 let healthcheck = service.healthcheck();
182
183 let retry_logic =
184 HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status);
185 let request_settings = self.request.into_settings();
186 let service = ServiceBuilder::new()
187 .settings(request_settings, retry_logic)
188 .service(service);
189
190 let sink = AzureMonitorLogsSink::new(
191 batch_settings,
192 self.encoding.clone(),
193 service,
194 time_generated_key,
195 protocol,
196 );
197
198 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
199 }
200}
201
202impl_generate_config_from_default!(AzureMonitorLogsConfig);
203
204#[async_trait::async_trait]
205#[typetag::serde(name = "azure_monitor_logs")]
206impl SinkConfig for AzureMonitorLogsConfig {
207 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
208 let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
209 self.build_inner(cx, endpoint).await
210 }
211
212 fn input(&self) -> Input {
213 let requirements =
214 schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
215
216 Input::log().with_schema_requirement(requirements)
217 }
218
219 fn acknowledgements(&self) -> &AcknowledgementsConfig {
220 &self.acknowledgements
221 }
222}