vector/internal_telemetry/allocations/mod.rs

//! Allocation tracking exposed via internal telemetry.

mod allocator;
use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Mutex,
    },
    thread,
    time::Duration,
};

use arr_macro::arr;
use metrics::{counter, gauge};
use rand_distr::num_traits::ToPrimitive;
use tracing::warn;

use self::allocator::Tracer;

pub(crate) use self::allocator::{
    without_allocation_tracing, AllocationGroupId, AllocationLayer, GroupedTraceableAllocator,
};

/// The number of allocation groups that can be registered; see [`acquire_allocation_group_id`].
const NUM_GROUPS: usize = 256;
// Allocations are not tracked during startup.
// We use the `Relaxed` ordering for both stores and loads of this atomic as no other threads exist
// when this code is running, and all future threads will have a happens-after relationship with
// this thread -- the main thread -- ensuring that they see the latest value of `TRACK_ALLOCATIONS`.
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);

pub fn is_allocation_tracing_enabled() -> bool {
    TRACK_ALLOCATIONS.load(Ordering::Relaxed)
}
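
// A minimal sketch of how a binary might flip this at startup (hypothetical call
// site, not part of this module). The store happens on the main thread before any
// worker threads are spawned, which is why `Relaxed` ordering suffices:
//
//     TRACK_ALLOCATIONS.store(true, Ordering::Relaxed);
//     init_allocation_tracing();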

/// Tracks allocations and deallocations separately, per allocation group.
struct GroupMemStatsStorage {
    allocations: [AtomicU64; NUM_GROUPS],
    deallocations: [AtomicU64; NUM_GROUPS],
}

/// Reporting interval, in milliseconds.
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);
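
// The interval can be adjusted at runtime, e.g. from configuration (sketch,
// hypothetical call site):
//
//     REPORTING_INTERVAL_MS.store(1_000, Ordering::Relaxed);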

/// A registry for tracking each thread's group memory statistics.
static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());

/// Group memory statistics per thread.
struct GroupMemStats {
    stats: &'static GroupMemStatsStorage,
}

impl GroupMemStats {
    /// Allocates a [`GroupMemStatsStorage`], and updates the global [`THREAD_LOCAL_REFS`] registry
    /// with a reference to this newly allocated memory.
    pub fn new() -> Self {
        let mut registry = THREAD_LOCAL_REFS.lock().unwrap();
        // The storage is intentionally leaked: the `'static` reference must remain valid for the
        // lifetime of the process, even after the owning thread exits. Note that `arr!` requires a
        // literal repetition count, which must be kept in sync with `NUM_GROUPS`.
        let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
            allocations: arr![AtomicU64::new(0) ; 256],
            deallocations: arr![AtomicU64::new(0) ; 256],
        }));
        let group_mem_stats = GroupMemStats { stats: stats_ref };
        registry.push(stats_ref);
        group_mem_stats
    }
}

thread_local! {
    static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
}

struct GroupInfo {
    component_kind: String,
    component_type: String,
    component_id: String,
}

impl GroupInfo {
    const fn new() -> Self {
        Self {
            component_id: String::new(),
            component_kind: String::new(),
            component_type: String::new(),
        }
    }
}

static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 256];

pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;

pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
    GroupedTraceableAllocator::new(allocator, MainTracer)
}
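
// A minimal sketch of installing the tracing allocator in a binary (hypothetical
// call site; the underlying allocator is assumed to be `std::alloc::System` here,
// and `GroupedTraceableAllocator` is assumed to implement `GlobalAlloc`):
//
//     use std::alloc::System;
//
//     #[global_allocator]
//     static ALLOCATOR: Allocator<System> = get_grouped_tracing_allocator(System);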

pub struct MainTracer;

impl Tracer for MainTracer {
    #[inline(always)]
    fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
        // Handle the case where the thread-local destructor has already run.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.allocations[group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }

    #[inline(always)]
    fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
        // Handle the case where the thread-local destructor has already run.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.deallocations[source_group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }
}

/// Initializes allocation tracing by seeding the group info table and spawning a background thread
/// that periodically reports per-group allocation statistics as internal metrics.
pub fn init_allocation_tracing() {
    for group in &GROUP_INFO {
        let mut writer = group.lock().unwrap();
        *writer = GroupInfo {
            component_id: "root".to_string(),
            component_kind: "root".to_string(),
            component_type: "root".to_string(),
        };
    }
    let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
    alloc_processor
        .spawn(|| {
            without_allocation_tracing(|| loop {
                for (group_idx, group) in GROUP_INFO.iter().enumerate() {
                    let mut allocations_diff = 0;
                    let mut deallocations_diff = 0;
                    let thread_local_refs = THREAD_LOCAL_REFS.lock().unwrap();
                    // Drain the per-thread counters for this group, accumulating the deltas
                    // since the last report.
                    for stats in thread_local_refs.iter() {
                        allocations_diff += stats.allocations[group_idx].swap(0, Ordering::Relaxed);
                        deallocations_diff +=
                            stats.deallocations[group_idx].swap(0, Ordering::Relaxed);
                    }
                    if allocations_diff == 0 && deallocations_diff == 0 {
                        continue;
                    }
                    let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
                    let group_info = group.lock().unwrap();
                    if allocations_diff > 0 {
                        counter!(
                            "component_allocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(allocations_diff);
                    }
                    if deallocations_diff > 0 {
                        counter!(
                            "component_deallocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
                    }
                    if mem_used_diff > 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                            .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                    if mem_used_diff < 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                            .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                }
                thread::sleep(Duration::from_millis(
                    REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
                ));
            })
        })
        .unwrap();
}
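
// For each active component group, the reporting loop above emits series along
// these lines (illustrative label values):
//
//     component_allocated_bytes_total{component_kind="sink", component_type="blackhole", component_id="out"}
//     component_deallocated_bytes_total{component_kind="sink", component_type="blackhole", component_id="out"}
//     component_allocated_bytes{component_kind="sink", component_type="blackhole", component_id="out"}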

/// Acquires an allocation group ID.
///
/// This creates an allocation group which allows callers to enter/exit the allocation group context, associating all
/// (de)allocations within the context with that group. An allocation group ID must be "attached" to
/// a [`tracing::Span`] to achieve this; we utilize the logical invariants provided by spans --
/// entering, exiting, and how spans exist as a stack -- in order to keep the "current
/// allocation group" accurate across all threads.
pub fn acquire_allocation_group_id(
    component_id: String,
    component_type: String,
    component_kind: String,
) -> AllocationGroupId {
    if let Some(group_id) = AllocationGroupId::register() {
        if let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize) {
            let mut writer = group_lock.lock().unwrap();
            *writer = GroupInfo {
                component_id,
                component_kind,
                component_type,
            };

            return group_id;
        }
    }

    warn!(
        "Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.",
        NUM_GROUPS, component_id
    );
    AllocationGroupId::ROOT
}
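
// A minimal sketch of registering a group for a component (hypothetical call site;
// the returned ID is then attached to the component's `tracing::Span` so that
// `AllocationLayer` can activate the group on span entry and deactivate it on exit):
//
//     let group_id = acquire_allocation_group_id(
//         "out".to_string(),       // component_id
//         "blackhole".to_string(), // component_type
//         "sink".to_string(),      // component_kind
//     );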