use std::{
    collections::HashMap,
    io::{BufRead, BufReader},
    net::SocketAddr,
    str::FromStr,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use smallvec::SmallVec;
use tokio_util::codec::Decoder as _;
use vector_lib::{
    codecs::{
        StreamDecodingError,
        decoding::{DeserializerConfig, FramingConfig},
    },
    config::{DataType, LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::{lookup_v2::parse_value_path, owned_value_path, path},
    schema::Definition,
};
use vrl::value::{Kind, kind::Collection};
use warp::http::{HeaderMap, StatusCode};

use crate::{
    codecs::{Decoder, DecodingConfig},
    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
    config::{
        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput, log_schema,
    },
    event::{Event, LogEvent},
    http::KeepaliveConfig,
    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::{
        http_server::{HttpConfigParamKind, build_param_matcher, remove_duplicates},
        util::{
            HttpSource,
            http::{HttpMethod, add_query_parameters},
        },
    },
    tls::TlsEnableableConfig,
};
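
// Source that ingests logs pushed by Heroku's Logplex HTTPS drains. Logplex
// POSTs batches of syslog-formatted lines to the configured endpoint together
// with `Logplex-Msg-Count`, `Logplex-Frame-Id`, and `Logplex-Drain-Token`
// headers, which are validated below before the body is decoded.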

/// Configuration for the `heroku_logs` source.
#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// A list of URL query parameters to include in the log event.
    ///
    /// Wildcard (`*`) patterns are supported.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
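
// An illustrative configuration for this source (the section name and the
// query parameter below are examples, not requirements):
//
//   [sources.heroku]
//   type = "heroku_logs"
//   address = "0.0.0.0:80"
//   query_parameters = ["appname"]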

impl LogplexConfig {
    /// Builds the schema definition for events emitted by this source under the
    /// given log namespace.
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                log_schema()
                    .host_key()
                    .cloned()
                    .map(LegacyKey::InsertIfEmpty),
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
                &owned_value_path!("app_name"),
                Kind::bytes(),
                Some("service"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
                &owned_value_path!("proc_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("query_parameters"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // The Legacy namespace puts decoded fields at the event root, so
        // arbitrary additional fields have to be allowed there.
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}

impl Default for LogplexConfig {
    fn default() -> Self {
        Self {
            address: "0.0.0.0:80".parse().unwrap(),
            query_parameters: Vec::new(),
            tls: None,
            auth: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            log_namespace: None,
            keepalive: KeepaliveConfig::default(),
        }
    }
}

impl GenerateConfig for LogplexConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(LogplexConfig::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "heroku_logs")]
impl SourceConfig for LogplexConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let source = LogplexSource {
            query_parameters: build_param_matcher(&remove_duplicates(
                self.query_parameters.clone(),
                "query_parameters",
            ))?,
            decoder,
            log_namespace,
        };

        source.run(
            self.address,
            "events",
            HttpMethod::Post,
            StatusCode::OK,
            true,
            self.tls.as_ref(),
            self.auth.as_ref(),
            cx,
            self.acknowledgements,
            self.keepalive.clone(),
        )
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

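// The shared HTTP server machinery behind `HttpSource` handles listening, TLS,
// auth, and acknowledgements; this type only has to turn a request body plus
// the Logplex headers into events and enrich them with any configured query
// parameters.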
#[derive(Clone, Default)]
struct LogplexSource {
    query_parameters: Vec<HttpConfigParamKind>,
    decoder: Decoder,
    log_namespace: LogNamespace,
}

impl LogplexSource {
    fn decode_message(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage> {
        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
            Ok(v) => v,
            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
        };
        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;

        emit!(HerokuLogplexRequestReceived {
            msg_count,
            frame_id,
            drain_token
        });

        let events = self.body_to_events(body);

        if events.len() != msg_count {
            let error_msg = format!(
                "Parsed event count does not match message count header: {} vs {}",
                events.len(),
                msg_count
            );

            // Under test, a mismatch means the fixture itself is wrong, so fail loudly.
            if cfg!(test) {
                panic!("{}", error_msg);
            }
            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
        }

        Ok(events)
    }

    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
        let rdr = BufReader::new(body.reader());
        rdr.lines()
            .filter_map(|res| {
                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
                    .ok()
            })
            .filter(|s| !s.is_empty())
            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
            .collect()
    }
}

impl HttpSource for LogplexSource {
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        self.decode_message(body, header_map)
    }

    fn enrich_events(
        &self,
        events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
        add_query_parameters(
            events,
            &self.query_parameters,
            query_parameters,
            self.log_namespace,
            LogplexConfig::NAME,
        );
    }
}

fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
    if let Some(header_value) = header_map.get(name) {
        header_value
            .to_str()
            .map_err(|e| header_error_message(name, &e.to_string()))
    } else {
        Err(header_error_message(name, "Header does not exist"))
    }
}

fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
    ErrorMessage::new(
        StatusCode::BAD_REQUEST,
        format!("Invalid request header {name:?}: {msg:?}"),
    )
}

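// Each Logplex line is an octet-counted syslog frame. A sample (taken from the
// tests below; the trailing key=value pairs are the application message):
//
//   267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET ...
//   len prival timestamp                        host app    procid msgid message
//
// `line_to_events` splits on the first seven spaces and keeps the remainder as
// the message, which is then run through the configured decoder.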
fn line_to_events(
    mut decoder: Decoder,
    log_namespace: LogNamespace,
    line: String,
) -> SmallVec<[Event; 1]> {
    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();

    let mut events = SmallVec::<[Event; 1]>::new();

    if parts.len() == 8 {
        let timestamp = parts[2];
        let hostname = parts[3];
        let app_name = parts[4];
        let proc_id = parts[5];
        let message = parts[7];

        let mut buffer = BytesMut::new();
        buffer.put(message.as_bytes());

        let legacy_host_key = log_schema().host_key().cloned();
        let legacy_app_key = parse_value_path("app_name").ok();
        let legacy_proc_key = parse_value_path("proc_id").ok();

        loop {
            match decoder.decode_eof(&mut buffer) {
                Ok(Some((decoded, _byte_size))) => {
                    for mut event in decoded {
                        if let Event::Log(ref mut log) = event {
                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
                                log_namespace.insert_vector_metadata(
                                    log,
                                    log_schema().timestamp_key(),
                                    path!("timestamp"),
                                    ts,
                                );
                            }

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("host"),
                                hostname.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("app_name"),
                                app_name.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("proc_id"),
                                proc_id.to_owned(),
                            );
                        }

                        events.push(event);
                    }
                }
                Ok(None) => break,
                Err(error) => {
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    } else {
        warn!(
            message = "Line didn't match expected logplex format, so raw message is forwarded.",
            fields = parts.len()
        );

        events.push(LogEvent::from_str_legacy(line).into())
    };

    let now = Utc::now();

    for event in &mut events {
        if let Event::Log(log) = event {
            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
        }
    }

    events
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use chrono::{DateTime, Utc};
    use futures::Stream;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::LogNamespace,
        event::{Event, EventStatus, Value},
        lookup::{OwnedTargetPath, owned_value_path},
        schema::Definition,
    };
    use vrl::value::{Kind, kind::Collection};

    use super::LogplexConfig;
    use crate::{
        SourceSender,
        common::http::server_auth::HttpServerAuthConfig,
        config::{SourceConfig, SourceContext, log_schema},
        serde::{default_decoding, default_framing_message_based},
        test_util::{
            addr::{PortGuard, next_addr},
            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
            random_string, spawn_collect_n, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }
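
    // A small additional check (a sketch using only items already in scope): a
    // missing `Logplex-Msg-Count` header yields an error from `get_header`, which
    // `decode_message` then turns into a 400 response.
    #[test]
    fn missing_logplex_header_is_an_error() {
        let headers = warp::http::HeaderMap::new();
        assert!(super::get_header(&headers, "Logplex-Msg-Count").is_err());
    }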

    async fn source(
        auth: Option<HttpServerAuthConfig>,
        query_parameters: Vec<String>,
        status: EventStatus,
        acknowledgements: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr, PortGuard) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let (_guard, address) = next_addr();
        let context = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            LogplexConfig {
                address,
                query_parameters,
                tls: None,
                auth,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: acknowledgements.into(),
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(context)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address, _guard)
    }

    async fn send(
        address: SocketAddr,
        body: &str,
        auth: Option<HttpServerAuthConfig>,
        query: &str,
    ) -> u16 {
        let len = body.lines().count();
        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
            req = req.basic_auth(username, Some(password.inner()));
        }
        req.header("Logplex-Msg-Count", len)
            .header("Logplex-Frame-Id", "frame-foo")
            .header("Logplex-Drain-Token", "drain-bar")
            .body(body.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }

    fn make_auth() -> HttpServerAuthConfig {
        HttpServerAuthConfig::Basic {
            username: random_string(16),
            password: random_string(16).into(),
        }
    }

    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;

    #[tokio::test]
    async fn logplex_handles_router_log() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr, _guard) = source(
                Some(auth.clone()),
                vec!["appname".to_string(), "absent".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
            assert_eq!(log["absent"], Value::Null);
        })
        .await;
    }

    #[tokio::test]
    async fn logplex_query_parameters_wildcard() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr, _guard) = source(
                Some(auth.clone()),
                vec!["*".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
        })
        .await;
    }

    #[tokio::test]
    async fn logplex_handles_failures() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr, _guard) =
                source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;

            let events = spawn_collect_n(
                async move {
                    assert_eq!(
                        400,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
        })
        .await;
    }

    #[tokio::test]
    async fn logplex_ignores_disabled_acknowledgements() {
        let auth = make_auth();

        let (rx, addr, _guard) =
            source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;

        let events = spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                )
            },
            rx,
            SAMPLE_BODY.lines().count(),
        )
        .await;

        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
    }

    #[tokio::test]
    async fn logplex_auth_failure() {
        let (_rx, addr, _guard) =
            source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;

        assert_eq!(
            401,
            send(
                addr,
                SAMPLE_BODY,
                Some(make_auth()),
                "appname=lumberjack-store"
            )
            .await
        );
    }

    #[test]
    fn logplex_handles_normal_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_handles_malformed_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "what am i doing here";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_doesnt_blow_up_on_bad_framing() {
        let log_namespace = LogNamespace::Legacy;
        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogplexConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
                    Kind::bytes(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogplexConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("app_name"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }
}