vector/
trace.rs

1#![allow(missing_docs)]
2use std::{
3    collections::HashMap,
4    marker::PhantomData,
5    str::FromStr,
6    sync::{
7        atomic::{AtomicBool, Ordering},
8        Mutex, MutexGuard, OnceLock,
9    },
10};
11
12use futures_util::{future::ready, Stream, StreamExt};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15    broadcast::{self, Receiver, Sender},
16    oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22    filter::LevelFilter,
23    layer::{Context, SubscriberExt},
24    registry::LookupSpan,
25    util::SubscriberInitExt,
26    Layer,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::lookup::event_path;
30use vrl::value::Value;
31
32use crate::event::LogEvent;
33
34/// BUFFER contains all of the internal log events generated by Vector between the initialization of `tracing` and early
35/// buffering being stopped, which occurs once the topology reports as having successfully started.
36///
37/// This means that callers must subscribe during the configuration phase of their components, and not in the core loop
38/// of the component, as the topology can only report when a component has been spawned, but not necessarily always
39/// when it has started doing, or waiting, for input.
40static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));
41
42/// SHOULD_BUFFER controls whether or not internal log events should be buffered or sent directly to the trace broadcast
43/// channel.
44static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);
45
46/// SUBSCRIBERS contains a list of callers interested in internal log events who will be notified when early buffering
47/// is disabled, by receiving a copy of all buffered internal log events.
48static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
49    Mutex::new(Some(Vec::new()));
50
51/// SENDER holds the sender/receiver handle that will receive a copy of all the internal log events *after* the topology
52/// has been initialized.
53static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
54
/// Reports whether the metrics/tracing integration layer should be installed.
///
/// The layer is enabled unless the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment variable is set to
/// exactly `true`. An unset or unreadable variable, or any other value, leaves it enabled.
fn metrics_layer_enabled() -> bool {
    let disabled = std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION")
        .map(|flag| flag == "true")
        .unwrap_or(false);

    !disabled
}
58
59pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
60    let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
61        "logging filter targets were not formatted correctly or did not specify a valid level",
62    );
63
64    let metrics_layer =
65        metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
66
67    let broadcast_layer = RateLimitedLayer::new(BroadcastLayer::new())
68        .with_default_limit(internal_log_rate_limit)
69        .with_filter(fmt_filter.clone());
70
71    let subscriber = tracing_subscriber::registry()
72        .with(metrics_layer)
73        .with(broadcast_layer);
74
75    #[cfg(feature = "tokio-console")]
76    let subscriber = {
77        let console_layer = console_subscriber::ConsoleLayer::builder()
78            .with_default_env()
79            .spawn();
80
81        subscriber.with(console_layer)
82    };
83
84    #[cfg(feature = "allocation-tracing")]
85    let subscriber = {
86        let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
87            .with_filter(LevelFilter::ERROR);
88
89        subscriber.with(allocation_layer)
90    };
91
92    if json {
93        let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);
94
95        #[cfg(test)]
96        let formatter = formatter.with_test_writer();
97
98        let rate_limited =
99            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
100        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
101
102        _ = subscriber.try_init();
103    } else {
104        let formatter = tracing_subscriber::fmt::layer()
105            .with_ansi(color)
106            .with_writer(std::io::stderr);
107
108        #[cfg(test)]
109        let formatter = formatter.with_test_writer();
110
111        let rate_limited =
112            RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
113        let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
114
115        _ = subscriber.try_init();
116    }
117}
118
/// Test helper: swaps a fresh, empty buffer into the early-buffer slot and returns whatever the slot held before
/// (`None` if the buffer had already been consumed).
#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    let mut slot = get_early_buffer();
    std::mem::replace(&mut *slot, Some(Vec::new()))
}
123
124/// Gets a  mutable reference to the early buffer.
125fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
126    BUFFER
127        .lock()
128        .expect("Couldn't acquire lock on internal logs buffer")
129}
130
131/// Determines whether tracing events should be processed (e.g. converted to log
132/// events) to avoid unnecessary performance overhead.
133///
134/// Checks if [`BUFFER`] is set or if a trace sender exists
135fn should_process_tracing_event() -> bool {
136    get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
137}
138
139/// Attempts to buffer an event into the early buffer.
140fn try_buffer_event(log: &LogEvent) -> bool {
141    if SHOULD_BUFFER.load(Ordering::Acquire) {
142        if let Some(buffer) = get_early_buffer().as_mut() {
143            buffer.push(log.clone());
144            return true;
145        }
146    }
147
148    false
149}
150
151/// Attempts to broadcast an event to subscribers.
152///
153/// If no subscribers are connected, this does nothing.
154fn try_broadcast_event(log: LogEvent) {
155    if let Some(sender) = maybe_get_trace_sender() {
156        _ = sender.send(log);
157    }
158}
159
160/// Consumes the early buffered events.
161///
162/// # Panics
163///
164/// If the early buffered events have already been consumed, this function will panic.
165fn consume_early_buffer() -> Vec<LogEvent> {
166    get_early_buffer()
167        .take()
168        .expect("early buffer was already consumed")
169}
170
171/// Gets or creates a trace sender for sending internal log events.
172fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
173    SENDER.get_or_init(|| broadcast::channel(99).0)
174}
175
176/// Attempts to get the trace sender for sending internal log events.
177///
178/// If the trace sender has not yet been created, `None` is returned.
179fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
180    SENDER.get()
181}
182
183/// Creates a trace receiver that receives internal log events.
184///
185/// This will create a trace sender if one did not already exist.
186fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
187    get_trace_sender().subscribe()
188}
189
190/// Gets a mutable reference to the list of waiting subscribers, if it exists.
191fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
192    SUBSCRIBERS.lock().expect("poisoned locks are dumb")
193}
194
195/// Attempts to register for early buffered events.
196///
197/// If early buffering has not yet been stopped, `Some(receiver)` is returned. The given receiver will resolve to a
198/// vector of all early buffered events once early buffering has been stopped. Otherwise, if early buffering is already
199/// stopped, `None` is returned.
200fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
201    if SHOULD_BUFFER.load(Ordering::Acquire) {
202        // We're still in early buffering mode. Attempt to subscribe by adding a oneshot sender
203        // to SUBSCRIBERS. If it's already been consumed, then we've gotten beaten out by a
204        // caller that is disabling early buffering, so we just go with the flow either way.
205        get_trace_subscriber_list().as_mut().map(|subscribers| {
206            let (tx, rx) = oneshot::channel();
207            subscribers.push(tx);
208            rx
209        })
210    } else {
211        // Early buffering is being or has been disabled, so we can no longer register.
212        None
213    }
214}
215
216/// Stops early buffering.
217///
218/// This flushes any buffered log events to waiting subscribers and redirects log events from the buffer to the
219/// broadcast stream.
220pub fn stop_early_buffering() {
221    // Try and disable early buffering.
222    //
223    // If it was already disabled, or we lost the race to disable it, just return.
224    if SHOULD_BUFFER
225        .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
226        .is_err()
227    {
228        return;
229    }
230
231    // We won the right to capture all buffered events and forward them to any waiting subscribers,
232    // so let's grab the subscriber list and see if there's actually anyone waiting.
233    let subscribers = get_trace_subscriber_list().take();
234    if let Some(subscribers_tx) = subscribers {
235        // Consume the early buffer, and send a copy of it to every waiting subscriber.
236        let buffered_events = consume_early_buffer();
237        for subscriber_tx in subscribers_tx {
238            // Ignore any errors sending since the caller may have dropped or something else.
239            _ = subscriber_tx.send(buffered_events.clone());
240        }
241    }
242}
243
244/// A subscription to the log events flowing in via `tracing`, in the Vector native format.
245///
246/// Used to capture tracing events from internal log telemetry, via `tracing`, and convert them to native Vector events,
247/// specifically `LogEvent`, such that they can be shuttled around and treated as normal events.  Currently only powers
248/// the `internal_logs` source, but could be used for other purposes if need be.
249pub struct TraceSubscription {
250    buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
251    trace_rx: Receiver<LogEvent>,
252}
253
254impl TraceSubscription {
255    /// Registers a subscription to the internal log event stream.
256    pub fn subscribe() -> TraceSubscription {
257        let buffered_events_rx = try_register_for_early_events();
258        let trace_rx = get_trace_receiver();
259
260        Self {
261            buffered_events_rx,
262            trace_rx,
263        }
264    }
265
266    /// Gets any early buffered log events.
267    ///
268    /// If this subscription was registered after early buffering was turned off, `None` will be returned immediately.
269    /// Otherwise, waits for early buffering to be stopped and returns `Some(events)` where `events` contains all events
270    /// seen from the moment `tracing` was initialized to the moment early buffering was stopped.
271    pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
272        // If we have a receiver for buffered events, and it returns them successfully, then pass
273        // them back.  We don't care if the sender drops in the meantime, so just swallow that error.
274        match self.buffered_events_rx.take() {
275            Some(rx) => rx.await.ok(),
276            None => None,
277        }
278    }
279
280    /// Converts this subscription into a raw stream of log events.
281    pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
282        // We ignore errors because the only error we get is when the broadcast receiver lags, and there's nothing we
283        // can actually do about that so there's no reason to force callers to even deal with it.
284        BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
285    }
286}
287
/// `tracing` layer that forwards events to the early buffer or the trace broadcast channel.
///
/// The type parameter only ties the layer to a subscriber type; no subscriber value is stored.
struct BroadcastLayer<S> {
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    /// Creates the (stateless) layer.
    const fn new() -> Self {
        Self {
            _subscriber: PhantomData,
        }
    }
}
299
300impl<S> Layer<S> for BroadcastLayer<S>
301where
302    S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
303{
304    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
305        if should_process_tracing_event() {
306            let mut log = LogEvent::from(event);
307            // Add span fields if available
308            if let Some(parent_span) = ctx.event_span(event) {
309                for span in parent_span.scope().from_root() {
310                    if let Some(fields) = span.extensions().get::<SpanFields>() {
311                        for (k, v) in &fields.0 {
312                            log.insert(event_path!("vector", *k), v.clone());
313                        }
314                    }
315                }
316            }
317            // Try buffering the event, and if we're not buffering anymore, try to
318            // send it along via the trace sender if it's been established.
319            if !try_buffer_event(&log) {
320                try_broadcast_event(log);
321            }
322        }
323    }
324
325    fn on_new_span(
326        &self,
327        attrs: &tracing_core::span::Attributes<'_>,
328        id: &tracing_core::span::Id,
329        ctx: Context<'_, S>,
330    ) {
331        let span = ctx.span(id).expect("span must already exist!");
332        let mut fields = SpanFields::default();
333        attrs.values().record(&mut fields);
334        span.extensions_mut().insert(fields);
335    }
336}
337
338#[derive(Default, Debug)]
339struct SpanFields(HashMap<&'static str, Value>);
340
341impl SpanFields {
342    fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
343        let name = field.name();
344        // Filter for span fields such as component_id, component_type, etc.
345        //
346        // This captures all the basic component information provided in the
347        // span that each component is spawned with. We don't capture all fields
348        // to avoid adding unintentional noise and to prevent accidental
349        // security/privacy issues (e.g. leaking sensitive data).
350        if name.starts_with("component_") {
351            self.0.insert(name, value.into());
352        }
353    }
354}
355
356impl tracing::field::Visit for SpanFields {
357    fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
358        self.record(field, value);
359    }
360
361    fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
362        self.record(field, value);
363    }
364
365    fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
366        self.record(field, value);
367    }
368
369    fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
370        self.record(field, value);
371    }
372
373    fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
374        self.record(field, format!("{value:?}"));
375    }
376}