1use std::{
2 cmp,
3 io::{self, Write},
4 mem,
5 sync::{Arc, LazyLock, OnceLock},
6};
7
8use bytes::{BufMut, Bytes};
9use chrono::{DateTime, Utc};
10use snafu::{ResultExt, Snafu};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 config::{LogSchema, log_schema, telemetry},
14 event::{DatadogMetricOriginMetadata, Metric, MetricTags, MetricValue, metric::MetricSketch},
15 metrics::AgentDDSketch,
16 request_metadata::GroupedCountByteSize,
17};
18
19use super::config::{DatadogMetricsEndpoint, SeriesApiVersion};
20use crate::{
21 common::datadog::{
22 DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
23 },
24 proto::fds::protobuf_descriptors,
25 sinks::util::{Compression, Compressor, encode_namespace, request_builder::EncodeResult},
26};
27
28const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
29const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}";
30const SERIES_PAYLOAD_DELIMITER: &[u8] = b",";
31
32pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11;
33
34const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;
35
36pub(super) static ORIGIN_PRODUCT_VALUE: LazyLock<u32> = LazyLock::new(|| {
37 option_env!("DD_ORIGIN_PRODUCT")
38 .map(|p| {
39 p.parse::<u32>()
40 .expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
41 })
42 .unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
43});
44
45#[allow(warnings, clippy::pedantic, clippy::nursery)]
46mod ddmetric_proto {
47 include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
48}
49
50#[derive(Debug, Snafu)]
51pub enum CreateError {
52 #[snafu(display("Invalid compressed/uncompressed payload size limits were given"))]
53 InvalidLimits,
54}
55
56impl CreateError {
57 pub const fn as_error_type(&self) -> &'static str {
61 match self {
62 Self::InvalidLimits => "invalid_payload_limits",
63 }
64 }
65}
66
67#[derive(Debug, Snafu)]
68pub enum EncoderError {
69 #[snafu(display(
70 "Invalid metric value '{}' was given; '{}' expected",
71 metric_value,
72 expected
73 ))]
74 InvalidMetric {
75 expected: &'static str,
76 metric_value: &'static str,
77 },
78
79 #[snafu(
80 context(false),
81 display("Failed to encode series metric to JSON: {source}")
82 )]
83 JsonEncodingFailed { source: serde_json::Error },
84
85 #[snafu(display(
88 "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
89 ))]
90 ProtoEncodingFailed,
91}
92
93impl EncoderError {
94 pub const fn as_error_type(&self) -> &'static str {
98 match self {
99 Self::InvalidMetric { .. } => "invalid_metric",
100 Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
101 Self::ProtoEncodingFailed => "failed_to_encode_sketch",
102 }
103 }
104}
105
106#[derive(Debug, Snafu)]
107pub enum FinishError {
108 #[snafu(display(
109 "Failure occurred during writing to or finalizing the compressor: {}",
110 source
111 ))]
112 CompressionFailed { source: io::Error },
113
114 #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
115 TooLarge {
116 metrics: Vec<Metric>,
117 recommended_splits: usize,
118 },
119}
120
121impl FinishError {
122 pub const fn as_error_type(&self) -> &'static str {
126 match self {
127 Self::CompressionFailed { .. } => "compression_failed",
128 Self::TooLarge { .. } => "too_large",
129 }
130 }
131}
132
133struct EncoderState {
134 writer: Compressor,
135 written: usize,
136 buf: Vec<u8>,
137 processed: Vec<Metric>,
138 byte_size: GroupedCountByteSize,
139}
140
141impl Default for EncoderState {
142 fn default() -> Self {
143 Self {
144 writer: get_compressor(),
145 written: 0,
146 buf: Vec::with_capacity(1024),
147 processed: Vec::new(),
148 byte_size: telemetry().create_request_count_byte_size(),
149 }
150 }
151}
152
153pub struct DatadogMetricsEncoder {
154 endpoint: DatadogMetricsEndpoint,
155 default_namespace: Option<Arc<str>>,
156 uncompressed_limit: usize,
157 compressed_limit: usize,
158
159 state: EncoderState,
160 log_schema: &'static LogSchema,
161
162 origin_product_value: u32,
163}
164
165impl DatadogMetricsEncoder {
166 pub fn new(
168 endpoint: DatadogMetricsEndpoint,
169 default_namespace: Option<String>,
170 ) -> Result<Self, CreateError> {
171 let payload_limits = endpoint.payload_limits();
172 Self::with_payload_limits(
173 endpoint,
174 default_namespace,
175 payload_limits.uncompressed,
176 payload_limits.compressed,
177 )
178 }
179
180 pub fn with_payload_limits(
182 endpoint: DatadogMetricsEndpoint,
183 default_namespace: Option<String>,
184 uncompressed_limit: usize,
185 compressed_limit: usize,
186 ) -> Result<Self, CreateError> {
187 let (uncompressed_limit, compressed_limit) =
188 validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit)
189 .ok_or(CreateError::InvalidLimits)?;
190
191 Ok(Self {
192 endpoint,
193 default_namespace: default_namespace.map(Arc::from),
194 uncompressed_limit,
195 compressed_limit,
196 state: EncoderState::default(),
197 log_schema: log_schema(),
198 origin_product_value: *ORIGIN_PRODUCT_VALUE,
199 })
200 }
201}
202
203impl DatadogMetricsEncoder {
204 fn reset_state(&mut self) -> EncoderState {
205 mem::take(&mut self.state)
206 }
207
208 fn encode_single_metric(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
209 self.state.buf.clear();
221
222 self.state
223 .byte_size
224 .add_event(&metric, metric.estimated_json_encoded_size_of());
225
226 match self.endpoint {
242 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
244 let all_series = generate_series_metrics(
246 &metric,
247 &self.default_namespace,
248 self.log_schema,
249 self.origin_product_value,
250 )?;
251
252 let has_processed = !self.state.processed.is_empty();
255 for (i, series) in all_series.iter().enumerate() {
256 if (has_processed || i > 0)
258 && write_payload_delimiter(self.endpoint, &mut self.state.buf).is_err()
259 {
260 return Ok(Some(metric));
261 }
262 serde_json::to_writer(&mut self.state.buf, series)?;
263 }
264 }
265 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() {
267 MetricValue::Counter { .. }
268 | MetricValue::Gauge { .. }
269 | MetricValue::Set { .. }
270 | MetricValue::AggregatedSummary { .. } => {
271 let series_proto = series_to_proto_message(
272 &metric,
273 &self.default_namespace,
274 self.log_schema,
275 self.origin_product_value,
276 )?;
277
278 encode_proto_key_and_message(
279 series_proto,
280 get_series_payload_series_field_number(),
281 &mut self.state.buf,
282 )?;
283 }
284 value => {
285 return Err(EncoderError::InvalidMetric {
286 expected: "series",
287 metric_value: value.as_name(),
288 });
289 }
290 },
291 DatadogMetricsEndpoint::Sketches => match metric.value() {
293 MetricValue::Sketch { sketch } => match sketch {
294 MetricSketch::AgentDDSketch(ddsketch) => {
295 if let Some(sketch_proto) = sketch_to_proto_message(
296 &metric,
297 ddsketch,
298 &self.default_namespace,
299 self.log_schema,
300 self.origin_product_value,
301 ) {
302 encode_proto_key_and_message(
303 sketch_proto,
304 get_sketch_payload_sketches_field_number(),
305 &mut self.state.buf,
306 )?;
307 } else {
308 }
310 }
311 },
312 value => {
313 return Err(EncoderError::InvalidMetric {
314 expected: "sketches",
315 metric_value: value.as_name(),
316 });
317 }
318 },
319 }
320
321 match self.try_compress_buffer() {
323 Err(_) | Ok(false) => Ok(Some(metric)),
324 Ok(true) => {
325 self.state.processed.push(metric);
326 Ok(None)
327 }
328 }
329 }
330
331 fn try_compress_buffer(&mut self) -> io::Result<bool> {
332 let n = self.state.buf.len();
333
334 if self.state.written + n > self.uncompressed_limit {
336 return Ok(false);
337 }
338
339 let compressed_len = self.state.writer.get_ref().len();
355 let max_compressed_metric_len = n + max_compressed_overhead_len(n);
356 if compressed_len + max_compressed_metric_len > self.compressed_limit {
357 return Ok(false);
358 }
359
360 self.state.writer.write_all(&self.state.buf)?;
362 self.state.written += n;
363 Ok(true)
364 }
365
366 pub fn try_encode(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
383 if self.state.written == 0 {
385 match write_payload_header(self.endpoint, &mut self.state.writer) {
386 Ok(n) => self.state.written += n,
387 Err(_) => return Ok(Some(metric)),
388 }
389 }
390
391 self.encode_single_metric(metric)
392 }
393
394 pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
395 let n = write_payload_footer(self.endpoint, &mut self.state.writer)
397 .context(CompressionFailedSnafu)?;
398 self.state.written += n;
399
400 let raw_bytes_written = self.state.written;
401 let byte_size = self.state.byte_size.clone();
402
403 let state = self.reset_state();
405 let payload = state
406 .writer
407 .finish()
408 .context(CompressionFailedSnafu)?
409 .freeze();
410 let processed = state.processed;
411
412 let compressed_splits = payload.len() / self.compressed_limit;
421 let uncompressed_splits = state.written / self.uncompressed_limit;
422 let recommended_splits = cmp::max(compressed_splits, uncompressed_splits) + 1;
423
424 if recommended_splits == 1 {
425 Ok((
427 EncodeResult::compressed(payload, raw_bytes_written, byte_size),
428 processed,
429 ))
430 } else {
431 Err(FinishError::TooLarge {
432 metrics: processed,
433 recommended_splits,
434 })
435 }
436 }
437}
438
439fn generate_proto_metadata(
440 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
441 maybe_source_type: Option<&str>,
442 origin_product_value: u32,
443) -> Option<ddmetric_proto::Metadata> {
444 generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
445 |origin| {
446 if origin.product().is_none()
447 || origin.category().is_none()
448 || origin.service().is_none()
449 {
450 warn!(
451 message = "Generated sketch origin metadata should have each field set.",
452 product = origin.product(),
453 category = origin.category(),
454 service = origin.service()
455 );
456 }
457 ddmetric_proto::Metadata {
458 origin: Some(ddmetric_proto::Origin {
459 origin_product: origin.product().unwrap_or_default(),
460 origin_category: origin.category().unwrap_or_default(),
461 origin_service: origin.service().unwrap_or_default(),
462 }),
463 }
464 },
465 )
466}
467
468fn get_sketch_payload_sketches_field_number() -> u32 {
469 static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
470 *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
471 let descriptors = protobuf_descriptors();
472 let descriptor = descriptors
473 .get_message_by_name("datadog.agentpayload.SketchPayload")
474 .expect("should not fail to find `SketchPayload` message in descriptor pool");
475
476 descriptor
477 .get_field_by_name("sketches")
478 .map(|field| field.number())
479 .expect("`sketches` field must exist in `SketchPayload` message")
480 })
481}
482
483fn get_series_payload_series_field_number() -> u32 {
484 static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
485 *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| {
486 let descriptors = protobuf_descriptors();
487 let descriptor = descriptors
488 .get_message_by_name("datadog.agentpayload.MetricPayload")
489 .expect("should not fail to find `MetricPayload` message in descriptor pool");
490
491 descriptor
492 .get_field_by_name("series")
493 .map(|field| field.number())
494 .expect("`series` field must exist in `MetricPayload` message")
495 })
496}
497
498fn sketch_to_proto_message(
499 metric: &Metric,
500 ddsketch: &AgentDDSketch,
501 default_namespace: &Option<Arc<str>>,
502 log_schema: &'static LogSchema,
503 origin_product_value: u32,
504) -> Option<ddmetric_proto::sketch_payload::Sketch> {
505 if ddsketch.is_empty() {
506 return None;
507 }
508
509 let name = get_namespaced_name(metric, default_namespace);
510 let ts = encode_timestamp(metric.timestamp());
511 let mut tags = metric.tags().cloned().unwrap_or_default();
512 let host = log_schema
513 .host_key()
514 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
515 .unwrap_or_default();
516 let tags = encode_tags(&tags);
517
518 let cnt = ddsketch.count() as i64;
519 let min = ddsketch
520 .min()
521 .expect("min should be present for non-empty sketch");
522 let max = ddsketch
523 .max()
524 .expect("max should be present for non-empty sketch");
525 let avg = ddsketch
526 .avg()
527 .expect("avg should be present for non-empty sketch");
528 let sum = ddsketch
529 .sum()
530 .expect("sum should be present for non-empty sketch");
531
532 let (bins, counts) = ddsketch.bin_map().into_parts();
533 let k = bins.into_iter().map(Into::into).collect();
534 let n = counts.into_iter().map(Into::into).collect();
535
536 let event_metadata = metric.metadata();
537 let metadata = generate_proto_metadata(
538 event_metadata.datadog_origin_metadata(),
539 event_metadata.source_type(),
540 origin_product_value,
541 );
542
543 trace!(?metadata, "Generated sketch metadata.");
544
545 Some(ddmetric_proto::sketch_payload::Sketch {
546 metric: name,
547 tags,
548 host,
549 distributions: Vec::new(),
550 dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
551 ts,
552 cnt,
553 min,
554 max,
555 avg,
556 sum,
557 k,
558 n,
559 }],
560 metadata,
561 })
562}
563
564fn series_to_proto_message(
565 metric: &Metric,
566 default_namespace: &Option<Arc<str>>,
567 log_schema: &'static LogSchema,
568 origin_product_value: u32,
569) -> Result<ddmetric_proto::metric_payload::MetricSeries, EncoderError> {
570 let metric_name = get_namespaced_name(metric, default_namespace);
571 let mut tags = metric.tags().cloned().unwrap_or_default();
572
573 let mut resources = vec![];
574
575 if let Some(host) = log_schema
576 .host_key()
577 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
578 {
579 resources.push(ddmetric_proto::metric_payload::Resource {
580 r#type: "host".to_string(),
581 name: host,
582 });
583 }
584
585 if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) {
588 resources.push(ddmetric_proto::metric_payload::Resource {
589 r#type: "device".to_string(),
590 name: device,
591 });
592 }
593
594 let source_type_name = tags.remove("source_type_name").unwrap_or_default();
595
596 let tags = encode_tags(&tags);
597
598 let event_metadata = metric.metadata();
599 let metadata = generate_proto_metadata(
600 event_metadata.datadog_origin_metadata(),
601 event_metadata.source_type(),
602 origin_product_value,
603 );
604 trace!(?metadata, "Generated MetricSeries metadata.");
605
606 let timestamp = encode_timestamp(metric.timestamp());
607
608 let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
610
611 let (points, metric_type) = match metric.value() {
612 MetricValue::Counter { value } => {
613 if let Some(interval) = maybe_interval {
614 let value = *value / (interval as f64);
618 (
619 vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
620 ddmetric_proto::metric_payload::MetricType::Rate,
621 )
622 } else {
623 (
624 vec![ddmetric_proto::metric_payload::MetricPoint {
625 value: *value,
626 timestamp,
627 }],
628 ddmetric_proto::metric_payload::MetricType::Count,
629 )
630 }
631 }
632 MetricValue::Set { values } => (
633 vec![ddmetric_proto::metric_payload::MetricPoint {
634 value: values.len() as f64,
635 timestamp,
636 }],
637 ddmetric_proto::metric_payload::MetricType::Gauge,
638 ),
639 MetricValue::Gauge { value } => (
640 vec![ddmetric_proto::metric_payload::MetricPoint {
641 value: *value,
642 timestamp,
643 }],
644 ddmetric_proto::metric_payload::MetricType::Gauge,
645 ),
646 value => {
648 return Err(EncoderError::InvalidMetric {
650 expected: "series",
651 metric_value: value.as_name(),
652 });
653 }
654 };
655
656 Ok(ddmetric_proto::metric_payload::MetricSeries {
657 resources,
658 metric: metric_name,
659 tags,
660 points,
661 r#type: metric_type.into(),
662 unit: "".to_string(),
664 source_type_name,
665 interval: maybe_interval.unwrap_or(0) as i64,
666 metadata,
667 })
668}
669
670fn encode_proto_key_and_message<T, B>(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError>
672where
673 T: prost::Message,
674 B: BufMut,
675{
676 prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf);
677
678 msg.encode_length_delimited(buf)
679 .map_err(|_| EncoderError::ProtoEncodingFailed)
680}
681
682fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
683 encode_namespace(
684 metric
685 .namespace()
686 .or_else(|| default_namespace.as_ref().map(|s| s.as_ref())),
687 '.',
688 metric.name(),
689 )
690}
691
692fn encode_tags(tags: &MetricTags) -> Vec<String> {
693 let mut pairs: Vec<_> = tags
694 .iter_all()
695 .map(|(name, value)| match value {
696 Some(value) => format!("{name}:{value}"),
697 None => name.into(),
698 })
699 .collect();
700 pairs.sort();
701 pairs
702}
703
704fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
705 if let Some(ts) = timestamp {
706 ts.timestamp()
707 } else {
708 Utc::now().timestamp()
709 }
710}
711
712fn source_type_to_service(source_type: &str) -> Option<u32> {
714 match source_type {
715 "datadog_agent" => None,
718
719 "apache_metrics" => Some(17),
721 "aws_ecs_metrics" => Some(209),
722 "eventstoredb_metrics" => Some(210),
723 "host_metrics" => Some(211),
724 "internal_metrics" => Some(212),
725 "mongodb_metrics" => Some(111),
726 "nginx_metrics" => Some(117),
727 "open_telemetry" => Some(213),
728 "postgresql_metrics" => Some(128),
729 "prometheus_remote_write" => Some(214),
730 "prometheus_scrape" => Some(215),
731 "statsd" => Some(153),
732
733 "kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector"
738 | "pulsar" => Some(0),
739
740 _ => {
745 debug!(
746 "Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value."
747 );
748 Some(0)
749 }
750 }
751}
752
753fn generate_origin_metadata(
758 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
759 maybe_source_type: Option<&str>,
760 origin_product_value: u32,
761) -> Option<DatadogMetricOriginMetadata> {
762 let no_value = 0;
763
764 if let Some(pass_through) = maybe_pass_through {
773 Some(DatadogMetricOriginMetadata::new(
774 pass_through.product().or(Some(origin_product_value)),
775 pass_through.category().or(Some(ORIGIN_CATEGORY_VALUE)),
776 pass_through.service().or(Some(no_value)),
777 ))
778
779 } else {
781 maybe_source_type.and_then(|source_type| {
782 source_type_to_service(source_type).map(|origin_service_value| {
786 DatadogMetricOriginMetadata::new(
787 Some(origin_product_value),
788 Some(ORIGIN_CATEGORY_VALUE),
789 Some(origin_service_value),
790 )
791 })
792 })
793 }
794}
795
796fn generate_series_metadata(
797 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
798 maybe_source_type: Option<&str>,
799 origin_product_value: u32,
800) -> Option<DatadogSeriesMetricMetadata> {
801 generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
802 |origin| DatadogSeriesMetricMetadata {
803 origin: Some(origin),
804 },
805 )
806}
807
808fn generate_series_metrics(
809 metric: &Metric,
810 default_namespace: &Option<Arc<str>>,
811 log_schema: &'static LogSchema,
812 origin_product_value: u32,
813) -> Result<Vec<DatadogSeriesMetric>, EncoderError> {
814 let name = get_namespaced_name(metric, default_namespace);
815
816 let mut tags = metric.tags().cloned().unwrap_or_default();
817 let host = log_schema
818 .host_key()
819 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());
820
821 let source_type_name = tags.remove("source_type_name");
822 let device = tags.remove("device");
823 let ts = encode_timestamp(metric.timestamp());
824 let tags = Some(encode_tags(&tags));
825
826 let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
828
829 let event_metadata = metric.metadata();
830 let metadata = generate_series_metadata(
831 event_metadata.datadog_origin_metadata(),
832 event_metadata.source_type(),
833 origin_product_value,
834 );
835
836 trace!(?metadata, "Generated series metadata.");
837
838 let (points, metric_type) = match metric.value() {
839 MetricValue::Counter { value } => {
840 if let Some(interval) = maybe_interval {
841 let value = *value / (interval as f64);
845 (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
846 } else {
847 (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
848 }
849 }
850 MetricValue::Set { values } => (
851 vec![DatadogPoint(ts, values.len() as f64)],
852 DatadogMetricType::Gauge,
853 ),
854 MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
855 value => {
857 return Err(EncoderError::InvalidMetric {
858 expected: "series",
859 metric_value: value.as_name(),
860 });
861 }
862 };
863
864 Ok(vec![DatadogSeriesMetric {
865 metric: name,
866 r#type: metric_type,
867 interval: maybe_interval,
868 points,
869 tags,
870 host,
871 source_type_name,
872 device,
873 metadata,
874 }])
875}
876
877fn get_compressor() -> Compressor {
878 Compression::zlib_default().into()
882}
883
884const fn max_uncompressed_header_len() -> usize {
885 SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len()
886}
887
888const ZLIB_HEADER_TRAILER: usize = 6;
893
894const fn max_compression_overhead_len(compressed_limit: usize) -> usize {
895 ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit)
899}
900
901const fn max_compressed_overhead_len(len: usize) -> usize {
902 const STORED_BLOCK_SIZE: usize = 16384;
916 (1 + len.saturating_sub(ZLIB_HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5
917}
918
919const fn validate_payload_size_limits(
920 endpoint: DatadogMetricsEndpoint,
921 uncompressed_limit: usize,
922 compressed_limit: usize,
923) -> Option<(usize, usize)> {
924 if endpoint.is_series() {
925 let header_len = max_uncompressed_header_len();
931 if uncompressed_limit <= header_len {
932 return None;
933 }
934 }
935
936 let max_compression_overhead = max_compression_overhead_len(uncompressed_limit);
941 if compressed_limit <= max_compression_overhead {
942 return None;
943 }
944
945 Some((uncompressed_limit, compressed_limit))
946}
947
948fn write_payload_header(
949 endpoint: DatadogMetricsEndpoint,
950 writer: &mut dyn io::Write,
951) -> io::Result<usize> {
952 match endpoint {
953 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
954 .write_all(SERIES_PAYLOAD_HEADER)
955 .map(|_| SERIES_PAYLOAD_HEADER.len()),
956 _ => Ok(0),
957 }
958}
959
960fn write_payload_delimiter(
961 endpoint: DatadogMetricsEndpoint,
962 writer: &mut dyn io::Write,
963) -> io::Result<usize> {
964 match endpoint {
965 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
966 .write_all(SERIES_PAYLOAD_DELIMITER)
967 .map(|_| SERIES_PAYLOAD_DELIMITER.len()),
968 _ => Ok(0),
969 }
970}
971
972fn write_payload_footer(
973 endpoint: DatadogMetricsEndpoint,
974 writer: &mut dyn io::Write,
975) -> io::Result<usize> {
976 match endpoint {
977 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
978 .write_all(SERIES_PAYLOAD_FOOTER)
979 .map(|_| SERIES_PAYLOAD_FOOTER.len()),
980 _ => Ok(0),
981 }
982}
983
984#[cfg(test)]
985mod tests {
986 use std::{
987 io::{self, copy},
988 num::NonZeroU32,
989 sync::Arc,
990 };
991
992 use bytes::{BufMut, Bytes, BytesMut};
993 use chrono::{DateTime, TimeZone, Timelike, Utc};
994 use flate2::read::ZlibDecoder;
995 use proptest::{
996 arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert,
997 proptest, strategy::Strategy, string::string_regex,
998 };
999 use prost::Message;
1000 use vector_lib::{
1001 config::{LogSchema, log_schema},
1002 event::{
1003 DatadogMetricOriginMetadata, EventMetadata, Metric, MetricKind, MetricTags,
1004 MetricValue,
1005 metric::{MetricSketch, TagValue},
1006 },
1007 metric_tags,
1008 metrics::AgentDDSketch,
1009 };
1010
1011 use super::{
1012 DatadogMetricsEncoder, EncoderError, ddmetric_proto, encode_proto_key_and_message,
1013 encode_tags, encode_timestamp, generate_series_metrics, get_compressor,
1014 get_sketch_payload_sketches_field_number, max_compression_overhead_len,
1015 max_uncompressed_header_len, series_to_proto_message, sketch_to_proto_message,
1016 validate_payload_size_limits, write_payload_footer, write_payload_header,
1017 };
1018 use crate::{
1019 common::datadog::DatadogMetricType,
1020 sinks::datadog::metrics::{
1021 config::{DatadogMetricsEndpoint, SeriesApiVersion},
1022 encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
1023 },
1024 };
1025
1026 fn get_simple_counter() -> Metric {
1027 let value = MetricValue::Counter { value: 3.14 };
1028 Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts()))
1029 }
1030
1031 fn get_simple_counter_with_metadata(metadata: EventMetadata) -> Metric {
1032 let value = MetricValue::Counter { value: 3.14 };
1033 Metric::new_with_metadata("basic_counter", MetricKind::Incremental, value, metadata)
1034 .with_timestamp(Some(ts()))
1035 }
1036
1037 fn get_simple_rate_counter(value: f64, interval_ms: u32) -> Metric {
1038 let value = MetricValue::Counter { value };
1039 Metric::new("basic_counter", MetricKind::Incremental, value)
1040 .with_timestamp(Some(ts()))
1041 .with_interval_ms(NonZeroU32::new(interval_ms))
1042 }
1043
1044 fn get_simple_sketch() -> Metric {
1045 let mut ddsketch = AgentDDSketch::with_agent_defaults();
1046 ddsketch.insert(3.14);
1047 Metric::new("basic_counter", MetricKind::Incremental, ddsketch.into())
1048 .with_timestamp(Some(ts()))
1049 }
1050
1051 fn get_compressed_empty_series_payload() -> Bytes {
1052 let mut compressor = get_compressor();
1053
1054 _ = write_payload_header(
1055 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1056 &mut compressor,
1057 )
1058 .expect("should not fail");
1059 _ = write_payload_footer(
1060 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1061 &mut compressor,
1062 )
1063 .expect("should not fail");
1064
1065 compressor.finish().expect("should not fail").freeze()
1066 }
1067
1068 fn get_compressed_empty_sketches_payload() -> Bytes {
1069 get_compressor().finish().expect("should not fail").freeze()
1070 }
1071
1072 fn decompress_payload(payload: Bytes) -> io::Result<Bytes> {
1073 let mut decompressor = ZlibDecoder::new(&payload[..]);
1074 let mut decompressed = BytesMut::new().writer();
1075 let result = copy(&mut decompressor, &mut decompressed);
1076 result.map(|_| decompressed.into_inner().freeze())
1077 }
1078
1079 fn ts() -> DateTime<Utc> {
1080 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
1081 .single()
1082 .and_then(|t| t.with_nanosecond(11))
1083 .expect("invalid timestamp")
1084 }
1085
1086 fn tags() -> MetricTags {
1087 metric_tags! {
1088 "normal_tag" => "value",
1089 "true_tag" => "true",
1090 "empty_tag" => TagValue::Bare,
1091 "multi_value" => "one",
1092 "multi_value" => "two",
1093 }
1094 }
1095
1096 fn encode_sketches_normal<B>(
1097 metrics: &[Metric],
1098 default_namespace: &Option<Arc<str>>,
1099 log_schema: &'static LogSchema,
1100 buf: &mut B,
1101 ) where
1102 B: BufMut,
1103 {
1104 let mut sketches = Vec::new();
1105 for metric in metrics {
1106 let MetricValue::Sketch { sketch } = metric.value() else {
1107 panic!("must be sketch")
1108 };
1109 match sketch {
1110 MetricSketch::AgentDDSketch(ddsketch) => {
1111 if let Some(sketch) =
1112 sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema, 14)
1113 {
1114 sketches.push(sketch);
1115 }
1116 }
1117 }
1118 }
1119
1120 let sketch_payload = ddmetric_proto::SketchPayload {
1121 metadata: None,
1122 sketches,
1123 };
1124
1125 sketch_payload.encode(buf).unwrap()
1127 }
1128
1129 #[test]
1130 fn test_encode_tags() {
1131 assert_eq!(
1132 encode_tags(&tags()),
1133 vec![
1134 "empty_tag",
1135 "multi_value:one",
1136 "multi_value:two",
1137 "normal_tag:value",
1138 "true_tag:true",
1139 ]
1140 );
1141 }
1142
1143 #[test]
1144 fn test_encode_timestamp() {
1145 assert_eq!(encode_timestamp(None), Utc::now().timestamp());
1146 assert_eq!(encode_timestamp(Some(ts())), 1542182950);
1147 }
1148
1149 #[test]
1150 fn incorrect_metric_for_endpoint_causes_error() {
1151 let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1153 .expect("default payload size limits should be valid");
1154 let series_result = sketch_encoder.try_encode(get_simple_counter());
1155 assert!(matches!(
1156 series_result.err(),
1157 Some(EncoderError::InvalidMetric { .. })
1158 ));
1159
1160 let mut series_v1_encoder =
1162 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
1163 .expect("default payload size limits should be valid");
1164 let sketch_result = series_v1_encoder.try_encode(get_simple_sketch());
1165 assert!(matches!(
1166 sketch_result.err(),
1167 Some(EncoderError::InvalidMetric { .. })
1168 ));
1169
1170 let mut series_v2_encoder =
1171 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
1172 .expect("default payload size limits should be valid");
1173 let sketch_result = series_v2_encoder.try_encode(get_simple_sketch());
1174 assert!(matches!(
1175 sketch_result.err(),
1176 Some(EncoderError::InvalidMetric { .. })
1177 ));
1178 }
1179
1180 #[test]
1181 fn encode_counter_with_interval_as_rate() {
1182 let value = 423.1331;
1188 let interval_ms = 10000;
1189 let rate_counter = get_simple_rate_counter(value, interval_ms);
1190 let expected_value = value / (interval_ms / 1000) as f64;
1191 let expected_interval = interval_ms / 1000;
1192
1193 {
1195 let result = generate_series_metrics(
1197 &rate_counter,
1198 &None,
1199 log_schema(),
1200 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1201 );
1202 assert!(result.is_ok());
1203
1204 let metrics = result.unwrap();
1205 assert_eq!(metrics.len(), 1);
1206
1207 let actual = &metrics[0];
1208 assert_eq!(actual.r#type, DatadogMetricType::Rate);
1209 assert_eq!(actual.interval, Some(expected_interval));
1210 assert_eq!(actual.points.len(), 1);
1211 assert_eq!(actual.points[0].1, expected_value);
1212 }
1213
1214 {
1216 let series_proto = series_to_proto_message(
1217 &rate_counter,
1218 &None,
1219 log_schema(),
1220 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1221 )
1222 .unwrap();
1223 assert_eq!(series_proto.r#type, 2);
1224 assert_eq!(series_proto.interval, expected_interval as i64);
1225 assert_eq!(series_proto.points.len(), 1);
1226 assert_eq!(series_proto.points[0].value, expected_value);
1227 }
1228 }
1229
1230 #[test]
1231 fn encode_non_rate_metric_with_interval() {
1232 let value = 423.1331;
1237 let interval_ms = 10000;
1238
1239 let gauge = Metric::new(
1240 "basic_gauge",
1241 MetricKind::Incremental,
1242 MetricValue::Gauge { value },
1243 )
1244 .with_timestamp(Some(ts()))
1245 .with_interval_ms(NonZeroU32::new(interval_ms));
1246
1247 let expected_value = value; let expected_interval = interval_ms / 1000;
1249
1250 {
1252 let result = generate_series_metrics(
1254 &gauge,
1255 &None,
1256 log_schema(),
1257 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1258 );
1259 assert!(result.is_ok());
1260
1261 let metrics = result.unwrap();
1262 assert_eq!(metrics.len(), 1);
1263
1264 let actual = &metrics[0];
1265 assert_eq!(actual.r#type, DatadogMetricType::Gauge);
1266 assert_eq!(actual.interval, Some(expected_interval));
1267 assert_eq!(actual.points.len(), 1);
1268 assert_eq!(actual.points[0].1, expected_value);
1269 }
1270
1271 {
1273 let series_proto = series_to_proto_message(
1274 &gauge,
1275 &None,
1276 log_schema(),
1277 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1278 )
1279 .unwrap();
1280 assert_eq!(series_proto.r#type, 3);
1281 assert_eq!(series_proto.interval, expected_interval as i64);
1282 assert_eq!(series_proto.points.len(), 1);
1283 assert_eq!(series_proto.points[0].value, expected_value);
1284 }
1285 }
1286
1287 #[test]
1288 fn encode_origin_metadata_pass_through() {
1289 let product = 10;
1290 let category = 11;
1291 let service = 9;
1292
1293 let event_metadata = EventMetadata::default().with_origin_metadata(
1294 DatadogMetricOriginMetadata::new(Some(product), Some(category), Some(service)),
1295 );
1296 let counter = get_simple_counter_with_metadata(event_metadata);
1297
1298 {
1300 let result = generate_series_metrics(
1301 &counter,
1302 &None,
1303 log_schema(),
1304 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1305 );
1306 assert!(result.is_ok());
1307
1308 let metrics = result.unwrap();
1309 assert_eq!(metrics.len(), 1);
1310
1311 let actual = &metrics[0];
1312 let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1313
1314 assert_eq!(generated_origin.product().unwrap(), product);
1315 assert_eq!(generated_origin.category().unwrap(), category);
1316 assert_eq!(generated_origin.service().unwrap(), service);
1317 }
1318 {
1320 let series_proto = series_to_proto_message(
1321 &counter,
1322 &None,
1323 log_schema(),
1324 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1325 )
1326 .unwrap();
1327
1328 let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1329 assert_eq!(generated_origin.origin_product, product);
1330 assert_eq!(generated_origin.origin_category, category);
1331 assert_eq!(generated_origin.origin_service, service);
1332 }
1333 }
1334
1335 #[test]
1336 fn encode_origin_metadata_vector_sourced() {
1337 let product = *ORIGIN_PRODUCT_VALUE;
1338
1339 let category = 11;
1340 let service = 153;
1341
1342 let mut counter = get_simple_counter();
1343
1344 counter.metadata_mut().set_source_type("statsd");
1345
1346 {
1348 let result = generate_series_metrics(&counter, &None, log_schema(), product);
1349 assert!(result.is_ok());
1350
1351 let metrics = result.unwrap();
1352 assert_eq!(metrics.len(), 1);
1353
1354 let actual = &metrics[0];
1355 let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1356
1357 assert_eq!(generated_origin.product().unwrap(), product);
1358 assert_eq!(generated_origin.category().unwrap(), category);
1359 assert_eq!(generated_origin.service().unwrap(), service);
1360 }
1361 {
1363 let series_proto = series_to_proto_message(
1364 &counter,
1365 &None,
1366 log_schema(),
1367 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1368 )
1369 .unwrap();
1370
1371 let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1372 assert_eq!(generated_origin.origin_product, product);
1373 assert_eq!(generated_origin.origin_category, category);
1374 assert_eq!(generated_origin.origin_service, service);
1375 }
1376 }
1377
1378 #[test]
1379 fn encode_single_series_v1_metric_with_default_limits() {
1380 let mut encoder =
1383 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
1384 .expect("default payload size limits should be valid");
1385 let counter = get_simple_counter();
1386 let expected = counter.clone();
1387
1388 let result = encoder.try_encode(counter);
1390 assert!(result.is_ok());
1391 assert_eq!(result.unwrap(), None);
1392
1393 let result = encoder.finish();
1395 assert!(result.is_ok());
1396
1397 let (_payload, mut processed) = result.unwrap();
1398 assert_eq!(processed.len(), 1);
1399 assert_eq!(expected, processed.pop().unwrap());
1400 }
1401
1402 #[test]
1403 fn encode_single_series_v2_metric_with_default_limits() {
1404 let mut encoder =
1407 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
1408 .expect("default payload size limits should be valid");
1409 let counter = get_simple_counter();
1410 let expected = counter.clone();
1411
1412 let result = encoder.try_encode(counter);
1414 assert!(result.is_ok());
1415 assert_eq!(result.unwrap(), None);
1416
1417 let result = encoder.finish();
1419 assert!(result.is_ok());
1420
1421 let (_payload, mut processed) = result.unwrap();
1422 assert_eq!(processed.len(), 1);
1423 assert_eq!(expected, processed.pop().unwrap());
1424 }
1425
1426 #[test]
1427 fn encode_single_sketch_metric_with_default_limits() {
1428 let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1431 .expect("default payload size limits should be valid");
1432 let sketch = get_simple_sketch();
1433 let expected = sketch.clone();
1434
1435 let result = encoder.try_encode(sketch);
1437 assert!(result.is_ok());
1438 assert_eq!(result.unwrap(), None);
1439
1440 let result = encoder.finish();
1442 assert!(result.is_ok());
1443
1444 let (_payload, mut processed) = result.unwrap();
1445 assert_eq!(processed.len(), 1);
1446 assert_eq!(expected, processed.pop().unwrap());
1447 }
1448
1449 #[test]
1450 fn encode_empty_sketch() {
1451 let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1454 .expect("default payload size limits should be valid");
1455 let sketch = Metric::new(
1456 "empty",
1457 MetricKind::Incremental,
1458 AgentDDSketch::with_agent_defaults().into(),
1459 )
1460 .with_timestamp(Some(ts()));
1461 let expected = sketch.clone();
1462
1463 let result = encoder.try_encode(sketch);
1465 assert!(result.is_ok());
1466 assert_eq!(result.unwrap(), None);
1467
1468 let result = encoder.finish();
1470 assert!(result.is_ok());
1471
1472 let (_payload, mut processed) = result.unwrap();
1473 assert_eq!(processed.len(), 1);
1474 assert_eq!(expected, processed.pop().unwrap());
1475 }
1476
1477 #[test]
1478 fn encode_multiple_sketch_metrics_normal_vs_incremental() {
1479 let metrics = vec![
1482 get_simple_sketch(),
1483 get_simple_sketch(),
1484 get_simple_sketch(),
1485 ];
1486
1487 let mut normal_buf = Vec::new();
1488 encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);
1489
1490 let mut incremental_buf = Vec::new();
1491 for metric in &metrics {
1492 match metric.value() {
1493 MetricValue::Sketch { sketch } => match sketch {
1494 MetricSketch::AgentDDSketch(ddsketch) => {
1495 if let Some(sketch_proto) =
1496 sketch_to_proto_message(metric, ddsketch, &None, log_schema(), 14)
1497 {
1498 encode_proto_key_and_message(
1499 sketch_proto,
1500 get_sketch_payload_sketches_field_number(),
1501 &mut incremental_buf,
1502 )
1503 .unwrap();
1504 }
1505 }
1506 },
1507 _ => panic!("should be a sketch"),
1508 }
1509 }
1510
1511 assert_eq!(normal_buf, incremental_buf);
1512 }
1513
1514 #[test]
1515 fn payload_size_limits_series() {
1516 let header_len = max_uncompressed_header_len();
1518
1519 let result = validate_payload_size_limits(
1521 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1522 header_len,
1523 usize::MAX,
1524 );
1525 assert_eq!(result, None);
1526
1527 let result = validate_payload_size_limits(
1529 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1530 header_len + 1,
1531 usize::MAX,
1532 );
1533 assert_eq!(result, Some((header_len + 1, usize::MAX)));
1534
1535 let compression_overhead_len = max_compression_overhead_len(usize::MAX);
1539
1540 let result = validate_payload_size_limits(
1542 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1543 usize::MAX,
1544 compression_overhead_len,
1545 );
1546 assert_eq!(result, None);
1547
1548 let result = validate_payload_size_limits(
1550 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1551 usize::MAX,
1552 compression_overhead_len + 1,
1553 );
1554 assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
1555 }
1556
1557 #[test]
1558 fn payload_size_limits_sketches() {
1559 let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX);
1561 assert_eq!(result, Some((0, usize::MAX)));
1562
1563 let compression_overhead_len = max_compression_overhead_len(usize::MAX);
1567
1568 let result = validate_payload_size_limits(
1570 DatadogMetricsEndpoint::Sketches,
1571 usize::MAX,
1572 compression_overhead_len,
1573 );
1574 assert_eq!(result, None);
1575
1576 let result = validate_payload_size_limits(
1578 DatadogMetricsEndpoint::Sketches,
1579 usize::MAX,
1580 compression_overhead_len + 1,
1581 );
1582 assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
1583 }
1584
1585 #[test]
1586 fn encode_series_breaks_out_when_limit_reached_uncompressed() {
1587 let header_len = max_uncompressed_header_len();
1591 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1592 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1593 None,
1594 header_len + 1,
1595 usize::MAX,
1596 )
1597 .expect("payload size limits should be valid");
1598
1599 let counter = get_simple_counter();
1603 let result = encoder.try_encode(counter.clone());
1604 assert!(result.is_ok());
1605 assert_eq!(result.unwrap(), Some(counter));
1606
1607 let result = encoder.finish();
1611 assert!(result.is_ok());
1612
1613 let (payload, processed) = result.unwrap();
1614 assert_eq!(
1615 payload.uncompressed_byte_size,
1616 max_uncompressed_header_len()
1617 );
1618 assert_eq!(
1619 payload.into_payload(),
1620 get_compressed_empty_series_payload()
1621 );
1622 assert_eq!(processed.len(), 0);
1623 }
1624
1625 #[test]
1626 fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
1627 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1631 DatadogMetricsEndpoint::Sketches,
1632 None,
1633 1,
1634 usize::MAX,
1635 )
1636 .expect("payload size limits should be valid");
1637
1638 let sketch = get_simple_sketch();
1642 let result = encoder.try_encode(sketch.clone());
1643 assert!(result.is_ok());
1644 assert_eq!(result.unwrap(), Some(sketch));
1645
1646 let result = encoder.finish();
1649 assert!(result.is_ok());
1650
1651 let (payload, processed) = result.unwrap();
1652 assert_eq!(payload.uncompressed_byte_size, 0);
1653 assert_eq!(
1654 payload.into_payload(),
1655 get_compressed_empty_sketches_payload()
1656 );
1657 assert_eq!(processed.len(), 0);
1658 }
1659
1660 #[test]
1661 fn encode_series_breaks_out_when_limit_reached_compressed() {
1662 let uncompressed_limit = 128;
1666 let compressed_limit = 32;
1667 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1668 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1669 None,
1670 uncompressed_limit,
1671 compressed_limit,
1672 )
1673 .expect("payload size limits should be valid");
1674
1675 let counter = get_simple_counter();
1679 let result = encoder.try_encode(counter.clone());
1680 assert!(result.is_ok());
1681 assert_eq!(result.unwrap(), Some(counter));
1682
1683 let result = encoder.finish();
1687 assert!(result.is_ok());
1688
1689 let (payload, processed) = result.unwrap();
1690 assert_eq!(
1691 payload.uncompressed_byte_size,
1692 max_uncompressed_header_len()
1693 );
1694 assert_eq!(
1695 payload.into_payload(),
1696 get_compressed_empty_series_payload()
1697 );
1698 assert_eq!(processed.len(), 0);
1699 }
1700
1701 #[test]
1702 fn encode_sketches_breaks_out_when_limit_reached_compressed() {
1703 let uncompressed_limit = 128;
1707 let compressed_limit = 16;
1708 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1709 DatadogMetricsEndpoint::Sketches,
1710 None,
1711 uncompressed_limit,
1712 compressed_limit,
1713 )
1714 .expect("payload size limits should be valid");
1715
1716 let sketch = get_simple_sketch();
1720 let result = encoder.try_encode(sketch.clone());
1721 assert!(result.is_ok());
1722 assert_eq!(result.unwrap(), Some(sketch));
1723
1724 let result = encoder.finish();
1728 assert!(result.is_ok());
1729
1730 let (payload, processed) = result.unwrap();
1731 assert_eq!(payload.uncompressed_byte_size, 0);
1732 assert_eq!(
1733 payload.into_payload(),
1734 get_compressed_empty_sketches_payload()
1735 );
1736 assert_eq!(processed.len(), 0);
1737 }
1738
1739 fn arb_counter_metric() -> impl Strategy<Value = Metric> {
1740 let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should not be invalid");
1741 let value = ARB_POSITIVE_F64;
1742 let tags = btree_map(
1743 any::<u64>().prop_map(|v| v.to_string()),
1744 any::<u64>().prop_map(|v| v.to_string()),
1745 0..64,
1746 )
1747 .prop_map(|tags| (!tags.is_empty()).then(|| MetricTags::from(tags)));
1748
1749 (name, value, tags).prop_map(|(metric_name, metric_value, metric_tags)| {
1750 let metric_value = MetricValue::Counter {
1751 value: metric_value,
1752 };
1753 Metric::new(metric_name, MetricKind::Incremental, metric_value).with_tags(metric_tags)
1754 })
1755 }
1756
1757 proptest! {
1758 #[test]
1759 fn encoding_check_for_payload_limit_edge_cases(
1760 uncompressed_limit in 0..64_000_000usize,
1761 compressed_limit in 0..10_000_000usize,
1762 metric in arb_counter_metric(),
1763 ) {
1764 let result = DatadogMetricsEncoder::with_payload_limits(
1771 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1772 None,
1773 uncompressed_limit,
1774 compressed_limit,
1775 );
1776 if let Ok(mut encoder) = result {
1777 _ = encoder.try_encode(metric);
1778
1779 if let Ok((payload, _processed)) = encoder.finish() {
1780 let payload = payload.into_payload();
1781 prop_assert!(payload.len() <= compressed_limit);
1782
1783 let result = decompress_payload(payload);
1784 prop_assert!(result.is_ok());
1785
1786 let decompressed = result.unwrap();
1787 prop_assert!(decompressed.len() <= uncompressed_limit);
1788 }
1789 }
1790 }
1791 }
1792}