vector/sinks/azure_monitor_logs/
service.rs

1use std::{
2    sync::LazyLock,
3    task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use http::{
8    HeaderName, HeaderValue, Request, StatusCode, Uri,
9    header::{self, HeaderMap},
10};
11use hyper::Body;
12use openssl::{base64, hash, pkey, sign};
13use regex::Regex;
14use tracing::Instrument;
15use vector_lib::lookup::lookup_v2::OwnedValuePath;
16
17use crate::{http::HttpClient, sinks::prelude::*};
18
19static LOG_TYPE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\w+$").unwrap());
20static LOG_TYPE_HEADER: LazyLock<HeaderName> =
21    LazyLock::new(|| HeaderName::from_static("log-type"));
22static X_MS_DATE_HEADER: LazyLock<HeaderName> =
23    LazyLock::new(|| HeaderName::from_static(X_MS_DATE));
24static X_MS_AZURE_RESOURCE_HEADER: LazyLock<HeaderName> =
25    LazyLock::new(|| HeaderName::from_static("x-ms-azureresourceid"));
26static TIME_GENERATED_FIELD_HEADER: LazyLock<HeaderName> =
27    LazyLock::new(|| HeaderName::from_static("time-generated-field"));
28static CONTENT_TYPE_VALUE: LazyLock<HeaderValue> =
29    LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE));
30
31/// API endpoint for submitting logs
32const RESOURCE: &str = "/api/logs";
33/// JSON content type of logs
34const CONTENT_TYPE: &str = "application/json";
35/// Custom header used for signing logs
36const X_MS_DATE: &str = "x-ms-date";
37/// Shared key prefix
38const SHARED_KEY: &str = "SharedKey";
39/// API version
40const API_VERSION: &str = "2016-04-01";
41
42#[derive(Debug, Clone)]
43pub struct AzureMonitorLogsRequest {
44    pub body: Bytes,
45    pub finalizers: EventFinalizers,
46    pub metadata: RequestMetadata,
47}
48
49impl MetaDescriptive for AzureMonitorLogsRequest {
50    fn get_metadata(&self) -> &RequestMetadata {
51        &self.metadata
52    }
53
54    fn metadata_mut(&mut self) -> &mut RequestMetadata {
55        &mut self.metadata
56    }
57}
58
59impl Finalizable for AzureMonitorLogsRequest {
60    fn take_finalizers(&mut self) -> EventFinalizers {
61        self.finalizers.take_finalizers()
62    }
63}
64
65pub struct AzureMonitorLogsResponse {
66    pub http_status: StatusCode,
67    pub events_byte_size: GroupedCountByteSize,
68    pub raw_byte_size: usize,
69}
70
71impl DriverResponse for AzureMonitorLogsResponse {
72    fn event_status(&self) -> EventStatus {
73        match self.http_status.is_success() {
74            true => EventStatus::Delivered,
75            false => EventStatus::Rejected,
76        }
77    }
78
79    fn events_sent(&self) -> &GroupedCountByteSize {
80        &self.events_byte_size
81    }
82
83    fn bytes_sent(&self) -> Option<usize> {
84        Some(self.raw_byte_size)
85    }
86}
87
88/// `AzureMonitorLogsService` is a `Tower` service used to send logs to Azure.
89#[derive(Debug, Clone)]
90pub struct AzureMonitorLogsService {
91    client: HttpClient,
92    endpoint: Uri,
93    customer_id: String,
94    shared_key: pkey::PKey<pkey::Private>,
95    default_headers: HeaderMap,
96}
97
98impl AzureMonitorLogsService {
99    /// Creates a new `AzureMonitorLogsService`.
100    pub fn new(
101        client: HttpClient,
102        endpoint: Uri,
103        customer_id: String,
104        azure_resource_id: Option<&str>,
105        log_type: &str,
106        time_generated_key: Option<OwnedValuePath>,
107        shared_key: pkey::PKey<pkey::Private>,
108    ) -> crate::Result<Self> {
109        let mut parts = endpoint.into_parts();
110        parts.path_and_query = Some(
111            format!("{RESOURCE}?api-version={API_VERSION}")
112                .parse()
113                .expect("path and query should never fail to parse"),
114        );
115        let endpoint = Uri::from_parts(parts)?;
116
117        let default_headers = {
118            let mut headers = HeaderMap::new();
119
120            if log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(log_type) {
121                return Err(format!(
122                "invalid log_type \"{log_type}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters"
123            ).into());
124            }
125            let log_type = HeaderValue::from_str(log_type)?;
126            headers.insert(LOG_TYPE_HEADER.clone(), log_type);
127
128            if let Some(timestamp_key) = time_generated_key {
129                headers.insert(
130                    TIME_GENERATED_FIELD_HEADER.clone(),
131                    HeaderValue::try_from(timestamp_key.to_string())?,
132                );
133            }
134
135            if let Some(azure_resource_id) = azure_resource_id {
136                if azure_resource_id.is_empty() {
137                    return Err("azure_resource_id can't be an empty string".into());
138                }
139                headers.insert(
140                    X_MS_AZURE_RESOURCE_HEADER.clone(),
141                    HeaderValue::from_str(azure_resource_id)?,
142                );
143            }
144
145            headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone());
146            headers
147        };
148
149        Ok(Self {
150            client,
151            endpoint,
152            customer_id,
153            shared_key,
154            default_headers,
155        })
156    }
157
158    fn build_authorization_header_value(
159        &self,
160        rfc1123date: &str,
161        len: usize,
162    ) -> crate::Result<String> {
163        let string_to_hash =
164            format!("POST\n{len}\n{CONTENT_TYPE}\n{X_MS_DATE}:{rfc1123date}\n{RESOURCE}");
165        let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?;
166        signer.update(string_to_hash.as_bytes())?;
167
168        let signature = signer.sign_to_vec()?;
169        let signature_base64 = base64::encode_block(&signature);
170
171        Ok(format!(
172            "{} {}:{}",
173            SHARED_KEY, self.customer_id, signature_base64
174        ))
175    }
176
177    fn build_request(&self, body: Bytes) -> crate::Result<Request<Body>> {
178        let len = body.len();
179
180        let mut request = Request::post(&self.endpoint).body(Body::from(body))?;
181
182        let rfc1123date = chrono::Utc::now()
183            .format("%a, %d %b %Y %H:%M:%S GMT")
184            .to_string();
185        let authorization = self.build_authorization_header_value(&rfc1123date, len)?;
186
187        *request.headers_mut() = self.default_headers.clone();
188        request
189            .headers_mut()
190            .insert(header::AUTHORIZATION, authorization.parse()?);
191        request
192            .headers_mut()
193            .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?);
194
195        Ok(request)
196    }
197
198    pub fn healthcheck(&self) -> Healthcheck {
199        let mut client = self.client.clone();
200        let request = self.build_request(Bytes::from("[]"));
201        Box::pin(async move {
202            let request = request?;
203            let res = client.call(request).in_current_span().await?;
204
205            if res.status().is_server_error() {
206                return Err("Server returned a server error".into());
207            }
208
209            if res.status() == StatusCode::FORBIDDEN {
210                return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into());
211            }
212
213            if res.status() == StatusCode::NOT_FOUND {
214                return Err(
215                    "Either the URL provided is incorrect, or the request is too large".into(),
216                );
217            }
218
219            if res.status() == StatusCode::BAD_REQUEST {
220                return Err("The workspace has been closed or the request was invalid".into());
221            }
222
223            Ok(())
224        })
225    }
226}
227
228impl Service<AzureMonitorLogsRequest> for AzureMonitorLogsService {
229    type Response = AzureMonitorLogsResponse;
230    type Error = crate::Error;
231    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
232
233    // Emission of Error internal event is handled upstream by the caller.
234    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
235        Poll::Ready(Ok(()))
236    }
237
238    // Emission of Error internal event is handled upstream by the caller.
239    fn call(&mut self, request: AzureMonitorLogsRequest) -> Self::Future {
240        let mut client = self.client.clone();
241        let http_request = self.build_request(request.body);
242        Box::pin(async move {
243            let http_request = http_request?;
244            let response = client.call(http_request).in_current_span().await?;
245            Ok(AzureMonitorLogsResponse {
246                http_status: response.status(),
247                raw_byte_size: request.metadata.request_encoded_size(),
248                events_byte_size: request
249                    .metadata
250                    .into_events_estimated_json_encoded_byte_size(),
251            })
252        })
253    }
254}