vector/sinks/azure_common/
connection_string.rs1use std::collections::HashMap;
50
51#[derive(Debug, Clone)]
53pub enum ConnectionStringError {
54 InvalidFormat(&'static str),
55 InvalidPair(String),
56 MissingAccountName,
57 MissingEndpoint,
58}
59
60impl std::fmt::Display for ConnectionStringError {
61 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
62 match self {
63 ConnectionStringError::InvalidFormat(msg) => write!(f, "invalid format: {msg}"),
64 ConnectionStringError::InvalidPair(p) => write!(f, "invalid key=value pair: {p}"),
65 ConnectionStringError::MissingAccountName => write!(f, "account name is required"),
66 ConnectionStringError::MissingEndpoint => {
67 write!(f, "could not determine Blob endpoint")
68 }
69 }
70 }
71}
72
73impl std::error::Error for ConnectionStringError {}
74
75#[derive(Debug, Clone, PartialEq, Eq)]
77pub enum Auth {
78 SharedKey {
80 account_name: String,
81 account_key: String, },
83 Sas { query: String },
85 None,
87}
88
89#[derive(Debug, Clone, Default)]
91pub struct ParsedConnectionString {
92 pub account_name: Option<String>,
93 pub account_key: Option<String>,
94 pub shared_access_signature: Option<String>,
95 pub default_endpoints_protocol: Option<String>,
96 pub endpoint_suffix: Option<String>,
97 pub blob_endpoint: Option<String>,
98 pub use_development_storage: bool,
99 pub development_storage_proxy_uri: Option<String>,
100}
101
102impl ParsedConnectionString {
103 pub fn parse(s: &str) -> Result<Self, ConnectionStringError> {
107 let mut map: HashMap<String, String> = HashMap::new();
108
109 for seg in s.split(';') {
110 let seg = seg.trim();
111 if seg.is_empty() {
112 continue;
113 }
114 let (k, v) = seg
115 .split_once('=')
116 .ok_or_else(|| ConnectionStringError::InvalidPair(seg.to_string()))?;
117 let key = k.trim().to_ascii_lowercase();
118 let value = v.trim().to_string();
119 map.insert(key, value);
120 }
121
122 let parsed = ParsedConnectionString {
124 account_name: map.get("accountname").cloned(),
125 account_key: map.get("accountkey").cloned(),
126 shared_access_signature: map
127 .get("sharedaccesssignature")
128 .map(|s| normalize_sas(s.as_str())),
129 default_endpoints_protocol: map
130 .get("defaultendpointsprotocol")
131 .map(|s| s.to_ascii_lowercase()),
132 endpoint_suffix: map.get("endpointsuffix").cloned(),
133 blob_endpoint: map.get("blobendpoint").cloned(),
134 use_development_storage: map
135 .get("usedevelopmentstorage")
136 .map(|v| v.eq_ignore_ascii_case("true"))
137 .unwrap_or(false),
138 development_storage_proxy_uri: map.get("developmentstorageproxyuri").cloned(),
139 };
140
141 Ok(parsed)
142 }
143
144 pub fn auth(&self) -> Auth {
146 if let (Some(name), Some(key)) = (self.account_name.as_ref(), self.account_key.as_ref()) {
147 return Auth::SharedKey {
148 account_name: name.clone(),
149 account_key: key.clone(),
150 };
151 }
152 if let Some(sas) = self.shared_access_signature.as_ref() {
153 return Auth::Sas { query: sas.clone() };
154 }
155 Auth::None
156 }
157
158 pub fn default_protocol(&self) -> String {
162 if let Some(p) = self.default_endpoints_protocol.as_deref() {
163 match p {
164 "http" | "https" => p.to_string(),
165 _ => {
166 if self.use_development_storage {
168 "http".to_string()
169 } else {
170 "https".to_string()
171 }
172 }
173 }
174 } else if self.use_development_storage {
175 "http".to_string()
176 } else {
177 "https".to_string()
178 }
179 }
180
181 pub fn endpoint_suffix(&self) -> String {
183 self.endpoint_suffix
184 .clone()
185 .unwrap_or_else(|| "core.windows.net".to_string())
186 }
187
188 pub fn blob_account_endpoint(&self) -> Result<String, ConnectionStringError> {
196 if let Some(explicit) = self.blob_endpoint.as_ref() {
197 return Ok(explicit.clone());
198 }
199
200 let account_name = self
201 .account_name
202 .as_ref()
203 .ok_or(ConnectionStringError::MissingAccountName)?;
204
205 let proto = self.default_protocol();
206
207 if self.use_development_storage {
208 let host = self
210 .development_storage_proxy_uri
211 .as_deref()
212 .map(|s| s.trim_end_matches('/').to_string())
213 .unwrap_or_else(|| "127.0.0.1:10000".to_string());
214
215 let base = if host.starts_with("http://") || host.starts_with("https://") {
216 format!("{}/{}", trim_trailing_slash(&host), account_name)
217 } else {
218 format!("{proto}://{host}/{}", account_name)
219 };
220 return Ok(base);
221 }
222
223 let suffix = self.endpoint_suffix();
225 Ok(format!("{proto}://{}.blob.{}", account_name, suffix))
226 }
227
228 pub fn container_url(&self, container: &str) -> Result<String, ConnectionStringError> {
230 let base = self.blob_account_endpoint()?;
231 Ok(append_query_segment(
232 &format!("{}/{}", trim_trailing_slash(&base), container),
233 self.shared_access_signature.as_deref(),
234 ))
235 }
236
237 pub fn blob_url(&self, container: &str, blob: &str) -> Result<String, ConnectionStringError> {
239 let base = self.blob_account_endpoint()?;
242 let container_no_sas = format!("{}/{}", trim_trailing_slash(&base), container);
243 let blob_full = format!(
244 "{}/{}",
245 trim_trailing_slash(&container_no_sas),
246 encode_path_segment(blob)
247 );
248 Ok(append_query_segment(
249 &blob_full,
250 self.shared_access_signature.as_deref(),
251 ))
252 }
253}
254
255fn normalize_sas(s: &str) -> String {
257 s.trim_start_matches('?').to_string()
258}
259
260fn append_query_segment(base_url: &str, sas: Option<&str>) -> String {
262 match sas {
263 None => base_url.to_string(),
264 Some("") => base_url.to_string(),
265 Some(q) => {
266 let sep = if base_url.contains('?') { '&' } else { '?' };
267 format!("{base_url}{sep}{q}")
268 }
269 }
270}
271
272fn trim_trailing_slash(s: &str) -> String {
274 if let Some(stripped) = s.strip_suffix('/') {
275 stripped.to_string()
276 } else {
277 s.to_string()
278 }
279}
280
281fn encode_path_segment(seg: &str) -> String {
285 seg.replace(' ', "%20")
286}
287
288#[cfg(test)]
289mod tests {
290 use super::*;
291
292 #[test]
293 fn parse_access_key_public_cloud() {
294 let cs = "DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64==;EndpointSuffix=core.windows.net";
295 let parsed = ParsedConnectionString::parse(cs).unwrap();
296 assert_eq!(parsed.account_name.as_deref(), Some("myacct"));
297 assert_eq!(parsed.account_key.as_deref(), Some("base64=="));
298 assert!(parsed.shared_access_signature.is_none());
299 assert_eq!(parsed.default_protocol(), "https");
300 assert_eq!(parsed.endpoint_suffix(), "core.windows.net");
301
302 let base = parsed.blob_account_endpoint().unwrap();
303 assert_eq!(base, "https://myacct.blob.core.windows.net");
304
305 let container_url = parsed.container_url("logs").unwrap();
306 assert_eq!(container_url, "https://myacct.blob.core.windows.net/logs");
307
308 let blob_url = parsed.blob_url("logs", "file.txt").unwrap();
309 assert_eq!(
310 blob_url,
311 "https://myacct.blob.core.windows.net/logs/file.txt"
312 );
313 assert_eq!(
314 parsed.auth(),
315 Auth::SharedKey {
316 account_name: "myacct".to_string(),
317 account_key: "base64==".to_string()
318 }
319 );
320 }
321
322 #[test]
323 fn parse_sas_with_blob_endpoint() {
324 let cs = "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=...";
325 let parsed = ParsedConnectionString::parse(cs).unwrap();
326 assert_eq!(
327 parsed.shared_access_signature.as_deref(),
328 Some("sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=...")
329 );
330
331 let container_url = parsed.container_url("logs").unwrap();
332 assert_eq!(
333 container_url,
334 "https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..."
335 );
336
337 let blob_url = parsed.blob_url("logs", "file name.txt").unwrap();
338 assert_eq!(
339 blob_url,
340 "https://myacct.blob.core.windows.net/logs/file%20name.txt?sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..."
341 );
342 assert_eq!(
343 parsed.auth(),
344 Auth::Sas {
345 query: "sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=..."
346 .to_string()
347 }
348 );
349 }
350
351 #[test]
352 fn parse_sas_with_leading_question_mark() {
353 let cs = "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=?sv=2022-11-02&ss=b";
354 let parsed = ParsedConnectionString::parse(cs).unwrap();
355 assert_eq!(
356 parsed.shared_access_signature.as_deref(),
357 Some("sv=2022-11-02&ss=b")
358 );
359 let url = parsed.container_url("logs").unwrap();
360 assert_eq!(
361 url,
362 "https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b"
363 );
364 }
365
366 #[test]
367 fn parse_development_storage_with_defaults() {
368 let cs =
369 "UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1";
370 let parsed = ParsedConnectionString::parse(cs).unwrap();
371 let base = parsed.blob_account_endpoint().unwrap();
372 assert_eq!(base, "http://127.0.0.1:10000/devstoreaccount1");
373
374 let container_url = parsed.container_url("logs").unwrap();
375 assert_eq!(
376 container_url,
377 "http://127.0.0.1:10000/devstoreaccount1/logs"
378 );
379 }
380
381 #[test]
382 fn parse_development_storage_with_proxy() {
383 let cs = "UseDevelopmentStorage=true;AccountName=devstoreaccount1;DevelopmentStorageProxyUri=http://localhost:10000";
384 let parsed = ParsedConnectionString::parse(cs).unwrap();
385 let base = parsed.blob_account_endpoint().unwrap();
386 assert_eq!(base, "http://localhost:10000/devstoreaccount1");
387
388 let container_url = parsed.container_url("logs").unwrap();
389 assert_eq!(
390 container_url,
391 "http://localhost:10000/devstoreaccount1/logs"
392 );
393 }
394
395 #[test]
396 fn parse_invalid_pairs() {
397 let cs = "AccountName;AccountKey=noequals";
398 let err = ParsedConnectionString::parse(cs).unwrap_err();
399 match err {
400 ConnectionStringError::InvalidPair(p) => {
401 assert!(p == "AccountName" || p == "AccountKey=noequals")
402 }
403 _ => panic!("unexpected error: {err}"),
404 }
405 }
406}