vector/sinks/datadog/metrics/
encoder.rs

1use std::{
2    cmp,
3    io::{self, Write},
4    mem,
5    sync::{Arc, LazyLock, OnceLock},
6};
7
8use bytes::{BufMut, Bytes};
9use chrono::{DateTime, Utc};
10use snafu::{ResultExt, Snafu};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    config::{LogSchema, log_schema, telemetry},
14    event::{DatadogMetricOriginMetadata, Metric, MetricTags, MetricValue, metric::MetricSketch},
15    metrics::AgentDDSketch,
16    request_metadata::GroupedCountByteSize,
17};
18
19use vector_common::constants::{
20    ZLIB_FRAME_OVERHEAD, ZLIB_STORED_BLOCK_OVERHEAD, ZLIB_STORED_BLOCK_SIZE,
21    ZSTD_SMALL_INPUT_THRESHOLD,
22};
23
24use super::config::{DatadogMetricsCompression, DatadogMetricsEndpoint, SeriesApiVersion};
25use crate::{
26    common::datadog::{
27        DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
28    },
29    proto::fds::protobuf_descriptors,
30    sinks::util::{Compression, Compressor, encode_namespace, request_builder::EncodeResult},
31};
32
/// Opening bytes of the JSON document sent to the V1 series endpoint.
const SERIES_PAYLOAD_HEADER: &[u8] = br#"{"series":["#;
/// Closing bytes that terminate the `series` array and the enclosing JSON object.
const SERIES_PAYLOAD_FOOTER: &[u8] = br#"]}"#;
/// Separator written between consecutive series entries inside the JSON array.
const SERIES_PAYLOAD_DELIMITER: &[u8] = br#","#;
36
37pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11;
38
39const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;
40
41pub(super) static ORIGIN_PRODUCT_VALUE: LazyLock<u32> = LazyLock::new(|| {
42    option_env!("DD_ORIGIN_PRODUCT")
43        .map(|p| {
44            p.parse::<u32>()
45                .expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
46        })
47        .unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
48});
49
50#[allow(warnings, clippy::pedantic, clippy::nursery)]
51mod ddmetric_proto {
52    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
53}
54
55#[derive(Debug, Snafu)]
56pub enum EncoderError {
57    #[snafu(display(
58        "Invalid metric value '{}' was given; '{}' expected",
59        metric_value,
60        expected
61    ))]
62    InvalidMetric {
63        expected: &'static str,
64        metric_value: &'static str,
65    },
66
67    #[snafu(
68        context(false),
69        display("Failed to encode series metric to JSON: {source}")
70    )]
71    JsonEncodingFailed { source: serde_json::Error },
72
73    // Currently, the only time `prost` ever emits `EncodeError` is when there is insufficient
74    // buffer capacity, so we don't need to hold on to the error, and we can just hardcode this.
75    #[snafu(display(
76        "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
77    ))]
78    ProtoEncodingFailed,
79}
80
81impl EncoderError {
82    /// Gets the telemetry-friendly string version of this error.
83    ///
84    /// The value will be a short string with only lowercase letters and underscores.
85    pub const fn as_error_type(&self) -> &'static str {
86        match self {
87            Self::InvalidMetric { .. } => "invalid_metric",
88            Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
89            Self::ProtoEncodingFailed => "failed_to_encode_sketch",
90        }
91    }
92}
93
94#[derive(Debug, Snafu)]
95pub enum FinishError {
96    #[snafu(display(
97        "Failure occurred during writing to or finalizing the compressor: {}",
98        source
99    ))]
100    CompressionFailed { source: io::Error },
101
102    #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
103    TooLarge {
104        metrics: Vec<Metric>,
105        recommended_splits: usize,
106    },
107}
108
109impl FinishError {
110    /// Gets the telemetry-friendly string version of this error.
111    ///
112    /// The value will be a short string with only lowercase letters and underscores.
113    pub const fn as_error_type(&self) -> &'static str {
114        match self {
115            Self::CompressionFailed { .. } => "compression_failed",
116            Self::TooLarge { .. } => "too_large",
117        }
118    }
119}
120
/// Mutable, per-payload state of the encoder.
///
/// A fresh value is swapped in every time a payload is finished.
struct EncoderState {
    /// Compressor that the payload header, encoded metrics, and footer are written into.
    writer: Compressor,
    /// Number of uncompressed bytes written to `writer` so far (header and footer included).
    written: usize,
    /// Upper bound on uncompressed bytes sitting in the compressor's internal buffer (written but
    /// not yet flushed to `writer.get_ref()`).  All compressors may buffer internally: zstd holds
    /// up to 128 KB per block, zlib's BufWriter holds up to 4 KB.  Since `get_ref().len()` only
    /// reflects bytes that have been flushed through all layers, we track this bound to avoid
    /// underestimating the compressed payload size.
    ///
    /// Increases by `n` on each write. Resets to `n` when a new compressed block is detected in
    /// `writer.get_ref()` (the triggering write may straddle the block boundary, so `n` is a safe
    /// upper bound on what remains buffered after the flush).
    buffered_bound: usize,
    /// Scratch buffer that a single metric is encoded into before it is offered to `writer`.
    /// Cleared at the start of every encode attempt.
    buf: Vec<u8>,
    /// Metrics that have been successfully written into the current payload.
    processed: Vec<Metric>,
    /// Event count / byte size accounting that is handed back with the finished payload.
    byte_size: GroupedCountByteSize,
}
138
139impl Default for EncoderState {
140    fn default() -> Self {
141        Self {
142            writer: Compression::zlib_default().into(),
143            written: 0,
144            buffered_bound: 0,
145            buf: Vec::with_capacity(1024),
146            processed: Vec::new(),
147            byte_size: telemetry().create_request_count_byte_size(),
148        }
149    }
150}
151
/// Incremental encoder that turns metrics into compressed payloads for a Datadog metrics
/// endpoint, while keeping each payload within the configured size limits.
pub struct DatadogMetricsEncoder {
    /// Endpoint the payloads are destined for; selects the wire format and the compression.
    endpoint: DatadogMetricsEndpoint,
    /// Namespace applied to metrics that do not carry their own.
    default_namespace: Option<Arc<str>>,
    /// Maximum number of uncompressed bytes allowed in a single payload.
    uncompressed_limit: usize,
    /// Maximum number of compressed bytes allowed in a single payload.
    compressed_limit: usize,

    /// State of the payload currently being built.
    state: EncoderState,
    /// Log schema consulted for the host key when extracting the host from a metric's tags.
    log_schema: &'static LogSchema,

    /// Origin product value passed along when generating origin metadata.
    origin_product_value: u32,
}
163
164impl DatadogMetricsEncoder {
165    /// Creates a new `DatadogMetricsEncoder` for the given endpoint.
166    pub fn new(endpoint: DatadogMetricsEndpoint, default_namespace: Option<String>) -> Self {
167        let payload_limits = endpoint.payload_limits();
168
169        Self {
170            endpoint,
171            default_namespace: default_namespace.map(Arc::from),
172            uncompressed_limit: payload_limits.uncompressed,
173            compressed_limit: payload_limits.compressed,
174            state: EncoderState {
175                writer: endpoint.compression().compressor(),
176                ..Default::default()
177            },
178            log_schema: log_schema(),
179            origin_product_value: *ORIGIN_PRODUCT_VALUE,
180        }
181    }
182}
183
#[cfg(test)]
impl DatadogMetricsEncoder {
    /// Builds an encoder for `endpoint` with caller-supplied payload limits.
    ///
    /// Test-only: production code goes through `new`, which takes the limits from the endpoint.
    pub fn with_payload_limits(
        endpoint: DatadogMetricsEndpoint,
        default_namespace: Option<String>,
        uncompressed_limit: usize,
        compressed_limit: usize,
    ) -> Self {
        let state = EncoderState {
            writer: endpoint.compression().compressor(),
            ..Default::default()
        };

        Self {
            endpoint,
            default_namespace: default_namespace.map(Arc::from),
            uncompressed_limit,
            compressed_limit,
            state,
            log_schema: log_schema(),
            origin_product_value: *ORIGIN_PRODUCT_VALUE,
        }
    }

    /// Exposes the current `buffered_bound` so tests can observe the reset that happens when the
    /// compressor flushes a block.
    fn buffered_bound(&self) -> usize {
        let EncoderState { buffered_bound, .. } = &self.state;
        *buffered_bound
    }
}
214
215impl DatadogMetricsEncoder {
216    fn reset_state(&mut self) -> EncoderState {
217        let new_state = EncoderState {
218            writer: self.endpoint.compression().compressor(),
219            ..Default::default()
220        };
221        mem::replace(&mut self.state, new_state)
222    }
223
224    fn encode_single_metric(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
225        // We take special care in this method to capture errors which are not indicative of the
226        // metric itself causing a failure in order to be able to return the metric to the caller.
227        // The contract of the encoder is such that when an encoding attempt fails for normal
228        // reasons, like being out of room, we give back the metric so the caller can finalize the
229        // previously encoded metrics and then reset and try again to encode.
230        //
231        // If the encoder is in a persistent bad state, they'll get back a proper error when calling
232        // `finish`, so they eventually get an error, we just make sure they can tidy up before that
233        // and avoid needlessly dropping metrics due to unrelated errors.
234
235        // Clear our temporary buffer before any encoding.
236        self.state.buf.clear();
237
238        self.state
239            .byte_size
240            .add_event(&metric, metric.estimated_json_encoded_size_of());
241
242        // For V2 Series metrics, and Sketches: We encode a single Series or Sketch metric incrementally,
243        // which means that we specifically write it as if we were writing a single field entry in the
244        // overall `SketchPayload` message or `MetricPayload` type.
245        //
246        // By doing so, we can encode multiple metrics and concatenate all the buffers, and have the
247        // resulting buffer appear as if it's a normal `<>Payload` message with a bunch of repeats
248        // of the `sketches` / `series` field.
249        //
250        // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches --
251        // and we never actually set the metadata field... so the resulting message generated overall
252        // for `SketchPayload` with a single sketch looks just like as if we literally wrote out a
253        // single value for the given field.
254        //
255        // Similarly, `MetricPayload` has a single repeated `series` field.
256
257        match self.endpoint {
258            // V1 Series metrics are encoded via JSON, in an incremental fashion.
259            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
260                // A single `Metric` might generate multiple Datadog series metrics.
261                let all_series = generate_series_metrics(
262                    &metric,
263                    &self.default_namespace,
264                    self.log_schema,
265                    self.origin_product_value,
266                )?;
267
268                // We handle adding the JSON array separator (comma) manually since the encoding is
269                // happening incrementally.
270                let has_processed = !self.state.processed.is_empty();
271                for (i, series) in all_series.iter().enumerate() {
272                    // Add a array delimiter if we already have other metrics encoded.
273                    if (has_processed || i > 0)
274                        && write_payload_delimiter(self.endpoint, &mut self.state.buf).is_err()
275                    {
276                        return Ok(Some(metric));
277                    }
278                    serde_json::to_writer(&mut self.state.buf, series)?;
279                }
280            }
281            // V2 Series metrics are encoded via ProtoBuf, in an incremental fashion.
282            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() {
283                MetricValue::Counter { .. }
284                | MetricValue::Gauge { .. }
285                | MetricValue::Set { .. }
286                | MetricValue::AggregatedSummary { .. } => {
287                    let series_proto = series_to_proto_message(
288                        &metric,
289                        &self.default_namespace,
290                        self.log_schema,
291                        self.origin_product_value,
292                    )?;
293
294                    encode_proto_key_and_message(
295                        series_proto,
296                        get_series_payload_series_field_number(),
297                        &mut self.state.buf,
298                    )?;
299                }
300                value => {
301                    return Err(EncoderError::InvalidMetric {
302                        expected: "series",
303                        metric_value: value.as_name(),
304                    });
305                }
306            },
307            // Sketches are encoded via ProtoBuf, also in an incremental fashion.
308            DatadogMetricsEndpoint::Sketches => match metric.value() {
309                MetricValue::Sketch { sketch } => match sketch {
310                    MetricSketch::AgentDDSketch(ddsketch) => {
311                        if let Some(sketch_proto) = sketch_to_proto_message(
312                            &metric,
313                            ddsketch,
314                            &self.default_namespace,
315                            self.log_schema,
316                            self.origin_product_value,
317                        ) {
318                            encode_proto_key_and_message(
319                                sketch_proto,
320                                get_sketch_payload_sketches_field_number(),
321                                &mut self.state.buf,
322                            )?;
323                        } else {
324                            // If the sketch was empty, that's fine too
325                        }
326                    }
327                },
328                value => {
329                    return Err(EncoderError::InvalidMetric {
330                        expected: "sketches",
331                        metric_value: value.as_name(),
332                    });
333                }
334            },
335        }
336
337        // Try and see if our temporary buffer can be written to the compressor.
338        match self.try_compress_buffer() {
339            Err(_) | Ok(false) => Ok(Some(metric)),
340            Ok(true) => {
341                self.state.processed.push(metric);
342                Ok(None)
343            }
344        }
345    }
346
347    fn try_compress_buffer(&mut self) -> io::Result<bool> {
348        let n = self.state.buf.len();
349
350        // If we're over our uncompressed size limit with this metric, inform the caller.
351        if self.state.written + n > self.uncompressed_limit {
352            return Ok(false);
353        }
354
355        // Calculating the compressed size is slightly more tricky, because we can only speculate
356        // about how many bytes it would take when compressed.  If we write into the compressor, we
357        // can't back out that write, even if we manually modify the underlying Vec<u8>, as the
358        // compressor might have internal state around checksums, etc, that can't be similarly
359        // rolled back.
360        //
361        // Strategy: split the estimate into two parts:
362        //   1. Bytes already flushed to the output buffer (`get_ref().len()`) — exact compressed size.
363        //   2. Bytes still in the compressor's internal buffer plus this new metric — estimated via
364        //      max_compressed_size(buffered_bound + n) (worst-case upper bound).
365        //
366        // All compressors may buffer data internally before flushing to the output: zstd buffers
367        // up to 128 KB per block, zlib's BufWriter holds up to 4 KB.  `get_ref().len()` only
368        // reflects bytes that have been flushed through all layers.  We track `buffered_bound` —
369        // an upper bound on uncompressed bytes written but not yet visible in `get_ref()` — and
370        // include it in the estimate for all compressor types.
371        let compression = self.endpoint.compression();
372        let flushed_compressed = self.state.writer.get_ref().len();
373        if flushed_compressed + compression.max_compressed_size(self.state.buffered_bound + n)
374            > self.compressed_limit
375        {
376            return Ok(false);
377        }
378
379        // We should be safe to write our holding buffer to the compressor and store the metric.
380        //
381        // Update buffered_bound: if a new block appeared in the output (flushed_compressed grew),
382        // reset to n — the triggering write may straddle the block boundary, so n is a safe upper
383        // bound on what remains buffered.  Otherwise accumulate.
384        self.state.writer.write_all(&self.state.buf)?;
385        self.state.written += n;
386        if self.state.writer.get_ref().len() > flushed_compressed {
387            self.state.buffered_bound = n;
388        } else {
389            self.state.buffered_bound += n;
390        }
391        Ok(true)
392    }
393
394    /// Attempts to encode a single metric into this encoder.
395    ///
396    /// For some metric types, the metric will be encoded immediately and we will attempt to
397    /// compress it.  For some other metric types, we will store the metric until `finish` is
398    /// called, due to the inability to incrementally encode them.
399    ///
400    /// If the metric could not be encoded into this encoder due to hitting the limits on the size
401    /// of the encoded/compressed payload, it will be returned via `Ok(Some(Metric))`, otherwise `Ok(None)`
402    /// will be returned.
403    ///
404    /// If `Ok(Some(Metric))` is returned, callers must call `finish` to finalize the payload.
405    /// Further calls to `try_encode` without first calling `finish` may or may not succeed.
406    ///
407    /// # Errors
408    ///
409    /// If an error is encountered while attempting to encode the metric, an error variant will be returned.
410    pub fn try_encode(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
411        // Make sure we've written our header already.
412        if self.state.written == 0 {
413            match write_payload_header(self.endpoint, &mut self.state.writer) {
414                Ok(n) => {
415                    self.state.written += n;
416                    self.state.buffered_bound += n;
417                }
418                Err(_) => return Ok(Some(metric)),
419            }
420        }
421
422        self.encode_single_metric(metric)
423    }
424
425    pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
426        // Write any payload footer necessary for the configured endpoint.
427        let n = write_payload_footer(self.endpoint, &mut self.state.writer)
428            .context(CompressionFailedSnafu)?;
429        self.state.written += n;
430
431        let raw_bytes_written = self.state.written;
432        let byte_size = self.state.byte_size.clone();
433
434        // Consume the encoder state so we can do our final checks and return the necessary data.
435        let state = self.reset_state();
436        let payload = state
437            .writer
438            .finish()
439            .context(CompressionFailedSnafu)?
440            .freeze();
441        let processed = state.processed;
442
443        // We should have configured our limits such that if all calls to `try_compress_buffer` have
444        // succeeded up until this point, then our payload should be within the limits after writing
445        // the footer and finishing the compressor.
446        //
447        // We're not only double checking that here, but we're figuring out how much bigger than the
448        // limit the payload is, if it is indeed bigger, so that we can recommend how many splits
449        // should be used to bring the given set of metrics down to chunks that can be encoded
450        // without hitting the limits.
451        let compressed_splits = payload.len() / self.compressed_limit;
452        let uncompressed_splits = state.written / self.uncompressed_limit;
453        let recommended_splits = cmp::max(compressed_splits, uncompressed_splits) + 1;
454
455        if recommended_splits == 1 {
456            // "One" split means no splits needed: our payload didn't exceed either of the limits.
457            Ok((
458                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
459                processed,
460            ))
461        } else {
462            Err(FinishError::TooLarge {
463                metrics: processed,
464                recommended_splits,
465            })
466        }
467    }
468}
469
470fn generate_proto_metadata(
471    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
472    maybe_source_type: Option<&str>,
473    origin_product_value: u32,
474) -> Option<ddmetric_proto::Metadata> {
475    generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
476        |origin| {
477            if origin.product().is_none()
478                || origin.category().is_none()
479                || origin.service().is_none()
480            {
481                warn!(
482                    message = "Generated sketch origin metadata should have each field set.",
483                    product = origin.product(),
484                    category = origin.category(),
485                    service = origin.service()
486                );
487            }
488            ddmetric_proto::Metadata {
489                origin: Some(ddmetric_proto::Origin {
490                    origin_product: origin.product().unwrap_or_default(),
491                    origin_category: origin.category().unwrap_or_default(),
492                    origin_service: origin.service().unwrap_or_default(),
493                }),
494            }
495        },
496    )
497}
498
499fn get_sketch_payload_sketches_field_number() -> u32 {
500    static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
501    *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
502        let descriptors = protobuf_descriptors();
503        let descriptor = descriptors
504            .get_message_by_name("datadog.agentpayload.SketchPayload")
505            .expect("should not fail to find `SketchPayload` message in descriptor pool");
506
507        descriptor
508            .get_field_by_name("sketches")
509            .map(|field| field.number())
510            .expect("`sketches` field must exist in `SketchPayload` message")
511    })
512}
513
514fn get_series_payload_series_field_number() -> u32 {
515    static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
516    *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| {
517        let descriptors = protobuf_descriptors();
518        let descriptor = descriptors
519            .get_message_by_name("datadog.agentpayload.MetricPayload")
520            .expect("should not fail to find `MetricPayload` message in descriptor pool");
521
522        descriptor
523            .get_field_by_name("series")
524            .map(|field| field.number())
525            .expect("`series` field must exist in `MetricPayload` message")
526    })
527}
528
529fn sketch_to_proto_message(
530    metric: &Metric,
531    ddsketch: &AgentDDSketch,
532    default_namespace: &Option<Arc<str>>,
533    log_schema: &'static LogSchema,
534    origin_product_value: u32,
535) -> Option<ddmetric_proto::sketch_payload::Sketch> {
536    if ddsketch.is_empty() {
537        return None;
538    }
539
540    let name = get_namespaced_name(metric, default_namespace);
541    let ts = encode_timestamp(metric.timestamp());
542    let mut tags = metric.tags().cloned().unwrap_or_default();
543    let host = log_schema
544        .host_key()
545        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
546        .unwrap_or_default();
547    let tags = encode_tags(&tags);
548
549    let cnt = ddsketch.count() as i64;
550    let min = ddsketch
551        .min()
552        .expect("min should be present for non-empty sketch");
553    let max = ddsketch
554        .max()
555        .expect("max should be present for non-empty sketch");
556    let avg = ddsketch
557        .avg()
558        .expect("avg should be present for non-empty sketch");
559    let sum = ddsketch
560        .sum()
561        .expect("sum should be present for non-empty sketch");
562
563    let (bins, counts) = ddsketch.bin_map().into_parts();
564    let k = bins.into_iter().map(Into::into).collect();
565    let n = counts.into_iter().map(Into::into).collect();
566
567    let event_metadata = metric.metadata();
568    let metadata = generate_proto_metadata(
569        event_metadata.datadog_origin_metadata(),
570        event_metadata.source_type(),
571        origin_product_value,
572    );
573
574    trace!(?metadata, "Generated sketch metadata.");
575
576    Some(ddmetric_proto::sketch_payload::Sketch {
577        metric: name,
578        tags,
579        host,
580        distributions: Vec::new(),
581        dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
582            ts,
583            cnt,
584            min,
585            max,
586            avg,
587            sum,
588            k,
589            n,
590        }],
591        metadata,
592    })
593}
594
595fn series_to_proto_message(
596    metric: &Metric,
597    default_namespace: &Option<Arc<str>>,
598    log_schema: &'static LogSchema,
599    origin_product_value: u32,
600) -> Result<ddmetric_proto::metric_payload::MetricSeries, EncoderError> {
601    let metric_name = get_namespaced_name(metric, default_namespace);
602    let mut tags = metric.tags().cloned().unwrap_or_default();
603
604    let mut resources = vec![];
605
606    if let Some(host) = log_schema
607        .host_key()
608        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
609    {
610        resources.push(ddmetric_proto::metric_payload::Resource {
611            r#type: "host".to_string(),
612            name: host,
613        });
614    }
615
616    // In the `datadog_agent` source, the tag is added as `device` for the V1 endpoint
617    // and `resource.device` for the V2 endpoint.
618    if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) {
619        resources.push(ddmetric_proto::metric_payload::Resource {
620            r#type: "device".to_string(),
621            name: device,
622        });
623    }
624
625    let source_type_name = tags.remove("source_type_name").unwrap_or_default();
626
627    let tags = encode_tags(&tags);
628
629    let event_metadata = metric.metadata();
630    let metadata = generate_proto_metadata(
631        event_metadata.datadog_origin_metadata(),
632        event_metadata.source_type(),
633        origin_product_value,
634    );
635    trace!(?metadata, "Generated MetricSeries metadata.");
636
637    let timestamp = encode_timestamp(metric.timestamp());
638
639    // our internal representation is in milliseconds but the expected output is in seconds
640    let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
641
642    let (points, metric_type) = match metric.value() {
643        MetricValue::Counter { value } => {
644            if let Some(interval) = maybe_interval {
645                // When an interval is defined, it implies the value should be in a per-second form,
646                // so we need to get back to seconds from our milliseconds-based interval, and then
647                // divide our value by that amount as well.
648                let value = *value / (interval as f64);
649                (
650                    vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
651                    ddmetric_proto::metric_payload::MetricType::Rate,
652                )
653            } else {
654                (
655                    vec![ddmetric_proto::metric_payload::MetricPoint {
656                        value: *value,
657                        timestamp,
658                    }],
659                    ddmetric_proto::metric_payload::MetricType::Count,
660                )
661            }
662        }
663        MetricValue::Set { values } => (
664            vec![ddmetric_proto::metric_payload::MetricPoint {
665                value: values.len() as f64,
666                timestamp,
667            }],
668            ddmetric_proto::metric_payload::MetricType::Gauge,
669        ),
670        MetricValue::Gauge { value } => (
671            vec![ddmetric_proto::metric_payload::MetricPoint {
672                value: *value,
673                timestamp,
674            }],
675            ddmetric_proto::metric_payload::MetricType::Gauge,
676        ),
677        // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
678        value => {
679            // this case should have already been surfaced by encode_single_metric() so this should never be reached
680            return Err(EncoderError::InvalidMetric {
681                expected: "series",
682                metric_value: value.as_name(),
683            });
684        }
685    };
686
687    Ok(ddmetric_proto::metric_payload::MetricSeries {
688        resources,
689        metric: metric_name,
690        tags,
691        points,
692        r#type: metric_type.into(),
693        // unit is omitted
694        unit: "".to_string(),
695        source_type_name,
696        interval: maybe_interval.unwrap_or(0) as i64,
697        metadata,
698    })
699}
700
701// Manually write the field tag and then encode the Message payload directly as a length-delimited message.
702fn encode_proto_key_and_message<T, B>(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError>
703where
704    T: prost::Message,
705    B: BufMut,
706{
707    prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf);
708
709    msg.encode_length_delimited(buf)
710        .map_err(|_| EncoderError::ProtoEncodingFailed)
711}
712
713fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
714    encode_namespace(
715        metric
716            .namespace()
717            .or_else(|| default_namespace.as_ref().map(|s| s.as_ref())),
718        '.',
719        metric.name(),
720    )
721}
722
723fn encode_tags(tags: &MetricTags) -> Vec<String> {
724    let mut pairs: Vec<_> = tags
725        .iter_all()
726        .map(|(name, value)| match value {
727            Some(value) => format!("{name}:{value}"),
728            None => name.into(),
729        })
730        .collect();
731    pairs.sort();
732    pairs
733}
734
735fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
736    if let Some(ts) = timestamp {
737        ts.timestamp()
738    } else {
739        Utc::now().timestamp()
740    }
741}
742
743// Given the vector source type, return the OriginService value associated with that integration, if any.
744fn source_type_to_service(source_type: &str) -> Option<u32> {
745    match source_type {
746        // In order to preserve consistent behavior, we intentionally don't set origin metadata
747        // for the case where the Datadog Agent did not set it.
748        "datadog_agent" => None,
749
750        // These are the sources for which metrics truly originated from this Vector instance.
751        "apache_metrics" => Some(17),
752        "aws_ecs_metrics" => Some(209),
753        "eventstoredb_metrics" => Some(210),
754        "host_metrics" => Some(211),
755        "internal_metrics" => Some(212),
756        "mongodb_metrics" => Some(111),
757        "nginx_metrics" => Some(117),
758        "open_telemetry" => Some(213),
759        "postgresql_metrics" => Some(128),
760        "prometheus_remote_write" => Some(214),
761        "prometheus_scrape" => Some(215),
762        "statsd" => Some(153),
763
764        // These sources are only capable of receiving metrics with the `native` or `native_json` codec.
765        // Generally that means the Origin Metadata will have been set as a pass through.
766        // However, if the upstream Vector instance did not set Origin Metadata (for example if it is an
767        // older version version), we will at least set the OriginProduct and OriginCategory.
768        "kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector"
769        | "pulsar" => Some(0),
770
771        // This scenario should not occur- if it does it means we added a source that deals with metrics,
772        // and did not update this function.
773        // But if it does occur, by setting the Service value to be undefined, we at least populate the
774        // OriginProduct and OriginCategory.
775        _ => {
776            debug!(
777                "Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value."
778            );
779            Some(0)
780        }
781    }
782}
783
784/// Determine the correct Origin metadata values to use depending on if they have been
785/// set already upstream or not. The generalized struct `DatadogMetricOriginMetadata` is
786/// utilized in this function, which allows the series and sketch encoding to call and map
787/// the result appropriately for the given protocol they operate on.
788fn generate_origin_metadata(
789    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
790    maybe_source_type: Option<&str>,
791    origin_product_value: u32,
792) -> Option<DatadogMetricOriginMetadata> {
793    let no_value = 0;
794
795    // An upstream vector source or a transform has set the origin metadata already.
796    // Currently this is only possible by these scenarios:
797    //     - `datadog_agent` source receiving the metadata on ingested metrics
798    //     - `vector` source receiving events with EventMetadata that already has the origins set
799    //     - A metrics source configured with the `native` or `native_json` codecs, where the upstream
800    //       Vector instance enriched the EventMetadata with Origin metadata.
801    //     - `log_to_metric` transform set the OriginService in the EventMetadata when it creates
802    //        the new metric.
803    if let Some(pass_through) = maybe_pass_through {
804        Some(DatadogMetricOriginMetadata::new(
805            pass_through.product().or(Some(origin_product_value)),
806            pass_through.category().or(Some(ORIGIN_CATEGORY_VALUE)),
807            pass_through.service().or(Some(no_value)),
808        ))
809
810    // No metadata has been set upstream
811    } else {
812        maybe_source_type.and_then(|source_type| {
813            // Only set the metadata if the source is a metric source we should set it for.
814            // In order to preserve consistent behavior, we intentionally don't set origin metadata
815            // for the case where the Datadog Agent did not set it.
816            source_type_to_service(source_type).map(|origin_service_value| {
817                DatadogMetricOriginMetadata::new(
818                    Some(origin_product_value),
819                    Some(ORIGIN_CATEGORY_VALUE),
820                    Some(origin_service_value),
821                )
822            })
823        })
824    }
825}
826
827fn generate_series_metadata(
828    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
829    maybe_source_type: Option<&str>,
830    origin_product_value: u32,
831) -> Option<DatadogSeriesMetricMetadata> {
832    generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
833        |origin| DatadogSeriesMetricMetadata {
834            origin: Some(origin),
835        },
836    )
837}
838
839fn generate_series_metrics(
840    metric: &Metric,
841    default_namespace: &Option<Arc<str>>,
842    log_schema: &'static LogSchema,
843    origin_product_value: u32,
844) -> Result<Vec<DatadogSeriesMetric>, EncoderError> {
845    let name = get_namespaced_name(metric, default_namespace);
846
847    let mut tags = metric.tags().cloned().unwrap_or_default();
848    let host = log_schema
849        .host_key()
850        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());
851
852    let source_type_name = tags.remove("source_type_name");
853    let device = tags.remove("device");
854    let ts = encode_timestamp(metric.timestamp());
855    let tags = Some(encode_tags(&tags));
856
857    // our internal representation is in milliseconds but the expected output is in seconds
858    let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
859
860    let event_metadata = metric.metadata();
861    let metadata = generate_series_metadata(
862        event_metadata.datadog_origin_metadata(),
863        event_metadata.source_type(),
864        origin_product_value,
865    );
866
867    trace!(?metadata, "Generated series metadata.");
868
869    let (points, metric_type) = match metric.value() {
870        MetricValue::Counter { value } => {
871            if let Some(interval) = maybe_interval {
872                // When an interval is defined, it implies the value should be in a per-second form,
873                // so we need to get back to seconds from our milliseconds-based interval, and then
874                // divide our value by that amount as well.
875                let value = *value / (interval as f64);
876                (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
877            } else {
878                (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
879            }
880        }
881        MetricValue::Set { values } => (
882            vec![DatadogPoint(ts, values.len() as f64)],
883            DatadogMetricType::Gauge,
884        ),
885        MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
886        // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
887        value => {
888            return Err(EncoderError::InvalidMetric {
889                expected: "series",
890                metric_value: value.as_name(),
891            });
892        }
893    };
894
895    Ok(vec![DatadogSeriesMetric {
896        metric: name,
897        r#type: metric_type,
898        interval: maybe_interval,
899        points,
900        tags,
901        host,
902        source_type_name,
903        device,
904        metadata,
905    }])
906}
907
908impl DatadogMetricsCompression {
909    fn compressor(self) -> Compressor {
910        match self {
911            Self::Zstd => Compression::zstd_default().into(),
912            Self::Zlib => Compression::zlib_default().into(),
913        }
914    }
915
916    /// Returns the worst-case compressed size of `n` uncompressed bytes.
917    ///
918    /// For zlib (deflate), the worst case occurs when data is entirely incompressible and stored in
919    /// uncompressed blocks (5 bytes overhead per 16 KB block, as per the DEFLATE spec).
920    ///
921    /// For zstd, this uses the same formula as `ZSTD_compressBound` from the zstd C library.
922    const fn max_compressed_size(self, n: usize) -> usize {
923        match self {
924            Self::Zlib => {
925                // Deflate stores incompressible data in uncompressed blocks, each with fixed
926                // overhead. We subtract the zlib frame from the block count since those bytes
927                // are not stored-block data.
928                n + (1 + n.saturating_sub(ZLIB_FRAME_OVERHEAD) / ZLIB_STORED_BLOCK_SIZE)
929                    * ZLIB_STORED_BLOCK_OVERHEAD
930            }
931            Self::Zstd => {
932                // zstd_safe::compress_bound is not const, so we use the same formula it uses
933                // internally: srcSize + (srcSize >> 8) + small correction for inputs < 128 KB.
934                n + (n >> 8)
935                    + if n < ZSTD_SMALL_INPUT_THRESHOLD {
936                        (ZSTD_SMALL_INPUT_THRESHOLD - n) >> 11
937                    } else {
938                        0
939                    }
940            }
941        }
942    }
943}
944
945fn write_payload_header(
946    endpoint: DatadogMetricsEndpoint,
947    writer: &mut dyn io::Write,
948) -> io::Result<usize> {
949    match endpoint {
950        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
951            .write_all(SERIES_PAYLOAD_HEADER)
952            .map(|_| SERIES_PAYLOAD_HEADER.len()),
953        _ => Ok(0),
954    }
955}
956
957fn write_payload_delimiter(
958    endpoint: DatadogMetricsEndpoint,
959    writer: &mut dyn io::Write,
960) -> io::Result<usize> {
961    match endpoint {
962        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
963            .write_all(SERIES_PAYLOAD_DELIMITER)
964            .map(|_| SERIES_PAYLOAD_DELIMITER.len()),
965        _ => Ok(0),
966    }
967}
968
969fn write_payload_footer(
970    endpoint: DatadogMetricsEndpoint,
971    writer: &mut dyn io::Write,
972) -> io::Result<usize> {
973    match endpoint {
974        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
975            .write_all(SERIES_PAYLOAD_FOOTER)
976            .map(|_| SERIES_PAYLOAD_FOOTER.len()),
977        _ => Ok(0),
978    }
979}
980
981#[cfg(test)]
982mod tests {
983    use std::io::{self, Write as _};
984    use std::{num::NonZeroU32, sync::Arc};
985
986    use bytes::{BufMut, Bytes, BytesMut};
987    use chrono::{DateTime, TimeZone, Timelike, Utc};
988    use flate2::read::ZlibDecoder;
989    use proptest::{
990        arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert,
991        proptest, strategy::Strategy, string::string_regex,
992    };
993    use prost::Message;
994    use vector_lib::{
995        config::{LogSchema, log_schema},
996        event::{
997            DatadogMetricOriginMetadata, EventMetadata, Metric, MetricKind, MetricTags,
998            MetricValue,
999            metric::{MetricSketch, TagValue},
1000        },
1001        metric_tags,
1002        metrics::AgentDDSketch,
1003    };
1004
1005    use super::{
1006        DatadogMetricsEncoder, EncoderError, ddmetric_proto, encode_proto_key_and_message,
1007        encode_tags, encode_timestamp, generate_series_metrics,
1008        get_sketch_payload_sketches_field_number, series_to_proto_message, sketch_to_proto_message,
1009        write_payload_footer, write_payload_header,
1010    };
1011    use crate::{
1012        common::datadog::DatadogMetricType,
1013        sinks::{
1014            datadog::metrics::{
1015                config::{DatadogMetricsCompression, DatadogMetricsEndpoint, SeriesApiVersion},
1016                encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
1017            },
1018            util::{Compression, Compressor},
1019        },
1020    };
1021
1022    const fn max_uncompressed_header_len(endpoint: DatadogMetricsEndpoint) -> usize {
1023        match endpoint {
1024            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
1025                super::SERIES_PAYLOAD_HEADER.len() + super::SERIES_PAYLOAD_FOOTER.len()
1026            }
1027            _ => 0,
1028        }
1029    }
1030
1031    fn get_simple_counter() -> Metric {
1032        let value = MetricValue::Counter { value: 3.14 };
1033        Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts()))
1034    }
1035
1036    fn get_simple_counter_with_metadata(metadata: EventMetadata) -> Metric {
1037        let value = MetricValue::Counter { value: 3.14 };
1038        Metric::new_with_metadata("basic_counter", MetricKind::Incremental, value, metadata)
1039            .with_timestamp(Some(ts()))
1040    }
1041
1042    fn get_simple_rate_counter(value: f64, interval_ms: u32) -> Metric {
1043        let value = MetricValue::Counter { value };
1044        Metric::new("basic_counter", MetricKind::Incremental, value)
1045            .with_timestamp(Some(ts()))
1046            .with_interval_ms(NonZeroU32::new(interval_ms))
1047    }
1048
1049    fn get_simple_sketch() -> Metric {
1050        let mut ddsketch = AgentDDSketch::with_agent_defaults();
1051        ddsketch.insert(3.14);
1052        Metric::new("basic_counter", MetricKind::Incremental, ddsketch.into())
1053            .with_timestamp(Some(ts()))
1054    }
1055
1056    fn get_compressed_empty_series_v1_payload() -> Bytes {
1057        let mut compressor = Compressor::from(Compression::zlib_default());
1058
1059        _ = write_payload_header(
1060            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1061            &mut compressor,
1062        )
1063        .expect("should not fail");
1064        _ = write_payload_footer(
1065            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1066            &mut compressor,
1067        )
1068        .expect("should not fail");
1069
1070        compressor.finish().expect("should not fail").freeze()
1071    }
1072
1073    fn get_compressed_empty_sketches_payload() -> Bytes {
1074        Compressor::from(Compression::zstd_default())
1075            .finish()
1076            .expect("should not fail")
1077            .freeze()
1078    }
1079
1080    fn get_compressed_empty_series_v2_payload() -> Bytes {
1081        Compressor::from(Compression::zstd_default())
1082            .finish()
1083            .expect("should not fail")
1084            .freeze()
1085    }
1086
1087    fn decompress_zlib_payload(payload: Bytes) -> io::Result<Bytes> {
1088        let mut decompressor = ZlibDecoder::new(&payload[..]);
1089        let mut decompressed = BytesMut::new().writer();
1090        io::copy(&mut decompressor, &mut decompressed)?;
1091        Ok(decompressed.into_inner().freeze())
1092    }
1093
1094    fn decompress_zstd_payload(payload: Bytes) -> io::Result<Bytes> {
1095        let decompressed = zstd::decode_all(&payload[..])?;
1096        Ok(Bytes::from(decompressed))
1097    }
1098
1099    /// Returns the number of bytes added to the compressor's output buffer after writing `n`
1100    /// bytes of high-entropy data. Measures only the *incremental* bytes, not the frame overhead
1101    /// that `finish()` would append (Adler-32 / empty final block for zlib, end frame for zstd).
1102    ///
1103    /// This mirrors how `try_compress_buffer` uses `max_compressed_size`: it checks how many
1104    /// more compressed bytes would be produced, against the current running output length.
1105    /// Compresses `n` bytes of high-entropy (worst-case for compression) data and returns the
1106    /// total output size after `finish()`.
1107    fn total_compressed_len(compression: DatadogMetricsCompression, n: usize) -> usize {
1108        // Xorshift64 — period 2^64-1, passes BigCrush, produces statistically random bytes
1109        // that neither zlib nor zstd can compress significantly.
1110        let mut state = 0xdeadbeef_cafebabe_u64;
1111        let data: Vec<u8> = (0..n)
1112            .map(|_| {
1113                state ^= state << 13;
1114                state ^= state >> 7;
1115                state ^= state << 17;
1116                state as u8
1117            })
1118            .collect();
1119        let mut compressor = compression.compressor();
1120        compressor.write_all(&data).expect("write should succeed");
1121        compressor.finish().expect("finish should succeed").len()
1122    }
1123
1124    /// Validates that `max_compressed_size(n)` is a true upper bound on the compressed bytes
1125    /// attributable to `n` uncompressed bytes, for both zlib and zstd.
1126    ///
1127    /// We measure `total_compressed_len(n) - total_compressed_len(0)` to strip the fixed frame
1128    /// overhead (header + trailer) written regardless of input size, isolating the bytes
1129    /// contributed by the data itself.
1130    #[test]
1131    fn max_compressed_size_is_upper_bound() {
1132        // zlib stored-block boundary: 16 384 bytes; zstd block boundary: 131 072 bytes.
1133        let test_sizes = [
1134            0, 1, 100, 1_000, 16_383, 16_384, 16_385, 32_767, 32_768, 131_071, 131_072, 131_073,
1135            500_000,
1136        ];
1137
1138        let zlib_frame = total_compressed_len(DatadogMetricsCompression::Zlib, 0);
1139        let zstd_frame = total_compressed_len(DatadogMetricsCompression::Zstd, 0);
1140
1141        // The formula must not overestimate by more than 1% of input + 64 bytes (a small
1142        // constant that covers the zstd correction term for very small inputs).
1143        let max_slack = |n: usize| n / 100 + 64;
1144
1145        for &n in &test_sizes {
1146            let actual_zlib = total_compressed_len(DatadogMetricsCompression::Zlib, n) - zlib_frame;
1147            let max_zlib = DatadogMetricsCompression::Zlib.max_compressed_size(n);
1148            assert!(
1149                actual_zlib <= max_zlib,
1150                "zlib n={n}: formula underestimates: actual={actual_zlib} > max={max_zlib}"
1151            );
1152            assert!(
1153                max_zlib - actual_zlib <= max_slack(n),
1154                "zlib n={n}: formula overestimates: slack={} > {}",
1155                max_zlib - actual_zlib,
1156                max_slack(n)
1157            );
1158
1159            let actual_zstd = total_compressed_len(DatadogMetricsCompression::Zstd, n) - zstd_frame;
1160            let max_zstd = DatadogMetricsCompression::Zstd.max_compressed_size(n);
1161            assert!(
1162                actual_zstd <= max_zstd,
1163                "zstd n={n}: formula underestimates: actual={actual_zstd} > max={max_zstd}"
1164            );
1165            assert!(
1166                max_zstd - actual_zstd <= max_slack(n),
1167                "zstd n={n}: formula overestimates: slack={} > {}",
1168                max_zstd - actual_zstd,
1169                max_slack(n)
1170            );
1171        }
1172    }
1173
1174    fn ts() -> DateTime<Utc> {
1175        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
1176            .single()
1177            .and_then(|t| t.with_nanosecond(11))
1178            .expect("invalid timestamp")
1179    }
1180
    /// Tag fixture covering the shapes the encoder has to handle: two plain `key => value`
    /// tags, one bare tag without a value, and the key `multi_value` listed twice with
    /// different values.
    fn tags() -> MetricTags {
        metric_tags! {
            "normal_tag" => "value",
            "true_tag" => "true",
            "empty_tag" => TagValue::Bare,
            "multi_value" => "one",
            "multi_value" => "two",
        }
    }
1190
1191    fn encode_sketches_normal<B>(
1192        metrics: &[Metric],
1193        default_namespace: &Option<Arc<str>>,
1194        log_schema: &'static LogSchema,
1195        buf: &mut B,
1196    ) where
1197        B: BufMut,
1198    {
1199        let mut sketches = Vec::new();
1200        for metric in metrics {
1201            let MetricValue::Sketch { sketch } = metric.value() else {
1202                panic!("must be sketch")
1203            };
1204            match sketch {
1205                MetricSketch::AgentDDSketch(ddsketch) => {
1206                    if let Some(sketch) =
1207                        sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema, 14)
1208                    {
1209                        sketches.push(sketch);
1210                    }
1211                }
1212            }
1213        }
1214
1215        let sketch_payload = ddmetric_proto::SketchPayload {
1216            metadata: None,
1217            sketches,
1218        };
1219
1220        // Now try encoding this sketch payload, and then try to compress it.
1221        sketch_payload.encode(buf).unwrap()
1222    }
1223
1224    #[test]
1225    fn test_encode_tags() {
1226        assert_eq!(
1227            encode_tags(&tags()),
1228            vec![
1229                "empty_tag",
1230                "multi_value:one",
1231                "multi_value:two",
1232                "normal_tag:value",
1233                "true_tag:true",
1234            ]
1235        );
1236    }
1237
1238    #[test]
1239    fn test_encode_timestamp() {
1240        assert_eq!(encode_timestamp(None), Utc::now().timestamp());
1241        assert_eq!(encode_timestamp(Some(ts())), 1542182950);
1242    }
1243
1244    #[test]
1245    fn incorrect_metric_for_endpoint_causes_error() {
1246        // Series metrics can't go to the sketches endpoint.
1247        let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None);
1248        let series_result = sketch_encoder.try_encode(get_simple_counter());
1249        assert!(matches!(
1250            series_result.err(),
1251            Some(EncoderError::InvalidMetric { .. })
1252        ));
1253
1254        // And sketches can't go to the series endpoint.
1255        let mut series_v1_encoder =
1256            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None);
1257        let sketch_result = series_v1_encoder.try_encode(get_simple_sketch());
1258        assert!(matches!(
1259            sketch_result.err(),
1260            Some(EncoderError::InvalidMetric { .. })
1261        ));
1262
1263        let mut series_v2_encoder =
1264            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
1265        let sketch_result = series_v2_encoder.try_encode(get_simple_sketch());
1266        assert!(matches!(
1267            sketch_result.err(),
1268            Some(EncoderError::InvalidMetric { .. })
1269        ));
1270    }
1271
1272    #[test]
1273    fn encode_counter_with_interval_as_rate() {
1274        // When a counter explicitly has an interval, we need to encode it as a rate. This means
1275        // dividing the value by the interval (in seconds) and setting the metric type so that when
1276        // it lands on the DD side, they can multiply the value by the interval (in seconds) and get
1277        // back the correct total value for that time period.
1278
1279        let value = 423.1331;
1280        let interval_ms = 10000;
1281        let rate_counter = get_simple_rate_counter(value, interval_ms);
1282        let expected_value = value / (interval_ms / 1000) as f64;
1283        let expected_interval = interval_ms / 1000;
1284
1285        // series v1
1286        {
1287            // Encode the metric and make sure we did the rate conversion correctly.
1288            let result = generate_series_metrics(
1289                &rate_counter,
1290                &None,
1291                log_schema(),
1292                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1293            );
1294            assert!(result.is_ok());
1295
1296            let metrics = result.unwrap();
1297            assert_eq!(metrics.len(), 1);
1298
1299            let actual = &metrics[0];
1300            assert_eq!(actual.r#type, DatadogMetricType::Rate);
1301            assert_eq!(actual.interval, Some(expected_interval));
1302            assert_eq!(actual.points.len(), 1);
1303            assert_eq!(actual.points[0].1, expected_value);
1304        }
1305
1306        // series v2
1307        {
1308            let series_proto = series_to_proto_message(
1309                &rate_counter,
1310                &None,
1311                log_schema(),
1312                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1313            )
1314            .unwrap();
1315            assert_eq!(series_proto.r#type, 2);
1316            assert_eq!(series_proto.interval, expected_interval as i64);
1317            assert_eq!(series_proto.points.len(), 1);
1318            assert_eq!(series_proto.points[0].value, expected_value);
1319        }
1320    }
1321
1322    #[test]
1323    fn encode_non_rate_metric_with_interval() {
1324        // It is possible that the Agent sends Gauges with an interval set. This
1325        // Occurs when the origin of the metric is Dogstatsd, where the interval
1326        // is set to 10.
1327
1328        let value = 423.1331;
1329        let interval_ms = 10000;
1330
1331        let gauge = Metric::new(
1332            "basic_gauge",
1333            MetricKind::Incremental,
1334            MetricValue::Gauge { value },
1335        )
1336        .with_timestamp(Some(ts()))
1337        .with_interval_ms(NonZeroU32::new(interval_ms));
1338
1339        let expected_value = value; // For gauge, the value should not be modified by interval
1340        let expected_interval = interval_ms / 1000;
1341
1342        // series v1
1343        {
1344            // Encode the metric and make sure we did the rate conversion correctly.
1345            let result = generate_series_metrics(
1346                &gauge,
1347                &None,
1348                log_schema(),
1349                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1350            );
1351            assert!(result.is_ok());
1352
1353            let metrics = result.unwrap();
1354            assert_eq!(metrics.len(), 1);
1355
1356            let actual = &metrics[0];
1357            assert_eq!(actual.r#type, DatadogMetricType::Gauge);
1358            assert_eq!(actual.interval, Some(expected_interval));
1359            assert_eq!(actual.points.len(), 1);
1360            assert_eq!(actual.points[0].1, expected_value);
1361        }
1362
1363        // series v2
1364        {
1365            let series_proto = series_to_proto_message(
1366                &gauge,
1367                &None,
1368                log_schema(),
1369                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1370            )
1371            .unwrap();
1372            assert_eq!(series_proto.r#type, 3);
1373            assert_eq!(series_proto.interval, expected_interval as i64);
1374            assert_eq!(series_proto.points.len(), 1);
1375            assert_eq!(series_proto.points[0].value, expected_value);
1376        }
1377    }
1378
1379    #[test]
1380    fn encode_origin_metadata_pass_through() {
1381        let product = 10;
1382        let category = 11;
1383        let service = 9;
1384
1385        let event_metadata = EventMetadata::default().with_origin_metadata(
1386            DatadogMetricOriginMetadata::new(Some(product), Some(category), Some(service)),
1387        );
1388        let counter = get_simple_counter_with_metadata(event_metadata);
1389
1390        // series v1
1391        {
1392            let result = generate_series_metrics(
1393                &counter,
1394                &None,
1395                log_schema(),
1396                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1397            );
1398            assert!(result.is_ok());
1399
1400            let metrics = result.unwrap();
1401            assert_eq!(metrics.len(), 1);
1402
1403            let actual = &metrics[0];
1404            let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1405
1406            assert_eq!(generated_origin.product().unwrap(), product);
1407            assert_eq!(generated_origin.category().unwrap(), category);
1408            assert_eq!(generated_origin.service().unwrap(), service);
1409        }
1410        // series v2
1411        {
1412            let series_proto = series_to_proto_message(
1413                &counter,
1414                &None,
1415                log_schema(),
1416                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1417            )
1418            .unwrap();
1419
1420            let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1421            assert_eq!(generated_origin.origin_product, product);
1422            assert_eq!(generated_origin.origin_category, category);
1423            assert_eq!(generated_origin.origin_service, service);
1424        }
1425    }
1426
1427    #[test]
1428    fn encode_origin_metadata_vector_sourced() {
1429        let product = *ORIGIN_PRODUCT_VALUE;
1430
1431        let category = 11;
1432        let service = 153;
1433
1434        let mut counter = get_simple_counter();
1435
1436        counter.metadata_mut().set_source_type("statsd");
1437
1438        // series v1
1439        {
1440            let result = generate_series_metrics(&counter, &None, log_schema(), product);
1441            assert!(result.is_ok());
1442
1443            let metrics = result.unwrap();
1444            assert_eq!(metrics.len(), 1);
1445
1446            let actual = &metrics[0];
1447            let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1448
1449            assert_eq!(generated_origin.product().unwrap(), product);
1450            assert_eq!(generated_origin.category().unwrap(), category);
1451            assert_eq!(generated_origin.service().unwrap(), service);
1452        }
1453        // series v2
1454        {
1455            let series_proto = series_to_proto_message(
1456                &counter,
1457                &None,
1458                log_schema(),
1459                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1460            )
1461            .unwrap();
1462
1463            let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1464            assert_eq!(generated_origin.origin_product, product);
1465            assert_eq!(generated_origin.origin_category, category);
1466            assert_eq!(generated_origin.origin_service, service);
1467        }
1468    }
1469
1470    #[test]
1471    fn encode_single_series_v1_metric_with_default_limits() {
1472        // This is a simple test where we ensure that a single metric, with the default limits, can
1473        // be encoded without hitting any errors.
1474        let mut encoder =
1475            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None);
1476        let counter = get_simple_counter();
1477        let expected = counter.clone();
1478
1479        // Encode the counter.
1480        let result = encoder.try_encode(counter);
1481        assert!(result.is_ok());
1482        assert_eq!(result.unwrap(), None);
1483
1484        // Finish the payload, make sure we got what we came for.
1485        let result = encoder.finish();
1486        assert!(result.is_ok());
1487
1488        let (_payload, mut processed) = result.unwrap();
1489        assert_eq!(processed.len(), 1);
1490        assert_eq!(expected, processed.pop().unwrap());
1491    }
1492
1493    #[test]
1494    fn encode_single_series_v2_metric_with_default_limits() {
1495        // This is a simple test where we ensure that a single metric, with the default limits, can
1496        // be encoded without hitting any errors.
1497        let mut encoder =
1498            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
1499        let counter = get_simple_counter();
1500        let expected = counter.clone();
1501
1502        // Encode the counter.
1503        let result = encoder.try_encode(counter);
1504        assert!(result.is_ok());
1505        assert_eq!(result.unwrap(), None);
1506
1507        // Finish the payload, make sure we got what we came for.
1508        let result = encoder.finish();
1509        assert!(result.is_ok());
1510
1511        let (_payload, mut processed) = result.unwrap();
1512        assert_eq!(processed.len(), 1);
1513        assert_eq!(expected, processed.pop().unwrap());
1514    }
1515
1516    #[test]
1517    fn encode_single_sketch_metric_with_default_limits() {
1518        // This is a simple test where we ensure that a single metric, with the default limits, can
1519        // be encoded without hitting any errors.
1520        let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None);
1521        let sketch = get_simple_sketch();
1522        let expected = sketch.clone();
1523
1524        // Encode the sketch.
1525        let result = encoder.try_encode(sketch);
1526        assert!(result.is_ok());
1527        assert_eq!(result.unwrap(), None);
1528
1529        // Finish the payload, make sure we got what we came for.
1530        let result = encoder.finish();
1531        assert!(result.is_ok());
1532
1533        let (_payload, mut processed) = result.unwrap();
1534        assert_eq!(processed.len(), 1);
1535        assert_eq!(expected, processed.pop().unwrap());
1536    }
1537
1538    #[test]
1539    fn encode_empty_sketch() {
1540        // This is a simple test where we ensure that a single metric, with the default limits, can
1541        // be encoded without hitting any errors.
1542        let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None);
1543        let sketch = Metric::new(
1544            "empty",
1545            MetricKind::Incremental,
1546            AgentDDSketch::with_agent_defaults().into(),
1547        )
1548        .with_timestamp(Some(ts()));
1549        let expected = sketch.clone();
1550
1551        // Encode the sketch.
1552        let result = encoder.try_encode(sketch);
1553        assert!(result.is_ok());
1554        assert_eq!(result.unwrap(), None);
1555
1556        // Finish the payload, make sure we got what we came for.
1557        let result = encoder.finish();
1558        assert!(result.is_ok());
1559
1560        let (_payload, mut processed) = result.unwrap();
1561        assert_eq!(processed.len(), 1);
1562        assert_eq!(expected, processed.pop().unwrap());
1563    }
1564
1565    #[test]
1566    fn encode_multiple_sketch_metrics_normal_vs_incremental() {
1567        // This tests our incremental sketch encoding against the more straightforward approach of
1568        // just building/encoding a full `SketchPayload` message.
1569        let metrics = vec![
1570            get_simple_sketch(),
1571            get_simple_sketch(),
1572            get_simple_sketch(),
1573        ];
1574
1575        let mut normal_buf = Vec::new();
1576        encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);
1577
1578        let mut incremental_buf = Vec::new();
1579        for metric in &metrics {
1580            match metric.value() {
1581                MetricValue::Sketch { sketch } => match sketch {
1582                    MetricSketch::AgentDDSketch(ddsketch) => {
1583                        if let Some(sketch_proto) =
1584                            sketch_to_proto_message(metric, ddsketch, &None, log_schema(), 14)
1585                        {
1586                            encode_proto_key_and_message(
1587                                sketch_proto,
1588                                get_sketch_payload_sketches_field_number(),
1589                                &mut incremental_buf,
1590                            )
1591                            .unwrap();
1592                        }
1593                    }
1594                },
1595                _ => panic!("should be a sketch"),
1596            }
1597        }
1598
1599        assert_eq!(normal_buf, incremental_buf);
1600    }
1601
1602    #[test]
1603    fn default_payload_limits_are_endpoint_aware() {
1604        let v1 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V1).payload_limits();
1605        assert_eq!(v1.uncompressed, 62_914_560);
1606        assert_eq!(v1.compressed, 3_200_000);
1607
1608        let v2 = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2).payload_limits();
1609        assert_eq!(v2.uncompressed, 5_242_880);
1610        assert_eq!(v2.compressed, 512_000);
1611
1612        let sketches = DatadogMetricsEndpoint::Sketches.payload_limits();
1613        assert_eq!(sketches.uncompressed, 62_914_560);
1614        assert_eq!(sketches.compressed, 3_200_000);
1615    }
1616
1617    #[test]
1618    fn v2_series_default_limits_split_large_batches() {
1619        // Simulate a large send and validate that default V2 limits split payloads into multiple
1620        // requests, while still making forward progress each pass.
1621        let mut pending = vec![get_simple_counter(); 120_000];
1622        let mut encoded_batches = 0;
1623        let mut encoded_metrics = 0;
1624
1625        while !pending.is_empty() {
1626            let mut encoder = DatadogMetricsEncoder::new(
1627                DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1628                None,
1629            );
1630
1631            let mut next_pending = Vec::new();
1632            let mut hit_limit = false;
1633            for metric in pending.drain(..) {
1634                match encoder.try_encode(metric.clone()) {
1635                    Ok(None) => {}
1636                    Ok(Some(returned_metric)) => {
1637                        hit_limit = true;
1638                        next_pending.push(returned_metric);
1639                    }
1640                    Err(error) => panic!("unexpected encoding error: {error}"),
1641                }
1642            }
1643
1644            let finish_result = encoder.finish();
1645            assert!(finish_result.is_ok());
1646            let (_payload, processed) = finish_result.unwrap();
1647            assert!(
1648                !processed.is_empty(),
1649                "encoder should always make progress for a non-empty batch"
1650            );
1651
1652            encoded_metrics += processed.len();
1653            encoded_batches += 1;
1654
1655            if hit_limit {
1656                assert!(
1657                    !next_pending.is_empty(),
1658                    "hitting limits should leave metrics to process in the next batch"
1659                );
1660            }
1661
1662            pending = next_pending;
1663        }
1664
1665        assert_eq!(encoded_metrics, 120_000);
1666        assert!(
1667            encoded_batches > 1,
1668            "expected multiple batches for V2 default limits"
1669        );
1670    }
1671
1672    #[test]
1673    fn encode_series_breaks_out_when_limit_reached_uncompressed() {
1674        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
1675        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
1676        // uncompressed payload would exceed the limit.
1677        let header_len =
1678            max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1));
1679        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1680            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1681            None,
1682            header_len + 1,
1683            usize::MAX,
1684        );
1685
1686        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
1687        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1688        // as it could not be added.
1689        let counter = get_simple_counter();
1690        let result = encoder.try_encode(counter.clone());
1691        assert!(result.is_ok());
1692        assert_eq!(result.unwrap(), Some(counter));
1693
1694        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1695        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1696        // processed metrics should be returned.
1697        let result = encoder.finish();
1698        assert!(result.is_ok());
1699
1700        let (payload, processed) = result.unwrap();
1701        assert_eq!(
1702            payload.uncompressed_byte_size,
1703            max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1))
1704        );
1705        assert_eq!(
1706            payload.into_payload(),
1707            get_compressed_empty_series_v1_payload()
1708        );
1709        assert_eq!(processed.len(), 0);
1710    }
1711
1712    #[test]
1713    fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
1714        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
1715        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
1716        // uncompressed payload would exceed the limit.
1717        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1718            DatadogMetricsEndpoint::Sketches,
1719            None,
1720            1,
1721            usize::MAX,
1722        );
1723
1724        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
1725        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1726        // as it could not be added.
1727        let sketch = get_simple_sketch();
1728        let result = encoder.try_encode(sketch.clone());
1729        assert!(result.is_ok());
1730        assert_eq!(result.unwrap(), Some(sketch));
1731
1732        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1733        // this payload, but it will be empty and no processed metrics should be returned.
1734        let result = encoder.finish();
1735        assert!(result.is_ok());
1736
1737        let (payload, processed) = result.unwrap();
1738        assert_eq!(payload.uncompressed_byte_size, 0);
1739        assert_eq!(
1740            payload.into_payload(),
1741            get_compressed_empty_sketches_payload()
1742        );
1743        assert_eq!(processed.len(), 0);
1744    }
1745
1746    #[test]
1747    fn encode_series_breaks_out_when_limit_reached_compressed() {
1748        // We manually create the encoder with an arbitrarily low "compressed" limit but high
1749        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
1750        // compressed payload would exceed the limit.
1751        let uncompressed_limit = 128;
1752        let compressed_limit = 32;
1753        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1754            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1755            None,
1756            uncompressed_limit,
1757            compressed_limit,
1758        );
1759
1760        // Trying to encode a metric that would cause us to exceed our compressed limits will
1761        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1762        // as it could not be added.
1763        let counter = get_simple_counter();
1764        let result = encoder.try_encode(counter.clone());
1765        assert!(result.is_ok());
1766        assert_eq!(result.unwrap(), Some(counter));
1767
1768        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1769        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1770        // processed metrics should be returned.
1771        let result = encoder.finish();
1772        assert!(result.is_ok());
1773
1774        let (payload, processed) = result.unwrap();
1775        assert_eq!(
1776            payload.uncompressed_byte_size,
1777            max_uncompressed_header_len(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1))
1778        );
1779        assert_eq!(
1780            payload.into_payload(),
1781            get_compressed_empty_series_v1_payload()
1782        );
1783        assert_eq!(processed.len(), 0);
1784    }
1785
1786    #[test]
1787    fn encode_sketches_breaks_out_when_limit_reached_compressed() {
1788        // We manually create the encoder with an arbitrarily low "compressed" limit but high
1789        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
1790        // compressed payload would exceed the limit.
1791        let uncompressed_limit = 128;
1792        let compressed_limit = 32;
1793        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1794            DatadogMetricsEndpoint::Sketches,
1795            None,
1796            uncompressed_limit,
1797            compressed_limit,
1798        );
1799
1800        // Trying to encode a metric that would cause us to exceed our compressed limits will
1801        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1802        // as it could not be added.
1803        let sketch = get_simple_sketch();
1804        let result = encoder.try_encode(sketch.clone());
1805        assert!(result.is_ok());
1806        assert_eq!(result.unwrap(), Some(sketch));
1807
1808        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1809        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1810        // processed metrics should be returned.
1811        let result = encoder.finish();
1812        assert!(result.is_ok());
1813
1814        let (payload, processed) = result.unwrap();
1815        assert_eq!(payload.uncompressed_byte_size, 0);
1816        assert_eq!(
1817            payload.into_payload(),
1818            get_compressed_empty_sketches_payload()
1819        );
1820        assert_eq!(processed.len(), 0);
1821    }
1822
1823    #[test]
1824    fn encode_series_v2_breaks_out_when_limit_reached_compressed() {
1825        // We manually create the encoder with an arbitrarily low "compressed" limit but high
1826        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
1827        // compressed payload would exceed the limit.
1828        let uncompressed_limit = 128;
1829        let compressed_limit = 32;
1830        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1831            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1832            None,
1833            uncompressed_limit,
1834            compressed_limit,
1835        );
1836
1837        // Trying to encode a metric that would cause us to exceed our compressed limits will
1838        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1839        // as it could not be added.
1840        let counter = get_simple_counter();
1841        let result = encoder.try_encode(counter.clone());
1842        assert!(result.is_ok());
1843        assert_eq!(result.unwrap(), Some(counter));
1844
1845        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1846        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1847        // processed metrics should be returned.
1848        let result = encoder.finish();
1849        assert!(result.is_ok());
1850
1851        let (payload, processed) = result.unwrap();
1852        assert_eq!(payload.uncompressed_byte_size, 0);
1853        assert_eq!(
1854            payload.into_payload(),
1855            get_compressed_empty_series_v2_payload()
1856        );
1857        assert_eq!(processed.len(), 0);
1858    }
1859
1860    #[test]
1861    fn zstd_v2_payload_never_exceeds_512kb_with_incompressible_data() {
1862        // End-to-end regression test using the real 512 KB compressed limit.
1863        //
1864        // Metric names are generated with a xorshift64 PRNG producing random printable ASCII
1865        // (6.5 bits of entropy per byte), making them effectively incompressible for zstd.
1866        // This makes the capacity estimate tight, so the test validates both directions:
1867        //
1868        //   Safety   (upper bound): payload ≤ 512 KB.
1869        //     Without the fix, the encoder ignores zstd's internal 128 KB buffer.  When the
1870        //     encoder finally stops, finish() flushes that hidden buffer on top of the already
1871        //     ~511 KB payload → ~639 KB → TooLarge.
1872        //
1873        //   Utilization (lower bound): payload > 95% of 512 KB.
1874        //     With incompressible data, actual_compressed ≈ max_cs(uncompressed), so the
1875        //     estimate is tight.  The ~2.5% gap comes from: (1) compressible proto framing
1876        //     (field tags, timestamps, metadata) that zstd compresses better than max_cs
1877        //     predicts, (2) the max_cs overhead term (~0.4%), and (3) one-metric stopping
1878        //     granularity (~1%).
1879
1880        // xorshift64 PRNG: 5000 random printable ASCII chars per metric name (0x21..0x7E,
1881        // 93 values ≈ 6.5 bits/byte).  Long names ensure the random portion dominates the
1882        // compressible proto framing, maximizing utilization.
1883        const PRINTABLE_START: u8 = 0x21;
1884        const PRINTABLE_END: u8 = 0x7E;
1885        const PRINTABLE_LEN: u64 = (PRINTABLE_END - PRINTABLE_START + 1) as u64; // 93
1886        let mut xor_state = 0xdeadbeef_cafebabe_u64;
1887        let mut next_name = || -> String {
1888            std::iter::once('m')
1889                .chain((0..4999).map(|_| {
1890                    xor_state ^= xor_state << 13;
1891                    xor_state ^= xor_state >> 7;
1892                    xor_state ^= xor_state << 17;
1893                    (PRINTABLE_START + (xor_state % PRINTABLE_LEN) as u8) as char
1894                }))
1895                .collect()
1896        };
1897
1898        let mut encoder =
1899            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None);
1900
1901        let mut accepted = 0usize;
1902        loop {
1903            let metric = Metric::new(
1904                next_name(),
1905                MetricKind::Incremental,
1906                MetricValue::Counter {
1907                    value: (accepted + 1) as f64,
1908                },
1909            )
1910            .with_timestamp(Some(ts()));
1911
1912            match encoder.try_encode(metric) {
1913                Ok(None) => accepted += 1,
1914                Ok(Some(_)) => break,
1915                Err(e) => panic!("unexpected encoding error: {e}"),
1916            }
1917        }
1918
1919        assert!(accepted > 0, "at least one metric must be accepted");
1920
1921        let compressed_limit = DatadogMetricsEndpoint::Series(SeriesApiVersion::V2)
1922            .payload_limits()
1923            .compressed;
1924
1925        let (payload, _) = encoder.finish().unwrap_or_else(|e| {
1926            panic!(
1927                "finish() returned an error after {accepted} metrics — \
1928                 the capacity estimate failed to prevent overflow: {e:?}"
1929            )
1930        });
1931        let payload_len = payload.into_payload().len();
1932
1933        // Safety: the hard limit must never be exceeded.
1934        assert!(
1935            payload_len <= compressed_limit,
1936            "payload ({payload_len} bytes) exceeded the {compressed_limit}-byte compressed limit"
1937        );
1938
1939        // Utilization: the encoder should use at least 95% of the available capacity.
1940        let min_utilization = compressed_limit * 95 / 100;
1941        assert!(
1942            payload_len > min_utilization,
1943            "payload ({payload_len} bytes) is below 95% of the {compressed_limit}-byte limit \
1944             ({min_utilization} bytes) — estimate may be over-conservative"
1945        );
1946    }
1947
1948    #[test]
1949    fn compressed_limit_is_respected_regardless_of_compressor_internal_buffering() {
1950        // Regression test for zstd's internal buffering hiding the compressed-size check.
1951        //
1952        // zstd buffers up to 128 KB internally before flushing a block to the output Vec.
1953        // The old capacity check used `get_ref().len()` alone as "compressed bytes so far", which
1954        // returns 0 until zstd's first 128 KB block completes. This caused the encoder to accept
1955        // metrics indefinitely — finish() would then return TooLarge, triggering expensive
1956        // re-encoding.
1957        //
1958        // The fix splits the estimate: exact compressed size for flushed blocks, plus
1959        // max_compressed_size for the unflushed portion (bytes still in zstd's internal buffer).
1960        // This is accurate for flushed data and bounded for unflushed data.
1961        //
1962        // At compressed_limit=512, no zstd block will flush (needs 128 KB of input), so
1963        // get_ref().len() stays 0 throughout. The old code would accept all 100 metrics;
1964        // the new code stops after a handful.
1965        let compressed_limit = 512;
1966        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1967            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1968            None,
1969            1_000_000,
1970            compressed_limit,
1971        );
1972
1973        let mut accepted = 0;
1974        for i in 0..100 {
1975            let metric = Metric::new(
1976                format!("counter_{i:0>20}"),
1977                MetricKind::Incremental,
1978                MetricValue::Counter {
1979                    value: (i + 1) as f64,
1980                },
1981            )
1982            .with_timestamp(Some(ts()));
1983            match encoder.try_encode(metric) {
1984                Ok(None) => accepted += 1,
1985                Ok(Some(_)) => break,
1986                Err(e) => panic!("unexpected encoding error: {e}"),
1987            }
1988        }
1989
1990        assert!(accepted > 0, "encoder should accept at least one metric");
1991        assert!(
1992            accepted < 10,
1993            "encoder accepted too many metrics — compressed limit was likely not enforced (accepted={accepted})"
1994        );
1995
1996        let result = encoder.finish();
1997        assert!(
1998            result.is_ok(),
1999            "finish() must not return TooLarge: {:?}",
2000            result.err()
2001        );
2002        let (payload, _) = result.unwrap();
2003        assert!(
2004            payload.into_payload().len() <= compressed_limit,
2005            "payload exceeded compressed_limit"
2006        );
2007    }
2008
2009    #[test]
2010    fn zstd_buffered_bound_resets_to_last_metric_size_after_block_flush() {
2011        // White-box test: directly verifies that buffered_bound resets to exactly n (the last
2012        // metric's encoded size) when a zstd block flush occurs, not to 0 or some other value.
2013        //
2014        // buffered_bound is an upper bound on bytes in zstd's internal buffer.  On each write it
2015        // accumulates (+= n).  When a flush is detected (get_ref().len() grows), it resets to n —
2016        // meaning only the triggering write could straddle the block boundary.
2017        //
2018        // If it reset to 0 instead, subsequent capacity checks would degenerate to
2019        //   flushed_compressed + max_cs(n)
2020        // which vastly underestimates for any data written after the flush, re-introducing the
2021        // original blind spot.  If it failed to reset at all, the encoder would become
2022        // over-conservative and reject valid metrics after the first flush.
2023        //
2024        // Strategy:
2025        //   1. Measure a single metric's encoded size by inspecting buffered_bound after one write.
2026        //   2. Feed metrics into a second encoder (with unlimited limits) until buffered_bound
2027        //      *decreases*, which signals a block flush.  Assert the post-flush value equals
2028        //      exactly one metric's encoded size.
2029
2030        let make_metric = |i: usize| {
2031            // Fixed-width name (600-char zero-padded) gives a constant per-metric encoded size.
2032            // Value is (i + 1) to ensure it is never 0.0: proto3 omits default (zero) values,
2033            // which would make the first metric smaller than the rest.
2034            Metric::new(
2035                format!("counter_{i:0>600}"),
2036                MetricKind::Incremental,
2037                MetricValue::Counter {
2038                    value: (i + 1) as f64,
2039                },
2040            )
2041            .with_timestamp(Some(ts()))
2042        };
2043
2044        // Step 1: measure a single metric's encoded size.
2045        let metric_size = {
2046            let mut probe = DatadogMetricsEncoder::with_payload_limits(
2047                DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
2048                None,
2049                usize::MAX,
2050                usize::MAX,
2051            );
2052            assert!(
2053                probe.try_encode(make_metric(0)).unwrap().is_none(),
2054                "first metric must be accepted"
2055            );
2056            probe.buffered_bound()
2057        };
2058        assert!(metric_size > 0, "encoded metric must be non-empty");
2059
2060        // Step 2: encode metrics until buffered_bound decreases (= flush detected).
2061        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
2062            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
2063            None,
2064            usize::MAX,
2065            usize::MAX,
2066        );
2067
2068        let mut prev_buffered = 0usize;
2069        let mut flush_seen = false;
2070
2071        for i in 0..1000 {
2072            let metric = make_metric(i);
2073            match encoder.try_encode(metric) {
2074                Ok(None) => {}
2075                Ok(Some(_)) => panic!("unexpected rejection at i={i} with unlimited limits"),
2076                Err(e) => panic!("unexpected error at i={i}: {e}"),
2077            }
2078
2079            let curr = encoder.buffered_bound();
2080
2081            if curr < prev_buffered {
2082                // A block flush just occurred: buffered_bound must reset to exactly n.
2083                assert_eq!(
2084                    curr, metric_size,
2085                    "after block flush, buffered_bound should reset to one metric's encoded size \
2086                     ({metric_size} bytes) but got {curr}"
2087                );
2088                flush_seen = true;
2089                break;
2090            }
2091
2092            prev_buffered = curr;
2093        }
2094
2095        assert!(
2096            flush_seen,
2097            "no zstd block flush detected after 1000 metrics — increase loop bound or metric size"
2098        );
2099    }
2100
2101    fn arb_counter_metric() -> impl Strategy<Value = Metric> {
2102        let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should not be invalid");
2103        let value = ARB_POSITIVE_F64;
2104        let tags = btree_map(
2105            any::<u64>().prop_map(|v| v.to_string()),
2106            any::<u64>().prop_map(|v| v.to_string()),
2107            0..64,
2108        )
2109        .prop_map(|tags| (!tags.is_empty()).then(|| MetricTags::from(tags)));
2110
2111        (name, value, tags).prop_map(|(metric_name, metric_value, metric_tags)| {
2112            let metric_value = MetricValue::Counter {
2113                value: metric_value,
2114            };
2115            Metric::new(metric_name, MetricKind::Incremental, metric_value).with_tags(metric_tags)
2116        })
2117    }
2118
2119    proptest! {
2120        #[test]
2121        fn encoding_check_for_payload_limit_edge_cases_v1(
2122            uncompressed_limit in 1..64_000_000usize,
2123            compressed_limit in 1..10_000_000usize,
2124            metric in arb_counter_metric(),
2125        ) {
2126            // We simply try to encode a single metric into an encoder, and make sure that when we
2127            // finish the payload, if it didn't result in an error, that the payload was under the
2128            // configured limits.
2129            //
2130            // We check this with targeted unit tests as well but this is some cheap insurance to
2131            // show that we're hopefully not missing any particular corner cases.
2132            let mut encoder = DatadogMetricsEncoder::with_payload_limits(
2133                DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
2134                None,
2135                uncompressed_limit,
2136                compressed_limit,
2137            );
2138            _ = encoder.try_encode(metric);
2139
2140            if let Ok((payload, _processed)) = encoder.finish() {
2141                let payload = payload.into_payload();
2142                prop_assert!(payload.len() <= compressed_limit);
2143
2144                // V1 uses zlib/deflate.
2145                let result = decompress_zlib_payload(payload);
2146                prop_assert!(result.is_ok());
2147
2148                let decompressed = result.unwrap();
2149                prop_assert!(decompressed.len() <= uncompressed_limit);
2150            }
2151        }
2152
2153        #[test]
2154        fn encoding_check_for_payload_limit_edge_cases_v2(
2155            uncompressed_limit in 1..10_000_000usize,
2156            compressed_limit in 1..1_000_000usize,
2157            metric in arb_counter_metric(),
2158        ) {
2159            let mut encoder = DatadogMetricsEncoder::with_payload_limits(
2160                DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
2161                None,
2162                uncompressed_limit,
2163                compressed_limit,
2164            );
2165            _ = encoder.try_encode(metric);
2166
2167            if let Ok((payload, _processed)) = encoder.finish() {
2168                let payload = payload.into_payload();
2169                prop_assert!(payload.len() <= compressed_limit);
2170
2171                // V2 uses zstd.
2172                let result = decompress_zstd_payload(payload);
2173                prop_assert!(result.is_ok());
2174
2175                let decompressed = result.unwrap();
2176                prop_assert!(decompressed.len() <= uncompressed_limit);
2177            }
2178        }
2179    }
2180}