1use std::{
2 cmp,
3 io::{self, Write},
4 mem,
5 sync::{Arc, LazyLock, OnceLock},
6};
7
8use bytes::{BufMut, Bytes};
9use chrono::{DateTime, Utc};
10use snafu::{ResultExt, Snafu};
11use vector_lib::request_metadata::GroupedCountByteSize;
12use vector_lib::{
13 config::{log_schema, telemetry, LogSchema},
14 event::{metric::MetricSketch, DatadogMetricOriginMetadata, Metric, MetricTags, MetricValue},
15 metrics::AgentDDSketch,
16 EstimatedJsonEncodedSizeOf,
17};
18
19use super::config::{DatadogMetricsEndpoint, SeriesApiVersion};
20use crate::{
21 common::datadog::{
22 DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
23 },
24 proto::fds::protobuf_descriptors,
25 sinks::util::{encode_namespace, request_builder::EncodeResult, Compression, Compressor},
26};
27
/// Bytes written before the first metric of a series (v1) JSON payload.
const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
/// Bytes written after the last metric of a series (v1) JSON payload.
const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}";
/// Bytes written between two consecutive metrics of a series (v1) JSON payload.
const SERIES_PAYLOAD_DELIMITER: &[u8] = b",";
31
32pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11;
33
34const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;
35
36pub(super) static ORIGIN_PRODUCT_VALUE: LazyLock<u32> = LazyLock::new(|| {
37 option_env!("DD_ORIGIN_PRODUCT")
38 .map(|p| {
39 p.parse::<u32>()
40 .expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
41 })
42 .unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
43});
44
45#[allow(warnings, clippy::pedantic, clippy::nursery)]
46mod ddmetric_proto {
47 include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
48}
49
50#[derive(Debug, Snafu)]
51pub enum CreateError {
52 #[snafu(display("Invalid compressed/uncompressed payload size limits were given"))]
53 InvalidLimits,
54}
55
56impl CreateError {
57 pub const fn as_error_type(&self) -> &'static str {
61 match self {
62 Self::InvalidLimits => "invalid_payload_limits",
63 }
64 }
65}
66
67#[derive(Debug, Snafu)]
68pub enum EncoderError {
69 #[snafu(display(
70 "Invalid metric value '{}' was given; '{}' expected",
71 metric_value,
72 expected
73 ))]
74 InvalidMetric {
75 expected: &'static str,
76 metric_value: &'static str,
77 },
78
79 #[snafu(
80 context(false),
81 display("Failed to encode series metric to JSON: {source}")
82 )]
83 JsonEncodingFailed { source: serde_json::Error },
84
85 #[snafu(display(
88 "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
89 ))]
90 ProtoEncodingFailed,
91}
92
93impl EncoderError {
94 pub const fn as_error_type(&self) -> &'static str {
98 match self {
99 Self::InvalidMetric { .. } => "invalid_metric",
100 Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
101 Self::ProtoEncodingFailed => "failed_to_encode_sketch",
102 }
103 }
104}
105
106#[derive(Debug, Snafu)]
107pub enum FinishError {
108 #[snafu(display(
109 "Failure occurred during writing to or finalizing the compressor: {}",
110 source
111 ))]
112 CompressionFailed { source: io::Error },
113
114 #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
115 TooLarge {
116 metrics: Vec<Metric>,
117 recommended_splits: usize,
118 },
119}
120
121impl FinishError {
122 pub const fn as_error_type(&self) -> &'static str {
126 match self {
127 Self::CompressionFailed { .. } => "compression_failed",
128 Self::TooLarge { .. } => "too_large",
129 }
130 }
131}
132
133struct EncoderState {
134 writer: Compressor,
135 written: usize,
136 buf: Vec<u8>,
137 processed: Vec<Metric>,
138 byte_size: GroupedCountByteSize,
139}
140
141impl Default for EncoderState {
142 fn default() -> Self {
143 Self {
144 writer: get_compressor(),
145 written: 0,
146 buf: Vec::with_capacity(1024),
147 processed: Vec::new(),
148 byte_size: telemetry().create_request_count_byte_size(),
149 }
150 }
151}
152
153pub struct DatadogMetricsEncoder {
154 endpoint: DatadogMetricsEndpoint,
155 default_namespace: Option<Arc<str>>,
156 uncompressed_limit: usize,
157 compressed_limit: usize,
158
159 state: EncoderState,
160 log_schema: &'static LogSchema,
161
162 origin_product_value: u32,
163}
164
165impl DatadogMetricsEncoder {
166 pub fn new(
168 endpoint: DatadogMetricsEndpoint,
169 default_namespace: Option<String>,
170 ) -> Result<Self, CreateError> {
171 let payload_limits = endpoint.payload_limits();
172 Self::with_payload_limits(
173 endpoint,
174 default_namespace,
175 payload_limits.uncompressed,
176 payload_limits.compressed,
177 )
178 }
179
180 pub fn with_payload_limits(
182 endpoint: DatadogMetricsEndpoint,
183 default_namespace: Option<String>,
184 uncompressed_limit: usize,
185 compressed_limit: usize,
186 ) -> Result<Self, CreateError> {
187 let (uncompressed_limit, compressed_limit) =
188 validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit)
189 .ok_or(CreateError::InvalidLimits)?;
190
191 Ok(Self {
192 endpoint,
193 default_namespace: default_namespace.map(Arc::from),
194 uncompressed_limit,
195 compressed_limit,
196 state: EncoderState::default(),
197 log_schema: log_schema(),
198 origin_product_value: *ORIGIN_PRODUCT_VALUE,
199 })
200 }
201}
202
203impl DatadogMetricsEncoder {
204 fn reset_state(&mut self) -> EncoderState {
205 mem::take(&mut self.state)
206 }
207
208 fn encode_single_metric(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
209 self.state.buf.clear();
221
222 self.state
223 .byte_size
224 .add_event(&metric, metric.estimated_json_encoded_size_of());
225
226 match self.endpoint {
242 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
244 let all_series = generate_series_metrics(
246 &metric,
247 &self.default_namespace,
248 self.log_schema,
249 self.origin_product_value,
250 )?;
251
252 let has_processed = !self.state.processed.is_empty();
255 for (i, series) in all_series.iter().enumerate() {
256 if (has_processed || i > 0)
258 && write_payload_delimiter(self.endpoint, &mut self.state.buf).is_err()
259 {
260 return Ok(Some(metric));
261 }
262 serde_json::to_writer(&mut self.state.buf, series)?;
263 }
264 }
265 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() {
267 MetricValue::Counter { .. }
268 | MetricValue::Gauge { .. }
269 | MetricValue::Set { .. }
270 | MetricValue::AggregatedSummary { .. } => {
271 let series_proto = series_to_proto_message(
272 &metric,
273 &self.default_namespace,
274 self.log_schema,
275 self.origin_product_value,
276 )?;
277
278 encode_proto_key_and_message(
279 series_proto,
280 get_series_payload_series_field_number(),
281 &mut self.state.buf,
282 )?;
283 }
284 value => {
285 return Err(EncoderError::InvalidMetric {
286 expected: "series",
287 metric_value: value.as_name(),
288 })
289 }
290 },
291 DatadogMetricsEndpoint::Sketches => match metric.value() {
293 MetricValue::Sketch { sketch } => match sketch {
294 MetricSketch::AgentDDSketch(ddsketch) => {
295 if let Some(sketch_proto) = sketch_to_proto_message(
296 &metric,
297 ddsketch,
298 &self.default_namespace,
299 self.log_schema,
300 self.origin_product_value,
301 ) {
302 encode_proto_key_and_message(
303 sketch_proto,
304 get_sketch_payload_sketches_field_number(),
305 &mut self.state.buf,
306 )?;
307 } else {
308 }
310 }
311 },
312 value => {
313 return Err(EncoderError::InvalidMetric {
314 expected: "sketches",
315 metric_value: value.as_name(),
316 })
317 }
318 },
319 }
320
321 match self.try_compress_buffer() {
323 Err(_) | Ok(false) => Ok(Some(metric)),
324 Ok(true) => {
325 self.state.processed.push(metric);
326 Ok(None)
327 }
328 }
329 }
330
331 fn try_compress_buffer(&mut self) -> io::Result<bool> {
332 let n = self.state.buf.len();
333
334 if self.state.written + n > self.uncompressed_limit {
336 return Ok(false);
337 }
338
339 let compressed_len = self.state.writer.get_ref().len();
355 let max_compressed_metric_len = n + max_compressed_overhead_len(n);
356 if compressed_len + max_compressed_metric_len > self.compressed_limit {
357 return Ok(false);
358 }
359
360 self.state.writer.write_all(&self.state.buf)?;
362 self.state.written += n;
363 Ok(true)
364 }
365
366 pub fn try_encode(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
383 if self.state.written == 0 {
385 match write_payload_header(self.endpoint, &mut self.state.writer) {
386 Ok(n) => self.state.written += n,
387 Err(_) => return Ok(Some(metric)),
388 }
389 }
390
391 self.encode_single_metric(metric)
392 }
393
394 pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
395 let n = write_payload_footer(self.endpoint, &mut self.state.writer)
397 .context(CompressionFailedSnafu)?;
398 self.state.written += n;
399
400 let raw_bytes_written = self.state.written;
401 let byte_size = self.state.byte_size.clone();
402
403 let state = self.reset_state();
405 let payload = state
406 .writer
407 .finish()
408 .context(CompressionFailedSnafu)?
409 .freeze();
410 let processed = state.processed;
411
412 let compressed_splits = payload.len() / self.compressed_limit;
421 let uncompressed_splits = state.written / self.uncompressed_limit;
422 let recommended_splits = cmp::max(compressed_splits, uncompressed_splits) + 1;
423
424 if recommended_splits == 1 {
425 Ok((
427 EncodeResult::compressed(payload, raw_bytes_written, byte_size),
428 processed,
429 ))
430 } else {
431 Err(FinishError::TooLarge {
432 metrics: processed,
433 recommended_splits,
434 })
435 }
436 }
437}
438
439fn generate_proto_metadata(
440 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
441 maybe_source_type: Option<&str>,
442 origin_product_value: u32,
443) -> Option<ddmetric_proto::Metadata> {
444 generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
445 |origin| {
446 if origin.product().is_none()
447 || origin.category().is_none()
448 || origin.service().is_none()
449 {
450 warn!(
451 message = "Generated sketch origin metadata should have each field set.",
452 product = origin.product(),
453 category = origin.category(),
454 service = origin.service()
455 );
456 }
457 ddmetric_proto::Metadata {
458 origin: Some(ddmetric_proto::Origin {
459 origin_product: origin.product().unwrap_or_default(),
460 origin_category: origin.category().unwrap_or_default(),
461 origin_service: origin.service().unwrap_or_default(),
462 }),
463 }
464 },
465 )
466}
467
468fn get_sketch_payload_sketches_field_number() -> u32 {
469 static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
470 *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
471 let descriptors = protobuf_descriptors();
472 let descriptor = descriptors
473 .get_message_by_name("datadog.agentpayload.SketchPayload")
474 .expect("should not fail to find `SketchPayload` message in descriptor pool");
475
476 descriptor
477 .get_field_by_name("sketches")
478 .map(|field| field.number())
479 .expect("`sketches` field must exist in `SketchPayload` message")
480 })
481}
482
483fn get_series_payload_series_field_number() -> u32 {
484 static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
485 *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| {
486 let descriptors = protobuf_descriptors();
487 let descriptor = descriptors
488 .get_message_by_name("datadog.agentpayload.MetricPayload")
489 .expect("should not fail to find `MetricPayload` message in descriptor pool");
490
491 descriptor
492 .get_field_by_name("series")
493 .map(|field| field.number())
494 .expect("`series` field must exist in `MetricPayload` message")
495 })
496}
497
498fn sketch_to_proto_message(
499 metric: &Metric,
500 ddsketch: &AgentDDSketch,
501 default_namespace: &Option<Arc<str>>,
502 log_schema: &'static LogSchema,
503 origin_product_value: u32,
504) -> Option<ddmetric_proto::sketch_payload::Sketch> {
505 if ddsketch.is_empty() {
506 return None;
507 }
508
509 let name = get_namespaced_name(metric, default_namespace);
510 let ts = encode_timestamp(metric.timestamp());
511 let mut tags = metric.tags().cloned().unwrap_or_default();
512 let host = log_schema
513 .host_key()
514 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
515 .unwrap_or_default();
516 let tags = encode_tags(&tags);
517
518 let cnt = ddsketch.count() as i64;
519 let min = ddsketch
520 .min()
521 .expect("min should be present for non-empty sketch");
522 let max = ddsketch
523 .max()
524 .expect("max should be present for non-empty sketch");
525 let avg = ddsketch
526 .avg()
527 .expect("avg should be present for non-empty sketch");
528 let sum = ddsketch
529 .sum()
530 .expect("sum should be present for non-empty sketch");
531
532 let (bins, counts) = ddsketch.bin_map().into_parts();
533 let k = bins.into_iter().map(Into::into).collect();
534 let n = counts.into_iter().map(Into::into).collect();
535
536 let event_metadata = metric.metadata();
537 let metadata = generate_proto_metadata(
538 event_metadata.datadog_origin_metadata(),
539 event_metadata.source_type(),
540 origin_product_value,
541 );
542
543 trace!(?metadata, "Generated sketch metadata.");
544
545 Some(ddmetric_proto::sketch_payload::Sketch {
546 metric: name,
547 tags,
548 host,
549 distributions: Vec::new(),
550 dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
551 ts,
552 cnt,
553 min,
554 max,
555 avg,
556 sum,
557 k,
558 n,
559 }],
560 metadata,
561 })
562}
563
564fn series_to_proto_message(
565 metric: &Metric,
566 default_namespace: &Option<Arc<str>>,
567 log_schema: &'static LogSchema,
568 origin_product_value: u32,
569) -> Result<ddmetric_proto::metric_payload::MetricSeries, EncoderError> {
570 let metric_name = get_namespaced_name(metric, default_namespace);
571 let mut tags = metric.tags().cloned().unwrap_or_default();
572
573 let mut resources = vec![];
574
575 if let Some(host) = log_schema
576 .host_key()
577 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
578 {
579 resources.push(ddmetric_proto::metric_payload::Resource {
580 r#type: "host".to_string(),
581 name: host,
582 });
583 }
584
585 if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) {
588 resources.push(ddmetric_proto::metric_payload::Resource {
589 r#type: "device".to_string(),
590 name: device,
591 });
592 }
593
594 let source_type_name = tags.remove("source_type_name").unwrap_or_default();
595
596 let tags = encode_tags(&tags);
597
598 let event_metadata = metric.metadata();
599 let metadata = generate_proto_metadata(
600 event_metadata.datadog_origin_metadata(),
601 event_metadata.source_type(),
602 origin_product_value,
603 );
604 trace!(?metadata, "Generated MetricSeries metadata.");
605
606 let timestamp = encode_timestamp(metric.timestamp());
607
608 let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
610
611 let (points, metric_type) = match metric.value() {
612 MetricValue::Counter { value } => {
613 if let Some(interval) = maybe_interval {
614 let value = *value / (interval as f64);
618 (
619 vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
620 ddmetric_proto::metric_payload::MetricType::Rate,
621 )
622 } else {
623 (
624 vec![ddmetric_proto::metric_payload::MetricPoint {
625 value: *value,
626 timestamp,
627 }],
628 ddmetric_proto::metric_payload::MetricType::Count,
629 )
630 }
631 }
632 MetricValue::Set { values } => (
633 vec![ddmetric_proto::metric_payload::MetricPoint {
634 value: values.len() as f64,
635 timestamp,
636 }],
637 ddmetric_proto::metric_payload::MetricType::Gauge,
638 ),
639 MetricValue::Gauge { value } => (
640 vec![ddmetric_proto::metric_payload::MetricPoint {
641 value: *value,
642 timestamp,
643 }],
644 ddmetric_proto::metric_payload::MetricType::Gauge,
645 ),
646 value => {
648 return Err(EncoderError::InvalidMetric {
650 expected: "series",
651 metric_value: value.as_name(),
652 });
653 }
654 };
655
656 Ok(ddmetric_proto::metric_payload::MetricSeries {
657 resources,
658 metric: metric_name,
659 tags,
660 points,
661 r#type: metric_type.into(),
662 unit: "".to_string(),
664 source_type_name,
665 interval: maybe_interval.unwrap_or(0) as i64,
666 metadata,
667 })
668}
669
670fn encode_proto_key_and_message<T, B>(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError>
672where
673 T: prost::Message,
674 B: BufMut,
675{
676 prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf);
677
678 msg.encode_length_delimited(buf)
679 .map_err(|_| EncoderError::ProtoEncodingFailed)
680}
681
682fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
683 encode_namespace(
684 metric
685 .namespace()
686 .or_else(|| default_namespace.as_ref().map(|s| s.as_ref())),
687 '.',
688 metric.name(),
689 )
690}
691
692fn encode_tags(tags: &MetricTags) -> Vec<String> {
693 let mut pairs: Vec<_> = tags
694 .iter_all()
695 .map(|(name, value)| match value {
696 Some(value) => format!("{name}:{value}"),
697 None => name.into(),
698 })
699 .collect();
700 pairs.sort();
701 pairs
702}
703
704fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
705 if let Some(ts) = timestamp {
706 ts.timestamp()
707 } else {
708 Utc::now().timestamp()
709 }
710}
711
712fn source_type_to_service(source_type: &str) -> Option<u32> {
714 match source_type {
715 "datadog_agent" => None,
718
719 "apache_metrics" => Some(17),
721 "aws_ecs_metrics" => Some(209),
722 "eventstoredb_metrics" => Some(210),
723 "host_metrics" => Some(211),
724 "internal_metrics" => Some(212),
725 "mongodb_metrics" => Some(111),
726 "nginx_metrics" => Some(117),
727 "open_telemetry" => Some(213),
728 "postgresql_metrics" => Some(128),
729 "prometheus_remote_write" => Some(214),
730 "prometheus_scrape" => Some(215),
731 "statsd" => Some(153),
732
733 "kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector"
738 | "pulsar" => Some(0),
739
740 _ => {
745 debug!("Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value.");
746 Some(0)
747 }
748 }
749}
750
751fn generate_origin_metadata(
756 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
757 maybe_source_type: Option<&str>,
758 origin_product_value: u32,
759) -> Option<DatadogMetricOriginMetadata> {
760 let no_value = 0;
761
762 if let Some(pass_through) = maybe_pass_through {
771 Some(DatadogMetricOriginMetadata::new(
772 pass_through.product().or(Some(origin_product_value)),
773 pass_through.category().or(Some(ORIGIN_CATEGORY_VALUE)),
774 pass_through.service().or(Some(no_value)),
775 ))
776
777 } else {
779 maybe_source_type.and_then(|source_type| {
780 source_type_to_service(source_type).map(|origin_service_value| {
784 DatadogMetricOriginMetadata::new(
785 Some(origin_product_value),
786 Some(ORIGIN_CATEGORY_VALUE),
787 Some(origin_service_value),
788 )
789 })
790 })
791 }
792}
793
794fn generate_series_metadata(
795 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
796 maybe_source_type: Option<&str>,
797 origin_product_value: u32,
798) -> Option<DatadogSeriesMetricMetadata> {
799 generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
800 |origin| DatadogSeriesMetricMetadata {
801 origin: Some(origin),
802 },
803 )
804}
805
806fn generate_series_metrics(
807 metric: &Metric,
808 default_namespace: &Option<Arc<str>>,
809 log_schema: &'static LogSchema,
810 origin_product_value: u32,
811) -> Result<Vec<DatadogSeriesMetric>, EncoderError> {
812 let name = get_namespaced_name(metric, default_namespace);
813
814 let mut tags = metric.tags().cloned().unwrap_or_default();
815 let host = log_schema
816 .host_key()
817 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());
818
819 let source_type_name = tags.remove("source_type_name");
820 let device = tags.remove("device");
821 let ts = encode_timestamp(metric.timestamp());
822 let tags = Some(encode_tags(&tags));
823
824 let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
826
827 let event_metadata = metric.metadata();
828 let metadata = generate_series_metadata(
829 event_metadata.datadog_origin_metadata(),
830 event_metadata.source_type(),
831 origin_product_value,
832 );
833
834 trace!(?metadata, "Generated series metadata.");
835
836 let (points, metric_type) = match metric.value() {
837 MetricValue::Counter { value } => {
838 if let Some(interval) = maybe_interval {
839 let value = *value / (interval as f64);
843 (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
844 } else {
845 (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
846 }
847 }
848 MetricValue::Set { values } => (
849 vec![DatadogPoint(ts, values.len() as f64)],
850 DatadogMetricType::Gauge,
851 ),
852 MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
853 value => {
855 return Err(EncoderError::InvalidMetric {
856 expected: "series",
857 metric_value: value.as_name(),
858 })
859 }
860 };
861
862 Ok(vec![DatadogSeriesMetric {
863 metric: name,
864 r#type: metric_type,
865 interval: maybe_interval,
866 points,
867 tags,
868 host,
869 source_type_name,
870 device,
871 metadata,
872 }])
873}
874
875fn get_compressor() -> Compressor {
876 Compression::zlib_default().into()
880}
881
882const fn max_uncompressed_header_len() -> usize {
883 SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len()
884}
885
/// Combined size, in bytes, budgeted for the zlib stream header and trailer.
const ZLIB_HEADER_TRAILER: usize = 6;

/// Returns the worst-case compression overhead for a whole payload of `compressed_limit` bytes:
/// the zlib header/trailer plus the per-block overhead from `max_compressed_overhead_len`.
const fn max_compression_overhead_len(compressed_limit: usize) -> usize {
    let block_overhead = max_compressed_overhead_len(compressed_limit);
    block_overhead + ZLIB_HEADER_TRAILER
}

/// Returns the worst-case overhead, in bytes, of writing `len` bytes that cannot be compressed
/// at all.
///
/// The input is assumed to be split into 16 KiB blocks, each costing 5 bytes of overhead, with
/// at least one block always being accounted for.
const fn max_compressed_overhead_len(len: usize) -> usize {
    const STORED_BLOCK_SIZE: usize = 16384;
    const STORED_BLOCK_OVERHEAD: usize = 5;

    let payload_len = len.saturating_sub(ZLIB_HEADER_TRAILER);
    let stored_blocks = payload_len / STORED_BLOCK_SIZE + 1;
    stored_blocks * STORED_BLOCK_OVERHEAD
}
916
917const fn validate_payload_size_limits(
918 endpoint: DatadogMetricsEndpoint,
919 uncompressed_limit: usize,
920 compressed_limit: usize,
921) -> Option<(usize, usize)> {
922 if endpoint.is_series() {
923 let header_len = max_uncompressed_header_len();
929 if uncompressed_limit <= header_len {
930 return None;
931 }
932 }
933
934 let max_compression_overhead = max_compression_overhead_len(uncompressed_limit);
939 if compressed_limit <= max_compression_overhead {
940 return None;
941 }
942
943 Some((uncompressed_limit, compressed_limit))
944}
945
946fn write_payload_header(
947 endpoint: DatadogMetricsEndpoint,
948 writer: &mut dyn io::Write,
949) -> io::Result<usize> {
950 match endpoint {
951 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
952 .write_all(SERIES_PAYLOAD_HEADER)
953 .map(|_| SERIES_PAYLOAD_HEADER.len()),
954 _ => Ok(0),
955 }
956}
957
958fn write_payload_delimiter(
959 endpoint: DatadogMetricsEndpoint,
960 writer: &mut dyn io::Write,
961) -> io::Result<usize> {
962 match endpoint {
963 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
964 .write_all(SERIES_PAYLOAD_DELIMITER)
965 .map(|_| SERIES_PAYLOAD_DELIMITER.len()),
966 _ => Ok(0),
967 }
968}
969
970fn write_payload_footer(
971 endpoint: DatadogMetricsEndpoint,
972 writer: &mut dyn io::Write,
973) -> io::Result<usize> {
974 match endpoint {
975 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
976 .write_all(SERIES_PAYLOAD_FOOTER)
977 .map(|_| SERIES_PAYLOAD_FOOTER.len()),
978 _ => Ok(0),
979 }
980}
981
982#[cfg(test)]
983mod tests {
984 use std::{
985 io::{self, copy},
986 num::NonZeroU32,
987 sync::Arc,
988 };
989
990 use bytes::{BufMut, Bytes, BytesMut};
991 use chrono::{DateTime, TimeZone, Timelike, Utc};
992 use flate2::read::ZlibDecoder;
993 use proptest::{
994 arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert,
995 proptest, strategy::Strategy, string::string_regex,
996 };
997 use prost::Message;
998 use vector_lib::{
999 config::{log_schema, LogSchema},
1000 event::{
1001 metric::{MetricSketch, TagValue},
1002 DatadogMetricOriginMetadata, EventMetadata, Metric, MetricKind, MetricTags,
1003 MetricValue,
1004 },
1005 metric_tags,
1006 metrics::AgentDDSketch,
1007 };
1008
1009 use super::{
1010 ddmetric_proto, encode_proto_key_and_message, encode_tags, encode_timestamp,
1011 generate_series_metrics, get_compressor, get_sketch_payload_sketches_field_number,
1012 max_compression_overhead_len, max_uncompressed_header_len, series_to_proto_message,
1013 sketch_to_proto_message, validate_payload_size_limits, write_payload_footer,
1014 write_payload_header, DatadogMetricsEncoder, EncoderError,
1015 };
1016 use crate::{
1017 common::datadog::DatadogMetricType,
1018 sinks::datadog::metrics::{
1019 config::{DatadogMetricsEndpoint, SeriesApiVersion},
1020 encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
1021 },
1022 };
1023
1024 fn get_simple_counter() -> Metric {
1025 let value = MetricValue::Counter { value: 3.14 };
1026 Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts()))
1027 }
1028
1029 fn get_simple_counter_with_metadata(metadata: EventMetadata) -> Metric {
1030 let value = MetricValue::Counter { value: 3.14 };
1031 Metric::new_with_metadata("basic_counter", MetricKind::Incremental, value, metadata)
1032 .with_timestamp(Some(ts()))
1033 }
1034
1035 fn get_simple_rate_counter(value: f64, interval_ms: u32) -> Metric {
1036 let value = MetricValue::Counter { value };
1037 Metric::new("basic_counter", MetricKind::Incremental, value)
1038 .with_timestamp(Some(ts()))
1039 .with_interval_ms(NonZeroU32::new(interval_ms))
1040 }
1041
1042 fn get_simple_sketch() -> Metric {
1043 let mut ddsketch = AgentDDSketch::with_agent_defaults();
1044 ddsketch.insert(3.14);
1045 Metric::new("basic_counter", MetricKind::Incremental, ddsketch.into())
1046 .with_timestamp(Some(ts()))
1047 }
1048
1049 fn get_compressed_empty_series_payload() -> Bytes {
1050 let mut compressor = get_compressor();
1051
1052 _ = write_payload_header(
1053 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1054 &mut compressor,
1055 )
1056 .expect("should not fail");
1057 _ = write_payload_footer(
1058 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1059 &mut compressor,
1060 )
1061 .expect("should not fail");
1062
1063 compressor.finish().expect("should not fail").freeze()
1064 }
1065
1066 fn get_compressed_empty_sketches_payload() -> Bytes {
1067 get_compressor().finish().expect("should not fail").freeze()
1068 }
1069
1070 fn decompress_payload(payload: Bytes) -> io::Result<Bytes> {
1071 let mut decompressor = ZlibDecoder::new(&payload[..]);
1072 let mut decompressed = BytesMut::new().writer();
1073 let result = copy(&mut decompressor, &mut decompressed);
1074 result.map(|_| decompressed.into_inner().freeze())
1075 }
1076
1077 fn ts() -> DateTime<Utc> {
1078 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
1079 .single()
1080 .and_then(|t| t.with_nanosecond(11))
1081 .expect("invalid timestamp")
1082 }
1083
1084 fn tags() -> MetricTags {
1085 metric_tags! {
1086 "normal_tag" => "value",
1087 "true_tag" => "true",
1088 "empty_tag" => TagValue::Bare,
1089 "multi_value" => "one",
1090 "multi_value" => "two",
1091 }
1092 }
1093
1094 fn encode_sketches_normal<B>(
1095 metrics: &[Metric],
1096 default_namespace: &Option<Arc<str>>,
1097 log_schema: &'static LogSchema,
1098 buf: &mut B,
1099 ) where
1100 B: BufMut,
1101 {
1102 let mut sketches = Vec::new();
1103 for metric in metrics {
1104 let MetricValue::Sketch { sketch } = metric.value() else {
1105 panic!("must be sketch")
1106 };
1107 match sketch {
1108 MetricSketch::AgentDDSketch(ddsketch) => {
1109 if let Some(sketch) =
1110 sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema, 14)
1111 {
1112 sketches.push(sketch);
1113 }
1114 }
1115 }
1116 }
1117
1118 let sketch_payload = ddmetric_proto::SketchPayload {
1119 metadata: None,
1120 sketches,
1121 };
1122
1123 sketch_payload.encode(buf).unwrap()
1125 }
1126
1127 #[test]
1128 fn test_encode_tags() {
1129 assert_eq!(
1130 encode_tags(&tags()),
1131 vec![
1132 "empty_tag",
1133 "multi_value:one",
1134 "multi_value:two",
1135 "normal_tag:value",
1136 "true_tag:true",
1137 ]
1138 );
1139 }
1140
1141 #[test]
1142 fn test_encode_timestamp() {
1143 assert_eq!(encode_timestamp(None), Utc::now().timestamp());
1144 assert_eq!(encode_timestamp(Some(ts())), 1542182950);
1145 }
1146
1147 #[test]
1148 fn incorrect_metric_for_endpoint_causes_error() {
1149 let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1151 .expect("default payload size limits should be valid");
1152 let series_result = sketch_encoder.try_encode(get_simple_counter());
1153 assert!(matches!(
1154 series_result.err(),
1155 Some(EncoderError::InvalidMetric { .. })
1156 ));
1157
1158 let mut series_v1_encoder =
1160 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
1161 .expect("default payload size limits should be valid");
1162 let sketch_result = series_v1_encoder.try_encode(get_simple_sketch());
1163 assert!(matches!(
1164 sketch_result.err(),
1165 Some(EncoderError::InvalidMetric { .. })
1166 ));
1167
1168 let mut series_v2_encoder =
1169 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
1170 .expect("default payload size limits should be valid");
1171 let sketch_result = series_v2_encoder.try_encode(get_simple_sketch());
1172 assert!(matches!(
1173 sketch_result.err(),
1174 Some(EncoderError::InvalidMetric { .. })
1175 ));
1176 }
1177
1178 #[test]
1179 fn encode_counter_with_interval_as_rate() {
1180 let value = 423.1331;
1186 let interval_ms = 10000;
1187 let rate_counter = get_simple_rate_counter(value, interval_ms);
1188 let expected_value = value / (interval_ms / 1000) as f64;
1189 let expected_interval = interval_ms / 1000;
1190
1191 {
1193 let result = generate_series_metrics(
1195 &rate_counter,
1196 &None,
1197 log_schema(),
1198 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1199 );
1200 assert!(result.is_ok());
1201
1202 let metrics = result.unwrap();
1203 assert_eq!(metrics.len(), 1);
1204
1205 let actual = &metrics[0];
1206 assert_eq!(actual.r#type, DatadogMetricType::Rate);
1207 assert_eq!(actual.interval, Some(expected_interval));
1208 assert_eq!(actual.points.len(), 1);
1209 assert_eq!(actual.points[0].1, expected_value);
1210 }
1211
1212 {
1214 let series_proto = series_to_proto_message(
1215 &rate_counter,
1216 &None,
1217 log_schema(),
1218 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1219 )
1220 .unwrap();
1221 assert_eq!(series_proto.r#type, 2);
1222 assert_eq!(series_proto.interval, expected_interval as i64);
1223 assert_eq!(series_proto.points.len(), 1);
1224 assert_eq!(series_proto.points[0].value, expected_value);
1225 }
1226 }
1227
1228 #[test]
1229 fn encode_non_rate_metric_with_interval() {
1230 let value = 423.1331;
1235 let interval_ms = 10000;
1236
1237 let gauge = Metric::new(
1238 "basic_gauge",
1239 MetricKind::Incremental,
1240 MetricValue::Gauge { value },
1241 )
1242 .with_timestamp(Some(ts()))
1243 .with_interval_ms(NonZeroU32::new(interval_ms));
1244
1245 let expected_value = value; let expected_interval = interval_ms / 1000;
1247
1248 {
1250 let result = generate_series_metrics(
1252 &gauge,
1253 &None,
1254 log_schema(),
1255 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1256 );
1257 assert!(result.is_ok());
1258
1259 let metrics = result.unwrap();
1260 assert_eq!(metrics.len(), 1);
1261
1262 let actual = &metrics[0];
1263 assert_eq!(actual.r#type, DatadogMetricType::Gauge);
1264 assert_eq!(actual.interval, Some(expected_interval));
1265 assert_eq!(actual.points.len(), 1);
1266 assert_eq!(actual.points[0].1, expected_value);
1267 }
1268
1269 {
1271 let series_proto = series_to_proto_message(
1272 &gauge,
1273 &None,
1274 log_schema(),
1275 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1276 )
1277 .unwrap();
1278 assert_eq!(series_proto.r#type, 3);
1279 assert_eq!(series_proto.interval, expected_interval as i64);
1280 assert_eq!(series_proto.points.len(), 1);
1281 assert_eq!(series_proto.points[0].value, expected_value);
1282 }
1283 }
1284
1285 #[test]
1286 fn encode_origin_metadata_pass_through() {
1287 let product = 10;
1288 let category = 11;
1289 let service = 9;
1290
1291 let event_metadata = EventMetadata::default().with_origin_metadata(
1292 DatadogMetricOriginMetadata::new(Some(product), Some(category), Some(service)),
1293 );
1294 let counter = get_simple_counter_with_metadata(event_metadata);
1295
1296 {
1298 let result = generate_series_metrics(
1299 &counter,
1300 &None,
1301 log_schema(),
1302 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1303 );
1304 assert!(result.is_ok());
1305
1306 let metrics = result.unwrap();
1307 assert_eq!(metrics.len(), 1);
1308
1309 let actual = &metrics[0];
1310 let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1311
1312 assert_eq!(generated_origin.product().unwrap(), product);
1313 assert_eq!(generated_origin.category().unwrap(), category);
1314 assert_eq!(generated_origin.service().unwrap(), service);
1315 }
1316 {
1318 let series_proto = series_to_proto_message(
1319 &counter,
1320 &None,
1321 log_schema(),
1322 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1323 )
1324 .unwrap();
1325
1326 let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1327 assert_eq!(generated_origin.origin_product, product);
1328 assert_eq!(generated_origin.origin_category, category);
1329 assert_eq!(generated_origin.origin_service, service);
1330 }
1331 }
1332
1333 #[test]
1334 fn encode_origin_metadata_vector_sourced() {
1335 let product = *ORIGIN_PRODUCT_VALUE;
1336
1337 let category = 11;
1338 let service = 153;
1339
1340 let mut counter = get_simple_counter();
1341
1342 counter.metadata_mut().set_source_type("statsd");
1343
1344 {
1346 let result = generate_series_metrics(&counter, &None, log_schema(), product);
1347 assert!(result.is_ok());
1348
1349 let metrics = result.unwrap();
1350 assert_eq!(metrics.len(), 1);
1351
1352 let actual = &metrics[0];
1353 let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1354
1355 assert_eq!(generated_origin.product().unwrap(), product);
1356 assert_eq!(generated_origin.category().unwrap(), category);
1357 assert_eq!(generated_origin.service().unwrap(), service);
1358 }
1359 {
1361 let series_proto = series_to_proto_message(
1362 &counter,
1363 &None,
1364 log_schema(),
1365 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1366 )
1367 .unwrap();
1368
1369 let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1370 assert_eq!(generated_origin.origin_product, product);
1371 assert_eq!(generated_origin.origin_category, category);
1372 assert_eq!(generated_origin.origin_service, service);
1373 }
1374 }
1375
1376 #[test]
1377 fn encode_single_series_v1_metric_with_default_limits() {
1378 let mut encoder =
1381 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
1382 .expect("default payload size limits should be valid");
1383 let counter = get_simple_counter();
1384 let expected = counter.clone();
1385
1386 let result = encoder.try_encode(counter);
1388 assert!(result.is_ok());
1389 assert_eq!(result.unwrap(), None);
1390
1391 let result = encoder.finish();
1393 assert!(result.is_ok());
1394
1395 let (_payload, mut processed) = result.unwrap();
1396 assert_eq!(processed.len(), 1);
1397 assert_eq!(expected, processed.pop().unwrap());
1398 }
1399
1400 #[test]
1401 fn encode_single_series_v2_metric_with_default_limits() {
1402 let mut encoder =
1405 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
1406 .expect("default payload size limits should be valid");
1407 let counter = get_simple_counter();
1408 let expected = counter.clone();
1409
1410 let result = encoder.try_encode(counter);
1412 assert!(result.is_ok());
1413 assert_eq!(result.unwrap(), None);
1414
1415 let result = encoder.finish();
1417 assert!(result.is_ok());
1418
1419 let (_payload, mut processed) = result.unwrap();
1420 assert_eq!(processed.len(), 1);
1421 assert_eq!(expected, processed.pop().unwrap());
1422 }
1423
1424 #[test]
1425 fn encode_single_sketch_metric_with_default_limits() {
1426 let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1429 .expect("default payload size limits should be valid");
1430 let sketch = get_simple_sketch();
1431 let expected = sketch.clone();
1432
1433 let result = encoder.try_encode(sketch);
1435 assert!(result.is_ok());
1436 assert_eq!(result.unwrap(), None);
1437
1438 let result = encoder.finish();
1440 assert!(result.is_ok());
1441
1442 let (_payload, mut processed) = result.unwrap();
1443 assert_eq!(processed.len(), 1);
1444 assert_eq!(expected, processed.pop().unwrap());
1445 }
1446
1447 #[test]
1448 fn encode_empty_sketch() {
1449 let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1452 .expect("default payload size limits should be valid");
1453 let sketch = Metric::new(
1454 "empty",
1455 MetricKind::Incremental,
1456 AgentDDSketch::with_agent_defaults().into(),
1457 )
1458 .with_timestamp(Some(ts()));
1459 let expected = sketch.clone();
1460
1461 let result = encoder.try_encode(sketch);
1463 assert!(result.is_ok());
1464 assert_eq!(result.unwrap(), None);
1465
1466 let result = encoder.finish();
1468 assert!(result.is_ok());
1469
1470 let (_payload, mut processed) = result.unwrap();
1471 assert_eq!(processed.len(), 1);
1472 assert_eq!(expected, processed.pop().unwrap());
1473 }
1474
1475 #[test]
1476 fn encode_multiple_sketch_metrics_normal_vs_incremental() {
1477 let metrics = vec![
1480 get_simple_sketch(),
1481 get_simple_sketch(),
1482 get_simple_sketch(),
1483 ];
1484
1485 let mut normal_buf = Vec::new();
1486 encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);
1487
1488 let mut incremental_buf = Vec::new();
1489 for metric in &metrics {
1490 match metric.value() {
1491 MetricValue::Sketch { sketch } => match sketch {
1492 MetricSketch::AgentDDSketch(ddsketch) => {
1493 if let Some(sketch_proto) =
1494 sketch_to_proto_message(metric, ddsketch, &None, log_schema(), 14)
1495 {
1496 encode_proto_key_and_message(
1497 sketch_proto,
1498 get_sketch_payload_sketches_field_number(),
1499 &mut incremental_buf,
1500 )
1501 .unwrap();
1502 }
1503 }
1504 },
1505 _ => panic!("should be a sketch"),
1506 }
1507 }
1508
1509 assert_eq!(normal_buf, incremental_buf);
1510 }
1511
1512 #[test]
1513 fn payload_size_limits_series() {
1514 let header_len = max_uncompressed_header_len();
1516
1517 let result = validate_payload_size_limits(
1519 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1520 header_len,
1521 usize::MAX,
1522 );
1523 assert_eq!(result, None);
1524
1525 let result = validate_payload_size_limits(
1527 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1528 header_len + 1,
1529 usize::MAX,
1530 );
1531 assert_eq!(result, Some((header_len + 1, usize::MAX)));
1532
1533 let compression_overhead_len = max_compression_overhead_len(usize::MAX);
1537
1538 let result = validate_payload_size_limits(
1540 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1541 usize::MAX,
1542 compression_overhead_len,
1543 );
1544 assert_eq!(result, None);
1545
1546 let result = validate_payload_size_limits(
1548 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1549 usize::MAX,
1550 compression_overhead_len + 1,
1551 );
1552 assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
1553 }
1554
1555 #[test]
1556 fn payload_size_limits_sketches() {
1557 let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX);
1559 assert_eq!(result, Some((0, usize::MAX)));
1560
1561 let compression_overhead_len = max_compression_overhead_len(usize::MAX);
1565
1566 let result = validate_payload_size_limits(
1568 DatadogMetricsEndpoint::Sketches,
1569 usize::MAX,
1570 compression_overhead_len,
1571 );
1572 assert_eq!(result, None);
1573
1574 let result = validate_payload_size_limits(
1576 DatadogMetricsEndpoint::Sketches,
1577 usize::MAX,
1578 compression_overhead_len + 1,
1579 );
1580 assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
1581 }
1582
1583 #[test]
1584 fn encode_series_breaks_out_when_limit_reached_uncompressed() {
1585 let header_len = max_uncompressed_header_len();
1589 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1590 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1591 None,
1592 header_len + 1,
1593 usize::MAX,
1594 )
1595 .expect("payload size limits should be valid");
1596
1597 let counter = get_simple_counter();
1601 let result = encoder.try_encode(counter.clone());
1602 assert!(result.is_ok());
1603 assert_eq!(result.unwrap(), Some(counter));
1604
1605 let result = encoder.finish();
1609 assert!(result.is_ok());
1610
1611 let (payload, processed) = result.unwrap();
1612 assert_eq!(
1613 payload.uncompressed_byte_size,
1614 max_uncompressed_header_len()
1615 );
1616 assert_eq!(
1617 payload.into_payload(),
1618 get_compressed_empty_series_payload()
1619 );
1620 assert_eq!(processed.len(), 0);
1621 }
1622
1623 #[test]
1624 fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
1625 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1629 DatadogMetricsEndpoint::Sketches,
1630 None,
1631 1,
1632 usize::MAX,
1633 )
1634 .expect("payload size limits should be valid");
1635
1636 let sketch = get_simple_sketch();
1640 let result = encoder.try_encode(sketch.clone());
1641 assert!(result.is_ok());
1642 assert_eq!(result.unwrap(), Some(sketch));
1643
1644 let result = encoder.finish();
1647 assert!(result.is_ok());
1648
1649 let (payload, processed) = result.unwrap();
1650 assert_eq!(payload.uncompressed_byte_size, 0);
1651 assert_eq!(
1652 payload.into_payload(),
1653 get_compressed_empty_sketches_payload()
1654 );
1655 assert_eq!(processed.len(), 0);
1656 }
1657
1658 #[test]
1659 fn encode_series_breaks_out_when_limit_reached_compressed() {
1660 let uncompressed_limit = 128;
1664 let compressed_limit = 32;
1665 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1666 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1667 None,
1668 uncompressed_limit,
1669 compressed_limit,
1670 )
1671 .expect("payload size limits should be valid");
1672
1673 let counter = get_simple_counter();
1677 let result = encoder.try_encode(counter.clone());
1678 assert!(result.is_ok());
1679 assert_eq!(result.unwrap(), Some(counter));
1680
1681 let result = encoder.finish();
1685 assert!(result.is_ok());
1686
1687 let (payload, processed) = result.unwrap();
1688 assert_eq!(
1689 payload.uncompressed_byte_size,
1690 max_uncompressed_header_len()
1691 );
1692 assert_eq!(
1693 payload.into_payload(),
1694 get_compressed_empty_series_payload()
1695 );
1696 assert_eq!(processed.len(), 0);
1697 }
1698
1699 #[test]
1700 fn encode_sketches_breaks_out_when_limit_reached_compressed() {
1701 let uncompressed_limit = 128;
1705 let compressed_limit = 16;
1706 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1707 DatadogMetricsEndpoint::Sketches,
1708 None,
1709 uncompressed_limit,
1710 compressed_limit,
1711 )
1712 .expect("payload size limits should be valid");
1713
1714 let sketch = get_simple_sketch();
1718 let result = encoder.try_encode(sketch.clone());
1719 assert!(result.is_ok());
1720 assert_eq!(result.unwrap(), Some(sketch));
1721
1722 let result = encoder.finish();
1726 assert!(result.is_ok());
1727
1728 let (payload, processed) = result.unwrap();
1729 assert_eq!(payload.uncompressed_byte_size, 0);
1730 assert_eq!(
1731 payload.into_payload(),
1732 get_compressed_empty_sketches_payload()
1733 );
1734 assert_eq!(processed.len(), 0);
1735 }
1736
1737 fn arb_counter_metric() -> impl Strategy<Value = Metric> {
1738 let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should not be invalid");
1739 let value = ARB_POSITIVE_F64;
1740 let tags = btree_map(
1741 any::<u64>().prop_map(|v| v.to_string()),
1742 any::<u64>().prop_map(|v| v.to_string()),
1743 0..64,
1744 )
1745 .prop_map(|tags| (!tags.is_empty()).then(|| MetricTags::from(tags)));
1746
1747 (name, value, tags).prop_map(|(metric_name, metric_value, metric_tags)| {
1748 let metric_value = MetricValue::Counter {
1749 value: metric_value,
1750 };
1751 Metric::new(metric_name, MetricKind::Incremental, metric_value).with_tags(metric_tags)
1752 })
1753 }
1754
1755 proptest! {
1756 #[test]
1757 fn encoding_check_for_payload_limit_edge_cases(
1758 uncompressed_limit in 0..64_000_000usize,
1759 compressed_limit in 0..10_000_000usize,
1760 metric in arb_counter_metric(),
1761 ) {
1762 let result = DatadogMetricsEncoder::with_payload_limits(
1769 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1770 None,
1771 uncompressed_limit,
1772 compressed_limit,
1773 );
1774 if let Ok(mut encoder) = result {
1775 _ = encoder.try_encode(metric);
1776
1777 if let Ok((payload, _processed)) = encoder.finish() {
1778 let payload = payload.into_payload();
1779 prop_assert!(payload.len() <= compressed_limit);
1780
1781 let result = decompress_payload(payload);
1782 prop_assert!(result.is_ok());
1783
1784 let decompressed = result.unwrap();
1785 prop_assert!(decompressed.len() <= uncompressed_limit);
1786 }
1787 }
1788 }
1789 }
1790}