use std::{
    collections::HashMap,
    io::{BufRead, BufReader},
    net::SocketAddr,
    str::FromStr,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use smallvec::SmallVec;
use tokio_util::codec::Decoder as _;
use vector_lib::{
    codecs::{
        StreamDecodingError,
        decoding::{DeserializerConfig, FramingConfig},
    },
    config::{DataType, LegacyKey, LogNamespace},
    configurable::configurable_component,
    lookup::{lookup_v2::parse_value_path, owned_value_path, path},
    schema::Definition,
};
use vrl::value::{Kind, kind::Collection};
use warp::http::{HeaderMap, StatusCode};

use crate::{
    codecs::{Decoder, DecodingConfig},
    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
    config::{
        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
        SourceOutput, log_schema,
    },
    event::{Event, LogEvent},
    http::KeepaliveConfig,
    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::{
        http_server::{HttpConfigParamKind, build_param_matcher, remove_duplicates},
        util::{
            HttpSource,
            http::{HttpMethod, add_query_parameters},
        },
    },
    tls::TlsEnableableConfig,
};

/// Configuration for the `heroku_logs` source.
#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// A list of URL query parameter names to include in the log event.
    ///
    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}

impl LogplexConfig {
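    /// Builds the schema definition for events produced by this source under
    /// the given log namespace, declaring the metadata fields it populates
    /// (`timestamp`, `host`, `app_name`, `proc_id`, and `query_parameters`).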
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                log_schema()
                    .host_key()
                    .cloned()
                    .map(LegacyKey::InsertIfEmpty),
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
                &owned_value_path!("app_name"),
                Kind::bytes(),
                Some("service"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
                &owned_value_path!("proc_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("query_parameters"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}

impl Default for LogplexConfig {
    fn default() -> Self {
        Self {
            address: "0.0.0.0:80".parse().unwrap(),
            query_parameters: Vec::new(),
            tls: None,
            auth: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            log_namespace: None,
            keepalive: KeepaliveConfig::default(),
        }
    }
}

impl GenerateConfig for LogplexConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(LogplexConfig::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "heroku_logs")]
impl SourceConfig for LogplexConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let source = LogplexSource {
            query_parameters: build_param_matcher(&remove_duplicates(
                self.query_parameters.clone(),
                "query_parameters",
            ))?,
            decoder,
            log_namespace,
        };

        source.run(
            self.address,
            "events",
            HttpMethod::Post,
            StatusCode::OK,
            true,
            self.tls.as_ref(),
            self.auth.as_ref(),
            cx,
            self.acknowledgements,
            self.keepalive.clone(),
        )
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

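/// `HttpSource` implementation that accepts Logplex (Heroku log drain) HTTP
/// requests and turns each line of the request body into one or more events.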
#[derive(Clone, Default)]
struct LogplexSource {
    query_parameters: Vec<HttpConfigParamKind>,
    decoder: Decoder,
    log_namespace: LogNamespace,
}

impl LogplexSource {
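    /// Validates the Logplex request headers (`Logplex-Msg-Count`,
    /// `Logplex-Frame-Id`, `Logplex-Drain-Token`), decodes the body, and
    /// checks that the number of parsed events matches the declared count.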
    fn decode_message(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage> {
        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
            Ok(v) => v,
            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
        };
        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;

        emit!(HerokuLogplexRequestReceived {
            msg_count,
            frame_id,
            drain_token
        });

        let events = self.body_to_events(body);

        if events.len() != msg_count {
            let error_msg = format!(
                "Parsed event count does not match message count header: {} vs {}",
                events.len(),
                msg_count
            );

            if cfg!(test) {
                panic!("{}", error_msg);
            }
            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
        }

        Ok(events)
    }

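    /// Splits the request body into lines, dropping unreadable or empty ones,
    /// and runs each remaining line through the configured decoder.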
    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
        let rdr = BufReader::new(body.reader());
        rdr.lines()
            .filter_map(|res| {
                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
                    .ok()
            })
            .filter(|s| !s.is_empty())
            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
            .collect()
    }
}

impl HttpSource for LogplexSource {
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        self.decode_message(body, header_map)
    }

    fn enrich_events(
        &self,
        events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
        add_query_parameters(
            events,
            &self.query_parameters,
            query_parameters,
            self.log_namespace,
            LogplexConfig::NAME,
        );
    }
}

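/// Looks up a required header, returning a `400 Bad Request` error message if
/// it is missing or not valid UTF-8.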
fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
    if let Some(header_value) = header_map.get(name) {
        header_value
            .to_str()
            .map_err(|e| header_error_message(name, &e.to_string()))
    } else {
        Err(header_error_message(name, "Header does not exist"))
    }
}

fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
    ErrorMessage::new(
        StatusCode::BAD_REQUEST,
        format!("Invalid request header {name:?}: {msg:?}"),
    )
}

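/// Parses a single Logplex line in its syslog-style framing
/// (`<length> <priority>version timestamp host app_name proc_id - message`),
/// decodes the message part with the configured decoder, and annotates the
/// resulting events with timestamp, host, app_name, and proc_id metadata.
/// Lines that do not split into the expected eight space-separated parts are
/// forwarded as raw messages.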
fn line_to_events(
    mut decoder: Decoder,
    log_namespace: LogNamespace,
    line: String,
) -> SmallVec<[Event; 1]> {
    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();

    let mut events = SmallVec::<[Event; 1]>::new();

    if parts.len() == 8 {
        let timestamp = parts[2];
        let hostname = parts[3];
        let app_name = parts[4];
        let proc_id = parts[5];
        let message = parts[7];

        let mut buffer = BytesMut::new();
        buffer.put(message.as_bytes());

        let legacy_host_key = log_schema().host_key().cloned();
        let legacy_app_key = parse_value_path("app_name").ok();
        let legacy_proc_key = parse_value_path("proc_id").ok();

        loop {
            match decoder.decode_eof(&mut buffer) {
                Ok(Some((decoded, _byte_size))) => {
                    for mut event in decoded {
                        if let Event::Log(ref mut log) = event {
                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
                                log_namespace.insert_vector_metadata(
                                    log,
                                    log_schema().timestamp_key(),
                                    path!("timestamp"),
                                    ts,
                                );
                            }

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("host"),
                                hostname.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("app_name"),
                                app_name.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("proc_id"),
                                proc_id.to_owned(),
                            );
                        }

                        events.push(event);
                    }
                }
                Ok(None) => break,
                Err(error) => {
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    } else {
        warn!(
            message = "Line didn't match expected logplex format, so raw message is forwarded.",
            fields = parts.len(),
            internal_log_rate_limit = true
        );

        events.push(LogEvent::from_str_legacy(line).into())
    };

    let now = Utc::now();

    for event in &mut events {
        if let Event::Log(log) = event {
            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
        }
    }

    events
}

#[cfg(test)]
mod tests {
    use std::net::SocketAddr;

    use chrono::{DateTime, Utc};
    use futures::Stream;
    use similar_asserts::assert_eq;
    use vector_lib::{
        config::LogNamespace,
        event::{Event, EventStatus, Value},
        lookup::{OwnedTargetPath, owned_value_path},
        schema::Definition,
    };
    use vrl::value::{Kind, kind::Collection};

    use super::LogplexConfig;
    use crate::{
        SourceSender,
        common::http::server_auth::HttpServerAuthConfig,
        config::{SourceConfig, SourceContext, log_schema},
        serde::{default_decoding, default_framing_message_based},
        test_util::{
            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
            next_addr, random_string, spawn_collect_n, wait_for_tcp,
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }

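    /// Spawns a `heroku_logs` source bound to an ephemeral local address and
    /// returns the stream of emitted events together with that address.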
    async fn source(
        auth: Option<HttpServerAuthConfig>,
        query_parameters: Vec<String>,
        status: EventStatus,
        acknowledgements: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let context = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            LogplexConfig {
                address,
                query_parameters,
                tls: None,
                auth,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: acknowledgements.into(),
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(context)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address)
    }

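    /// POSTs `body` to the source's `/events` endpoint with the Logplex
    /// headers set, returning the HTTP status code of the response.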
    async fn send(
        address: SocketAddr,
        body: &str,
        auth: Option<HttpServerAuthConfig>,
        query: &str,
    ) -> u16 {
        let len = body.lines().count();
        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
            req = req.basic_auth(username, Some(password.inner()));
        }
        req.header("Logplex-Msg-Count", len)
            .header("Logplex-Frame-Id", "frame-foo")
            .header("Logplex-Drain-Token", "drain-bar")
            .body(body.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }

    fn make_auth() -> HttpServerAuthConfig {
        HttpServerAuthConfig::Basic {
            username: random_string(16),
            password: random_string(16).into(),
        }
    }

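    /// A single Heroku router log line in the framing Logplex uses when
    /// delivering logs over an HTTPS drain.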
    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;

    #[tokio::test]
    async fn logplex_handles_router_log() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["appname".to_string(), "absent".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
            assert_eq!(log["absent"], Value::Null);
        }).await;
    }

    #[tokio::test]
    async fn logplex_query_parameters_wildcard() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["*".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
        }).await;
    }

    #[tokio::test]
    async fn logplex_handles_failures() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;

            let events = spawn_collect_n(
                async move {
                    assert_eq!(
                        400,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
        })
        .await;
    }

    #[tokio::test]
    async fn logplex_ignores_disabled_acknowledgements() {
        let auth = make_auth();

        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;

        let events = spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                )
            },
            rx,
            SAMPLE_BODY.lines().count(),
        )
        .await;

        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
    }

    #[tokio::test]
    async fn logplex_auth_failure() {
        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;

        assert_eq!(
            401,
            send(
                addr,
                SAMPLE_BODY,
                Some(make_auth()),
                "appname=lumberjack-store"
            )
            .await
        );
    }

    #[test]
    fn logplex_handles_normal_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_handles_malformed_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "what am i doing here";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_doesnt_blow_up_on_bad_framing() {
        let log_namespace = LogNamespace::Legacy;
        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogplexConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
                    Kind::bytes(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogplexConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("app_name"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }
}