1#![allow(missing_docs)]
2use std::{
3 collections::HashMap,
4 marker::PhantomData,
5 str::FromStr,
6 sync::{
7 Mutex, MutexGuard, OnceLock,
8 atomic::{AtomicBool, Ordering},
9 },
10};
11
12use futures_util::{Stream, StreamExt, future::ready};
13use metrics_tracing_context::MetricsLayer;
14use tokio::sync::{
15 broadcast::{self, Receiver, Sender},
16 oneshot,
17};
18use tokio_stream::wrappers::BroadcastStream;
19use tracing::{Event, Subscriber};
20use tracing_limit::RateLimitedLayer;
21use tracing_subscriber::{
22 Layer,
23 filter::LevelFilter,
24 layer::{Context, SubscriberExt},
25 registry::LookupSpan,
26 util::SubscriberInitExt,
27};
28pub use tracing_tower::{InstrumentableService, InstrumentedService};
29use vector_lib::lookup::event_path;
30use vrl::value::Value;
31
32use crate::event::LogEvent;
33
/// Log events captured while early buffering is active.
///
/// Starts out as `Some(empty vec)`. `consume_early_buffer` takes the vector out and leaves
/// `None` behind; `try_buffer_event` only pushes while the slot is still `Some`.
static BUFFER: Mutex<Option<Vec<LogEvent>>> = Mutex::new(Some(Vec::new()));

/// Flag consulted before buffering an event or registering for the early batch.
///
/// Initially `true`; `stop_early_buffering` swaps it to `false`.
static SHOULD_BUFFER: AtomicBool = AtomicBool::new(true);

/// One-shot senders belonging to subscribers that asked for the early-buffered events.
///
/// `stop_early_buffering` takes the list (leaving `None`) and sends each entry a copy of the
/// buffered events.
static SUBSCRIBERS: Mutex<Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> =
    Mutex::new(Some(Vec::new()));

/// Broadcast sender for log events, created on first use by `get_trace_sender`.
static SENDER: OnceLock<Sender<LogEvent>> = OnceLock::new();
54
/// Reports whether the metrics tracing layer should be attached.
///
/// Returns `false` only when the `DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION` environment
/// variable is readable and equals exactly `"true"`; any other value, or an unset/unreadable
/// variable, yields `true`.
fn metrics_layer_enabled() -> bool {
    match std::env::var("DISABLE_INTERNAL_METRICS_TRACING_INTEGRATION") {
        Ok(value) => value != "true",
        // Unset or not valid unicode: the integration stays enabled.
        Err(_) => true,
    }
}
58
59pub fn init(color: bool, json: bool, levels: &str, internal_log_rate_limit: u64) {
60 let fmt_filter = tracing_subscriber::filter::Targets::from_str(levels).expect(
61 "logging filter targets were not formatted correctly or did not specify a valid level",
62 );
63
64 let metrics_layer =
65 metrics_layer_enabled().then(|| MetricsLayer::new().with_filter(LevelFilter::INFO));
66
67 let broadcast_layer = BroadcastLayer::new().with_filter(fmt_filter.clone());
71
72 let subscriber = tracing_subscriber::registry()
73 .with(metrics_layer)
74 .with(broadcast_layer);
75
76 #[cfg(feature = "tokio-console")]
77 let subscriber = {
78 let console_layer = console_subscriber::ConsoleLayer::builder()
79 .with_default_env()
80 .spawn();
81
82 subscriber.with(console_layer)
83 };
84
85 #[cfg(feature = "allocation-tracing")]
86 let subscriber = {
87 let allocation_layer = crate::internal_telemetry::allocations::AllocationLayer::new()
88 .with_filter(LevelFilter::ERROR);
89
90 subscriber.with(allocation_layer)
91 };
92
93 if json {
94 let formatter = tracing_subscriber::fmt::layer().json().flatten_event(true);
95
96 #[cfg(test)]
97 let formatter = formatter.with_test_writer();
98
99 let rate_limited =
100 RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
101 let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
102
103 _ = subscriber.try_init();
104 } else {
105 let formatter = tracing_subscriber::fmt::layer()
106 .with_ansi(color)
107 .with_writer(std::io::stderr);
108
109 #[cfg(test)]
110 let formatter = formatter.with_test_writer();
111
112 let rate_limited =
113 RateLimitedLayer::new(formatter).with_default_limit(internal_log_rate_limit);
114 let subscriber = subscriber.with(rate_limited.with_filter(fmt_filter));
115
116 _ = subscriber.try_init();
117 }
118}
119
/// Test-only helper: installs a fresh, empty early buffer and returns the previous contents
/// (`None` if the buffer had already been taken).
#[cfg(test)]
pub fn reset_early_buffer() -> Option<Vec<LogEvent>> {
    let mut slot = get_early_buffer();
    std::mem::replace(&mut *slot, Some(Vec::new()))
}
124
125fn get_early_buffer() -> MutexGuard<'static, Option<Vec<LogEvent>>> {
127 BUFFER
128 .lock()
129 .expect("Couldn't acquire lock on internal logs buffer")
130}
131
132fn should_process_tracing_event() -> bool {
137 get_early_buffer().is_some() || maybe_get_trace_sender().is_some()
138}
139
140fn try_buffer_event(log: &LogEvent) -> bool {
142 if SHOULD_BUFFER.load(Ordering::Acquire)
143 && let Some(buffer) = get_early_buffer().as_mut()
144 {
145 buffer.push(log.clone());
146 return true;
147 }
148
149 false
150}
151
152fn try_broadcast_event(log: LogEvent) {
156 if let Some(sender) = maybe_get_trace_sender() {
157 _ = sender.send(log);
158 }
159}
160
161fn consume_early_buffer() -> Vec<LogEvent> {
167 get_early_buffer()
168 .take()
169 .expect("early buffer was already consumed")
170}
171
172fn get_trace_sender() -> &'static broadcast::Sender<LogEvent> {
174 SENDER.get_or_init(|| broadcast::channel(99).0)
175}
176
177fn maybe_get_trace_sender() -> Option<&'static broadcast::Sender<LogEvent>> {
181 SENDER.get()
182}
183
184fn get_trace_receiver() -> broadcast::Receiver<LogEvent> {
188 get_trace_sender().subscribe()
189}
190
191fn get_trace_subscriber_list() -> MutexGuard<'static, Option<Vec<oneshot::Sender<Vec<LogEvent>>>>> {
193 SUBSCRIBERS.lock().expect("poisoned locks are dumb")
194}
195
196fn try_register_for_early_events() -> Option<oneshot::Receiver<Vec<LogEvent>>> {
202 if SHOULD_BUFFER.load(Ordering::Acquire) {
203 get_trace_subscriber_list().as_mut().map(|subscribers| {
207 let (tx, rx) = oneshot::channel();
208 subscribers.push(tx);
209 rx
210 })
211 } else {
212 None
214 }
215}
216
217pub fn stop_early_buffering() {
222 if SHOULD_BUFFER
226 .compare_exchange(true, false, Ordering::SeqCst, Ordering::SeqCst)
227 .is_err()
228 {
229 return;
230 }
231
232 let subscribers = get_trace_subscriber_list().take();
235 if let Some(subscribers_tx) = subscribers {
236 let buffered_events = consume_early_buffer();
238 for subscriber_tx in subscribers_tx {
239 _ = subscriber_tx.send(buffered_events.clone());
241 }
242 }
243}
244
245pub struct TraceSubscription {
251 buffered_events_rx: Option<oneshot::Receiver<Vec<LogEvent>>>,
252 trace_rx: Receiver<LogEvent>,
253}
254
255impl TraceSubscription {
256 pub fn subscribe() -> TraceSubscription {
258 let buffered_events_rx = try_register_for_early_events();
259 let trace_rx = get_trace_receiver();
260
261 Self {
262 buffered_events_rx,
263 trace_rx,
264 }
265 }
266
267 pub async fn buffered_events(&mut self) -> Option<Vec<LogEvent>> {
273 match self.buffered_events_rx.take() {
276 Some(rx) => rx.await.ok(),
277 None => None,
278 }
279 }
280
281 pub fn into_stream(self) -> impl Stream<Item = LogEvent> + Unpin {
283 BroadcastStream::new(self.trace_rx).filter_map(|event| ready(event.ok()))
286 }
287}
288
/// Tracing layer that forwards events to the early buffer or the broadcast channel.
///
/// Carries no data of its own; the marker only ties the layer to a subscriber type `S`.
struct BroadcastLayer<S> {
    _subscriber: PhantomData<S>,
}

impl<S> BroadcastLayer<S> {
    /// Creates the (stateless) layer.
    const fn new() -> Self {
        Self {
            _subscriber: PhantomData,
        }
    }
}
300
301impl<S> Layer<S> for BroadcastLayer<S>
302where
303 S: Subscriber + 'static + for<'lookup> LookupSpan<'lookup>,
304{
305 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
306 if should_process_tracing_event() {
307 let mut log = LogEvent::from(event);
308 if let Some(parent_span) = ctx.event_span(event) {
310 for span in parent_span.scope().from_root() {
311 if let Some(fields) = span.extensions().get::<SpanFields>() {
312 for (k, v) in &fields.0 {
313 log.insert(event_path!("vector", *k), v.clone());
314 }
315 }
316 }
317 }
318 if !try_buffer_event(&log) {
321 try_broadcast_event(log);
322 }
323 }
324 }
325
326 fn on_new_span(
327 &self,
328 attrs: &tracing_core::span::Attributes<'_>,
329 id: &tracing_core::span::Id,
330 ctx: Context<'_, S>,
331 ) {
332 let span = ctx.span(id).expect("span must already exist!");
333 let mut fields = SpanFields::default();
334 attrs.values().record(&mut fields);
335 span.extensions_mut().insert(fields);
336 }
337}
338
339#[derive(Default, Debug)]
340struct SpanFields(HashMap<&'static str, Value>);
341
342impl SpanFields {
343 fn record(&mut self, field: &tracing_core::Field, value: impl Into<Value>) {
344 let name = field.name();
345 if name.starts_with("component_") {
352 self.0.insert(name, value.into());
353 }
354 }
355}
356
357impl tracing::field::Visit for SpanFields {
358 fn record_i64(&mut self, field: &tracing_core::Field, value: i64) {
359 self.record(field, value);
360 }
361
362 fn record_u64(&mut self, field: &tracing_core::Field, value: u64) {
363 self.record(field, value);
364 }
365
366 fn record_bool(&mut self, field: &tracing_core::Field, value: bool) {
367 self.record(field, value);
368 }
369
370 fn record_str(&mut self, field: &tracing_core::Field, value: &str) {
371 self.record(field, value);
372 }
373
374 fn record_debug(&mut self, field: &tracing_core::Field, value: &dyn std::fmt::Debug) {
375 self.record(field, format!("{value:?}"));
376 }
377}