vector/internal_telemetry/allocations/mod.rs

mod allocator;

use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Mutex,
    },
    thread,
    time::Duration,
};

use arr_macro::arr;
use metrics::{counter, gauge};
use rand_distr::num_traits::ToPrimitive;

use self::allocator::Tracer;

pub(crate) use self::allocator::{
    without_allocation_tracing, AllocationGroupId, AllocationLayer, GroupedTraceableAllocator,
};

const NUM_GROUPS: usize = 256;

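/// Global flag controlling whether allocations and deallocations are traced.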
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);

pub fn is_allocation_tracing_enabled() -> bool {
    TRACK_ALLOCATIONS.load(Ordering::Acquire)
}

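/// Per-thread storage of the cumulative number of bytes allocated and deallocated in each
/// allocation group since the last report.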
struct GroupMemStatsStorage {
    allocations: [AtomicU64; NUM_GROUPS],
    deallocations: [AtomicU64; NUM_GROUPS],
}

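/// Interval, in milliseconds, at which the reporting thread aggregates and emits allocation
/// metrics.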
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);

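/// Registry of every thread's statistics storage, allowing the reporting thread to aggregate
/// counters across all threads.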
static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());

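/// A thread's handle to its own group memory statistics.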
struct GroupMemStats {
    stats: &'static GroupMemStatsStorage,
}

impl GroupMemStats {
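    /// Allocates a new (intentionally leaked) `GroupMemStatsStorage` for the calling thread and
    /// registers it in `THREAD_LOCAL_REFS` so the reporting thread can read it.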
    pub fn new() -> Self {
        let mut mutex = THREAD_LOCAL_REFS.lock().unwrap();
        let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
            allocations: arr![AtomicU64::new(0); 256],
            deallocations: arr![AtomicU64::new(0); 256],
        }));
        let group_mem_stats = GroupMemStats { stats: stats_ref };
        mutex.push(stats_ref);
        group_mem_stats
    }
}

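// Each thread lazily creates and registers its own statistics storage on first use.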
thread_local! {
    static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
}

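/// Component metadata associated with an allocation group.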
struct GroupInfo {
    component_kind: String,
    component_type: String,
    component_id: String,
}

impl GroupInfo {
    const fn new() -> Self {
        Self {
            component_id: String::new(),
            component_kind: String::new(),
            component_type: String::new(),
        }
    }
}

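/// Component metadata for every allocation group, indexed by group ID.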
static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 256];

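/// An allocator of type `A` wrapped with allocation-group tracing driven by [`MainTracer`].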
pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;

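/// Wraps the given allocator in a [`GroupedTraceableAllocator`] that attributes allocations to
/// allocation groups via [`MainTracer`].
///
/// A minimal usage sketch (not taken from this crate; it assumes the underlying allocator is the
/// standard system allocator) of installing the wrapped allocator as the global allocator:
///
/// ```ignore
/// use std::alloc::System;
///
/// #[global_allocator]
/// static ALLOC: Allocator<System> = get_grouped_tracing_allocator(System);
/// ```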
pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
    GroupedTraceableAllocator::new(allocator, MainTracer)
}

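/// The tracer that records traced allocations and deallocations into the calling thread's
/// per-group byte counters.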
pub struct MainTracer;

impl Tracer for MainTracer {
    #[inline(always)]
    fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
        // The thread-local storage may already be torn down (e.g. during thread shutdown), in
        // which case the trace is silently dropped.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.allocations[group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }

    #[inline(always)]
    fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
        // As above, ignore traces that arrive after the thread-local storage has been destroyed.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.deallocations[source_group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }
}

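/// Starts allocation tracing reporting: seeds every group's metadata with the root component and
/// spawns a background thread that, every `REPORTING_INTERVAL_MS` milliseconds, drains the
/// per-thread counters and emits the `component_allocated_bytes_total`,
/// `component_deallocated_bytes_total`, and `component_allocated_bytes` metrics.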
pub fn init_allocation_tracing() {
    // Attribute every group to the root component until a real component claims it.
    for group in &GROUP_INFO {
        let mut writer = group.lock().unwrap();
        *writer = GroupInfo {
            component_id: "root".to_string(),
            component_kind: "root".to_string(),
            component_type: "root".to_string(),
        };
    }
    let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
    alloc_processor
        .spawn(|| {
            without_allocation_tracing(|| loop {
                for (group_idx, group) in GROUP_INFO.iter().enumerate() {
                    // Drain this group's counters from every thread's storage.
                    let mut allocations_diff = 0;
                    let mut deallocations_diff = 0;
                    let mutex = THREAD_LOCAL_REFS.lock().unwrap();
                    for idx in 0..mutex.len() {
                        allocations_diff +=
                            mutex[idx].allocations[group_idx].swap(0, Ordering::Relaxed);
                        deallocations_diff +=
                            mutex[idx].deallocations[group_idx].swap(0, Ordering::Relaxed);
                    }
                    if allocations_diff == 0 && deallocations_diff == 0 {
                        continue;
                    }
                    let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
                    let group_info = group.lock().unwrap();
                    if allocations_diff > 0 {
                        counter!(
                            "component_allocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(allocations_diff);
                    }
                    if deallocations_diff > 0 {
                        counter!(
                            "component_deallocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
                    }
                    if mem_used_diff > 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                        .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                    if mem_used_diff < 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                        .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                }
                thread::sleep(Duration::from_millis(
                    REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
                ));
            })
        })
        .unwrap();
}

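/// Registers a new allocation group for the given component and returns its group ID, storing the
/// component metadata so reported metrics can be tagged with it. If all group IDs are already in
/// use, allocations for the component are attributed to the root group instead.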
pub fn acquire_allocation_group_id(
    component_id: String,
    component_type: String,
    component_kind: String,
) -> AllocationGroupId {
    if let Some(group_id) = AllocationGroupId::register() {
        if let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize) {
            let mut writer = group_lock.lock().unwrap();
            *writer = GroupInfo {
                component_id,
                component_kind,
                component_type,
            };

            return group_id;
        }
    }

    warn!("Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.", NUM_GROUPS, component_id);
    AllocationGroupId::ROOT
}