vector/sources/internal_logs.rs

1use chrono::Utc;
2use futures::{StreamExt, stream};
3use vector_lib::{
4    codecs::BytesDeserializerConfig,
5    config::{LegacyKey, LogNamespace, log_schema},
6    configurable::configurable_component,
7    lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
8    schema::Definition,
9};
10use vrl::value::Kind;
11
12use crate::{
13    SourceSender,
14    config::{DataType, SourceConfig, SourceContext, SourceOutput},
15    event::{EstimatedJsonEncodedSizeOf, Event},
16    internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
17    shutdown::ShutdownSignal,
18    trace::TraceSubscription,
19};
20
/// Configuration for the `internal_logs` source.
// Internal notes in this struct use plain `//` comments on purpose. The `///` comments
// appear to feed the generated configuration docs (see the `docs::hidden` metadata on
// `log_namespace`), so they are kept user-facing.
#[configurable_component(source(
    "internal_logs",
    "Expose internal log messages emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct InternalLogsConfig {
    // `None` means "use the global `log_schema().host_key()`". Both `schema_definition`
    // and `build` resolve it that way. An override whose `path` is `None` results in no
    // legacy host key being passed along.
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// Set to `""` to suppress this key.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    host_key: Option<OptionalValuePath>,

    // When absent from the config, serde fills this from `default_pid_key()` (`pid`).
    /// Overrides the name of the log field used to add the current process ID to each event.
    ///
    /// By default, `"pid"` is used.
    ///
    /// Set to `""` to suppress this key.
    #[serde(default = "default_pid_key")]
    pid_key: OptionalValuePath,

    // `None` defers to the global namespace setting. `build` resolves it via
    // `cx.log_namespace(..)` and `outputs` via `LogNamespace::merge`.
    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
51
52fn default_pid_key() -> OptionalValuePath {
53    OptionalValuePath::from(owned_value_path!("pid"))
54}
55
56impl_generate_config_from_default!(InternalLogsConfig);
57
58impl Default for InternalLogsConfig {
59    fn default() -> InternalLogsConfig {
60        InternalLogsConfig {
61            host_key: None,
62            pid_key: default_pid_key(),
63            log_namespace: None,
64        }
65    }
66}
67
68impl InternalLogsConfig {
69    /// Generates the `schema::Definition` for this component.
70    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
71        let host_key = self
72            .host_key
73            .clone()
74            .unwrap_or(log_schema().host_key().cloned().into())
75            .path
76            .map(LegacyKey::Overwrite);
77        let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
78
79        // There is a global and per-source `log_namespace` config.
80        // The source config overrides the global setting and is merged here.
81        BytesDeserializerConfig
82            .schema_definition(log_namespace)
83            .with_standard_vector_source_metadata()
84            .with_source_metadata(
85                InternalLogsConfig::NAME,
86                host_key,
87                &owned_value_path!("host"),
88                Kind::bytes().or_undefined(),
89                Some("host"),
90            )
91            .with_source_metadata(
92                InternalLogsConfig::NAME,
93                pid_key,
94                &owned_value_path!("pid"),
95                Kind::integer(),
96                None,
97            )
98    }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "internal_logs")]
103impl SourceConfig for InternalLogsConfig {
104    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
105        let host_key = self
106            .host_key
107            .clone()
108            .unwrap_or(log_schema().host_key().cloned().into())
109            .path;
110        let pid_key = self.pid_key.clone().path;
111
112        let subscription = TraceSubscription::subscribe();
113
114        let log_namespace = cx.log_namespace(self.log_namespace);
115
116        Ok(Box::pin(run(
117            host_key,
118            pid_key,
119            subscription,
120            cx.out,
121            cx.shutdown,
122            log_namespace,
123        )))
124    }
125
126    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
127        let schema_definition =
128            self.schema_definition(global_log_namespace.merge(self.log_namespace));
129
130        vec![SourceOutput::new_maybe_logs(
131            DataType::Log,
132            schema_definition,
133        )]
134    }
135
136    fn can_acknowledge(&self) -> bool {
137        false
138    }
139}
140
141async fn run(
142    host_key: Option<OwnedValuePath>,
143    pid_key: Option<OwnedValuePath>,
144    mut subscription: TraceSubscription,
145    mut out: SourceSender,
146    shutdown: ShutdownSignal,
147    log_namespace: LogNamespace,
148) -> Result<(), ()> {
149    let hostname = crate::get_hostname();
150    let pid = std::process::id();
151
152    // Chain any log events that were captured during early buffering to the front,
153    // and then continue with the normal stream of internal log events.
154    let buffered_events = subscription.buffered_events().await;
155    let mut rx = stream::iter(buffered_events.into_iter().flatten())
156        .chain(subscription.into_stream())
157        .take_until(shutdown);
158
159    // Note: This loop, or anything called within it, MUST NOT generate
160    // any logs that don't break the loop, as that could cause an
161    // infinite loop since it receives all such logs.
162    while let Some(mut log) = rx.next().await {
163        // TODO: Should this actually be in memory size?
164        let byte_size = log.estimated_json_encoded_size_of().get();
165        let json_byte_size = log.estimated_json_encoded_size_of();
166        // This event doesn't emit any log
167        emit!(InternalLogsBytesReceived { byte_size });
168        emit!(InternalLogsEventsReceived {
169            count: 1,
170            byte_size: json_byte_size,
171        });
172
173        if let Ok(hostname) = &hostname {
174            let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
175            log_namespace.insert_source_metadata(
176                InternalLogsConfig::NAME,
177                &mut log,
178                legacy_host_key,
179                path!("host"),
180                hostname.to_owned(),
181            );
182        }
183
184        let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
185        log_namespace.insert_source_metadata(
186            InternalLogsConfig::NAME,
187            &mut log,
188            legacy_pid_key,
189            path!("pid"),
190            pid,
191        );
192
193        log_namespace.insert_standard_vector_source_metadata(
194            &mut log,
195            InternalLogsConfig::NAME,
196            Utc::now(),
197        );
198
199        if (out.send_event(Event::from(log)).await).is_err() {
200            // this wont trigger any infinite loop considering it stops the component
201            emit!(StreamClosedError { count: 1 });
202            return Err(());
203        }
204    }
205
206    Ok(())
207}
208
// Tests for the `internal_logs` source. The async tests initialize tracing and reset the
// early buffer (global state), which is why they are marked `#[serial]`.
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{Duration, sleep};
    use vector_lib::{event::Value, lookup::OwnedTargetPath};
    use vrl::value::kind::Collection;

    use serial_test::serial;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{SOURCE_TAGS, assert_source_compliance},
        },
        trace,
    };

    // Checks that `InternalLogsConfig` works with the shared config-generation test helper.
    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    // This test is fairly overloaded with different cases.
    //
    // Unfortunately, this can't be easily split out into separate test
    // cases because `consume_early_buffer` (called within the
    // `start_source` helper) panics when called more than once.
    //
    // NOTE(review): `consume_early_buffer` is not referenced anywhere in this file.
    // `start_source` calls `trace::stop_early_buffering()`, and
    // `repeated_logs_are_not_rate_limited` calls `start_source` again after
    // `trace::reset_early_buffer()`. The comment above may be stale — confirm.
    #[tokio::test]
    #[serial]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    // Emits four ERROR events tagged with a random `test_id`:
    // * one outside any span, before the source starts;
    // * one inside the "source" span, before the source starts;
    // * one inside the "source" span, after the source starts;
    // * one inside a nested span.
    // It then checks ordering, timestamps, metadata, and span-derived `vector.*` fields.
    async fn run_test() {
        let test_id: u8 = rand::random();
        // `start` and `end` bracket every emitted event, so each event timestamp
        // must fall between them.
        let start = chrono::Utc::now();

        error!(message = "Before source started without span.", %test_id);

        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        // NOTE(review): this span guard is held across `.await` points below. This
        // presumably relies on the test runtime polling this future on a single
        // thread — confirm.
        let _enter = span.enter();

        error!(message = "Before source started.", %test_id);

        let rx = start_source().await;

        error!(message = "After source started.", %test_id);

        {
            // Nested span: overrides `component_kind`, adds new `component_*` fields,
            // and includes a non-`component_*` field that is expected to be dropped.
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        // Give the spawned source task a moment to forward events before draining.
        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        // Keep only this run's events; other internal logs may also be captured.
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));

        let end = chrono::Utc::now();

        assert_eq!(events.len(), 4);

        assert_eq!(
            events[0].as_log()["message"],
            "Before source started without span.".into()
        );
        assert_eq!(
            events[1].as_log()["message"],
            "Before source started.".into()
        );
        assert_eq!(
            events[2].as_log()["message"],
            "After source started.".into()
        );
        assert_eq!(events[3].as_log()["message"], "In a nested span.".into());

        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());
            // The first log event occurs outside our custom span
            if i == 0 {
                assert!(log.get("vector.component_id").is_none());
                assert!(log.get("vector.component_kind").is_none());
                assert!(log.get("vector.component_type").is_none());
            } else if i < 3 {
                // Events 1 and 2 were emitted directly inside the "source" span.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "source".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
            } else {
                // The last event occurs in a nested span. Here, we expect
                // parent fields to be preserved (unless overwritten), new
                // fields to be added, and filtered fields to not exist.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "bar".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
                assert_eq!(log["vector.component_new_field"], "baz".into());
                assert_eq!(log["vector.component_numerical_field"], 1.into());
                assert!(log.get("vector.ignored_field").is_none());
            }
        }
    }

    // Builds the default source and spawns it, then stops early buffering once the
    // source has had a moment to start. Returns the receiving end of the test sender.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (tx, rx) = SourceSender::new_test();

        let source = InternalLogsConfig::default()
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        rx
    }

    // NOTE: This test requires #[serial] because it directly interacts with global tracing state.
    // This is a pre-existing limitation around tracing initialization in tests.
    #[tokio::test]
    #[serial]
    async fn repeated_logs_are_not_rate_limited() {
        trace::init(false, false, "info", 10);
        trace::reset_early_buffer();

        let rx = start_source().await;

        // Generate 20 identical log messages with the same component_id
        for _ in 0..20 {
            info!(component_id = "test", "Repeated test message.");
        }

        // Longer wait than `run_test`, since 20 events must be forwarded.
        sleep(Duration::from_millis(50)).await;
        let events = collect_ready(rx).await;

        // Filter to only our test messages
        let test_events: Vec<_> = events
            .iter()
            .filter(|e| {
                e.as_log()
                    .get("message")
                    .map(|m| m.to_string_lossy() == "Repeated test message.")
                    .unwrap_or(false)
            })
            .collect();

        // We should receive all 20 messages, no rate limiting.
        assert_eq!(
            test_events.len(),
            20,
            "internal_logs source should capture all repeated messages without rate limiting"
        );
    }

    // In the Vector namespace, `host` and `pid` live under the source's metadata
    // (`internal_logs.*`) next to the standard `vector.*` metadata. The event root
    // carries the `message` meaning.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    // In the legacy namespace the fields are top-level event fields. A custom `pid_key`
    // checks that the configured key (not the literal `pid`) appears in the schema.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut config = InternalLogsConfig::default();

        let pid_key = "pid_a_pid_a_pid_pid_pid";

        config.pid_key = OptionalValuePath::from(owned_value_path!(pid_key));

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(definitions, Some(expected_definition))
    }
}