vector/internal_telemetry/allocations/mod.rs

//! Allocation tracking exposed via internal telemetry.

mod allocator;
use std::{
    sync::{
        Mutex,
        atomic::{AtomicBool, AtomicU64, Ordering},
    },
    thread,
    time::Duration,
};

use arr_macro::arr;
use metrics::{counter, gauge};
use rand_distr::num_traits::ToPrimitive;

use self::allocator::Tracer;
pub(crate) use self::allocator::{
    AllocationGroupId, AllocationLayer, GroupedTraceableAllocator, without_allocation_tracing,
};

const NUM_GROUPS: usize = 256;

// Allocations are not tracked during startup.
// We use the Relaxed ordering for both stores and loads of this atomic as no other threads exist when
// this code is running, and all future threads will have a happens-after relationship with
// this thread -- the main thread -- ensuring that they see the latest value of TRACK_ALLOCATIONS.
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);
pub fn is_allocation_tracing_enabled() -> bool {
    // Relaxed matches the ordering documented on `TRACK_ALLOCATIONS` above.
    TRACK_ALLOCATIONS.load(Ordering::Relaxed)
}
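
// Illustrative: tracking stays off during startup and is flipped on by the
// application once initialization is complete. A minimal sketch (the exact
// call site in the binary is an assumption, not defined in this module):
//
//     TRACK_ALLOCATIONS.store(true, Ordering::Relaxed);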

/// Track allocations and deallocations separately.
struct GroupMemStatsStorage {
    allocations: [AtomicU64; NUM_GROUPS],
    deallocations: [AtomicU64; NUM_GROUPS],
}

// Reporting interval in milliseconds.
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);
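
// Illustrative: the reporting thread re-reads this value on every cycle, so
// the interval can be tuned at runtime, e.g. from test setup. A sketch, not an
// established API contract:
//
//     REPORTING_INTERVAL_MS.store(1_000, Ordering::Relaxed);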

/// A registry for tracking each thread's group memory statistics.
static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());

/// Group memory statistics per thread.
struct GroupMemStats {
    stats: &'static GroupMemStatsStorage,
}

impl GroupMemStats {
    /// Allocates a [`GroupMemStatsStorage`] and updates the global [`THREAD_LOCAL_REFS`] registry
    /// with a reference to this newly allocated memory.
    pub fn new() -> Self {
        let mut registry = THREAD_LOCAL_REFS.lock().unwrap();
        // Deliberately leak the storage: the reporting thread holds a `&'static`
        // reference to it for the lifetime of the process, even after the owning
        // thread exits. `arr!` requires a literal length, so `NUM_GROUPS` cannot
        // be used here.
        let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
            allocations: arr![AtomicU64::new(0); 256],
            deallocations: arr![AtomicU64::new(0); 256],
        }));
        let group_mem_stats = GroupMemStats { stats: stats_ref };
        registry.push(stats_ref);
        group_mem_stats
    }
}

thread_local! {
    static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
}

struct GroupInfo {
    component_kind: String,
    component_type: String,
    component_id: String,
}

impl GroupInfo {
    const fn new() -> Self {
        Self {
            component_id: String::new(),
            component_kind: String::new(),
            component_type: String::new(),
        }
    }
}

static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 256];

pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;

pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
    GroupedTraceableAllocator::new(allocator, MainTracer)
}
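
// Illustrative: a binary would typically install the wrapped allocator as the
// global allocator. A minimal sketch assuming the system allocator (the real
// binary may wrap a different underlying allocator):
//
//     #[global_allocator]
//     static ALLOC: Allocator<std::alloc::System> =
//         get_grouped_tracing_allocator(std::alloc::System);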

pub struct MainTracer;

impl Tracer for MainTracer {
    #[inline(always)]
    fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
        // `try_with` handles the case where the thread-local destructor has already run.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.allocations[group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }

    #[inline(always)]
    fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
        // `try_with` handles the case where the thread-local destructor has already run.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.deallocations[source_group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }
}

/// Initializes allocation tracing.
pub fn init_allocation_tracing() {
    for group in &GROUP_INFO {
        let mut writer = group.lock().unwrap();
        *writer = GroupInfo {
            component_id: "root".to_string(),
            component_kind: "root".to_string(),
            component_type: "root".to_string(),
        };
    }
    let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
    alloc_processor
        .spawn(|| {
            // Disable tracing for the reporting thread itself so its own
            // allocations are not attributed to component groups.
            without_allocation_tracing(|| loop {
                for (group_idx, group) in GROUP_INFO.iter().enumerate() {
                    let mut allocations_diff = 0;
                    let mut deallocations_diff = 0;
                    // Aggregate and reset this group's counters across all threads.
                    let thread_refs = THREAD_LOCAL_REFS.lock().unwrap();
                    for stats in thread_refs.iter() {
                        allocations_diff += stats.allocations[group_idx].swap(0, Ordering::Relaxed);
                        deallocations_diff +=
                            stats.deallocations[group_idx].swap(0, Ordering::Relaxed);
                    }
                    if allocations_diff == 0 && deallocations_diff == 0 {
                        continue;
                    }
                    let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
                    let group_info = group.lock().unwrap();
                    if allocations_diff > 0 {
                        counter!(
                            "component_allocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(allocations_diff);
                    }
                    if deallocations_diff > 0 {
                        counter!(
                            "component_deallocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
                    }
                    if mem_used_diff > 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                            .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                    if mem_used_diff < 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                            .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                }
                thread::sleep(Duration::from_millis(
                    REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
                ));
            })
        })
        .unwrap();
}
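
// Illustrative startup sequence (a sketch; the real call sites and ordering
// live in the application entry point, which this module does not dictate):
//
//     init_allocation_tracing();
//     TRACK_ALLOCATIONS.store(true, Ordering::Relaxed);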

/// Acquires an allocation group ID.
///
/// This creates an allocation group which allows callers to enter/exit the allocation group context, associating all
/// (de)allocations within the context with that group. An allocation group ID must be "attached" to
/// a [`tracing::Span`] to achieve this; we utilize the logical invariants provided by spans --
/// entering, exiting, and how spans exist as a stack -- in order to handle keeping the "current
/// allocation group" accurate across all threads.
pub fn acquire_allocation_group_id(
    component_id: String,
    component_type: String,
    component_kind: String,
) -> AllocationGroupId {
    if let Some(group_id) = AllocationGroupId::register()
        && let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize)
    {
        let mut writer = group_lock.lock().unwrap();
        *writer = GroupInfo {
            component_id,
            component_kind,
            component_type,
        };

        return group_id;
    }

    warn!(
        "Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.",
        NUM_GROUPS, component_id
    );
    AllocationGroupId::ROOT
}
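
// Illustrative usage (the component values are hypothetical; attaching the
// returned ID to a `tracing::Span` is handled by `AllocationLayer` in the
// `allocator` module, not shown here):
//
//     let group_id = acquire_allocation_group_id(
//         "my_sink".to_string(),
//         "console".to_string(),
//         "sink".to_string(),
//     );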