vector_common/internal_event/
events_sent.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
use std::sync::Arc;

use metrics::{counter, Counter};
use tracing::trace;

use crate::config::ComponentKey;

use super::{CountByteSize, OptionalTag, Output, SharedString};

pub const DEFAULT_OUTPUT: &str = "_default";

crate::registered_event!(
    EventsSent {
        output: Option<SharedString>,
    } => {
        events: Counter = if let Some(output) = &self.output {
            counter!("component_sent_events_total", "output" => output.clone())
        } else {
            counter!("component_sent_events_total")
        },
        event_bytes: Counter = if let Some(output) = &self.output {
            counter!("component_sent_event_bytes_total", "output" => output.clone())
        } else {
            counter!("component_sent_event_bytes_total")
        },
        output: Option<SharedString> = self.output,
    }

    fn emit(&self, data: CountByteSize) {
        let CountByteSize(count, byte_size) = data;

        match &self.output {
            Some(output) => {
                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get(), output = %output);
            }
            None => {
                trace!(message = "Events sent.", count = %count, byte_size = %byte_size.get());
            }
        }

        self.events.increment(count as u64);
        self.event_bytes.increment(byte_size.get() as u64);
    }
);

impl From<Output> for EventsSent {
    fn from(output: Output) -> Self {
        Self { output: output.0 }
    }
}

/// Makes a list of the tags to use with the events sent event.
fn make_tags(
    source: &OptionalTag<Arc<ComponentKey>>,
    service: &OptionalTag<String>,
) -> Vec<(&'static str, String)> {
    let mut tags = Vec::new();
    if let OptionalTag::Specified(tag) = source {
        tags.push((
            "source",
            tag.as_ref()
                .map_or_else(|| "-".to_string(), |tag| tag.id().to_string()),
        ));
    }

    if let OptionalTag::Specified(tag) = service {
        tags.push(("service", tag.clone().unwrap_or("-".to_string())));
    }

    tags
}

crate::registered_event!(
    TaggedEventsSent {
        source: OptionalTag<Arc<ComponentKey>>,
        service: OptionalTag<String>,
    } => {
        events: Counter = {
            counter!("component_sent_events_total", &make_tags(&self.source, &self.service))
        },
        event_bytes: Counter = {
            counter!("component_sent_event_bytes_total", &make_tags(&self.source, &self.service))
        },
    }

    fn emit(&self, data: CountByteSize) {
        let CountByteSize(count, byte_size) = data;
        trace!(message = "Events sent.", %count, %byte_size);

        self.events.increment(count as u64);
        self.event_bytes.increment(byte_size.get() as u64);
    }

    fn register(_fixed: (), tags: TaggedEventsSent) {
        super::register(tags)
    }
);

impl TaggedEventsSent {
    #[must_use]
    pub fn new_empty() -> Self {
        Self {
            source: OptionalTag::Specified(None),
            service: OptionalTag::Specified(None),
        }
    }

    #[must_use]
    pub fn new_unspecified() -> Self {
        Self {
            source: OptionalTag::Ignored,
            service: OptionalTag::Ignored,
        }
    }
}