vector/sources/
heroku_logs.rs

1use std::{
2    collections::HashMap,
3    io::{BufRead, BufReader},
4    net::SocketAddr,
5    str::FromStr,
6};
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use chrono::{DateTime, Utc};
10use smallvec::SmallVec;
11use tokio_util::codec::Decoder as _;
12use vector_lib::{
13    codecs::{
14        StreamDecodingError,
15        decoding::{DeserializerConfig, FramingConfig},
16    },
17    config::{DataType, LegacyKey, LogNamespace},
18    configurable::configurable_component,
19    lookup::{lookup_v2::parse_value_path, owned_value_path, path},
20    schema::Definition,
21};
22use vrl::value::{Kind, kind::Collection};
23use warp::http::{HeaderMap, StatusCode};
24
25use crate::{
26    codecs::{Decoder, DecodingConfig},
27    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28    config::{
29        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
30        SourceOutput, log_schema,
31    },
32    event::{Event, LogEvent},
33    http::KeepaliveConfig,
34    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
35    serde::{bool_or_struct, default_decoding, default_framing_message_based},
36    sources::{
37        http_server::{HttpConfigParamKind, build_param_matcher, remove_duplicates},
38        util::{
39            HttpSource,
40            http::{HttpMethod, add_query_parameters},
41        },
42    },
43    tls::TlsEnableableConfig,
44};
45
/// Configuration for `heroku_logs` source.
// NOTE(review): maintainer notes in this struct are plain `//` comments on purpose; the `///`
// text is presumably picked up by `configurable_component` as user-facing documentation, so
// it is left untouched — confirm before rewording any of it.
#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    address: SocketAddr,

    /// A list of URL query parameters to include in the log event.
    ///
    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
    ///
    /// Specifying "*" results in all query parameters included in the log event.
    ///
    /// These override any values included in the body with conflicting names.
    // De-duplicated and compiled into matchers when the source is built.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    query_parameters: Vec<String>,

    // Handed to the HTTP server as-is when the source is built.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Handed to the HTTP server as-is when the source is built.
    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    // Combined with `decoding` into a `DecodingConfig` when the source is built.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    // Also supplies the base schema definition that source metadata is layered onto.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    // Deserialized through `bool_or_struct`.
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    // Merged with the global namespace in `outputs` and resolved via the context in `build`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,

    // Cloned and handed to the HTTP server when the source is built.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
99
100impl LogplexConfig {
101    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
102    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
103        let mut schema_definition = self
104            .decoding
105            .schema_definition(log_namespace)
106            .with_standard_vector_source_metadata()
107            .with_source_metadata(
108                LogplexConfig::NAME,
109                None,
110                &owned_value_path!("timestamp"),
111                Kind::timestamp().or_undefined(),
112                Some("timestamp"),
113            )
114            .with_source_metadata(
115                LogplexConfig::NAME,
116                log_schema()
117                    .host_key()
118                    .cloned()
119                    .map(LegacyKey::InsertIfEmpty),
120                &owned_value_path!("host"),
121                Kind::bytes(),
122                Some("host"),
123            )
124            .with_source_metadata(
125                LogplexConfig::NAME,
126                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
127                &owned_value_path!("app_name"),
128                Kind::bytes(),
129                Some("service"),
130            )
131            .with_source_metadata(
132                LogplexConfig::NAME,
133                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
134                &owned_value_path!("proc_id"),
135                Kind::bytes(),
136                None,
137            )
138            // for metadata that is added to the events dynamically from the self.query_parameters
139            .with_source_metadata(
140                LogplexConfig::NAME,
141                None,
142                &owned_value_path!("query_parameters"),
143                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
144                None,
145            );
146
147        // for metadata that is added to the events dynamically from config options
148        if log_namespace == LogNamespace::Legacy {
149            schema_definition = schema_definition.unknown_fields(Kind::bytes());
150        }
151
152        schema_definition
153    }
154}
155
156impl Default for LogplexConfig {
157    fn default() -> Self {
158        Self {
159            address: "0.0.0.0:80".parse().unwrap(),
160            query_parameters: Vec::new(),
161            tls: None,
162            auth: None,
163            framing: default_framing_message_based(),
164            decoding: default_decoding(),
165            acknowledgements: SourceAcknowledgementsConfig::default(),
166            log_namespace: None,
167            keepalive: KeepaliveConfig::default(),
168        }
169    }
170}
171
172impl GenerateConfig for LogplexConfig {
173    fn generate_config() -> toml::Value {
174        toml::Value::try_from(LogplexConfig::default()).unwrap()
175    }
176}
177
178#[async_trait::async_trait]
179#[typetag::serde(name = "heroku_logs")]
180impl SourceConfig for LogplexConfig {
181    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
182        let log_namespace = cx.log_namespace(self.log_namespace);
183
184        let decoder =
185            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
186                .build()?;
187
188        let source = LogplexSource {
189            query_parameters: build_param_matcher(&remove_duplicates(
190                self.query_parameters.clone(),
191                "query_parameters",
192            ))?,
193            decoder,
194            log_namespace,
195        };
196
197        source.run(
198            self.address,
199            "events",
200            HttpMethod::Post,
201            StatusCode::OK,
202            true,
203            self.tls.as_ref(),
204            self.auth.as_ref(),
205            cx,
206            self.acknowledgements,
207            self.keepalive.clone(),
208        )
209    }
210
211    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
212        // There is a global and per-source `log_namespace` config.
213        // The source config overrides the global setting and is merged here.
214        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
215        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
216    }
217
218    fn resources(&self) -> Vec<Resource> {
219        vec![Resource::tcp(self.address)]
220    }
221
222    fn can_acknowledge(&self) -> bool {
223        true
224    }
225}
226
/// Runtime state of the `heroku_logs` source, assembled by `LogplexConfig::build`.
#[derive(Clone, Default)]
struct LogplexSource {
    /// Matchers selecting which URL query parameters are copied onto events.
    query_parameters: Vec<HttpConfigParamKind>,
    /// Decoder applied to the message portion of each Logplex line (cloned per line).
    decoder: Decoder,
    /// Namespace that decides where metadata is written on each event.
    log_namespace: LogNamespace,
}
233
234impl LogplexSource {
235    fn decode_message(
236        &self,
237        body: Bytes,
238        header_map: &HeaderMap,
239    ) -> Result<Vec<Event>, ErrorMessage> {
240        // Deal with headers
241        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
242            Ok(v) => v,
243            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
244        };
245        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
246        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;
247
248        emit!(HerokuLogplexRequestReceived {
249            msg_count,
250            frame_id,
251            drain_token
252        });
253
254        // Deal with body
255        let events = self.body_to_events(body);
256
257        if events.len() != msg_count {
258            let error_msg = format!(
259                "Parsed event count does not match message count header: {} vs {}",
260                events.len(),
261                msg_count
262            );
263
264            if cfg!(test) {
265                panic!("{}", error_msg);
266            }
267            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
268        }
269
270        Ok(events)
271    }
272
273    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
274        let rdr = BufReader::new(body.reader());
275        rdr.lines()
276            .filter_map(|res| {
277                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
278                    .ok()
279            })
280            .filter(|s| !s.is_empty())
281            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
282            .collect()
283    }
284}
285
286impl HttpSource for LogplexSource {
287    fn build_events(
288        &self,
289        body: Bytes,
290        header_map: &HeaderMap,
291        _query_parameters: &HashMap<String, String>,
292        _full_path: &str,
293    ) -> Result<Vec<Event>, ErrorMessage> {
294        self.decode_message(body, header_map)
295    }
296
297    fn enrich_events(
298        &self,
299        events: &mut [Event],
300        _request_path: &str,
301        _headers_config: &HeaderMap,
302        query_parameters: &HashMap<String, String>,
303        _source_ip: Option<&SocketAddr>,
304    ) {
305        add_query_parameters(
306            events,
307            &self.query_parameters,
308            query_parameters,
309            self.log_namespace,
310            LogplexConfig::NAME,
311        );
312    }
313}
314
315fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
316    if let Some(header_value) = header_map.get(name) {
317        header_value
318            .to_str()
319            .map_err(|e| header_error_message(name, &e.to_string()))
320    } else {
321        Err(header_error_message(name, "Header does not exist"))
322    }
323}
324
325fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
326    ErrorMessage::new(
327        StatusCode::BAD_REQUEST,
328        format!("Invalid request header {name:?}: {msg:?}"),
329    )
330}
331
332fn line_to_events(
333    mut decoder: Decoder,
334    log_namespace: LogNamespace,
335    line: String,
336) -> SmallVec<[Event; 1]> {
337    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();
338
339    let mut events = SmallVec::<[Event; 1]>::new();
340
341    if parts.len() == 8 {
342        let timestamp = parts[2];
343        let hostname = parts[3];
344        let app_name = parts[4];
345        let proc_id = parts[5];
346        let message = parts[7];
347
348        let mut buffer = BytesMut::new();
349        buffer.put(message.as_bytes());
350
351        let legacy_host_key = log_schema().host_key().cloned();
352        let legacy_app_key = parse_value_path("app_name").ok();
353        let legacy_proc_key = parse_value_path("proc_id").ok();
354
355        loop {
356            match decoder.decode_eof(&mut buffer) {
357                Ok(Some((decoded, _byte_size))) => {
358                    for mut event in decoded {
359                        if let Event::Log(ref mut log) = event {
360                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
361                                log_namespace.insert_vector_metadata(
362                                    log,
363                                    log_schema().timestamp_key(),
364                                    path!("timestamp"),
365                                    ts,
366                                );
367                            }
368
369                            log_namespace.insert_source_metadata(
370                                LogplexConfig::NAME,
371                                log,
372                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
373                                path!("host"),
374                                hostname.to_owned(),
375                            );
376
377                            log_namespace.insert_source_metadata(
378                                LogplexConfig::NAME,
379                                log,
380                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
381                                path!("app_name"),
382                                app_name.to_owned(),
383                            );
384
385                            log_namespace.insert_source_metadata(
386                                LogplexConfig::NAME,
387                                log,
388                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
389                                path!("proc_id"),
390                                proc_id.to_owned(),
391                            );
392                        }
393
394                        events.push(event);
395                    }
396                }
397                Ok(None) => break,
398                Err(error) => {
399                    if !error.can_continue() {
400                        break;
401                    }
402                }
403            }
404        }
405    } else {
406        warn!(
407            message = "Line didn't match expected logplex format, so raw message is forwarded.",
408            fields = parts.len(),
409            internal_log_rate_limit = true
410        );
411
412        events.push(LogEvent::from_str_legacy(line).into())
413    };
414
415    let now = Utc::now();
416
417    for event in &mut events {
418        if let Event::Log(log) = event {
419            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
420        }
421    }
422
423    events
424}
425
426#[cfg(test)]
427mod tests {
428    use std::net::SocketAddr;
429
430    use chrono::{DateTime, Utc};
431    use futures::Stream;
432    use similar_asserts::assert_eq;
433    use vector_lib::{
434        config::LogNamespace,
435        event::{Event, EventStatus, Value},
436        lookup::{OwnedTargetPath, owned_value_path},
437        schema::Definition,
438    };
439    use vrl::value::{Kind, kind::Collection};
440
441    use super::LogplexConfig;
442    use crate::{
443        SourceSender,
444        common::http::server_auth::HttpServerAuthConfig,
445        config::{SourceConfig, SourceContext, log_schema},
446        serde::{default_decoding, default_framing_message_based},
447        test_util::{
448            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
449            next_addr, random_string, spawn_collect_n, wait_for_tcp,
450        },
451    };
452
    // Delegates to the shared `test_generate_config` helper for `LogplexConfig`.
    // NOTE(review): presumably this checks that the `GenerateConfig` output is a valid
    // configuration for the type — confirm against the helper.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }
457
458    async fn source(
459        auth: Option<HttpServerAuthConfig>,
460        query_parameters: Vec<String>,
461        status: EventStatus,
462        acknowledgements: bool,
463    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
464        let (sender, recv) = SourceSender::new_test_finalize(status);
465        let address = next_addr();
466        let context = SourceContext::new_test(sender, None);
467        tokio::spawn(async move {
468            LogplexConfig {
469                address,
470                query_parameters,
471                tls: None,
472                auth,
473                framing: default_framing_message_based(),
474                decoding: default_decoding(),
475                acknowledgements: acknowledgements.into(),
476                log_namespace: None,
477                keepalive: Default::default(),
478            }
479            .build(context)
480            .await
481            .unwrap()
482            .await
483            .unwrap()
484        });
485        wait_for_tcp(address).await;
486        (recv, address)
487    }
488
489    async fn send(
490        address: SocketAddr,
491        body: &str,
492        auth: Option<HttpServerAuthConfig>,
493        query: &str,
494    ) -> u16 {
495        let len = body.lines().count();
496        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
497        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
498            req = req.basic_auth(username, Some(password.inner()));
499        }
500        req.header("Logplex-Msg-Count", len)
501            .header("Logplex-Frame-Id", "frame-foo")
502            .header("Logplex-Drain-Token", "drain-bar")
503            .body(body.to_owned())
504            .send()
505            .await
506            .unwrap()
507            .status()
508            .as_u16()
509    }
510
511    fn make_auth() -> HttpServerAuthConfig {
512        HttpServerAuthConfig::Basic {
513            username: random_string(16),
514            password: random_string(16).into(),
515        }
516    }
517
    // A one-line request body shared by the HTTP tests. It has the eight space-separated
    // fields `line_to_events` expects, with everything from `at=info` onwards as the message.
    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;
519
520    #[tokio::test]
521    async fn logplex_handles_router_log() {
522        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
523            let auth = make_auth();
524
525            let (rx, addr) = source(
526                Some(auth.clone()),
527                vec!["appname".to_string(), "absent".to_string()],
528                EventStatus::Delivered,
529                true,
530            )
531            .await;
532
533            let mut events = spawn_collect_n(
534                async move {
535                    assert_eq!(
536                        200,
537                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
538                    )
539                },
540                rx,
541                SAMPLE_BODY.lines().count(),
542            )
543            .await;
544
545            let event = events.remove(0);
546            let log = event.as_log();
547
548            assert_eq!(
549                *log.get_message().unwrap(),
550                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
551            );
552            assert_eq!(
553                log[log_schema().timestamp_key().unwrap().to_string()],
554                "2020-01-08T22:33:57.353034+00:00"
555                    .parse::<DateTime<Utc>>()
556                    .unwrap()
557                    .into()
558            );
559            assert_eq!(*log.get_host().unwrap(), "host".into());
560            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
561            assert_eq!(log["appname"], "lumberjack-store".into());
562            assert_eq!(log["absent"], Value::Null);
563        }).await;
564    }
565
566    #[tokio::test]
567    async fn logplex_query_parameters_wildcard() {
568        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
569            let auth = make_auth();
570
571            let (rx, addr) = source(
572                Some(auth.clone()),
573                vec!["*".to_string()],
574                EventStatus::Delivered,
575                true,
576            )
577            .await;
578
579            let mut events = spawn_collect_n(
580                async move {
581                    assert_eq!(
582                        200,
583                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
584                    )
585                },
586                rx,
587                SAMPLE_BODY.lines().count(),
588            )
589            .await;
590
591            let event = events.remove(0);
592            let log = event.as_log();
593
594            assert_eq!(
595                *log.get_message().unwrap(),
596                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
597            );
598            assert_eq!(
599                log[log_schema().timestamp_key().unwrap().to_string()],
600                "2020-01-08T22:33:57.353034+00:00"
601                    .parse::<DateTime<Utc>>()
602                    .unwrap()
603                    .into()
604            );
605            assert_eq!(*log.get_host().unwrap(), "host".into());
606            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
607            assert_eq!(log["appname"], "lumberjack-store".into());
608        }).await;
609    }
610
611    #[tokio::test]
612    async fn logplex_handles_failures() {
613        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
614            let auth = make_auth();
615
616            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;
617
618            let events = spawn_collect_n(
619                async move {
620                    assert_eq!(
621                        400,
622                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
623                    )
624                },
625                rx,
626                SAMPLE_BODY.lines().count(),
627            )
628            .await;
629
630            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
631        })
632        .await;
633    }
634
635    #[tokio::test]
636    async fn logplex_ignores_disabled_acknowledgements() {
637        let auth = make_auth();
638
639        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;
640
641        let events = spawn_collect_n(
642            async move {
643                assert_eq!(
644                    200,
645                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
646                )
647            },
648            rx,
649            SAMPLE_BODY.lines().count(),
650        )
651        .await;
652
653        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
654    }
655
656    #[tokio::test]
657    async fn logplex_auth_failure() {
658        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;
659
660        assert_eq!(
661            401,
662            send(
663                addr,
664                SAMPLE_BODY,
665                Some(make_auth()),
666                "appname=lumberjack-store"
667            )
668            .await
669        );
670    }
671
672    #[test]
673    fn logplex_handles_normal_lines() {
674        let log_namespace = LogNamespace::Legacy;
675        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
676        let events = super::line_to_events(Default::default(), log_namespace, body.into());
677        let log = events[0].as_log();
678
679        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
680        assert_eq!(
681            log[log_schema().timestamp_key().unwrap().to_string()],
682            "2020-01-08T22:33:57.353034+00:00"
683                .parse::<DateTime<Utc>>()
684                .unwrap()
685                .into()
686        );
687        assert_eq!(*log.get_host().unwrap(), "host".into());
688        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
689    }
690
691    #[test]
692    fn logplex_handles_malformed_lines() {
693        let log_namespace = LogNamespace::Legacy;
694        let body = "what am i doing here";
695        let events = super::line_to_events(Default::default(), log_namespace, body.into());
696        let log = events[0].as_log();
697
698        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
699        assert!(log.get_timestamp().is_some());
700        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
701    }
702
703    #[test]
704    fn logplex_doesnt_blow_up_on_bad_framing() {
705        let log_namespace = LogNamespace::Legacy;
706        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
707        let events = super::line_to_events(Default::default(), log_namespace, body.into());
708        let log = events[0].as_log();
709
710        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
711        assert_eq!(
712            log[log_schema().timestamp_key().unwrap().to_string()],
713            "2020-01-08T22:33:57.353034+00:00"
714                .parse::<DateTime<Utc>>()
715                .unwrap()
716                .into()
717        );
718        assert_eq!(*log.get_host().unwrap(), "host".into());
719        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
720    }
721
722    #[test]
723    fn output_schema_definition_vector_namespace() {
724        let config = LogplexConfig {
725            log_namespace: Some(true),
726            ..Default::default()
727        };
728
729        let definitions = config
730            .outputs(LogNamespace::Vector)
731            .remove(0)
732            .schema_definition(true);
733
734        let expected_definition =
735            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
736                .with_meaning(OwnedTargetPath::event_root(), "message")
737                .with_metadata_field(
738                    &owned_value_path!("vector", "source_type"),
739                    Kind::bytes(),
740                    None,
741                )
742                .with_metadata_field(
743                    &owned_value_path!("vector", "ingest_timestamp"),
744                    Kind::timestamp(),
745                    None,
746                )
747                .with_metadata_field(
748                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
749                    Kind::timestamp().or_undefined(),
750                    Some("timestamp"),
751                )
752                .with_metadata_field(
753                    &owned_value_path!(LogplexConfig::NAME, "host"),
754                    Kind::bytes(),
755                    Some("host"),
756                )
757                .with_metadata_field(
758                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
759                    Kind::bytes(),
760                    Some("service"),
761                )
762                .with_metadata_field(
763                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
764                    Kind::bytes(),
765                    None,
766                )
767                .with_metadata_field(
768                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
769                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
770                    None,
771                );
772
773        assert_eq!(definitions, Some(expected_definition))
774    }
775
776    #[test]
777    fn output_schema_definition_legacy_namespace() {
778        let config = LogplexConfig::default();
779
780        let definitions = config
781            .outputs(LogNamespace::Legacy)
782            .remove(0)
783            .schema_definition(true);
784
785        let expected_definition = Definition::new_with_default_metadata(
786            Kind::object(Collection::empty()),
787            [LogNamespace::Legacy],
788        )
789        .with_event_field(
790            &owned_value_path!("message"),
791            Kind::bytes(),
792            Some("message"),
793        )
794        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
795        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
796        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
797        .with_event_field(
798            &owned_value_path!("app_name"),
799            Kind::bytes(),
800            Some("service"),
801        )
802        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
803        .unknown_fields(Kind::bytes());
804
805        assert_eq!(definitions, Some(expected_definition))
806    }
807}