1use chrono::Utc;
2use futures::{StreamExt, stream};
3use vector_lib::{
4 codecs::BytesDeserializerConfig,
5 config::{LegacyKey, LogNamespace, log_schema},
6 configurable::configurable_component,
7 lookup::{OwnedValuePath, lookup_v2::OptionalValuePath, owned_value_path, path},
8 schema::Definition,
9};
10use vrl::value::Kind;
11
12use crate::{
13 SourceSender,
14 config::{DataType, SourceConfig, SourceContext, SourceOutput},
15 event::{EstimatedJsonEncodedSizeOf, Event},
16 internal_events::{InternalLogsBytesReceived, InternalLogsEventsReceived, StreamClosedError},
17 shutdown::ShutdownSignal,
18 trace::TraceSubscription,
19};
20
21#[configurable_component(source(
23 "internal_logs",
24 "Expose internal log messages emitted by the running Vector instance."
25))]
26#[derive(Clone, Debug)]
27#[serde(deny_unknown_fields)]
28pub struct InternalLogsConfig {
29 host_key: Option<OptionalValuePath>,
37
38 #[serde(default = "default_pid_key")]
44 pid_key: OptionalValuePath,
45
46 #[configurable(metadata(docs::hidden))]
48 #[serde(default)]
49 log_namespace: Option<bool>,
50}
51
52fn default_pid_key() -> OptionalValuePath {
53 OptionalValuePath::from(owned_value_path!("pid"))
54}
55
56impl_generate_config_from_default!(InternalLogsConfig);
57
58impl Default for InternalLogsConfig {
59 fn default() -> InternalLogsConfig {
60 InternalLogsConfig {
61 host_key: None,
62 pid_key: default_pid_key(),
63 log_namespace: None,
64 }
65 }
66}
67
68impl InternalLogsConfig {
69 fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
71 let host_key = self
72 .host_key
73 .clone()
74 .unwrap_or(log_schema().host_key().cloned().into())
75 .path
76 .map(LegacyKey::Overwrite);
77 let pid_key = self.pid_key.clone().path.map(LegacyKey::Overwrite);
78
79 BytesDeserializerConfig
82 .schema_definition(log_namespace)
83 .with_standard_vector_source_metadata()
84 .with_source_metadata(
85 InternalLogsConfig::NAME,
86 host_key,
87 &owned_value_path!("host"),
88 Kind::bytes().or_undefined(),
89 Some("host"),
90 )
91 .with_source_metadata(
92 InternalLogsConfig::NAME,
93 pid_key,
94 &owned_value_path!("pid"),
95 Kind::integer(),
96 None,
97 )
98 }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "internal_logs")]
103impl SourceConfig for InternalLogsConfig {
104 async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
105 let host_key = self
106 .host_key
107 .clone()
108 .unwrap_or(log_schema().host_key().cloned().into())
109 .path;
110 let pid_key = self.pid_key.clone().path;
111
112 let subscription = TraceSubscription::subscribe();
113
114 let log_namespace = cx.log_namespace(self.log_namespace);
115
116 Ok(Box::pin(run(
117 host_key,
118 pid_key,
119 subscription,
120 cx.out,
121 cx.shutdown,
122 log_namespace,
123 )))
124 }
125
126 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
127 let schema_definition =
128 self.schema_definition(global_log_namespace.merge(self.log_namespace));
129
130 vec![SourceOutput::new_maybe_logs(
131 DataType::Log,
132 schema_definition,
133 )]
134 }
135
136 fn can_acknowledge(&self) -> bool {
137 false
138 }
139}
140
141async fn run(
142 host_key: Option<OwnedValuePath>,
143 pid_key: Option<OwnedValuePath>,
144 mut subscription: TraceSubscription,
145 mut out: SourceSender,
146 shutdown: ShutdownSignal,
147 log_namespace: LogNamespace,
148) -> Result<(), ()> {
149 let hostname = crate::get_hostname();
150 let pid = std::process::id();
151
152 let buffered_events = subscription.buffered_events().await;
155 let mut rx = stream::iter(buffered_events.into_iter().flatten())
156 .chain(subscription.into_stream())
157 .take_until(shutdown);
158
159 while let Some(mut log) = rx.next().await {
163 let byte_size = log.estimated_json_encoded_size_of().get();
165 let json_byte_size = log.estimated_json_encoded_size_of();
166 emit!(InternalLogsBytesReceived { byte_size });
168 emit!(InternalLogsEventsReceived {
169 count: 1,
170 byte_size: json_byte_size,
171 });
172
173 if let Ok(hostname) = &hostname {
174 let legacy_host_key = host_key.as_ref().map(LegacyKey::Overwrite);
175 log_namespace.insert_source_metadata(
176 InternalLogsConfig::NAME,
177 &mut log,
178 legacy_host_key,
179 path!("host"),
180 hostname.to_owned(),
181 );
182 }
183
184 let legacy_pid_key = pid_key.as_ref().map(LegacyKey::Overwrite);
185 log_namespace.insert_source_metadata(
186 InternalLogsConfig::NAME,
187 &mut log,
188 legacy_pid_key,
189 path!("pid"),
190 pid,
191 );
192
193 log_namespace.insert_standard_vector_source_metadata(
194 &mut log,
195 InternalLogsConfig::NAME,
196 Utc::now(),
197 );
198
199 if (out.send_event(Event::from(log)).await).is_err() {
200 emit!(StreamClosedError { count: 1 });
202 return Err(());
203 }
204 }
205
206 Ok(())
207}
208
#[cfg(test)]
mod tests {
    use futures::Stream;
    use tokio::time::{Duration, sleep};
    use vector_lib::{event::Value, lookup::OwnedTargetPath};
    use vrl::value::kind::Collection;

    use super::*;
    use crate::{
        event::Event,
        test_util::{
            collect_ready,
            components::{SOURCE_TAGS, assert_source_compliance},
        },
        trace,
    };

    /// The default config must round-trip through `generate_config`.
    #[test]
    fn generates_config() {
        crate::test_util::test_generate_config::<InternalLogsConfig>();
    }

    /// End-to-end check of the source.
    ///
    /// Log events emitted before the source starts (captured by early buffering)
    /// and after it starts (live) must both arrive, carrying span-derived
    /// `vector.*` fields. The run must also satisfy the source compliance checks.
    #[tokio::test]
    async fn receives_logs() {
        // Set up the global tracing subscriber and clear events other tests may
        // have left in the early buffer.
        // NOTE(review): positional args of `trace::init` are not visible here — confirm their meaning.
        trace::init(false, false, "debug", 10);
        trace::reset_early_buffer();

        assert_source_compliance(&SOURCE_TAGS, run_test()).await;
    }

    /// Emits four tagged error events in different span contexts and checks that
    /// exactly those four are received, in order, with the expected fields.
    async fn run_test() {
        // Random tag so this run's events can be separated from unrelated log output.
        let test_id: u8 = rand::random();
        let start = chrono::Utc::now();

        // Event 0: emitted outside any span, so it has no `component_*` fields.
        error!(message = "Before source started without span.", %test_id);

        let span = error_span!(
            "source",
            component_kind = "source",
            component_id = "foo",
            component_type = "internal_logs",
        );
        let _enter = span.enter();

        // Event 1: inside the span, but before the source is running
        // (expected to be delivered via the early buffer).
        error!(message = "Before source started.", %test_id);

        let rx = start_source().await;

        // Event 2: inside the span, with the source running.
        error!(message = "After source started.", %test_id);

        {
            // Event 3: nested span that overrides `component_kind`, adds two new
            // `component_*` fields, and adds a non-`component_` field.
            let nested_span = error_span!(
                "nested span",
                component_kind = "bar",
                component_new_field = "baz",
                component_numerical_field = 1,
                ignored_field = "foobarbaz",
            );
            let _enter = nested_span.enter();
            error!(message = "In a nested span.", %test_id);
        }

        // Give the spawned source a moment to forward the events
        // (timing-based; presumably sufficient on typical CI machines).
        sleep(Duration::from_millis(1)).await;
        let mut events = collect_ready(rx).await;
        // Keep only this run's events, matched by the stringified `test_id` field.
        let test_id = Value::from(test_id.to_string());
        events.retain(|event| event.as_log().get("test_id") == Some(&test_id));

        let end = chrono::Utc::now();

        assert_eq!(events.len(), 4);

        assert_eq!(
            events[0].as_log()["message"],
            "Before source started without span.".into()
        );
        assert_eq!(
            events[1].as_log()["message"],
            "Before source started.".into()
        );
        assert_eq!(
            events[2].as_log()["message"],
            "After source started.".into()
        );
        assert_eq!(events[3].as_log()["message"], "In a nested span.".into());

        for (i, event) in events.iter().enumerate() {
            let log = event.as_log();
            // Every event's timestamp must fall inside this test's time window.
            let timestamp = *log["timestamp"]
                .as_timestamp()
                .expect("timestamp isn't a timestamp");
            assert!(timestamp >= start);
            assert!(timestamp <= end);
            assert_eq!(log["metadata.kind"], "event".into());
            assert_eq!(log["metadata.level"], "ERROR".into());
            if i == 0 {
                // Emitted outside any span: no component fields at all.
                assert!(log.get("vector.component_id").is_none());
                assert!(log.get("vector.component_kind").is_none());
                assert!(log.get("vector.component_type").is_none());
            } else if i < 3 {
                // Emitted inside the outer "source" span only.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "source".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
            } else {
                // Nested span checks:
                // - `component_id`/`component_type` are inherited from the outer span;
                // - `component_kind` is overridden by the nested span;
                // - new `component_*` fields are copied under `vector.`;
                // - `ignored_field` (no `component_` prefix) is not copied.
                assert_eq!(log["vector.component_id"], "foo".into());
                assert_eq!(log["vector.component_kind"], "bar".into());
                assert_eq!(log["vector.component_type"], "internal_logs".into());
                assert_eq!(log["vector.component_new_field"], "baz".into());
                assert_eq!(log["vector.component_numerical_field"], 1.into());
                assert!(log.get("vector.ignored_field").is_none());
            }
        }
    }

    /// Builds the source with the default config and spawns it.
    ///
    /// After a short wait it turns off early buffering, then returns the receiving
    /// end of the test sender.
    async fn start_source() -> impl Stream<Item = Event> + Unpin {
        let (tx, rx) = SourceSender::new_test();

        let source = InternalLogsConfig::default()
            .build(SourceContext::new_test(tx, None))
            .await
            .unwrap();
        tokio::spawn(source);
        sleep(Duration::from_millis(1)).await;
        trace::stop_early_buffering();
        rx
    }

    /// With the Vector namespace, `host` and `pid` live in metadata under the
    /// source name rather than in the event body.
    #[test]
    fn output_schema_definition_vector_namespace() {
        let config = InternalLogsConfig::default();

        let definitions = config
            .outputs(LogNamespace::Vector)
            .remove(0)
            .schema_definition(true);

        let expected_definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
                .with_meaning(OwnedTargetPath::event_root(), "message")
                .with_metadata_field(
                    &owned_value_path!("vector", "source_type"),
                    Kind::bytes(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "pid"),
                    Kind::integer(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!("vector", "ingest_timestamp"),
                    Kind::timestamp(),
                    None,
                )
                .with_metadata_field(
                    &owned_value_path!(InternalLogsConfig::NAME, "host"),
                    Kind::bytes().or_undefined(),
                    Some("host"),
                );

        assert_eq!(definitions, Some(expected_definition))
    }

    /// With the legacy namespace, `pid` is written as an event field at the
    /// configured custom `pid_key`; `host` uses the default host key.
    #[test]
    fn output_schema_definition_legacy_namespace() {
        let mut config = InternalLogsConfig::default();

        let pid_key = "pid_a_pid_a_pid_pid_pid";

        config.pid_key = OptionalValuePath::from(owned_value_path!(pid_key));

        let definitions = config
            .outputs(LogNamespace::Legacy)
            .remove(0)
            .schema_definition(true);

        let expected_definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        )
        .with_event_field(
            &owned_value_path!("message"),
            Kind::bytes(),
            Some("message"),
        )
        .with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
        .with_event_field(&owned_value_path!(pid_key), Kind::integer(), None)
        .with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
        .with_event_field(
            &owned_value_path!("host"),
            Kind::bytes().or_undefined(),
            Some("host"),
        );

        assert_eq!(definitions, Some(expected_definition))
    }
}