vector/sources/
internal_logs.rs

1use chrono::Utc;
2use futures::{stream, StreamExt};
3use vector_lib::codecs::BytesDeserializerConfig;
4use vector_lib::config::log_schema;
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::lookup_v2::OptionalValuePath;
7use vector_lib::lookup::{owned_value_path, path, OwnedValuePath};
8use vector_lib::{
9    config::{LegacyKey, LogNamespace},
10    schema::Definition,
11};
12use vrl::value::Kind;
13
14use crate::{
15    config::{DataType, SourceConfig, SourceContext, SourceOutput},
16    event::{EstimatedJsonEncodedSizeOf, Event},
17    internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
18    shutdown::ShutdownSignal,
19    trace::TraceSubscription,
20    SourceSender,
21};
22
/// Configuration for the `internal_logs` source.
// NOTE(review): the `///` comments on this struct and its fields appear to be consumed by
// `configurable_component` as configuration descriptions (see the `docs::hidden` metadata on
// `log_namespace`), so wording changes there are presumably user-facing. Keep implementation
// notes in `//` comments like this one.
#[configurable_component(source(
    "internal_logs",
    "Expose internal log messages emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct InternalLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// By default, the [global `log_schema.host_key` option][global_host_key] is used.
    ///
    /// Set to `""` to suppress this key.
    ///
    /// [global_host_key]: https://vector.dev/docs/reference/configuration/global-options/#log_schema.host_key
    // `None` means "not configured": `build` and `schema_definition` then fall back to the
    // global `log_schema().host_key()`. A configured value (including an empty path) is used
    // as-is.
    host_key: Option<OptionalValuePath>,

    /// Overrides the name of the log field used to add the current process ID to each event.
    ///
    /// By default, `"pid"` is used.
    ///
    /// Set to `""` to suppress this key.
    // Unlike `host_key`, this is not optional: an absent setting deserializes to
    // `default_pid_key()`.
    #[serde(default = "default_pid_key")]
    pid_key: OptionalValuePath,

    /// The namespace to use for logs. This overrides the global setting.
    // `None` defers to the global setting; see `cx.log_namespace(...)` in `build` and
    // `global_log_namespace.merge(...)` in `outputs`.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
53
54fn default_pid_key() -> OptionalValuePath {
55    OptionalValuePath::from(owned_value_path!("pid"))
56}
57
58impl_generate_config_from_default!(InternalLogsConfig);
59
60impl Default for InternalLogsConfig {
61    fn default() -> InternalLogsConfig {
62        InternalLogsConfig {
63            host_key: None,
64            pid_key: default_pid_key(),
65            log_namespace: None,
66        }
67    }
68}
69
70impl InternalLogsConfig {
71    /// Generates the `schema::Definition` for this component.
72    fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
73        let host_key = self
74            .host_key
75            .clone()
76            .unwrap_or(log_schema().host_key().cloned().into())
77            .path
78            .map(LegacyKey::Overwrite);
79        let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
80
81        // There is a global and per-source `log_namespace` config.
82        // The source config overrides the global setting and is merged here.
83        BytesDeserializerConfig
84            .schema_definition(log_namespace)
85            .with_standard_vector_source_metadata()
86            .with_source_metadata(
87                InternalLogsConfig::NAME,
88                host_key,
89                &owned_value_path!("host"),
90                Kind::bytes().or_undefined(),
91                Some("host"),
92            )
93            .with_source_metadata(
94                InternalLogsConfig::NAME,
95                pid_key,
96                &owned_value_path!("pid"),
97                Kind::integer(),
98                None,
99            )
100    }
101}
102
103#[async_trait::async_trait]
104#[typetag::serde(name = "internal_logs")]
105impl SourceConfig for InternalLogsConfig {
106    async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
107        let host_key = self
108            .host_key
109            .clone()
110            .unwrap_or(log_schema().host_key().cloned().into())
111            .path;
112        let pid_key = self.pid_key.clone().path;
113
114        let subscription = TraceSubscription::subscribe();
115
116        let log_namespace = cx.log_namespace(self.log_namespace);
117
118        Ok(Box::pin(run(
119            host_key,
120            pid_key,
121            subscription,
122            cx.out,
123            cx.shutdown,
124            log_namespace,
125        )))
126    }
127
128    fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
129        let schema_definition =
130            self.schema_definition(global_log_namespace.merge(self.log_namespace));
131
132        vec![SourceOutput::new_maybe_logs(
133            DataType::Log,
134            schema_definition,
135        )]
136    }
137
138    fn can_acknowledge(&self) -> bool {
139        false
140    }
141}
142
143async fn run(
144    host_key: Option<OwnedValuePath>,
145    pid_key: Option<OwnedValuePath>,
146    mut subscription: TraceSubscription,
147    mut out: SourceSender,
148    shutdown: ShutdownSignal,
149    log_namespace: LogNamespace,
150) -> Result<(), ()> {
151    let hostname = crate::get_hostname();
152    let pid = std::process::id();
153
154    // Chain any log events that were captured during early buffering to the front,
155    // and then continue with the normal stream of internal log events.
156    let buffered_events = subscription.buffered_events().await;
157    let mut rx = stream::iter(buffered_events.into_iter().flatten())
158        .chain(subscription.into_stream())
159        .take_until(shutdown);
160
161    // Note: This loop, or anything called within it, MUST NOT generate
162    // any logs that don't break the loop, as that could cause an
163    // infinite loop since it receives all such logs.
164    while let Some(mut log) = rx.next().await {
165        // TODO: Should this actually be in memory size?
166        let byte_size = log.estimated_json_encoded_size_of().get();
167        let json_byte_size = log.estimated_json_encoded_size_of();
168        // This event doesn't emit any log
169        emit!(InternalLogsBytesReceived { byte_size });
170        emit!(InternalLogsEventsReceived {
171            count: 1,
172            byte_size: json_byte_size,
173        });
174
175        if let Ok(hostname) = &hostname {
176            let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
177            log_namespace.insert_source_metadata(
178                InternalLogsConfig::NAME,
179                &mut log,
180                legacy_host_key,
181                path!("host"),
182                hostname.to_owned(),
183            );
184        }
185
186        let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
187        log_namespace.insert_source_metadata(
188            InternalLogsConfig::NAME,
189            &mut log,
190            legacy_pid_key,
191            path!("pid"),
192            pid,
193        );
194
195        log_namespace.insert_standard_vector_source_metadata(
196            &mut log,
197            InternalLogsConfig::NAME,
198            Utc::now(),
199        );
200
201        if (out.send_event(Event::from(log)).await).is_err() {
202            // this wont trigger any infinite loop considering it stops the component
203            emit!(StreamClosedError { count: 1 });
204            return Err(());
205        }
206    }
207
208    Ok(())
209}
210
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{sleep, Duration};
    use vector_lib::event::Value;
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{assert_source_compliance, SOURCE_TAGS},
        },
        trace,
    };

    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    // All scenarios live in one test on purpose. `consume_early_buffer` (reached through
    // `start_source`) panics if it is called more than once, so the cases cannot be split
    // into independent tests.
    #[tokio::test]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    /// Emits four tagged error logs around source start-up and checks that all of them
    /// arrive in order, with the expected span-derived `vector.*` fields.
    async fn run_test() {
        // The binding name matters: `%test_id` below becomes the `test_id` field that the
        // received events are filtered on.
        let test_id: u8 = rand::random();
        let started_at = chrono::Utc::now();

        error!(message = "Before source started without span.", %test_id);

        let source_span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let _source_guard = source_span.enter();

        error!(message = "Before source started.", %test_id);

        let rx = start_source().await;

        error!(message = "After source started.", %test_id);

        {
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _nested_guard = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        // Give the spawned source a moment to forward everything before draining.
        sleep(Duration::from_millis(1)).await;
        let collected = collect_ready(rx).await;
        let wanted_id = Value::from(test_id.to_string());
        let events: Vec<_> = collected
            .into_iter()
            .filter(|event| event.as_log().get("test_id") == Some(&wanted_id))
            .collect();

        let finished_at = chrono::Utc::now();

        let expected_messages = [
            "Before source started without span.",
            "Before source started.",
            "After source started.",
            "In a nested span.",
        ];
        assert_eq!(events.len(), expected_messages.len());

        for (index, (event, expected_message)) in
            events.iter().zip(expected_messages.iter()).enumerate()
        {
            let log = event.as_log();
            assert_eq!(log["message"], Value::from(*expected_message));

            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!((started_at..=finished_at).contains(&timestamp));
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());

            match index {
                // Emitted before any span was entered: no component fields at all.
                0 => {
                    for field in [
                        "vector.component_id",
                        "vector.component_kind",
                        "vector.component_type",
                    ] {
                        assert!(log.get(field).is_none());
                    }
                }
                // Emitted directly inside the `source` span.
                1 | 2 => {
                    assert_eq!(log["vector.component_id"], "foo".into());
                    assert_eq!(log["vector.component_kind"], "source".into());
                    assert_eq!(log["vector.component_type"], "internal_logs".into());
                }
                // Emitted inside the nested span: parent fields survive unless overwritten,
                // new `component_*` fields are added, and other fields are filtered out.
                _ => {
                    assert_eq!(log["vector.component_id"], "foo".into());
                    assert_eq!(log["vector.component_kind"], "bar".into());
                    assert_eq!(log["vector.component_type"], "internal_logs".into());
                    assert_eq!(log["vector.component_new_field"], "baz".into());
                    assert_eq!(log["vector.component_numerical_field"], 1.into());
                    assert!(log.get("vector.ignored_field").is_none());
                }
            }
        }
    }

    /// Spawns a default-configured source, then ends early buffering so that buffered and
    /// live events both flow into the returned receiver.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (sender, receiver) = SourceSender::new_test();
        let context = SourceContext::new_test(sender, None);

        let source = InternalLogsConfig::default().build(context).await.unwrap();
        tokio::spawn(source);

        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        receiver
    }

    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let mut outputs = config.outputs(LogNamespace::Vector);
        let actual = outputs.remove(0).schema_definition(true);

        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(actual, Some(expected))
    }

    #[test]
    fn output_schema_definition_legacy_namespace() {
        let pid_key = "pid_a_pid_a_pid_pid_pid";
        let config = InternalLogsConfig {
            pid_key: OptionalValuePath::from(owned_value_path!(pid_key)),
            ..InternalLogsConfig::default()
        };

        let mut outputs = config.outputs(LogNamespace::Legacy);
        let actual = outputs.remove(0).schema_definition(true);

        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(actual, Some(expected))
    }
}