vector/transforms/tag_cardinality_limit/
mod.rs

1use std::{future::ready, pin::Pin};
2
3use futures::{Stream, StreamExt};
4use hashbrown::HashMap;
5
6use crate::{
7    event::Event,
8    internal_events::{
9        TagCardinalityLimitRejectingEvent, TagCardinalityLimitRejectingTag,
10        TagCardinalityValueLimitReached,
11    },
12    transforms::{TaskTransform, tag_cardinality_limit::config::LimitExceededAction},
13};
14
15mod config;
16mod tag_value_set;
17
18#[cfg(test)]
19mod tests;
20
21pub use config::{TagCardinalityLimitConfig, TagCardinalityLimitInnerConfig};
22use tag_value_set::AcceptedTagValueSet;
23
24use crate::event::metric::TagValueSet;
25
/// Identifies a metric as `(namespace, name)`; used to key the accepted-tag bucket of
/// metrics that have a matching per-metric limit.
type MetricId = (Option<String>, String);
27
/// Transform that bounds the number of distinct values accepted for each tag key of
/// incoming metrics. What happens once a key's `value_limit` is reached is decided by the
/// configured `LimitExceededAction` (drop the whole event, or drop only the offending tag).
#[derive(Debug)]
pub struct TagCardinalityLimit {
    /// Global limits plus optional per-metric overrides.
    config: TagCardinalityLimitConfig,
    /// Accepted values per tag key, bucketed by metric. The `None` bucket is shared by every
    /// metric without a matching per-metric limit; a `Some(id)` bucket belongs to a metric
    /// that has one.
    accepted_tags: HashMap<Option<MetricId>, HashMap<String, AcceptedTagValueSet>>,
}
33
34impl TagCardinalityLimit {
35    fn new(config: TagCardinalityLimitConfig) -> Self {
36        Self {
37            config,
38            accepted_tags: HashMap::new(),
39        }
40    }
41
42    fn get_config_for_metric(
43        &self,
44        metric_key: Option<&MetricId>,
45    ) -> &TagCardinalityLimitInnerConfig {
46        match metric_key {
47            Some(id) => self
48                .config
49                .per_metric_limits
50                .iter()
51                .find(|(name, config)| {
52                    **name == id.1 && (config.namespace.is_none() || config.namespace == id.0)
53                })
54                .map(|(_, c)| &c.config)
55                .unwrap_or(&self.config.global),
56            None => &self.config.global,
57        }
58    }
59
60    /// Takes in key and a value corresponding to a tag on an incoming Metric
61    /// Event.  If that value is already part of set of accepted values for that
62    /// key, then simply returns true.  If that value is not yet part of the
63    /// accepted values for that key, checks whether we have hit the value_limit
64    /// for that key yet and if not adds the value to the set of accepted values
65    /// for the key and returns true, otherwise returns false.  A false return
66    /// value indicates to the caller that the value is not accepted for this
67    /// key, and the configured limit_exceeded_action should be taken.
68    fn try_accept_tag(
69        &mut self,
70        metric_key: Option<&MetricId>,
71        key: &str,
72        value: &TagValueSet,
73    ) -> bool {
74        let config = self.get_config_for_metric(metric_key).clone();
75        let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
76        let tag_value_set = metric_accepted_tags
77            .entry_ref(key)
78            .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode));
79
80        if tag_value_set.contains(value) {
81            // Tag value has already been accepted, nothing more to do.
82            return true;
83        }
84
85        // Tag value not yet part of the accepted set.
86        if tag_value_set.len() < config.value_limit {
87            // accept the new value
88            tag_value_set.insert(value.clone());
89
90            if tag_value_set.len() == config.value_limit {
91                emit!(TagCardinalityValueLimitReached { key });
92            }
93
94            true
95        } else {
96            // New tag value is rejected.
97            false
98        }
99    }
100
101    /// Checks if recording a key and value corresponding to a tag on an incoming Metric would
102    /// exceed the cardinality limit.
103    fn tag_limit_exceeded(
104        &self,
105        metric_key: Option<&MetricId>,
106        key: &str,
107        value: &TagValueSet,
108    ) -> bool {
109        self.accepted_tags
110            .get(&metric_key.cloned())
111            .and_then(|metric_accepted_tags| {
112                metric_accepted_tags.get(key).map(|value_set| {
113                    !value_set.contains(value)
114                        && value_set.len() >= self.get_config_for_metric(metric_key).value_limit
115                })
116            })
117            .unwrap_or(false)
118    }
119
120    /// Record a key and value corresponding to a tag on an incoming Metric.
121    fn record_tag_value(&mut self, metric_key: Option<&MetricId>, key: &str, value: &TagValueSet) {
122        let config = self.get_config_for_metric(metric_key).clone();
123        let metric_accepted_tags = self.accepted_tags.entry(metric_key.cloned()).or_default();
124        metric_accepted_tags
125            .entry_ref(key)
126            .or_insert_with(|| AcceptedTagValueSet::new(config.value_limit, &config.mode))
127            .insert(value.clone());
128    }
129
130    fn transform_one(&mut self, mut event: Event) -> Option<Event> {
131        let metric = event.as_mut_metric();
132        let metric_name = metric.name().to_string();
133        let metric_namespace = metric.namespace().map(|n| n.to_string());
134        let has_per_metric_config = self.config.per_metric_limits.iter().any(|(name, config)| {
135            *name == metric_name
136                && (config.namespace.is_none() || config.namespace == metric_namespace)
137        });
138        let metric_key = if has_per_metric_config {
139            Some((metric_namespace, metric_name.clone()))
140        } else {
141            None
142        };
143        if let Some(tags_map) = metric.tags_mut() {
144            match self
145                .get_config_for_metric(metric_key.as_ref())
146                .limit_exceeded_action
147            {
148                LimitExceededAction::DropEvent => {
149                    // This needs to check all the tags, to ensure that the ordering of tag names
150                    // doesn't change the behavior of the check.
151
152                    for (key, value) in tags_map.iter_sets() {
153                        if self.tag_limit_exceeded(metric_key.as_ref(), key, value) {
154                            emit!(TagCardinalityLimitRejectingEvent {
155                                metric_name: &metric_name,
156                                tag_key: key,
157                                tag_value: &value.to_string(),
158                            });
159                            return None;
160                        }
161                    }
162                    for (key, value) in tags_map.iter_sets() {
163                        self.record_tag_value(metric_key.as_ref(), key, value);
164                    }
165                }
166                LimitExceededAction::DropTag => {
167                    tags_map.retain(|key, value| {
168                        if self.try_accept_tag(metric_key.as_ref(), key, value) {
169                            true
170                        } else {
171                            emit!(TagCardinalityLimitRejectingTag {
172                                metric_name: &metric_name,
173                                tag_key: key,
174                                tag_value: &value.to_string(),
175                            });
176                            false
177                        }
178                    });
179                }
180            }
181        }
182        Some(event)
183    }
184}
185
186impl TaskTransform<Event> for TagCardinalityLimit {
187    fn transform(
188        self: Box<Self>,
189        task: Pin<Box<dyn Stream<Item = Event> + Send>>,
190    ) -> Pin<Box<dyn Stream<Item = Event> + Send>>
191    where
192        Self: 'static,
193    {
194        let mut inner = self;
195        Box::pin(task.filter_map(move |v| ready(inner.transform_one(v))))
196    }
197}