vector/sinks/azure_monitor_logs/
config.rs1use openssl::{base64, pkey};
2use vector_lib::{
3    config::log_schema,
4    configurable::configurable_component,
5    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath},
6    schema,
7    sensitive_string::SensitiveString,
8};
9use vrl::value::Kind;
10
11use super::{
12    service::{AzureMonitorLogsResponse, AzureMonitorLogsService},
13    sink::AzureMonitorLogsSink,
14};
15use crate::{
16    http::{HttpClient, get_http_scheme_from_uri},
17    sinks::{
18        prelude::*,
19        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, http::HttpStatusRetryLogic},
20    },
21};
22
/// Upper bound, in bytes, enforced on the configured batch size (30 MiB).
///
/// NOTE(review): presumably mirrors the service's per-request payload limit — confirm
/// against the Azure Monitor documentation.
const MAX_BATCH_SIZE: usize = 30 << 20; // 30 * 1024 * 1024
25
26pub(super) fn default_host() -> String {
27    "ods.opinsights.azure.com".into()
28}
29
/// Configuration for the `azure_monitor_logs` sink.
#[configurable_component(sink(
    "azure_monitor_logs",
    "Publish log events to the Azure Monitor Logs service."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct AzureMonitorLogsConfig {
    /// The customer ID to publish to.
    ///
    /// It forms the first label of the endpoint host name
    /// (`https://<customer_id>.<host>`) and is also passed to the request service.
    #[configurable(metadata(docs::examples = "5ce893d9-2c32-4b6c-91a9-b0887c2de2d6"))]
    #[configurable(metadata(docs::examples = "97ce69d9-b4be-4241-8dbd-d265edcf06c4"))]
    pub customer_id: String,

    /// The base64-encoded shared key.
    ///
    /// When the sink is built, the value is base64-decoded and turned into an HMAC key.
    /// An empty value is rejected.
    #[configurable(metadata(
        docs::examples = "SERsIYhgMVlJB6uPsq49gCxNiruf6v0vhMYE+lfzbSGcXjdViZdV/e5pEMTYtw9f8SkVLf4LFlLCc2KxtRZfCA=="
    ))]
    #[configurable(metadata(docs::examples = "${AZURE_MONITOR_SHARED_KEY_ENV_VAR}"))]
    pub shared_key: SensitiveString,

    /// The record type of the data that is being submitted.
    ///
    /// Validated against the pattern `[a-zA-Z0-9_]{1,100}`: letters, digits, and
    /// underscores, up to 100 characters.
    #[configurable(validation(pattern = "[a-zA-Z0-9_]{1,100}"))]
    #[configurable(metadata(docs::examples = "MyTableName"))]
    #[configurable(metadata(docs::examples = "MyRecordType"))]
    pub log_type: String,

    /// The optional ID of an Azure resource, passed to the request service when set.
    #[configurable(metadata(
        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/otherResourceGroup/providers/Microsoft.Storage/storageAccounts/examplestorage"
    ))]
    #[configurable(metadata(
        docs::examples = "/subscriptions/11111111-1111-1111-1111-111111111111/resourceGroups/examplegroup/providers/Microsoft.SQL/servers/serverName/databases/databaseName"
    ))]
    pub azure_resource_id: Option<String>,

    /// The host that `customer_id` is prepended to in order to form the endpoint.
    ///
    /// Defaults to `ods.opinsights.azure.com`.
    #[configurable(metadata(docs::examples = "ods.opinsights.azure.us"))]
    #[configurable(metadata(docs::examples = "ods.opinsights.azure.cn"))]
    #[serde(default = "default_host")]
    pub(super) host: String,

    /// Event transformation settings handed to the sink.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub encoding: Transformer,

    /// Batching settings; the maximum batch size in bytes is limited to `MAX_BATCH_SIZE`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Request settings applied to the service stack.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// The path of the log field to use as the time-generated value.
    ///
    /// When this is unset, or set without a path, the global log schema's timestamp key
    /// is used instead.
    #[configurable(metadata(docs::examples = "time_generated"))]
    pub time_generated_key: Option<OptionalValuePath>,

    /// TLS options used to build the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Acknowledgement settings; accepts either a boolean or a structured value.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
117
118impl Default for AzureMonitorLogsConfig {
119    fn default() -> Self {
120        Self {
121            customer_id: "my-customer-id".to_string(),
122            shared_key: Default::default(),
123            log_type: "MyRecordType".to_string(),
124            azure_resource_id: None,
125            host: default_host(),
126            encoding: Default::default(),
127            batch: Default::default(),
128            request: Default::default(),
129            time_generated_key: None,
130            tls: None,
131            acknowledgements: Default::default(),
132        }
133    }
134}
135
136impl AzureMonitorLogsConfig {
137    pub(super) fn build_shared_key(&self) -> crate::Result<pkey::PKey<pkey::Private>> {
138        if self.shared_key.inner().is_empty() {
139            return Err("shared_key cannot be an empty string".into());
140        }
141        let shared_key_bytes = base64::decode_block(self.shared_key.inner())?;
142        let shared_key = pkey::PKey::hmac(&shared_key_bytes)?;
143        Ok(shared_key)
144    }
145
146    fn get_time_generated_key(&self) -> Option<OwnedValuePath> {
147        self.time_generated_key
148            .clone()
149            .and_then(|k| k.path)
150            .or_else(|| log_schema().timestamp_key().cloned())
151    }
152
153    pub(super) async fn build_inner(
154        &self,
155        cx: SinkContext,
156        endpoint: UriSerde,
157    ) -> crate::Result<(VectorSink, Healthcheck)> {
158        let endpoint = endpoint.with_default_parts().uri;
159        let protocol = get_http_scheme_from_uri(&endpoint).to_string();
160
161        let batch_settings = self
162            .batch
163            .validate()?
164            .limit_max_bytes(MAX_BATCH_SIZE)?
165            .into_batcher_settings()?;
166
167        let shared_key = self.build_shared_key()?;
168        let time_generated_key = self.get_time_generated_key();
169
170        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
171        let client = HttpClient::new(Some(tls_settings), &cx.proxy)?;
172
173        let service = AzureMonitorLogsService::new(
174            client,
175            endpoint,
176            self.customer_id.clone(),
177            self.azure_resource_id.as_deref(),
178            &self.log_type,
179            time_generated_key.clone(),
180            shared_key,
181        )?;
182        let healthcheck = service.healthcheck();
183
184        let retry_logic =
185            HttpStatusRetryLogic::new(|res: &AzureMonitorLogsResponse| res.http_status);
186        let request_settings = self.request.into_settings();
187        let service = ServiceBuilder::new()
188            .settings(request_settings, retry_logic)
189            .service(service);
190
191        let sink = AzureMonitorLogsSink::new(
192            batch_settings,
193            self.encoding.clone(),
194            service,
195            time_generated_key,
196            protocol,
197        );
198
199        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
200    }
201}
202
// NOTE(review): the macro is defined elsewhere; judging by its name it implements config
// generation for this type on top of its `Default` impl — confirm.
impl_generate_config_from_default!(AzureMonitorLogsConfig);
204
205#[async_trait::async_trait]
206#[typetag::serde(name = "azure_monitor_logs")]
207impl SinkConfig for AzureMonitorLogsConfig {
208    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
209        let endpoint = format!("https://{}.{}", self.customer_id, self.host).parse()?;
210        self.build_inner(cx, endpoint).await
211    }
212
213    fn input(&self) -> Input {
214        let requirements =
215            schema::Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
216
217        Input::log().with_schema_requirement(requirements)
218    }
219
220    fn acknowledgements(&self) -> &AcknowledgementsConfig {
221        &self.acknowledgements
222    }
223}