1use chrono::Utc;
2use futures::{StreamExt, stream};
3use vector_lib::{
4 codecs::BytesDeserializerConfig,
5 config::{LegacyKey, LogNamespace, log_schema},
6 configurable::configurable_component,
7 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
8 schema::Definition,
9};
10use vrl::value::Kind;
11
12use crate::{
13 SourceSender,
14 config::{DataType, SourceConfig, SourceContext, SourceOutput},
15 event::{EstimatedJsonEncodedSizeOf, Event},
16 internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
17 shutdown::ShutdownSignal,
18 trace::TraceSubscription,
19};
20
/// Configuration for the `internal_logs` source.
#[configurable_component(source(
    "internal_logs",
    "Expose internal log messages emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct InternalLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event.
    ///
    /// When unset, the host key from the global log schema is used.
    // NOTE(review): the resolved path is optional, so an empty path presumably
    // suppresses the legacy host field entirely — confirm against `OptionalValuePath`.
    host_key: Option<OptionalValuePath>,

    /// Overrides the name of the log field used to add the current process ID to each event.
    ///
    /// By default, `"pid"` is used.
    #[serde(default = "default_pid_key")]
    pid_key: OptionalValuePath,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
51
52fn default_pid_key() -> OptionalValuePath {
53 OptionalValuePath::from(owned_value_path!("pid"))
54}
55
// Wires `InternalLogsConfig` into config generation using its `Default` value.
// NOTE(review): presumably expands to a `GenerateConfig` impl — confirm against the macro definition.
impl_generate_config_from_default!(InternalLogsConfig);
57
58impl Default for InternalLogsConfig {
59 fn default() -> InternalLogsConfig {
60 InternalLogsConfig {
61 host_key: None,
62 pid_key: default_pid_key(),
63 log_namespace: None,
64 }
65 }
66}
67
68impl InternalLogsConfig {
69 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
71 let host_key = self
72 .host_key
73 .clone()
74 .unwrap_or(log_schema().host_key().cloned().into())
75 .path
76 .map(LegacyKey::Overwrite);
77 let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
78
79 BytesDeserializerConfig
82 .schema_definition(log_namespace)
83 .with_standard_vector_source_metadata()
84 .with_source_metadata(
85 InternalLogsConfig::NAME,
86 host_key,
87 &owned_value_path!("host"),
88 Kind::bytes().or_undefined(),
89 Some("host"),
90 )
91 .with_source_metadata(
92 InternalLogsConfig::NAME,
93 pid_key,
94 &owned_value_path!("pid"),
95 Kind::integer(),
96 None,
97 )
98 }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "internal_logs")]
103impl SourceConfig for InternalLogsConfig {
104 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
105 let host_key = self
106 .host_key
107 .clone()
108 .unwrap_or(log_schema().host_key().cloned().into())
109 .path;
110 let pid_key = self.pid_key.clone().path;
111
112 let subscription = TraceSubscription::subscribe();
113
114 let log_namespace = cx.log_namespace(self.log_namespace);
115
116 Ok(Box::pin(run(
117 host_key,
118 pid_key,
119 subscription,
120 cx.out,
121 cx.shutdown,
122 log_namespace,
123 )))
124 }
125
126 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
127 let schema_definition =
128 self.schema_definition(global_log_namespace.merge(self.log_namespace));
129
130 vec![SourceOutput::new_maybe_logs(
131 DataType::Log,
132 schema_definition,
133 )]
134 }
135
136 fn can_acknowledge(&self) -> bool {
137 false
138 }
139}
140
141async fn run(
142 host_key: Option<OwnedValuePath>,
143 pid_key: Option<OwnedValuePath>,
144 mut subscription: TraceSubscription,
145 mut out: SourceSender,
146 shutdown: ShutdownSignal,
147 log_namespace: LogNamespace,
148) -> Result<(), ()> {
149 let hostname = crate::get_hostname();
150 let pid = std::process::id();
151
152 let buffered_events = subscription.buffered_events().await;
155 let mut rx = stream::iter(buffered_events.into_iter().flatten())
156 .chain(subscription.into_stream())
157 .take_until(shutdown);
158
159 while let Some(mut log) = rx.next().await {
163 let byte_size = log.estimated_json_encoded_size_of().get();
165 let json_byte_size = log.estimated_json_encoded_size_of();
166 emit!(InternalLogsBytesReceived { byte_size });
168 emit!(InternalLogsEventsReceived {
169 count: 1,
170 byte_size: json_byte_size,
171 });
172
173 if let Ok(hostname) = &hostname {
174 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
175 log_namespace.insert_source_metadata(
176 InternalLogsConfig::NAME,
177 &mut log,
178 legacy_host_key,
179 path!("host"),
180 hostname.to_owned(),
181 );
182 }
183
184 let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
185 log_namespace.insert_source_metadata(
186 InternalLogsConfig::NAME,
187 &mut log,
188 legacy_pid_key,
189 path!("pid"),
190 pid,
191 );
192
193 log_namespace.insert_standard_vector_source_metadata(
194 &mut log,
195 InternalLogsConfig::NAME,
196 Utc::now(),
197 );
198
199 if (out.send_event(Event::from(log)).await).is_err() {
200 emit!(StreamClosedError { count: 1 });
202 return Err(());
203 }
204 }
205
206 Ok(())
207}
208
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{Duration, sleep};
    use vector_lib::{event::Value, lookup::OwnedTargetPath};
    use vrl::value::kind::Collection;

    use serial_test::serial;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{SOURCE_TAGS, assert_source_compliance},
        },
        trace,
    };

    /// The default configuration must round-trip through config generation.
    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    /// End-to-end check that logs emitted before and after the source starts
    /// are delivered. Marked `#[serial]` because it re-initializes tracing and
    /// resets the early buffer.
    #[tokio::test]
    #[serial]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    /// Emits four error logs (one outside any span, two inside the `source`
    /// span, one inside a nested span) and verifies what the source delivers
    /// for each of them.
    async fn run_test() {
        // Random tag used to pick this run's events out of everything collected.
        let test_id: u8 = rand::random();
        let start = chrono::Utc::now();

        error!(message = "Before source started without span.", %test_id);

        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let _enter = span.enter();

        error!(message = "Before source started.", %test_id);

        let rx = start_source().await;

        error!(message = "After source started.", %test_id);

        {
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));

        let end = chrono::Utc::now();

        assert_eq!(events.len(), 4);

        // Events must arrive in the order they were logged.
        let expected_messages = [
            "Before source started without span.",
            "Before source started.",
            "After source started.",
            "In a nested span.",
        ];
        for (event, expected) in events.iter().zip(expected_messages) {
            assert_eq!(event.as_log()["message"], Value::from(expected));
        }

        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());

            match i {
                // Logged outside of any span: no component fields at all.
                0 => {
                    assert!(log.get("vector.component_id").is_none());
                    assert!(log.get("vector.component_kind").is_none());
                    assert!(log.get("vector.component_type").is_none());
                }
                // Logged inside the `source` span only.
                1 | 2 => {
                    assert_eq!(log["vector.component_id"], "foo".into());
                    assert_eq!(log["vector.component_kind"], "source".into());
                    assert_eq!(log["vector.component_type"], "internal_logs".into());
                }
                // Logged inside the nested span: `component_kind` takes the
                // nested span's value, the other outer fields remain, the new
                // `component_*` fields appear, and `ignored_field` is absent.
                _ => {
                    assert_eq!(log["vector.component_id"], "foo".into());
                    assert_eq!(log["vector.component_kind"], "bar".into());
                    assert_eq!(log["vector.component_type"], "internal_logs".into());
                    assert_eq!(log["vector.component_new_field"], "baz".into());
                    assert_eq!(log["vector.component_numerical_field"], 1.into());
                    assert!(log.get("vector.ignored_field").is_none());
                }
            }
        }
    }

    /// Builds and spawns a default `internal_logs` source, waits briefly, then
    /// stops early buffering and hands back the receiving end of its output.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (sender, receiver) = SourceSender::new_test();

        let task = InternalLogsConfig::default()
            .build(SourceContext::new_test(sender, None))
            .await
            .unwrap();
        tokio::spawn(task);
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        receiver
    }

    /// Twenty identical messages from the same callsite must all reach the
    /// source's output.
    #[tokio::test]
    #[serial]
    async fn repeated_logs_are_not_rate_limited() {
        trace::init(false, false, "info", 10);
        trace::reset_early_buffer();

        let rx = start_source().await;

        for _ in 0..20 {
            info!(component_id = "test", "Repeated test message.");
        }

        sleep(Duration::from_millis(50)).await;
        let events = collect_ready(rx).await;

        // Count only the events produced by the loop above.
        let repeated_count = events
            .iter()
            .filter(|event| {
                event
                    .as_log()
                    .get("message")
                    .is_some_and(|m| m.to_string_lossy() == "Repeated test message.")
            })
            .count();

        assert_eq!(
            repeated_count,
            20,
            "internal_logs source should capture all repeated messages without rate limiting"
        );
    }

    /// In the Vector namespace the source fields are metadata fields and the
    /// event root carries the `message` meaning.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let mut outputs = config.outputs(LogNamespace::Vector);
        let actual = outputs.remove(0).schema_definition(true);

        let expected =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(actual, Some(expected));
    }

    /// In the legacy namespace the source fields are event fields, and a
    /// custom `pid_key` is used as the pid field name.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let pid_key = "pid_a_pid_a_pid_pid_pid";

        let config = InternalLogsConfig {
            pid_key: OptionalValuePath::from(owned_value_path!(pid_key)),
            ..Default::default()
        };

        let mut outputs = config.outputs(LogNamespace::Legacy);
        let actual = outputs.remove(0).schema_definition(true);

        let expected = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(actual, Some(expected));
    }
}
453}