1use std::{
2 cmp,
3 io::{self, Write},
4 mem,
5 sync::{Arc, LazyLock, OnceLock},
6};
7
8use bytes::{BufMut, Bytes};
9use chrono::{DateTime, Utc};
10use snafu::{ResultExt, Snafu};
11use vector_lib::{
12 EstimatedJsonEncodedSizeOf,
13 config::{LogSchema, log_schema, telemetry},
14 event::{DatadogMetricOriginMetadata, Metric, MetricTags, MetricValue, metric::MetricSketch},
15 metrics::AgentDDSketch,
16 request_metadata::GroupedCountByteSize,
17};
18
19use vector_common::constants::{
20 ZLIB_FRAME_OVERHEAD, ZLIB_STORED_BLOCK_OVERHEAD, ZLIB_STORED_BLOCK_SIZE,
21 ZSTD_SMALL_INPUT_THRESHOLD,
22};
23
24use super::config::{DatadogMetricsCompression, DatadogMetricsEndpoint, SeriesApiVersion};
25use crate::{
26 common::datadog::{
27 DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
28 },
29 proto::fds::protobuf_descriptors,
30 sinks::util::{Compression, Compressor, encode_namespace, request_builder::EncodeResult},
31};
32
33const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
34const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}";
35const SERIES_PAYLOAD_DELIMITER: &[u8] = b",";
36
37pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11;
38
39const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;
40
41pub(super) static ORIGIN_PRODUCT_VALUE: LazyLock<u32> = LazyLock::new(|| {
42 option_env!("DD_ORIGIN_PRODUCT")
43 .map(|p| {
44 p.parse::<u32>()
45 .expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
46 })
47 .unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
48});
49
50#[allow(warnings, clippy::pedantic, clippy::nursery)]
51mod ddmetric_proto {
52 include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
53}
54
55#[derive(Debug, Snafu)]
56pub enum EncoderError {
57 #[snafu(display(
58 "Invalid metric value '{}' was given; '{}' expected",
59 metric_value,
60 expected
61 ))]
62 InvalidMetric {
63 expected: &'static str,
64 metric_value: &'static str,
65 },
66
67 #[snafu(
68 context(false),
69 display("Failed to encode series metric to JSON: {source}")
70 )]
71 JsonEncodingFailed { source: serde_json::Error },
72
73 #[snafu(display(
76 "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
77 ))]
78 ProtoEncodingFailed,
79}
80
81impl EncoderError {
82 pub const fn as_error_type(&self) -> &'static str {
86 match self {
87 Self::InvalidMetric { .. } => "invalid_metric",
88 Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
89 Self::ProtoEncodingFailed => "failed_to_encode_sketch",
90 }
91 }
92}
93
94#[derive(Debug, Snafu)]
95pub enum FinishError {
96 #[snafu(display(
97 "Failure occurred during writing to or finalizing the compressor: {}",
98 source
99 ))]
100 CompressionFailed { source: io::Error },
101
102 #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
103 TooLarge {
104 metrics: Vec<Metric>,
105 recommended_splits: usize,
106 },
107}
108
109impl FinishError {
110 pub const fn as_error_type(&self) -> &'static str {
114 match self {
115 Self::CompressionFailed { .. } => "compression_failed",
116 Self::TooLarge { .. } => "too_large",
117 }
118 }
119}
120
121struct EncoderState {
122 writer: Compressor,
123 written: usize,
124 buffered_bound: usize,
134 buf: Vec<u8>,
135 processed: Vec<Metric>,
136 byte_size: GroupedCountByteSize,
137}
138
139impl Default for EncoderState {
140 fn default() -> Self {
141 Self {
142 writer: Compression::zlib_default().into(),
143 written: 0,
144 buffered_bound: 0,
145 buf: Vec::with_capacity(1024),
146 processed: Vec::new(),
147 byte_size: telemetry().create_request_count_byte_size(),
148 }
149 }
150}
151
152pub struct DatadogMetricsEncoder {
153 endpoint: DatadogMetricsEndpoint,
154 default_namespace: Option<Arc<str>>,
155 uncompressed_limit: usize,
156 compressed_limit: usize,
157
158 state: EncoderState,
159 log_schema: &'static LogSchema,
160
161 origin_product_value: u32,
162}
163
164impl DatadogMetricsEncoder {
165 pub fn new(endpoint: DatadogMetricsEndpoint, default_namespace: Option<String>) -> Self {
167 let payload_limits = endpoint.payload_limits();
168
169 Self {
170 endpoint,
171 default_namespace: default_namespace.map(Arc::from),
172 uncompressed_limit: payload_limits.uncompressed,
173 compressed_limit: payload_limits.compressed,
174 state: EncoderState {
175 writer: endpoint.compression().compressor(),
176 ..Default::default()
177 },
178 log_schema: log_schema(),
179 origin_product_value: *ORIGIN_PRODUCT_VALUE,
180 }
181 }
182}
183
#[cfg(test)]
impl DatadogMetricsEncoder {
    /// Test-only constructor that replaces the endpoint's default payload limits with the given
    /// `uncompressed_limit` and `compressed_limit`.
    pub fn with_payload_limits(
        endpoint: DatadogMetricsEndpoint,
        default_namespace: Option<String>,
        uncompressed_limit: usize,
        compressed_limit: usize,
    ) -> Self {
        let writer = endpoint.compression().compressor();

        Self {
            endpoint,
            default_namespace: default_namespace.map(Arc::from),
            uncompressed_limit,
            compressed_limit,
            state: EncoderState {
                writer,
                ..Default::default()
            },
            log_schema: log_schema(),
            origin_product_value: *ORIGIN_PRODUCT_VALUE,
        }
    }

    /// Exposes the current state's `buffered_bound` to tests.
    fn buffered_bound(&self) -> usize {
        let EncoderState { buffered_bound, .. } = self.state;
        buffered_bound
    }
}
214
215impl DatadogMetricsEncoder {
216 fn reset_state(&mut self) -> EncoderState {
217 let new_state = EncoderState {
218 writer: self.endpoint.compression().compressor(),
219 ..Default::default()
220 };
221 mem::replace(&mut self.state, new_state)
222 }
223
224 fn encode_single_metric(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
225 self.state.buf.clear();
237
238 self.state
239 .byte_size
240 .add_event(&metric, metric.estimated_json_encoded_size_of());
241
242 match self.endpoint {
258 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
260 let all_series = generate_series_metrics(
262 &metric,
263 &self.default_namespace,
264 self.log_schema,
265 self.origin_product_value,
266 )?;
267
268 let has_processed = !self.state.processed.is_empty();
271 for (i, series) in all_series.iter().enumerate() {
272 if (has_processed || i > 0)
274 && write_payload_delimiter(self.endpoint, &mut self.state.buf).is_err()
275 {
276 return Ok(Some(metric));
277 }
278 serde_json::to_writer(&mut self.state.buf, series)?;
279 }
280 }
281 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() {
283 MetricValue::Counter { .. }
284 | MetricValue::Gauge { .. }
285 | MetricValue::Set { .. }
286 | MetricValue::AggregatedSummary { .. } => {
287 let series_proto = series_to_proto_message(
288 &metric,
289 &self.default_namespace,
290 self.log_schema,
291 self.origin_product_value,
292 )?;
293
294 encode_proto_key_and_message(
295 series_proto,
296 get_series_payload_series_field_number(),
297 &mut self.state.buf,
298 )?;
299 }
300 value => {
301 return Err(EncoderError::InvalidMetric {
302 expected: "series",
303 metric_value: value.as_name(),
304 });
305 }
306 },
307 DatadogMetricsEndpoint::Sketches => match metric.value() {
309 MetricValue::Sketch { sketch } => match sketch {
310 MetricSketch::AgentDDSketch(ddsketch) => {
311 if let Some(sketch_proto) = sketch_to_proto_message(
312 &metric,
313 ddsketch,
314 &self.default_namespace,
315 self.log_schema,
316 self.origin_product_value,
317 ) {
318 encode_proto_key_and_message(
319 sketch_proto,
320 get_sketch_payload_sketches_field_number(),
321 &mut self.state.buf,
322 )?;
323 } else {
324 }
326 }
327 },
328 value => {
329 return Err(EncoderError::InvalidMetric {
330 expected: "sketches",
331 metric_value: value.as_name(),
332 });
333 }
334 },
335 }
336
337 match self.try_compress_buffer() {
339 Err(_) | Ok(false) => Ok(Some(metric)),
340 Ok(true) => {
341 self.state.processed.push(metric);
342 Ok(None)
343 }
344 }
345 }
346
347 fn try_compress_buffer(&mut self) -> io::Result<bool> {
348 let n = self.state.buf.len();
349
350 if self.state.written + n > self.uncompressed_limit {
352 return Ok(false);
353 }
354
355 let compression = self.endpoint.compression();
372 let flushed_compressed = self.state.writer.get_ref().len();
373 if flushed_compressed + compression.max_compressed_size(self.state.buffered_bound + n)
374 > self.compressed_limit
375 {
376 return Ok(false);
377 }
378
379 self.state.writer.write_all(&self.state.buf)?;
385 self.state.written += n;
386 if self.state.writer.get_ref().len() > flushed_compressed {
387 self.state.buffered_bound = n;
388 } else {
389 self.state.buffered_bound += n;
390 }
391 Ok(true)
392 }
393
394 pub fn try_encode(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
411 if self.state.written == 0 {
413 match write_payload_header(self.endpoint, &mut self.state.writer) {
414 Ok(n) => {
415 self.state.written += n;
416 self.state.buffered_bound += n;
417 }
418 Err(_) => return Ok(Some(metric)),
419 }
420 }
421
422 self.encode_single_metric(metric)
423 }
424
425 pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
426 let n = write_payload_footer(self.endpoint, &mut self.state.writer)
428 .context(CompressionFailedSnafu)?;
429 self.state.written += n;
430
431 let raw_bytes_written = self.state.written;
432 let byte_size = self.state.byte_size.clone();
433
434 let state = self.reset_state();
436 let payload = state
437 .writer
438 .finish()
439 .context(CompressionFailedSnafu)?
440 .freeze();
441 let processed = state.processed;
442
443 let compressed_splits = payload.len() / self.compressed_limit;
452 let uncompressed_splits = state.written / self.uncompressed_limit;
453 let recommended_splits = cmp::max(compressed_splits, uncompressed_splits) + 1;
454
455 if recommended_splits == 1 {
456 Ok((
458 EncodeResult::compressed(payload, raw_bytes_written, byte_size),
459 processed,
460 ))
461 } else {
462 Err(FinishError::TooLarge {
463 metrics: processed,
464 recommended_splits,
465 })
466 }
467 }
468}
469
470fn generate_proto_metadata(
471 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
472 maybe_source_type: Option<&str>,
473 origin_product_value: u32,
474) -> Option<ddmetric_proto::Metadata> {
475 generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
476 |origin| {
477 if origin.product().is_none()
478 || origin.category().is_none()
479 || origin.service().is_none()
480 {
481 warn!(
482 message = "Generated sketch origin metadata should have each field set.",
483 product = origin.product(),
484 category = origin.category(),
485 service = origin.service()
486 );
487 }
488 ddmetric_proto::Metadata {
489 origin: Some(ddmetric_proto::Origin {
490 origin_product: origin.product().unwrap_or_default(),
491 origin_category: origin.category().unwrap_or_default(),
492 origin_service: origin.service().unwrap_or_default(),
493 }),
494 }
495 },
496 )
497}
498
499fn get_sketch_payload_sketches_field_number() -> u32 {
500 static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
501 *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
502 let descriptors = protobuf_descriptors();
503 let descriptor = descriptors
504 .get_message_by_name("datadog.agentpayload.SketchPayload")
505 .expect("should not fail to find `SketchPayload` message in descriptor pool");
506
507 descriptor
508 .get_field_by_name("sketches")
509 .map(|field| field.number())
510 .expect("`sketches` field must exist in `SketchPayload` message")
511 })
512}
513
514fn get_series_payload_series_field_number() -> u32 {
515 static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
516 *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| {
517 let descriptors = protobuf_descriptors();
518 let descriptor = descriptors
519 .get_message_by_name("datadog.agentpayload.MetricPayload")
520 .expect("should not fail to find `MetricPayload` message in descriptor pool");
521
522 descriptor
523 .get_field_by_name("series")
524 .map(|field| field.number())
525 .expect("`series` field must exist in `MetricPayload` message")
526 })
527}
528
529fn sketch_to_proto_message(
530 metric: &Metric,
531 ddsketch: &AgentDDSketch,
532 default_namespace: &Option<Arc<str>>,
533 log_schema: &'static LogSchema,
534 origin_product_value: u32,
535) -> Option<ddmetric_proto::sketch_payload::Sketch> {
536 if ddsketch.is_empty() {
537 return None;
538 }
539
540 let name = get_namespaced_name(metric, default_namespace);
541 let ts = encode_timestamp(metric.timestamp());
542 let mut tags = metric.tags().cloned().unwrap_or_default();
543 let host = log_schema
544 .host_key()
545 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
546 .unwrap_or_default();
547 let tags = encode_tags(&tags);
548
549 let cnt = ddsketch.count() as i64;
550 let min = ddsketch
551 .min()
552 .expect("min should be present for non-empty sketch");
553 let max = ddsketch
554 .max()
555 .expect("max should be present for non-empty sketch");
556 let avg = ddsketch
557 .avg()
558 .expect("avg should be present for non-empty sketch");
559 let sum = ddsketch
560 .sum()
561 .expect("sum should be present for non-empty sketch");
562
563 let (bins, counts) = ddsketch.bin_map().into_parts();
564 let k = bins.into_iter().map(Into::into).collect();
565 let n = counts.into_iter().map(Into::into).collect();
566
567 let event_metadata = metric.metadata();
568 let metadata = generate_proto_metadata(
569 event_metadata.datadog_origin_metadata(),
570 event_metadata.source_type(),
571 origin_product_value,
572 );
573
574 trace!(?metadata, "Generated sketch metadata.");
575
576 Some(ddmetric_proto::sketch_payload::Sketch {
577 metric: name,
578 tags,
579 host,
580 distributions: Vec::new(),
581 dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
582 ts,
583 cnt,
584 min,
585 max,
586 avg,
587 sum,
588 k,
589 n,
590 }],
591 metadata,
592 })
593}
594
595fn series_to_proto_message(
596 metric: &Metric,
597 default_namespace: &Option<Arc<str>>,
598 log_schema: &'static LogSchema,
599 origin_product_value: u32,
600) -> Result<ddmetric_proto::metric_payload::MetricSeries, EncoderError> {
601 let metric_name = get_namespaced_name(metric, default_namespace);
602 let mut tags = metric.tags().cloned().unwrap_or_default();
603
604 let mut resources = vec![];
605
606 if let Some(host) = log_schema
607 .host_key()
608 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
609 {
610 resources.push(ddmetric_proto::metric_payload::Resource {
611 r#type: "host".to_string(),
612 name: host,
613 });
614 }
615
616 if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) {
619 resources.push(ddmetric_proto::metric_payload::Resource {
620 r#type: "device".to_string(),
621 name: device,
622 });
623 }
624
625 let source_type_name = tags.remove("source_type_name").unwrap_or_default();
626
627 let tags = encode_tags(&tags);
628
629 let event_metadata = metric.metadata();
630 let metadata = generate_proto_metadata(
631 event_metadata.datadog_origin_metadata(),
632 event_metadata.source_type(),
633 origin_product_value,
634 );
635 trace!(?metadata, "Generated MetricSeries metadata.");
636
637 let timestamp = encode_timestamp(metric.timestamp());
638
639 let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
641
642 let (points, metric_type) = match metric.value() {
643 MetricValue::Counter { value } => {
644 if let Some(interval) = maybe_interval {
645 let value = *value / (interval as f64);
649 (
650 vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
651 ddmetric_proto::metric_payload::MetricType::Rate,
652 )
653 } else {
654 (
655 vec![ddmetric_proto::metric_payload::MetricPoint {
656 value: *value,
657 timestamp,
658 }],
659 ddmetric_proto::metric_payload::MetricType::Count,
660 )
661 }
662 }
663 MetricValue::Set { values } => (
664 vec![ddmetric_proto::metric_payload::MetricPoint {
665 value: values.len() as f64,
666 timestamp,
667 }],
668 ddmetric_proto::metric_payload::MetricType::Gauge,
669 ),
670 MetricValue::Gauge { value } => (
671 vec![ddmetric_proto::metric_payload::MetricPoint {
672 value: *value,
673 timestamp,
674 }],
675 ddmetric_proto::metric_payload::MetricType::Gauge,
676 ),
677 value => {
679 return Err(EncoderError::InvalidMetric {
681 expected: "series",
682 metric_value: value.as_name(),
683 });
684 }
685 };
686
687 Ok(ddmetric_proto::metric_payload::MetricSeries {
688 resources,
689 metric: metric_name,
690 tags,
691 points,
692 r#type: metric_type.into(),
693 unit: "".to_string(),
695 source_type_name,
696 interval: maybe_interval.unwrap_or(0) as i64,
697 metadata,
698 })
699}
700
701fn encode_proto_key_and_message<T, B>(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError>
703where
704 T: prost::Message,
705 B: BufMut,
706{
707 prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf);
708
709 msg.encode_length_delimited(buf)
710 .map_err(|_| EncoderError::ProtoEncodingFailed)
711}
712
713fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
714 encode_namespace(
715 metric
716 .namespace()
717 .or_else(|| default_namespace.as_ref().map(|s| s.as_ref())),
718 '.',
719 metric.name(),
720 )
721}
722
723fn encode_tags(tags: &MetricTags) -> Vec<String> {
724 let mut pairs: Vec<_> = tags
725 .iter_all()
726 .map(|(name, value)| match value {
727 Some(value) => format!("{name}:{value}"),
728 None => name.into(),
729 })
730 .collect();
731 pairs.sort();
732 pairs
733}
734
735fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
736 if let Some(ts) = timestamp {
737 ts.timestamp()
738 } else {
739 Utc::now().timestamp()
740 }
741}
742
743fn source_type_to_service(source_type: &str) -> Option<u32> {
745 match source_type {
746 "datadog_agent" => None,
749
750 "apache_metrics" => Some(17),
752 "aws_ecs_metrics" => Some(209),
753 "eventstoredb_metrics" => Some(210),
754 "host_metrics" => Some(211),
755 "internal_metrics" => Some(212),
756 "mongodb_metrics" => Some(111),
757 "nginx_metrics" => Some(117),
758 "open_telemetry" => Some(213),
759 "postgresql_metrics" => Some(128),
760 "prometheus_remote_write" => Some(214),
761 "prometheus_scrape" => Some(215),
762 "statsd" => Some(153),
763
764 "kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector"
769 | "pulsar" => Some(0),
770
771 _ => {
776 debug!(
777 "Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value."
778 );
779 Some(0)
780 }
781 }
782}
783
784fn generate_origin_metadata(
789 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
790 maybe_source_type: Option<&str>,
791 origin_product_value: u32,
792) -> Option<DatadogMetricOriginMetadata> {
793 let no_value = 0;
794
795 if let Some(pass_through) = maybe_pass_through {
804 Some(DatadogMetricOriginMetadata::new(
805 pass_through.product().or(Some(origin_product_value)),
806 pass_through.category().or(Some(ORIGIN_CATEGORY_VALUE)),
807 pass_through.service().or(Some(no_value)),
808 ))
809
810 } else {
812 maybe_source_type.and_then(|source_type| {
813 source_type_to_service(source_type).map(|origin_service_value| {
817 DatadogMetricOriginMetadata::new(
818 Some(origin_product_value),
819 Some(ORIGIN_CATEGORY_VALUE),
820 Some(origin_service_value),
821 )
822 })
823 })
824 }
825}
826
827fn generate_series_metadata(
828 maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
829 maybe_source_type: Option<&str>,
830 origin_product_value: u32,
831) -> Option<DatadogSeriesMetricMetadata> {
832 generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
833 |origin| DatadogSeriesMetricMetadata {
834 origin: Some(origin),
835 },
836 )
837}
838
839fn generate_series_metrics(
840 metric: &Metric,
841 default_namespace: &Option<Arc<str>>,
842 log_schema: &'static LogSchema,
843 origin_product_value: u32,
844) -> Result<Vec<DatadogSeriesMetric>, EncoderError> {
845 let name = get_namespaced_name(metric, default_namespace);
846
847 let mut tags = metric.tags().cloned().unwrap_or_default();
848 let host = log_schema
849 .host_key()
850 .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());
851
852 let source_type_name = tags.remove("source_type_name");
853 let device = tags.remove("device");
854 let ts = encode_timestamp(metric.timestamp());
855 let tags = Some(encode_tags(&tags));
856
857 let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
859
860 let event_metadata = metric.metadata();
861 let metadata = generate_series_metadata(
862 event_metadata.datadog_origin_metadata(),
863 event_metadata.source_type(),
864 origin_product_value,
865 );
866
867 trace!(?metadata, "Generated series metadata.");
868
869 let (points, metric_type) = match metric.value() {
870 MetricValue::Counter { value } => {
871 if let Some(interval) = maybe_interval {
872 let value = *value / (interval as f64);
876 (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
877 } else {
878 (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
879 }
880 }
881 MetricValue::Set { values } => (
882 vec![DatadogPoint(ts, values.len() as f64)],
883 DatadogMetricType::Gauge,
884 ),
885 MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
886 value => {
888 return Err(EncoderError::InvalidMetric {
889 expected: "series",
890 metric_value: value.as_name(),
891 });
892 }
893 };
894
895 Ok(vec![DatadogSeriesMetric {
896 metric: name,
897 r#type: metric_type,
898 interval: maybe_interval,
899 points,
900 tags,
901 host,
902 source_type_name,
903 device,
904 metadata,
905 }])
906}
907
908impl DatadogMetricsCompression {
909 fn compressor(self) -> Compressor {
910 match self {
911 Self::Zstd => Compression::zstd_default().into(),
912 Self::Zlib => Compression::zlib_default().into(),
913 }
914 }
915
916 const fn max_compressed_size(self, n: usize) -> usize {
923 match self {
924 Self::Zlib => {
925 n + (1 + n.saturating_sub(ZLIB_FRAME_OVERHEAD) / ZLIB_STORED_BLOCK_SIZE)
929 * ZLIB_STORED_BLOCK_OVERHEAD
930 }
931 Self::Zstd => {
932 n + (n >> 8)
935 + if n < ZSTD_SMALL_INPUT_THRESHOLD {
936 (ZSTD_SMALL_INPUT_THRESHOLD - n) >> 11
937 } else {
938 0
939 }
940 }
941 }
942 }
943}
944
945fn write_payload_header(
946 endpoint: DatadogMetricsEndpoint,
947 writer: &mut dyn io::Write,
948) -> io::Result<usize> {
949 match endpoint {
950 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
951 .write_all(SERIES_PAYLOAD_HEADER)
952 .map(|_| SERIES_PAYLOAD_HEADER.len()),
953 _ => Ok(0),
954 }
955}
956
957fn write_payload_delimiter(
958 endpoint: DatadogMetricsEndpoint,
959 writer: &mut dyn io::Write,
960) -> io::Result<usize> {
961 match endpoint {
962 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
963 .write_all(SERIES_PAYLOAD_DELIMITER)
964 .map(|_| SERIES_PAYLOAD_DELIMITER.len()),
965 _ => Ok(0),
966 }
967}
968
969fn write_payload_footer(
970 endpoint: DatadogMetricsEndpoint,
971 writer: &mut dyn io::Write,
972) -> io::Result<usize> {
973 match endpoint {
974 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
975 .write_all(SERIES_PAYLOAD_FOOTER)
976 .map(|_| SERIES_PAYLOAD_FOOTER.len()),
977 _ => Ok(0),
978 }
979}
980
981#[cfg(test)]
982mod tests {
983 use std::io::{self, Write as _};
984 use std::{num::NonZeroU32, sync::Arc};
985
986 use bytes::{BufMut, Bytes, BytesMut};
987 use chrono::{DateTime, TimeZone, Timelike, Utc};
988 use flate2::read::ZlibDecoder;
989 use proptest::{
990 arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert,
991 proptest, strategy::Strategy, string::string_regex,
992 };
993 use prost::Message;
994 use vector_lib::{
995 config::{LogSchema, log_schema},
996 event::{
997 DatadogMetricOriginMetadata, EventMetadata, Metric, MetricKind, MetricTags,
998 MetricValue,
999 metric::{MetricSketch, TagValue},
1000 },
1001 metric_tags,
1002 metrics::AgentDDSketch,
1003 };
1004
1005 use super::{
1006 DatadogMetricsEncoder, EncoderError, ddmetric_proto, encode_proto_key_and_message,
1007 encode_tags, encode_timestamp, generate_series_metrics,
1008 get_sketch_payload_sketches_field_number, series_to_proto_message, sketch_to_proto_message,
1009 write_payload_footer, write_payload_header,
1010 };
1011 use crate::{
1012 common::datadog::DatadogMetricType,
1013 sinks::{
1014 datadog::metrics::{
1015 config::{DatadogMetricsCompression, DatadogMetricsEndpoint, SeriesApiVersion},
1016 encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
1017 },
1018 util::{Compression, Compressor},
1019 },
1020 };
1021
1022 const fn max_uncompressed_header_len(endpoint: DatadogMetricsEndpoint) -> usize {
1023 match endpoint {
1024 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
1025 super::SERIES_PAYLOAD_HEADER.len() + super::SERIES_PAYLOAD_FOOTER.len()
1026 }
1027 _ => 0,
1028 }
1029 }
1030
1031 fn get_simple_counter() -> Metric {
1032 let value = MetricValue::Counter { value: 3.14 };
1033 Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts()))
1034 }
1035
1036 fn get_simple_counter_with_metadata(metadata: EventMetadata) -> Metric {
1037 let value = MetricValue::Counter { value: 3.14 };
1038 Metric::new_with_metadata("basic_counter", MetricKind::Incremental, value, metadata)
1039 .with_timestamp(Some(ts()))
1040 }
1041
1042 fn get_simple_rate_counter(value: f64, interval_ms: u32) -> Metric {
1043 let value = MetricValue::Counter { value };
1044 Metric::new("basic_counter", MetricKind::Incremental, value)
1045 .with_timestamp(Some(ts()))
1046 .with_interval_ms(NonZeroU32::new(interval_ms))
1047 }
1048
1049 fn get_simple_sketch() -> Metric {
1050 let mut ddsketch = AgentDDSketch::with_agent_defaults();
1051 ddsketch.insert(3.14);
1052 Metric::new("basic_counter", MetricKind::Incremental, ddsketch.into())
1053 .with_timestamp(Some(ts()))
1054 }
1055
1056 fn get_compressed_empty_series_v1_payload() -> Bytes {
1057 let mut compressor = Compressor::from(Compression::zlib_default());
1058
1059 _ = write_payload_header(
1060 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1061 &mut compressor,
1062 )
1063 .expect("should not fail");
1064 _ = write_payload_footer(
1065 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1066 &mut compressor,
1067 )
1068 .expect("should not fail");
1069
1070 compressor.finish().expect("should not fail").freeze()
1071 }
1072
1073 fn get_compressed_empty_sketches_payload() -> Bytes {
1074 Compressor::from(Compression::zstd_default())
1075 .finish()
1076 .expect("should not fail")
1077 .freeze()
1078 }
1079
1080 fn get_compressed_empty_series_v2_payload() -> Bytes {
1081 Compressor::from(Compression::zstd_default())
1082 .finish()
1083 .expect("should not fail")
1084 .freeze()
1085 }
1086
1087 fn decompress_zlib_payload(payload: Bytes) -> io::Result<Bytes> {
1088 let mut decompressor = ZlibDecoder::new(&payload[..]);
1089 let mut decompressed = BytesMut::new().writer();
1090 io::copy(&mut decompressor, &mut decompressed)?;
1091 Ok(decompressed.into_inner().freeze())
1092 }
1093
1094 fn decompress_zstd_payload(payload: Bytes) -> io::Result<Bytes> {
1095 let decompressed = zstd::decode_all(&payload[..])?;
1096 Ok(Bytes::from(decompressed))
1097 }
1098
1099 fn total_compressed_len(compression: DatadogMetricsCompression, n: usize) -> usize {
1108 let mut state = 0xdeadbeef_cafebabe_u64;
1111 let data: Vec<u8> = (0..n)
1112 .map(|_| {
1113 state ^= state << 13;
1114 state ^= state >> 7;
1115 state ^= state << 17;
1116 state as u8
1117 })
1118 .collect();
1119 let mut compressor = compression.compressor();
1120 compressor.write_all(&data).expect("write should succeed");
1121 compressor.finish().expect("finish should succeed").len()
1122 }
1123
1124 #[test]
1131 fn max_compressed_size_is_upper_bound() {
1132 let test_sizes = [
1134 0, 1, 100, 1_000, 16_383, 16_384, 16_385, 32_767, 32_768, 131_071, 131_072, 131_073,
1135 500_000,
1136 ];
1137
1138 let zlib_frame = total_compressed_len(DatadogMetricsCompression::Zlib, 0);
1139 let zstd_frame = total_compressed_len(DatadogMetricsCompression::Zstd, 0);
1140
1141 let max_slack = |n: usize| n / 100 + 64;
1144
1145 for &n in &test_sizes {
1146 let actual_zlib = total_compressed_len(DatadogMetricsCompression::Zlib, n) - zlib_frame;
1147 let max_zlib = DatadogMetricsCompression::Zlib.max_compressed_size(n);
1148 assert!(
1149 actual_zlib <= max_zlib,
1150 "zlib n={n}: formula underestimates: actual={actual_zlib} > max={max_zlib}"
1151 );
1152 assert!(
1153 max_zlib - actual_zlib <= max_slack(n),
1154 "zlib n={n}: formula overestimates: slack={} > {}",
1155 max_zlib - actual_zlib,
1156 max_slack(n)
1157 );
1158
1159 let actual_zstd = total_compressed_len(DatadogMetricsCompression::Zstd, n) - zstd_frame;
1160 let max_zstd = DatadogMetricsCompression::Zstd.max_compressed_size(n);
1161 assert!(
1162 actual_zstd <= max_zstd,
1163 "zstd n={n}: formula underestimates: actual={actual_zstd} > max={max_zstd}"
1164 );
1165 assert!(
1166 max_zstd - actual_zstd <= max_slack(n),
1167 "zstd n={n}: formula overestimates: slack={} > {}",
1168 max_zstd - actual_zstd,
1169 max_slack(n)
1170 );
1171 }
1172 }
1173
1174 fn ts() -> DateTime<Utc> {
1175 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
1176 .single()
1177 .and_then(|t| t.with_nanosecond(11))
1178 .expect("invalid timestamp")
1179 }
1180
1181 fn tags() -> MetricTags {
1182 metric_tags! {
1183 "normal_tag" => "value",
1184 "true_tag" => "true",
1185 "empty_tag" => TagValue::Bare,
1186 "multi_value" => "one",
1187 "multi_value" => "two",
1188 }
1189 }
1190
1191 fn encode_sketches_normal<B>(
1192 metrics: &[Metric],
1193 default_namespace: &Option<Arc<str>>,
1194 log_schema: &'static LogSchema,
1195 buf: &mut B,
1196 ) where
1197 B: BufMut,
1198 {
1199 let mut sketches = Vec::new();
1200 for metric in metrics {
1201 let MetricValue::Sketch { sketch } = metric.value() else {
1202 panic!("must be sketch")
1203 };
1204 match sketch {
1205 MetricSketch::AgentDDSketch(ddsketch) => {
1206 if let Some(sketch) =
1207 sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema, 14)
1208 {
1209 sketches.push(sketch);
1210 }
1211 }
1212 }
1213 }
1214
1215 let sketch_payload = ddmetric_proto::SketchPayload {
1216 metadata: None,
1217 sketches,
1218 };
1219
1220 sketch_payload.encode(buf).unwrap()
1222 }
1223
1224 #[test]
1225 fn test_encode_tags() {
1226 assert_eq!(
1227 encode_tags(&tags()),
1228 vec![
1229 "empty_tag",
1230 "multi_value:one",
1231 "multi_value:two",
1232 "normal_tag:value",
1233 "true_tag:true",
1234 ]
1235 );
1236 }
1237
1238 #[test]
1239 fn test_encode_timestamp() {
1240 assert_eq!(encode_timestamp(None), Utc::now().timestamp());
1241 assert_eq!(encode_timestamp(Some(ts())), 1542182950);
1242 }
1243
1244 #[test]
1245 fn incorrect_metric_for_endpoint_causes_error() {
1246 let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None);
1248 let series_result = sketch_encoder.try_encode(get_simple_counter());
1249 assert!(matches!(
1250 series_result.err(),
1251 Some(EncoderError::InvalidMetric { .. })
1252 ));
1253
1254 let mut series_v1_encoder =
1256 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None);
1257 let sketch_result = series_v1_encoder.try_encode(get_simple_sketch());
1258 assert!(matches!(
1259 sketch_result.err(),
1260 Some(EncoderError::InvalidMetric { .. })
1261 ));
1262
1263 let mut series_v2_encoder =
1264 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
1265 let sketch_result = series_v2_encoder.try_encode(get_simple_sketch());
1266 assert!(matches!(
1267 sketch_result.err(),
1268 Some(EncoderError::InvalidMetric { .. })
1269 ));
1270 }
1271
1272 #[test]
1273 fn encode_counter_with_interval_as_rate() {
1274 let value = 423.1331;
1280 let interval_ms = 10000;
1281 let rate_counter = get_simple_rate_counter(value, interval_ms);
1282 let expected_value = value / (interval_ms / 1000) as f64;
1283 let expected_interval = interval_ms / 1000;
1284
1285 {
1287 let result = generate_series_metrics(
1289 &rate_counter,
1290 &None,
1291 log_schema(),
1292 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1293 );
1294 assert!(result.is_ok());
1295
1296 let metrics = result.unwrap();
1297 assert_eq!(metrics.len(), 1);
1298
1299 let actual = &metrics[0];
1300 assert_eq!(actual.r#type, DatadogMetricType::Rate);
1301 assert_eq!(actual.interval, Some(expected_interval));
1302 assert_eq!(actual.points.len(), 1);
1303 assert_eq!(actual.points[0].1, expected_value);
1304 }
1305
1306 {
1308 let series_proto = series_to_proto_message(
1309 &rate_counter,
1310 &None,
1311 log_schema(),
1312 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1313 )
1314 .unwrap();
1315 assert_eq!(series_proto.r#type, 2);
1316 assert_eq!(series_proto.interval, expected_interval as i64);
1317 assert_eq!(series_proto.points.len(), 1);
1318 assert_eq!(series_proto.points[0].value, expected_value);
1319 }
1320 }
1321
1322 #[test]
1323 fn encode_non_rate_metric_with_interval() {
1324 let value = 423.1331;
1329 let interval_ms = 10000;
1330
1331 let gauge = Metric::new(
1332 "basic_gauge",
1333 MetricKind::Incremental,
1334 MetricValue::Gauge { value },
1335 )
1336 .with_timestamp(Some(ts()))
1337 .with_interval_ms(NonZeroU32::new(interval_ms));
1338
1339 let expected_value = value; let expected_interval = interval_ms / 1000;
1341
1342 {
1344 let result = generate_series_metrics(
1346 &gauge,
1347 &None,
1348 log_schema(),
1349 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1350 );
1351 assert!(result.is_ok());
1352
1353 let metrics = result.unwrap();
1354 assert_eq!(metrics.len(), 1);
1355
1356 let actual = &metrics[0];
1357 assert_eq!(actual.r#type, DatadogMetricType::Gauge);
1358 assert_eq!(actual.interval, Some(expected_interval));
1359 assert_eq!(actual.points.len(), 1);
1360 assert_eq!(actual.points[0].1, expected_value);
1361 }
1362
1363 {
1365 let series_proto = series_to_proto_message(
1366 &gauge,
1367 &None,
1368 log_schema(),
1369 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1370 )
1371 .unwrap();
1372 assert_eq!(series_proto.r#type, 3);
1373 assert_eq!(series_proto.interval, expected_interval as i64);
1374 assert_eq!(series_proto.points.len(), 1);
1375 assert_eq!(series_proto.points[0].value, expected_value);
1376 }
1377 }
1378
1379 #[test]
1380 fn encode_origin_metadata_pass_through() {
1381 let product = 10;
1382 let category = 11;
1383 let service = 9;
1384
1385 let event_metadata = EventMetadata::default().with_origin_metadata(
1386 DatadogMetricOriginMetadata::new(Some(product), Some(category), Some(service)),
1387 );
1388 let counter = get_simple_counter_with_metadata(event_metadata);
1389
1390 {
1392 let result = generate_series_metrics(
1393 &counter,
1394 &None,
1395 log_schema(),
1396 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1397 );
1398 assert!(result.is_ok());
1399
1400 let metrics = result.unwrap();
1401 assert_eq!(metrics.len(), 1);
1402
1403 let actual = &metrics[0];
1404 let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1405
1406 assert_eq!(generated_origin.product().unwrap(), product);
1407 assert_eq!(generated_origin.category().unwrap(), category);
1408 assert_eq!(generated_origin.service().unwrap(), service);
1409 }
1410 {
1412 let series_proto = series_to_proto_message(
1413 &counter,
1414 &None,
1415 log_schema(),
1416 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1417 )
1418 .unwrap();
1419
1420 let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1421 assert_eq!(generated_origin.origin_product, product);
1422 assert_eq!(generated_origin.origin_category, category);
1423 assert_eq!(generated_origin.origin_service, service);
1424 }
1425 }
1426
1427 #[test]
1428 fn encode_origin_metadata_vector_sourced() {
1429 let product = *ORIGIN_PRODUCT_VALUE;
1430
1431 let category = 11;
1432 let service = 153;
1433
1434 let mut counter = get_simple_counter();
1435
1436 counter.metadata_mut().set_source_type("statsd");
1437
1438 {
1440 let result = generate_series_metrics(&counter, &None, log_schema(), product);
1441 assert!(result.is_ok());
1442
1443 let metrics = result.unwrap();
1444 assert_eq!(metrics.len(), 1);
1445
1446 let actual = &metrics[0];
1447 let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1448
1449 assert_eq!(generated_origin.product().unwrap(), product);
1450 assert_eq!(generated_origin.category().unwrap(), category);
1451 assert_eq!(generated_origin.service().unwrap(), service);
1452 }
1453 {
1455 let series_proto = series_to_proto_message(
1456 &counter,
1457 &None,
1458 log_schema(),
1459 DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1460 )
1461 .unwrap();
1462
1463 let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1464 assert_eq!(generated_origin.origin_product, product);
1465 assert_eq!(generated_origin.origin_category, category);
1466 assert_eq!(generated_origin.origin_service, service);
1467 }
1468 }
1469
1470 #[test]
1471 fn encode_single_series_v1_metric_with_default_limits() {
1472 let mut encoder =
1475 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None);
1476 let counter = get_simple_counter();
1477 let expected = counter.clone();
1478
1479 let result = encoder.try_encode(counter);
1481 assert!(result.is_ok());
1482 assert_eq!(result.unwrap(), None);
1483
1484 let result = encoder.finish();
1486 assert!(result.is_ok());
1487
1488 let (_payload, mut processed) = result.unwrap();
1489 assert_eq!(processed.len(), 1);
1490 assert_eq!(expected, processed.pop().unwrap());
1491 }
1492
1493 #[test]
1494 fn encode_single_series_v2_metric_with_default_limits() {
1495 let mut encoder =
1498 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
1499 let counter = get_simple_counter();
1500 let expected = counter.clone();
1501
1502 let result = encoder.try_encode(counter);
1504 assert!(result.is_ok());
1505 assert_eq!(result.unwrap(), None);
1506
1507 let result = encoder.finish();
1509 assert!(result.is_ok());
1510
1511 let (_payload, mut processed) = result.unwrap();
1512 assert_eq!(processed.len(), 1);
1513 assert_eq!(expected, processed.pop().unwrap());
1514 }
1515
1516 #[test]
1517 fn encode_single_sketch_metric_with_default_limits() {
1518 let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None);
1521 let sketch = get_simple_sketch();
1522 let expected = sketch.clone();
1523
1524 let result = encoder.try_encode(sketch);
1526 assert!(result.is_ok());
1527 assert_eq!(result.unwrap(), None);
1528
1529 let result = encoder.finish();
1531 assert!(result.is_ok());
1532
1533 let (_payload, mut processed) = result.unwrap();
1534 assert_eq!(processed.len(), 1);
1535 assert_eq!(expected, processed.pop().unwrap());
1536 }
1537
1538 #[test]
1539 fn encode_empty_sketch() {
1540 let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None);
1543 let sketch = Metric::new(
1544 "empty",
1545 MetricKind::Incremental,
1546 AgentDDSketch::with_agent_defaults().into(),
1547 )
1548 .with_timestamp(Some(ts()));
1549 let expected = sketch.clone();
1550
1551 let result = encoder.try_encode(sketch);
1553 assert!(result.is_ok());
1554 assert_eq!(result.unwrap(), None);
1555
1556 let result = encoder.finish();
1558 assert!(result.is_ok());
1559
1560 let (_payload, mut processed) = result.unwrap();
1561 assert_eq!(processed.len(), 1);
1562 assert_eq!(expected, processed.pop().unwrap());
1563 }
1564
1565 #[test]
1566 fn encode_multiple_sketch_metrics_normal_vs_incremental() {
1567 let metrics = vec![
1570 get_simple_sketch(),
1571 get_simple_sketch(),
1572 get_simple_sketch(),
1573 ];
1574
1575 let mut normal_buf = Vec::new();
1576 encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);
1577
1578 let mut incremental_buf = Vec::new();
1579 for metric in &metrics {
1580 match metric.value() {
1581 MetricValue::Sketch { sketch } => match sketch {
1582 MetricSketch::AgentDDSketch(ddsketch) => {
1583 if let Some(sketch_proto) =
1584 sketch_to_proto_message(metric, ddsketch, &None, log_schema(), 14)
1585 {
1586 encode_proto_key_and_message(
1587 sketch_proto,
1588 get_sketch_payload_sketches_field_number(),
1589 &mut incremental_buf,
1590 )
1591 .unwrap();
1592 }
1593 }
1594 },
1595 _ => panic!("should be a sketch"),
1596 }
1597 }
1598
1599 assert_eq!(normal_buf, incremental_buf);
1600 }
1601
1602 #[test]
1603 fn default_payload_limits_are_endpoint_aware() {
1604 let v1 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V1).payload_limits();
1605 assert_eq!(v1.uncompressed, 62_914_560);
1606 assert_eq!(v1.compressed, 3_200_000);
1607
1608 let v2 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2).payload_limits();
1609 assert_eq!(v2.uncompressed, 5_242_880);
1610 assert_eq!(v2.compressed, 512_000);
1611
1612 let sketches = DatadogMetricsEndpoint::Sketches.payload_limits();
1613 assert_eq!(sketches.uncompressed, 62_914_560);
1614 assert_eq!(sketches.compressed, 3_200_000);
1615 }
1616
1617 #[test]
1618 fn v2_series_default_limits_split_large_batches() {
1619 let mut pending = vec![get_simple_counter(); 120_000];
1622 let mut encoded_batches = 0;
1623 let mut encoded_metrics = 0;
1624
1625 while !pending.is_empty() {
1626 let mut encoder = DatadogMetricsEncoder::new(
1627 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1628 None,
1629 );
1630
1631 let mut next_pending = Vec::new();
1632 let mut hit_limit = false;
1633 for metric in pending.drain(..) {
1634 match encoder.try_encode(metric.clone()) {
1635 Ok(None) => {}
1636 Ok(Some(returned_metric)) => {
1637 hit_limit = true;
1638 next_pending.push(returned_metric);
1639 }
1640 Err(error) => panic!("unexpected encoding error: {error}"),
1641 }
1642 }
1643
1644 let finish_result = encoder.finish();
1645 assert!(finish_result.is_ok());
1646 let (_payload, processed) = finish_result.unwrap();
1647 assert!(
1648 !processed.is_empty(),
1649 "encoder should always make progress for a non-empty batch"
1650 );
1651
1652 encoded_metrics += processed.len();
1653 encoded_batches += 1;
1654
1655 if hit_limit {
1656 assert!(
1657 !next_pending.is_empty(),
1658 "hitting limits should leave metrics to process in the next batch"
1659 );
1660 }
1661
1662 pending = next_pending;
1663 }
1664
1665 assert_eq!(encoded_metrics, 120_000);
1666 assert!(
1667 encoded_batches > 1,
1668 "expected multiple batches for V2 default limits"
1669 );
1670 }
1671
1672 #[test]
1673 fn encode_series_breaks_out_when_limit_reached_uncompressed() {
1674 let header_len =
1678 max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1));
1679 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1680 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1681 None,
1682 header_len + 1,
1683 usize::MAX,
1684 );
1685
1686 let counter = get_simple_counter();
1690 let result = encoder.try_encode(counter.clone());
1691 assert!(result.is_ok());
1692 assert_eq!(result.unwrap(), Some(counter));
1693
1694 let result = encoder.finish();
1698 assert!(result.is_ok());
1699
1700 let (payload, processed) = result.unwrap();
1701 assert_eq!(
1702 payload.uncompressed_byte_size,
1703 max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1))
1704 );
1705 assert_eq!(
1706 payload.into_payload(),
1707 get_compressed_empty_series_v1_payload()
1708 );
1709 assert_eq!(processed.len(), 0);
1710 }
1711
1712 #[test]
1713 fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
1714 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1718 DatadogMetricsEndpoint::Sketches,
1719 None,
1720 1,
1721 usize::MAX,
1722 );
1723
1724 let sketch = get_simple_sketch();
1728 let result = encoder.try_encode(sketch.clone());
1729 assert!(result.is_ok());
1730 assert_eq!(result.unwrap(), Some(sketch));
1731
1732 let result = encoder.finish();
1735 assert!(result.is_ok());
1736
1737 let (payload, processed) = result.unwrap();
1738 assert_eq!(payload.uncompressed_byte_size, 0);
1739 assert_eq!(
1740 payload.into_payload(),
1741 get_compressed_empty_sketches_payload()
1742 );
1743 assert_eq!(processed.len(), 0);
1744 }
1745
1746 #[test]
1747 fn encode_series_breaks_out_when_limit_reached_compressed() {
1748 let uncompressed_limit = 128;
1752 let compressed_limit = 32;
1753 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1754 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1755 None,
1756 uncompressed_limit,
1757 compressed_limit,
1758 );
1759
1760 let counter = get_simple_counter();
1764 let result = encoder.try_encode(counter.clone());
1765 assert!(result.is_ok());
1766 assert_eq!(result.unwrap(), Some(counter));
1767
1768 let result = encoder.finish();
1772 assert!(result.is_ok());
1773
1774 let (payload, processed) = result.unwrap();
1775 assert_eq!(
1776 payload.uncompressed_byte_size,
1777 max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1))
1778 );
1779 assert_eq!(
1780 payload.into_payload(),
1781 get_compressed_empty_series_v1_payload()
1782 );
1783 assert_eq!(processed.len(), 0);
1784 }
1785
1786 #[test]
1787 fn encode_sketches_breaks_out_when_limit_reached_compressed() {
1788 let uncompressed_limit = 128;
1792 let compressed_limit = 32;
1793 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1794 DatadogMetricsEndpoint::Sketches,
1795 None,
1796 uncompressed_limit,
1797 compressed_limit,
1798 );
1799
1800 let sketch = get_simple_sketch();
1804 let result = encoder.try_encode(sketch.clone());
1805 assert!(result.is_ok());
1806 assert_eq!(result.unwrap(), Some(sketch));
1807
1808 let result = encoder.finish();
1812 assert!(result.is_ok());
1813
1814 let (payload, processed) = result.unwrap();
1815 assert_eq!(payload.uncompressed_byte_size, 0);
1816 assert_eq!(
1817 payload.into_payload(),
1818 get_compressed_empty_sketches_payload()
1819 );
1820 assert_eq!(processed.len(), 0);
1821 }
1822
1823 #[test]
1824 fn encode_series_v2_breaks_out_when_limit_reached_compressed() {
1825 let uncompressed_limit = 128;
1829 let compressed_limit = 32;
1830 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1831 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1832 None,
1833 uncompressed_limit,
1834 compressed_limit,
1835 );
1836
1837 let counter = get_simple_counter();
1841 let result = encoder.try_encode(counter.clone());
1842 assert!(result.is_ok());
1843 assert_eq!(result.unwrap(), Some(counter));
1844
1845 let result = encoder.finish();
1849 assert!(result.is_ok());
1850
1851 let (payload, processed) = result.unwrap();
1852 assert_eq!(payload.uncompressed_byte_size, 0);
1853 assert_eq!(
1854 payload.into_payload(),
1855 get_compressed_empty_series_v2_payload()
1856 );
1857 assert_eq!(processed.len(), 0);
1858 }
1859
1860 #[test]
1861 fn zstd_v2_payload_never_exceeds_512kb_with_incompressible_data() {
1862 const PRINTABLE_START: u8 = 0x21;
1884 const PRINTABLE_END: u8 = 0x7E;
1885 const PRINTABLE_LEN: u64 = (PRINTABLE_END - PRINTABLE_START + 1) as u64; let mut xor_state = 0xdeadbeef_cafebabe_u64;
1887 let mut next_name = || -> String {
1888 std::iter::once('m')
1889 .chain((0..4999).map(|_| {
1890 xor_state ^= xor_state << 13;
1891 xor_state ^= xor_state >> 7;
1892 xor_state ^= xor_state << 17;
1893 (PRINTABLE_START + (xor_state % PRINTABLE_LEN) as u8) as char
1894 }))
1895 .collect()
1896 };
1897
1898 let mut encoder =
1899 DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
1900
1901 let mut accepted = 0usize;
1902 loop {
1903 let metric = Metric::new(
1904 next_name(),
1905 MetricKind::Incremental,
1906 MetricValue::Counter {
1907 value: (accepted + 1) as f64,
1908 },
1909 )
1910 .with_timestamp(Some(ts()));
1911
1912 match encoder.try_encode(metric) {
1913 Ok(None) => accepted += 1,
1914 Ok(Some(_)) => break,
1915 Err(e) => panic!("unexpected encoding error: {e}"),
1916 }
1917 }
1918
1919 assert!(accepted > 0, "at least one metric must be accepted");
1920
1921 let compressed_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2)
1922 .payload_limits()
1923 .compressed;
1924
1925 let (payload, _) = encoder.finish().unwrap_or_else(|e| {
1926 panic!(
1927 "finish() returned an error after {accepted} metrics — \
1928 the capacity estimate failed to prevent overflow: {e:?}"
1929 )
1930 });
1931 let payload_len = payload.into_payload().len();
1932
1933 assert!(
1935 payload_len <= compressed_limit,
1936 "payload ({payload_len} bytes) exceeded the {compressed_limit}-byte compressed limit"
1937 );
1938
1939 let min_utilization = compressed_limit * 95 / 100;
1941 assert!(
1942 payload_len > min_utilization,
1943 "payload ({payload_len} bytes) is below 95% of the {compressed_limit}-byte limit \
1944 ({min_utilization} bytes) — estimate may be over-conservative"
1945 );
1946 }
1947
1948 #[test]
1949 fn compressed_limit_is_respected_regardless_of_compressor_internal_buffering() {
1950 let compressed_limit = 512;
1966 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1967 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1968 None,
1969 1_000_000,
1970 compressed_limit,
1971 );
1972
1973 let mut accepted = 0;
1974 for i in 0..100 {
1975 let metric = Metric::new(
1976 format!("counter_{i:0>20}"),
1977 MetricKind::Incremental,
1978 MetricValue::Counter {
1979 value: (i + 1) as f64,
1980 },
1981 )
1982 .with_timestamp(Some(ts()));
1983 match encoder.try_encode(metric) {
1984 Ok(None) => accepted += 1,
1985 Ok(Some(_)) => break,
1986 Err(e) => panic!("unexpected encoding error: {e}"),
1987 }
1988 }
1989
1990 assert!(accepted > 0, "encoder should accept at least one metric");
1991 assert!(
1992 accepted < 10,
1993 "encoder accepted too many metrics — compressed limit was likely not enforced (accepted={accepted})"
1994 );
1995
1996 let result = encoder.finish();
1997 assert!(
1998 result.is_ok(),
1999 "finish() must not return TooLarge: {:?}",
2000 result.err()
2001 );
2002 let (payload, _) = result.unwrap();
2003 assert!(
2004 payload.into_payload().len() <= compressed_limit,
2005 "payload exceeded compressed_limit"
2006 );
2007 }
2008
2009 #[test]
2010 fn zstd_buffered_bound_resets_to_last_metric_size_after_block_flush() {
2011 let make_metric = |i: usize| {
2031 Metric::new(
2035 format!("counter_{i:0>600}"),
2036 MetricKind::Incremental,
2037 MetricValue::Counter {
2038 value: (i + 1) as f64,
2039 },
2040 )
2041 .with_timestamp(Some(ts()))
2042 };
2043
2044 let metric_size = {
2046 let mut probe = DatadogMetricsEncoder::with_payload_limits(
2047 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
2048 None,
2049 usize::MAX,
2050 usize::MAX,
2051 );
2052 assert!(
2053 probe.try_encode(make_metric(0)).unwrap().is_none(),
2054 "first metric must be accepted"
2055 );
2056 probe.buffered_bound()
2057 };
2058 assert!(metric_size > 0, "encoded metric must be non-empty");
2059
2060 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
2062 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
2063 None,
2064 usize::MAX,
2065 usize::MAX,
2066 );
2067
2068 let mut prev_buffered = 0usize;
2069 let mut flush_seen = false;
2070
2071 for i in 0..1000 {
2072 let metric = make_metric(i);
2073 match encoder.try_encode(metric) {
2074 Ok(None) => {}
2075 Ok(Some(_)) => panic!("unexpected rejection at i={i} with unlimited limits"),
2076 Err(e) => panic!("unexpected error at i={i}: {e}"),
2077 }
2078
2079 let curr = encoder.buffered_bound();
2080
2081 if curr < prev_buffered {
2082 assert_eq!(
2084 curr, metric_size,
2085 "after block flush, buffered_bound should reset to one metric's encoded size \
2086 ({metric_size} bytes) but got {curr}"
2087 );
2088 flush_seen = true;
2089 break;
2090 }
2091
2092 prev_buffered = curr;
2093 }
2094
2095 assert!(
2096 flush_seen,
2097 "no zstd block flush detected after 1000 metrics — increase loop bound or metric size"
2098 );
2099 }
2100
2101 fn arb_counter_metric() -> impl Strategy<Value = Metric> {
2102 let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should not be invalid");
2103 let value = ARB_POSITIVE_F64;
2104 let tags = btree_map(
2105 any::<u64>().prop_map(|v| v.to_string()),
2106 any::<u64>().prop_map(|v| v.to_string()),
2107 0..64,
2108 )
2109 .prop_map(|tags| (!tags.is_empty()).then(|| MetricTags::from(tags)));
2110
2111 (name, value, tags).prop_map(|(metric_name, metric_value, metric_tags)| {
2112 let metric_value = MetricValue::Counter {
2113 value: metric_value,
2114 };
2115 Metric::new(metric_name, MetricKind::Incremental, metric_value).with_tags(metric_tags)
2116 })
2117 }
2118
2119 proptest! {
2120 #[test]
2121 fn encoding_check_for_payload_limit_edge_cases_v1(
2122 uncompressed_limit in 1..64_000_000usize,
2123 compressed_limit in 1..10_000_000usize,
2124 metric in arb_counter_metric(),
2125 ) {
2126 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
2133 DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
2134 None,
2135 uncompressed_limit,
2136 compressed_limit,
2137 );
2138 _ = encoder.try_encode(metric);
2139
2140 if let Ok((payload, _processed)) = encoder.finish() {
2141 let payload = payload.into_payload();
2142 prop_assert!(payload.len() <= compressed_limit);
2143
2144 let result = decompress_zlib_payload(payload);
2146 prop_assert!(result.is_ok());
2147
2148 let decompressed = result.unwrap();
2149 prop_assert!(decompressed.len() <= uncompressed_limit);
2150 }
2151 }
2152
2153 #[test]
2154 fn encoding_check_for_payload_limit_edge_cases_v2(
2155 uncompressed_limit in 1..10_000_000usize,
2156 compressed_limit in 1..1_000_000usize,
2157 metric in arb_counter_metric(),
2158 ) {
2159 let mut encoder = DatadogMetricsEncoder::with_payload_limits(
2160 DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
2161 None,
2162 uncompressed_limit,
2163 compressed_limit,
2164 );
2165 _ = encoder.try_encode(metric);
2166
2167 if let Ok((payload, _processed)) = encoder.finish() {
2168 let payload = payload.into_payload();
2169 prop_assert!(payload.len() <= compressed_limit);
2170
2171 let result = decompress_zstd_payload(payload);
2173 prop_assert!(result.is_ok());
2174
2175 let decompressed = result.unwrap();
2176 prop_assert!(decompressed.len() <= uncompressed_limit);
2177 }
2178 }
2179 }
2180}