use std::{
collections::HashMap,
ops::{Add, AddAssign},
};
use crate::{
internal_event::{
CountByteSize, InternalEventHandle, RegisterTaggedInternalEvent, RegisteredEventCache,
TaggedEventsSent,
},
json_size::JsonSize,
};
pub trait GetEventCountTags {
fn get_tags(&self) -> TaggedEventsSent;
}
#[derive(Clone, Debug)]
pub enum GroupedCountByteSize {
Tagged {
sizes: HashMap<TaggedEventsSent, CountByteSize>,
},
Untagged { size: CountByteSize },
}
impl Default for GroupedCountByteSize {
fn default() -> Self {
Self::Untagged {
size: CountByteSize(0, JsonSize::zero()),
}
}
}
impl GroupedCountByteSize {
#[must_use]
pub fn new_tagged() -> Self {
Self::Tagged {
sizes: HashMap::new(),
}
}
#[must_use]
pub fn new_untagged() -> Self {
Self::Untagged {
size: CountByteSize(0, JsonSize::zero()),
}
}
#[must_use]
#[cfg(any(test, feature = "test"))]
pub fn sizes(&self) -> Option<&HashMap<TaggedEventsSent, CountByteSize>> {
match self {
Self::Tagged { sizes } => Some(sizes),
Self::Untagged { .. } => None,
}
}
#[must_use]
#[cfg(any(test, feature = "test"))]
pub fn size(&self) -> Option<CountByteSize> {
match self {
Self::Tagged { .. } => None,
Self::Untagged { size } => Some(*size),
}
}
pub fn add_event<E>(&mut self, event: &E, json_size: JsonSize)
where
E: GetEventCountTags,
{
match self {
Self::Tagged { sizes } => {
let size = CountByteSize(1, json_size);
let tags = event.get_tags();
match sizes.get_mut(&tags) {
Some(current) => {
*current += size;
}
None => {
sizes.insert(tags, size);
}
}
}
Self::Untagged { size } => {
*size += CountByteSize(1, json_size);
}
}
}
pub fn emit_event<T, H>(&self, event_cache: &RegisteredEventCache<(), T>)
where
T: RegisterTaggedInternalEvent<Tags = TaggedEventsSent, Fixed = (), Handle = H>,
H: InternalEventHandle<Data = CountByteSize>,
{
match self {
GroupedCountByteSize::Tagged { sizes } => {
for (tags, size) in sizes {
event_cache.emit(tags, *size);
}
}
GroupedCountByteSize::Untagged { size } => {
event_cache.emit(&TaggedEventsSent::new_unspecified(), *size);
}
}
}
#[must_use]
pub fn is_tagged(&self) -> bool {
match self {
GroupedCountByteSize::Tagged { .. } => true,
GroupedCountByteSize::Untagged { .. } => false,
}
}
#[must_use]
pub fn is_untagged(&self) -> bool {
!self.is_tagged()
}
}
impl From<CountByteSize> for GroupedCountByteSize {
fn from(value: CountByteSize) -> Self {
Self::Untagged { size: value }
}
}
impl AddAssign for GroupedCountByteSize {
fn add_assign(&mut self, mut rhs: Self) {
if self.is_untagged() && rhs.is_tagged() {
*self = match (&self, &mut rhs) {
(Self::Untagged { size }, Self::Tagged { sizes }) => {
let mut sizes = std::mem::take(sizes);
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += *size,
None => {
sizes.insert(TaggedEventsSent::new_empty(), *size);
}
}
Self::Tagged { sizes }
}
_ => {
unreachable!()
}
};
return;
}
match (self, rhs) {
(Self::Tagged { sizes: ref mut lhs }, Self::Tagged { sizes: rhs }) => {
for (key, value) in rhs {
match lhs.get_mut(&key) {
Some(size) => *size += value,
None => {
lhs.insert(key.clone(), value);
}
}
}
}
(Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
*lhs = *lhs + rhs;
}
(Self::Tagged { ref mut sizes }, Self::Untagged { size }) => {
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += size,
None => {
sizes.insert(TaggedEventsSent::new_empty(), size);
}
}
}
(Self::Untagged { .. }, Self::Tagged { .. }) => unreachable!(),
};
}
}
impl<'a> Add<&'a GroupedCountByteSize> for GroupedCountByteSize {
type Output = GroupedCountByteSize;
fn add(self, other: &'a Self::Output) -> Self::Output {
match (self, other) {
(Self::Tagged { sizes: mut lhs }, Self::Tagged { sizes: rhs }) => {
for (key, value) in rhs {
match lhs.get_mut(key) {
Some(size) => *size += *value,
None => {
lhs.insert(key.clone(), *value);
}
}
}
Self::Tagged { sizes: lhs }
}
(Self::Untagged { size: lhs }, Self::Untagged { size: rhs }) => {
Self::Untagged { size: lhs + *rhs }
}
(Self::Tagged { mut sizes }, Self::Untagged { size }) => {
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += *size,
None => {
sizes.insert(TaggedEventsSent::new_empty(), *size);
}
}
Self::Tagged { sizes }
}
(Self::Untagged { size }, Self::Tagged { sizes }) => {
let mut sizes = sizes.clone();
match sizes.get_mut(&TaggedEventsSent::new_empty()) {
Some(empty_size) => *empty_size += size,
None => {
sizes.insert(TaggedEventsSent::new_empty(), size);
}
}
Self::Tagged { sizes }
}
}
}
}
#[derive(Clone, Debug, Default)]
pub struct RequestMetadata {
event_count: usize,
events_byte_size: usize,
events_estimated_json_encoded_byte_size: GroupedCountByteSize,
request_encoded_size: usize,
request_wire_size: usize,
}
impl RequestMetadata {
#[must_use]
pub fn new(
event_count: usize,
events_byte_size: usize,
request_encoded_size: usize,
request_wire_size: usize,
events_estimated_json_encoded_byte_size: GroupedCountByteSize,
) -> Self {
Self {
event_count,
events_byte_size,
events_estimated_json_encoded_byte_size,
request_encoded_size,
request_wire_size,
}
}
#[must_use]
pub const fn event_count(&self) -> usize {
self.event_count
}
#[must_use]
pub const fn events_byte_size(&self) -> usize {
self.events_byte_size
}
#[must_use]
pub fn events_estimated_json_encoded_byte_size(&self) -> &GroupedCountByteSize {
&self.events_estimated_json_encoded_byte_size
}
#[must_use]
pub fn into_events_estimated_json_encoded_byte_size(self) -> GroupedCountByteSize {
self.events_estimated_json_encoded_byte_size
}
#[must_use]
pub const fn request_encoded_size(&self) -> usize {
self.request_encoded_size
}
#[must_use]
pub const fn request_wire_size(&self) -> usize {
self.request_wire_size
}
#[must_use]
pub fn from_batch<T: IntoIterator<Item = RequestMetadata>>(metadata_iter: T) -> Self {
let mut metadata_sum = RequestMetadata::new(0, 0, 0, 0, GroupedCountByteSize::default());
for metadata in metadata_iter {
metadata_sum = metadata_sum + &metadata;
}
metadata_sum
}
}
impl<'a> Add<&'a RequestMetadata> for RequestMetadata {
type Output = RequestMetadata;
fn add(self, other: &'a Self::Output) -> Self::Output {
Self::Output {
event_count: self.event_count + other.event_count,
events_byte_size: self.events_byte_size + other.events_byte_size,
events_estimated_json_encoded_byte_size: self.events_estimated_json_encoded_byte_size
+ &other.events_estimated_json_encoded_byte_size,
request_encoded_size: self.request_encoded_size + other.request_encoded_size,
request_wire_size: self.request_wire_size + other.request_wire_size,
}
}
}
pub trait MetaDescriptive {
fn get_metadata(&self) -> &RequestMetadata;
fn metadata_mut(&mut self) -> &mut RequestMetadata;
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use crate::{config::ComponentKey, internal_event::OptionalTag};
use super::*;
struct DummyEvent {
source: OptionalTag<Arc<ComponentKey>>,
service: OptionalTag<String>,
}
impl GetEventCountTags for DummyEvent {
fn get_tags(&self) -> TaggedEventsSent {
TaggedEventsSent {
source: self.source.clone(),
service: self.service.clone(),
}
}
}
#[test]
fn add_request_count_bytesize_event_untagged() {
let mut bytesize = GroupedCountByteSize::new_untagged();
let event = DummyEvent {
source: Some(Arc::new(ComponentKey::from("carrot"))).into(),
service: Some("cabbage".to_string()).into(),
};
bytesize.add_event(&event, JsonSize::new(42));
let event = DummyEvent {
source: Some(Arc::new(ComponentKey::from("pea"))).into(),
service: Some("potato".to_string()).into(),
};
bytesize.add_event(&event, JsonSize::new(36));
assert_eq!(Some(CountByteSize(2, JsonSize::new(78))), bytesize.size());
assert_eq!(None, bytesize.sizes());
}
#[test]
fn add_request_count_bytesize_event_tagged() {
let mut bytesize = GroupedCountByteSize::new_tagged();
let event = DummyEvent {
source: OptionalTag::Ignored,
service: Some("cabbage".to_string()).into(),
};
bytesize.add_event(&event, JsonSize::new(42));
let event = DummyEvent {
source: OptionalTag::Ignored,
service: Some("cabbage".to_string()).into(),
};
bytesize.add_event(&event, JsonSize::new(36));
let event = DummyEvent {
source: OptionalTag::Ignored,
service: Some("tomato".to_string()).into(),
};
bytesize.add_event(&event, JsonSize::new(23));
assert_eq!(None, bytesize.size());
let mut sizes = bytesize
.sizes()
.unwrap()
.clone()
.into_iter()
.collect::<Vec<_>>();
sizes.sort();
assert_eq!(
vec![
(
TaggedEventsSent {
source: OptionalTag::Ignored,
service: Some("cabbage".to_string()).into()
},
CountByteSize(2, JsonSize::new(78))
),
(
TaggedEventsSent {
source: OptionalTag::Ignored,
service: Some("tomato".to_string()).into()
},
CountByteSize(1, JsonSize::new(23))
),
],
sizes
);
}
}