vector/sinks/azure_monitor_logs/
config.rs

1use openssl::{base64, pkey};
2use vector_lib::lookup::{lookup_v2::OptionalValuePath, OwnedValuePath};
3
4use vector_lib::configurable::configurable_component;
5use vector_lib::sensitive_string::SensitiveString;
6use vector_lib::{config::log_schema, schema};
7use vrl::value::Kind;
8
9use crate::{
10    http::{get_http_scheme_from_uri, HttpClient},
11    sinks::{
12        prelude::*,
13        util::{http::HttpStatusRetryLogic, RealtimeSizeBasedDefaultBatchSettings, UriSerde},
14    },
15};
16
17use super::{
18    service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
19    sink::AzureMonitorLogsSink,
20};
21
/// Upper bound, in bytes, on the size of a single request body (30 MiB).
const MAX_BATCH_SIZE: usize = 30 * (1 << 20);
24
25pub(super) fn default_host() -> String {
26    "ods.opinsights.azure.com".into()
27}
28
29/// Configuration for the `azure_monitor_logs` sink.
30#[configurable_component(sink(
31    "azure_monitor_logs",
32    "Publish log events to the Azure Monitor Logs service."
33))]
34#[derive(Clone, Debug)]
35#[serde(deny_unknown_fields)]
36pub struct AzureMonitorLogsConfig {
37    /// The [unique identifier][uniq_id] for the Log Analytics workspace.
38    ///
39    /// [uniq_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-uri-parameters
40    #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
41    #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
42    pub customer_id: String,
43
44    /// The [primary or the secondary key][shared_key] for the Log Analytics workspace.
45    ///
46    /// [shared_key]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#authorization
47    #[configurable(metadata(
48        docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
49    ))]
50    #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
51    pub shared_key: SensitiveString,
52
53    /// The [record type][record_type] of the data that is being submitted.
54    ///
55    /// Can only contain letters, numbers, and underscores (_), and may not exceed 100 characters.
56    ///
57    /// [record_type]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers
58    #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
59    #[configurable(metadata(docs::examples = "MyTableName"))]
60    #[configurable(metadata(docs::examples = "MyRecordType"))]
61    pub log_type: String,
62
63    /// The [Resource ID][resource_id] of the Azure resource the data should be associated with.
64    ///
65    /// [resource_id]: https://docs.microsoft.com/en-us/azure/azure-monitor/platform/data-collector-api#request-headers
66    #[configurable(metadata(
67        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
68    ))]
69    #[configurable(metadata(
70        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
71    ))]
72    pub azure_resource_id: Option<String>,
73
74    /// [Alternative host][alt_host] for dedicated Azure regions.
75    ///
76    /// [alt_host]: https://docs.azure.cn/en-us/articles/guidance/developerdifferences#check-endpoints-in-azure
77    #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
78    #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
79    #[serde(default = "default_host")]
80    pub(super) host: String,
81
82    #[configurable(derived)]
83    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
84    pub encoding: Transformer,
85
86    #[configurable(derived)]
87    #[serde(default)]
88    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
89
90    #[configurable(derived)]
91    #[serde(default)]
92    pub request: TowerRequestConfig,
93
94    /// Use this option to customize the log field used as [`TimeGenerated`][1] in Azure.
95    ///
96    /// The setting of `log_schema.timestamp_key`, usually `timestamp`, is used here by default.
97    /// This field should be used in rare cases where `TimeGenerated` should point to a specific log
98    /// field. For example, use this field to set the log field `source_timestamp` as holding the
99    /// value that should be used as `TimeGenerated` on the Azure side.
100    ///
101    /// [1]: https://learn.microsoft.com/en-us/azure/azure-monitor/logs/log-standard-columns#timegenerated
102    #[configurable(metadata(docs::examples = "time_generated"))]
103    pub time_generated_key: Option<OptionalValuePath>,
104
105    #[configurable(derived)]
106    pub tls: Option<TlsConfig>,
107
108    #[configurable(derived)]
109    #[serde(
110        default,
111        deserialize_with = "crate::serde::bool_or_struct",
112        skip_serializing_if = "crate::serde::is_default"
113    )]
114    pub acknowledgements: AcknowledgementsConfig,
115}
116
117impl Default for AzureMonitorLogsConfig {
118    fn default() -> Self {
119        Self {
120            customer_id: "my-customer-id".to_string(),
121            shared_key: Default::default(),
122            log_type: "MyRecordType".to_string(),
123            azure_resource_id: None,
124            host: default_host(),
125            encoding: Default::default(),
126            batch: Default::default(),
127            request: Default::default(),
128            time_generated_key: None,
129            tls: None,
130            acknowledgements: Default::default(),
131        }
132    }
133}
134
135impl AzureMonitorLogsConfig {
136    pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
137        if self.shared_key.inner().is_empty() {
138            return Err("shared_key cannot be an empty string".into());
139        }
140        let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
141        let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
142        Ok(shared_key)
143    }
144
145    fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
146        self.time_generated_key
147            .clone()
148            .and_then(|k| k.path)
149            .or_else(|| log_schema().timestamp_key().cloned())
150    }
151
152    pub(super) async fn build_inner(
153        &self,
154        cx: SinkContext,
155        endpoint: UriSerde,
156    ) -> crate::Result<(VectorSink, Healthcheck)> {
157        let endpoint = endpoint.with_default_parts().uri;
158        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
159
160        let batch_settings = self
161            .batch
162            .validate()?
163            .limit_max_bytes(MAX_BATCH_SIZE)?
164            .into_batcher_settings()?;
165
166        let shared_key = self.build_shared_key()?;
167        let time_generated_key = self.get_time_generated_key();
168
169        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
170        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
171
172        let service = AzureMonitorLogsService::new(
173            client,
174            endpoint,
175            self.customer_id.clone(),
176            self.azure_resource_id.as_deref(),
177            &self.log_type,
178            time_generated_key.clone(),
179            shared_key,
180        )?;
181        let healthcheck = service.healthcheck();
182
183        let retry_logic =
184            HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status);
185        let request_settings = self.request.into_settings();
186        let service = ServiceBuilder::new()
187            .settings(request_settings, retry_logic)
188            .service(service);
189
190        let sink = AzureMonitorLogsSink::new(
191            batch_settings,
192            self.encoding.clone(),
193            service,
194            time_generated_key,
195            protocol,
196        );
197
198        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
199    }
200}
201
202impl_generate_config_from_default!(AzureMonitorLogsConfig);
203
204#[async_trait::async_trait]
205#[typetag::serde(name = "azure_monitor_logs")]
206impl SinkConfig for AzureMonitorLogsConfig {
207    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
208        let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
209        self.build_inner(cx, endpoint).await
210    }
211
212    fn input(&self) -> Input {
213        let requirements =
214            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
215
216        Input::log().with_schema_requirement(requirements)
217    }
218
219    fn acknowledgements(&self) -> &AcknowledgementsConfig {
220        &self.acknowledgements
221    }
222}