vector/sources/util/http/
headers.rs

1use bytes::Bytes;
2use vector_lib::{
3    config::{LegacyKey, LogNamespace},
4    event::Event,
5    lookup::path,
6};
7use warp::http::{HeaderMap, HeaderValue};
8
9use crate::{event::Value, sources::http_server::HttpConfigParamKind};
10
11pub fn add_headers(
12    events: &mut [Event],
13    headers_config: &[HttpConfigParamKind],
14    headers: &HeaderMap,
15    log_namespace: LogNamespace,
16    source_name: &'static str,
17) {
18    for h in headers_config {
19        match h {
20            // Add each non-wildcard containing header that was specified
21            // in the `headers` config option to the event if an exact match
22            // is found.
23            HttpConfigParamKind::Exact(header_name) => {
24                let value = headers.get(header_name).map(HeaderValue::as_bytes);
25
26                for event in events.iter_mut() {
27                    match event {
28                        Event::Log(log) => {
29                            log_namespace.insert_source_metadata(
30                                source_name,
31                                log,
32                                Some(LegacyKey::InsertIfEmpty(path!(header_name))),
33                                path!("headers", header_name),
34                                Value::from(value.map(Bytes::copy_from_slice)),
35                            );
36                        }
37                        Event::Metric(_) | Event::Trace(_) => {
38                            event.metadata_mut().value_mut().insert(
39                                path!(source_name, "headers", header_name),
40                                Value::from(value.map(Bytes::copy_from_slice)),
41                            );
42                        }
43                    }
44                }
45            }
46            // Add all headers that match against wildcard pattens specified
47            // in the `headers` config option to the event.
48            HttpConfigParamKind::Glob(header_pattern) => {
49                for header_name in headers.keys() {
50                    if header_pattern
51                        .matches_with(header_name.as_str(), glob::MatchOptions::default())
52                    {
53                        let value = headers.get(header_name).map(HeaderValue::as_bytes);
54
55                        for event in events.iter_mut() {
56                            match event {
57                                Event::Log(log) => {
58                                    log_namespace.insert_source_metadata(
59                                        source_name,
60                                        log,
61                                        Some(LegacyKey::InsertIfEmpty(path!(header_name.as_str()))),
62                                        path!("headers", header_name.as_str()),
63                                        Value::from(value.map(Bytes::copy_from_slice)),
64                                    );
65                                }
66                                Event::Metric(_) | Event::Trace(_) => {
67                                    event.metadata_mut().value_mut().insert(
68                                        path!(source_name, "headers", header_name.as_str()),
69                                        Value::from(value.map(Bytes::copy_from_slice)),
70                                    );
71                                }
72                            }
73                        }
74                    }
75                }
76            }
77        };
78    }
79}
80
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use std::time::SystemTime;
    use vector_lib::config::LogNamespace;
    use vrl::{path, value};
    use warp::http::HeaderMap;

    use crate::event::{Event, MetricKind, MetricTags, MetricValue};
    use crate::{
        event::{LogEvent, Metric, TraceEvent},
        sources::{http_server::HttpConfigParamKind, util::add_headers},
    };

    /// Source name handed to `add_headers`; the assertions look the headers
    /// up in event metadata under this same key.
    const SOURCE: &str = "test";

    /// Request headers shared by every test: two `Content-*` headers and a
    /// `User-Agent` header.
    fn request_headers() -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert("Content-Type", "application/x-protobuf".parse().unwrap());
        headers.insert("User-Agent", "Test".parse().unwrap());
        headers.insert("Content-Encoding", "gzip".parse().unwrap());
        headers
    }

    /// A log event whose value is an empty object.
    fn empty_log() -> Event {
        LogEvent::from(value!({})).into()
    }

    /// An incremental counter metric with a timestamp and an empty tag set.
    fn counter_metric() -> Event {
        Metric::new(
            "some.random.metric",
            MetricKind::Incremental,
            MetricValue::Counter { value: 123.4 },
        )
        .with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
        .with_tags(Some(MetricTags::default()))
        .into()
    }

    /// A trace event carrying a handful of span fields.
    fn span_trace() -> Event {
        TraceEvent::from(btreemap! {
            "span_id" => "abc123",
            "trace_id" => "xyz789",
            "span_name" => "test-span",
            "service" => "my-service",
        })
        .into()
    }

    /// Runs `add_headers` over a batch holding only `event` and returns the
    /// event afterwards.
    fn enrich(
        event: Event,
        selectors: &[HttpConfigParamKind],
        headers: &HeaderMap,
        namespace: LogNamespace,
    ) -> Event {
        let mut batch = [event];
        add_headers(&mut batch, selectors, headers, namespace, SOURCE);
        let [enriched] = batch;
        enriched
    }

    #[test]
    fn multiple_headers() {
        let selectors = [
            HttpConfigParamKind::Exact("Content-Type".into()),
            HttpConfigParamKind::Exact("User-Agent".into()),
        ];
        let headers = request_headers();

        let legacy_log = enrich(empty_log(), &selectors, &headers, LogNamespace::Legacy);
        let vector_log = enrich(empty_log(), &selectors, &headers, LogNamespace::Vector);

        // The whole value of the legacy log must equal the headers object
        // stored in the metadata of the namespaced log.
        assert_eq!(
            legacy_log.as_log().value(),
            vector_log
                .metadata()
                .value()
                .get(path!(SOURCE, "headers"))
                .unwrap()
        );

        let metric = enrich(
            counter_metric(),
            &selectors,
            &headers,
            LogNamespace::default(),
        );
        let trace = enrich(span_trace(), &selectors, &headers, LogNamespace::default());

        // Metrics and traces must end up with the same headers object.
        assert_eq!(
            metric
                .metadata()
                .value()
                .get(path!(SOURCE, "headers"))
                .unwrap(),
            trace
                .metadata()
                .value()
                .get(path!(SOURCE, "headers"))
                .unwrap()
        );
    }

    #[test]
    fn multiple_headers_wildcard() {
        let selectors = [HttpConfigParamKind::Glob(
            glob::Pattern::new("Content-*").unwrap(),
        )];
        let headers = request_headers();

        let legacy_log = enrich(empty_log(), &selectors, &headers, LogNamespace::Legacy);
        let vector_log = enrich(empty_log(), &selectors, &headers, LogNamespace::Vector);

        let log = legacy_log.as_log();
        assert_eq!(
            log.value(),
            vector_log
                .metadata()
                .value()
                .get(path!(SOURCE, "headers"))
                .unwrap(),
            "Checking legacy and namespaced log contain headers string"
        );
        assert_eq!(
            log["content-type"],
            "application/x-protobuf".into(),
            "Checking log contains Content-Type header"
        );
        assert!(
            !log.contains("user-agent"),
            "Checking log does not contain User-Agent header"
        );
        assert_eq!(
            log["content-encoding"],
            "gzip".into(),
            "Checking log contains Content-Encoding header"
        );

        let metric = enrich(
            counter_metric(),
            &selectors,
            &headers,
            LogNamespace::default(),
        );
        let metric_headers = metric
            .metadata()
            .value()
            .get(path!(SOURCE, "headers"))
            .unwrap();

        assert_eq!(
            metric_headers.get("content-type").unwrap(),
            &value!("application/x-protobuf"),
            "Checking metric contains Content-Type header"
        );
        assert!(
            !metric_headers.contains("user-agent"),
            "Checking metric does not contain User-Agent header"
        );
        assert_eq!(
            metric_headers.get("content-encoding").unwrap(),
            &value!("gzip"),
            "Checking metric contains Content-Encoding header"
        );

        let trace = enrich(span_trace(), &selectors, &headers, LogNamespace::default());
        let trace_headers = trace
            .metadata()
            .value()
            .get(path!(SOURCE, "headers"))
            .unwrap();

        assert_eq!(
            trace_headers.get("content-type").unwrap(),
            &value!("application/x-protobuf"),
            "Checking trace contains Content-Type header"
        );
        assert!(
            !trace_headers.contains("user-agent"),
            "Checking trace does not contain User-Agent header"
        );
        assert_eq!(
            trace_headers.get("content-encoding").unwrap(),
            &value!("gzip"),
            "Checking trace contains Content-Encoding header"
        );
    }
}