// vector/internal_telemetry/allocations/mod.rs
mod allocator;
4use std::{
5 sync::{
6 Mutex,
7 atomic::{AtomicBool, AtomicU64, Ordering},
8 },
9 thread,
10 time::Duration,
11};
12
13use arr_macro::arr;
14use metrics::{counter, gauge};
15use rand_distr::num_traits::ToPrimitive;
16
17use self::allocator::Tracer;
18pub(crate) use self::allocator::{
19 AllocationGroupId, AllocationLayer, GroupedTraceableAllocator, without_allocation_tracing,
20};
21
/// Number of allocation-group slots tracked by this module.
///
/// It is the length of every per-group table here (the per-thread
/// allocation/deallocation counters and the group label table) and is the
/// limit reported when no further allocation group can be registered.
const NUM_GROUPS: usize = 256;
23
/// Global on/off flag for allocation tracing.
///
/// Starts out `false`. This file only reads the flag (through
/// [`is_allocation_tracing_enabled`]); it is public so that other code can
/// store to it. NOTE(review): writers are not visible here — confirm they use
/// a `Release` (or stronger) store to pair with the `Acquire` load below.
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);

/// Returns the current value of [`TRACK_ALLOCATIONS`].
///
/// The flag is read with `Ordering::Acquire`.
pub fn is_allocation_tracing_enabled() -> bool {
    let enabled = TRACK_ALLOCATIONS.load(Ordering::Acquire);
    enabled
}
33
34struct GroupMemStatsStorage {
36 allocations: [AtomicU64; NUM_GROUPS],
37 deallocations: [AtomicU64; NUM_GROUPS],
38}
39
/// Pause, in milliseconds, between two reporting passes of the
/// `vector-alloc-processor` thread.
///
/// Defaults to 5000 ms. The value is re-read before every sleep, so a store
/// takes effect on the next pass.
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);
42
43static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());
45
46struct GroupMemStats {
48 stats: &'static GroupMemStatsStorage,
49}
50
51impl GroupMemStats {
52 pub fn new() -> Self {
55 let mut mutex = THREAD_LOCAL_REFS.lock().unwrap();
56 let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
57 allocations: arr![AtomicU64::new(0) ; 256],
58 deallocations: arr![AtomicU64::new(0) ; 256],
59 }));
60 let group_mem_stats = GroupMemStats { stats: stats_ref };
61 mutex.push(stats_ref);
62 group_mem_stats
63 }
64}
65
66thread_local! {
67 static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
68}
69
/// Metric labels attached to one allocation group.
struct GroupInfo {
    /// Value of the `component_kind` metric label.
    component_kind: String,
    /// Value of the `component_type` metric label.
    component_type: String,
    /// Value of the `component_id` metric label.
    component_id: String,
}

impl GroupInfo {
    /// Builds a `GroupInfo` whose labels are all empty strings.
    ///
    /// This is a `const fn` so it can initialise a `static`.
    const fn new() -> Self {
        Self {
            component_kind: String::new(),
            component_type: String::new(),
            component_id: String::new(),
        }
    }
}
85
86static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 256];
87
88pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;
89
90pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
91 GroupedTraceableAllocator::new(allocator, MainTracer)
92}
93
94pub struct MainTracer;
95
96impl Tracer for MainTracer {
97 #[inline(always)]
98 fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
99 _ = GROUP_MEM_STATS.try_with(|t| {
101 t.stats.allocations[group_id.as_raw() as usize]
102 .fetch_add(object_size as u64, Ordering::Relaxed)
103 });
104 }
105
106 #[inline(always)]
107 fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
108 _ = GROUP_MEM_STATS.try_with(|t| {
110 t.stats.deallocations[source_group_id.as_raw() as usize]
111 .fetch_add(object_size as u64, Ordering::Relaxed)
112 });
113 }
114}
115
116pub fn init_allocation_tracing() {
118 for group in &GROUP_INFO {
119 let mut writer = group.lock().unwrap();
120 *writer = GroupInfo {
121 component_id: "root".to_string(),
122 component_kind: "root".to_string(),
123 component_type: "root".to_string(),
124 };
125 }
126 let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
127 alloc_processor
128 .spawn(|| {
129 without_allocation_tracing(|| loop {
130 for (group_idx, group) in GROUP_INFO.iter().enumerate() {
131 let mut allocations_diff = 0;
132 let mut deallocations_diff = 0;
133 let mutex = THREAD_LOCAL_REFS.lock().unwrap();
134 for idx in 0..mutex.len() {
135 allocations_diff +=
136 mutex[idx].allocations[group_idx].swap(0, Ordering::Relaxed);
137 deallocations_diff +=
138 mutex[idx].deallocations[group_idx].swap(0, Ordering::Relaxed);
139 }
140 if allocations_diff == 0 && deallocations_diff == 0 {
141 continue;
142 }
143 let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
144 let group_info = group.lock().unwrap();
145 if allocations_diff > 0 {
146 counter!(
147 "component_allocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
148 "component_type" => group_info.component_type.clone(),
149 "component_id" => group_info.component_id.clone()).increment(allocations_diff);
150 }
151 if deallocations_diff > 0 {
152 counter!(
153 "component_deallocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
154 "component_type" => group_info.component_type.clone(),
155 "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
156 }
157 if mem_used_diff > 0 {
158 gauge!(
159 "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
160 "component_id" => group_info.component_id.clone(),
161 "component_kind" => group_info.component_kind.clone())
162 .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
163 }
164 if mem_used_diff < 0 {
165 gauge!(
166 "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
167 "component_id" => group_info.component_id.clone(),
168 "component_kind" => group_info.component_kind.clone())
169 .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
170 }
171 }
172 thread::sleep(Duration::from_millis(
173 REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
174 ));
175 })
176 })
177 .unwrap();
178}
179
180pub fn acquire_allocation_group_id(
188 component_id: String,
189 component_type: String,
190 component_kind: String,
191) -> AllocationGroupId {
192 if let Some(group_id) = AllocationGroupId::register()
193 && let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize)
194 {
195 let mut writer = group_lock.lock().unwrap();
196 *writer = GroupInfo {
197 component_id,
198 component_kind,
199 component_type,
200 };
201
202 return group_id;
203 }
204
205 warn!(
206 "Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.",
207 NUM_GROUPS, component_id
208 );
209 AllocationGroupId::ROOT
210}