vector/sources/
heroku_logs.rs

1use std::{
2    collections::HashMap,
3    io::{BufRead, BufReader},
4    net::SocketAddr,
5    str::FromStr,
6};
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9use chrono::{DateTime, Utc};
10use smallvec::SmallVec;
11use tokio_util::codec::Decoder as _;
12use vector_lib::{
13    codecs::{
14        StreamDecodingError,
15        decoding::{DeserializerConfig, FramingConfig},
16    },
17    config::{DataType, LegacyKey, LogNamespace},
18    configurable::configurable_component,
19    lookup::{lookup_v2::parse_value_path, owned_value_path, path},
20    schema::Definition,
21};
22use vrl::value::{Kind, kind::Collection};
23use warp::http::{HeaderMap, StatusCode};
24
25use crate::{
26    codecs::{Decoder, DecodingConfig},
27    common::http::{ErrorMessage, server_auth::HttpServerAuthConfig},
28    config::{
29        GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
30        SourceOutput, log_schema,
31    },
32    event::{Event, LogEvent},
33    http::KeepaliveConfig,
34    internal_events::{HerokuLogplexRequestReadError, HerokuLogplexRequestReceived},
35    serde::{bool_or_struct, default_decoding, default_framing_message_based},
36    sources::{
37        http_server::{HttpConfigParamKind, build_param_matcher, remove_duplicates},
38        util::{
39            HttpSource,
40            http::{HttpMethod, add_query_parameters},
41        },
42    },
43    tls::TlsEnableableConfig,
44};
45
/// Configuration for `heroku_logs` source.
// Reviewer notes use `//` on purpose: the `///` text in this struct is consumed by
// `configurable_component` and is left untouched.
#[configurable_component(source(
    "heroku_logs",
    "Collect logs from Heroku's Logplex, the router responsible for receiving logs from your Heroku apps."
))]
#[derive(Clone, Debug)]
pub struct LogplexConfig {
    /// The socket address to listen for connections on.
    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
    #[configurable(metadata(docs::examples = "localhost:80"))]
    // Also reported as the TCP resource claimed by this source (see `resources`).
    address: SocketAddr,

    /// A list of URL query parameters to include in the log event.
    ///
    /// Accepts the wildcard (`*`) character for query parameters matching a specified pattern.
    ///
    /// Specifying "*" results in all query parameters included in the log event.
    ///
    /// These override any values included in the body with conflicting names.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "application"))]
    #[configurable(metadata(docs::examples = "source"))]
    #[configurable(metadata(docs::examples = "param*"))]
    #[configurable(metadata(docs::examples = "*"))]
    // De-duplicated and compiled into matchers when the source is built.
    query_parameters: Vec<String>,

    // Optional TLS settings; handed to the HTTP server unchanged when the source is built.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Optional HTTP authentication; handed to the HTTP server unchanged when the source is built.
    #[configurable(derived)]
    auth: Option<HttpServerAuthConfig>,

    // Framing used by the decoder that is applied to the message portion of each logplex line.
    #[configurable(derived)]
    #[serde(default = "default_framing_message_based")]
    framing: FramingConfig,

    // Deserializer used by that same decoder; it also seeds the output schema definition.
    #[configurable(derived)]
    #[serde(default = "default_decoding")]
    decoding: DeserializerConfig,

    // Accepts either a bare boolean or a struct (see `bool_or_struct`).
    #[configurable(derived)]
    #[serde(default, deserialize_with = "bool_or_struct")]
    acknowledgements: SourceAcknowledgementsConfig,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    // `None` defers to the global setting when merged in `outputs`.
    log_namespace: Option<bool>,

    // Keepalive settings; cloned into the HTTP server when the source is built.
    #[configurable(derived)]
    #[serde(default)]
    keepalive: KeepaliveConfig,
}
99
100impl LogplexConfig {
101    /// Builds the `schema::Definition` for this source using the provided `LogNamespace`.
102    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
103        let mut schema_definition = self
104            .decoding
105            .schema_definition(log_namespace)
106            .with_standard_vector_source_metadata()
107            .with_source_metadata(
108                LogplexConfig::NAME,
109                None,
110                &owned_value_path!("timestamp"),
111                Kind::timestamp().or_undefined(),
112                Some("timestamp"),
113            )
114            .with_source_metadata(
115                LogplexConfig::NAME,
116                log_schema()
117                    .host_key()
118                    .cloned()
119                    .map(LegacyKey::InsertIfEmpty),
120                &owned_value_path!("host"),
121                Kind::bytes(),
122                Some("host"),
123            )
124            .with_source_metadata(
125                LogplexConfig::NAME,
126                Some(LegacyKey::InsertIfEmpty(owned_value_path!("app_name"))),
127                &owned_value_path!("app_name"),
128                Kind::bytes(),
129                Some("service"),
130            )
131            .with_source_metadata(
132                LogplexConfig::NAME,
133                Some(LegacyKey::InsertIfEmpty(owned_value_path!("proc_id"))),
134                &owned_value_path!("proc_id"),
135                Kind::bytes(),
136                None,
137            )
138            // for metadata that is added to the events dynamically from the self.query_parameters
139            .with_source_metadata(
140                LogplexConfig::NAME,
141                None,
142                &owned_value_path!("query_parameters"),
143                Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
144                None,
145            );
146
147        // for metadata that is added to the events dynamically from config options
148        if log_namespace == LogNamespace::Legacy {
149            schema_definition = schema_definition.unknown_fields(Kind::bytes());
150        }
151
152        schema_definition
153    }
154}
155
156impl Default for LogplexConfig {
157    fn default() -> Self {
158        Self {
159            address: "0.0.0.0:80".parse().unwrap(),
160            query_parameters: Vec::new(),
161            tls: None,
162            auth: None,
163            framing: default_framing_message_based(),
164            decoding: default_decoding(),
165            acknowledgements: SourceAcknowledgementsConfig::default(),
166            log_namespace: None,
167            keepalive: KeepaliveConfig::default(),
168        }
169    }
170}
171
172impl GenerateConfig for LogplexConfig {
173    fn generate_config() -> toml::Value {
174        toml::Value::try_from(LogplexConfig::default()).unwrap()
175    }
176}
177
178#[async_trait::async_trait]
179#[typetag::serde(name = "heroku_logs")]
180impl SourceConfig for LogplexConfig {
181    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
182        let log_namespace = cx.log_namespace(self.log_namespace);
183
184        let decoder =
185            DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
186                .build()?;
187
188        let source = LogplexSource {
189            query_parameters: build_param_matcher(&remove_duplicates(
190                self.query_parameters.clone(),
191                "query_parameters",
192            ))?,
193            decoder,
194            log_namespace,
195        };
196
197        source.run(
198            self.address,
199            "events",
200            HttpMethod::Post,
201            StatusCode::OK,
202            true,
203            self.tls.as_ref(),
204            self.auth.as_ref(),
205            cx,
206            self.acknowledgements,
207            self.keepalive.clone(),
208        )
209    }
210
211    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
212        // There is a global and per-source `log_namespace` config.
213        // The source config overrides the global setting and is merged here.
214        let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
215        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_def)]
216    }
217
218    fn resources(&self) -> Vec<Resource> {
219        vec![Resource::tcp(self.address)]
220    }
221
222    fn can_acknowledge(&self) -> bool {
223        true
224    }
225}
226
/// Runtime state of the `heroku_logs` source, assembled by `LogplexConfig::build`.
#[derive(Clone, Default)]
struct LogplexSource {
    /// Compiled matchers selecting which URL query parameters are copied onto events.
    query_parameters: Vec<HttpConfigParamKind>,
    /// Decoder applied to the message portion of each logplex line; cloned per line.
    decoder: Decoder,
    /// Effective log namespace used when inserting metadata into events.
    log_namespace: LogNamespace,
}
233
234impl LogplexSource {
235    fn decode_message(
236        &self,
237        body: Bytes,
238        header_map: &HeaderMap,
239    ) -> Result<Vec<Event>, ErrorMessage> {
240        // Deal with headers
241        let msg_count = match usize::from_str(get_header(header_map, "Logplex-Msg-Count")?) {
242            Ok(v) => v,
243            Err(e) => return Err(header_error_message("Logplex-Msg-Count", &e.to_string())),
244        };
245        let frame_id = get_header(header_map, "Logplex-Frame-Id")?;
246        let drain_token = get_header(header_map, "Logplex-Drain-Token")?;
247
248        emit!(HerokuLogplexRequestReceived {
249            msg_count,
250            frame_id,
251            drain_token
252        });
253
254        // Deal with body
255        let events = self.body_to_events(body);
256
257        if events.len() != msg_count {
258            let error_msg = format!(
259                "Parsed event count does not match message count header: {} vs {}",
260                events.len(),
261                msg_count
262            );
263
264            if cfg!(test) {
265                panic!("{}", error_msg);
266            }
267            return Err(header_error_message("Logplex-Msg-Count", &error_msg));
268        }
269
270        Ok(events)
271    }
272
273    fn body_to_events(&self, body: Bytes) -> Vec<Event> {
274        let rdr = BufReader::new(body.reader());
275        rdr.lines()
276            .filter_map(|res| {
277                res.map_err(|error| emit!(HerokuLogplexRequestReadError { error }))
278                    .ok()
279            })
280            .filter(|s| !s.is_empty())
281            .flat_map(|line| line_to_events(self.decoder.clone(), self.log_namespace, line))
282            .collect()
283    }
284}
285
impl HttpSource for LogplexSource {
    /// Turns a request body into events by delegating to `decode_message`.
    ///
    /// The query parameters and request path are not consulted at this stage.
    fn build_events(
        &self,
        body: Bytes,
        header_map: &HeaderMap,
        _query_parameters: &HashMap<String, String>,
        _full_path: &str,
    ) -> Result<Vec<Event>, ErrorMessage> {
        self.decode_message(body, header_map)
    }

    /// Enriches already-built events via `add_query_parameters`, passing the configured
    /// matchers, the request's query parameters, the log namespace and the source name.
    ///
    /// The request path, headers and source IP are ignored.
    fn enrich_events(
        &self,
        events: &mut [Event],
        _request_path: &str,
        _headers_config: &HeaderMap,
        query_parameters: &HashMap<String, String>,
        _source_ip: Option<&SocketAddr>,
    ) {
        add_query_parameters(
            events,
            &self.query_parameters,
            query_parameters,
            self.log_namespace,
            LogplexConfig::NAME,
        );
    }
}
314
315fn get_header<'a>(header_map: &'a HeaderMap, name: &str) -> Result<&'a str, ErrorMessage> {
316    if let Some(header_value) = header_map.get(name) {
317        header_value
318            .to_str()
319            .map_err(|e| header_error_message(name, &e.to_string()))
320    } else {
321        Err(header_error_message(name, "Header does not exist"))
322    }
323}
324
325fn header_error_message(name: &str, msg: &str) -> ErrorMessage {
326    ErrorMessage::new(
327        StatusCode::BAD_REQUEST,
328        format!("Invalid request header {name:?}: {msg:?}"),
329    )
330}
331
332fn line_to_events(
333    mut decoder: Decoder,
334    log_namespace: LogNamespace,
335    line: String,
336) -> SmallVec<[Event; 1]> {
337    let parts = line.splitn(8, ' ').collect::<Vec<&str>>();
338
339    let mut events = SmallVec::<[Event; 1]>::new();
340
341    if parts.len() == 8 {
342        let timestamp = parts[2];
343        let hostname = parts[3];
344        let app_name = parts[4];
345        let proc_id = parts[5];
346        let message = parts[7];
347
348        let mut buffer = BytesMut::new();
349        buffer.put(message.as_bytes());
350
351        let legacy_host_key = log_schema().host_key().cloned();
352        let legacy_app_key = parse_value_path("app_name").ok();
353        let legacy_proc_key = parse_value_path("proc_id").ok();
354
355        loop {
356            match decoder.decode_eof(&mut buffer) {
357                Ok(Some((decoded, _byte_size))) => {
358                    for mut event in decoded {
359                        if let Event::Log(ref mut log) = event {
360                            if let Ok(ts) = timestamp.parse::<DateTime<Utc>>() {
361                                log_namespace.insert_vector_metadata(
362                                    log,
363                                    log_schema().timestamp_key(),
364                                    path!("timestamp"),
365                                    ts,
366                                );
367                            }
368
369                            log_namespace.insert_source_metadata(
370                                LogplexConfig::NAME,
371                                log,
372                                legacy_host_key.as_ref().map(LegacyKey::InsertIfEmpty),
373                                path!("host"),
374                                hostname.to_owned(),
375                            );
376
377                            log_namespace.insert_source_metadata(
378                                LogplexConfig::NAME,
379                                log,
380                                legacy_app_key.as_ref().map(LegacyKey::InsertIfEmpty),
381                                path!("app_name"),
382                                app_name.to_owned(),
383                            );
384
385                            log_namespace.insert_source_metadata(
386                                LogplexConfig::NAME,
387                                log,
388                                legacy_proc_key.as_ref().map(LegacyKey::InsertIfEmpty),
389                                path!("proc_id"),
390                                proc_id.to_owned(),
391                            );
392                        }
393
394                        events.push(event);
395                    }
396                }
397                Ok(None) => break,
398                Err(error) => {
399                    if !error.can_continue() {
400                        break;
401                    }
402                }
403            }
404        }
405    } else {
406        warn!(
407            message = "Line didn't match expected logplex format, so raw message is forwarded.",
408            fields = parts.len()
409        );
410
411        events.push(LogEvent::from_str_legacy(line).into())
412    };
413
414    let now = Utc::now();
415
416    for event in &mut events {
417        if let Event::Log(log) = event {
418            log_namespace.insert_standard_vector_source_metadata(log, LogplexConfig::NAME, now);
419        }
420    }
421
422    events
423}
424
425#[cfg(test)]
426mod tests {
427    use std::net::SocketAddr;
428
429    use chrono::{DateTime, Utc};
430    use futures::Stream;
431    use similar_asserts::assert_eq;
432    use vector_lib::{
433        config::LogNamespace,
434        event::{Event, EventStatus, Value},
435        lookup::{OwnedTargetPath, owned_value_path},
436        schema::Definition,
437    };
438    use vrl::value::{Kind, kind::Collection};
439
440    use super::LogplexConfig;
441    use crate::{
442        SourceSender,
443        common::http::server_auth::HttpServerAuthConfig,
444        config::{SourceConfig, SourceContext, log_schema},
445        serde::{default_decoding, default_framing_message_based},
446        test_util::{
447            components::{HTTP_PUSH_SOURCE_TAGS, assert_source_compliance},
448            next_addr, random_string, spawn_collect_n, wait_for_tcp,
449        },
450    };
451
    // Runs the shared `test_generate_config` helper against `LogplexConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<LogplexConfig>();
    }
456
457    async fn source(
458        auth: Option<HttpServerAuthConfig>,
459        query_parameters: Vec<String>,
460        status: EventStatus,
461        acknowledgements: bool,
462    ) -> (impl Stream<Item = Event> + Unpin, SocketAddr) {
463        let (sender, recv) = SourceSender::new_test_finalize(status);
464        let address = next_addr();
465        let context = SourceContext::new_test(sender, None);
466        tokio::spawn(async move {
467            LogplexConfig {
468                address,
469                query_parameters,
470                tls: None,
471                auth,
472                framing: default_framing_message_based(),
473                decoding: default_decoding(),
474                acknowledgements: acknowledgements.into(),
475                log_namespace: None,
476                keepalive: Default::default(),
477            }
478            .build(context)
479            .await
480            .unwrap()
481            .await
482            .unwrap()
483        });
484        wait_for_tcp(address).await;
485        (recv, address)
486    }
487
488    async fn send(
489        address: SocketAddr,
490        body: &str,
491        auth: Option<HttpServerAuthConfig>,
492        query: &str,
493    ) -> u16 {
494        let len = body.lines().count();
495        let mut req = reqwest::Client::new().post(format!("http://{address}/events?{query}"));
496        if let Some(HttpServerAuthConfig::Basic { username, password }) = auth {
497            req = req.basic_auth(username, Some(password.inner()));
498        }
499        req.header("Logplex-Msg-Count", len)
500            .header("Logplex-Frame-Id", "frame-foo")
501            .header("Logplex-Drain-Token", "drain-bar")
502            .body(body.to_owned())
503            .send()
504            .await
505            .unwrap()
506            .status()
507            .as_u16()
508    }
509
510    fn make_auth() -> HttpServerAuthConfig {
511        HttpServerAuthConfig::Basic {
512            username: random_string(16),
513            password: random_string(16).into(),
514        }
515    }
516
    // A single well-formed logplex line: eight space-separated parts, where the third to sixth
    // are the timestamp, host (`host`), app name (`heroku`) and process id (`router`), and the
    // eighth is the message (everything from `at=info` onward).
    const SAMPLE_BODY: &str = r#"267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#;
518
    // End-to-end check: an authenticated request carrying `SAMPLE_BODY` is accepted with 200 and
    // yields an event with the expected message, timestamp, host and source type. The
    // `appname` query parameter is copied onto the event, while the configured but unsent
    // `absent` parameter shows up as null.
    #[tokio::test]
    async fn logplex_handles_router_log() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["appname".to_string(), "absent".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
            assert_eq!(log["absent"], Value::Null);
        }).await;
    }
564
    // Same flow as the router-log test, but with the `*` wildcard configured: the `appname`
    // query parameter must still be copied onto the event.
    #[tokio::test]
    async fn logplex_query_parameters_wildcard() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(
                Some(auth.clone()),
                vec!["*".to_string()],
                EventStatus::Delivered,
                true,
            )
            .await;

            let mut events = spawn_collect_n(
                async move {
                    assert_eq!(
                        200,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            let event = events.remove(0);
            let log = event.as_log();

            assert_eq!(
                *log.get_message().unwrap(),
                r#"at=info method=GET path="/cart_link" host=lumberjack-store.timber.io request_id=05726858-c44e-4f94-9a20-37df73be9006 fwd="73.75.38.87" dyno=web.1 connect=1ms service=22ms status=304 bytes=656 protocol=http"#.into()
            );
            assert_eq!(
                log[log_schema().timestamp_key().unwrap().to_string()],
                "2020-01-08T22:33:57.353034+00:00"
                    .parse::<DateTime<Utc>>()
                    .unwrap()
                    .into()
            );
            assert_eq!(*log.get_host().unwrap(), "host".into());
            assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
            assert_eq!(log["appname"], "lumberjack-store".into());
        }).await;
    }
609
    // With acknowledgements enabled and every event finalized as `Rejected`, the request must be
    // answered with 400 even though the events are still emitted downstream.
    #[tokio::test]
    async fn logplex_handles_failures() {
        assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
            let auth = make_auth();

            let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, true).await;

            let events = spawn_collect_n(
                async move {
                    assert_eq!(
                        400,
                        send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                    )
                },
                rx,
                SAMPLE_BODY.lines().count(),
            )
            .await;

            assert_eq!(events.len(), SAMPLE_BODY.lines().count());
        })
        .await;
    }
633
    // With acknowledgements disabled, a `Rejected` finalization status must not affect the
    // response: the request is still answered with 200 and the events are emitted.
    #[tokio::test]
    async fn logplex_ignores_disabled_acknowledgements() {
        let auth = make_auth();

        let (rx, addr) = source(Some(auth.clone()), vec![], EventStatus::Rejected, false).await;

        let events = spawn_collect_n(
            async move {
                assert_eq!(
                    200,
                    send(addr, SAMPLE_BODY, Some(auth), "appname=lumberjack-store").await
                )
            },
            rx,
            SAMPLE_BODY.lines().count(),
        )
        .await;

        assert_eq!(events.len(), SAMPLE_BODY.lines().count());
    }
654
    // The source and the client each get their own randomly generated credentials, so the
    // request must be rejected with 401.
    #[tokio::test]
    async fn logplex_auth_failure() {
        let (_rx, addr) = source(Some(make_auth()), vec![], EventStatus::Delivered, true).await;

        assert_eq!(
            401,
            send(
                addr,
                SAMPLE_BODY,
                Some(make_auth()),
                "appname=lumberjack-store"
            )
            .await
        );
    }
670
671    #[test]
672    fn logplex_handles_normal_lines() {
673        let log_namespace = LogNamespace::Legacy;
674        let body = "267 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - foo bar baz";
675        let events = super::line_to_events(Default::default(), log_namespace, body.into());
676        let log = events[0].as_log();
677
678        assert_eq!(*log.get_message().unwrap(), "foo bar baz".into());
679        assert_eq!(
680            log[log_schema().timestamp_key().unwrap().to_string()],
681            "2020-01-08T22:33:57.353034+00:00"
682                .parse::<DateTime<Utc>>()
683                .unwrap()
684                .into()
685        );
686        assert_eq!(*log.get_host().unwrap(), "host".into());
687        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
688    }
689
690    #[test]
691    fn logplex_handles_malformed_lines() {
692        let log_namespace = LogNamespace::Legacy;
693        let body = "what am i doing here";
694        let events = super::line_to_events(Default::default(), log_namespace, body.into());
695        let log = events[0].as_log();
696
697        assert_eq!(*log.get_message().unwrap(), "what am i doing here".into());
698        assert!(log.get_timestamp().is_some());
699        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
700    }
701
702    #[test]
703    fn logplex_doesnt_blow_up_on_bad_framing() {
704        let log_namespace = LogNamespace::Legacy;
705        let body = "1000000 <158>1 2020-01-08T22:33:57.353034+00:00 host heroku router - i'm not that long";
706        let events = super::line_to_events(Default::default(), log_namespace, body.into());
707        let log = events[0].as_log();
708
709        assert_eq!(*log.get_message().unwrap(), "i'm not that long".into());
710        assert_eq!(
711            log[log_schema().timestamp_key().unwrap().to_string()],
712            "2020-01-08T22:33:57.353034+00:00"
713                .parse::<DateTime<Utc>>()
714                .unwrap()
715                .into()
716        );
717        assert_eq!(*log.get_host().unwrap(), "host".into());
718        assert_eq!(*log.get_source_type().unwrap(), "heroku_logs".into());
719    }
720
721    #[test]
722    fn output_schema_definition_vector_namespace() {
723        let config = LogplexConfig {
724            log_namespace: Some(true),
725            ..Default::default()
726        };
727
728        let definitions = config
729            .outputs(LogNamespace::Vector)
730            .remove(0)
731            .schema_definition(true);
732
733        let expected_definition =
734            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
735                .with_meaning(OwnedTargetPath::event_root(), "message")
736                .with_metadata_field(
737                    &owned_value_path!("vector", "source_type"),
738                    Kind::bytes(),
739                    None,
740                )
741                .with_metadata_field(
742                    &owned_value_path!("vector", "ingest_timestamp"),
743                    Kind::timestamp(),
744                    None,
745                )
746                .with_metadata_field(
747                    &owned_value_path!(LogplexConfig::NAME, "timestamp"),
748                    Kind::timestamp().or_undefined(),
749                    Some("timestamp"),
750                )
751                .with_metadata_field(
752                    &owned_value_path!(LogplexConfig::NAME, "host"),
753                    Kind::bytes(),
754                    Some("host"),
755                )
756                .with_metadata_field(
757                    &owned_value_path!(LogplexConfig::NAME, "app_name"),
758                    Kind::bytes(),
759                    Some("service"),
760                )
761                .with_metadata_field(
762                    &owned_value_path!(LogplexConfig::NAME, "proc_id"),
763                    Kind::bytes(),
764                    None,
765                )
766                .with_metadata_field(
767                    &owned_value_path!(LogplexConfig::NAME, "query_parameters"),
768                    Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
769                    None,
770                );
771
772        assert_eq!(definitions, Some(expected_definition))
773    }
774
775    #[test]
776    fn output_schema_definition_legacy_namespace() {
777        let config = LogplexConfig::default();
778
779        let definitions = config
780            .outputs(LogNamespace::Legacy)
781            .remove(0)
782            .schema_definition(true);
783
784        let expected_definition = Definition::new_with_default_metadata(
785            Kind::object(Collection::empty()),
786            [LogNamespace::Legacy],
787        )
788        .with_event_field(
789            &owned_value_path!("message"),
790            Kind::bytes(),
791            Some("message"),
792        )
793        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
794        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
795        .with_event_field(&owned_value_path!("host"), Kind::bytes(), Some("host"))
796        .with_event_field(
797            &owned_value_path!("app_name"),
798            Kind::bytes(),
799            Some("service"),
800        )
801        .with_event_field(&owned_value_path!("proc_id"), Kind::bytes(), None)
802        .unknown_fields(Kind::bytes());
803
804        assert_eq!(definitions, Some(expected_definition))
805    }
806}