1#![allow(missing_docs)]
2use std::{
3 collections::HashMap,
4 marker::PhantomData,
5 str::FromStr,
6 sync::{
7 atomic::{AtomicBool, Ordering},
8 Mutex, MutexGuard, OnceLock,
9 },
10};
11
12use futures_util::{future::ready, Stream, StreamExt};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15 broadcast::{self, Receiver, Sender},
16 oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22 filter::LevelFilter,
23 layer::{Context, SubscriberExt},
24 registry::LookupSpan,
25 util::SubscriberInitExt,
26 Layer,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::lookup::event_path;
30use vrl::value::Value;
31
32use crate::event::LogEvent;
33
/// Internal log events captured while early buffering is active.
///
/// Starts out as `Some(empty vec)`. `try_buffer_event` pushes into it, and
/// `consume_early_buffer` takes the vector out, leaving `None` behind.
static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));

/// Whether internal log events should still be routed into `BUFFER`.
///
/// Initially `true`; `stop_early_buffering` flips it to `false` exactly once.
static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);

/// One-shot senders registered by callers that want a copy of the buffered
/// events. `stop_early_buffering` takes the list (leaving `None`) and sends
/// the buffered events to every entry.
static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
    Mutex::new(Some(Vec::new()));

/// Broadcast sender used to fan out log events once they are no longer being
/// buffered. It is created lazily by `get_trace_sender` on first use.
static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
54
/// Reports whether the metrics tracing layer should be installed.
///
/// The layer is enabled unless the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION`
/// environment variable is set to exactly `true`. An unset or unreadable
/// variable, or any other value, leaves it enabled.
fn metrics_layer_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION") {
        Ok(flag) => flag != "true",
        Err(_) => true,
    }
}
58
59pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
60 let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
61 "logging filter targets were not formatted correctly or did not specify a valid level",
62 );
63
64 let metrics_layer =
65 metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
66
67 let broadcast_layer = RateLimitedLayer::new(BroadcastLayer::new())
68 .with_default_limit(internal_log_rate_limit)
69 .with_filter(fmt_filter.clone());
70
71 let subscriber = tracing_subscriber::registry()
72 .with(metrics_layer)
73 .with(broadcast_layer);
74
75 #[cfg(feature = "tokio-console")]
76 let subscriber = {
77 let console_layer = console_subscriber::ConsoleLayer::builder()
78 .with_default_env()
79 .spawn();
80
81 subscriber.with(console_layer)
82 };
83
84 #[cfg(feature = "allocation-tracing")]
85 let subscriber = {
86 let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
87 .with_filter(LevelFilter::ERROR);
88
89 subscriber.with(allocation_layer)
90 };
91
92 if json {
93 let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);
94
95 #[cfg(test)]
96 let formatter = formatter.with_test_writer();
97
98 let rate_limited =
99 RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
100 let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
101
102 _ = subscriber.try_init();
103 } else {
104 let formatter = tracing_subscriber::fmt::layer()
105 .with_ansi(color)
106 .with_writer(std::io::stderr);
107
108 #[cfg(test)]
109 let formatter = formatter.with_test_writer();
110
111 let rate_limited =
112 RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
113 let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
114
115 _ = subscriber.try_init();
116 }
117}
118
/// Test-only helper: swaps a fresh, empty buffer into `BUFFER` and returns
/// whatever was stored there before (`None` if it had already been consumed).
#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    let mut buffer = get_early_buffer();
    std::mem::replace(&mut *buffer, Some(Vec::new()))
}
123
124fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
126 BUFFER
127 .lock()
128 .expect("Couldn't acquire lock on internal logs buffer")
129}
130
131fn should_process_tracing_event() -> bool {
136 get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
137}
138
139fn try_buffer_event(log: &LogEvent) -> bool {
141 if SHOULD_BUFFER.load(Ordering::Acquire) {
142 if let Some(buffer) = get_early_buffer().as_mut() {
143 buffer.push(log.clone());
144 return true;
145 }
146 }
147
148 false
149}
150
151fn try_broadcast_event(log: LogEvent) {
155 if let Some(sender) = maybe_get_trace_sender() {
156 _ = sender.send(log);
157 }
158}
159
160fn consume_early_buffer() -> Vec<LogEvent> {
166 get_early_buffer()
167 .take()
168 .expect("early buffer was already consumed")
169}
170
171fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
173 SENDER.get_or_init(|| broadcast::channel(99).0)
174}
175
176fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
180 SENDER.get()
181}
182
183fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
187 get_trace_sender().subscribe()
188}
189
190fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
192 SUBSCRIBERS.lock().expect("poisoned locks are dumb")
193}
194
195fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
201 if SHOULD_BUFFER.load(Ordering::Acquire) {
202 get_trace_subscriber_list().as_mut().map(|subscribers| {
206 let (tx, rx) = oneshot::channel();
207 subscribers.push(tx);
208 rx
209 })
210 } else {
211 None
213 }
214}
215
216pub fn stop_early_buffering() {
221 if SHOULD_BUFFER
225 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
226 .is_err()
227 {
228 return;
229 }
230
231 let subscribers = get_trace_subscriber_list().take();
234 if let Some(subscribers_tx) = subscribers {
235 let buffered_events = consume_early_buffer();
237 for subscriber_tx in subscribers_tx {
238 _ = subscriber_tx.send(buffered_events.clone());
240 }
241 }
242}
243
244pub struct TraceSubscription {
250 buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
251 trace_rx: Receiver<LogEvent>,
252}
253
254impl TraceSubscription {
255 pub fn subscribe() -> TraceSubscription {
257 let buffered_events_rx = try_register_for_early_events();
258 let trace_rx = get_trace_receiver();
259
260 Self {
261 buffered_events_rx,
262 trace_rx,
263 }
264 }
265
266 pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
272 match self.buffered_events_rx.take() {
275 Some(rx) => rx.await.ok(),
276 None => None,
277 }
278 }
279
280 pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
282 BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
285 }
286}
287
/// Zero-sized tracing layer marker, generic over the subscriber type `S`.
struct BroadcastLayer<S> {
    /// Ties the layer to `S` without storing a value of that type.
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    /// Creates the layer; usable in `const` contexts.
    const fn new() -> Self {
        Self {
            _subscriber: PhantomData,
        }
    }
}
299
300impl<S> Layer<S> for BroadcastLayer<S>
301where
302 S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
303{
304 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
305 if should_process_tracing_event() {
306 let mut log = LogEvent::from(event);
307 if let Some(parent_span) = ctx.event_span(event) {
309 for span in parent_span.scope().from_root() {
310 if let Some(fields) = span.extensions().get::<SpanFields>() {
311 for (k, v) in &fields.0 {
312 log.insert(event_path!("vector", *k), v.clone());
313 }
314 }
315 }
316 }
317 if !try_buffer_event(&log) {
320 try_broadcast_event(log);
321 }
322 }
323 }
324
325 fn on_new_span(
326 &self,
327 attrs: &tracing_core::span::Attributes<'_>,
328 id: &tracing_core::span::Id,
329 ctx: Context<'_, S>,
330 ) {
331 let span = ctx.span(id).expect("span must already exist!");
332 let mut fields = SpanFields::default();
333 attrs.values().record(&mut fields);
334 span.extensions_mut().insert(fields);
335 }
336}
337
338#[derive(Default, Debug)]
339struct SpanFields(HashMap<&'static str, Value>);
340
341impl SpanFields {
342 fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
343 let name = field.name();
344 if name.starts_with("component_") {
351 self.0.insert(name, value.into());
352 }
353 }
354}
355
356impl tracing::field::Visit for SpanFields {
357 fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
358 self.record(field, value);
359 }
360
361 fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
362 self.record(field, value);
363 }
364
365 fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
366 self.record(field, value);
367 }
368
369 fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
370 self.record(field, value);
371 }
372
373 fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
374 self.record(field, format!("{value:?}"));
375 }
376}