1use chrono::Utc;
2use futures::{StreamExt, stream};
3use vector_lib::{
4    codecs::BytesDeserializerConfig,
5    config::{LegacyKey, LogNamespace, log_schema},
6    configurable::configurable_component,
7    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
8    schema::Definition,
9};
10use vrl::value::Kind;
11
12use crate::{
13    SourceSender,
14    config::{DataType, SourceConfig, SourceContext, SourceOutput},
15    event::{EstimatedJsonEncodedSizeOf, Event},
16    internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
17    shutdown::ShutdownSignal,
18    trace::TraceSubscription,
19};
20
21#[configurable_component(source(
23    "internal_logs",
24    "Expose internal log messages emitted by the running Vector instance."
25))]
26#[derive(Clone, Debug)]
27#[serde(deny_unknown_fields)]
28pub struct InternalLogsConfig {
29    host_key: Option<OptionalValuePath>,
37
38    #[serde(default = "default_pid_key")]
44    pid_key: OptionalValuePath,
45
46    #[configurable(metadata(docs::hidden))]
48    #[serde(default)]
49    log_namespace: Option<bool>,
50}
51
52fn default_pid_key() -> OptionalValuePath {
53    OptionalValuePath::from(owned_value_path!("pid"))
54}
55
56impl_generate_config_from_default!(InternalLogsConfig);
57
58impl Default for InternalLogsConfig {
59    fn default() -> InternalLogsConfig {
60        InternalLogsConfig {
61            host_key: None,
62            pid_key: default_pid_key(),
63            log_namespace: None,
64        }
65    }
66}
67
68impl InternalLogsConfig {
69    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
71        let host_key = self
72            .host_key
73            .clone()
74            .unwrap_or(log_schema().host_key().cloned().into())
75            .path
76            .map(LegacyKey::Overwrite);
77        let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
78
79        BytesDeserializerConfig
82            .schema_definition(log_namespace)
83            .with_standard_vector_source_metadata()
84            .with_source_metadata(
85                InternalLogsConfig::NAME,
86                host_key,
87                &owned_value_path!("host"),
88                Kind::bytes().or_undefined(),
89                Some("host"),
90            )
91            .with_source_metadata(
92                InternalLogsConfig::NAME,
93                pid_key,
94                &owned_value_path!("pid"),
95                Kind::integer(),
96                None,
97            )
98    }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "internal_logs")]
103impl SourceConfig for InternalLogsConfig {
104    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
105        let host_key = self
106            .host_key
107            .clone()
108            .unwrap_or(log_schema().host_key().cloned().into())
109            .path;
110        let pid_key = self.pid_key.clone().path;
111
112        let subscription = TraceSubscription::subscribe();
113
114        let log_namespace = cx.log_namespace(self.log_namespace);
115
116        Ok(Box::pin(run(
117            host_key,
118            pid_key,
119            subscription,
120            cx.out,
121            cx.shutdown,
122            log_namespace,
123        )))
124    }
125
126    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
127        let schema_definition =
128            self.schema_definition(global_log_namespace.merge(self.log_namespace));
129
130        vec![SourceOutput::new_maybe_logs(
131            DataType::Log,
132            schema_definition,
133        )]
134    }
135
136    fn can_acknowledge(&self) -> bool {
137        false
138    }
139}
140
141async fn run(
142    host_key: Option<OwnedValuePath>,
143    pid_key: Option<OwnedValuePath>,
144    mut subscription: TraceSubscription,
145    mut out: SourceSender,
146    shutdown: ShutdownSignal,
147    log_namespace: LogNamespace,
148) -> Result<(), ()> {
149    let hostname = crate::get_hostname();
150    let pid = std::process::id();
151
152    let buffered_events = subscription.buffered_events().await;
155    let mut rx = stream::iter(buffered_events.into_iter().flatten())
156        .chain(subscription.into_stream())
157        .take_until(shutdown);
158
159    while let Some(mut log) = rx.next().await {
163        let byte_size = log.estimated_json_encoded_size_of().get();
165        let json_byte_size = log.estimated_json_encoded_size_of();
166        emit!(InternalLogsBytesReceived { byte_size });
168        emit!(InternalLogsEventsReceived {
169            count: 1,
170            byte_size: json_byte_size,
171        });
172
173        if let Ok(hostname) = &hostname {
174            let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
175            log_namespace.insert_source_metadata(
176                InternalLogsConfig::NAME,
177                &mut log,
178                legacy_host_key,
179                path!("host"),
180                hostname.to_owned(),
181            );
182        }
183
184        let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
185        log_namespace.insert_source_metadata(
186            InternalLogsConfig::NAME,
187            &mut log,
188            legacy_pid_key,
189            path!("pid"),
190            pid,
191        );
192
193        log_namespace.insert_standard_vector_source_metadata(
194            &mut log,
195            InternalLogsConfig::NAME,
196            Utc::now(),
197        );
198
199        if (out.send_event(Event::from(log)).await).is_err() {
200            emit!(StreamClosedError { count: 1 });
202            return Err(());
203        }
204    }
205
206    Ok(())
207}
208
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{Duration, sleep};
    use vector_lib::{event::Value, lookup::OwnedTargetPath};
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{SOURCE_TAGS, assert_source_compliance},
        },
        trace,
    };

    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    // End-to-end check: events logged both before and after the source starts are delivered,
    // span `component_*` fields surface under `vector.*`, and the run satisfies the standard
    // source-compliance assertions.
    #[tokio::test]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        // NOTE(review): presumably clears events left in the early buffer by other tests.
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    // Emits four tagged error events around source startup, then checks their order,
    // timestamps, `metadata.*` fields, and span-derived `vector.*` fields.
    async fn run_test() {
        // Random id attached to every event so this run's events can be filtered from others.
        let test_id: u8 = rand::random();
        let start = chrono::Utc::now();

        // Emitted outside any span: expected to carry no `vector.component_*` fields.
        error!(message = "Before source started without span.", %test_id);

        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let _enter = span.enter();

        // Emitted before the source is built; still expected to be received.
        error!(message = "Before source started.", %test_id);

        let rx = start_source().await;

        error!(message = "After source started.", %test_id);

        {
            // Nested span: `component_kind` overrides the outer value, the other
            // `component_*` fields are added, and `ignored_field` (no `component_` prefix)
            // is expected not to appear under `vector.`.
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        // Give the spawned source a moment to forward events before collecting what is ready.
        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));

        let end = chrono::Utc::now();

        assert_eq!(events.len(), 4);

        assert_eq!(
            events[0].as_log()["message"],
            "Before source started without span.".into()
        );
        assert_eq!(
            events[1].as_log()["message"],
            "Before source started.".into()
        );
        assert_eq!(
            events[2].as_log()["message"],
            "After source started.".into()
        );
        assert_eq!(events[3].as_log()["message"], "In a nested span.".into());

        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());
            if i == 0 {
                // Event 0 was logged outside every span.
                assert!(log.get("vector.component_id").is_none());
                assert!(log.get("vector.component_kind").is_none());
                assert!(log.get("vector.component_type").is_none());
            } else if i < 3 {
                // Events 1 and 2 were logged inside the outer "source" span only.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "source".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
            } else {
                // Event 3 combines the outer span's fields with the nested span's
                // overrides and additions.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "bar".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
                assert_eq!(log["vector.component_new_field"], "baz".into());
                assert_eq!(log["vector.component_numerical_field"], 1.into());
                assert!(log.get("vector.ignored_field").is_none());
            }
        }
    }

    // Builds a default `internal_logs` source, spawns it, and stops trace early buffering
    // after a short sleep. Returns the stream of events the source sends.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (tx, rx) = SourceSender::new_test();

        let source = InternalLogsConfig::default()
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        rx
    }

    // With the Vector namespace, `host` and `pid` are expected as `internal_logs.*`
    // metadata fields alongside the standard `vector.*` source metadata.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    // With the Legacy namespace, a custom `pid_key` is expected to appear as a top-level
    // event field in place of the default `pid`.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut config = InternalLogsConfig::default();

        let pid_key = "pid_a_pid_a_pid_pid_pid";

        config.pid_key = OptionalValuePath::from(owned_value_path!(pid_key));

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(definitions, Some(expected_definition))
    }
}