vector/trace.rs

#![allow(missing_docs)]
use std::{
    collections::HashMap,
    marker::PhantomData,
    str::FromStr,
    sync::{
        Mutex, MutexGuard, OnceLock,
        atomic::{AtomicBool, Ordering},
    },
};

use futures_util::{Stream, StreamExt, future::ready};
use metrics_tracing_context::MetricsLayer;
use tokio::sync::{
    broadcast::{self, Receiver, Sender},
    oneshot,
};
use tokio_stream::wrappers::BroadcastStream;
use tracing::{Event, Subscriber};
use tracing_limit::RateLimitedLayer;
use tracing_subscriber::{
    Layer,
    filter::LevelFilter,
    layer::{Context, SubscriberExt},
    registry::LookupSpan,
    util::SubscriberInitExt,
};
pub use tracing_tower::{InstrumentableService, InstrumentedService};
use vector_lib::lookup::event_path;
use vrl::value::Value;

use crate::event::LogEvent;

/// BUFFER contains all of the internal log events generated by Vector between the initialization of `tracing` and the
/// moment early buffering is stopped, which occurs once the topology reports as having successfully started.
///
/// This means that callers must subscribe during the configuration phase of their components, and not in the core loop
/// of the component, as the topology can only report when a component has been spawned, but not necessarily when it
/// has actually started processing, or waiting for, input.
static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));

/// SHOULD_BUFFER controls whether internal log events are buffered or sent directly to the trace broadcast channel.
static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);

/// SUBSCRIBERS contains the list of callers interested in internal log events, each of which is notified when early
/// buffering is disabled by receiving a copy of all buffered internal log events.
static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
    Mutex::new(Some(Vec::new()));

/// SENDER holds the broadcast sender whose receivers get a copy of all internal log events emitted *after* the
/// topology has been initialized.
static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();

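/// Returns `true` unless the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is set to `"true"`,
/// i.e. whether the `metrics`/`tracing` integration layer should be installed.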
fn metrics_layer_enabled() -> bool {
    !matches!(std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION"), Ok(x) if x == "true")
}

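/// Initializes `tracing` for Vector: the `metrics` integration layer (unless disabled via the environment), the
/// broadcast layer that feeds internal log subscriptions such as the `internal_logs` source, and a rate-limited
/// console formatter (JSON or human-readable).
///
/// # Example
///
/// A minimal sketch of a call site; the argument values here are illustrative, not defaults taken from this module
/// (in practice they come from the caller's CLI/config handling):
///
/// ```rust,ignore
/// // Colored, human-readable output, `info`-level filtering, and a rate limit of 10
/// // for repeated events on the console.
/// trace::init(true, false, "info", 10);
/// ```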
pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
    let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
        "logging filter targets were not formatted correctly or did not specify a valid level",
    );

    let metrics_layer =
        metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));

    // BroadcastLayer should NOT be rate limited because it feeds the internal_logs source,
    // which users rely on to capture ALL internal Vector logs for debugging/monitoring.
    // Console output (stdout/stderr) has its own separate rate limiting below.
    let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());

    let subscriber = tracing_subscriber::registry()
        .with(metrics_layer)
        .with(broadcast_layer);

    #[cfg(feature = "tokio-console")]
    let subscriber = {
        let console_layer = console_subscriber::ConsoleLayer::builder()
            .with_default_env()
            .spawn();

        subscriber.with(console_layer)
    };

    #[cfg(feature = "allocation-tracing")]
    let subscriber = {
        let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
            .with_filter(LevelFilter::ERROR);

        subscriber.with(allocation_layer)
    };

    if json {
        let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);

        #[cfg(test)]
        let formatter = formatter.with_test_writer();

        let rate_limited =
            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));

        _ = subscriber.try_init();
    } else {
        let formatter = tracing_subscriber::fmt::layer()
            .with_ansi(color)
            .with_writer(std::io::stderr);

        #[cfg(test)]
        let formatter = formatter.with_test_writer();

        let rate_limited =
            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));

        _ = subscriber.try_init();
    }
}

#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    get_early_buffer().replace(Vec::new())
}

/// Gets a mutable reference to the early buffer.
fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
    BUFFER
        .lock()
        .expect("Couldn't acquire lock on internal logs buffer")
}

/// Determines whether tracing events should be processed (e.g. converted to log
/// events) to avoid unnecessary performance overhead.
///
/// Checks whether [`BUFFER`] still holds a buffer or a trace sender exists.
fn should_process_tracing_event() -> bool {
    get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
}

/// Attempts to buffer an event into the early buffer.
fn try_buffer_event(log: &LogEvent) -> bool {
    if SHOULD_BUFFER.load(Ordering::Acquire)
        && let Some(buffer) = get_early_buffer().as_mut()
    {
        buffer.push(log.clone());
        return true;
    }

    false
}

/// Attempts to broadcast an event to subscribers.
///
/// If no subscribers are connected, this does nothing.
fn try_broadcast_event(log: LogEvent) {
    if let Some(sender) = maybe_get_trace_sender() {
        _ = sender.send(log);
    }
}

/// Consumes the early buffered events.
///
/// # Panics
///
/// If the early buffered events have already been consumed, this function will panic.
fn consume_early_buffer() -> Vec<LogEvent> {
    get_early_buffer()
        .take()
        .expect("early buffer was already consumed")
}

/// Gets or creates a trace sender for sending internal log events.
fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
    SENDER.get_or_init(|| broadcast::channel(99).0)
}

/// Attempts to get the trace sender for sending internal log events.
///
/// If the trace sender has not yet been created, `None` is returned.
fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
    SENDER.get()
}

/// Creates a trace receiver that receives internal log events.
///
/// This will create a trace sender if one did not already exist.
fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
    get_trace_sender().subscribe()
}

/// Gets a mutable reference to the list of waiting subscribers, if it exists.
fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
    SUBSCRIBERS
        .lock()
        .expect("Couldn't acquire lock on internal logs subscribers")
}

/// Attempts to register for early buffered events.
///
/// If early buffering has not yet been stopped, `Some(receiver)` is returned. The given receiver will resolve to a
/// vector of all early buffered events once early buffering has been stopped. Otherwise, if early buffering is already
/// stopped, `None` is returned.
fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
    if SHOULD_BUFFER.load(Ordering::Acquire) {
        // We're still in early buffering mode. Attempt to subscribe by adding a oneshot sender
        // to SUBSCRIBERS. If the list has already been consumed, we lost the race with a caller
        // that is disabling early buffering, and registration simply returns `None`.
        get_trace_subscriber_list().as_mut().map(|subscribers| {
            let (tx, rx) = oneshot::channel();
            subscribers.push(tx);
            rx
        })
    } else {
        // Early buffering is being or has been disabled, so we can no longer register.
        None
    }
}

/// Stops early buffering.
///
/// This flushes any buffered log events to waiting subscribers and routes all subsequent log events to the broadcast
/// channel instead of the early buffer.
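///
/// # Example
///
/// A minimal sketch of the intended call order, assuming a caller that drives topology start-up (`start_topology`
/// here is illustrative, not an API provided by this module):
///
/// ```rust,ignore
/// // Subscribers must register before this point to receive the early buffer.
/// let subscription = trace::TraceSubscription::subscribe();
///
/// start_topology().await;
///
/// // Once the topology reports as started, flush the buffer to any waiting
/// // subscribers and switch to broadcasting.
/// trace::stop_early_buffering();
/// ```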
pub fn stop_early_buffering() {
    // Try to disable early buffering.
    //
    // If it was already disabled, or we lost the race to disable it, just return.
    if SHOULD_BUFFER
        .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
        .is_err()
    {
        return;
    }

    // We won the right to capture all buffered events and forward them to any waiting subscribers,
    // so grab the subscriber list and see if there's actually anyone waiting.
    let subscribers = get_trace_subscriber_list().take();
    if let Some(subscribers_tx) = subscribers {
        // Consume the early buffer, and send a copy of it to every waiting subscriber.
        let buffered_events = consume_early_buffer();
        for subscriber_tx in subscribers_tx {
            // Ignore any send errors, since the subscriber may already have been dropped.
            _ = subscriber_tx.send(buffered_events.clone());
        }
    }
}

/// A subscription to the log events flowing in via `tracing`, in the Vector native format.
///
/// Used to capture internal telemetry events emitted via `tracing` and convert them to native Vector events,
/// specifically `LogEvent`, so that they can be shuttled around and treated as normal events. Currently this only
/// powers the `internal_logs` source, but it could be used for other purposes if need be.
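///
/// # Example
///
/// A minimal sketch of consuming a subscription, assuming it was registered during component configuration (before
/// early buffering is stopped); `process` is a hypothetical handler and `futures_util::StreamExt` is assumed to be
/// in scope:
///
/// ```rust,ignore
/// let mut subscription = TraceSubscription::subscribe();
///
/// // Events captured between `tracing` initialization and the end of early buffering
/// // arrive as a single batch, if we registered early enough.
/// if let Some(buffered) = subscription.buffered_events().await {
///     for event in buffered {
///         process(event);
///     }
/// }
///
/// // All subsequent internal log events arrive via the broadcast stream.
/// let mut stream = subscription.into_stream();
/// while let Some(event) = stream.next().await {
///     process(event);
/// }
/// ```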
pub struct TraceSubscription {
    buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
    trace_rx: Receiver<LogEvent>,
}

impl TraceSubscription {
    /// Registers a subscription to the internal log event stream.
    pub fn subscribe() -> TraceSubscription {
        let buffered_events_rx = try_register_for_early_events();
        let trace_rx = get_trace_receiver();

        Self {
            buffered_events_rx,
            trace_rx,
        }
    }

    /// Gets any early buffered log events.
    ///
    /// If this subscription was registered after early buffering was turned off, `None` will be returned immediately.
    /// Otherwise, waits for early buffering to be stopped and returns `Some(events)` where `events` contains all events
    /// seen from the moment `tracing` was initialized to the moment early buffering was stopped.
    pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
        // If we have a receiver for buffered events, and it returns them successfully, then pass
        // them back. We don't care if the sender drops in the meantime, so just swallow that error.
        match self.buffered_events_rx.take() {
            Some(rx) => rx.await.ok(),
            None => None,
        }
    }

    /// Converts this subscription into a raw stream of log events.
    pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
        // We ignore errors because the only error we get is when the broadcast receiver lags, and
        // there's nothing we can actually do about that, so there's no reason to force callers to
        // deal with it.
        BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
    }
}

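/// A `tracing` layer that converts events into `LogEvent`s and either buffers them (during early buffering) or
/// broadcasts them to any active [`TraceSubscription`]s.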
struct BroadcastLayer<S> {
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    const fn new() -> Self {
        BroadcastLayer {
            _subscriber: PhantomData,
        }
    }
}

impl<S> Layer<S> for BroadcastLayer<S>
where
    S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
{
    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
        if should_process_tracing_event() {
            let mut log = LogEvent::from(event);
            // Add span fields, if available, nested under the "vector" key.
            if let Some(parent_span) = ctx.event_span(event) {
                for span in parent_span.scope().from_root() {
                    if let Some(fields) = span.extensions().get::<SpanFields>() {
                        for (k, v) in &fields.0 {
                            log.insert(event_path!("vector", *k), v.clone());
                        }
                    }
                }
            }
            // Try buffering the event, and if we're not buffering anymore, try to
            // send it along via the trace sender if it's been established.
            if !try_buffer_event(&log) {
                try_broadcast_event(log);
            }
        }
    }

    fn on_new_span(
        &self,
        attrs: &tracing_core::span::Attributes<'_>,
        id: &tracing_core::span::Id,
        ctx: Context<'_, S>,
    ) {
        let span = ctx.span(id).expect("span must already exist!");
        let mut fields = SpanFields::default();
        attrs.values().record(&mut fields);
        span.extensions_mut().insert(fields);
    }
}

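/// Span fields captured by [`BroadcastLayer`] for inclusion on internal log events.
///
/// Only fields whose names start with `component_` are recorded; an illustrative (not exhaustive) set for a sink
/// component might look like:
///
/// ```text
/// component_id=out component_kind=sink component_type=console
/// ```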
#[derive(Default, Debug)]
struct SpanFields(HashMap<&'static str, Value>);

impl SpanFields {
    fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
        let name = field.name();
        // Filter for span fields such as component_id, component_type, etc.
        //
        // This captures all the basic component information provided in the
        // span that each component is spawned with. We don't capture all fields
        // to avoid adding unintentional noise and to prevent accidental
        // security/privacy issues (e.g. leaking sensitive data).
        if name.starts_with("component_") {
            self.0.insert(name, value.into());
        }
    }
}

impl tracing::field::Visit for SpanFields {
    fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
        self.record(field, value);
    }

    fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
        self.record(field, value);
    }

    fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
        self.record(field, value);
    }

    fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
        self.record(field, value);
    }

    fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
        self.record(field, format!("{value:?}"));
    }
}