vector/sources/util/http/
headers.rs1use bytes::Bytes;
2use vector_lib::{
3 config::{LegacyKey, LogNamespace},
4 event::Event,
5 lookup::path,
6};
7use warp::http::{HeaderMap, HeaderValue};
8
9use crate::{event::Value, sources::http_server::HttpConfigParamKind};
10
11pub fn add_headers(
12 events: &mut [Event],
13 headers_config: &[HttpConfigParamKind],
14 headers: &HeaderMap,
15 log_namespace: LogNamespace,
16 source_name: &'static str,
17) {
18 for h in headers_config {
19 match h {
20 HttpConfigParamKind::Exact(header_name) => {
24 let value = headers.get(header_name).map(HeaderValue::as_bytes);
25
26 for event in events.iter_mut() {
27 match event {
28 Event::Log(log) => {
29 log_namespace.insert_source_metadata(
30 source_name,
31 log,
32 Some(LegacyKey::InsertIfEmpty(path!(header_name))),
33 path!("headers", header_name),
34 Value::from(value.map(Bytes::copy_from_slice)),
35 );
36 }
37 Event::Metric(_) | Event::Trace(_) => {
38 event.metadata_mut().value_mut().insert(
39 path!(source_name, "headers", header_name),
40 Value::from(value.map(Bytes::copy_from_slice)),
41 );
42 }
43 }
44 }
45 }
46 HttpConfigParamKind::Glob(header_pattern) => {
49 for header_name in headers.keys() {
50 if header_pattern
51 .matches_with(header_name.as_str(), glob::MatchOptions::default())
52 {
53 let value = headers.get(header_name).map(HeaderValue::as_bytes);
54
55 for event in events.iter_mut() {
56 match event {
57 Event::Log(log) => {
58 log_namespace.insert_source_metadata(
59 source_name,
60 log,
61 Some(LegacyKey::InsertIfEmpty(path!(header_name.as_str()))),
62 path!("headers", header_name.as_str()),
63 Value::from(value.map(Bytes::copy_from_slice)),
64 );
65 }
66 Event::Metric(_) | Event::Trace(_) => {
67 event.metadata_mut().value_mut().insert(
68 path!(source_name, "headers", header_name.as_str()),
69 Value::from(value.map(Bytes::copy_from_slice)),
70 );
71 }
72 }
73 }
74 }
75 }
76 }
77 };
78 }
79}
80
#[cfg(test)]
mod tests {
    use chrono::{DateTime, Utc};
    use std::time::SystemTime;
    use vector_lib::config::LogNamespace;
    use vrl::{path, value};
    use warp::http::HeaderMap;

    use crate::event::{Event, MetricKind, MetricTags, MetricValue};
    use crate::{
        event::{LogEvent, Metric, TraceEvent},
        sources::{http_server::HttpConfigParamKind, util::add_headers},
    };

    /// Builds the request headers shared by both tests: two `Content-*`
    /// headers and a `User-Agent` header.
    fn request_headers() -> HeaderMap {
        let mut map = HeaderMap::new();
        for (name, raw) in [
            ("Content-Type", "application/x-protobuf"),
            ("User-Agent", "Test"),
            ("Content-Encoding", "gzip"),
        ] {
            map.insert(name, raw.parse().unwrap());
        }
        map
    }

    /// Creates a log event whose value is an empty object.
    fn empty_log() -> Event {
        LogEvent::from(value!({})).into()
    }

    /// Creates an incremental counter metric with a timestamp and empty tags.
    fn counter_metric() -> Event {
        let metric = Metric::new(
            "some.random.metric",
            MetricKind::Incremental,
            MetricValue::Counter { value: 123.4 },
        )
        .with_timestamp(Some(DateTime::<Utc>::from(SystemTime::now())))
        .with_tags(Some(MetricTags::default()));

        Event::from(metric)
    }

    /// Creates a trace event carrying a handful of span fields.
    fn span_trace() -> Event {
        TraceEvent::from(btreemap! {
            "span_id" => "abc123",
            "trace_id" => "xyz789",
            "span_name" => "test-span",
            "service" => "my-service",
        })
        .into()
    }

    /// Runs `add_headers` over a one-element batch holding `event`, using the
    /// source name `"test"`, and returns the decorated event.
    fn with_headers(
        event: Event,
        selection: &[HttpConfigParamKind],
        headers: &HeaderMap,
        namespace: LogNamespace,
    ) -> Event {
        let mut batch = [event];
        add_headers(&mut batch, selection, headers, namespace, "test");
        let [decorated] = batch;
        decorated
    }

    /// Returns the value stored in the event metadata at `test.headers`,
    /// panicking if nothing was stored there.
    fn captured_headers(event: &Event) -> &crate::event::Value {
        event
            .metadata()
            .value()
            .get(path!("test", "headers"))
            .unwrap()
    }

    #[test]
    fn multiple_headers() {
        let selection = [
            HttpConfigParamKind::Exact("Content-Type".into()),
            HttpConfigParamKind::Exact("User-Agent".into()),
        ];
        let headers = request_headers();

        // The legacy namespace writes headers into the log itself, the Vector
        // namespace into metadata; both must hold the same set of headers.
        let legacy_log = with_headers(empty_log(), &selection, &headers, LogNamespace::Legacy);
        let namespaced_log = with_headers(empty_log(), &selection, &headers, LogNamespace::Vector);

        assert_eq!(
            legacy_log.as_log().value(),
            captured_headers(&namespaced_log)
        );

        // Metrics and traces share one code path, so their metadata must agree.
        let metric = with_headers(
            counter_metric(),
            &selection,
            &headers,
            LogNamespace::default(),
        );
        let trace = with_headers(span_trace(), &selection, &headers, LogNamespace::default());

        assert_eq!(captured_headers(&metric), captured_headers(&trace));
    }

    #[test]
    fn multiple_headers_wildcard() {
        let selection = [HttpConfigParamKind::Glob(
            glob::Pattern::new("Content-*").unwrap(),
        )];
        let headers = request_headers();

        let legacy_log = with_headers(empty_log(), &selection, &headers, LogNamespace::Legacy);
        let namespaced_log = with_headers(empty_log(), &selection, &headers, LogNamespace::Vector);

        let log = legacy_log.as_log();
        assert_eq!(
            log.value(),
            captured_headers(&namespaced_log),
            "Checking legacy and namespaced log contain headers string"
        );
        assert_eq!(
            log["content-type"],
            "application/x-protobuf".into(),
            "Checking log contains Content-Type header"
        );
        assert!(
            !log.contains("user-agent"),
            "Checking log does not contain User-Agent header"
        );
        assert_eq!(
            log["content-encoding"],
            "gzip".into(),
            "Checking log contains Content-Encoding header"
        );

        let metric = with_headers(
            counter_metric(),
            &selection,
            &headers,
            LogNamespace::default(),
        );
        let metric_headers = captured_headers(&metric);

        assert_eq!(
            metric_headers.get("content-type").unwrap(),
            &value!("application/x-protobuf"),
            "Checking metric contains Content-Type header"
        );
        assert!(
            !metric_headers.contains("user-agent"),
            "Checking metric does not contain User-Agent header"
        );
        assert_eq!(
            metric_headers.get("content-encoding").unwrap(),
            &value!("gzip"),
            "Checking metric contains Content-Encoding header"
        );

        let trace = with_headers(span_trace(), &selection, &headers, LogNamespace::default());
        let trace_headers = captured_headers(&trace);

        assert_eq!(
            trace_headers.get("content-type").unwrap(),
            &value!("application/x-protobuf"),
            "Checking trace contains Content-Type header"
        );
        assert!(
            !trace_headers.contains("user-agent"),
            "Checking trace does not contain User-Agent header"
        );
        assert_eq!(
            trace_headers.get("content-encoding").unwrap(),
            &value!("gzip"),
            "Checking trace contains Content-Encoding header"
        );
    }
}