1use chrono::Utc;
2use futures::{stream, StreamExt};
3use vector_lib::codecs::BytesDeserializerConfig;
4use vector_lib::config::log_schema;
5use vector_lib::configurable::configurable_component;
6use vector_lib::lookup::lookup_v2::OptionalValuePath;
7use vector_lib::lookup::{owned_value_path, path, OwnedValuePath};
8use vector_lib::{
9 config::{LegacyKey, LogNamespace},
10 schema::Definition,
11};
12use vrl::value::Kind;
13
14use crate::{
15 config::{DataType, SourceConfig, SourceContext, SourceOutput},
16 event::{EstimatedJsonEncodedSizeOf, Event},
17 internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
18 shutdown::ShutdownSignal,
19 trace::TraceSubscription,
20 SourceSender,
21};
22
/// Configuration for the `internal_logs` source.
#[configurable_component(source(
    "internal_logs",
    "Expose internal log messages emitted by the running Vector instance."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct InternalLogsConfig {
    /// Overrides the name of the log field used to add the current hostname to each event
    /// (when the hostname can be determined).
    ///
    /// By default, the global `log_schema.host_key` option is used.
    host_key: Option<OptionalValuePath>,

    /// Overrides the name of the log field used to add the current process ID to each event.
    ///
    /// By default, `"pid"` is used.
    #[serde(default = "default_pid_key")]
    pid_key: OptionalValuePath,

    /// The namespace to use for logs. This overrides the global setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    log_namespace: Option<bool>,
}
53
54fn default_pid_key() -> OptionalValuePath {
55 OptionalValuePath::from(owned_value_path!("pid"))
56}
57
58impl_generate_config_from_default!(InternalLogsConfig);
59
60impl Default for InternalLogsConfig {
61 fn default() -> InternalLogsConfig {
62 InternalLogsConfig {
63 host_key: None,
64 pid_key: default_pid_key(),
65 log_namespace: None,
66 }
67 }
68}
69
70impl InternalLogsConfig {
71 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
73 let host_key = self
74 .host_key
75 .clone()
76 .unwrap_or(log_schema().host_key().cloned().into())
77 .path
78 .map(LegacyKey::Overwrite);
79 let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
80
81 BytesDeserializerConfig
84 .schema_definition(log_namespace)
85 .with_standard_vector_source_metadata()
86 .with_source_metadata(
87 InternalLogsConfig::NAME,
88 host_key,
89 &owned_value_path!("host"),
90 Kind::bytes().or_undefined(),
91 Some("host"),
92 )
93 .with_source_metadata(
94 InternalLogsConfig::NAME,
95 pid_key,
96 &owned_value_path!("pid"),
97 Kind::integer(),
98 None,
99 )
100 }
101}
102
103#[async_trait::async_trait]
104#[typetag::serde(name = "internal_logs")]
105impl SourceConfig for InternalLogsConfig {
106 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
107 let host_key = self
108 .host_key
109 .clone()
110 .unwrap_or(log_schema().host_key().cloned().into())
111 .path;
112 let pid_key = self.pid_key.clone().path;
113
114 let subscription = TraceSubscription::subscribe();
115
116 let log_namespace = cx.log_namespace(self.log_namespace);
117
118 Ok(Box::pin(run(
119 host_key,
120 pid_key,
121 subscription,
122 cx.out,
123 cx.shutdown,
124 log_namespace,
125 )))
126 }
127
128 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
129 let schema_definition =
130 self.schema_definition(global_log_namespace.merge(self.log_namespace));
131
132 vec![SourceOutput::new_maybe_logs(
133 DataType::Log,
134 schema_definition,
135 )]
136 }
137
138 fn can_acknowledge(&self) -> bool {
139 false
140 }
141}
142
143async fn run(
144 host_key: Option<OwnedValuePath>,
145 pid_key: Option<OwnedValuePath>,
146 mut subscription: TraceSubscription,
147 mut out: SourceSender,
148 shutdown: ShutdownSignal,
149 log_namespace: LogNamespace,
150) -> Result<(), ()> {
151 let hostname = crate::get_hostname();
152 let pid = std::process::id();
153
154 let buffered_events = subscription.buffered_events().await;
157 let mut rx = stream::iter(buffered_events.into_iter().flatten())
158 .chain(subscription.into_stream())
159 .take_until(shutdown);
160
161 while let Some(mut log) = rx.next().await {
165 let byte_size = log.estimated_json_encoded_size_of().get();
167 let json_byte_size = log.estimated_json_encoded_size_of();
168 emit!(InternalLogsBytesReceived { byte_size });
170 emit!(InternalLogsEventsReceived {
171 count: 1,
172 byte_size: json_byte_size,
173 });
174
175 if let Ok(hostname) = &hostname {
176 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
177 log_namespace.insert_source_metadata(
178 InternalLogsConfig::NAME,
179 &mut log,
180 legacy_host_key,
181 path!("host"),
182 hostname.to_owned(),
183 );
184 }
185
186 let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
187 log_namespace.insert_source_metadata(
188 InternalLogsConfig::NAME,
189 &mut log,
190 legacy_pid_key,
191 path!("pid"),
192 pid,
193 );
194
195 log_namespace.insert_standard_vector_source_metadata(
196 &mut log,
197 InternalLogsConfig::NAME,
198 Utc::now(),
199 );
200
201 if (out.send_event(Event::from(log)).await).is_err() {
202 emit!(StreamClosedError { count: 1 });
204 return Err(());
205 }
206 }
207
208 Ok(())
209}
210
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{sleep, Duration};
    use vector_lib::event::Value;
    use vector_lib::lookup::OwnedTargetPath;
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{assert_source_compliance, SOURCE_TAGS},
        },
        trace,
    };

    /// The default config can be rendered by `generate_config`.
    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    // Covers several scenarios in one test: events emitted before and after the
    // source starts, both inside and outside tracing spans.
    // NOTE(review): this relies on process-global tracing state
    // (`trace::init`, the early buffer), which is presumably why the cases are
    // not split into separate tests. Confirm before splitting.
    #[tokio::test]
    async fn receives_logs() {
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    /// Emits four tagged error events and checks that all of them arrive in
    /// order, with the expected timestamps, metadata and `vector.*` span fields.
    async fn run_test() {
        // Random tag so that only events emitted by this test are inspected.
        let test_id: u8 = rand::random();
        let start = chrono::Utc::now();

        error!(message = "Before source started without span.", %test_id);

        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let _enter = span.enter();

        error!(message = "Before source started.", %test_id);

        let rx = start_source().await;

        error!(message = "After source started.", %test_id);

        {
            // The nested span overrides `component_kind`, adds extra
            // `component_`-prefixed fields, and adds one field without that prefix.
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        // Give the spawned source a moment to forward events.
        // NOTE(review): timing-based wait; may be flaky on a heavily loaded machine.
        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));

        let end = chrono::Utc::now();

        assert_eq!(events.len(), 4);

        assert_eq!(
            events[0].as_log()["message"],
            "Before source started without span.".into()
        );
        assert_eq!(
            events[1].as_log()["message"],
            "Before source started.".into()
        );
        assert_eq!(
            events[2].as_log()["message"],
            "After source started.".into()
        );
        assert_eq!(events[3].as_log()["message"], "In a nested span.".into());

        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());
            if i == 0 {
                // Emitted before any span was entered: no component fields expected.
                assert!(log.get("vector.component_id").is_none());
                assert!(log.get("vector.component_kind").is_none());
                assert!(log.get("vector.component_type").is_none());
            } else if i < 3 {
                // Emitted inside the outer "source" span only.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "source".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
            } else {
                // Nested span: fields not redefined are expected to be inherited
                // from the outer span, `component_kind` is overridden, and only
                // `component_`-prefixed fields are expected to be surfaced.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "bar".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
                assert_eq!(log["vector.component_new_field"], "baz".into());
                assert_eq!(log["vector.component_numerical_field"], 1.into());
                assert!(log.get("vector.ignored_field").is_none());
            }
        }
    }

    /// Builds and spawns a default `internal_logs` source and returns its output stream.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (tx, rx) = SourceSender::new_test();

        let source = InternalLogsConfig::default()
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        // Let the spawned source start before early buffering is turned off.
        // NOTE(review): the exact semantics of `stop_early_buffering` live in
        // `trace`. Confirm there.
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        rx
    }

    /// In the Vector namespace, host and pid are stored as source metadata
    /// under `internal_logs.*` rather than as event fields.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    /// In the legacy namespace, a custom `pid_key` is reflected as a top-level event field.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut config = InternalLogsConfig::default();

        let pid_key = "pid_a_pid_a_pid_pid_pid";

        config.pid_key = OptionalValuePath::from(owned_value_path!(pid_key));

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(definitions, Some(expected_definition))
    }
}