use std::{
    collections::HashMap,
    io::{BufRead, BufReader},
    net::SocketAddr,
    str::FromStr,
};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use chrono::{DateTime, Utc};
use smallvec::SmallVec;
use tokio_util::codec::Decoder as _;
use vector_lib::lookup::{lookup_v2::parse_value_path, owned_value_path, path};
use vector_lib::{
    codecs::{
        decoding::{DeserializerConfig, FramingConfig},
        StreamDecodingError,
    },
    config::DataType,
};
use vrl::value::{kind::Collection, Kind};
use warp::http::{HeaderMap, StatusCode};

use vector_lib::configurable::configurable_component;
use vector_lib::{
    config::{LegacyKey, LogNamespace},
    schema::Definition,
};

use crate::{
    codecs::{Decoder, DecodingConfig},
    common::http::{server_auth::HttpServerAuthConfig, ErrorMessage},
    config::{
        log_schema, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
        SourceContext, SourceOutput,
    },
    event::{Event, LogEvent},
    http::KeepaliveConfig,
    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
    serde::{bool_or_struct, default_decoding, default_framing_message_based},
    sources::{
        http_server::{build_param_matcher, remove_duplicates, HttpConfigParamKind},
        util::{
            http::{add_query_parameters, HttpMethod},
            HttpSource,
        },
    },
    tls::TlsEnableableConfig,
};

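/// Configuration for the `heroku_logs` source.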
#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// A list of URL query parameters to include in the log event.
    ///
    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
    ///
    /// Specifying "*" results in all query parameters being included in the log event.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}

impl LogplexConfig {
    /// Builds the schema definition for the events this source produces, for
    /// the given log namespace.
    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
        let mut schema_definition = self
            .decoding
            .schema_definition(log_namespace)
            .with_standard_vector_source_metadata()
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("timestamp"),
                Kind::timestamp().or_undefined(),
                Some("timestamp"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                log_schema()
                    .host_key()
                    .cloned()
                    .map(LegacyKey::InsertIfEmpty),
                &owned_value_path!("host"),
                Kind::bytes(),
                Some("host"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
                &owned_value_path!("app_name"),
                Kind::bytes(),
                Some("service"),
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
                &owned_value_path!("proc_id"),
                Kind::bytes(),
                None,
            )
            .with_source_metadata(
                LogplexConfig::NAME,
                None,
                &owned_value_path!("query_parameters"),
                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                None,
            );

        // The legacy namespace adds dynamic fields (such as query parameters)
        // at the event root, so unknown fields must be allowed there.
        if log_namespace == LogNamespace::Legacy {
            schema_definition = schema_definition.unknown_fields(Kind::bytes());
        }

        schema_definition
    }
}

impl Default for LogplexConfig {
    fn default() -> Self {
        Self {
            address: "0.0.0.0:80".parse().unwrap(),
            query_parameters: Vec::new(),
            tls: None,
            auth: None,
            framing: default_framing_message_based(),
            decoding: default_decoding(),
            acknowledgements: SourceAcknowledgementsConfig::default(),
            log_namespace: None,
            keepalive: KeepaliveConfig::default(),
        }
    }
}

impl GenerateConfig for LogplexConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(LogplexConfig::default()).unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "heroku_logs")]
impl SourceConfig for LogplexConfig {
    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
        let log_namespace = cx.log_namespace(self.log_namespace);

        let decoder =
            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
                .build()?;

        let source = LogplexSource {
            query_parameters: build_param_matcher(&remove_duplicates(
                self.query_parameters.clone(),
                "query_parameters",
            ))?,
            decoder,
            log_namespace,
        };

        source.run(
            self.address,
            "events",
            HttpMethod::Post,
            StatusCode::OK,
            true,
            self.tls.as_ref(),
            self.auth.as_ref(),
            cx,
            self.acknowledgements,
            self.keepalive.clone(),
        )
    }

    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
    }

    fn resources(&self) -> Vec<Resource> {
        vec![Resource::tcp(self.address)]
    }

    fn can_acknowledge(&self) -> bool {
        true
    }
}

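/// Per-request state for the HTTP server: the configured query-parameter
/// matchers, the codec used to decode message bodies, and the active log
/// namespace.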
#[derive(Clone, Default)]
struct LogplexSource {
    query_parameters: Vec<HttpConfigParamKind>,
    decoder: Decoder,
    log_namespace: LogNamespace,
}

impl LogplexSource {
    fn decode_message(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
    ) -> Result<Vec<Event>, ErrorMessage> {
        // Logplex requests advertise how many messages the frame contains; the
        // parsed event count is validated against this header below.
        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
            Ok(v) => v,
            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
        };
        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;

        emit!(HerokuLogplexRequestReceived {
            msg_count,
            frame_id,
            drain_token
        });

        let events = self.body_to_events(body);

        if events.len() != msg_count {
            let error_msg = format!(
                "Parsed event count does not match message count header: {} vs {}",
                events.len(),
                msg_count
            );

            if cfg!(test) {
                panic!("{}", error_msg);
            }
            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
        }

        Ok(events)
    }

    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
        let rdr = BufReader::new(body.reader());
        rdr.lines()
            .filter_map(|res| {
                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
                    .ok()
            })
            .filter(|s| !s.is_empty())
            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
            .collect()
    }
}

impl HttpSource for LogplexSource {
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        self.decode_message(body, header_map)
    }

    fn enrich_events(
        &self,
        events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
        add_query_parameters(
            events,
            &self.query_parameters,
            query_parameters,
            self.log_namespace,
            LogplexConfig::NAME,
        );
    }
}

fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
    if let Some(header_value) = header_map.get(name) {
        header_value
            .to_str()
            .map_err(|e| header_error_message(name, &e.to_string()))
    } else {
        Err(header_error_message(name, "Header does not exist"))
    }
}

fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
    ErrorMessage::new(
        StatusCode::BAD_REQUEST,
        format!("Invalid request header {name:?}: {msg:?}"),
    )
}

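/// Decodes a single Logplex line into events. Lines are expected to be
/// syslog-formatted, with eight space-separated leading fields: octet count,
/// priority/version, timestamp, hostname, app name, proc ID, message ID, and
/// the message itself. Lines that do not match are forwarded as raw messages.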
fn line_to_events(
    mut decoder: Decoder,
    log_namespace: LogNamespace,
    line: String,
) -> SmallVec<[Event; 1]> {
    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();

    let mut events = SmallVec::<[Event; 1]>::new();

    if parts.len() == 8 {
        let timestamp = parts[2];
        let hostname = parts[3];
        let app_name = parts[4];
        let proc_id = parts[5];
        let message = parts[7];

        let mut buffer = BytesMut::new();
        buffer.put(message.as_bytes());

        let legacy_host_key = log_schema().host_key().cloned();
        let legacy_app_key = parse_value_path("app_name").ok();
        let legacy_proc_key = parse_value_path("proc_id").ok();

        loop {
            match decoder.decode_eof(&mut buffer) {
                Ok(Some((decoded, _byte_size))) => {
                    for mut event in decoded {
                        if let Event::Log(ref mut log) = event {
                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
                                log_namespace.insert_vector_metadata(
                                    log,
                                    log_schema().timestamp_key(),
                                    path!("timestamp"),
                                    ts,
                                );
                            }

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("host"),
                                hostname.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("app_name"),
                                app_name.to_owned(),
                            );

                            log_namespace.insert_source_metadata(
                                LogplexConfig::NAME,
                                log,
                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
                                path!("proc_id"),
                                proc_id.to_owned(),
                            );
                        }

                        events.push(event);
                    }
                }
                Ok(None) => break,
                Err(error) => {
                    // Stop decoding unless the error is recoverable.
                    if !error.can_continue() {
                        break;
                    }
                }
            }
        }
    } else {
        warn!(
            message = "Line didn't match expected logplex format, so raw message is forwarded.",
            fields = parts.len(),
            internal_log_rate_limit = true
        );

        events.push(LogEvent::from_str_legacy(line).into())
    };

    let now = Utc::now();

    for event in &mut events {
        if let Event::Log(log) = event {
            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
        }
    }

    events
}

426
427#[cfg(test)]
428mod tests {
429 use std::net::SocketAddr;
430
431 use chrono::{DateTime, Utc};
432 use futures::Stream;
433 use similar_asserts::assert_eq;
434 use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
435 use vector_lib::{
436 config::LogNamespace,
437 event::{Event, EventStatus, Value},
438 schema::Definition,
439 };
440 use vrl::value::{kind::Collection, Kind};
441
442 use super::LogplexConfig;
443 use crate::common::http::server_auth::HttpServerAuthConfig;
444 use crate::{
445 config::{log_schema, SourceConfig, SourceContext},
446 serde::{default_decoding, default_framing_message_based},
447 test_util::{
448 components::{assert_source_compliance, HTTP_PUSH_SOURCE_TAGS},
449 next_addr, random_string, spawn_collect_n, wait_for_tcp,
450 },
451 SourceSender,
452 };
453
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }

    async fn source(
        auth: Option<HttpServerAuthConfig>,
        query_parameters: Vec<String>,
        status: EventStatus,
        acknowledgements: bool,
    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
        let (sender, recv) = SourceSender::new_test_finalize(status);
        let address = next_addr();
        let context = SourceContext::new_test(sender, None);
        tokio::spawn(async move {
            LogplexConfig {
                address,
                query_parameters,
                tls: None,
                auth,
                framing: default_framing_message_based(),
                decoding: default_decoding(),
                acknowledgements: acknowledgements.into(),
                log_namespace: None,
                keepalive: Default::default(),
            }
            .build(context)
            .await
            .unwrap()
            .await
            .unwrap()
        });
        wait_for_tcp(address).await;
        (recv, address)
    }

    async fn send(
        address: SocketAddr,
        body: &str,
        auth: Option<HttpServerAuthConfig>,
        query: &str,
    ) -> u16 {
        let len = body.lines().count();
        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
            req = req.basic_auth(username, Some(password.inner()));
        }
        req.header("Logplex-Msg-Count", len)
            .header("Logplex-Frame-Id", "frame-foo")
            .header("Logplex-Drain-Token", "drain-bar")
            .body(body.to_owned())
            .send()
            .await
            .unwrap()
            .status()
            .as_u16()
    }

    fn make_auth() -> HttpServerAuthConfig {
        HttpServerAuthConfig::Basic {
            username: random_string(16),
            password: random_string(16).into(),
        }
    }

    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;

520
521 #[tokio::test]
522 async fn logplex_handles_router_log() {
523 assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
524 let auth = make_auth();
525
526 let (rx, addr) = source(
527 Some(auth.clone()),
528 vec!["appname".to_string(), "absent".to_string()],
529 EventStatus::Delivered,
530 true,
531 )
532 .await;
533
534 let mut events = spawn_collect_n(
535 async move {
536 assert_eq!(
537 200,
538 send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
539 )
540 },
541 rx,
542 SAMPLE_BODY.lines().count(),
543 )
544 .await;
545
546 let event = events.remove(0);
547 let log = event.as_log();
548
549 assert_eq!(
550 *log.get_message().unwrap(),
551 r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
552 );
553 assert_eq!(
554 log[log_schema().timestamp_key().unwrap().to_string()],
555 "2020-01-08T22:33:57.353034+00:00"
556 .parse::<DateTime<Utc>>()
557 .unwrap()
558 .into()
559 );
560 assert_eq!(*log.get_host().unwrap(), "host".into());
561 assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
562 assert_eq!(log["appname"], "lumberjack-store".into());
563 assert_eq!(log["absent"], Value::Null);
564 }).await;
565 }

    #[tokio::test]
    async fn logplex_query_parameters_wildcard() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["*".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
        }).await;
    }

    #[tokio::test]
    async fn logplex_handles_failures() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;

            let events = spawn_collect_n(
                async move {
                    assert_eq!(
                        400,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
        })
        .await;
    }

    #[tokio::test]
    async fn logplex_ignores_disabled_acknowledgements() {
        let auth = make_auth();

        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;

        let events = spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                )
            },
            rx,
            SAMPLE_BODY.lines().count(),
        )
        .await;

        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
    }

    #[tokio::test]
    async fn logplex_auth_failure() {
        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;

        assert_eq!(
            401,
            send(
                addr,
                SAMPLE_BODY,
                Some(make_auth()),
                "appname=lumberjack-store"
            )
            .await
        );
    }

    #[test]
    fn logplex_handles_normal_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_handles_malformed_lines() {
        let log_namespace = LogNamespace::Legacy;
        let body = "what am i doing here";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
        assert!(log.get_timestamp().is_some());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn logplex_doesnt_blow_up_on_bad_framing() {
        let log_namespace = LogNamespace::Legacy;
        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
        let events = super::line_to_events(Default::default(), log_namespace, body.into());
        let log = events[0].as_log();

        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
        assert_eq!(
            log[log_schema().timestamp_key().unwrap().to_string()],
            "2020-01-08T22:33:57.353034+00:00"
                .parse::<DateTime<Utc>>()
                .unwrap()
                .into()
        );
        assert_eq!(*log.get_host().unwrap(), "host".into());
        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = LogplexConfig {
            log_namespace: Some(true),
            ..Default::default()
        };

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
                    Kind::timestamp().or_undefined(),
                    Some("timestamp"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "host"),
                    Kind::bytes(),
                    Some("host"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
                    Kind::bytes(),
                    Some("service"),
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
                    None,
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let config = LogplexConfig::default();

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
        .with_event_field(
            &owned_value_path!("app_name"),
            Kind::bytes(),
            Some("service"),
        )
        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
        .unknown_fields(Kind::bytes());

        assert_eq!(definitions, Some(expected_definition))
    }
}