vector/internal_telemetry/allocations/
mod.rsmod allocator;
use std::{
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Mutex,
},
thread,
time::Duration,
};
use arr_macro::arr;
use metrics::{counter, gauge};
use rand_distr::num_traits::ToPrimitive;
use self::allocator::Tracer;
pub(crate) use self::allocator::{
without_allocation_tracing, AllocationGroupId, AllocationLayer, GroupedTraceableAllocator,
};
const NUM_GROUPS: usize = 128;
pub static TRACK_ALLOCATIONS: AtomicBool = AtomicBool::new(false);
pub fn is_allocation_tracing_enabled() -> bool {
TRACK_ALLOCATIONS.load(Ordering::Acquire)
}
struct GroupMemStatsStorage {
allocations: [AtomicU64; NUM_GROUPS],
deallocations: [AtomicU64; NUM_GROUPS],
}
pub static REPORTING_INTERVAL_MS: AtomicU64 = AtomicU64::new(5000);
static THREAD_LOCAL_REFS: Mutex<Vec<&'static GroupMemStatsStorage>> = Mutex::new(Vec::new());
struct GroupMemStats {
stats: &'static GroupMemStatsStorage,
}
impl GroupMemStats {
pub fn new() -> Self {
let mut mutex = THREAD_LOCAL_REFS.lock().unwrap();
let stats_ref: &'static GroupMemStatsStorage = Box::leak(Box::new(GroupMemStatsStorage {
allocations: arr![AtomicU64::new(0) ; 128],
deallocations: arr![AtomicU64::new(0) ; 128],
}));
let group_mem_stats = GroupMemStats { stats: stats_ref };
mutex.push(stats_ref);
group_mem_stats
}
}
thread_local! {
static GROUP_MEM_STATS: GroupMemStats = GroupMemStats::new();
}
struct GroupInfo {
component_kind: String,
component_type: String,
component_id: String,
}
impl GroupInfo {
const fn new() -> Self {
Self {
component_id: String::new(),
component_kind: String::new(),
component_type: String::new(),
}
}
}
static GROUP_INFO: [Mutex<GroupInfo>; NUM_GROUPS] = arr![Mutex::new(GroupInfo::new()); 128];
pub type Allocator<A> = GroupedTraceableAllocator<A, MainTracer>;
pub const fn get_grouped_tracing_allocator<A>(allocator: A) -> Allocator<A> {
GroupedTraceableAllocator::new(allocator, MainTracer)
}
pub struct MainTracer;
impl Tracer for MainTracer {
#[inline(always)]
fn trace_allocation(&self, object_size: usize, group_id: AllocationGroupId) {
_ = GROUP_MEM_STATS.try_with(|t| {
t.stats.allocations[group_id.as_raw() as usize]
.fetch_add(object_size as u64, Ordering::Relaxed)
});
}
#[inline(always)]
fn trace_deallocation(&self, object_size: usize, source_group_id: AllocationGroupId) {
_ = GROUP_MEM_STATS.try_with(|t| {
t.stats.deallocations[source_group_id.as_raw() as usize]
.fetch_add(object_size as u64, Ordering::Relaxed)
});
}
}
pub fn init_allocation_tracing() {
for group in &GROUP_INFO {
let mut writer = group.lock().unwrap();
*writer = GroupInfo {
component_id: "root".to_string(),
component_kind: "root".to_string(),
component_type: "root".to_string(),
};
}
let alloc_processor = thread::Builder::new().name("vector-alloc-processor".to_string());
alloc_processor
.spawn(|| {
without_allocation_tracing(|| loop {
for (group_idx, group) in GROUP_INFO.iter().enumerate() {
let mut allocations_diff = 0;
let mut deallocations_diff = 0;
let mutex = THREAD_LOCAL_REFS.lock().unwrap();
for idx in 0..mutex.len() {
allocations_diff +=
mutex[idx].allocations[group_idx].swap(0, Ordering::Relaxed);
deallocations_diff +=
mutex[idx].deallocations[group_idx].swap(0, Ordering::Relaxed);
}
if allocations_diff == 0 && deallocations_diff == 0 {
continue;
}
let mem_used_diff = allocations_diff as i64 - deallocations_diff as i64;
let group_info = group.lock().unwrap();
if allocations_diff > 0 {
counter!(
"component_allocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
"component_type" => group_info.component_type.clone(),
"component_id" => group_info.component_id.clone()).increment(allocations_diff);
}
if deallocations_diff > 0 {
counter!(
"component_deallocated_bytes_total", "component_kind" => group_info.component_kind.clone(),
"component_type" => group_info.component_type.clone(),
"component_id" => group_info.component_id.clone()).increment(deallocations_diff);
}
if mem_used_diff > 0 {
gauge!(
"component_allocated_bytes", "component_type" => group_info.component_type.clone(),
"component_id" => group_info.component_id.clone(),
"component_kind" => group_info.component_kind.clone())
.increment(mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
}
if mem_used_diff < 0 {
gauge!(
"component_allocated_bytes", "component_type" => group_info.component_type.clone(),
"component_id" => group_info.component_id.clone(),
"component_kind" => group_info.component_kind.clone())
.decrement(-mem_used_diff.to_f64().expect("failed to convert mem_used from int to float"));
}
}
thread::sleep(Duration::from_millis(
REPORTING_INTERVAL_MS.load(Ordering::Relaxed),
));
})
})
.unwrap();
}
pub fn acquire_allocation_group_id(
component_id: String,
component_type: String,
component_kind: String,
) -> AllocationGroupId {
if let Some(group_id) = AllocationGroupId::register() {
if let Some(group_lock) = GROUP_INFO.get(group_id.as_raw() as usize) {
let mut writer = group_lock.lock().unwrap();
*writer = GroupInfo {
component_id,
component_kind,
component_type,
};
return group_id;
}
}
warn!("Maximum number of registrable allocation group IDs reached ({}). Allocations for component '{}' will be attributed to the root allocation group.", NUM_GROUPS, component_id);
AllocationGroupId::ROOT
}