use std::{
    collections::HashMap,
    io::{BufRead, BufReader},
    net::SocketAddr,
    str::FromStr,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use smallvec::SmallVec;
use tokio_util::codec::Decoder as _;
use vector_lib::{
    codecs::{
        StreamDecodingError,
        decoding::{DeserializerConfig, FramingConfig},
    },
    config::{DataType, LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::{lookup_v2::parse_value_path, owned_value_path, path},
    schema::Definition,
};
use vrl::value::{Kind, kind::Collection};
use warp::http::{HeaderMap, StatusCode};

use crate::{
    codecs::{Decoder, DecodingConfig},
    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
    config::{
        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput, log_schema,
    },
    event::{Event, LogEvent},
    http::KeepaliveConfig,
    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::{
        http_server::{HttpConfigParamKind, build_param_matcher, remove_duplicates},
        util::{
            HttpSource,
            http::{HttpMethod, add_query_parameters},
        },
    },
    tls::TlsEnableableConfig,
};
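
// Overview (editorial summary of the code below): the `heroku_logs` source
// runs an HTTP server that accepts requests from a Heroku Logplex HTTPS drain.
// Each request is a POST whose body is a batch of newline-delimited,
// syslog-style log lines and whose headers carry `Logplex-Msg-Count`,
// `Logplex-Frame-Id`, and `Logplex-Drain-Token`; the message count header is
// checked against the number of lines that actually parse.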

#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
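
// A minimal configuration sketch (TOML); the component name and fields follow
// the registration and struct above, and the source id `heroku` is arbitrary:
//
//     [sources.heroku]
//     type = "heroku_logs"
//     address = "0.0.0.0:80"
//     query_parameters = ["appname"]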

impl LogplexConfig {
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                log_schema()
                    .host_key()
                    .cloned()
                    .map(LegacyKey::InsertIfEmpty),
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
                &owned_value_path!("app_name"),
                Kind::bytes(),
                Some("service"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
                &owned_value_path!("proc_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("query_parameters"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}
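
// With the Legacy namespace the fields declared above ("timestamp", "host",
// "app_name", "proc_id") are written into the event itself; with the Vector
// namespace they are stored as source metadata keyed by the source name
// (e.g. reachable as `%heroku_logs.app_name` in VRL). The schema tests at the
// bottom of this file spell out both shapes.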

impl Default for LogplexConfig {
    fn default() -> Self {
        Self {
            address: "0.0.0.0:80".parse().unwrap(),
            query_parameters: Vec::new(),
            tls: None,
            auth: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            log_namespace: None,
            keepalive: KeepaliveConfig::default(),
        }
    }
}

impl GenerateConfig for LogplexConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(LogplexConfig::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "heroku_logs")]
impl SourceConfig for LogplexConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let source = LogplexSource {
            query_parameters: build_param_matcher(&remove_duplicates(
                self.query_parameters.clone(),
                "query_parameters",
            ))?,
            decoder,
            log_namespace,
        };

        source.run(
            self.address,
            "events",
            HttpMethod::Post,
            StatusCode::OK,
            true,
            self.tls.as_ref(),
            self.auth.as_ref(),
            cx,
            self.acknowledgements,
            self.keepalive.clone(),
        )
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

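// Per-request handler state shared with the generic HTTP server: the compiled
// query-parameter matchers, the configured framer/decoder, and the resolved
// log namespace. The `HttpSource` implementation below drives it for every
// POST to the `/events` path registered in `build()`.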
#[derive(Clone, Default)]
struct LogplexSource {
    query_parameters: Vec<HttpConfigParamKind>,
    decoder: Decoder,
    log_namespace: LogNamespace,
}

impl LogplexSource {
    fn decode_message(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage> {
        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
            Ok(v) => v,
            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
        };
        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;

        emit!(HerokuLogplexRequestReceived {
            msg_count,
            frame_id,
            drain_token
        });

        let events = self.body_to_events(body);

        if events.len() != msg_count {
            let error_msg = format!(
                "Parsed event count does not match message count header: {} vs {}",
                events.len(),
                msg_count
            );

            if cfg!(test) {
                panic!("{}", error_msg);
            }
            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
        }

        Ok(events)
    }

    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
        let rdr = BufReader::new(body.reader());
        rdr.lines()
            .filter_map(|res| {
                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
                    .ok()
            })
            .filter(|s| !s.is_empty())
            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
            .collect()
    }
}

impl HttpSource for LogplexSource {
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        self.decode_message(body, header_map)
    }

    fn enrich_events(
        &self,
        events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
        add_query_parameters(
            events,
            &self.query_parameters,
            query_parameters,
            self.log_namespace,
            LogplexConfig::NAME,
        );
    }
}

fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
    if let Some(header_value) = header_map.get(name) {
        header_value
            .to_str()
            .map_err(|e| header_error_message(name, &e.to_string()))
    } else {
        Err(header_error_message(name, "Header does not exist"))
    }
}

fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
    ErrorMessage::new(
        StatusCode::BAD_REQUEST,
        format!("Invalid request header {name:?}: {msg:?}"),
    )
}

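// Splits one drain line into at most eight space-separated parts and decodes
// the trailing message with the configured decoder. Roughly, for a line such
// as
//
//   267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET ...
//
// the parts are: [0] frame length, [1] syslog priority/version, [2] timestamp,
// [3] host, [4] app name, [5] process id, [6] message id ("-"), and [7] the
// rest of the line as the message. Lines that do not split into eight parts
// are forwarded verbatim as the message.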
fn line_to_events(
    mut decoder: Decoder,
    log_namespace: LogNamespace,
    line: String,
) -> SmallVec<[Event; 1]> {
    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();

    let mut events = SmallVec::<[Event; 1]>::new();

    if parts.len() == 8 {
        let timestamp = parts[2];
        let hostname = parts[3];
        let app_name = parts[4];
        let proc_id = parts[5];
        let message = parts[7];

        let mut buffer = BytesMut::new();
        buffer.put(message.as_bytes());

        let legacy_host_key = log_schema().host_key().cloned();
        let legacy_app_key = parse_value_path("app_name").ok();
        let legacy_proc_key = parse_value_path("proc_id").ok();

        loop {
            match decoder.decode_eof(&mut buffer) {
                Ok(Some((decoded, _byte_size))) => {
                    for mut event in decoded {
                        if let Event::Log(ref mut log) = event {
                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
                                log_namespace.insert_vector_metadata(
                                    log,
                                    log_schema().timestamp_key(),
                                    path!("timestamp"),
                                    ts,
                                );
                            }

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("host"),
                                hostname.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("app_name"),
                                app_name.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("proc_id"),
                                proc_id.to_owned(),
                            );
                        }

                        events.push(event);
                    }
                }
                Ok(None) => break,
                Err(error) => {
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    } else {
        warn!(
            message = "Line didn't match expected logplex format, so raw message is forwarded.",
            fields = parts.len()
        );

        events.push(LogEvent::from_str_legacy(line).into())
    };

    let now = Utc::now();

    for event in &mut events {
        if let Event::Log(log) = event {
            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
        }
    }

    events
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use chrono::{DateTime, Utc};
    use futures::Stream;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::LogNamespace,
        event::{Event, EventStatus, Value},
        lookup::{OwnedTargetPath, owned_value_path},
        schema::Definition,
    };
    use vrl::value::{Kind, kind::Collection};

    use super::LogplexConfig;
    use crate::{
        SourceSender,
        common::http::server_auth::HttpServerAuthConfig,
        config::{SourceConfig, SourceContext, log_schema},
        serde::{default_decoding, default_framing_message_based},
        test_util::{
            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
            next_addr, random_string, spawn_collect_n, wait_for_tcp,
        },
    };
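
    // Test harness: `source()` spins the source up on a free local port and
    // returns the collected-event stream plus the bound address; `send()`
    // posts a Logplex-style request (the body plus the three `Logplex-*`
    // headers) to that address.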

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }

    async fn source(
        auth: Option<HttpServerAuthConfig>,
        query_parameters: Vec<String>,
        status: EventStatus,
        acknowledgements: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let context = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            LogplexConfig {
                address,
                query_parameters,
                tls: None,
                auth,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: acknowledgements.into(),
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(context)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address)
    }

    async fn send(
        address: SocketAddr,
        body: &str,
        auth: Option<HttpServerAuthConfig>,
        query: &str,
    ) -> u16 {
        let len = body.lines().count();
        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
            req = req.basic_auth(username, Some(password.inner()));
        }
        req.header("Logplex-Msg-Count", len)
            .header("Logplex-Frame-Id", "frame-foo")
            .header("Logplex-Drain-Token", "drain-bar")
            .body(body.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }

    fn make_auth() -> HttpServerAuthConfig {
        HttpServerAuthConfig::Basic {
            username: random_string(16),
            password: random_string(16).into(),
        }
    }

    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;

    #[tokio::test]
    async fn logplex_handles_router_log() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["appname".to_string(), "absent".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
            assert_eq!(log["absent"], Value::Null);
        }).await;
    }

    #[tokio::test]
    async fn logplex_query_parameters_wildcard() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["*".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
        }).await;
    }

    #[tokio::test]
    async fn logplex_handles_failures() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;

            let events = spawn_collect_n(
                async move {
                    assert_eq!(
                        400,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
        })
        .await;
    }

    #[tokio::test]
    async fn logplex_ignores_disabled_acknowledgements() {
        let auth = make_auth();

        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;

        let events = spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                )
            },
            rx,
            SAMPLE_BODY.lines().count(),
        )
        .await;

        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
    }

    #[tokio::test]
    async fn logplex_auth_failure() {
        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;

        assert_eq!(
            401,
            send(
                addr,
                SAMPLE_BODY,
                Some(make_auth()),
                "appname=lumberjack-store"
            )
            .await
        );
    }

    #[test]
    fn logplex_handles_normal_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_handles_malformed_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "what am i doing here";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_doesnt_blow_up_on_bad_framing() {
        let log_namespace = LogNamespace::Legacy;
        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogplexConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
                    Kind::bytes(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogplexConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("app_name"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }
}