vector/sources/
heroku_logs.rs

1use std::{
2    collections::HashMap,
3    io::{BufRead, BufReader},
4    net::SocketAddr,
5    str::FromStr,
6};
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use chrono::{DateTime, Utc};
10use smallvec::SmallVec;
11use tokio_util::codec::Decoder as _;
12use vector_lib::lookup::{lookup_v2::parse_value_path, owned_value_path, path};
13use vector_lib::{
14    codecs::{
15        decoding::{DeserializerConfig, FramingConfig},
16        StreamDecodingError,
17    },
18    config::DataType,
19};
20use vrl::value::{kind::Collection, Kind};
21use warp::http::{HeaderMap, StatusCode};
22
23use vector_lib::configurable::configurable_component;
24use vector_lib::{
25    config::{LegacyKey, LogNamespace},
26    schema::Definition,
27};
28
29use crate::{
30    codecs::{Decoder, DecodingConfig},
31    common::http::{server_auth::HttpServerAuthConfig, ErrorMessage},
32    config::{
33        log_schema, GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig,
34        SourceContext, SourceOutput,
35    },
36    event::{Event, LogEvent},
37    http::KeepaliveConfig,
38    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
39    serde::{bool_or_struct, default_decoding, default_framing_message_based},
40    sources::{
41        http_server::{build_param_matcher, remove_duplicates, HttpConfigParamKind},
42        util::{
43            http::{add_query_parameters, HttpMethod},
44            HttpSource,
45        },
46    },
47    tls::TlsEnableableConfig,
48};
49
50/// Configuration for `heroku_logs` source.
51#[configurable_component(source("heroku_logs", "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."))]
52#[derive(Clone, Debug)]
53pub struct LogplexConfig {
54    /// The socket address to listen for connections on.
55    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
56    #[configurable(metadata(docs::examples = "localhost:80"))]
57    address: SocketAddr,
58
59    /// A list of URL query parameters to include in the log event.
60    ///
61    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
62    ///
63    /// Specifying "*" results in all query parameters included in the log event.
64    ///
65    /// These override any values included in the body with conflicting names.
66    #[serde(default)]
67    #[configurable(metadata(docs::examples = "application"))]
68    #[configurable(metadata(docs::examples = "source"))]
69    #[configurable(metadata(docs::examples = "param*"))]
70    #[configurable(metadata(docs::examples = "*"))]
71    query_parameters: Vec<String>,
72
73    #[configurable(derived)]
74    tls: Option<TlsEnableableConfig>,
75
76    #[configurable(derived)]
77    auth: Option<HttpServerAuthConfig>,
78
79    #[configurable(derived)]
80    #[serde(default = "default_framing_message_based")]
81    framing: FramingConfig,
82
83    #[configurable(derived)]
84    #[serde(default = "default_decoding")]
85    decoding: DeserializerConfig,
86
87    #[configurable(derived)]
88    #[serde(default, deserialize_with = "bool_or_struct")]
89    acknowledgements: SourceAcknowledgementsConfig,
90
91    /// The namespace to use for logs. This overrides the global setting.
92    #[configurable(metadata(docs::hidden))]
93    #[serde(default)]
94    log_namespace: Option<bool>,
95
96    #[configurable(derived)]
97    #[serde(default)]
98    keepalive: KeepaliveConfig,
99}
100
101impl LogplexConfig {
102    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
103    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
104        let mut schema_definition = self
105            .decoding
106            .schema_definition(log_namespace)
107            .with_standard_vector_source_metadata()
108            .with_source_metadata(
109                LogplexConfig::NAME,
110                None,
111                &owned_value_path!("timestamp"),
112                Kind::timestamp().or_undefined(),
113                Some("timestamp"),
114            )
115            .with_source_metadata(
116                LogplexConfig::NAME,
117                log_schema()
118                    .host_key()
119                    .cloned()
120                    .map(LegacyKey::InsertIfEmpty),
121                &owned_value_path!("host"),
122                Kind::bytes(),
123                Some("host"),
124            )
125            .with_source_metadata(
126                LogplexConfig::NAME,
127                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
128                &owned_value_path!("app_name"),
129                Kind::bytes(),
130                Some("service"),
131            )
132            .with_source_metadata(
133                LogplexConfig::NAME,
134                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
135                &owned_value_path!("proc_id"),
136                Kind::bytes(),
137                None,
138            )
139            // for metadata that is added to the events dynamically from the self.query_parameters
140            .with_source_metadata(
141                LogplexConfig::NAME,
142                None,
143                &owned_value_path!("query_parameters"),
144                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
145                None,
146            );
147
148        // for metadata that is added to the events dynamically from config options
149        if log_namespace == LogNamespace::Legacy {
150            schema_definition = schema_definition.unknown_fields(Kind::bytes());
151        }
152
153        schema_definition
154    }
155}
156
157impl Default for LogplexConfig {
158    fn default() -> Self {
159        Self {
160            address: "0.0.0.0:80".parse().unwrap(),
161            query_parameters: Vec::new(),
162            tls: None,
163            auth: None,
164            framing: default_framing_message_based(),
165            decoding: default_decoding(),
166            acknowledgements: SourceAcknowledgementsConfig::default(),
167            log_namespace: None,
168            keepalive: KeepaliveConfig::default(),
169        }
170    }
171}
172
173impl GenerateConfig for LogplexConfig {
174    fn generate_config() -> toml::Value {
175        toml::Value::try_from(LogplexConfig::default()).unwrap()
176    }
177}
178
179#[async_trait::async_trait]
180#[typetag::serde(name = "heroku_logs")]
181impl SourceConfig for LogplexConfig {
182    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
183        let log_namespace = cx.log_namespace(self.log_namespace);
184
185        let decoder =
186            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
187                .build()?;
188
189        let source = LogplexSource {
190            query_parameters: build_param_matcher(&remove_duplicates(
191                self.query_parameters.clone(),
192                "query_parameters",
193            ))?,
194            decoder,
195            log_namespace,
196        };
197
198        source.run(
199            self.address,
200            "events",
201            HttpMethod::Post,
202            StatusCode::OK,
203            true,
204            self.tls.as_ref(),
205            self.auth.as_ref(),
206            cx,
207            self.acknowledgements,
208            self.keepalive.clone(),
209        )
210    }
211
212    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
213        // There is a global and per-source `log_namespace` config.
214        // The source config overrides the global setting and is merged here.
215        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
216        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
217    }
218
219    fn resources(&self) -> Vec<Resource> {
220        vec![Resource::tcp(self.address)]
221    }
222
223    fn can_acknowledge(&self) -> bool {
224        true
225    }
226}
227
228#[derive(Clone, Default)]
229struct LogplexSource {
230    query_parameters: Vec<HttpConfigParamKind>,
231    decoder: Decoder,
232    log_namespace: LogNamespace,
233}
234
235impl LogplexSource {
236    fn decode_message(
237        &self,
238        body: Bytes,
239        header_map: &HeaderMap,
240    ) -> Result<Vec<Event>, ErrorMessage> {
241        // Deal with headers
242        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
243            Ok(v) => v,
244            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
245        };
246        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
247        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;
248
249        emit!(HerokuLogplexRequestReceived {
250            msg_count,
251            frame_id,
252            drain_token
253        });
254
255        // Deal with body
256        let events = self.body_to_events(body);
257
258        if events.len() != msg_count {
259            let error_msg = format!(
260                "Parsed event count does not match message count header: {} vs {}",
261                events.len(),
262                msg_count
263            );
264
265            if cfg!(test) {
266                panic!("{}", error_msg);
267            }
268            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
269        }
270
271        Ok(events)
272    }
273
274    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
275        let rdr = BufReader::new(body.reader());
276        rdr.lines()
277            .filter_map(|res| {
278                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
279                    .ok()
280            })
281            .filter(|s| !s.is_empty())
282            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
283            .collect()
284    }
285}
286
287impl HttpSource for LogplexSource {
288    fn build_events(
289        &self,
290        body: Bytes,
291        header_map: &HeaderMap,
292        _query_parameters: &HashMap<String, String>,
293        _full_path: &str,
294    ) -> Result<Vec<Event>, ErrorMessage> {
295        self.decode_message(body, header_map)
296    }
297
298    fn enrich_events(
299        &self,
300        events: &mut [Event],
301        _request_path: &str,
302        _headers_config: &HeaderMap,
303        query_parameters: &HashMap<String, String>,
304        _source_ip: Option<&SocketAddr>,
305    ) {
306        add_query_parameters(
307            events,
308            &self.query_parameters,
309            query_parameters,
310            self.log_namespace,
311            LogplexConfig::NAME,
312        );
313    }
314}
315
316fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
317    if let Some(header_value) = header_map.get(name) {
318        header_value
319            .to_str()
320            .map_err(|e| header_error_message(name, &e.to_string()))
321    } else {
322        Err(header_error_message(name, "Header does not exist"))
323    }
324}
325
326fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
327    ErrorMessage::new(
328        StatusCode::BAD_REQUEST,
329        format!("Invalid request header {name:?}: {msg:?}"),
330    )
331}
332
333fn line_to_events(
334    mut decoder: Decoder,
335    log_namespace: LogNamespace,
336    line: String,
337) -> SmallVec<[Event; 1]> {
338    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();
339
340    let mut events = SmallVec::<[Event; 1]>::new();
341
342    if parts.len() == 8 {
343        let timestamp = parts[2];
344        let hostname = parts[3];
345        let app_name = parts[4];
346        let proc_id = parts[5];
347        let message = parts[7];
348
349        let mut buffer = BytesMut::new();
350        buffer.put(message.as_bytes());
351
352        let legacy_host_key = log_schema().host_key().cloned();
353        let legacy_app_key = parse_value_path("app_name").ok();
354        let legacy_proc_key = parse_value_path("proc_id").ok();
355
356        loop {
357            match decoder.decode_eof(&mut buffer) {
358                Ok(Some((decoded, _byte_size))) => {
359                    for mut event in decoded {
360                        if let Event::Log(ref mut log) = event {
361                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
362                                log_namespace.insert_vector_metadata(
363                                    log,
364                                    log_schema().timestamp_key(),
365                                    path!("timestamp"),
366                                    ts,
367                                );
368                            }
369
370                            log_namespace.insert_source_metadata(
371                                LogplexConfig::NAME,
372                                log,
373                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
374                                path!("host"),
375                                hostname.to_owned(),
376                            );
377
378                            log_namespace.insert_source_metadata(
379                                LogplexConfig::NAME,
380                                log,
381                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
382                                path!("app_name"),
383                                app_name.to_owned(),
384                            );
385
386                            log_namespace.insert_source_metadata(
387                                LogplexConfig::NAME,
388                                log,
389                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
390                                path!("proc_id"),
391                                proc_id.to_owned(),
392                            );
393                        }
394
395                        events.push(event);
396                    }
397                }
398                Ok(None) => break,
399                Err(error) => {
400                    if !error.can_continue() {
401                        break;
402                    }
403                }
404            }
405        }
406    } else {
407        warn!(
408            message = "Line didn't match expected logplex format, so raw message is forwarded.",
409            fields = parts.len(),
410            internal_log_rate_limit = true
411        );
412
413        events.push(LogEvent::from_str_legacy(line).into())
414    };
415
416    let now = Utc::now();
417
418    for event in &mut events {
419        if let Event::Log(log) = event {
420            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
421        }
422    }
423
424    events
425}
426
427#[cfg(test)]
428mod tests {
429    use std::net::SocketAddr;
430
431    use chrono::{DateTime, Utc};
432    use futures::Stream;
433    use similar_asserts::assert_eq;
434    use vector_lib::lookup::{owned_value_path, OwnedTargetPath};
435    use vector_lib::{
436        config::LogNamespace,
437        event::{Event, EventStatus, Value},
438        schema::Definition,
439    };
440    use vrl::value::{kind::Collection, Kind};
441
442    use super::LogplexConfig;
443    use crate::common::http::server_auth::HttpServerAuthConfig;
444    use crate::{
445        config::{log_schema, SourceConfig, SourceContext},
446        serde::{default_decoding, default_framing_message_based},
447        test_util::{
448            components::{assert_source_compliance, HTTP_PUSH_SOURCE_TAGS},
449            next_addr, random_string, spawn_collect_n, wait_for_tcp,
450        },
451        SourceSender,
452    };
453
454    #[test]
455    fn generate_config() {
456        crate::test_util::test_generate_config::<LogplexConfig>();
457    }
458
459    async fn source(
460        auth: Option<HttpServerAuthConfig>,
461        query_parameters: Vec<String>,
462        status: EventStatus,
463        acknowledgements: bool,
464    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
465        let (sender, recv) = SourceSender::new_test_finalize(status);
466        let address = next_addr();
467        let context = SourceContext::new_test(sender, None);
468        tokio::spawn(async move {
469            LogplexConfig {
470                address,
471                query_parameters,
472                tls: None,
473                auth,
474                framing: default_framing_message_based(),
475                decoding: default_decoding(),
476                acknowledgements: acknowledgements.into(),
477                log_namespace: None,
478                keepalive: Default::default(),
479            }
480            .build(context)
481            .await
482            .unwrap()
483            .await
484            .unwrap()
485        });
486        wait_for_tcp(address).await;
487        (recv, address)
488    }
489
490    async fn send(
491        address: SocketAddr,
492        body: &str,
493        auth: Option<HttpServerAuthConfig>,
494        query: &str,
495    ) -> u16 {
496        let len = body.lines().count();
497        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
498        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
499            req = req.basic_auth(username, Some(password.inner()));
500        }
501        req.header("Logplex-Msg-Count", len)
502            .header("Logplex-Frame-Id", "frame-foo")
503            .header("Logplex-Drain-Token", "drain-bar")
504            .body(body.to_owned())
505            .send()
506            .await
507            .unwrap()
508            .status()
509            .as_u16()
510    }
511
512    fn make_auth() -> HttpServerAuthConfig {
513        HttpServerAuthConfig::Basic {
514            username: random_string(16),
515            password: random_string(16).into(),
516        }
517    }
518
519    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;
520
521    #[tokio::test]
522    async fn logplex_handles_router_log() {
523        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
524            let auth = make_auth();
525
526            let (rx, addr) = source(
527                Some(auth.clone()),
528                vec!["appname".to_string(), "absent".to_string()],
529                EventStatus::Delivered,
530                true,
531            )
532            .await;
533
534            let mut events = spawn_collect_n(
535                async move {
536                    assert_eq!(
537                        200,
538                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
539                    )
540                },
541                rx,
542                SAMPLE_BODY.lines().count(),
543            )
544            .await;
545
546            let event = events.remove(0);
547            let log = event.as_log();
548
549            assert_eq!(
550                *log.get_message().unwrap(),
551                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
552            );
553            assert_eq!(
554                log[log_schema().timestamp_key().unwrap().to_string()],
555                "2020-01-08T22:33:57.353034+00:00"
556                    .parse::<DateTime<Utc>>()
557                    .unwrap()
558                    .into()
559            );
560            assert_eq!(*log.get_host().unwrap(), "host".into());
561            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
562            assert_eq!(log["appname"], "lumberjack-store".into());
563            assert_eq!(log["absent"], Value::Null);
564        }).await;
565    }
566
567    #[tokio::test]
568    async fn logplex_query_parameters_wildcard() {
569        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
570            let auth = make_auth();
571
572            let (rx, addr) = source(
573                Some(auth.clone()),
574                vec!["*".to_string()],
575                EventStatus::Delivered,
576                true,
577            )
578            .await;
579
580            let mut events = spawn_collect_n(
581                async move {
582                    assert_eq!(
583                        200,
584                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
585                    )
586                },
587                rx,
588                SAMPLE_BODY.lines().count(),
589            )
590            .await;
591
592            let event = events.remove(0);
593            let log = event.as_log();
594
595            assert_eq!(
596                *log.get_message().unwrap(),
597                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
598            );
599            assert_eq!(
600                log[log_schema().timestamp_key().unwrap().to_string()],
601                "2020-01-08T22:33:57.353034+00:00"
602                    .parse::<DateTime<Utc>>()
603                    .unwrap()
604                    .into()
605            );
606            assert_eq!(*log.get_host().unwrap(), "host".into());
607            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
608            assert_eq!(log["appname"], "lumberjack-store".into());
609        }).await;
610    }
611
612    #[tokio::test]
613    async fn logplex_handles_failures() {
614        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
615            let auth = make_auth();
616
617            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;
618
619            let events = spawn_collect_n(
620                async move {
621                    assert_eq!(
622                        400,
623                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
624                    )
625                },
626                rx,
627                SAMPLE_BODY.lines().count(),
628            )
629            .await;
630
631            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
632        })
633        .await;
634    }
635
636    #[tokio::test]
637    async fn logplex_ignores_disabled_acknowledgements() {
638        let auth = make_auth();
639
640        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;
641
642        let events = spawn_collect_n(
643            async move {
644                assert_eq!(
645                    200,
646                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
647                )
648            },
649            rx,
650            SAMPLE_BODY.lines().count(),
651        )
652        .await;
653
654        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
655    }
656
657    #[tokio::test]
658    async fn logplex_auth_failure() {
659        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;
660
661        assert_eq!(
662            401,
663            send(
664                addr,
665                SAMPLE_BODY,
666                Some(make_auth()),
667                "appname=lumberjack-store"
668            )
669            .await
670        );
671    }
672
673    #[test]
674    fn logplex_handles_normal_lines() {
675        let log_namespace = LogNamespace::Legacy;
676        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
677        let events = super::line_to_events(Default::default(), log_namespace, body.into());
678        let log = events[0].as_log();
679
680        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
681        assert_eq!(
682            log[log_schema().timestamp_key().unwrap().to_string()],
683            "2020-01-08T22:33:57.353034+00:00"
684                .parse::<DateTime<Utc>>()
685                .unwrap()
686                .into()
687        );
688        assert_eq!(*log.get_host().unwrap(), "host".into());
689        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
690    }
691
692    #[test]
693    fn logplex_handles_malformed_lines() {
694        let log_namespace = LogNamespace::Legacy;
695        let body = "what am i doing here";
696        let events = super::line_to_events(Default::default(), log_namespace, body.into());
697        let log = events[0].as_log();
698
699        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
700        assert!(log.get_timestamp().is_some());
701        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
702    }
703
704    #[test]
705    fn logplex_doesnt_blow_up_on_bad_framing() {
706        let log_namespace = LogNamespace::Legacy;
707        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
708        let events = super::line_to_events(Default::default(), log_namespace, body.into());
709        let log = events[0].as_log();
710
711        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
712        assert_eq!(
713            log[log_schema().timestamp_key().unwrap().to_string()],
714            "2020-01-08T22:33:57.353034+00:00"
715                .parse::<DateTime<Utc>>()
716                .unwrap()
717                .into()
718        );
719        assert_eq!(*log.get_host().unwrap(), "host".into());
720        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
721    }
722
723    #[test]
724    fn output_schema_definition_vector_namespace() {
725        let config = LogplexConfig {
726            log_namespace: Some(true),
727            ..Default::default()
728        };
729
730        let definitions = config
731            .outputs(LogNamespace::Vector)
732            .remove(0)
733            .schema_definition(true);
734
735        let expected_definition =
736            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
737                .with_meaning(OwnedTargetPath::event_root(), "message")
738                .with_metadata_field(
739                    &owned_value_path!("vector", "source_type"),
740                    Kind::bytes(),
741                    None,
742                )
743                .with_metadata_field(
744                    &owned_value_path!("vector", "ingest_timestamp"),
745                    Kind::timestamp(),
746                    None,
747                )
748                .with_metadata_field(
749                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
750                    Kind::timestamp().or_undefined(),
751                    Some("timestamp"),
752                )
753                .with_metadata_field(
754                    &owned_value_path!(LogplexConfig::NAME, "host"),
755                    Kind::bytes(),
756                    Some("host"),
757                )
758                .with_metadata_field(
759                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
760                    Kind::bytes(),
761                    Some("service"),
762                )
763                .with_metadata_field(
764                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
765                    Kind::bytes(),
766                    None,
767                )
768                .with_metadata_field(
769                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
770                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
771                    None,
772                );
773
774        assert_eq!(definitions, Some(expected_definition))
775    }
776
777    #[test]
778    fn output_schema_definition_legacy_namespace() {
779        let config = LogplexConfig::default();
780
781        let definitions = config
782            .outputs(LogNamespace::Legacy)
783            .remove(0)
784            .schema_definition(true);
785
786        let expected_definition = Definition::new_with_default_metadata(
787            Kind::object(Collection::empty()),
788            [LogNamespace::Legacy],
789        )
790        .with_event_field(
791            &owned_value_path!("message"),
792            Kind::bytes(),
793            Some("message"),
794        )
795        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
796        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
797        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
798        .with_event_field(
799            &owned_value_path!("app_name"),
800            Kind::bytes(),
801            Some("service"),
802        )
803        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
804        .unknown_fields(Kind::bytes());
805
806        assert_eq!(definitions, Some(expected_definition))
807    }
808}