vector/sinks/azure_monitor_logs/
service.rs1use std::sync::LazyLock;
2use std::task::{Context, Poll};
3
4use bytes::Bytes;
5use http::{
6 header::{self, HeaderMap},
7 HeaderName, HeaderValue, Request, StatusCode, Uri,
8};
9use hyper::Body;
10use openssl::{base64, hash, pkey, sign};
11use regex::Regex;
12use tracing::Instrument;
13use vector_lib::lookup::lookup_v2::OwnedValuePath;
14
15use crate::{http::HttpClient, sinks::prelude::*};
16
17static LOG_TYPE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\w+$").unwrap());
18static LOG_TYPE_HEADER: LazyLock<HeaderName> =
19 LazyLock::new(|| HeaderName::from_static("log-type"));
20static X_MS_DATE_HEADER: LazyLock<HeaderName> =
21 LazyLock::new(|| HeaderName::from_static(X_MS_DATE));
22static X_MS_AZURE_RESOURCE_HEADER: LazyLock<HeaderName> =
23 LazyLock::new(|| HeaderName::from_static("x-ms-azureresourceid"));
24static TIME_GENERATED_FIELD_HEADER: LazyLock<HeaderName> =
25 LazyLock::new(|| HeaderName::from_static("time-generated-field"));
26static CONTENT_TYPE_VALUE: LazyLock<HeaderValue> =
27 LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE));
28
29const RESOURCE: &str = "/api/logs";
31const CONTENT_TYPE: &str = "application/json";
33const X_MS_DATE: &str = "x-ms-date";
35const SHARED_KEY: &str = "SharedKey";
37const API_VERSION: &str = "2016-04-01";
39
40#[derive(Debug, Clone)]
41pub struct AzureMonitorLogsRequest {
42 pub body: Bytes,
43 pub finalizers: EventFinalizers,
44 pub metadata: RequestMetadata,
45}
46
47impl MetaDescriptive for AzureMonitorLogsRequest {
48 fn get_metadata(&self) -> &RequestMetadata {
49 &self.metadata
50 }
51
52 fn metadata_mut(&mut self) -> &mut RequestMetadata {
53 &mut self.metadata
54 }
55}
56
57impl Finalizable for AzureMonitorLogsRequest {
58 fn take_finalizers(&mut self) -> EventFinalizers {
59 self.finalizers.take_finalizers()
60 }
61}
62
63pub struct AzureMonitorLogsResponse {
64 pub http_status: StatusCode,
65 pub events_byte_size: GroupedCountByteSize,
66 pub raw_byte_size: usize,
67}
68
69impl DriverResponse for AzureMonitorLogsResponse {
70 fn event_status(&self) -> EventStatus {
71 match self.http_status.is_success() {
72 true => EventStatus::Delivered,
73 false => EventStatus::Rejected,
74 }
75 }
76
77 fn events_sent(&self) -> &GroupedCountByteSize {
78 &self.events_byte_size
79 }
80
81 fn bytes_sent(&self) -> Option<usize> {
82 Some(self.raw_byte_size)
83 }
84}
85
86#[derive(Debug, Clone)]
88pub struct AzureMonitorLogsService {
89 client: HttpClient,
90 endpoint: Uri,
91 customer_id: String,
92 shared_key: pkey::PKey<pkey::Private>,
93 default_headers: HeaderMap,
94}
95
96impl AzureMonitorLogsService {
97 pub fn new(
99 client: HttpClient,
100 endpoint: Uri,
101 customer_id: String,
102 azure_resource_id: Option<&str>,
103 log_type: &str,
104 time_generated_key: Option<OwnedValuePath>,
105 shared_key: pkey::PKey<pkey::Private>,
106 ) -> crate::Result<Self> {
107 let mut parts = endpoint.into_parts();
108 parts.path_and_query = Some(
109 format!("{RESOURCE}?api-version={API_VERSION}")
110 .parse()
111 .expect("path and query should never fail to parse"),
112 );
113 let endpoint = Uri::from_parts(parts)?;
114
115 let default_headers = {
116 let mut headers = HeaderMap::new();
117
118 if log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(log_type) {
119 return Err(format!(
120 "invalid log_type \"{log_type}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters"
121 ).into());
122 }
123 let log_type = HeaderValue::from_str(log_type)?;
124 headers.insert(LOG_TYPE_HEADER.clone(), log_type);
125
126 if let Some(timestamp_key) = time_generated_key {
127 headers.insert(
128 TIME_GENERATED_FIELD_HEADER.clone(),
129 HeaderValue::try_from(timestamp_key.to_string())?,
130 );
131 }
132
133 if let Some(azure_resource_id) = azure_resource_id {
134 if azure_resource_id.is_empty() {
135 return Err("azure_resource_id can't be an empty string".into());
136 }
137 headers.insert(
138 X_MS_AZURE_RESOURCE_HEADER.clone(),
139 HeaderValue::from_str(azure_resource_id)?,
140 );
141 }
142
143 headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone());
144 headers
145 };
146
147 Ok(Self {
148 client,
149 endpoint,
150 customer_id,
151 shared_key,
152 default_headers,
153 })
154 }
155
156 fn build_authorization_header_value(
157 &self,
158 rfc1123date: &str,
159 len: usize,
160 ) -> crate::Result<String> {
161 let string_to_hash =
162 format!("POST\n{len}\n{CONTENT_TYPE}\n{X_MS_DATE}:{rfc1123date}\n{RESOURCE}");
163 let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?;
164 signer.update(string_to_hash.as_bytes())?;
165
166 let signature = signer.sign_to_vec()?;
167 let signature_base64 = base64::encode_block(&signature);
168
169 Ok(format!(
170 "{} {}:{}",
171 SHARED_KEY, self.customer_id, signature_base64
172 ))
173 }
174
175 fn build_request(&self, body: Bytes) -> crate::Result<Request<Body>> {
176 let len = body.len();
177
178 let mut request = Request::post(&self.endpoint).body(Body::from(body))?;
179
180 let rfc1123date = chrono::Utc::now()
181 .format("%a, %d %b %Y %H:%M:%S GMT")
182 .to_string();
183 let authorization = self.build_authorization_header_value(&rfc1123date, len)?;
184
185 *request.headers_mut() = self.default_headers.clone();
186 request
187 .headers_mut()
188 .insert(header::AUTHORIZATION, authorization.parse()?);
189 request
190 .headers_mut()
191 .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?);
192
193 Ok(request)
194 }
195
196 pub fn healthcheck(&self) -> Healthcheck {
197 let mut client = self.client.clone();
198 let request = self.build_request(Bytes::from("[]"));
199 Box::pin(async move {
200 let request = request?;
201 let res = client.call(request).in_current_span().await?;
202
203 if res.status().is_server_error() {
204 return Err("Server returned a server error".into());
205 }
206
207 if res.status() == StatusCode::FORBIDDEN {
208 return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into());
209 }
210
211 if res.status() == StatusCode::NOT_FOUND {
212 return Err(
213 "Either the URL provided is incorrect, or the request is too large".into(),
214 );
215 }
216
217 if res.status() == StatusCode::BAD_REQUEST {
218 return Err("The workspace has been closed or the request was invalid".into());
219 }
220
221 Ok(())
222 })
223 }
224}
225
226impl Service<AzureMonitorLogsRequest> for AzureMonitorLogsService {
227 type Response = AzureMonitorLogsResponse;
228 type Error = crate::Error;
229 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
230
231 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
233 Poll::Ready(Ok(()))
234 }
235
236 fn call(&mut self, request: AzureMonitorLogsRequest) -> Self::Future {
238 let mut client = self.client.clone();
239 let http_request = self.build_request(request.body);
240 Box::pin(async move {
241 let http_request = http_request?;
242 let response = client.call(http_request).in_current_span().await?;
243 Ok(AzureMonitorLogsResponse {
244 http_status: response.status(),
245 raw_byte_size: request.metadata.request_encoded_size(),
246 events_byte_size: request
247 .metadata
248 .into_events_estimated_json_encoded_byte_size(),
249 })
250 })
251 }
252}