// vector/sinks/azure_common/connection_string.rs

/*!
Minimal Azure Storage connection string parser and URL builder for Blob Storage.

This module intentionally avoids relying on the legacy Azure Storage SDK crates.
It extracts only the fields it needs and composes container/blob URLs suitable
for the newer `azure_storage_blob` crate (>= 0.7).

Supported keys (matched case-insensitively; any other keys are ignored):
- AccountName
- AccountKey
- SharedAccessSignature
- DefaultEndpointsProtocol
- EndpointSuffix
- BlobEndpoint
- UseDevelopmentStorage
- DevelopmentStorageProxyUri

Behavior:
- If `BlobEndpoint` is present, it is used verbatim as the base for container/blob URLs
  (one trailing `/` is dropped when the container is appended). It may already include the
  account segment (e.g., Azurite: `http://127.0.0.1:10000/devstoreaccount1`).
- Otherwise `AccountName` is required, and:
  - if `UseDevelopmentStorage=true`, a dev endpoint is synthesized:
    `{protocol}://127.0.0.1:10000/{account_name}`, where `protocol` defaults to `http`
    (also used when the given protocol is neither `http` nor `https`).
    If `DevelopmentStorageProxyUri` is present, it replaces `127.0.0.1:10000`: trailing `/`
    characters are removed, `{protocol}://` is prepended only when the URI has no `http://` or
    `https://` scheme, and the account name is still appended as a path segment.
  - otherwise, the public cloud endpoint is synthesized:
    `{protocol}://{account_name}.blob.{endpoint_suffix}`, where `endpoint_suffix` defaults to
    `core.windows.net` and `protocol` defaults to `https`.

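SAS handling:
- If `SharedAccessSignature` exists, it is appended to the generated container/blob URLs as a
  query string (joined with `&` if the URL already contains a `?`). This happens even when an
  `AccountKey` is also present.
- Both `sv=...` and `?sv=...` forms are accepted; leading `?` characters are stripped.
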
Examples:
- Access key connection string:
  `DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64key==;EndpointSuffix=core.windows.net`
  Container URL: <https://myacct.blob.core.windows.net/logs>
  Blob URL: <https://myacct.blob.core.windows.net/logs/file.txt>

- SAS connection string:
  `BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=sv=2022-11-02&ss=b&...`
  Container URL (with SAS): `https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b&...`
  Blob URL (with SAS): `https://myacct.blob.core.windows.net/logs/file.txt?sv=2022-11-02&ss=b&...`

- Azurite/dev storage:
  `UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1`
  Container URL: <http://127.0.0.1:10000/devstoreaccount1/logs>
*/

49use std::collections::HashMap;
50
/// Errors that can occur while parsing a connection string or composing URLs.
///
/// Derives `PartialEq`/`Eq` (like [`Auth`]) so callers and tests can compare errors directly
/// instead of pattern-matching on them.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ConnectionStringError {
    /// Generic format error carrying a static description.
    InvalidFormat(&'static str),
    /// A non-empty `;`-separated segment had no `=`; carries the offending segment.
    InvalidPair(String),
    /// `AccountName` was needed to synthesize an endpoint but was not present.
    MissingAccountName,
    /// The Blob endpoint could not be determined.
    MissingEndpoint,
}

impl std::fmt::Display for ConnectionStringError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InvalidFormat(msg) => write!(f, "invalid format: {msg}"),
            Self::InvalidPair(p) => write!(f, "invalid key=value pair: {p}"),
            Self::MissingAccountName => f.write_str("account name is required"),
            Self::MissingEndpoint => f.write_str("could not determine Blob endpoint"),
        }
    }
}

impl std::error::Error for ConnectionStringError {}

/// Which credential, if any, a connection string carries.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Auth {
    /// Account name plus account key (shared key authentication).
    SharedKey {
        /// The storage account name.
        account_name: String,
        /// The base64-encoded account key, exactly as written in the connection string.
        account_key: String,
    },
    /// A shared access signature, stored as a query string without a leading `?`.
    Sas { query: String },
    /// The connection string carries no credentials.
    None,
}

/// An Azure Storage connection string, reduced to the keys this module understands.
///
/// Every key is optional; `use_development_storage` is `false` unless enabled. Methods on this
/// type compose the account, container and blob URLs.
#[derive(Debug, Clone, Default)]
pub struct ParsedConnectionString {
    /// `AccountName`.
    pub account_name: Option<String>,
    /// `AccountKey`, kept verbatim.
    pub account_key: Option<String>,
    /// `SharedAccessSignature`; `parse` removes any leading `?`.
    pub shared_access_signature: Option<String>,
    /// `DefaultEndpointsProtocol`; `parse` lowercases it.
    pub default_endpoints_protocol: Option<String>,
    /// `EndpointSuffix` (for example `core.windows.net`).
    pub endpoint_suffix: Option<String>,
    /// `BlobEndpoint`: an explicit base URL for the Blob service.
    pub blob_endpoint: Option<String>,
    /// Whether `UseDevelopmentStorage=true` was given.
    pub use_development_storage: bool,
    /// `DevelopmentStorageProxyUri`: replaces the default emulator host when set.
    pub development_storage_proxy_uri: Option<String>,
}

102impl ParsedConnectionString {
103    /// Parse a connection string into a `ParsedConnectionString`.
104    ///
105    /// The parser is case-insensitive for keys and ignores empty segments.
106    pub fn parse(s: &str) -> Result<Self, ConnectionStringError> {
107        let mut map: HashMap<String, String> = HashMap::new();
108
109        for seg in s.split(';') {
110            let seg = seg.trim();
111            if seg.is_empty() {
112                continue;
113            }
114            let (k, v) = seg
115                .split_once('=')
116                .ok_or_else(|| ConnectionStringError::InvalidPair(seg.to_string()))?;
117            let key = k.trim().to_ascii_lowercase();
118            let value = v.trim().to_string();
119            map.insert(key, value);
120        }
121
122        // Build the structure from the parsed map.
123        let parsed = ParsedConnectionString {
124            account_name: map.get("accountname").cloned(),
125            account_key: map.get("accountkey").cloned(),
126            shared_access_signature: map
127                .get("sharedaccesssignature")
128                .map(|s| normalize_sas(s.as_str())),
129            default_endpoints_protocol: map
130                .get("defaultendpointsprotocol")
131                .map(|s| s.to_ascii_lowercase()),
132            endpoint_suffix: map.get("endpointsuffix").cloned(),
133            blob_endpoint: map.get("blobendpoint").cloned(),
134            use_development_storage: map
135                .get("usedevelopmentstorage")
136                .map(|v| v.eq_ignore_ascii_case("true"))
137                .unwrap_or(false),
138            development_storage_proxy_uri: map.get("developmentstorageproxyuri").cloned(),
139        };
140
141        Ok(parsed)
142    }
143
144    /// Determine the authentication method present in this connection string.
145    pub fn auth(&self) -> Auth {
146        if let (Some(name), Some(key)) = (self.account_name.as_ref(), self.account_key.as_ref()) {
147            return Auth::SharedKey {
148                account_name: name.clone(),
149                account_key: key.clone(),
150            };
151        }
152        if let Some(sas) = self.shared_access_signature.as_ref() {
153            return Auth::Sas { query: sas.clone() };
154        }
155        Auth::None
156    }
157
158    /// Get the normalized default protocol, defaulting to:
159    /// - http for development storage
160    /// - https otherwise
161    pub fn default_protocol(&self) -> String {
162        if let Some(p) = self.default_endpoints_protocol.as_deref() {
163            match p {
164                "http" | "https" => p.to_string(),
165                _ => {
166                    // Fallbacks
167                    if self.use_development_storage {
168                        "http".to_string()
169                    } else {
170                        "https".to_string()
171                    }
172                }
173            }
174        } else if self.use_development_storage {
175            "http".to_string()
176        } else {
177            "https".to_string()
178        }
179    }
180
181    /// Get the normalized endpoint suffix, defaulting to "core.windows.net".
182    pub fn endpoint_suffix(&self) -> String {
183        self.endpoint_suffix
184            .clone()
185            .unwrap_or_else(|| "core.windows.net".to_string())
186    }
187
188    /// Build the base Blob endpoint URL (no container/blob path).
189    ///
190    /// Resolution order:
191    /// 1. BlobEndpoint (as-is, without trailing slash normalization)
192    /// 2. Development storage synthesized URL: `{proto}://127.0.0.1:10000/{account}`
193    ///    If DevelopmentStorageProxyUri is present, it will be used instead of 127.0.0.1:10000.
194    /// 3. Public cloud synthesized URL: `{proto}://{account}.blob.{suffix}`
195    pub fn blob_account_endpoint(&self) -> Result<String, ConnectionStringError> {
196        if let Some(explicit) = self.blob_endpoint.as_ref() {
197            return Ok(explicit.clone());
198        }
199
200        let account_name = self
201            .account_name
202            .as_ref()
203            .ok_or(ConnectionStringError::MissingAccountName)?;
204
205        let proto = self.default_protocol();
206
207        if self.use_development_storage {
208            // If the proxy URI is provided, use it. Otherwise default to 127.0.0.1:10000
209            let host = self
210                .development_storage_proxy_uri
211                .as_deref()
212                .map(|s| s.trim_end_matches('/').to_string())
213                .unwrap_or_else(|| "127.0.0.1:10000".to_string());
214
215            let base = if host.starts_with("http://") || host.starts_with("https://") {
216                format!("{}/{}", trim_trailing_slash(&host), account_name)
217            } else {
218                format!("{proto}://{host}/{}", account_name)
219            };
220            return Ok(base);
221        }
222
223        // Public cloud-style base
224        let suffix = self.endpoint_suffix();
225        Ok(format!("{proto}://{}.blob.{}", account_name, suffix))
226    }
227
228    /// Build a container URL, optionally appending SAS if present.
229    pub fn container_url(&self, container: &str) -> Result<String, ConnectionStringError> {
230        let base = self.blob_account_endpoint()?;
231        Ok(append_query_segment(
232            &format!("{}/{}", trim_trailing_slash(&base), container),
233            self.shared_access_signature.as_deref(),
234        ))
235    }
236
237    /// Build a blob URL, optionally appending SAS if present.
238    pub fn blob_url(&self, container: &str, blob: &str) -> Result<String, ConnectionStringError> {
239        // Build the base container URL without SAS, then append the blob path,
240        // and finally append the SAS so it appears after the full path.
241        let base = self.blob_account_endpoint()?;
242        let container_no_sas = format!("{}/{}", trim_trailing_slash(&base), container);
243        let blob_full = format!(
244            "{}/{}",
245            trim_trailing_slash(&container_no_sas),
246            encode_path_segment(blob)
247        );
248        Ok(append_query_segment(
249            &blob_full,
250            self.shared_access_signature.as_deref(),
251        ))
252    }
253}
254
/// Drop every leading `?` from a SAS token so it can be appended as a bare query string.
fn normalize_sas(s: &str) -> String {
    s.chars().skip_while(|&c| c == '?').collect()
}

/// Attach `sas` to `base_url` as a query string.
///
/// `None` and `Some("")` return the URL unchanged. Otherwise the SAS is joined with `&` when
/// `base_url` already contains a `?`, and with `?` when it does not.
fn append_query_segment(base_url: &str, sas: Option<&str>) -> String {
    let Some(query) = sas.filter(|q| !q.is_empty()) else {
        return base_url.to_owned();
    };

    let mut url = String::with_capacity(base_url.len() + 1 + query.len());
    url.push_str(base_url);
    url.push(if base_url.contains('?') { '&' } else { '?' });
    url.push_str(query);
    url
}

/// Remove a single trailing `/` from `s`, if present; any further slashes are kept.
fn trim_trailing_slash(s: &str) -> String {
    s.strip_suffix('/').unwrap_or(s).to_owned()
}

/// Percent-encode a blob name for use in a URL path.
///
/// Kept as-is:
/// - RFC 3986 `pchar` bytes: unreserved characters, sub-delimiters, `:` and `@`;
/// - `/`, so that virtual-directory prefixes such as `date=2024-01-01/file.log` keep their
///   structure.
///
/// Every other byte of the UTF-8 encoding is written as `%XX` with uppercase hex. This
/// includes space, `%`, `?`, `#` and all non-ASCII bytes. Escaping `?` and `#` stops them from
/// starting a query string or fragment (which would corrupt the path and any appended SAS).
/// Escaping `%` stops a literal percent sign from being decoded by the service.
fn encode_path_segment(seg: &str) -> String {
    const HEX: &[u8; 16] = b"0123456789ABCDEF";

    let mut out = String::with_capacity(seg.len());
    for &byte in seg.as_bytes() {
        let keep = byte.is_ascii_alphanumeric()
            || matches!(
                byte,
                // unreserved
                b'-' | b'.' | b'_' | b'~'
                // sub-delims
                | b'!' | b'$' | b'&' | b'\'' | b'(' | b')' | b'*' | b'+' | b',' | b';' | b'='
                // remaining pchar characters, plus the path separator
                | b':' | b'@' | b'/'
            );
        if keep {
            out.push(char::from(byte));
        } else {
            out.push('%');
            out.push(char::from(HEX[usize::from(byte >> 4)]));
            out.push(char::from(HEX[usize::from(byte & 0x0F)]));
        }
    }
    out
}

#[cfg(test)]
mod tests {
    use super::*;

    /// SAS query shared by the SAS-based test cases.
    const SAS_QUERY: &str =
        "sv=2022-11-02&ss=b&srt=sco&sp=rcw&se=2099-01-01T00:00:00Z&sig=...";

    /// Parse `cs`, failing the test if it is rejected.
    fn parse_ok(cs: &str) -> ParsedConnectionString {
        ParsedConnectionString::parse(cs).expect("connection string should parse")
    }

    #[test]
    fn parse_access_key_public_cloud() {
        let parsed = parse_ok(
            "DefaultEndpointsProtocol=https;AccountName=myacct;AccountKey=base64==;EndpointSuffix=core.windows.net",
        );

        assert_eq!(parsed.account_name.as_deref(), Some("myacct"));
        assert_eq!(parsed.account_key.as_deref(), Some("base64=="));
        assert_eq!(parsed.shared_access_signature, None);
        assert_eq!(parsed.default_protocol(), "https");
        assert_eq!(parsed.endpoint_suffix(), "core.windows.net");

        assert_eq!(
            parsed.blob_account_endpoint().unwrap(),
            "https://myacct.blob.core.windows.net"
        );
        assert_eq!(
            parsed.container_url("logs").unwrap(),
            "https://myacct.blob.core.windows.net/logs"
        );
        assert_eq!(
            parsed.blob_url("logs", "file.txt").unwrap(),
            "https://myacct.blob.core.windows.net/logs/file.txt"
        );
        assert_eq!(
            parsed.auth(),
            Auth::SharedKey {
                account_name: "myacct".into(),
                account_key: "base64==".into(),
            }
        );
    }

    #[test]
    fn parse_sas_with_blob_endpoint() {
        let parsed = parse_ok(&format!(
            "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature={SAS_QUERY}"
        ));

        assert_eq!(parsed.shared_access_signature.as_deref(), Some(SAS_QUERY));
        assert_eq!(
            parsed.container_url("logs").unwrap(),
            format!("https://myacct.blob.core.windows.net/logs?{SAS_QUERY}")
        );
        assert_eq!(
            parsed.blob_url("logs", "file name.txt").unwrap(),
            format!("https://myacct.blob.core.windows.net/logs/file%20name.txt?{SAS_QUERY}")
        );
        assert_eq!(
            parsed.auth(),
            Auth::Sas {
                query: SAS_QUERY.to_string()
            }
        );
    }

    #[test]
    fn parse_sas_with_leading_question_mark() {
        let parsed = parse_ok(
            "BlobEndpoint=https://myacct.blob.core.windows.net/;SharedAccessSignature=?sv=2022-11-02&ss=b",
        );

        assert_eq!(
            parsed.shared_access_signature.as_deref(),
            Some("sv=2022-11-02&ss=b")
        );
        assert_eq!(
            parsed.container_url("logs").unwrap(),
            "https://myacct.blob.core.windows.net/logs?sv=2022-11-02&ss=b"
        );
    }

    #[test]
    fn parse_development_storage_with_defaults() {
        let parsed = parse_ok(
            "UseDevelopmentStorage=true;DefaultEndpointsProtocol=http;AccountName=devstoreaccount1",
        );

        assert_eq!(
            parsed.blob_account_endpoint().unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1"
        );
        assert_eq!(
            parsed.container_url("logs").unwrap(),
            "http://127.0.0.1:10000/devstoreaccount1/logs"
        );
    }

    #[test]
    fn parse_development_storage_with_proxy() {
        let parsed = parse_ok(
            "UseDevelopmentStorage=true;AccountName=devstoreaccount1;DevelopmentStorageProxyUri=http://localhost:10000",
        );

        assert_eq!(
            parsed.blob_account_endpoint().unwrap(),
            "http://localhost:10000/devstoreaccount1"
        );
        assert_eq!(
            parsed.container_url("logs").unwrap(),
            "http://localhost:10000/devstoreaccount1/logs"
        );
    }

    #[test]
    fn parse_invalid_pairs() {
        match ParsedConnectionString::parse("AccountName;AccountKey=noequals") {
            Err(ConnectionStringError::InvalidPair(p)) => {
                assert!(p == "AccountName" || p == "AccountKey=noequals")
            }
            other => panic!("unexpected result: {other:?}"),
        }
    }
}