tracing_limit/
lib.rs

1#![deny(warnings)]
2//! Rate limiting for tracing events.
3//!
4//! This crate provides a tracing-subscriber layer that rate limits log events to prevent
5//! log flooding. Events are grouped by their callsite and contextual fields, with each
6//! unique combination rate limited independently.
7//!
8//! # How it works
9//!
10//! Within each rate limit window (default 10 seconds):
11//! - **1st occurrence**: Event is emitted normally
//! - **2nd occurrence**: Emits a notice, at the event's own level, that the event is being suppressed
13//! - **3rd+ occurrences**: Silent until window expires
14//! - **After window**: Emits a summary of suppressed count, then next event normally
15//!
16//! # Rate limit grouping
17//!
18//! Events are rate limited independently based on a combination of:
19//! - **Callsite**: The code location where the log statement appears
//! - **Contextual fields**: The `component_id` field, whether attached to the event itself or to one of its parent spans (see below)
21//!
22//! ## How fields contribute to grouping
23//!
24//! **Only these fields create distinct rate limit groups:**
25//! - `component_id` - Different components are rate limited independently
26//!
27//! **All other fields are ignored for grouping**, including:
28//! - `fanout_id`, `input_id`, `output_id` - Not used for grouping to avoid resource/cost implications from high-cardinality tags
29//! - `message` - The log message itself doesn't differentiate groups
30//! - `internal_log_rate_limit` - Control field for enabling/disabling rate limiting
31//! - `internal_log_rate_secs` - Control field for customizing the rate limit window
32//! - Any custom fields you add
33//!
34//! ## Examples
35//!
36//! ```rust,ignore
37//! // Example 1: Different component_id values create separate rate limit groups
38//! info!(component_id = "transform_1", "Processing event");  // Group A
39//! info!(component_id = "transform_2", "Processing event");  // Group B
40//! // Even though the message is identical, these are rate limited independently
41//!
42//! // Example 2: Only component_id matters for grouping
43//! info!(component_id = "router", fanout_id = "output_1", "Routing event");  // Group C
44//! info!(component_id = "router", fanout_id = "output_2", "Routing event");  // Group C (same group!)
45//! info!(component_id = "router", fanout_id = "output_1", "Routing event");  // Group C (same group!)
46//! info!(component_id = "router", fanout_id = "output_1", input_id = "kafka", "Routing event");  // Group C (same!)
47//! // All of these share the same group because they have the same component_id
48//! // The fanout_id and input_id fields are ignored to avoid resource/cost implications
49//!
50//! // Example 3: Span fields contribute to grouping
51//! let span = info_span!("process", component_id = "transform_1");
52//! let _enter = span.enter();
53//! info!("Processing event");  // Group E: callsite + component_id from span
54//! drop(_enter);
55//!
56//! let span = info_span!("process", component_id = "transform_2");
57//! let _enter = span.enter();
58//! info!("Processing event");  // Group F: same callsite but different component_id
59//!
60//! // Example 4: Nested spans - child span fields take precedence
61//! let outer = info_span!("outer", component_id = "parent");
62//! let _outer_guard = outer.enter();
63//! let inner = info_span!("inner", component_id = "child");
64//! let _inner_guard = inner.enter();
65//! info!("Nested event");  // Grouped by component_id = "child"
66//!
67//! // Example 5: Same callsite with no fields = single rate limit group
68//! info!("Simple message");  // Group G
69//! info!("Simple message");  // Group G
70//! info!("Simple message");  // Group G
71//!
72//! // Example 6: Custom fields are ignored for grouping
73//! info!(component_id = "source", input_id = "in_1", "Received data");  // Group H
74//! info!(component_id = "source", input_id = "in_2", "Received data");  // Group H (same group!)
75//! // The input_id field is ignored - only component_id matters
76//!
77//! // Example 7: Disabling rate limiting for specific logs
78//! // Rate limiting is ON by default - explicitly disable for important logs
79//! warn!(
80//!     component_id = "critical_component",
81//!     message = "Fatal error occurred",
82//!     internal_log_rate_limit = false
83//! );
84//! // This event will NEVER be rate limited, regardless of how often it fires
85//!
86//! // Example 8: Custom rate limit window for specific events
87//! info!(
88//!     component_id = "noisy_component",
89//!     message = "Frequent status update",
//!     internal_log_rate_secs = 60  // Use a 60-second window instead of the default
91//! );
92//! // Override the default window for this specific log
93//! ```
94//!
95//! This ensures logs from different components are rate limited independently,
96//! while avoiding resource/cost implications from high-cardinality tags.
97
98use std::fmt;
99
100use dashmap::DashMap;
101use tracing_core::{
102    Event, Metadata, Subscriber,
103    callsite::Identifier,
104    field::{Field, Value, Visit, display},
105    span,
106    subscriber::Interest,
107};
108use tracing_subscriber::layer::{Context, Layer};
109
110#[cfg(test)]
111#[macro_use]
112extern crate tracing;
113
114#[cfg(not(test))]
115use std::time::Instant;
116
117#[cfg(test)]
118use mock_instant::global::Instant;
119
/// Boolean event field; setting it to `false` exempts that event from rate limiting.
const RATE_LIMIT_FIELD: &str = "internal_log_rate_limit";
/// Integer event field overriding the rate limit window (in seconds) for that event.
const RATE_LIMIT_SECS_FIELD: &str = "internal_log_rate_secs";
/// Event field carrying the human-readable log message.
const MESSAGE_FIELD: &str = "message";

/// Grouping field: events from one callsite are rate limited independently per distinct
/// value of this field. All other fields are ignored for grouping.
const COMPONENT_ID_FIELD: &str = "component_id";
127
128#[derive(Eq, PartialEq, Hash, Clone)]
129struct RateKeyIdentifier {
130    callsite: Identifier,
131    rate_limit_key_values: RateLimitedSpanKeys,
132}
133
134pub struct RateLimitedLayer<S, L>
135where
136    L: Layer<S> + Sized,
137    S: Subscriber,
138{
139    events: DashMap<RateKeyIdentifier, State>,
140    inner: L,
141    internal_log_rate_limit: u64,
142    _subscriber: std::marker::PhantomData<S>,
143}
144
145impl<S, L> RateLimitedLayer<S, L>
146where
147    L: Layer<S> + Sized,
148    S: Subscriber,
149{
150    pub fn new(layer: L) -> Self {
151        RateLimitedLayer {
152            events: Default::default(),
153            internal_log_rate_limit: 10,
154            inner: layer,
155            _subscriber: std::marker::PhantomData,
156        }
157    }
158
159    /// Sets the default rate limit window in seconds.
160    ///
161    /// This controls how long logs are suppressed before they can be emitted again.
162    /// Within each window:
163    /// - 1st occurrence: Emitted normally
164    /// - 2nd occurrence: Shows "suppressing" warning
165    /// - 3rd+ occurrences: Silent until window expires
166    pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self {
167        self.internal_log_rate_limit = internal_log_rate_limit;
168        self
169    }
170}
171
172impl<S, L> Layer<S> for RateLimitedLayer<S, L>
173where
174    L: Layer<S>,
175    S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
176{
177    #[inline]
178    fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
179        self.inner.register_callsite(metadata)
180    }
181
182    #[inline]
183    fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
184        self.inner.enabled(metadata, ctx)
185    }
186
187    // keep track of any span fields we use for grouping rate limiting
188    fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
189        {
190            let span = ctx.span(id).expect("Span not found, this is a bug");
191            let mut extensions = span.extensions_mut();
192
193            if extensions.get_mut::<RateLimitedSpanKeys>().is_none() {
194                let mut fields = RateLimitedSpanKeys::default();
195                attrs.record(&mut fields);
196                extensions.insert(fields);
197            };
198        }
199        self.inner.on_new_span(attrs, id, ctx);
200    }
201
202    // keep track of any span fields we use for grouping rate limiting
203    fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
204        {
205            let span = ctx.span(id).expect("Span not found, this is a bug");
206            let mut extensions = span.extensions_mut();
207
208            match extensions.get_mut::<RateLimitedSpanKeys>() {
209                Some(fields) => {
210                    values.record(fields);
211                }
212                None => {
213                    let mut fields = RateLimitedSpanKeys::default();
214                    values.record(&mut fields);
215                    extensions.insert(fields);
216                }
217            };
218        }
219        self.inner.on_record(id, values, ctx);
220    }
221
222    #[inline]
223    fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
224        self.inner.on_follows_from(span, follows, ctx);
225    }
226
227    fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
228        // Visit the event, grabbing the limit status if one is defined. Rate limiting is ON by default
229        // unless explicitly disabled by setting `internal_log_rate_limit = false`.
230        let mut limit_visitor = LimitVisitor::default();
231        event.record(&mut limit_visitor);
232
233        let limit_exists = limit_visitor.limit.unwrap_or(true);
234        if !limit_exists {
235            return self.inner.on_event(event, ctx);
236        }
237
238        let limit = match limit_visitor.limit_secs {
239            Some(limit_secs) => limit_secs, // override the cli limit
240            None => self.internal_log_rate_limit,
241        };
242
243        // Build a composite key from event fields and span context to determine the rate limit group.
244        // This multi-step process ensures we capture all relevant contextual information:
245        //
246        // 1. Start with event-level fields (e.g., fields directly on the log macro call)
247        // 2. Walk up the span hierarchy from root to current span
248        // 3. Merge in fields from each span, with child spans taking precedence
249        //
250        // This means an event's rate limit group is determined by the combination of:
251        // - Its callsite (handled separately via RateKeyIdentifier)
252        // - All contextual fields from both the event and its span ancestry
253        //
254        // Example: The same `info!("msg")` callsite in different component contexts becomes
255        // distinct rate limit groups, allowing fine-grained control over log flooding.
256        let rate_limit_key_values = {
257            let mut keys = RateLimitedSpanKeys::default();
258            // Capture fields directly on this event
259            event.record(&mut keys);
260
261            // Walk span hierarchy and merge in contextual fields
262            ctx.lookup_current()
263                .into_iter()
264                .flat_map(|span| span.scope().from_root())
265                .fold(keys, |mut keys, span| {
266                    let extensions = span.extensions();
267                    if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
268                        keys.merge(span_keys);
269                    }
270                    keys
271                })
272        };
273
274        // Build the key to represent this event, given its span fields, and see if we're already rate limiting it. If
275        // not, we'll initialize an entry for it.
276        let metadata = event.metadata();
277        let id = RateKeyIdentifier {
278            callsite: metadata.callsite(),
279            rate_limit_key_values,
280        };
281
282        let mut state = self.events.entry(id).or_insert_with(|| {
283            let mut message_visitor = MessageVisitor::default();
284            event.record(&mut message_visitor);
285
286            let message = message_visitor
287                .message
288                .unwrap_or_else(|| metadata.name().into());
289
290            State::new(message, limit)
291        });
292
293        // Update our suppressed state for this event, and see if we should still be suppressing it.
294        //
295        // When this is the first time seeing the event, we emit it like we normally would. The second time we see it in
296        // the limit period, we emit a new event to indicate that the original event is being actively suppressed.
297        // Otherwise, we don't emit anything.
298        let previous_count = state.increment_count();
299        if state.should_limit() {
300            match previous_count {
301                0 => self.inner.on_event(event, ctx),
302                1 => {
303                    let message = format!(
304                        "Internal log [{}] is being suppressed to avoid flooding.",
305                        state.message
306                    );
307                    self.create_event(&ctx, metadata, message, state.limit);
308                }
309                _ => {}
310            }
311        } else {
312            // If we saw this event 3 or more times total, emit an event that indicates the total number of times we
313            // suppressed the event in the limit period.
314            if previous_count > 1 {
315                let message = format!(
316                    "Internal log [{}] has been suppressed {} times.",
317                    state.message,
318                    previous_count - 1
319                );
320
321                self.create_event(&ctx, metadata, message, state.limit);
322            }
323
324            // We're not suppressing anymore, so we also emit the current event as normal.. but we update our rate
325            // limiting state since this is effectively equivalent to seeing the event again for the first time.
326            self.inner.on_event(event, ctx);
327
328            state.reset();
329        }
330    }
331
332    #[inline]
333    fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
334        self.inner.on_enter(id, ctx);
335    }
336
337    #[inline]
338    fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
339        self.inner.on_exit(id, ctx);
340    }
341
342    #[inline]
343    fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
344        self.inner.on_close(id, ctx);
345    }
346
347    #[inline]
348    fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
349        self.inner.on_id_change(old, new, ctx);
350    }
351
352    #[inline]
353    fn on_layer(&mut self, subscriber: &mut S) {
354        self.inner.on_layer(subscriber);
355    }
356}
357
358impl<S, L> RateLimitedLayer<S, L>
359where
360    S: Subscriber,
361    L: Layer<S>,
362{
363    fn create_event(
364        &self,
365        ctx: &Context<S>,
366        metadata: &'static Metadata<'static>,
367        message: String,
368        rate_limit: u64,
369    ) {
370        let fields = metadata.fields();
371
372        let message = display(message);
373
374        if let Some(message_field) = fields.field("message") {
375            let values = [(&message_field, Some(&message as &dyn Value))];
376
377            let valueset = fields.value_set(&values);
378            let event = Event::new(metadata, &valueset);
379            self.inner.on_event(&event, ctx.clone());
380        } else {
381            let values = [(
382                &fields.field(RATE_LIMIT_FIELD).unwrap(),
383                Some(&rate_limit as &dyn Value),
384            )];
385
386            let valueset = fields.value_set(&values);
387            let event = Event::new(metadata, &valueset);
388            self.inner.on_event(&event, ctx.clone());
389        }
390    }
391}
392
/// Suppression bookkeeping for a single rate limit group.
#[derive(Debug)]
struct State {
    /// When the current window began.
    start: Instant,
    /// Occurrences counted in the current window.
    count: u64,
    /// Window length in seconds.
    limit: u64,
    /// Text naming the event in suppression notices.
    message: String,
}

impl State {
    /// Opens a window starting now, with no occurrences counted yet.
    fn new(message: String, limit: u64) -> Self {
        let start = Instant::now();
        Self {
            start,
            count: 0,
            limit,
            message,
        }
    }

    /// Opens a fresh window starting now, counting the current occurrence as its first.
    fn reset(&mut self) {
        self.count = 1;
        self.start = Instant::now();
    }

    /// Counts one more occurrence and returns how many had been counted before it.
    fn increment_count(&mut self) -> u64 {
        let seen_before = self.count;
        self.count = seen_before + 1;
        seen_before
    }

    /// True while fewer than `limit` whole seconds have elapsed since the window started.
    fn should_limit(&self) -> bool {
        let elapsed_secs = self.start.elapsed().as_secs();
        elapsed_secs < self.limit
    }
}
426
/// A captured field value, used as part of a rate limit grouping key.
#[derive(PartialEq, Eq, Clone, Hash)]
enum TraceValue {
    String(String),
    Int(i64),
    Uint(u64),
    Bool(bool),
}

#[cfg(test)]
impl fmt::Display for TraceValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let inner: &dyn fmt::Display = match self {
            Self::String(s) => s,
            Self::Int(i) => i,
            Self::Uint(u) => u,
            Self::Bool(b) => b,
        };
        write!(f, "{}", inner)
    }
}

/// Generates `From<$ty> for TraceValue`, wrapping the value in the `$variant` variant.
macro_rules! trace_value_from {
    ($($ty:ty => $variant:ident),+ $(,)?) => {
        $(
            impl From<$ty> for TraceValue {
                fn from(value: $ty) -> Self {
                    TraceValue::$variant(value)
                }
            }
        )+
    };
}

trace_value_from! {
    bool => Bool,
    i64 => Int,
    u64 => Uint,
    String => String,
}
470
471/// RateLimitedSpanKeys records span and event fields that differentiate rate limit groups.
472///
473/// This struct is used to build a composite key that uniquely identifies a rate limit bucket.
474/// Events with different field values will be rate limited independently, even if they come
475/// from the same callsite.
476///
477/// ## Field categories:
478///
479/// **Tracked fields** (only these create distinct rate limit groups):
480/// - `component_id` - Different components are rate limited independently
481///
482/// **Ignored fields**: All other fields are ignored for grouping purposes. This avoids resource/cost implications from high-cardinality tags.
483/// ```
484#[derive(Default, Eq, PartialEq, Hash, Clone)]
485struct RateLimitedSpanKeys {
486    component_id: Option<TraceValue>,
487}
488
489impl RateLimitedSpanKeys {
490    fn record(&mut self, field: &Field, value: TraceValue) {
491        if field.name() == COMPONENT_ID_FIELD {
492            self.component_id = Some(value);
493        }
494    }
495
496    fn merge(&mut self, other: &Self) {
497        if let Some(component_id) = &other.component_id {
498            self.component_id = Some(component_id.clone());
499        }
500    }
501}
502
503impl Visit for RateLimitedSpanKeys {
504    fn record_i64(&mut self, field: &Field, value: i64) {
505        self.record(field, value.into());
506    }
507
508    fn record_u64(&mut self, field: &Field, value: u64) {
509        self.record(field, value.into());
510    }
511
512    fn record_bool(&mut self, field: &Field, value: bool) {
513        self.record(field, value.into());
514    }
515
516    fn record_str(&mut self, field: &Field, value: &str) {
517        self.record(field, value.to_owned().into());
518    }
519
520    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
521        self.record(field, format!("{value:?}").into());
522    }
523}
524
525#[derive(Default)]
526struct LimitVisitor {
527    pub limit: Option<bool>,
528    pub limit_secs: Option<u64>,
529}
530
531impl Visit for LimitVisitor {
532    fn record_bool(&mut self, field: &Field, value: bool) {
533        if field.name() == RATE_LIMIT_FIELD {
534            self.limit = Some(value);
535        }
536    }
537
538    fn record_i64(&mut self, field: &Field, value: i64) {
539        if field.name() == RATE_LIMIT_SECS_FIELD {
540            self.limit = Some(true); // limit if we have this field
541            self.limit_secs = Some(u64::try_from(value).unwrap_or_default()); // override the cli passed limit
542        }
543    }
544
545    fn record_u64(&mut self, field: &Field, value: u64) {
546        if field.name() == RATE_LIMIT_SECS_FIELD {
547            self.limit = Some(true); // limit if we have this field
548            self.limit_secs = Some(value); // override the cli passed limit
549        }
550    }
551
552    fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
553}
554
555#[derive(Default)]
556struct MessageVisitor {
557    pub message: Option<String>,
558}
559
560impl Visit for MessageVisitor {
561    fn record_str(&mut self, field: &Field, value: &str) {
562        if self.message.is_none() && field.name() == MESSAGE_FIELD {
563            self.message = Some(value.to_string());
564        }
565    }
566
567    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
568        if self.message.is_none() && field.name() == MESSAGE_FIELD {
569            self.message = Some(format!("{value:?}"));
570        }
571    }
572}
573
574#[cfg(test)]
575mod test {
576    use std::{
577        collections::BTreeMap,
578        sync::{Arc, Mutex},
579        time::Duration,
580    };
581
582    use mock_instant::global::MockClock;
583    use serial_test::serial;
584    use tracing_subscriber::layer::SubscriberExt;
585
586    use super::*;
587
588    #[derive(Debug, Clone, PartialEq, Eq)]
589    struct RecordedEvent {
590        message: String,
591        fields: BTreeMap<String, String>,
592    }
593
594    impl RecordedEvent {
595        fn new(message: impl Into<String>) -> Self {
596            Self {
597                message: message.into(),
598                fields: BTreeMap::new(),
599            }
600        }
601
602        fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
603            self.fields.insert(key.into(), value.into());
604            self
605        }
606    }
607
608    /// Macro to create RecordedEvent with optional fields
609    /// Usage:
610    /// - `event!("message")` - just message
611    /// - `event!("message", key1: "value1")` - message with one field
612    /// - `event!("message", key1: "value1", key2: "value2")` - message with multiple fields
613    macro_rules! event {
614        ($msg:expr) => {
615            RecordedEvent::new($msg)
616        };
617        ($msg:expr, $($key:ident: $value:expr),+ $(,)?) => {
618            RecordedEvent::new($msg)
619                $(.with_field(stringify!($key), $value))+
620        };
621    }
622
623    #[derive(Default)]
624    struct AllFieldsVisitor {
625        fields: BTreeMap<String, String>,
626    }
627
628    impl Visit for AllFieldsVisitor {
629        fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
630            self.fields
631                .insert(field.name().to_string(), format!("{value:?}"));
632        }
633
634        fn record_str(&mut self, field: &Field, value: &str) {
635            self.fields
636                .insert(field.name().to_string(), value.to_string());
637        }
638
639        fn record_i64(&mut self, field: &Field, value: i64) {
640            self.fields
641                .insert(field.name().to_string(), value.to_string());
642        }
643
644        fn record_u64(&mut self, field: &Field, value: u64) {
645            self.fields
646                .insert(field.name().to_string(), value.to_string());
647        }
648
649        fn record_bool(&mut self, field: &Field, value: bool) {
650            self.fields
651                .insert(field.name().to_string(), value.to_string());
652        }
653    }
654
655    impl AllFieldsVisitor {
656        fn into_event(self) -> RecordedEvent {
657            let message = self
658                .fields
659                .get("message")
660                .cloned()
661                .unwrap_or_else(|| String::from(""));
662
663            let mut fields = BTreeMap::new();
664            for (key, value) in self.fields {
665                if key != "message"
666                    && key != "internal_log_rate_limit"
667                    && key != "internal_log_rate_secs"
668                {
669                    fields.insert(key, value);
670                }
671            }
672
673            RecordedEvent { message, fields }
674        }
675    }
676
677    #[derive(Default)]
678    struct RecordingLayer<S> {
679        events: Arc<Mutex<Vec<RecordedEvent>>>,
680
681        _subscriber: std::marker::PhantomData<S>,
682    }
683
684    impl<S> RecordingLayer<S> {
685        fn new(events: Arc<Mutex<Vec<RecordedEvent>>>) -> Self {
686            RecordingLayer {
687                events,
688
689                _subscriber: std::marker::PhantomData,
690            }
691        }
692    }
693
694    impl<S> Layer<S> for RecordingLayer<S>
695    where
696        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
697    {
698        fn register_callsite(&self, _metadata: &'static Metadata<'static>) -> Interest {
699            Interest::always()
700        }
701
702        fn enabled(&self, _metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool {
703            true
704        }
705
706        fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
707            let mut visitor = AllFieldsVisitor::default();
708            event.record(&mut visitor);
709
710            // Also capture fields from span context
711            if let Some(span) = ctx.lookup_current() {
712                for span_ref in span.scope().from_root() {
713                    let extensions = span_ref.extensions();
714                    if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
715                        // Add component_id
716                        if let Some(TraceValue::String(ref s)) = span_keys.component_id {
717                            visitor.fields.insert("component_id".to_string(), s.clone());
718                        }
719                    }
720                }
721            }
722
723            let mut events = self.events.lock().unwrap();
724            events.push(visitor.into_event());
725        }
726    }
727
728    /// Helper function to set up a test with a rate-limited subscriber.
729    /// Returns the events Arc for asserting on collected events.
730    fn setup_test(
731        default_limit: u64,
732    ) -> (
733        Arc<Mutex<Vec<RecordedEvent>>>,
734        impl Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
735    ) {
736        let events: Arc<Mutex<Vec<RecordedEvent>>> = Default::default();
737        let recorder = RecordingLayer::new(Arc::clone(&events));
738        let sub = tracing_subscriber::registry::Registry::default()
739            .with(RateLimitedLayer::new(recorder).with_default_limit(default_limit));
740        (events, sub)
741    }
742
743    #[test]
744    #[serial]
745    fn rate_limits() {
746        let (events, sub) = setup_test(1);
747        tracing::subscriber::with_default(sub, || {
748            for _ in 0..21 {
749                info!(message = "Hello world!");
750                MockClock::advance(Duration::from_millis(100));
751            }
752        });
753
754        let events = events.lock().unwrap();
755
756        assert_eq!(
757            *events,
758            vec![
759                event!("Hello world!"),
760                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
761                event!("Internal log [Hello world!] has been suppressed 9 times."),
762                event!("Hello world!"),
763                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
764                event!("Internal log [Hello world!] has been suppressed 9 times."),
765                event!("Hello world!"),
766            ]
767        );
768    }
769
770    #[test]
771    #[serial]
772    fn override_rate_limit_at_callsite() {
773        let (events, sub) = setup_test(100);
774        tracing::subscriber::with_default(sub, || {
775            for _ in 0..31 {
776                info!(message = "Hello world!", internal_log_rate_secs = 2);
777                MockClock::advance(Duration::from_millis(100));
778            }
779        });
780
781        let events = events.lock().unwrap();
782
783        // With a 2-second window and 100ms advances, we get:
784        // - Event every 20 iterations (2000ms / 100ms = 20)
785        // - First window: iteration 0-19 (suppressed 19 times after first 2)
786        // - Second window: iteration 20-39 (but we only go to 30)
787        assert_eq!(
788            *events,
789            vec![
790                event!("Hello world!"),
791                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
792                event!("Internal log [Hello world!] has been suppressed 19 times."),
793                event!("Hello world!"),
794                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
795            ]
796        );
797    }
798
799    #[test]
800    #[serial]
801    fn rate_limit_by_event_key() {
802        let (events, sub) = setup_test(1);
803        tracing::subscriber::with_default(sub, || {
804            for _ in 0..21 {
805                for key in &["foo", "bar"] {
806                    info!(
807                        message = format!("Hello {key}!").as_str(),
808                        component_id = &key
809                    );
810                }
811                MockClock::advance(Duration::from_millis(100));
812            }
813        });
814
815        let events = events.lock().unwrap();
816
817        // Events with different component_id values create separate rate limit groups
818        assert_eq!(
819            *events,
820            vec![
821                event!("Hello foo!", component_id: "foo"),
822                event!("Hello bar!", component_id: "bar"),
823                event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
824                event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
825                event!("Internal log [Hello foo!] has been suppressed 9 times."),
826                event!("Hello foo!", component_id: "foo"),
827                event!("Internal log [Hello bar!] has been suppressed 9 times."),
828                event!("Hello bar!", component_id: "bar"),
829                event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
830                event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
831                event!("Internal log [Hello foo!] has been suppressed 9 times."),
832                event!("Hello foo!", component_id: "foo"),
833                event!("Internal log [Hello bar!] has been suppressed 9 times."),
834                event!("Hello bar!", component_id: "bar"),
835            ]
836        );
837    }
838
839    #[test]
840    #[serial]
841    fn disabled_rate_limit() {
842        let (events, sub) = setup_test(1);
843        tracing::subscriber::with_default(sub, || {
844            for _ in 0..21 {
845                info!(message = "Hello world!", internal_log_rate_limit = false);
846                MockClock::advance(Duration::from_millis(100));
847            }
848        });
849
850        let events = events.lock().unwrap();
851
852        // All 21 events should be emitted since rate limiting is disabled
853        assert_eq!(events.len(), 21);
854        assert!(events.iter().all(|e| e == &event!("Hello world!")));
855    }
856
857    #[test]
858    #[serial]
859    fn rate_limit_ignores_non_special_fields() {
860        let (events, sub) = setup_test(1);
861        tracing::subscriber::with_default(sub, || {
862            for i in 0..21 {
863                // Call the SAME info! macro multiple times per iteration with varying fanout_id
864                // to verify that fanout_id doesn't create separate rate limit groups
865                for _ in 0..3 {
866                    let fanout = if i % 2 == 0 { "output_1" } else { "output_2" };
867                    info!(
868                        message = "Routing event",
869                        component_id = "router",
870                        fanout_id = fanout
871                    );
872                }
873                MockClock::advance(Duration::from_millis(100));
874            }
875        });
876
877        let events = events.lock().unwrap();
878
879        // All events share the same rate limit group (same callsite + component_id)
880        // First event emits normally, second shows suppression, third and beyond are silent
881        // until the window expires
882        assert_eq!(
883            *events,
884            vec![
885                // First iteration - first emits, second shows suppression, 3rd+ silent
886                event!("Routing event", component_id: "router", fanout_id: "output_1"),
887                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
888                // After rate limit window (1 sec) - summary shows suppressions
889                event!("Internal log [Routing event] has been suppressed 29 times."),
890                event!("Routing event", component_id: "router", fanout_id: "output_1"),
891                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
892                event!("Internal log [Routing event] has been suppressed 29 times."),
893                event!("Routing event", component_id: "router", fanout_id: "output_1"),
894                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
895            ]
896        );
897    }
898
899    #[test]
900    #[serial]
901    fn nested_spans_child_takes_precedence() {
902        let (events, sub) = setup_test(1);
903        tracing::subscriber::with_default(sub, || {
904            // Create nested spans where child overrides parent's component_id
905            let outer = info_span!("outer", component_id = "parent");
906            let _outer_guard = outer.enter();
907
908            for _ in 0..21 {
909                // Inner span with different component_id should take precedence
910                let inner = info_span!("inner", component_id = "child");
911                let _inner_guard = inner.enter();
912                info!(message = "Nested event");
913                drop(_inner_guard);
914
915                MockClock::advance(Duration::from_millis(100));
916            }
917        });
918
919        let events = events.lock().unwrap();
920
921        // All events should be grouped by component_id = "child" (from inner span)
922        // not "parent" (from outer span), demonstrating child precedence
923        assert_eq!(
924            *events,
925            vec![
926                event!("Nested event", component_id: "child"),
927                event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
928                event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
929                event!("Nested event", component_id: "child"),
930                event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
931                event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
932                event!("Nested event", component_id: "child"),
933            ]
934        );
935    }
936
937    #[test]
938    #[serial]
939    fn nested_spans_ignores_untracked_fields() {
940        let (events, sub) = setup_test(1);
941        tracing::subscriber::with_default(sub, || {
942            // Parent has component_id, child has some_field - only component_id is tracked
943            let outer = info_span!("outer", component_id = "transform");
944            let _outer_guard = outer.enter();
945
946            for _ in 0..21 {
947                let inner = info_span!("inner", some_field = "value");
948                let _inner_guard = inner.enter();
949                info!(message = "Event message");
950                drop(_inner_guard);
951
952                MockClock::advance(Duration::from_millis(100));
953            }
954        });
955
956        let events = events.lock().unwrap();
957
958        // Events should have component_id from parent, some_field from child is ignored for grouping
959        // All events are in the same rate limit group
960        assert_eq!(
961            *events,
962            vec![
963                event!("Event message", component_id: "transform"),
964                event!(
965                    "Internal log [Event message] is being suppressed to avoid flooding.",
966                    component_id: "transform"
967                ),
968                event!(
969                    "Internal log [Event message] has been suppressed 9 times.",
970                    component_id: "transform"
971                ),
972                event!("Event message", component_id: "transform"),
973                event!(
974                    "Internal log [Event message] is being suppressed to avoid flooding.",
975                    component_id: "transform"
976                ),
977                event!(
978                    "Internal log [Event message] has been suppressed 9 times.",
979                    component_id: "transform"
980                ),
981                event!("Event message", component_id: "transform"),
982            ]
983        );
984    }
985
986    #[test]
987    #[serial]
988    fn rate_limit_same_message_different_component() {
989        let (events, sub) = setup_test(1);
990        tracing::subscriber::with_default(sub, || {
991            // Use a loop with the SAME callsite to demonstrate that identical messages
992            // with different component_ids create separate rate limit groups
993            for component in &["foo", "foo", "bar"] {
994                info!(message = "Hello!", component_id = component);
995                MockClock::advance(Duration::from_millis(100));
996            }
997        });
998
999        let events = events.lock().unwrap();
1000
1001        // The first "foo" event is emitted normally (count=0)
1002        // The second "foo" event triggers suppression warning (count=1)
1003        // The "bar" event is emitted normally (count=0 for its group)
1004        // This proves that even with identical message text, different component_ids
1005        // create separate rate limit groups
1006        assert_eq!(
1007            *events,
1008            vec![
1009                event!("Hello!", component_id: "foo"),
1010                event!("Internal log [Hello!] is being suppressed to avoid flooding."),
1011                event!("Hello!", component_id: "bar"),
1012            ]
1013        );
1014    }
1015}