vector/sinks/azure_monitor_logs/
service.rs

1use std::sync::LazyLock;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use http::{
6    header::{self, HeaderMap},
7    HeaderName, HeaderValue, Request, StatusCode, Uri,
8};
9use hyper::Body;
10use openssl::{base64, hash, pkey, sign};
11use regex::Regex;
12use tracing::Instrument;
13use vector_lib::lookup::lookup_v2::OwnedValuePath;
14
15use crate::{http::HttpClient, sinks::prelude::*};
16
17static LOG_TYPE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\w+$").unwrap());
18static LOG_TYPE_HEADER: LazyLock<HeaderName> =
19    LazyLock::new(|| HeaderName::from_static("log-type"));
20static X_MS_DATE_HEADER: LazyLock<HeaderName> =
21    LazyLock::new(|| HeaderName::from_static(X_MS_DATE));
22static X_MS_AZURE_RESOURCE_HEADER: LazyLock<HeaderName> =
23    LazyLock::new(|| HeaderName::from_static("x-ms-azureresourceid"));
24static TIME_GENERATED_FIELD_HEADER: LazyLock<HeaderName> =
25    LazyLock::new(|| HeaderName::from_static("time-generated-field"));
26static CONTENT_TYPE_VALUE: LazyLock<HeaderValue> =
27    LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE));
28
29/// API endpoint for submitting logs
30const RESOURCE: &str = "/api/logs";
31/// JSON content type of logs
32const CONTENT_TYPE: &str = "application/json";
33/// Custom header used for signing logs
34const X_MS_DATE: &str = "x-ms-date";
35/// Shared key prefix
36const SHARED_KEY: &str = "SharedKey";
37/// API version
38const API_VERSION: &str = "2016-04-01";
39
40#[derive(Debug, Clone)]
41pub struct AzureMonitorLogsRequest {
42    pub body: Bytes,
43    pub finalizers: EventFinalizers,
44    pub metadata: RequestMetadata,
45}
46
47impl MetaDescriptive for AzureMonitorLogsRequest {
48    fn get_metadata(&self) -> &RequestMetadata {
49        &self.metadata
50    }
51
52    fn metadata_mut(&mut self) -> &mut RequestMetadata {
53        &mut self.metadata
54    }
55}
56
57impl Finalizable for AzureMonitorLogsRequest {
58    fn take_finalizers(&mut self) -> EventFinalizers {
59        self.finalizers.take_finalizers()
60    }
61}
62
63pub struct AzureMonitorLogsResponse {
64    pub http_status: StatusCode,
65    pub events_byte_size: GroupedCountByteSize,
66    pub raw_byte_size: usize,
67}
68
69impl DriverResponse for AzureMonitorLogsResponse {
70    fn event_status(&self) -> EventStatus {
71        match self.http_status.is_success() {
72            true => EventStatus::Delivered,
73            false => EventStatus::Rejected,
74        }
75    }
76
77    fn events_sent(&self) -> &GroupedCountByteSize {
78        &self.events_byte_size
79    }
80
81    fn bytes_sent(&self) -> Option<usize> {
82        Some(self.raw_byte_size)
83    }
84}
85
86/// `AzureMonitorLogsService` is a `Tower` service used to send logs to Azure.
87#[derive(Debug, Clone)]
88pub struct AzureMonitorLogsService {
89    client: HttpClient,
90    endpoint: Uri,
91    customer_id: String,
92    shared_key: pkey::PKey<pkey::Private>,
93    default_headers: HeaderMap,
94}
95
96impl AzureMonitorLogsService {
97    /// Creates a new `AzureMonitorLogsService`.
98    pub fn new(
99        client: HttpClient,
100        endpoint: Uri,
101        customer_id: String,
102        azure_resource_id: Option<&str>,
103        log_type: &str,
104        time_generated_key: Option<OwnedValuePath>,
105        shared_key: pkey::PKey<pkey::Private>,
106    ) -> crate::Result<Self> {
107        let mut parts = endpoint.into_parts();
108        parts.path_and_query = Some(
109            format!("{RESOURCE}?api-version={API_VERSION}")
110                .parse()
111                .expect("path and query should never fail to parse"),
112        );
113        let endpoint = Uri::from_parts(parts)?;
114
115        let default_headers = {
116            let mut headers = HeaderMap::new();
117
118            if log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(log_type) {
119                return Err(format!(
120                "invalid log_type \"{log_type}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters"
121            ).into());
122            }
123            let log_type = HeaderValue::from_str(log_type)?;
124            headers.insert(LOG_TYPE_HEADER.clone(), log_type);
125
126            if let Some(timestamp_key) = time_generated_key {
127                headers.insert(
128                    TIME_GENERATED_FIELD_HEADER.clone(),
129                    HeaderValue::try_from(timestamp_key.to_string())?,
130                );
131            }
132
133            if let Some(azure_resource_id) = azure_resource_id {
134                if azure_resource_id.is_empty() {
135                    return Err("azure_resource_id can't be an empty string".into());
136                }
137                headers.insert(
138                    X_MS_AZURE_RESOURCE_HEADER.clone(),
139                    HeaderValue::from_str(azure_resource_id)?,
140                );
141            }
142
143            headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone());
144            headers
145        };
146
147        Ok(Self {
148            client,
149            endpoint,
150            customer_id,
151            shared_key,
152            default_headers,
153        })
154    }
155
156    fn build_authorization_header_value(
157        &self,
158        rfc1123date: &str,
159        len: usize,
160    ) -> crate::Result<String> {
161        let string_to_hash =
162            format!("POST\n{len}\n{CONTENT_TYPE}\n{X_MS_DATE}:{rfc1123date}\n{RESOURCE}");
163        let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?;
164        signer.update(string_to_hash.as_bytes())?;
165
166        let signature = signer.sign_to_vec()?;
167        let signature_base64 = base64::encode_block(&signature);
168
169        Ok(format!(
170            "{} {}:{}",
171            SHARED_KEY, self.customer_id, signature_base64
172        ))
173    }
174
175    fn build_request(&self, body: Bytes) -> crate::Result<Request<Body>> {
176        let len = body.len();
177
178        let mut request = Request::post(&self.endpoint).body(Body::from(body))?;
179
180        let rfc1123date = chrono::Utc::now()
181            .format("%a, %d %b %Y %H:%M:%S GMT")
182            .to_string();
183        let authorization = self.build_authorization_header_value(&rfc1123date, len)?;
184
185        *request.headers_mut() = self.default_headers.clone();
186        request
187            .headers_mut()
188            .insert(header::AUTHORIZATION, authorization.parse()?);
189        request
190            .headers_mut()
191            .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?);
192
193        Ok(request)
194    }
195
196    pub fn healthcheck(&self) -> Healthcheck {
197        let mut client = self.client.clone();
198        let request = self.build_request(Bytes::from("[]"));
199        Box::pin(async move {
200            let request = request?;
201            let res = client.call(request).in_current_span().await?;
202
203            if res.status().is_server_error() {
204                return Err("Server returned a server error".into());
205            }
206
207            if res.status() == StatusCode::FORBIDDEN {
208                return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into());
209            }
210
211            if res.status() == StatusCode::NOT_FOUND {
212                return Err(
213                    "Either the URL provided is incorrect, or the request is too large".into(),
214                );
215            }
216
217            if res.status() == StatusCode::BAD_REQUEST {
218                return Err("The workspace has been closed or the request was invalid".into());
219            }
220
221            Ok(())
222        })
223    }
224}
225
226impl Service<AzureMonitorLogsRequest> for AzureMonitorLogsService {
227    type Response = AzureMonitorLogsResponse;
228    type Error = crate::Error;
229    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
230
231    // Emission of Error internal event is handled upstream by the caller.
232    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
233        Poll::Ready(Ok(()))
234    }
235
236    // Emission of Error internal event is handled upstream by the caller.
237    fn call(&mut self, request: AzureMonitorLogsRequest) -> Self::Future {
238        let mut client = self.client.clone();
239        let http_request = self.build_request(request.body);
240        Box::pin(async move {
241            let http_request = http_request?;
242            let response = client.call(http_request).in_current_span().await?;
243            Ok(AzureMonitorLogsResponse {
244                http_status: response.status(),
245                raw_byte_size: request.metadata.request_encoded_size(),
246                events_byte_size: request
247                    .metadata
248                    .into_events_estimated_json_encoded_byte_size(),
249            })
250        })
251    }
252}