vector/transforms/tag_cardinality_limit/
mod.rs1use std::{future::ready, pin::Pin};
2
3use futures::{Stream, StreamExt};
4use hashbrown::HashMap;
5
6use crate::{
7 event::Event,
8 internal_events::{
9 TagCardinalityLimitRejectingEvent, TagCardinalityLimitRejectingTag,
10 TagCardinalityValueLimitReached,
11 },
12 transforms::{TaskTransform, tag_cardinality_limit::config::LimitExceededAction},
13};
14
15mod config;
16mod tag_value_set;
17
18#[cfg(test)]
19mod tests;
20
21pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig};
22use tag_value_set::AcceptedTagValueSet;
23
24use crate::event::metric::TagValueSet;
25
/// Identifies a metric that has its own per-metric limit, as `(namespace, name)`.
type MetricId = (Option<String>, String);
27
/// Transform that caps how many distinct values are accepted for each tag key.
#[derive(Debug)]
pub struct TagCardinalityLimit {
    /// Global limits (`global`) plus optional per-metric overrides (`per_metric_limits`).
    config: TagCardinalityLimitConfig,
    /// Accepted tag values, bucketed first by metric and then by tag key.
    /// The metric bucket is `Some((namespace, name))` for metrics that match a
    /// per-metric limit, and `None` for the bucket shared by all other metrics.
    accepted_tags: HashMap<Option<MetricId>, HashMap<String, AcceptedTagValueSet>>,
}
33
34impl TagCardinalityLimit {
35 fn new(config: TagCardinalityLimitConfig) -> Self {
36 Self {
37 config,
38 accepted_tags: HashMap::new(),
39 }
40 }
41
42 fn get_config_for_metric(
43 &self,
44 metric_key: Option<&MetricId>,
45 ) -> &TagCardinalityLimitInnerConfig {
46 match metric_key {
47 Some(id) => self
48 .config
49 .per_metric_limits
50 .iter()
51 .find(|(name, config)| {
52 **name == id.1 && (config.namespace.is_none() || config.namespace == id.0)
53 })
54 .map(|(_, c)| &c.config)
55 .unwrap_or(&self.config.global),
56 None => &self.config.global,
57 }
58 }
59
60 fn try_accept_tag(
69 &mut self,
70 metric_key: Option<&MetricId>,
71 key: &str,
72 value: &TagValueSet,
73 ) -> bool {
74 let config = self.get_config_for_metric(metric_key).clone();
75 let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
76 let tag_value_set = metric_accepted_tags
77 .entry_ref(key)
78 .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode));
79
80 if tag_value_set.contains(value) {
81 return true;
83 }
84
85 if tag_value_set.len() < config.value_limit {
87 tag_value_set.insert(value.clone());
89
90 if tag_value_set.len() == config.value_limit {
91 emit!(TagCardinalityValueLimitReached { key });
92 }
93
94 true
95 } else {
96 false
98 }
99 }
100
101 fn tag_limit_exceeded(
104 &self,
105 metric_key: Option<&MetricId>,
106 key: &str,
107 value: &TagValueSet,
108 ) -> bool {
109 self.accepted_tags
110 .get(&metric_key.cloned())
111 .and_then(|metric_accepted_tags| {
112 metric_accepted_tags.get(key).map(|value_set| {
113 !value_set.contains(value)
114 && value_set.len() >= self.get_config_for_metric(metric_key).value_limit
115 })
116 })
117 .unwrap_or(false)
118 }
119
120 fn record_tag_value(&mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet) {
122 let config = self.get_config_for_metric(metric_key).clone();
123 let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
124 metric_accepted_tags
125 .entry_ref(key)
126 .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode))
127 .insert(value.clone());
128 }
129
130 fn transform_one(&mut self, mut event: Event) -> Option<Event> {
131 let metric = event.as_mut_metric();
132 let metric_name = metric.name().to_string();
133 let metric_namespace = metric.namespace().map(|n| n.to_string());
134 let has_per_metric_config = self.config.per_metric_limits.iter().any(|(name, config)| {
135 *name == metric_name
136 && (config.namespace.is_none() || config.namespace == metric_namespace)
137 });
138 let metric_key = if has_per_metric_config {
139 Some((metric_namespace, metric_name.clone()))
140 } else {
141 None
142 };
143 if let Some(tags_map) = metric.tags_mut() {
144 match self
145 .get_config_for_metric(metric_key.as_ref())
146 .limit_exceeded_action
147 {
148 LimitExceededAction::DropEvent => {
149 for (key, value) in tags_map.iter_sets() {
153 if self.tag_limit_exceeded(metric_key.as_ref(), key, value) {
154 emit!(TagCardinalityLimitRejectingEvent {
155 metric_name: &metric_name,
156 tag_key: key,
157 tag_value: &value.to_string(),
158 });
159 return None;
160 }
161 }
162 for (key, value) in tags_map.iter_sets() {
163 self.record_tag_value(metric_key.as_ref(), key, value);
164 }
165 }
166 LimitExceededAction::DropTag => {
167 tags_map.retain(|key, value| {
168 if self.try_accept_tag(metric_key.as_ref(), key, value) {
169 true
170 } else {
171 emit!(TagCardinalityLimitRejectingTag {
172 metric_name: &metric_name,
173 tag_key: key,
174 tag_value: &value.to_string(),
175 });
176 false
177 }
178 });
179 }
180 }
181 }
182 Some(event)
183 }
184}
185
186impl TaskTransform<Event> for TagCardinalityLimit {
187 fn transform(
188 self: Box<Self>,
189 task: Pin<Box<dyn Stream<Item = Event> + Send>>,
190 ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
191 where
192 Self: 'static,
193 {
194 let mut inner = self;
195 Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
196 }
197}