//! Allocation tracking exposed via internal telemetry.
//!
//! Traced (de)allocations are attributed to allocation groups: every thread keeps a fixed array
//! of per-group atomic counters, and a background reporting thread periodically drains those
//! counters and emits the deltas as component-scoped counter and gauge metrics.
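//!
//! A minimal wiring sketch (hypothetical setup: the real binary chooses its own underlying
//! allocator and decides when tracking is switched on):
//!
//! ```ignore
//! use std::sync::atomic::Ordering;
//!
//! // Route all heap (de)allocations through the grouped tracing allocator.
//! #[global_allocator]
//! static ALLOC: Allocator<std::alloc::System> =
//!     get_grouped_tracing_allocator(std::alloc::System);
//!
//! fn main() {
//!     // Spawn the background reporting thread, then enable tracking.
//!     init_allocation_tracing();
//!     TRACK_ALLOCATIONS.store(true, Ordering::Relaxed);
//! }
//! ```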

mod allocator;
use std::{
    sync::{
        atomic::{AtomicBool, AtomicU64, Ordering},
        Mutex,
    },
    thread,
    time::Duration,
};

use arr_macro::arr;
use metrics::{counter, gauge};
use rand_distr::num_traits::ToPrimitive;

use self::allocator::Tracer;

pub(crate) use self::allocator::{
    without_allocation_tracing, AllocationGroupId, AllocationLayer, GroupedTraceableAllocator,
};

// `arr!` requires a literal element count, so the hardcoded `128` values passed to it below must
// be kept in sync with this constant.
const NUM_GROUPS: usize = 128;

// Allocations are not tracked during startup.
// We use the Relaxed ordering for both stores and loads of this atomic as no other threads exist when
// this code is running, and all future threads will have a happens-after relationship with
// this thread -- the main thread -- ensuring that they see the latest value of TRACK_ALLOCATIONS.
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);

pub fn is_allocation_tracing_enabled() -> bool {
    TRACK_ALLOCATIONS.load(Ordering::Relaxed)
}

/// Track allocations and deallocations separately.
struct GroupMemStatsStorage {
    allocations: [AtomicU64; NUM_GROUPS],
    deallocations: [AtomicU64; NUM_GROUPS],
}

/// Reporting interval in milliseconds.
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);

/// A registry for tracking each thread's group memory statistics.
static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());

/// Group memory statistics per thread.
struct GroupMemStats {
    stats: &'static GroupMemStatsStorage,
}

impl GroupMemStats {
    /// Allocates a [`GroupMemStatsStorage`], and updates the global [`THREAD_LOCAL_REFS`] registry
    /// with a reference to this newly allocated memory.
    pub fn new() -> Self {
        let mut mutex = THREAD_LOCAL_REFS.lock().unwrap();
        let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
            allocations: arr![AtomicU64::new(0) ; 128],
            deallocations: arr![AtomicU64::new(0) ; 128],
        }));
        let group_mem_stats = GroupMemStats { stats: stats_ref };
        mutex.push(stats_ref);
        group_mem_stats
    }
}

thread_local! {
    /// This thread's per-group (de)allocation counters, registered in [`THREAD_LOCAL_REFS`] on
    /// first use.
    static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
}

/// Metadata describing the component that owns an allocation group.
struct GroupInfo {
    component_kind: String,
    component_type: String,
    component_id: String,
}

impl GroupInfo {
    const fn new() -> Self {
        Self {
            component_id: String::new(),
            component_kind: String::new(),
            component_type: String::new(),
        }
    }
}

/// Component metadata (kind, type, id) for each allocation group, indexed by group ID.
static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 128];

/// An underlying allocator `A` wrapped with [`MainTracer`] for grouped allocation tracing.
pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;

/// Wraps `allocator` in a [`GroupedTraceableAllocator`] driven by [`MainTracer`].
pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
    GroupedTraceableAllocator::new(allocator, MainTracer)
}

/// Tracer that records traced (de)allocations into the current thread's per-group counters.
pub struct MainTracer;

impl Tracer for MainTracer {
    #[inline(always)]
    fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
        // Handle the case where the thread-local's destructor has already run.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.allocations[group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }

    #[inline(always)]
    fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
        // Handle the case where the thread-local's destructor has already run.
        _ = GROUP_MEM_STATS.try_with(|t| {
            t.stats.deallocations[source_group_id.as_raw() as usize]
                .fetch_add(object_size as u64, Ordering::Relaxed)
        });
    }
}

/// Initializes allocation tracing: seeds every group's metadata with the root component and
/// spawns the background thread that periodically reports allocation metrics.
pub fn init_allocation_tracing() {
    for group in &GROUP_INFO {
        let mut writer = group.lock().unwrap();
        *writer = GroupInfo {
            component_id: "root".to_string(),
            component_kind: "root".to_string(),
            component_type: "root".to_string(),
        };
    }
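    // The reporter runs on a dedicated OS thread and wraps its loop in
    // `without_allocation_tracing` so its own allocations are not attributed to the groups it
    // is measuring.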
    let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
    alloc_processor
        .spawn(|| {
            without_allocation_tracing(|| loop {
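                // One pass per reporting interval: for every allocation group, drain the
                // per-thread counters and emit the aggregated deltas as component metrics.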
                for (group_idx, group) in GROUP_INFO.iter().enumerate() {
                    let mut allocations_diff = 0;
                    let mut deallocations_diff = 0;
                    let thread_refs = THREAD_LOCAL_REFS.lock().unwrap();
                    for stats in thread_refs.iter() {
                        // `swap(0, ..)` both reads and resets each thread's counter for this group.
                        allocations_diff +=
                            stats.allocations[group_idx].swap(0, Ordering::Relaxed);
                        deallocations_diff +=
                            stats.deallocations[group_idx].swap(0, Ordering::Relaxed);
                    }
                    if allocations_diff == 0 && deallocations_diff == 0 {
                        continue;
                    }
                    let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
                    let group_info = group.lock().unwrap();
                    if allocations_diff > 0 {
                        counter!(
                            "component_allocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(allocations_diff);
                    }
                    if deallocations_diff > 0 {
                        counter!(
                            "component_deallocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
                            "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone()).increment(deallocations_diff);
                    }
                    if mem_used_diff > 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                            .increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                    if mem_used_diff < 0 {
                        gauge!(
                            "component_allocated_bytes", "component_type" => group_info.component_type.clone(),
                            "component_id" => group_info.component_id.clone(),
                            "component_kind" => group_info.component_kind.clone())
                            .decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
                    }
                }
                thread::sleep(Duration::from_millis(
                    REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
                ));
            })
        })
        .unwrap();
}

/// Acquires an allocation group ID.
///
/// This creates an allocation group which allows callers to enter/exit the allocation group context, associating all
/// (de)allocations within the context with that group. An allocation group ID must be "attached" to
/// a [`tracing::Span`] to achieve this: we utilize the logical invariants provided by spans --
/// entering, exiting, and how spans exist as a stack -- to keep the "current allocation group"
/// accurate across all threads.
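///
/// # Example
///
/// A sketch with hypothetical component values; attaching the returned ID to the component's
/// span is handled by the allocation tracing layer and is not shown here:
///
/// ```ignore
/// let group_id = acquire_allocation_group_id(
///     "my_sink".to_string(), // component_id (hypothetical)
///     "http".to_string(),    // component_type (hypothetical)
///     "sink".to_string(),    // component_kind (hypothetical)
/// );
/// ```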
pub fn acquire_allocation_group_id(
    component_id: String,
    component_type: String,
    component_kind: String,
) -> AllocationGroupId {
    if let Some(group_id) = AllocationGroupId::register() {
        if let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize) {
            let mut writer = group_lock.lock().unwrap();
            *writer = GroupInfo {
                component_id,
                component_kind,
                component_type,
            };

            return group_id;
        }
    }

    // TODO: Technically, `NUM_GROUPS` (128) is lower than the upper bound for the
    // `AllocationGroupId::register` call itself (253), so we can hardcode `NUM_GROUPS` here knowing
    // it's the lower of the two values and will trigger first... but this may not always be true.
    warn!("Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.", NUM_GROUPS, component_id);
    AllocationGroupId::ROOT
}