1#![deny(warnings)]
2use std::fmt;
99
100use dashmap::DashMap;
101use tracing_core::{
102 Event, Metadata, Subscriber,
103 callsite::Identifier,
104 field::{Field, Value, Visit, display},
105 span,
106 subscriber::Interest,
107};
108use tracing_subscriber::layer::{Context, Layer};
109
110#[cfg(test)]
111#[macro_use]
112extern crate tracing;
113
114#[cfg(not(test))]
115use std::time::Instant;
116
117#[cfg(test)]
118use mock_instant::global::Instant;
119
/// Boolean event field; recording it as `false` lets the event bypass rate limiting.
const RATE_LIMIT_FIELD: &str = "internal_log_rate_limit";
/// Integer event field that overrides the layer's default window (in seconds) for the event.
const RATE_LIMIT_SECS_FIELD: &str = "internal_log_rate_secs";
/// Event field whose value is quoted inside the "suppressed" notices.
const MESSAGE_FIELD: &str = "message";

/// Span/event field whose value becomes part of the rate-limit key.
const COMPONENT_ID_FIELD: &str = "component_id";
127
/// Key under which rate-limit state is stored: the callsite of the event combined with
/// the tracked key fields collected from the event and its enclosing spans.
#[derive(Eq, PartialEq, Hash, Clone)]
struct RateKeyIdentifier {
    /// Callsite that emitted the event.
    callsite: Identifier,
    /// Values of the tracked key fields in effect for the event.
    rate_limit_key_values: RateLimitedSpanKeys,
}
133
134pub struct RateLimitedLayer<S, L>
135where
136 L: Layer<S> + Sized,
137 S: Subscriber,
138{
139 events: DashMap<RateKeyIdentifier, State>,
140 inner: L,
141 internal_log_rate_limit: u64,
142 _subscriber: std::marker::PhantomData<S>,
143}
144
145impl<S, L> RateLimitedLayer<S, L>
146where
147 L: Layer<S> + Sized,
148 S: Subscriber,
149{
150 pub fn new(layer: L) -> Self {
151 RateLimitedLayer {
152 events: Default::default(),
153 internal_log_rate_limit: 10,
154 inner: layer,
155 _subscriber: std::marker::PhantomData,
156 }
157 }
158
159 pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self {
167 self.internal_log_rate_limit = internal_log_rate_limit;
168 self
169 }
170}
171
172impl<S, L> Layer<S> for RateLimitedLayer<S, L>
173where
174 L: Layer<S>,
175 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
176{
177 #[inline]
178 fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
179 self.inner.register_callsite(metadata)
180 }
181
182 #[inline]
183 fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
184 self.inner.enabled(metadata, ctx)
185 }
186
187 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
189 {
190 let span = ctx.span(id).expect("Span not found, this is a bug");
191 let mut extensions = span.extensions_mut();
192
193 if extensions.get_mut::<RateLimitedSpanKeys>().is_none() {
194 let mut fields = RateLimitedSpanKeys::default();
195 attrs.record(&mut fields);
196 extensions.insert(fields);
197 };
198 }
199 self.inner.on_new_span(attrs, id, ctx);
200 }
201
202 fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
204 {
205 let span = ctx.span(id).expect("Span not found, this is a bug");
206 let mut extensions = span.extensions_mut();
207
208 match extensions.get_mut::<RateLimitedSpanKeys>() {
209 Some(fields) => {
210 values.record(fields);
211 }
212 None => {
213 let mut fields = RateLimitedSpanKeys::default();
214 values.record(&mut fields);
215 extensions.insert(fields);
216 }
217 };
218 }
219 self.inner.on_record(id, values, ctx);
220 }
221
222 #[inline]
223 fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
224 self.inner.on_follows_from(span, follows, ctx);
225 }
226
227 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
228 let mut limit_visitor = LimitVisitor::default();
231 event.record(&mut limit_visitor);
232
233 let limit_exists = limit_visitor.limit.unwrap_or(true);
234 if !limit_exists {
235 return self.inner.on_event(event, ctx);
236 }
237
238 let limit = match limit_visitor.limit_secs {
239 Some(limit_secs) => limit_secs, None => self.internal_log_rate_limit,
241 };
242
243 let rate_limit_key_values = {
257 let mut keys = RateLimitedSpanKeys::default();
258 event.record(&mut keys);
260
261 ctx.lookup_current()
263 .into_iter()
264 .flat_map(|span| span.scope().from_root())
265 .fold(keys, |mut keys, span| {
266 let extensions = span.extensions();
267 if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
268 keys.merge(span_keys);
269 }
270 keys
271 })
272 };
273
274 let metadata = event.metadata();
277 let id = RateKeyIdentifier {
278 callsite: metadata.callsite(),
279 rate_limit_key_values,
280 };
281
282 let mut state = self.events.entry(id).or_insert_with(|| {
283 let mut message_visitor = MessageVisitor::default();
284 event.record(&mut message_visitor);
285
286 let message = message_visitor
287 .message
288 .unwrap_or_else(|| metadata.name().into());
289
290 State::new(message, limit)
291 });
292
293 let previous_count = state.increment_count();
299 if state.should_limit() {
300 match previous_count {
301 0 => self.inner.on_event(event, ctx),
302 1 => {
303 let message = format!(
304 "Internal log [{}] is being suppressed to avoid flooding.",
305 state.message
306 );
307 self.create_event(&ctx, metadata, message, state.limit);
308 }
309 _ => {}
310 }
311 } else {
312 if previous_count > 1 {
315 let message = format!(
316 "Internal log [{}] has been suppressed {} times.",
317 state.message,
318 previous_count - 1
319 );
320
321 self.create_event(&ctx, metadata, message, state.limit);
322 }
323
324 self.inner.on_event(event, ctx);
327
328 state.reset();
329 }
330 }
331
332 #[inline]
333 fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
334 self.inner.on_enter(id, ctx);
335 }
336
337 #[inline]
338 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
339 self.inner.on_exit(id, ctx);
340 }
341
342 #[inline]
343 fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
344 self.inner.on_close(id, ctx);
345 }
346
347 #[inline]
348 fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
349 self.inner.on_id_change(old, new, ctx);
350 }
351
352 #[inline]
353 fn on_layer(&mut self, subscriber: &mut S) {
354 self.inner.on_layer(subscriber);
355 }
356}
357
358impl<S, L> RateLimitedLayer<S, L>
359where
360 S: Subscriber,
361 L: Layer<S>,
362{
363 fn create_event(
364 &self,
365 ctx: &Context<S>,
366 metadata: &'static Metadata<'static>,
367 message: String,
368 rate_limit: u64,
369 ) {
370 let fields = metadata.fields();
371
372 let message = display(message);
373
374 if let Some(message_field) = fields.field("message") {
375 let values = [(&message_field, Some(&message as &dyn Value))];
376
377 let valueset = fields.value_set(&values);
378 let event = Event::new(metadata, &valueset);
379 self.inner.on_event(&event, ctx.clone());
380 } else if let Some(rate_limit_field) = fields.field(RATE_LIMIT_FIELD) {
381 let values = [(&rate_limit_field, Some(&rate_limit as &dyn Value))];
382
383 let valueset = fields.value_set(&values);
384 let event = Event::new(metadata, &valueset);
385 self.inner.on_event(&event, ctx.clone());
386 } else {
387 }
392 }
393}
394
/// Rate-limit bookkeeping for a single key.
#[derive(Debug)]
struct State {
    /// Start of the current window.
    start: Instant,
    /// Events counted in the current window.
    count: u64,
    /// Window length in seconds.
    limit: u64,
    /// Message quoted in the notices for this key.
    message: String,
}

impl State {
    /// Opens a window starting now with nothing counted yet.
    fn new(message: String, limit: u64) -> Self {
        let start = Instant::now();
        Self {
            start,
            count: 0,
            limit,
            message,
        }
    }

    /// Starts a new window now; the count restarts at 1.
    fn reset(&mut self) {
        self.count = 1;
        self.start = Instant::now();
    }

    /// Counts one more event and returns the count from before the increment.
    fn increment_count(&mut self) -> u64 {
        let previous = self.count;
        self.count = previous + 1;
        previous
    }

    /// True while fewer than `limit` whole seconds have passed since the window started.
    fn should_limit(&self) -> bool {
        let elapsed_secs = self.start.elapsed().as_secs();
        elapsed_secs < self.limit
    }
}
428
/// An owned, hashable copy of a recorded field value, usable as part of a map key.
#[derive(PartialEq, Eq, Clone, Hash)]
enum TraceValue {
    String(String),
    Int(i64),
    Uint(u64),
    Bool(bool),
}

/// Renders the wrapped value; compiled for tests only.
#[cfg(test)]
impl fmt::Display for TraceValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::String(text) => write!(f, "{text}"),
            Self::Int(signed) => write!(f, "{signed}"),
            Self::Uint(unsigned) => write!(f, "{unsigned}"),
            Self::Bool(flag) => write!(f, "{flag}"),
        }
    }
}

impl From<bool> for TraceValue {
    fn from(flag: bool) -> Self {
        Self::Bool(flag)
    }
}

impl From<i64> for TraceValue {
    fn from(signed: i64) -> Self {
        Self::Int(signed)
    }
}

impl From<u64> for TraceValue {
    fn from(unsigned: u64) -> Self {
        Self::Uint(unsigned)
    }
}

impl From<String> for TraceValue {
    fn from(text: String) -> Self {
        Self::String(text)
    }
}
472
473#[derive(Default, Eq, PartialEq, Hash, Clone)]
487struct RateLimitedSpanKeys {
488 component_id: Option<TraceValue>,
489}
490
491impl RateLimitedSpanKeys {
492 fn record(&mut self, field: &Field, value: TraceValue) {
493 if field.name() == COMPONENT_ID_FIELD {
494 self.component_id = Some(value);
495 }
496 }
497
498 fn merge(&mut self, other: &Self) {
499 if let Some(component_id) = &other.component_id {
500 self.component_id = Some(component_id.clone());
501 }
502 }
503}
504
505impl Visit for RateLimitedSpanKeys {
506 fn record_i64(&mut self, field: &Field, value: i64) {
507 self.record(field, value.into());
508 }
509
510 fn record_u64(&mut self, field: &Field, value: u64) {
511 self.record(field, value.into());
512 }
513
514 fn record_bool(&mut self, field: &Field, value: bool) {
515 self.record(field, value.into());
516 }
517
518 fn record_str(&mut self, field: &Field, value: &str) {
519 self.record(field, value.to_owned().into());
520 }
521
522 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
523 self.record(field, format!("{value:?}").into());
524 }
525}
526
527#[derive(Default)]
528struct LimitVisitor {
529 pub limit: Option<bool>,
530 pub limit_secs: Option<u64>,
531}
532
533impl Visit for LimitVisitor {
534 fn record_bool(&mut self, field: &Field, value: bool) {
535 if field.name() == RATE_LIMIT_FIELD {
536 self.limit = Some(value);
537 }
538 }
539
540 fn record_i64(&mut self, field: &Field, value: i64) {
541 if field.name() == RATE_LIMIT_SECS_FIELD {
542 self.limit = Some(true); self.limit_secs = Some(u64::try_from(value).unwrap_or_default()); }
545 }
546
547 fn record_u64(&mut self, field: &Field, value: u64) {
548 if field.name() == RATE_LIMIT_SECS_FIELD {
549 self.limit = Some(true); self.limit_secs = Some(value); }
552 }
553
554 fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
555}
556
557#[derive(Default)]
558struct MessageVisitor {
559 pub message: Option<String>,
560}
561
562impl Visit for MessageVisitor {
563 fn record_str(&mut self, field: &Field, value: &str) {
564 if self.message.is_none() && field.name() == MESSAGE_FIELD {
565 self.message = Some(value.to_string());
566 }
567 }
568
569 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
570 if self.message.is_none() && field.name() == MESSAGE_FIELD {
571 self.message = Some(format!("{value:?}"));
572 }
573 }
574}
575
// Tests drive the layer with the mock clock: each one emits events at fixed
// 100 ms steps and compares everything the wrapped layer received against an
// exact expected sequence.
#[cfg(test)]
mod test {
    use std::{
        collections::BTreeMap,
        sync::{Arc, Mutex},
        time::Duration,
    };

    use mock_instant::global::MockClock;
    use serial_test::serial;
    use tracing_subscriber::layer::SubscriberExt;

    use super::*;

    /// What the recording layer saw for one event: its message plus the remaining
    /// fields rendered as strings.
    #[derive(Debug, Clone, PartialEq, Eq)]
    struct RecordedEvent {
        message: String,
        fields: BTreeMap<String, String>,
    }

    impl RecordedEvent {
        /// An event with the given message and no extra fields.
        fn new(message: impl Into<String>) -> Self {
            Self {
                message: message.into(),
                fields: BTreeMap::new(),
            }
        }

        /// Builder-style helper adding one expected field.
        fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
            self.fields.insert(key.into(), value.into());
            self
        }
    }

    /// Shorthand for building an expected `RecordedEvent`:
    /// `event!("msg")` or `event!("msg", key: "value", ...)`.
    // NOTE(review): shares its name with the `event!` macro from `tracing` — confirm the
    // shadowing inside this module is intended.
    macro_rules! event {
        ($msg:expr) => {
            RecordedEvent::new($msg)
        };
        ($msg:expr, $($key:ident: $value:expr),+ $(,)?) => {
            RecordedEvent::new($msg)
                $(.with_field(stringify!($key), $value))+
        };
    }

    /// Visitor collecting every field of an event as a string, keyed by field name.
    #[derive(Default)]
    struct AllFieldsVisitor {
        fields: BTreeMap<String, String>,
    }

    impl Visit for AllFieldsVisitor {
        fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
            self.fields
                .insert(field.name().to_string(), format!("{value:?}"));
        }

        fn record_str(&mut self, field: &Field, value: &str) {
            self.fields
                .insert(field.name().to_string(), value.to_string());
        }

        fn record_i64(&mut self, field: &Field, value: i64) {
            self.fields
                .insert(field.name().to_string(), value.to_string());
        }

        fn record_u64(&mut self, field: &Field, value: u64) {
            self.fields
                .insert(field.name().to_string(), value.to_string());
        }

        fn record_bool(&mut self, field: &Field, value: bool) {
            self.fields
                .insert(field.name().to_string(), value.to_string());
        }
    }

    impl AllFieldsVisitor {
        /// Splits the collected fields into the message (empty when absent) and the
        /// remaining fields, dropping the rate-limit control fields.
        fn into_event(self) -> RecordedEvent {
            let message = self
                .fields
                .get("message")
                .cloned()
                .unwrap_or_else(|| String::from(""));

            let mut fields = BTreeMap::new();
            for (key, value) in self.fields {
                if key != "message"
                    && key != "internal_log_rate_limit"
                    && key != "internal_log_rate_secs"
                {
                    fields.insert(key, value);
                }
            }

            RecordedEvent { message, fields }
        }
    }

    /// Innermost test layer: appends every event it receives to a shared vector.
    #[derive(Default)]
    struct RecordingLayer<S> {
        events: Arc<Mutex<Vec<RecordedEvent>>>,

        _subscriber: std::marker::PhantomData<S>,
    }

    impl<S> RecordingLayer<S> {
        fn new(events: Arc<Mutex<Vec<RecordedEvent>>>) -> Self {
            RecordingLayer {
                events,

                _subscriber: std::marker::PhantomData,
            }
        }
    }

    impl<S> Layer<S> for RecordingLayer<S>
    where
        S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
    {
        fn register_callsite(&self, _metadata: &'static Metadata<'static>) -> Interest {
            Interest::always()
        }

        fn enabled(&self, _metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool {
            true
        }

        fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
            let mut visitor = AllFieldsVisitor::default();
            event.record(&mut visitor);

            // Copy a string `component_id` stored on the spans in scope onto the
            // recorded event. Spans are walked root first, so the innermost span
            // that has one wins; this also overwrites a value recorded on the
            // event itself.
            if let Some(span) = ctx.lookup_current() {
                for span_ref in span.scope().from_root() {
                    let extensions = span_ref.extensions();
                    if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
                        if let Some(TraceValue::String(ref s)) = span_keys.component_id {
                            visitor.fields.insert("component_id".to_string(), s.clone());
                        }
                    }
                }
            }

            let mut events = self.events.lock().unwrap();
            events.push(visitor.into_event());
        }
    }

    /// Builds a registry with a `RateLimitedLayer` (default window `default_limit`
    /// seconds) wrapping a `RecordingLayer`; returns the shared event log and the
    /// subscriber.
    fn setup_test(
        default_limit: u64,
    ) -> (
        Arc<Mutex<Vec<RecordedEvent>>>,
        impl Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
    ) {
        let events: Arc<Mutex<Vec<RecordedEvent>>> = Default::default();
        let recorder = RecordingLayer::new(Arc::clone(&events));
        let sub = tracing_subscriber::registry::Registry::default()
            .with(RateLimitedLayer::new(recorder).with_default_limit(default_limit));
        (events, sub)
    }

    /// 21 events, 100 ms apart, 1 s window: events at 0 s, 1 s and 2 s pass, each
    /// later one preceded by a summary of the 9 events dropped in between.
    #[test]
    #[serial]
    fn rate_limits() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            for _ in 0..21 {
                info!(message = "Hello world!");
                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Hello world!"),
                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello world!] has been suppressed 9 times."),
                event!("Hello world!"),
                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello world!] has been suppressed 9 times."),
                event!("Hello world!"),
            ]
        );
    }

    /// `internal_log_rate_secs = 2` on the event overrides the 100 s default:
    /// 31 events, 100 ms apart, pass at 0 s and 2 s only.
    #[test]
    #[serial]
    fn override_rate_limit_at_callsite() {
        let (events, sub) = setup_test(100);
        tracing::subscriber::with_default(sub, || {
            for _ in 0..31 {
                info!(message = "Hello world!", internal_log_rate_secs = 2);
                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Hello world!"),
                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello world!] has been suppressed 19 times."),
                event!("Hello world!"),
                event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
            ]
        );
    }

    /// One callsite, two `component_id` values: each value is limited on its own.
    /// The synthetic notices carry only the message field, hence no `component_id`.
    #[test]
    #[serial]
    fn rate_limit_by_event_key() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            for _ in 0..21 {
                for key in &["foo", "bar"] {
                    info!(
                        message = format!("Hello {key}!").as_str(),
                        component_id = &key
                    );
                }
                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Hello foo!", component_id: "foo"),
                event!("Hello bar!", component_id: "bar"),
                event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello foo!] has been suppressed 9 times."),
                event!("Hello foo!", component_id: "foo"),
                event!("Internal log [Hello bar!] has been suppressed 9 times."),
                event!("Hello bar!", component_id: "bar"),
                event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
                event!("Internal log [Hello foo!] has been suppressed 9 times."),
                event!("Hello foo!", component_id: "foo"),
                event!("Internal log [Hello bar!] has been suppressed 9 times."),
                event!("Hello bar!", component_id: "bar"),
            ]
        );
    }

    /// `internal_log_rate_limit = false` turns limiting off: all 21 events pass.
    #[test]
    #[serial]
    fn disabled_rate_limit() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            for _ in 0..21 {
                info!(message = "Hello world!", internal_log_rate_limit = false);
                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(events.len(), 21);
        assert!(events.iter().all(|e| e == &event!("Hello world!")));
    }

    /// Fields other than `component_id` (here `fanout_id`) are not part of the key:
    /// all 3 x 21 events share one bucket, so 29 are dropped per 1 s window.
    #[test]
    #[serial]
    fn rate_limit_ignores_non_special_fields() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            for i in 0..21 {
                for _ in 0..3 {
                    let fanout = if i % 2 == 0 { "output_1" } else { "output_2" };
                    info!(
                        message = "Routing event",
                        component_id = "router",
                        fanout_id = fanout
                    );
                }
                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Routing event", component_id: "router", fanout_id: "output_1"),
                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
                event!("Internal log [Routing event] has been suppressed 29 times."),
                event!("Routing event", component_id: "router", fanout_id: "output_1"),
                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
                event!("Internal log [Routing event] has been suppressed 29 times."),
                event!("Routing event", component_id: "router", fanout_id: "output_1"),
                event!("Internal log [Routing event] is being suppressed to avoid flooding."),
            ]
        );
    }

    /// With `component_id` on both an outer and an inner span, the inner value is
    /// the one seen on every recorded event (the recording layer copies it from
    /// the spans, so the synthetic notices show it too).
    #[test]
    #[serial]
    fn nested_spans_child_takes_precedence() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            let outer = info_span!("outer", component_id = "parent");
            let _outer_guard = outer.enter();

            for _ in 0..21 {
                let inner = info_span!("inner", component_id = "child");
                let _inner_guard = inner.enter();
                info!(message = "Nested event");
                drop(_inner_guard);

                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Nested event", component_id: "child"),
                event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
                event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
                event!("Nested event", component_id: "child"),
                event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
                event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
                event!("Nested event", component_id: "child"),
            ]
        );
    }

    /// An inner span without `component_id` leaves the outer span's value in
    /// effect; the inner span's other fields do not show up on recorded events.
    #[test]
    #[serial]
    fn nested_spans_ignores_untracked_fields() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            let outer = info_span!("outer", component_id = "transform");
            let _outer_guard = outer.enter();

            for _ in 0..21 {
                let inner = info_span!("inner", some_field = "value");
                let _inner_guard = inner.enter();
                info!(message = "Event message");
                drop(_inner_guard);

                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Event message", component_id: "transform"),
                event!(
                    "Internal log [Event message] is being suppressed to avoid flooding.",
                    component_id: "transform"
                ),
                event!(
                    "Internal log [Event message] has been suppressed 9 times.",
                    component_id: "transform"
                ),
                event!("Event message", component_id: "transform"),
                event!(
                    "Internal log [Event message] is being suppressed to avoid flooding.",
                    component_id: "transform"
                ),
                event!(
                    "Internal log [Event message] has been suppressed 9 times.",
                    component_id: "transform"
                ),
                event!("Event message", component_id: "transform"),
            ]
        );
    }

    /// Same message, different `component_id`: the second "foo" is suppressed,
    /// while "bar" falls into its own bucket and passes.
    #[test]
    #[serial]
    fn rate_limit_same_message_different_component() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            for component in &["foo", "foo", "bar"] {
                info!(message = "Hello!", component_id = component);
                MockClock::advance(Duration::from_millis(100));
            }
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("Hello!", component_id: "foo"),
                event!("Internal log [Hello!] is being suppressed to avoid flooding."),
                event!("Hello!", component_id: "bar"),
            ]
        );
    }

    /// A callsite with neither a message nor an `internal_log_rate_limit` field
    /// cannot carry a notice: limiting still applies, but no notices are emitted
    /// and nothing panics. Only the first event and the one after the window
    /// get through.
    #[test]
    #[serial]
    fn events_with_custom_fields_no_message_dont_panic() {
        let (events, sub) = setup_test(1);
        tracing::subscriber::with_default(sub, || {
            // A closure keeps every emission on the same callsite.
            let emit_event = || {
                debug!(component_id = "test_component", utilization = 0.85);
            };

            for _ in 0..5 {
                emit_event();
                MockClock::advance(Duration::from_millis(100));
            }

            // Move past the 1 s window.
            MockClock::advance(Duration::from_millis(1000));

            emit_event();
        });

        let events = events.lock().unwrap();

        assert_eq!(
            *events,
            vec![
                event!("", component_id: "test_component", utilization: "0.85"),
                event!("", component_id: "test_component", utilization: "0.85"),
            ]
        );
    }
}