vector/sinks/azure_monitor_logs/
service.rs1use std::{
2 sync::LazyLock,
3 task::{Context, Poll},
4};
5
6use bytes::Bytes;
7use http::{
8 HeaderName, HeaderValue, Request, StatusCode, Uri,
9 header::{self, HeaderMap},
10};
11use hyper::Body;
12use openssl::{base64, hash, pkey, sign};
13use regex::Regex;
14use tracing::Instrument;
15use vector_lib::lookup::lookup_v2::OwnedValuePath;
16
17use crate::{http::HttpClient, sinks::prelude::*};
18
19static LOG_TYPE_REGEX: LazyLock<Regex> = LazyLock::new(|| Regex::new(r"^\w+$").unwrap());
20static LOG_TYPE_HEADER: LazyLock<HeaderName> =
21 LazyLock::new(|| HeaderName::from_static("log-type"));
22static X_MS_DATE_HEADER: LazyLock<HeaderName> =
23 LazyLock::new(|| HeaderName::from_static(X_MS_DATE));
24static X_MS_AZURE_RESOURCE_HEADER: LazyLock<HeaderName> =
25 LazyLock::new(|| HeaderName::from_static("x-ms-azureresourceid"));
26static TIME_GENERATED_FIELD_HEADER: LazyLock<HeaderName> =
27 LazyLock::new(|| HeaderName::from_static("time-generated-field"));
28static CONTENT_TYPE_VALUE: LazyLock<HeaderValue> =
29 LazyLock::new(|| HeaderValue::from_static(CONTENT_TYPE));
30
31const RESOURCE: &str = "/api/logs";
33const CONTENT_TYPE: &str = "application/json";
35const X_MS_DATE: &str = "x-ms-date";
37const SHARED_KEY: &str = "SharedKey";
39const API_VERSION: &str = "2016-04-01";
41
42#[derive(Debug, Clone)]
43pub struct AzureMonitorLogsRequest {
44 pub body: Bytes,
45 pub finalizers: EventFinalizers,
46 pub metadata: RequestMetadata,
47}
48
49impl MetaDescriptive for AzureMonitorLogsRequest {
50 fn get_metadata(&self) -> &RequestMetadata {
51 &self.metadata
52 }
53
54 fn metadata_mut(&mut self) -> &mut RequestMetadata {
55 &mut self.metadata
56 }
57}
58
59impl Finalizable for AzureMonitorLogsRequest {
60 fn take_finalizers(&mut self) -> EventFinalizers {
61 self.finalizers.take_finalizers()
62 }
63}
64
65pub struct AzureMonitorLogsResponse {
66 pub http_status: StatusCode,
67 pub events_byte_size: GroupedCountByteSize,
68 pub raw_byte_size: usize,
69}
70
71impl DriverResponse for AzureMonitorLogsResponse {
72 fn event_status(&self) -> EventStatus {
73 match self.http_status.is_success() {
74 true => EventStatus::Delivered,
75 false => EventStatus::Rejected,
76 }
77 }
78
79 fn events_sent(&self) -> &GroupedCountByteSize {
80 &self.events_byte_size
81 }
82
83 fn bytes_sent(&self) -> Option<usize> {
84 Some(self.raw_byte_size)
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct AzureMonitorLogsService {
91 client: HttpClient,
92 endpoint: Uri,
93 customer_id: String,
94 shared_key: pkey::PKey<pkey::Private>,
95 default_headers: HeaderMap,
96}
97
98impl AzureMonitorLogsService {
99 pub fn new(
101 client: HttpClient,
102 endpoint: Uri,
103 customer_id: String,
104 azure_resource_id: Option<&str>,
105 log_type: &str,
106 time_generated_key: Option<OwnedValuePath>,
107 shared_key: pkey::PKey<pkey::Private>,
108 ) -> crate::Result<Self> {
109 let mut parts = endpoint.into_parts();
110 parts.path_and_query = Some(
111 format!("{RESOURCE}?api-version={API_VERSION}")
112 .parse()
113 .expect("path and query should never fail to parse"),
114 );
115 let endpoint = Uri::from_parts(parts)?;
116
117 let default_headers = {
118 let mut headers = HeaderMap::new();
119
120 if log_type.len() > 100 || !LOG_TYPE_REGEX.is_match(log_type) {
121 return Err(format!(
122 "invalid log_type \"{log_type}\": log type can only contain letters, numbers, and underscore (_), and may not exceed 100 characters"
123 ).into());
124 }
125 let log_type = HeaderValue::from_str(log_type)?;
126 headers.insert(LOG_TYPE_HEADER.clone(), log_type);
127
128 if let Some(timestamp_key) = time_generated_key {
129 headers.insert(
130 TIME_GENERATED_FIELD_HEADER.clone(),
131 HeaderValue::try_from(timestamp_key.to_string())?,
132 );
133 }
134
135 if let Some(azure_resource_id) = azure_resource_id {
136 if azure_resource_id.is_empty() {
137 return Err("azure_resource_id can't be an empty string".into());
138 }
139 headers.insert(
140 X_MS_AZURE_RESOURCE_HEADER.clone(),
141 HeaderValue::from_str(azure_resource_id)?,
142 );
143 }
144
145 headers.insert(header::CONTENT_TYPE, CONTENT_TYPE_VALUE.clone());
146 headers
147 };
148
149 Ok(Self {
150 client,
151 endpoint,
152 customer_id,
153 shared_key,
154 default_headers,
155 })
156 }
157
158 fn build_authorization_header_value(
159 &self,
160 rfc1123date: &str,
161 len: usize,
162 ) -> crate::Result<String> {
163 let string_to_hash =
164 format!("POST\n{len}\n{CONTENT_TYPE}\n{X_MS_DATE}:{rfc1123date}\n{RESOURCE}");
165 let mut signer = sign::Signer::new(hash::MessageDigest::sha256(), &self.shared_key)?;
166 signer.update(string_to_hash.as_bytes())?;
167
168 let signature = signer.sign_to_vec()?;
169 let signature_base64 = base64::encode_block(&signature);
170
171 Ok(format!(
172 "{} {}:{}",
173 SHARED_KEY, self.customer_id, signature_base64
174 ))
175 }
176
177 fn build_request(&self, body: Bytes) -> crate::Result<Request<Body>> {
178 let len = body.len();
179
180 let mut request = Request::post(&self.endpoint).body(Body::from(body))?;
181
182 let rfc1123date = chrono::Utc::now()
183 .format("%a, %d %b %Y %H:%M:%S GMT")
184 .to_string();
185 let authorization = self.build_authorization_header_value(&rfc1123date, len)?;
186
187 *request.headers_mut() = self.default_headers.clone();
188 request
189 .headers_mut()
190 .insert(header::AUTHORIZATION, authorization.parse()?);
191 request
192 .headers_mut()
193 .insert(X_MS_DATE_HEADER.clone(), rfc1123date.parse()?);
194
195 Ok(request)
196 }
197
198 pub fn healthcheck(&self) -> Healthcheck {
199 let mut client = self.client.clone();
200 let request = self.build_request(Bytes::from("[]"));
201 Box::pin(async move {
202 let request = request?;
203 let res = client.call(request).in_current_span().await?;
204
205 if res.status().is_server_error() {
206 return Err("Server returned a server error".into());
207 }
208
209 if res.status() == StatusCode::FORBIDDEN {
210 return Err("The service failed to authenticate the request. Verify that the workspace ID and connection key are valid".into());
211 }
212
213 if res.status() == StatusCode::NOT_FOUND {
214 return Err(
215 "Either the URL provided is incorrect, or the request is too large".into(),
216 );
217 }
218
219 if res.status() == StatusCode::BAD_REQUEST {
220 return Err("The workspace has been closed or the request was invalid".into());
221 }
222
223 Ok(())
224 })
225 }
226}
227
228impl Service<AzureMonitorLogsRequest> for AzureMonitorLogsService {
229 type Response = AzureMonitorLogsResponse;
230 type Error = crate::Error;
231 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
232
233 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
235 Poll::Ready(Ok(()))
236 }
237
238 fn call(&mut self, request: AzureMonitorLogsRequest) -> Self::Future {
240 let mut client = self.client.clone();
241 let http_request = self.build_request(request.body);
242 Box::pin(async move {
243 let http_request = http_request?;
244 let response = client.call(http_request).in_current_span().await?;
245 Ok(AzureMonitorLogsResponse {
246 http_status: response.status(),
247 raw_byte_size: request.metadata.request_encoded_size(),
248 events_byte_size: request
249 .metadata
250 .into_events_estimated_json_encoded_byte_size(),
251 })
252 })
253 }
254}