vector/sinks/azure_monitor_logs/
config.rs

1use openssl::{base64, pkey};
2use vector_lib::{
3    config::log_schema,
4    configurable::configurable_component,
5    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath},
6    schema,
7    sensitive_string::SensitiveString,
8};
9use vrl::value::Kind;
10
11use super::{
12    service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
13    sink::AzureMonitorLogsSink,
14};
15use crate::{
16    http::{HttpClient, get_http_scheme_from_uri},
17    sinks::{
18        prelude::*,
19        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
20    },
21};
22
/// Upper bound, in bytes, on the size of a single request body (30 MiB).
///
/// Applied as the byte limit on the batch settings when the sink is built.
const MAX_BATCH_SIZE: usize = 30 * (1 << 20); // `1 << 20` bytes is one MiB
25
26pub(super) fn default_host() -> String {
27    "ods.opinsights.azure.com".into()
28}
29
30/// Configuration for the `azure_monitor_logs` sink.
31#[configurable_component(sink(
32    "azure_monitor_logs",
33    "Publish log events to the Azure Monitor Logs service."
34))]
35#[derive(Clone, Debug)]
36#[serde(deny_unknown_fields)]
37pub struct AzureMonitorLogsConfig {
38    /// The [unique identifier][uniq_id] for the Log Analytics workspace.
39    ///
40    /// [uniq_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-uri-parameters
41    #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
42    #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
43    pub customer_id: String,
44
45    /// The [primary or the secondary key][shared_key] for the Log Analytics workspace.
46    ///
47    /// [shared_key]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#authorization
48    #[configurable(metadata(
49        docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
50    ))]
51    #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
52    pub shared_key: SensitiveString,
53
54    /// The [record type][record_type] of the data that is being submitted.
55    ///
56    /// Can only contain letters, numbers, and underscores (_), and may not exceed 100 characters.
57    ///
58    /// [record_type]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers
59    #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
60    #[configurable(metadata(docs::examples = "MyTableName"))]
61    #[configurable(metadata(docs::examples = "MyRecordType"))]
62    pub log_type: String,
63
64    /// The [Resource ID][resource_id] of the Azure resource the data should be associated with.
65    ///
66    /// [resource_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers
67    #[configurable(metadata(
68        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
69    ))]
70    #[configurable(metadata(
71        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
72    ))]
73    pub azure_resource_id: Option<String>,
74
75    /// [Alternative host][alt_host] for dedicated Azure regions.
76    ///
77    /// [alt_host]: https://docs.azure.cn/en-us/articles/guidance/developerdifferences#check-endpoints-in-azure
78    #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
79    #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
80    #[serde(default = "default_host")]
81    pub(super) host: String,
82
83    #[configurable(derived)]
84    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
85    pub encoding: Transformer,
86
87    #[configurable(derived)]
88    #[serde(default)]
89    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
90
91    #[configurable(derived)]
92    #[serde(default)]
93    pub request: TowerRequestConfig,
94
95    /// Use this option to customize the log field used as [`TimeGenerated`][1] in Azure.
96    ///
97    /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default.
98    /// This field should be used in rare cases where `TimeGenerated` should point to a specific log
99    /// field. For example, use this field to set the log field `source_timestamp` as holding the
100    /// value that should be used as `TimeGenerated` on the Azure side.
101    ///
102    /// [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated
103    #[configurable(metadata(docs::examples = "time_generated"))]
104    pub time_generated_key: Option<OptionalValuePath>,
105
106    #[configurable(derived)]
107    pub tls: Option<TlsConfig>,
108
109    #[configurable(derived)]
110    #[serde(
111        default,
112        deserialize_with = "crate::serde::bool_or_struct",
113        skip_serializing_if = "crate::serde::is_default"
114    )]
115    pub acknowledgements: AcknowledgementsConfig,
116}
117
118impl Default for AzureMonitorLogsConfig {
119    fn default() -> Self {
120        Self {
121            customer_id: "my-customer-id".to_string(),
122            shared_key: Default::default(),
123            log_type: "MyRecordType".to_string(),
124            azure_resource_id: None,
125            host: default_host(),
126            encoding: Default::default(),
127            batch: Default::default(),
128            request: Default::default(),
129            time_generated_key: None,
130            tls: None,
131            acknowledgements: Default::default(),
132        }
133    }
134}
135
136impl AzureMonitorLogsConfig {
137    pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
138        if self.shared_key.inner().is_empty() {
139            return Err("shared_key cannot be an empty string".into());
140        }
141        let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
142        let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
143        Ok(shared_key)
144    }
145
146    fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
147        self.time_generated_key
148            .clone()
149            .and_then(|k| k.path)
150            .or_else(|| log_schema().timestamp_key().cloned())
151    }
152
153    pub(super) async fn build_inner(
154        &self,
155        cx: SinkContext,
156        endpoint: UriSerde,
157    ) -> crate::Result<(VectorSink, Healthcheck)> {
158        let endpoint = endpoint.with_default_parts().uri;
159        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
160
161        let batch_settings = self
162            .batch
163            .validate()?
164            .limit_max_bytes(MAX_BATCH_SIZE)?
165            .into_batcher_settings()?;
166
167        let shared_key = self.build_shared_key()?;
168        let time_generated_key = self.get_time_generated_key();
169
170        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
171        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
172
173        let service = AzureMonitorLogsService::new(
174            client,
175            endpoint,
176            self.customer_id.clone(),
177            self.azure_resource_id.as_deref(),
178            &self.log_type,
179            time_generated_key.clone(),
180            shared_key,
181        )?;
182        let healthcheck = service.healthcheck();
183
184        let retry_logic =
185            HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status);
186        let request_settings = self.request.into_settings();
187        let service = ServiceBuilder::new()
188            .settings(request_settings, retry_logic)
189            .service(service);
190
191        let sink = AzureMonitorLogsSink::new(
192            batch_settings,
193            self.encoding.clone(),
194            service,
195            time_generated_key,
196            protocol,
197        );
198
199        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
200    }
201}
202
203impl_generate_config_from_default!(AzureMonitorLogsConfig);
204
205#[async_trait::async_trait]
206#[typetag::serde(name = "azure_monitor_logs")]
207impl SinkConfig for AzureMonitorLogsConfig {
208    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
209        let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
210        self.build_inner(cx, endpoint).await
211    }
212
213    fn input(&self) -> Input {
214        let requirements =
215            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
216
217        Input::log().with_schema_requirement(requirements)
218    }
219
220    fn acknowledgements(&self) -> &AcknowledgementsConfig {
221        &self.acknowledgements
222    }
223}