1#![deny(warnings)]
2use std::fmt;
99
100use dashmap::DashMap;
101use tracing_core::{
102 Event, Metadata, Subscriber,
103 callsite::Identifier,
104 field::{Field, Value, Visit, display},
105 span,
106 subscriber::Interest,
107};
108use tracing_subscriber::layer::{Context, Layer};
109
110#[cfg(test)]
111#[macro_use]
112extern crate tracing;
113
114#[cfg(not(test))]
115use std::time::Instant;
116
117#[cfg(test)]
118use mock_instant::global::Instant;
119
/// Name of the boolean event field that switches rate limiting on or off for that event.
const RATE_LIMIT_FIELD: &str = "internal_log_rate_limit";
/// Name of the integer event field that overrides the default rate-limit window, in seconds.
const RATE_LIMIT_SECS_FIELD: &str = "internal_log_rate_secs";
/// Name of the event field holding the log message.
const MESSAGE_FIELD: &str = "message";

/// Name of the span/event field whose value is tracked as a rate-limit key.
const COMPONENT_ID_FIELD: &str = "component_id";
127
128#[derive(Eq, PartialEq, Hash, Clone)]
129struct RateKeyIdentifier {
130 callsite: Identifier,
131 rate_limit_key_values: RateLimitedSpanKeys,
132}
133
134pub struct RateLimitedLayer<S, L>
135where
136 L: Layer<S> + Sized,
137 S: Subscriber,
138{
139 events: DashMap<RateKeyIdentifier, State>,
140 inner: L,
141 internal_log_rate_limit: u64,
142 _subscriber: std::marker::PhantomData<S>,
143}
144
145impl<S, L> RateLimitedLayer<S, L>
146where
147 L: Layer<S> + Sized,
148 S: Subscriber,
149{
150 pub fn new(layer: L) -> Self {
151 RateLimitedLayer {
152 events: Default::default(),
153 internal_log_rate_limit: 10,
154 inner: layer,
155 _subscriber: std::marker::PhantomData,
156 }
157 }
158
159 pub fn with_default_limit(mut self, internal_log_rate_limit: u64) -> Self {
167 self.internal_log_rate_limit = internal_log_rate_limit;
168 self
169 }
170}
171
172impl<S, L> Layer<S> for RateLimitedLayer<S, L>
173where
174 L: Layer<S>,
175 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
176{
177 #[inline]
178 fn register_callsite(&self, metadata: &'static Metadata<'static>) -> Interest {
179 self.inner.register_callsite(metadata)
180 }
181
182 #[inline]
183 fn enabled(&self, metadata: &Metadata<'_>, ctx: Context<'_, S>) -> bool {
184 self.inner.enabled(metadata, ctx)
185 }
186
187 fn on_new_span(&self, attrs: &span::Attributes<'_>, id: &span::Id, ctx: Context<'_, S>) {
189 {
190 let span = ctx.span(id).expect("Span not found, this is a bug");
191 let mut extensions = span.extensions_mut();
192
193 if extensions.get_mut::<RateLimitedSpanKeys>().is_none() {
194 let mut fields = RateLimitedSpanKeys::default();
195 attrs.record(&mut fields);
196 extensions.insert(fields);
197 };
198 }
199 self.inner.on_new_span(attrs, id, ctx);
200 }
201
202 fn on_record(&self, id: &span::Id, values: &span::Record<'_>, ctx: Context<'_, S>) {
204 {
205 let span = ctx.span(id).expect("Span not found, this is a bug");
206 let mut extensions = span.extensions_mut();
207
208 match extensions.get_mut::<RateLimitedSpanKeys>() {
209 Some(fields) => {
210 values.record(fields);
211 }
212 None => {
213 let mut fields = RateLimitedSpanKeys::default();
214 values.record(&mut fields);
215 extensions.insert(fields);
216 }
217 };
218 }
219 self.inner.on_record(id, values, ctx);
220 }
221
222 #[inline]
223 fn on_follows_from(&self, span: &span::Id, follows: &span::Id, ctx: Context<'_, S>) {
224 self.inner.on_follows_from(span, follows, ctx);
225 }
226
227 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
228 let mut limit_visitor = LimitVisitor::default();
231 event.record(&mut limit_visitor);
232
233 let limit_exists = limit_visitor.limit.unwrap_or(true);
234 if !limit_exists {
235 return self.inner.on_event(event, ctx);
236 }
237
238 let limit = match limit_visitor.limit_secs {
239 Some(limit_secs) => limit_secs, None => self.internal_log_rate_limit,
241 };
242
243 let rate_limit_key_values = {
257 let mut keys = RateLimitedSpanKeys::default();
258 event.record(&mut keys);
260
261 ctx.lookup_current()
263 .into_iter()
264 .flat_map(|span| span.scope().from_root())
265 .fold(keys, |mut keys, span| {
266 let extensions = span.extensions();
267 if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
268 keys.merge(span_keys);
269 }
270 keys
271 })
272 };
273
274 let metadata = event.metadata();
277 let id = RateKeyIdentifier {
278 callsite: metadata.callsite(),
279 rate_limit_key_values,
280 };
281
282 let mut state = self.events.entry(id).or_insert_with(|| {
283 let mut message_visitor = MessageVisitor::default();
284 event.record(&mut message_visitor);
285
286 let message = message_visitor
287 .message
288 .unwrap_or_else(|| metadata.name().into());
289
290 State::new(message, limit)
291 });
292
293 let previous_count = state.increment_count();
299 if state.should_limit() {
300 match previous_count {
301 0 => self.inner.on_event(event, ctx),
302 1 => {
303 let message = format!(
304 "Internal log [{}] is being suppressed to avoid flooding.",
305 state.message
306 );
307 self.create_event(&ctx, metadata, message, state.limit);
308 }
309 _ => {}
310 }
311 } else {
312 if previous_count > 1 {
315 let message = format!(
316 "Internal log [{}] has been suppressed {} times.",
317 state.message,
318 previous_count - 1
319 );
320
321 self.create_event(&ctx, metadata, message, state.limit);
322 }
323
324 self.inner.on_event(event, ctx);
327
328 state.reset();
329 }
330 }
331
332 #[inline]
333 fn on_enter(&self, id: &span::Id, ctx: Context<'_, S>) {
334 self.inner.on_enter(id, ctx);
335 }
336
337 #[inline]
338 fn on_exit(&self, id: &span::Id, ctx: Context<'_, S>) {
339 self.inner.on_exit(id, ctx);
340 }
341
342 #[inline]
343 fn on_close(&self, id: span::Id, ctx: Context<'_, S>) {
344 self.inner.on_close(id, ctx);
345 }
346
347 #[inline]
348 fn on_id_change(&self, old: &span::Id, new: &span::Id, ctx: Context<'_, S>) {
349 self.inner.on_id_change(old, new, ctx);
350 }
351
352 #[inline]
353 fn on_layer(&mut self, subscriber: &mut S) {
354 self.inner.on_layer(subscriber);
355 }
356}
357
358impl<S, L> RateLimitedLayer<S, L>
359where
360 S: Subscriber,
361 L: Layer<S>,
362{
363 fn create_event(
364 &self,
365 ctx: &Context<S>,
366 metadata: &'static Metadata<'static>,
367 message: String,
368 rate_limit: u64,
369 ) {
370 let fields = metadata.fields();
371
372 let message = display(message);
373
374 if let Some(message_field) = fields.field("message") {
375 let values = [(&message_field, Some(&message as &dyn Value))];
376
377 let valueset = fields.value_set(&values);
378 let event = Event::new(metadata, &valueset);
379 self.inner.on_event(&event, ctx.clone());
380 } else {
381 let values = [(
382 &fields.field(RATE_LIMIT_FIELD).unwrap(),
383 Some(&rate_limit as &dyn Value),
384 )];
385
386 let valueset = fields.value_set(&values);
387 let event = Event::new(metadata, &valueset);
388 self.inner.on_event(&event, ctx.clone());
389 }
390 }
391}
392
/// Suppression bookkeeping for one rate-limited event key.
#[derive(Debug)]
struct State {
    /// When the current rate-limit window started.
    start: Instant,
    /// How many events have been counted in the current window.
    count: u64,
    /// Length of the window, in whole seconds.
    limit: u64,
    /// Message of the event, quoted in the suppression notices.
    message: String,
}

impl State {
    /// Creates a state whose window starts now and that has not counted any event yet.
    fn new(message: String, limit: u64) -> Self {
        let start = Instant::now();
        Self {
            start,
            count: 0,
            limit,
            message,
        }
    }

    /// Starts a new window now, with one event already counted in it.
    fn reset(&mut self) {
        self.count = 1;
        self.start = Instant::now();
    }

    /// Counts one more event and returns the count from before the increment.
    fn increment_count(&mut self) -> u64 {
        let previous = self.count;
        self.count = previous + 1;
        previous
    }

    /// Returns `true` while fewer than `limit` whole seconds have passed since the window
    /// started.
    fn should_limit(&self) -> bool {
        let elapsed_secs = self.start.elapsed().as_secs();
        elapsed_secs < self.limit
    }
}
426
/// An owned copy of a recorded field value, usable as part of a hash-map key.
#[derive(PartialEq, Eq, Clone, Hash)]
enum TraceValue {
    String(String),
    Int(i64),
    Uint(u64),
    Bool(bool),
}

/// Renders the wrapped value with its own `Display` implementation (test builds only).
#[cfg(test)]
impl fmt::Display for TraceValue {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let rendered: &dyn fmt::Display = match self {
            Self::String(text) => text,
            Self::Int(signed) => signed,
            Self::Uint(unsigned) => unsigned,
            Self::Bool(flag) => flag,
        };
        write!(f, "{rendered}")
    }
}

impl From<bool> for TraceValue {
    fn from(value: bool) -> Self {
        Self::Bool(value)
    }
}

impl From<i64> for TraceValue {
    fn from(value: i64) -> Self {
        Self::Int(value)
    }
}

impl From<u64> for TraceValue {
    fn from(value: u64) -> Self {
        Self::Uint(value)
    }
}

impl From<String> for TraceValue {
    fn from(value: String) -> Self {
        Self::String(value)
    }
}
470
471#[derive(Default, Eq, PartialEq, Hash, Clone)]
485struct RateLimitedSpanKeys {
486 component_id: Option<TraceValue>,
487}
488
489impl RateLimitedSpanKeys {
490 fn record(&mut self, field: &Field, value: TraceValue) {
491 if field.name() == COMPONENT_ID_FIELD {
492 self.component_id = Some(value);
493 }
494 }
495
496 fn merge(&mut self, other: &Self) {
497 if let Some(component_id) = &other.component_id {
498 self.component_id = Some(component_id.clone());
499 }
500 }
501}
502
503impl Visit for RateLimitedSpanKeys {
504 fn record_i64(&mut self, field: &Field, value: i64) {
505 self.record(field, value.into());
506 }
507
508 fn record_u64(&mut self, field: &Field, value: u64) {
509 self.record(field, value.into());
510 }
511
512 fn record_bool(&mut self, field: &Field, value: bool) {
513 self.record(field, value.into());
514 }
515
516 fn record_str(&mut self, field: &Field, value: &str) {
517 self.record(field, value.to_owned().into());
518 }
519
520 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
521 self.record(field, format!("{value:?}").into());
522 }
523}
524
525#[derive(Default)]
526struct LimitVisitor {
527 pub limit: Option<bool>,
528 pub limit_secs: Option<u64>,
529}
530
531impl Visit for LimitVisitor {
532 fn record_bool(&mut self, field: &Field, value: bool) {
533 if field.name() == RATE_LIMIT_FIELD {
534 self.limit = Some(value);
535 }
536 }
537
538 fn record_i64(&mut self, field: &Field, value: i64) {
539 if field.name() == RATE_LIMIT_SECS_FIELD {
540 self.limit = Some(true); self.limit_secs = Some(u64::try_from(value).unwrap_or_default()); }
543 }
544
545 fn record_u64(&mut self, field: &Field, value: u64) {
546 if field.name() == RATE_LIMIT_SECS_FIELD {
547 self.limit = Some(true); self.limit_secs = Some(value); }
550 }
551
552 fn record_debug(&mut self, _field: &Field, _value: &dyn fmt::Debug) {}
553}
554
555#[derive(Default)]
556struct MessageVisitor {
557 pub message: Option<String>,
558}
559
560impl Visit for MessageVisitor {
561 fn record_str(&mut self, field: &Field, value: &str) {
562 if self.message.is_none() && field.name() == MESSAGE_FIELD {
563 self.message = Some(value.to_string());
564 }
565 }
566
567 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
568 if self.message.is_none() && field.name() == MESSAGE_FIELD {
569 self.message = Some(format!("{value:?}"));
570 }
571 }
572}
573
574#[cfg(test)]
575mod test {
576 use std::{
577 collections::BTreeMap,
578 sync::{Arc, Mutex},
579 time::Duration,
580 };
581
582 use mock_instant::global::MockClock;
583 use serial_test::serial;
584 use tracing_subscriber::layer::SubscriberExt;
585
586 use super::*;
587
588 #[derive(Debug, Clone, PartialEq, Eq)]
589 struct RecordedEvent {
590 message: String,
591 fields: BTreeMap<String, String>,
592 }
593
594 impl RecordedEvent {
595 fn new(message: impl Into<String>) -> Self {
596 Self {
597 message: message.into(),
598 fields: BTreeMap::new(),
599 }
600 }
601
602 fn with_field(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
603 self.fields.insert(key.into(), value.into());
604 self
605 }
606 }
607
608 macro_rules! event {
614 ($msg:expr) => {
615 RecordedEvent::new($msg)
616 };
617 ($msg:expr, $($key:ident: $value:expr),+ $(,)?) => {
618 RecordedEvent::new($msg)
619 $(.with_field(stringify!($key), $value))+
620 };
621 }
622
623 #[derive(Default)]
624 struct AllFieldsVisitor {
625 fields: BTreeMap<String, String>,
626 }
627
628 impl Visit for AllFieldsVisitor {
629 fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
630 self.fields
631 .insert(field.name().to_string(), format!("{value:?}"));
632 }
633
634 fn record_str(&mut self, field: &Field, value: &str) {
635 self.fields
636 .insert(field.name().to_string(), value.to_string());
637 }
638
639 fn record_i64(&mut self, field: &Field, value: i64) {
640 self.fields
641 .insert(field.name().to_string(), value.to_string());
642 }
643
644 fn record_u64(&mut self, field: &Field, value: u64) {
645 self.fields
646 .insert(field.name().to_string(), value.to_string());
647 }
648
649 fn record_bool(&mut self, field: &Field, value: bool) {
650 self.fields
651 .insert(field.name().to_string(), value.to_string());
652 }
653 }
654
655 impl AllFieldsVisitor {
656 fn into_event(self) -> RecordedEvent {
657 let message = self
658 .fields
659 .get("message")
660 .cloned()
661 .unwrap_or_else(|| String::from(""));
662
663 let mut fields = BTreeMap::new();
664 for (key, value) in self.fields {
665 if key != "message"
666 && key != "internal_log_rate_limit"
667 && key != "internal_log_rate_secs"
668 {
669 fields.insert(key, value);
670 }
671 }
672
673 RecordedEvent { message, fields }
674 }
675 }
676
677 #[derive(Default)]
678 struct RecordingLayer<S> {
679 events: Arc<Mutex<Vec<RecordedEvent>>>,
680
681 _subscriber: std::marker::PhantomData<S>,
682 }
683
684 impl<S> RecordingLayer<S> {
685 fn new(events: Arc<Mutex<Vec<RecordedEvent>>>) -> Self {
686 RecordingLayer {
687 events,
688
689 _subscriber: std::marker::PhantomData,
690 }
691 }
692 }
693
694 impl<S> Layer<S> for RecordingLayer<S>
695 where
696 S: Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
697 {
698 fn register_callsite(&self, _metadata: &'static Metadata<'static>) -> Interest {
699 Interest::always()
700 }
701
702 fn enabled(&self, _metadata: &Metadata<'_>, _ctx: Context<'_, S>) -> bool {
703 true
704 }
705
706 fn on_event(&self, event: &Event<'_>, ctx: Context<'_, S>) {
707 let mut visitor = AllFieldsVisitor::default();
708 event.record(&mut visitor);
709
710 if let Some(span) = ctx.lookup_current() {
712 for span_ref in span.scope().from_root() {
713 let extensions = span_ref.extensions();
714 if let Some(span_keys) = extensions.get::<RateLimitedSpanKeys>() {
715 if let Some(TraceValue::String(ref s)) = span_keys.component_id {
717 visitor.fields.insert("component_id".to_string(), s.clone());
718 }
719 }
720 }
721 }
722
723 let mut events = self.events.lock().unwrap();
724 events.push(visitor.into_event());
725 }
726 }
727
728 fn setup_test(
731 default_limit: u64,
732 ) -> (
733 Arc<Mutex<Vec<RecordedEvent>>>,
734 impl Subscriber + for<'a> tracing_subscriber::registry::LookupSpan<'a>,
735 ) {
736 let events: Arc<Mutex<Vec<RecordedEvent>>> = Default::default();
737 let recorder = RecordingLayer::new(Arc::clone(&events));
738 let sub = tracing_subscriber::registry::Registry::default()
739 .with(RateLimitedLayer::new(recorder).with_default_limit(default_limit));
740 (events, sub)
741 }
742
743 #[test]
744 #[serial]
745 fn rate_limits() {
746 let (events, sub) = setup_test(1);
747 tracing::subscriber::with_default(sub, || {
748 for _ in 0..21 {
749 info!(message = "Hello world!");
750 MockClock::advance(Duration::from_millis(100));
751 }
752 });
753
754 let events = events.lock().unwrap();
755
756 assert_eq!(
757 *events,
758 vec![
759 event!("Hello world!"),
760 event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
761 event!("Internal log [Hello world!] has been suppressed 9 times."),
762 event!("Hello world!"),
763 event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
764 event!("Internal log [Hello world!] has been suppressed 9 times."),
765 event!("Hello world!"),
766 ]
767 );
768 }
769
770 #[test]
771 #[serial]
772 fn override_rate_limit_at_callsite() {
773 let (events, sub) = setup_test(100);
774 tracing::subscriber::with_default(sub, || {
775 for _ in 0..31 {
776 info!(message = "Hello world!", internal_log_rate_secs = 2);
777 MockClock::advance(Duration::from_millis(100));
778 }
779 });
780
781 let events = events.lock().unwrap();
782
783 assert_eq!(
788 *events,
789 vec![
790 event!("Hello world!"),
791 event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
792 event!("Internal log [Hello world!] has been suppressed 19 times."),
793 event!("Hello world!"),
794 event!("Internal log [Hello world!] is being suppressed to avoid flooding."),
795 ]
796 );
797 }
798
799 #[test]
800 #[serial]
801 fn rate_limit_by_event_key() {
802 let (events, sub) = setup_test(1);
803 tracing::subscriber::with_default(sub, || {
804 for _ in 0..21 {
805 for key in &["foo", "bar"] {
806 info!(
807 message = format!("Hello {key}!").as_str(),
808 component_id = &key
809 );
810 }
811 MockClock::advance(Duration::from_millis(100));
812 }
813 });
814
815 let events = events.lock().unwrap();
816
817 assert_eq!(
819 *events,
820 vec![
821 event!("Hello foo!", component_id: "foo"),
822 event!("Hello bar!", component_id: "bar"),
823 event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
824 event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
825 event!("Internal log [Hello foo!] has been suppressed 9 times."),
826 event!("Hello foo!", component_id: "foo"),
827 event!("Internal log [Hello bar!] has been suppressed 9 times."),
828 event!("Hello bar!", component_id: "bar"),
829 event!("Internal log [Hello foo!] is being suppressed to avoid flooding."),
830 event!("Internal log [Hello bar!] is being suppressed to avoid flooding."),
831 event!("Internal log [Hello foo!] has been suppressed 9 times."),
832 event!("Hello foo!", component_id: "foo"),
833 event!("Internal log [Hello bar!] has been suppressed 9 times."),
834 event!("Hello bar!", component_id: "bar"),
835 ]
836 );
837 }
838
839 #[test]
840 #[serial]
841 fn disabled_rate_limit() {
842 let (events, sub) = setup_test(1);
843 tracing::subscriber::with_default(sub, || {
844 for _ in 0..21 {
845 info!(message = "Hello world!", internal_log_rate_limit = false);
846 MockClock::advance(Duration::from_millis(100));
847 }
848 });
849
850 let events = events.lock().unwrap();
851
852 assert_eq!(events.len(), 21);
854 assert!(events.iter().all(|e| e == &event!("Hello world!")));
855 }
856
857 #[test]
858 #[serial]
859 fn rate_limit_ignores_non_special_fields() {
860 let (events, sub) = setup_test(1);
861 tracing::subscriber::with_default(sub, || {
862 for i in 0..21 {
863 for _ in 0..3 {
866 let fanout = if i % 2 == 0 { "output_1" } else { "output_2" };
867 info!(
868 message = "Routing event",
869 component_id = "router",
870 fanout_id = fanout
871 );
872 }
873 MockClock::advance(Duration::from_millis(100));
874 }
875 });
876
877 let events = events.lock().unwrap();
878
879 assert_eq!(
883 *events,
884 vec![
885 event!("Routing event", component_id: "router", fanout_id: "output_1"),
887 event!("Internal log [Routing event] is being suppressed to avoid flooding."),
888 event!("Internal log [Routing event] has been suppressed 29 times."),
890 event!("Routing event", component_id: "router", fanout_id: "output_1"),
891 event!("Internal log [Routing event] is being suppressed to avoid flooding."),
892 event!("Internal log [Routing event] has been suppressed 29 times."),
893 event!("Routing event", component_id: "router", fanout_id: "output_1"),
894 event!("Internal log [Routing event] is being suppressed to avoid flooding."),
895 ]
896 );
897 }
898
899 #[test]
900 #[serial]
901 fn nested_spans_child_takes_precedence() {
902 let (events, sub) = setup_test(1);
903 tracing::subscriber::with_default(sub, || {
904 let outer = info_span!("outer", component_id = "parent");
906 let _outer_guard = outer.enter();
907
908 for _ in 0..21 {
909 let inner = info_span!("inner", component_id = "child");
911 let _inner_guard = inner.enter();
912 info!(message = "Nested event");
913 drop(_inner_guard);
914
915 MockClock::advance(Duration::from_millis(100));
916 }
917 });
918
919 let events = events.lock().unwrap();
920
921 assert_eq!(
924 *events,
925 vec![
926 event!("Nested event", component_id: "child"),
927 event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
928 event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
929 event!("Nested event", component_id: "child"),
930 event!("Internal log [Nested event] is being suppressed to avoid flooding.", component_id: "child"),
931 event!("Internal log [Nested event] has been suppressed 9 times.", component_id: "child"),
932 event!("Nested event", component_id: "child"),
933 ]
934 );
935 }
936
937 #[test]
938 #[serial]
939 fn nested_spans_ignores_untracked_fields() {
940 let (events, sub) = setup_test(1);
941 tracing::subscriber::with_default(sub, || {
942 let outer = info_span!("outer", component_id = "transform");
944 let _outer_guard = outer.enter();
945
946 for _ in 0..21 {
947 let inner = info_span!("inner", some_field = "value");
948 let _inner_guard = inner.enter();
949 info!(message = "Event message");
950 drop(_inner_guard);
951
952 MockClock::advance(Duration::from_millis(100));
953 }
954 });
955
956 let events = events.lock().unwrap();
957
958 assert_eq!(
961 *events,
962 vec![
963 event!("Event message", component_id: "transform"),
964 event!(
965 "Internal log [Event message] is being suppressed to avoid flooding.",
966 component_id: "transform"
967 ),
968 event!(
969 "Internal log [Event message] has been suppressed 9 times.",
970 component_id: "transform"
971 ),
972 event!("Event message", component_id: "transform"),
973 event!(
974 "Internal log [Event message] is being suppressed to avoid flooding.",
975 component_id: "transform"
976 ),
977 event!(
978 "Internal log [Event message] has been suppressed 9 times.",
979 component_id: "transform"
980 ),
981 event!("Event message", component_id: "transform"),
982 ]
983 );
984 }
985
986 #[test]
987 #[serial]
988 fn rate_limit_same_message_different_component() {
989 let (events, sub) = setup_test(1);
990 tracing::subscriber::with_default(sub, || {
991 for component in &["foo", "foo", "bar"] {
994 info!(message = "Hello!", component_id = component);
995 MockClock::advance(Duration::from_millis(100));
996 }
997 });
998
999 let events = events.lock().unwrap();
1000
1001 assert_eq!(
1007 *events,
1008 vec![
1009 event!("Hello!", component_id: "foo"),
1010 event!("Internal log [Hello!] is being suppressed to avoid flooding."),
1011 event!("Hello!", component_id: "bar"),
1012 ]
1013 );
1014 }
1015}