tracing_limit/
lib.rs

1#![deny(warnings)]
2//! Rate limiting for tracing events.
3//!
4//! This crate provides a tracing-subscriber layer that rate limits log events to prevent
5//! log flooding. Events are grouped by their callsite and contextual fields, with each
6//! unique combination rate limited independently.
7//!
8//! # How it works
9//!
10//! Within each rate limit window (default 10 seconds):
11//! - **1st occurrence**: Event is emitted normally
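//! - **2nd occurrence**: Replaced by a "suppressing" notice (emitted with the original event's metadata, so at the same level)
//! - **3rd+ occurrences**: Silent until window expires
//! - **After window**: The next occurrence is emitted normally, preceded by a summary of how many occurrences were suppressed in the previous window (when any were)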
15//!
16//! # Rate limit grouping
17//!
18//! Events are rate limited independently based on a combination of:
19//! - **Callsite**: The code location where the log statement appears
20//! - **Contextual fields**: Any fields attached to the event or its parent spans
21//!
22//! ## How fields contribute to grouping
23//!
24//! **Only these fields create distinct rate limit groups:**
25//! - `component_id` - Different components are rate limited independently
26//!
27//! **All other fields are ignored for grouping**, including:
28//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid resource/cost implications from high-cardinality tags
29//! - `message` - The log message itself doesn't differentiate groups
30//! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting
31//! - `internal_log_rate_secs` - Control field for customizing the rate limit window
32//! - Any custom fields you add
33//!
34//! ## Examples
35//!
36//! ```rust,ignore
37//! // Example 1: Different component_id values create separate rate limit groups
38//! info!(component_id = "transform_1", "Processing event");  // Group A
39//! info!(component_id = "transform_2", "Processing event");  // Group B
40//! // Even though the message is identical, these are rate limited independently
41//!
42//! // Example 2: Only component_id matters for grouping
43//! info!(component_id = "router", fanout_id = "output_1", "Routing event");  // Group C
44//! info!(component_id = "router", fanout_id = "output_2", "Routing event");  // Group C (same group!)
45//! info!(component_id = "router", fanout_id = "output_1", "Routing event");  // Group C (same group!)
46//! info!(component_id = "router", fanout_id = "output_1", input_id = "kafka", "Routing event");  // Group C (same!)
47//! // All of these share the same group because they have the same component_id
48//! // The fanout_id and input_id fields are ignored to avoid resource/cost implications
49//!
50//! // Example 3: Span fields contribute to grouping
51//! let span = info_span!("process", component_id = "transform_1");
52//! let _enter = span.enter();
53//! info!("Processing event");  // Group E: callsite + component_id from span
54//! drop(_enter);
55//!
56//! let span = info_span!("process", component_id = "transform_2");
57//! let _enter = span.enter();
58//! info!("Processing event");  // Group F: same callsite but different component_id
59//!
60//! // Example 4: Nested spans - child span fields take precedence
61//! let outer = info_span!("outer", component_id = "parent");
62//! let _outer_guard = outer.enter();
63//! let inner = info_span!("inner", component_id = "child");
64//! let _inner_guard = inner.enter();
65//! info!("Nested event");  // Grouped by component_id = "child"
66//!
67//! // Example 5: Same callsite with no fields = single rate limit group
68//! info!("Simple message");  // Group G
69//! info!("Simple message");  // Group G
70//! info!("Simple message");  // Group G
71//!
72//! // Example 6: Custom fields are ignored for grouping
73//! info!(component_id = "source", input_id = "in_1", "Received data");  // Group H
74//! info!(component_id = "source", input_id = "in_2", "Received data");  // Group H (same group!)
75//! // The input_id field is ignored - only component_id matters
76//!
77//! // Example 7: Disabling rate limiting for specific logs
78//! // Rate limiting is ON by default - explicitly disable for important logs
79//! warn!(
80//!     component_id = "critical_component",
81//!     message = "Fatal error occurred",
82//!     internal_log_rate_limit = false
83//! );
84//! // This event will NEVER be rate limited, regardless of how often it fires
85//!
86//! // Example 8: Custom rate limit window for specific events
87//! info!(
88//!     component_id = "noisy_component",
89//!     message = "Frequent status update",
90//!     internal_log_rate_secs = 60  // Only log once per minute
91//! );
92//! // Override the default window for this specific log
93//! ```
94//!
95//! This ensures logs from different components are rate limited independently,
96//! while avoiding resource/cost implications from high-cardinality tags.
97
98use std::fmt;
99
100use dashmap::DashMap;
101use tracing_core::{
102    Event, Metadata, Subscriber,
103    callsite::Identifier,
104    field::{Field, Value, Visit, display},
105    span,
106    subscriber::Interest,
107};
108use tracing_subscriber::layer::{Context, Layer};
109
110#[cfg(test)]
111#[macro_use]
112extern crate tracing;
113
114#[cfg(not(test))]
115use std::time::Instant;
116
117#[cfg(test)]
118use mock_instant::global::Instant;
119
/// Event field that, when set to `false`, opts a single event out of rate limiting.
const RATE_LIMIT_FIELD: &str = "internal_log_rate_limit";

/// Event field that overrides the rate limit window (in seconds) for a single callsite.
const RATE_LIMIT_SECS_FIELD: &str = "internal_log_rate_secs";

/// Field carrying an event's human-readable message.
const MESSAGE_FIELD: &str = "message";

/// Grouping field: events are rate limited independently per distinct value of this key.
const COMPONENT_ID_FIELD: &str = "component_id";
127
128#[derive(Eq, PartialEq, Hash, Clone)]
129struct RateKeyIdentifier {
130    callsite: Identifier,
131    rate_limit_key_values: RateLimitedSpanKeys,
132}
133
134pub struct RateLimitedLayer<S, L>
135where
136    L: Layer<S> + Sized,
137    S: Subscriber,
138{
139    events: DashMap<RateKeyIdentifier, State>,
140    inner: L,
141    internal_log_rate_limit: u64,
142    _subscriber: std::marker::PhantomData<S>,
143}
144
145impl<S, L> RateLimitedLayer<S, L>
146where
147    L: Layer<S> + Sized,
148    S: Subscriber,
149{
150    pub fn new(layer: L) -> Self {
151        RateLimitedLayer {
152            events: Default::default(),
153            internal_log_rate_limit: 10,
154            inner: layer,
155            _subscriber: std::marker::PhantomData,
156        }
157    }
158
159    /// Sets the default rate limit window in seconds.
160    ///
161    /// This controls how long logs are suppressed before they can be emitted again.
162    /// Within each window:
163    /// - 1st occurrence: Emitted normally
164    /// - 2nd occurrence: Shows "suppressing" warning
165    /// - 3rd+ occurrences: Silent until window expires
166    pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self {
167        self.internal_log_rate_limit = internal_log_rate_limit;
168        self
169    }
170}
171
172impl<S, L> Layer<S> for RateLimitedLayer<S, L>
173where
174    L: Layer<S>,
175    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
176{
177    #[inline]
178    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
179        self.inner.register_callsite(metadata)
180    }
181
182    #[inline]
183    fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
184        self.inner.enabled(metadata, ctx)
185    }
186
187    // keep track of any span fields we use for grouping rate limiting
188    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
189        {
190            let span = ctx.span(id).expect("Span not found, this is a bug");
191            let mut extensions = span.extensions_mut();
192
193            if extensions.get_mut::<RateLimitedSpanKeys>().is_none() {
194                let mut fields = RateLimitedSpanKeys::default();
195                attrs.record(&mut fields);
196                extensions.insert(fields);
197            };
198        }
199        self.inner.on_new_span(attrs, id, ctx);
200    }
201
202    // keep track of any span fields we use for grouping rate limiting
203    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
204        {
205            let span = ctx.span(id).expect("Span not found, this is a bug");
206            let mut extensions = span.extensions_mut();
207
208            match extensions.get_mut::<RateLimitedSpanKeys>() {
209                Some(fields) => {
210                    values.record(fields);
211                }
212                None => {
213                    let mut fields = RateLimitedSpanKeys::default();
214                    values.record(&mut fields);
215                    extensions.insert(fields);
216                }
217            };
218        }
219        self.inner.on_record(id, values, ctx);
220    }
221
222    #[inline]
223    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
224        self.inner.on_follows_from(span, follows, ctx);
225    }
226
227    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
228        // Visit the event, grabbing the limit status if one is defined. Rate limiting is ON by default
229        // unless explicitly disabled by setting `internal_log_rate_limit = false`.
230        let mut limit_visitor = LimitVisitor::default();
231        event.record(&mut limit_visitor);
232
233        let limit_exists = limit_visitor.limit.unwrap_or(true);
234        if !limit_exists {
235            return self.inner.on_event(event, ctx);
236        }
237
238        let limit = match limit_visitor.limit_secs {
239            Some(limit_secs) => limit_secs, // override the cli limit
240            None => self.internal_log_rate_limit,
241        };
242
243        // Build a composite key from event fields and span context to determine the rate limit group.
244        // This multi-step process ensures we capture all relevant contextual information:
245        //
246        // 1. Start with event-level fields (e.g., fields directly on the log macro call)
247        // 2. Walk up the span hierarchy from root to current span
248        // 3. Merge in fields from each span, with child spans taking precedence
249        //
250        // This means an event's rate limit group is determined by the combination of:
251        // - Its callsite (handled separately via RateKeyIdentifier)
252        // - All contextual fields from both the event and its span ancestry
253        //
254        // Example: The same `info!("msg")` callsite in different component contexts becomes
255        // distinct rate limit groups, allowing fine-grained control over log flooding.
256        let rate_limit_key_values = {
257            let mut keys = RateLimitedSpanKeys::default();
258            // Capture fields directly on this event
259            event.record(&mut keys);
260
261            // Walk span hierarchy and merge in contextual fields
262            ctx.lookup_current()
263                .into_iter()
264                .flat_map(|span| span.scope().from_root())
265                .fold(keys, |mut keys, span| {
266                    let extensions = span.extensions();
267                    if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
268                        keys.merge(span_keys);
269                    }
270                    keys
271                })
272        };
273
274        // Build the key to represent this event, given its span fields, and see if we're already rate limiting it. If
275        // not, we'll initialize an entry for it.
276        let metadata = event.metadata();
277        let id = RateKeyIdentifier {
278            callsite: metadata.callsite(),
279            rate_limit_key_values,
280        };
281
282        let mut state = self.events.entry(id).or_insert_with(|| {
283            let mut message_visitor = MessageVisitor::default();
284            event.record(&mut message_visitor);
285
286            let message = message_visitor
287                .message
288                .unwrap_or_else(|| metadata.name().into());
289
290            State::new(message, limit)
291        });
292
293        // Update our suppressed state for this event, and see if we should still be suppressing it.
294        //
295        // When this is the first time seeing the event, we emit it like we normally would. The second time we see it in
296        // the limit period, we emit a new event to indicate that the original event is being actively suppressed.
297        // Otherwise, we don't emit anything.
298        let previous_count = state.increment_count();
299        if state.should_limit() {
300            match previous_count {
301                0 => self.inner.on_event(event, ctx),
302                1 => {
303                    let message = format!(
304                        "Internal log [{}] is being suppressed to avoid flooding.",
305                        state.message
306                    );
307                    self.create_event(&ctx, metadata, message, state.limit);
308                }
309                _ => {}
310            }
311        } else {
312            // If we saw this event 3 or more times total, emit an event that indicates the total number of times we
313            // suppressed the event in the limit period.
314            if previous_count > 1 {
315                let message = format!(
316                    "Internal log [{}] has been suppressed {} times.",
317                    state.message,
318                    previous_count - 1
319                );
320
321                self.create_event(&ctx, metadata, message, state.limit);
322            }
323
324            // We're not suppressing anymore, so we also emit the current event as normal.. but we update our rate
325            // limiting state since this is effectively equivalent to seeing the event again for the first time.
326            self.inner.on_event(event, ctx);
327
328            state.reset();
329        }
330    }
331
332    #[inline]
333    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
334        self.inner.on_enter(id, ctx);
335    }
336
337    #[inline]
338    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
339        self.inner.on_exit(id, ctx);
340    }
341
342    #[inline]
343    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
344        self.inner.on_close(id, ctx);
345    }
346
347    #[inline]
348    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
349        self.inner.on_id_change(old, new, ctx);
350    }
351
352    #[inline]
353    fn on_layer(&mut self, subscriber: &mut S) {
354        self.inner.on_layer(subscriber);
355    }
356}
357
358impl<S, L> RateLimitedLayer<S, L>
359where
360    S: Subscriber,
361    L: Layer<S>,
362{
363    fn create_event(
364        &self,
365        ctx: &Context<S>,
366        metadata: &'static Metadata<'static>,
367        message: String,
368        rate_limit: u64,
369    ) {
370        let fields = metadata.fields();
371
372        let message = display(message);
373
374        if let Some(message_field) = fields.field("message") {
375            let values = [(&message_field, Some(&message as &dyn Value))];
376
377            let valueset = fields.value_set(&values);
378            let event = Event::new(metadata, &valueset);
379            self.inner.on_event(&event, ctx.clone());
380        } else if let Some(rate_limit_field) = fields.field(RATE_LIMIT_FIELD) {
381            let values = [(&rate_limit_field, Some(&rate_limit as &dyn Value))];
382
383            let valueset = fields.value_set(&values);
384            let event = Event::new(metadata, &valueset);
385            self.inner.on_event(&event, ctx.clone());
386        } else {
387            // If the event metadata has neither a "message" nor "internal_log_rate_limit" field,
388            // we cannot create a proper synthetic event. This can happen with custom debug events
389            // that have their own field structure. In this case, we simply skip emitting the
390            // rate limit notification rather than panicking.
391        }
392    }
393}
394
/// Rate limiting bookkeeping for one group's current window.
#[derive(Debug)]
struct State {
    /// When the current window began.
    start: Instant,
    /// Occurrences counted in the current window.
    count: u64,
    /// Window length in whole seconds.
    limit: u64,
    /// Label used when reporting that this group is being suppressed.
    message: String,
}

impl State {
    /// Opens a new window, starting now, with no occurrences counted yet.
    fn new(message: String, limit: u64) -> Self {
        let start = Instant::now();
        Self {
            start,
            count: 0,
            limit,
            message,
        }
    }

    /// Opens a new window, starting now, that already includes the occurrence that caused
    /// the reset (hence a count of one).
    fn reset(&mut self) {
        self.start = Instant::now();
        self.count = 1;
    }

    /// Counts one more occurrence and returns the count as it was before this one.
    fn increment_count(&mut self) -> u64 {
        let next = self.count + 1;
        std::mem::replace(&mut self.count, next)
    }

    /// Whether the current window is still open, i.e. fewer than `limit` whole seconds have
    /// elapsed since it began.
    fn should_limit(&self) -> bool {
        let elapsed_secs = self.start.elapsed().as_secs();
        elapsed_secs < self.limit
    }
}
428
/// A field value captured for rate limit grouping.
///
/// Integers and booleans are stored as-is; strings (and any value rendered to text) use the
/// `String` variant.
#[derive(Clone, Eq, Hash, PartialEq)]
enum TraceValue {
    String(String),
    Int(i64),
    Uint(u64),
    Bool(bool),
}

#[cfg(test)]
impl fmt::Display for TraceValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::String(text) => f.write_str(text),
            Self::Int(signed) => write!(f, "{signed}"),
            Self::Uint(unsigned) => write!(f, "{unsigned}"),
            Self::Bool(flag) => write!(f, "{flag}"),
        }
    }
}

impl From<bool> for TraceValue {
    fn from(flag: bool) -> Self {
        Self::Bool(flag)
    }
}

impl From<i64> for TraceValue {
    fn from(signed: i64) -> Self {
        Self::Int(signed)
    }
}

impl From<u64> for TraceValue {
    fn from(unsigned: u64) -> Self {
        Self::Uint(unsigned)
    }
}

impl From<String> for TraceValue {
    fn from(text: String) -> Self {
        Self::String(text)
    }
}
472
473/// RateLimitedSpanKeys records span and event fields that differentiate rate limit groups.
474///
475/// This struct is used to build a composite key that uniquely identifies a rate limit bucket.
476/// Events with different field values will be rate limited independently, even if they come
477/// from the same callsite.
478///
479/// ## Field categories:
480///
481/// **Tracked fields** (only these create distinct rate limit groups):
482/// - `component_id` - Different components are rate limited independently
483///
484/// **Ignored fields**: All other fields are ignored for grouping purposes. This avoids resource/cost implications from high-cardinality tags.
485/// ```
486#[derive(Default, Eq, PartialEq, Hash, Clone)]
487struct RateLimitedSpanKeys {
488    component_id: Option<TraceValue>,
489}
490
491impl RateLimitedSpanKeys {
492    fn record(&mut self, field: &Field, value: TraceValue) {
493        if field.name() == COMPONENT_ID_FIELD {
494            self.component_id = Some(value);
495        }
496    }
497
498    fn merge(&mut self, other: &Self) {
499        if let Some(component_id) = &other.component_id {
500            self.component_id = Some(component_id.clone());
501        }
502    }
503}
504
505impl Visit for RateLimitedSpanKeys {
506    fn record_i64(&mut self, field: &Field, value: i64) {
507        self.record(field, value.into());
508    }
509
510    fn record_u64(&mut self, field: &Field, value: u64) {
511        self.record(field, value.into());
512    }
513
514    fn record_bool(&mut self, field: &Field, value: bool) {
515        self.record(field, value.into());
516    }
517
518    fn record_str(&mut self, field: &Field, value: &str) {
519        self.record(field, value.to_owned().into());
520    }
521
522    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
523        self.record(field, format!("{value:?}").into());
524    }
525}
526
527#[derive(Default)]
528struct LimitVisitor {
529    pub limit: Option<bool>,
530    pub limit_secs: Option<u64>,
531}
532
533impl Visit for LimitVisitor {
534    fn record_bool(&mut self, field: &Field, value: bool) {
535        if field.name() == RATE_LIMIT_FIELD {
536            self.limit = Some(value);
537        }
538    }
539
540    fn record_i64(&mut self, field: &Field, value: i64) {
541        if field.name() == RATE_LIMIT_SECS_FIELD {
542            self.limit = Some(true); // limit if we have this field
543            self.limit_secs = Some(u64::try_from(value).unwrap_or_default()); // override the cli passed limit
544        }
545    }
546
547    fn record_u64(&mut self, field: &Field, value: u64) {
548        if field.name() == RATE_LIMIT_SECS_FIELD {
549            self.limit = Some(true); // limit if we have this field
550            self.limit_secs = Some(value); // override the cli passed limit
551        }
552    }
553
554    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
555}
556
557#[derive(Default)]
558struct MessageVisitor {
559    pub message: Option<String>,
560}
561
562impl Visit for MessageVisitor {
563    fn record_str(&mut self, field: &Field, value: &str) {
564        if self.message.is_none() && field.name() == MESSAGE_FIELD {
565            self.message = Some(value.to_string());
566        }
567    }
568
569    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
570        if self.message.is_none() && field.name() == MESSAGE_FIELD {
571            self.message = Some(format!("{value:?}"));
572        }
573    }
574}
575
576#[cfg(test)]
577mod test {
578    use std::{
579        collections::BTreeMap,
580        sync::{Arc, Mutex},
581        time::Duration,
582    };
583
584    use mock_instant::global::MockClock;
585    use serial_test::serial;
586    use tracing_subscriber::layer::SubscriberExt;
587
588    use super::*;
589
590    #[derive(Debug, Clone, PartialEq, Eq)]
591    struct RecordedEvent {
592        message: String,
593        fields: BTreeMap<String, String>,
594    }
595
596    impl RecordedEvent {
597        fn new(message: impl Into<String>) -> Self {
598            Self {
599                message: message.into(),
600                fields: BTreeMap::new(),
601            }
602        }
603
604        fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
605            self.fields.insert(key.into(), value.into());
606            self
607        }
608    }
609
610    /// Macro to create RecordedEvent with optional fields
611    /// Usage:
612    /// - `event!("message")` - just message
613    /// - `event!("message", key1: "value1")` - message with one field
614    /// - `event!("message", key1: "value1", key2: "value2")` - message with multiple fields
615    macro_rules! event {
616        ($msg:expr) => {
617            RecordedEvent::new($msg)
618        };
619        ($msg:expr, $($key:ident: $value:expr),+ $(,)?) => {
620            RecordedEvent::new($msg)
621                $(.with_field(stringify!($key), $value))+
622        };
623    }
624
625    #[derive(Default)]
626    struct AllFieldsVisitor {
627        fields: BTreeMap<String, String>,
628    }
629
630    impl Visit for AllFieldsVisitor {
631        fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
632            self.fields
633                .insert(field.name().to_string(), format!("{value:?}"));
634        }
635
636        fn record_str(&mut self, field: &Field, value: &str) {
637            self.fields
638                .insert(field.name().to_string(), value.to_string());
639        }
640
641        fn record_i64(&mut self, field: &Field, value: i64) {
642            self.fields
643                .insert(field.name().to_string(), value.to_string());
644        }
645
646        fn record_u64(&mut self, field: &Field, value: u64) {
647            self.fields
648                .insert(field.name().to_string(), value.to_string());
649        }
650
651        fn record_bool(&mut self, field: &Field, value: bool) {
652            self.fields
653                .insert(field.name().to_string(), value.to_string());
654        }
655    }
656
657    impl AllFieldsVisitor {
658        fn into_event(self) -> RecordedEvent {
659            let message = self
660                .fields
661                .get("message")
662                .cloned()
663                .unwrap_or_else(|| String::from(""));
664
665            let mut fields = BTreeMap::new();
666            for (key, value) in self.fields {
667                if key != "message"
668                    && key != "internal_log_rate_limit"
669                    && key != "internal_log_rate_secs"
670                {
671                    fields.insert(key, value);
672                }
673            }
674
675            RecordedEvent { message, fields }
676        }
677    }
678
679    #[derive(Default)]
680    struct RecordingLayer<S> {
681        events: Arc<Mutex<Vec<RecordedEvent>>>,
682
683        _subscriber: std::marker::PhantomData<S>,
684    }
685
686    impl<S> RecordingLayer<S> {
687        fn new(events: Arc<Mutex<Vec<RecordedEvent>>>) -> Self {
688            RecordingLayer {
689                events,
690
691                _subscriber: std::marker::PhantomData,
692            }
693        }
694    }
695
696    impl<S> Layer<S> for RecordingLayer<S>
697    where
698        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
699    {
700        fn register_callsite(&self, _metadata: &'static Metadata<'static>) -> Interest {
701            Interest::always()
702        }
703
704        fn enabled(&self, _metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool {
705            true
706        }
707
708        fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
709            let mut visitor = AllFieldsVisitor::default();
710            event.record(&mut visitor);
711
712            // Also capture fields from span context
713            if let Some(span) = ctx.lookup_current() {
714                for span_ref in span.scope().from_root() {
715                    let extensions = span_ref.extensions();
716                    if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
717                        // Add component_id
718                        if let Some(TraceValue::String(ref s)) = span_keys.component_id {
719                            visitor.fields.insert("component_id".to_string(), s.clone());
720                        }
721                    }
722                }
723            }
724
725            let mut events = self.events.lock().unwrap();
726            events.push(visitor.into_event());
727        }
728    }
729
730    /// Helper function to set up a test with a rate-limited subscriber.
731    /// Returns the events Arc for asserting on collected events.
732    fn setup_test(
733        default_limit: u64,
734    ) -> (
735        Arc<Mutex<Vec<RecordedEvent>>>,
736        impl Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
737    ) {
738        let events: Arc<Mutex<Vec<RecordedEvent>>> = Default::default();
739        let recorder = RecordingLayer::new(Arc::clone(&events));
740        let sub = tracing_subscriber::registry::Registry::default()
741            .with(RateLimitedLayer::new(recorder).with_default_limit(default_limit));
742        (events, sub)
743    }
744
745    #[test]
746    #[serial]
747    fn rate_limits() {
748        let (events, sub) = setup_test(1);
749        tracing::subscriber::with_default(sub, || {
750            for _ in 0..21 {
751                info!(message = "Hello world!");
752                MockClock::advance(Duration::from_millis(100));
753            }
754        });
755
756        let events = events.lock().unwrap();
757
758        assert_eq!(
759            *events,
760            vec![
761                event!("Hello world!"),
762                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
763                event!("Internal log [Hello world!] has been suppressed 9 times."),
764                event!("Hello world!"),
765                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
766                event!("Internal log [Hello world!] has been suppressed 9 times."),
767                event!("Hello world!"),
768            ]
769        );
770    }
771
772    #[test]
773    #[serial]
774    fn override_rate_limit_at_callsite() {
775        let (events, sub) = setup_test(100);
776        tracing::subscriber::with_default(sub, || {
777            for _ in 0..31 {
778                info!(message = "Hello world!", internal_log_rate_secs = 2);
779                MockClock::advance(Duration::from_millis(100));
780            }
781        });
782
783        let events = events.lock().unwrap();
784
785        // With a 2-second window and 100ms advances, we get:
786        // - Event every 20 iterations (2000ms / 100ms = 20)
787        // - First window: iteration 0-19 (suppressed 19 times after first 2)
788        // - Second window: iteration 20-39 (but we only go to 30)
789        assert_eq!(
790            *events,
791            vec![
792                event!("Hello world!"),
793                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
794                event!("Internal log [Hello world!] has been suppressed 19 times."),
795                event!("Hello world!"),
796                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
797            ]
798        );
799    }
800
801    #[test]
802    #[serial]
803    fn rate_limit_by_event_key() {
804        let (events, sub) = setup_test(1);
805        tracing::subscriber::with_default(sub, || {
806            for _ in 0..21 {
807                for key in &["foo", "bar"] {
808                    info!(
809                        message = format!("Hello {key}!").as_str(),
810                        component_id = &key
811                    );
812                }
813                MockClock::advance(Duration::from_millis(100));
814            }
815        });
816
817        let events = events.lock().unwrap();
818
819        // Events with different component_id values create separate rate limit groups
820        assert_eq!(
821            *events,
822            vec![
823                event!("Hello foo!", component_id: "foo"),
824                event!("Hello bar!", component_id: "bar"),
825                event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
826                event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
827                event!("Internal log [Hello foo!] has been suppressed 9 times."),
828                event!("Hello foo!", component_id: "foo"),
829                event!("Internal log [Hello bar!] has been suppressed 9 times."),
830                event!("Hello bar!", component_id: "bar"),
831                event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
832                event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
833                event!("Internal log [Hello foo!] has been suppressed 9 times."),
834                event!("Hello foo!", component_id: "foo"),
835                event!("Internal log [Hello bar!] has been suppressed 9 times."),
836                event!("Hello bar!", component_id: "bar"),
837            ]
838        );
839    }
840
841    #[test]
842    #[serial]
843    fn disabled_rate_limit() {
844        let (events, sub) = setup_test(1);
845        tracing::subscriber::with_default(sub, || {
846            for _ in 0..21 {
847                info!(message = "Hello world!", internal_log_rate_limit = false);
848                MockClock::advance(Duration::from_millis(100));
849            }
850        });
851
852        let events = events.lock().unwrap();
853
854        // All 21 events should be emitted since rate limiting is disabled
855        assert_eq!(events.len(), 21);
856        assert!(events.iter().all(|e| e == &event!("Hello world!")));
857    }
858
859    #[test]
860    #[serial]
861    fn rate_limit_ignores_non_special_fields() {
862        let (events, sub) = setup_test(1);
863        tracing::subscriber::with_default(sub, || {
864            for i in 0..21 {
865                // Call the SAME info! macro multiple times per iteration with varying fanout_id
866                // to verify that fanout_id doesn't create separate rate limit groups
867                for _ in 0..3 {
868                    let fanout = if i % 2 == 0 { "output_1" } else { "output_2" };
869                    info!(
870                        message = "Routing event",
871                        component_id = "router",
872                        fanout_id = fanout
873                    );
874                }
875                MockClock::advance(Duration::from_millis(100));
876            }
877        });
878
879        let events = events.lock().unwrap();
880
881        // All events share the same rate limit group (same callsite + component_id)
882        // First event emits normally, second shows suppression, third and beyond are silent
883        // until the window expires
884        assert_eq!(
885            *events,
886            vec![
887                // First iteration - first emits, second shows suppression, 3rd+ silent
888                event!("Routing event", component_id: "router", fanout_id: "output_1"),
889                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
890                // After rate limit window (1 sec) - summary shows suppressions
891                event!("Internal log [Routing event] has been suppressed 29 times."),
892                event!("Routing event", component_id: "router", fanout_id: "output_1"),
893                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
894                event!("Internal log [Routing event] has been suppressed 29 times."),
895                event!("Routing event", component_id: "router", fanout_id: "output_1"),
896                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
897            ]
898        );
899    }
900
901    #[test]
902    #[serial]
903    fn nested_spans_child_takes_precedence() {
904        let (events, sub) = setup_test(1);
905        tracing::subscriber::with_default(sub, || {
906            // Create nested spans where child overrides parent's component_id
907            let outer = info_span!("outer", component_id = "parent");
908            let _outer_guard = outer.enter();
909
910            for _ in 0..21 {
911                // Inner span with different component_id should take precedence
912                let inner = info_span!("inner", component_id = "child");
913                let _inner_guard = inner.enter();
914                info!(message = "Nested event");
915                drop(_inner_guard);
916
917                MockClock::advance(Duration::from_millis(100));
918            }
919        });
920
921        let events = events.lock().unwrap();
922
923        // All events should be grouped by component_id = "child" (from inner span)
924        // not "parent" (from outer span), demonstrating child precedence
925        assert_eq!(
926            *events,
927            vec![
928                event!("Nested event", component_id: "child"),
929                event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
930                event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
931                event!("Nested event", component_id: "child"),
932                event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
933                event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
934                event!("Nested event", component_id: "child"),
935            ]
936        );
937    }
938
939    #[test]
940    #[serial]
941    fn nested_spans_ignores_untracked_fields() {
942        let (events, sub) = setup_test(1);
943        tracing::subscriber::with_default(sub, || {
944            // Parent has component_id, child has some_field - only component_id is tracked
945            let outer = info_span!("outer", component_id = "transform");
946            let _outer_guard = outer.enter();
947
948            for _ in 0..21 {
949                let inner = info_span!("inner", some_field = "value");
950                let _inner_guard = inner.enter();
951                info!(message = "Event message");
952                drop(_inner_guard);
953
954                MockClock::advance(Duration::from_millis(100));
955            }
956        });
957
958        let events = events.lock().unwrap();
959
960        // Events should have component_id from parent, some_field from child is ignored for grouping
961        // All events are in the same rate limit group
962        assert_eq!(
963            *events,
964            vec![
965                event!("Event message", component_id: "transform"),
966                event!(
967                    "Internal log [Event message] is being suppressed to avoid flooding.",
968                    component_id: "transform"
969                ),
970                event!(
971                    "Internal log [Event message] has been suppressed 9 times.",
972                    component_id: "transform"
973                ),
974                event!("Event message", component_id: "transform"),
975                event!(
976                    "Internal log [Event message] is being suppressed to avoid flooding.",
977                    component_id: "transform"
978                ),
979                event!(
980                    "Internal log [Event message] has been suppressed 9 times.",
981                    component_id: "transform"
982                ),
983                event!("Event message", component_id: "transform"),
984            ]
985        );
986    }
987
988    #[test]
989    #[serial]
990    fn rate_limit_same_message_different_component() {
991        let (events, sub) = setup_test(1);
992        tracing::subscriber::with_default(sub, || {
993            // Use a loop with the SAME callsite to demonstrate that identical messages
994            // with different component_ids create separate rate limit groups
995            for component in &["foo", "foo", "bar"] {
996                info!(message = "Hello!", component_id = component);
997                MockClock::advance(Duration::from_millis(100));
998            }
999        });
1000
1001        let events = events.lock().unwrap();
1002
1003        // The first "foo" event is emitted normally (count=0)
1004        // The second "foo" event triggers suppression warning (count=1)
1005        // The "bar" event is emitted normally (count=0 for its group)
1006        // This proves that even with identical message text, different component_ids
1007        // create separate rate limit groups
1008        assert_eq!(
1009            *events,
1010            vec![
1011                event!("Hello!", component_id: "foo"),
1012                event!("Internal log [Hello!] is being suppressed to avoid flooding."),
1013                event!("Hello!", component_id: "bar"),
1014            ]
1015        );
1016    }
1017
1018    #[test]
1019    #[serial]
1020    fn events_with_custom_fields_no_message_dont_panic() {
1021        // Verify events without "message" or "internal_log_rate_limit" fields don't panic
1022        // when rate limiting skips suppression notifications.
1023        let (events, sub) = setup_test(1);
1024        tracing::subscriber::with_default(sub, || {
1025            // Use closure to ensure all events share the same callsite
1026            let emit_event = || {
1027                debug!(component_id = "test_component", utilization = 0.85);
1028            };
1029
1030            // First window: emit 5 events, only the first one should be logged
1031            for _ in 0..5 {
1032                emit_event();
1033                MockClock::advance(Duration::from_millis(100));
1034            }
1035
1036            // Advance to the next window
1037            MockClock::advance(Duration::from_millis(1000));
1038
1039            // Second window: this event should be logged
1040            emit_event();
1041        });
1042
1043        let events = events.lock().unwrap();
1044
1045        // First event from window 1, first event from window 2
1046        // Suppression notifications are skipped (no message field)
1047        assert_eq!(
1048            *events,
1049            vec![
1050                event!("", component_id: "test_component", utilization: "0.85"),
1051                event!("", component_id: "test_component", utilization: "0.85"),
1052            ]
1053        );
1054    }
1055}