vector/sinks/datadog/metrics/encoder.rs

use std::{
    cmp,
    io::{self, Write},
    mem,
    sync::{Arc, LazyLock, OnceLock},
};

use bytes::{BufMut, Bytes};
use chrono::{DateTime, Utc};
use snafu::{ResultExt, Snafu};
use vector_lib::request_metadata::GroupedCountByteSize;
use vector_lib::{
    config::{log_schema, telemetry, LogSchema},
    event::{metric::MetricSketch, DatadogMetricOriginMetadata, Metric, MetricTags, MetricValue},
    metrics::AgentDDSketch,
    EstimatedJsonEncodedSizeOf,
};

use super::config::{DatadogMetricsEndpoint, SeriesApiVersion};
use crate::{
    common::datadog::{
        DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
    },
    proto::fds::protobuf_descriptors,
    sinks::util::{encode_namespace, request_builder::EncodeResult, Compression, Compressor},
};

const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}";
const SERIES_PAYLOAD_DELIMITER: &[u8] = b",";
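// Together these wrap a V1 series payload as `{"series":[<metric>,<metric>,...]}`, with the
// delimiter inserted between incrementally encoded metrics.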

pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11;

const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;

pub(super) static ORIGIN_PRODUCT_VALUE: LazyLock<u32> = LazyLock::new(|| {
    option_env!("DD_ORIGIN_PRODUCT")
        .map(|p| {
            p.parse::<u32>()
                .expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
        })
        .unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
});
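// Note that `option_env!` reads the variable at compile time, not at runtime: for example,
// building with `DD_ORIGIN_PRODUCT=42` in the build environment yields 42 here, and otherwise
// this falls back to `DEFAULT_DD_ORIGIN_PRODUCT_VALUE` (14).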

#[allow(warnings, clippy::pedantic, clippy::nursery)]
mod ddmetric_proto {
    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
}

#[derive(Debug, Snafu)]
pub enum CreateError {
    #[snafu(display("Invalid compressed/uncompressed payload size limits were given"))]
    InvalidLimits,
}

impl CreateError {
    /// Gets the telemetry-friendly string version of this error.
    ///
    /// The value will be a short string with only lowercase letters and underscores.
    pub const fn as_error_type(&self) -> &'static str {
        match self {
            Self::InvalidLimits => "invalid_payload_limits",
        }
    }
}

#[derive(Debug, Snafu)]
pub enum EncoderError {
    #[snafu(display(
        "Invalid metric value '{}' was given; '{}' expected",
        metric_value,
        expected
    ))]
    InvalidMetric {
        expected: &'static str,
        metric_value: &'static str,
    },

    #[snafu(
        context(false),
        display("Failed to encode series metric to JSON: {source}")
    )]
    JsonEncodingFailed { source: serde_json::Error },

    // Currently, the only time `prost` ever emits `EncodeError` is when there is insufficient
    // buffer capacity, so we don't need to hold on to the error, and we can just hardcode this.
    #[snafu(display(
        "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
    ))]
    ProtoEncodingFailed,
}

impl EncoderError {
    /// Gets the telemetry-friendly string version of this error.
    ///
    /// The value will be a short string with only lowercase letters and underscores.
    pub const fn as_error_type(&self) -> &'static str {
        match self {
            Self::InvalidMetric { .. } => "invalid_metric",
            Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
            Self::ProtoEncodingFailed => "failed_to_encode_sketch",
        }
    }
}

#[derive(Debug, Snafu)]
pub enum FinishError {
    #[snafu(display(
        "Failure occurred during writing to or finalizing the compressor: {}",
        source
    ))]
    CompressionFailed { source: io::Error },

    #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
    TooLarge {
        metrics: Vec<Metric>,
        recommended_splits: usize,
    },
}

impl FinishError {
    /// Gets the telemetry-friendly string version of this error.
    ///
    /// The value will be a short string with only lowercase letters and underscores.
    pub const fn as_error_type(&self) -> &'static str {
        match self {
            Self::CompressionFailed { .. } => "compression_failed",
            Self::TooLarge { .. } => "too_large",
        }
    }
}

struct EncoderState {
    writer: Compressor,
    written: usize,
    buf: Vec<u8>,
    processed: Vec<Metric>,
    byte_size: GroupedCountByteSize,
}

impl Default for EncoderState {
    fn default() -> Self {
        Self {
            writer: get_compressor(),
            written: 0,
            buf: Vec::with_capacity(1024),
            processed: Vec::new(),
            byte_size: telemetry().create_request_count_byte_size(),
        }
    }
}

pub struct DatadogMetricsEncoder {
    endpoint: DatadogMetricsEndpoint,
    default_namespace: Option<Arc<str>>,
    uncompressed_limit: usize,
    compressed_limit: usize,

    state: EncoderState,
    log_schema: &'static LogSchema,

    origin_product_value: u32,
}

impl DatadogMetricsEncoder {
    /// Creates a new `DatadogMetricsEncoder` for the given endpoint.
    pub fn new(
        endpoint: DatadogMetricsEndpoint,
        default_namespace: Option<String>,
    ) -> Result<Self, CreateError> {
        let payload_limits = endpoint.payload_limits();
        Self::with_payload_limits(
            endpoint,
            default_namespace,
            payload_limits.uncompressed,
            payload_limits.compressed,
        )
    }

    /// Creates a new `DatadogMetricsEncoder` for the given endpoint, with specific payload limits.
    pub fn with_payload_limits(
        endpoint: DatadogMetricsEndpoint,
        default_namespace: Option<String>,
        uncompressed_limit: usize,
        compressed_limit: usize,
    ) -> Result<Self, CreateError> {
        let (uncompressed_limit, compressed_limit) =
            validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit)
                .ok_or(CreateError::InvalidLimits)?;

        Ok(Self {
            endpoint,
            default_namespace: default_namespace.map(Arc::from),
            uncompressed_limit,
            compressed_limit,
            state: EncoderState::default(),
            log_schema: log_schema(),
            origin_product_value: *ORIGIN_PRODUCT_VALUE,
        })
    }
}

impl DatadogMetricsEncoder {
    fn reset_state(&mut self) -> EncoderState {
        mem::take(&mut self.state)
    }

    fn encode_single_metric(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
        // We take special care in this method to capture errors which are not indicative of the
        // metric itself causing a failure in order to be able to return the metric to the caller.
        // The contract of the encoder is such that when an encoding attempt fails for normal
        // reasons, like being out of room, we give back the metric so the caller can finalize the
        // previously encoded metrics and then reset and try again to encode.
        //
        // If the encoder is in a persistent bad state, the caller will get back a proper error
        // when calling `finish`, so they eventually see the failure; we just make sure they can
        // tidy up before then and avoid needlessly dropping metrics due to unrelated errors.

        // Clear our temporary buffer before any encoding.
        self.state.buf.clear();

        self.state
            .byte_size
            .add_event(&metric, metric.estimated_json_encoded_size_of());

        // For V2 Series metrics, and Sketches: We encode a single Series or Sketch metric incrementally,
        // which means that we specifically write it as if we were writing a single field entry in the
        // overall `SketchPayload` message or `MetricPayload` type.
        //
        // By doing so, we can encode multiple metrics and concatenate all the buffers, and have the
        // resulting buffer appear as if it's a normal `<>Payload` message with a bunch of repeats
        // of the `sketches` / `series` field.
        //
        // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches --
        // and we never actually set the metadata field... so the resulting message generated overall
        // for `SketchPayload` with a single sketch looks just as if we literally wrote out a
        // single value for the given field.
        //
        // Similarly, `MetricPayload` has a single repeated `series` field.
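        //
        // In wire-format terms, each encoded entry is `key(field_no, LEN) + varint(len) + message
        // bytes`, and a protobuf decoder reading the concatenation of such entries sees exactly
        // the same stream as one message containing that repeated field.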

        match self.endpoint {
            // V1 Series metrics are encoded via JSON, in an incremental fashion.
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
                // A single `Metric` might generate multiple Datadog series metrics.
                let all_series = generate_series_metrics(
                    &metric,
                    &self.default_namespace,
                    self.log_schema,
                    self.origin_product_value,
                )?;

                // We handle adding the JSON array separator (comma) manually since the encoding is
                // happening incrementally.
                let has_processed = !self.state.processed.is_empty();
                for (i, series) in all_series.iter().enumerate() {
                    // Add an array delimiter if we already have other metrics encoded.
                    if (has_processed || i > 0)
                        && write_payload_delimiter(self.endpoint, &mut self.state.buf).is_err()
                    {
                        return Ok(Some(metric));
                    }
                    serde_json::to_writer(&mut self.state.buf, series)?;
                }
            }
            // V2 Series metrics are encoded via ProtoBuf, in an incremental fashion.
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() {
                MetricValue::Counter { .. }
                | MetricValue::Gauge { .. }
                | MetricValue::Set { .. }
                | MetricValue::AggregatedSummary { .. } => {
                    let series_proto = series_to_proto_message(
                        &metric,
                        &self.default_namespace,
                        self.log_schema,
                        self.origin_product_value,
                    )?;

                    encode_proto_key_and_message(
                        series_proto,
                        get_series_payload_series_field_number(),
                        &mut self.state.buf,
                    )?;
                }
                value => {
                    return Err(EncoderError::InvalidMetric {
                        expected: "series",
                        metric_value: value.as_name(),
                    })
                }
            },
            // Sketches are encoded via ProtoBuf, also in an incremental fashion.
            DatadogMetricsEndpoint::Sketches => match metric.value() {
                MetricValue::Sketch { sketch } => match sketch {
                    MetricSketch::AgentDDSketch(ddsketch) => {
                        if let Some(sketch_proto) = sketch_to_proto_message(
                            &metric,
                            ddsketch,
                            &self.default_namespace,
                            self.log_schema,
                            self.origin_product_value,
                        ) {
                            encode_proto_key_and_message(
                                sketch_proto,
                                get_sketch_payload_sketches_field_number(),
                                &mut self.state.buf,
                            )?;
                        } else {
                            // If the sketch was empty, that's fine too
                        }
                    }
                },
                value => {
                    return Err(EncoderError::InvalidMetric {
                        expected: "sketches",
                        metric_value: value.as_name(),
                    })
                }
            },
        }

        // Try and see if our temporary buffer can be written to the compressor.
        match self.try_compress_buffer() {
            Err(_) | Ok(false) => Ok(Some(metric)),
            Ok(true) => {
                self.state.processed.push(metric);
                Ok(None)
            }
        }
    }

    fn try_compress_buffer(&mut self) -> io::Result<bool> {
        let n = self.state.buf.len();

        // If we're over our uncompressed size limit with this metric, inform the caller.
        if self.state.written + n > self.uncompressed_limit {
            return Ok(false);
        }

        // Calculating the compressed size is slightly more tricky, because we can only speculate
        // about how many bytes it would take when compressed.  If we write into the compressor, we
        // can't back out that write, even if we manually modify the underlying Vec<u8>, as the
        // compressor might have internal state around checksums, etc, that can't be similarly
        // rolled back.
        //
        // Our strategy is thus: simply take the encoded-but-uncompressed size and see if it would
        // fit within the compressed limit.  In `get_endpoint_payload_size_limits`, we calculate the
        // expected maximum overhead of zlib based on all input data being incompressible, which
        // zlib ensures will be the worst case as it can figure out whether a compressed or
        // uncompressed block would take up more space _before_ choosing which strategy to go with.
        //
        // Thus, simply put, we've already accounted for the uncertainty by making our check here
        // assume the worst case while our limits assume the worst case _overhead_.  Maybe our
        // numbers are technically off in the end, but `finish` catches that for us, too.
        let compressed_len = self.state.writer.get_ref().len();
        let max_compressed_metric_len = n + max_compressed_overhead_len(n);
        if compressed_len + max_compressed_metric_len > self.compressed_limit {
            return Ok(false);
        }

        // We should be safe to write our holding buffer to the compressor and store the metric.
        self.state.writer.write_all(&self.state.buf)?;
        self.state.written += n;
        Ok(true)
    }

    /// Attempts to encode a single metric into this encoder.
    ///
    /// For some metric types, the metric will be encoded immediately and we will attempt to
    /// compress it.  For some other metric types, we will store the metric until `finish` is
    /// called, due to the inability to incrementally encode them.
    ///
    /// If the metric could not be encoded into this encoder due to hitting the limits on the size
    /// of the encoded/compressed payload, it will be returned via `Ok(Some(Metric))`, otherwise `Ok(None)`
    /// will be returned.
    ///
    /// If `Ok(Some(Metric))` is returned, callers must call `finish` to finalize the payload.
    /// Further calls to `try_encode` without first calling `finish` may or may not succeed.
    ///
    /// # Errors
    ///
    /// If an error is encountered while attempting to encode the metric, an error variant will be returned.
    pub fn try_encode(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
        // Make sure we've written our header already.
        if self.state.written == 0 {
            match write_payload_header(self.endpoint, &mut self.state.writer) {
                Ok(n) => self.state.written += n,
                Err(_) => return Ok(Some(metric)),
            }
        }

        self.encode_single_metric(metric)
    }

    pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
        // Write any payload footer necessary for the configured endpoint.
        let n = write_payload_footer(self.endpoint, &mut self.state.writer)
            .context(CompressionFailedSnafu)?;
        self.state.written += n;

        let raw_bytes_written = self.state.written;
        let byte_size = self.state.byte_size.clone();

        // Consume the encoder state so we can do our final checks and return the necessary data.
        let state = self.reset_state();
        let payload = state
            .writer
            .finish()
            .context(CompressionFailedSnafu)?
            .freeze();
        let processed = state.processed;

        // We should have configured our limits such that if all calls to `try_compress_buffer` have
        // succeeded up until this point, then our payload should be within the limits after writing
        // the footer and finishing the compressor.
        //
        // We're not only double checking that here, but we're figuring out how much bigger than the
        // limit the payload is, if it is indeed bigger, so that we can recommend how many splits
        // should be used to bring the given set of metrics down to chunks that can be encoded
        // without hitting the limits.
        let compressed_splits = payload.len() / self.compressed_limit;
        let uncompressed_splits = state.written / self.uncompressed_limit;
        let recommended_splits = cmp::max(compressed_splits, uncompressed_splits) + 1;

        if recommended_splits == 1 {
            // "One" split means no splits needed: our payload didn't exceed either of the limits.
            Ok((
                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
                processed,
            ))
        } else {
            Err(FinishError::TooLarge {
                metrics: processed,
                recommended_splits,
            })
        }
    }
}

fn generate_proto_metadata(
    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
    maybe_source_type: Option<&str>,
    origin_product_value: u32,
) -> Option<ddmetric_proto::Metadata> {
    generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
        |origin| {
            if origin.product().is_none()
                || origin.category().is_none()
                || origin.service().is_none()
            {
                warn!(
                    message = "Generated sketch origin metadata should have each field set.",
                    product = origin.product(),
                    category = origin.category(),
                    service = origin.service()
                );
            }
            ddmetric_proto::Metadata {
                origin: Some(ddmetric_proto::Origin {
                    origin_product: origin.product().unwrap_or_default(),
                    origin_category: origin.category().unwrap_or_default(),
                    origin_service: origin.service().unwrap_or_default(),
                }),
            }
        },
    )
}

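// The `sketches` and `series` field numbers below are looked up at runtime from the embedded
// descriptor pool rather than hardcoded, so they cannot drift from the compiled protobuf
// definitions.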
fn get_sketch_payload_sketches_field_number() -> u32 {
    static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
    *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
        let descriptors = protobuf_descriptors();
        let descriptor = descriptors
            .get_message_by_name("datadog.agentpayload.SketchPayload")
            .expect("should not fail to find `SketchPayload` message in descriptor pool");

        descriptor
            .get_field_by_name("sketches")
            .map(|field| field.number())
            .expect("`sketches` field must exist in `SketchPayload` message")
    })
}

fn get_series_payload_series_field_number() -> u32 {
    static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
    *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| {
        let descriptors = protobuf_descriptors();
        let descriptor = descriptors
            .get_message_by_name("datadog.agentpayload.MetricPayload")
            .expect("should not fail to find `MetricPayload` message in descriptor pool");

        descriptor
            .get_field_by_name("series")
            .map(|field| field.number())
            .expect("`series` field must exist in `MetricPayload` message")
    })
}

fn sketch_to_proto_message(
    metric: &Metric,
    ddsketch: &AgentDDSketch,
    default_namespace: &Option<Arc<str>>,
    log_schema: &'static LogSchema,
    origin_product_value: u32,
) -> Option<ddmetric_proto::sketch_payload::Sketch> {
    if ddsketch.is_empty() {
        return None;
    }

    let name = get_namespaced_name(metric, default_namespace);
    let ts = encode_timestamp(metric.timestamp());
    let mut tags = metric.tags().cloned().unwrap_or_default();
    let host = log_schema
        .host_key()
        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
        .unwrap_or_default();
    let tags = encode_tags(&tags);

    let cnt = ddsketch.count() as i64;
    let min = ddsketch
        .min()
        .expect("min should be present for non-empty sketch");
    let max = ddsketch
        .max()
        .expect("max should be present for non-empty sketch");
    let avg = ddsketch
        .avg()
        .expect("avg should be present for non-empty sketch");
    let sum = ddsketch
        .sum()
        .expect("sum should be present for non-empty sketch");

    let (bins, counts) = ddsketch.bin_map().into_parts();
    let k = bins.into_iter().map(Into::into).collect();
    let n = counts.into_iter().map(Into::into).collect();

    let event_metadata = metric.metadata();
    let metadata = generate_proto_metadata(
        event_metadata.datadog_origin_metadata(),
        event_metadata.source_type(),
        origin_product_value,
    );

    trace!(?metadata, "Generated sketch metadata.");

    Some(ddmetric_proto::sketch_payload::Sketch {
        metric: name,
        tags,
        host,
        distributions: Vec::new(),
        dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
            ts,
            cnt,
            min,
            max,
            avg,
            sum,
            k,
            n,
        }],
        metadata,
    })
}

fn series_to_proto_message(
    metric: &Metric,
    default_namespace: &Option<Arc<str>>,
    log_schema: &'static LogSchema,
    origin_product_value: u32,
) -> Result<ddmetric_proto::metric_payload::MetricSeries, EncoderError> {
    let metric_name = get_namespaced_name(metric, default_namespace);
    let mut tags = metric.tags().cloned().unwrap_or_default();

    let mut resources = vec![];

    if let Some(host) = log_schema
        .host_key()
        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
    {
        resources.push(ddmetric_proto::metric_payload::Resource {
            r#type: "host".to_string(),
            name: host,
        });
    }

    // In the `datadog_agent` source, the tag is added as `device` for the V1 endpoint
    // and `resource.device` for the V2 endpoint.
    if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) {
        resources.push(ddmetric_proto::metric_payload::Resource {
            r#type: "device".to_string(),
            name: device,
        });
    }

    let source_type_name = tags.remove("source_type_name").unwrap_or_default();

    let tags = encode_tags(&tags);

    let event_metadata = metric.metadata();
    let metadata = generate_proto_metadata(
        event_metadata.datadog_origin_metadata(),
        event_metadata.source_type(),
        origin_product_value,
    );
    trace!(?metadata, "Generated MetricSeries metadata.");

    let timestamp = encode_timestamp(metric.timestamp());

    // our internal representation is in milliseconds but the expected output is in seconds
    let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);

    let (points, metric_type) = match metric.value() {
        MetricValue::Counter { value } => {
            if let Some(interval) = maybe_interval {
                // When an interval is defined, it implies the value should be in a per-second form,
                // so we need to get back to seconds from our milliseconds-based interval, and then
                // divide our value by that amount as well.
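                // For example, a counter of 423.1331 over a 10s interval is emitted as a Rate
                // point of 42.31331.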
                let value = *value / (interval as f64);
                (
                    vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
                    ddmetric_proto::metric_payload::MetricType::Rate,
                )
            } else {
                (
                    vec![ddmetric_proto::metric_payload::MetricPoint {
                        value: *value,
                        timestamp,
                    }],
                    ddmetric_proto::metric_payload::MetricType::Count,
                )
            }
        }
        MetricValue::Set { values } => (
            vec![ddmetric_proto::metric_payload::MetricPoint {
                value: values.len() as f64,
                timestamp,
            }],
            ddmetric_proto::metric_payload::MetricType::Gauge,
        ),
        MetricValue::Gauge { value } => (
            vec![ddmetric_proto::metric_payload::MetricPoint {
                value: *value,
                timestamp,
            }],
            ddmetric_proto::metric_payload::MetricType::Gauge,
        ),
        // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
        value => {
            // this case should have already been surfaced by encode_single_metric() so this should never be reached
            return Err(EncoderError::InvalidMetric {
                expected: "series",
                metric_value: value.as_name(),
            });
        }
    };

    Ok(ddmetric_proto::metric_payload::MetricSeries {
        resources,
        metric: metric_name,
        tags,
        points,
        r#type: metric_type.into(),
        // unit is omitted
        unit: "".to_string(),
        source_type_name,
        interval: maybe_interval.unwrap_or(0) as i64,
        metadata,
    })
}

// Manually write the field tag and then encode the Message payload directly as a length-delimited message.
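// For example, for field number 1 the key byte is 0x0A ((1 << 3) | 2, i.e. wire type 2 for
// length-delimited), followed by the varint-encoded message length and the message bytes.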
fn encode_proto_key_and_message<T, B>(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError>
where
    T: prost::Message,
    B: BufMut,
{
    prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf);

    msg.encode_length_delimited(buf)
        .map_err(|_| EncoderError::ProtoEncodingFailed)
}

fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
    encode_namespace(
        metric
            .namespace()
            .or_else(|| default_namespace.as_ref().map(|s| s.as_ref())),
        '.',
        metric.name(),
    )
}

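// For example, the tags `{"env" => "prod", "debug" => Bare}` encode to `["debug", "env:prod"]`:
// bare tags are emitted without a `:<value>` suffix, and the result is sorted so the output is
// deterministic.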
fn encode_tags(tags: &MetricTags) -> Vec<String> {
    let mut pairs: Vec<_> = tags
        .iter_all()
        .map(|(name, value)| match value {
            Some(value) => format!("{name}:{value}"),
            None => name.into(),
        })
        .collect();
    pairs.sort();
    pairs
}

fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
    if let Some(ts) = timestamp {
        ts.timestamp()
    } else {
        Utc::now().timestamp()
    }
}

// Given the Vector source type, return the OriginService value associated with that integration, if any.
fn source_type_to_service(source_type: &str) -> Option<u32> {
    match source_type {
        // In order to preserve consistent behavior, we intentionally don't set origin metadata
        // for the case where the Datadog Agent did not set it.
        "datadog_agent" => None,

        // These are the sources for which metrics truly originated from this Vector instance.
        "apache_metrics" => Some(17),
        "aws_ecs_metrics" => Some(209),
        "eventstoredb_metrics" => Some(210),
        "host_metrics" => Some(211),
        "internal_metrics" => Some(212),
        "mongodb_metrics" => Some(111),
        "nginx_metrics" => Some(117),
        "open_telemetry" => Some(213),
        "postgresql_metrics" => Some(128),
        "prometheus_remote_write" => Some(214),
        "prometheus_scrape" => Some(215),
        "statsd" => Some(153),

        // These sources are only capable of receiving metrics with the `native` or `native_json` codec.
        // Generally that means the Origin Metadata will have been set as a pass through.
        // However, if the upstream Vector instance did not set Origin Metadata (for example, if it
        // is an older version), we will at least set the OriginProduct and OriginCategory.
        "kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector"
        | "pulsar" => Some(0),

        // This scenario should not occur; if it does, it means we added a source that deals with
        // metrics and did not update this function. But if it does occur, by setting the Service
        // value to be undefined, we at least populate the OriginProduct and OriginCategory.
        _ => {
            debug!("Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value.");
            Some(0)
        }
    }
}

/// Determine the correct Origin metadata values to use depending on whether they have already
/// been set upstream or not. The generalized struct `DatadogMetricOriginMetadata` is
/// utilized in this function, which allows the series and sketch encoding to call and map
/// the result appropriately for the given protocol they operate on.
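///
/// For example, a pass-through value that only has `service` set is back-filled with the
/// configured product value and `ORIGIN_CATEGORY_VALUE`, while a metric with no pass-through
/// metadata at all is mapped purely from its source type.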
fn generate_origin_metadata(
    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
    maybe_source_type: Option<&str>,
    origin_product_value: u32,
) -> Option<DatadogMetricOriginMetadata> {
    let no_value = 0;

    // An upstream vector source or a transform has set the origin metadata already.
    // Currently this is only possible in these scenarios:
    //     - `datadog_agent` source receiving the metadata on ingested metrics
    //     - `vector` source receiving events with EventMetadata that already has the origins set
    //     - A metrics source configured with the `native` or `native_json` codecs, where the upstream
    //       Vector instance enriched the EventMetadata with Origin metadata.
    //     - The `log_to_metric` transform sets the OriginService in the EventMetadata when it
    //       creates the new metric.
    if let Some(pass_through) = maybe_pass_through {
        Some(DatadogMetricOriginMetadata::new(
            pass_through.product().or(Some(origin_product_value)),
            pass_through.category().or(Some(ORIGIN_CATEGORY_VALUE)),
            pass_through.service().or(Some(no_value)),
        ))

    // No metadata has been set upstream
    } else {
        maybe_source_type.and_then(|source_type| {
            // Only set the metadata if the source is a metric source we should set it for.
            // In order to preserve consistent behavior, we intentionally don't set origin metadata
            // for the case where the Datadog Agent did not set it.
            source_type_to_service(source_type).map(|origin_service_value| {
                DatadogMetricOriginMetadata::new(
                    Some(origin_product_value),
                    Some(ORIGIN_CATEGORY_VALUE),
                    Some(origin_service_value),
                )
            })
        })
    }
}

fn generate_series_metadata(
    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
    maybe_source_type: Option<&str>,
    origin_product_value: u32,
) -> Option<DatadogSeriesMetricMetadata> {
    generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
        |origin| DatadogSeriesMetricMetadata {
            origin: Some(origin),
        },
    )
}

fn generate_series_metrics(
    metric: &Metric,
    default_namespace: &Option<Arc<str>>,
    log_schema: &'static LogSchema,
    origin_product_value: u32,
) -> Result<Vec<DatadogSeriesMetric>, EncoderError> {
    let name = get_namespaced_name(metric, default_namespace);

    let mut tags = metric.tags().cloned().unwrap_or_default();
    let host = log_schema
        .host_key()
        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());

    let source_type_name = tags.remove("source_type_name");
    let device = tags.remove("device");
    let ts = encode_timestamp(metric.timestamp());
    let tags = Some(encode_tags(&tags));

    // our internal representation is in milliseconds but the expected output is in seconds
    let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);

    let event_metadata = metric.metadata();
    let metadata = generate_series_metadata(
        event_metadata.datadog_origin_metadata(),
        event_metadata.source_type(),
        origin_product_value,
    );

    trace!(?metadata, "Generated series metadata.");

    let (points, metric_type) = match metric.value() {
        MetricValue::Counter { value } => {
            if let Some(interval) = maybe_interval {
                // When an interval is defined, it implies the value should be in a per-second form,
                // so we need to get back to seconds from our milliseconds-based interval, and then
                // divide our value by that amount as well.
                let value = *value / (interval as f64);
                (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
            } else {
                (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
            }
        }
        MetricValue::Set { values } => (
            vec![DatadogPoint(ts, values.len() as f64)],
            DatadogMetricType::Gauge,
        ),
        MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
        // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
        value => {
            return Err(EncoderError::InvalidMetric {
                expected: "series",
                metric_value: value.as_name(),
            })
        }
    };

    Ok(vec![DatadogSeriesMetric {
        metric: name,
        r#type: metric_type,
        interval: maybe_interval,
        points,
        tags,
        host,
        source_type_name,
        device,
        metadata,
    }])
}

fn get_compressor() -> Compressor {
    // We use the "zlib default" compressor because it's all Datadog supports; adding it
    // generically to `Compression` would make things a little weird, because the conversion trait
    // implementations also only cover none vs. gzip.
    Compression::zlib_default().into()
}

const fn max_uncompressed_header_len() -> usize {
    SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len()
}

// Datadog ingest APIs accept zlib, which is what we're accounting for here. By default, zlib
// has a 2 byte header and 4 byte Adler-32 checksum trailer. [1]
//
// [1] https://www.zlib.net/zlib_tech.html
const ZLIB_HEADER_TRAILER: usize = 6;

const fn max_compression_overhead_len(compressed_limit: usize) -> usize {
    // We calculate the overhead as the zlib header/trailer plus the worst case overhead of
    // compressing `compressed_limit` bytes, such that we assume all of the data we write may not be
    // compressed at all.
    ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit)
}

const fn max_compressed_overhead_len(len: usize) -> usize {
    // Datadog ingest APIs accept zlib, which is what we're accounting for here.
    //
    // Deflate, the underlying compression algorithm, has a technique to ensure that input data
    // can't be encoded in such a way where it's expanded by a meaningful amount. This technique
    // allows storing blocks of uncompressed data with only 5 bytes of overhead per block.
    // Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations use
    // block sizes of 16KB. [1][2]
    //
    // We calculate the overhead of compressing a given `len` bytes as the worst case of that many
    // bytes being written to the compressor and being unable to be compressed at all.
    //
    // [1] https://www.zlib.net/zlib_tech.html
    // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html
    const STORED_BLOCK_SIZE: usize = 16384;
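    // As a worked example: 40_000 bytes of incompressible input costs
    // (1 + (40_000 - 6) / 16_384) * 5 = 15 bytes of stored-block overhead.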
    (1 + len.saturating_sub(ZLIB_HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5
}

const fn validate_payload_size_limits(
    endpoint: DatadogMetricsEndpoint,
    uncompressed_limit: usize,
    compressed_limit: usize,
) -> Option<(usize, usize)> {
    if endpoint.is_series() {
        // For series, we need to make sure the uncompressed limit can account for the header/footer
        // we would add that wraps the encoded metrics up in the expected JSON object. This does
        // imply that adding 1 to this limit would be allowed, and obviously we can't encode a
        // series metric in a single byte, but this is just a simple sanity check, not an exhaustive
        // search of the absolute bare minimum size.
        let header_len = max_uncompressed_header_len();
        if uncompressed_limit <= header_len {
            return None;
        }
    }

    // Get the maximum possible overhead of the compression container, based on the incoming
    // _uncompressed_ limit. We use the uncompressed limit because we're calculating the maximum
    // overhead in the case that, theoretically, none of the input data was compressible.  This
    // possibility is essentially impossible, but serves as a proper worst-case-scenario check.
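    //
    // As a worked example, an uncompressed limit of 1_000_000 bytes implies a maximum overhead of
    // 6 + (1 + (1_000_000 - 6) / 16_384) * 5 = 316 bytes, so any compressed limit above 316 passes
    // this check.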
    let max_compression_overhead = max_compression_overhead_len(uncompressed_limit);
    if compressed_limit <= max_compression_overhead {
        return None;
    }

    Some((uncompressed_limit, compressed_limit))
}

fn write_payload_header(
    endpoint: DatadogMetricsEndpoint,
    writer: &mut dyn io::Write,
) -> io::Result<usize> {
    match endpoint {
        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
            .write_all(SERIES_PAYLOAD_HEADER)
            .map(|_| SERIES_PAYLOAD_HEADER.len()),
        _ => Ok(0),
    }
}

fn write_payload_delimiter(
    endpoint: DatadogMetricsEndpoint,
    writer: &mut dyn io::Write,
) -> io::Result<usize> {
    match endpoint {
        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
            .write_all(SERIES_PAYLOAD_DELIMITER)
            .map(|_| SERIES_PAYLOAD_DELIMITER.len()),
        _ => Ok(0),
    }
}

fn write_payload_footer(
    endpoint: DatadogMetricsEndpoint,
    writer: &mut dyn io::Write,
) -> io::Result<usize> {
    match endpoint {
        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
            .write_all(SERIES_PAYLOAD_FOOTER)
            .map(|_| SERIES_PAYLOAD_FOOTER.len()),
        _ => Ok(0),
    }
}

#[cfg(test)]
mod tests {
    use std::{
        io::{self, copy},
        num::NonZeroU32,
        sync::Arc,
    };

    use bytes::{BufMut, Bytes, BytesMut};
    use chrono::{DateTime, TimeZone, Timelike, Utc};
    use flate2::read::ZlibDecoder;
    use proptest::{
        arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert,
        proptest, strategy::Strategy, string::string_regex,
    };
    use prost::Message;
    use vector_lib::{
        config::{log_schema, LogSchema},
        event::{
            metric::{MetricSketch, TagValue},
            DatadogMetricOriginMetadata, EventMetadata, Metric, MetricKind, MetricTags,
            MetricValue,
        },
        metric_tags,
        metrics::AgentDDSketch,
    };

    use super::{
        ddmetric_proto, encode_proto_key_and_message, encode_tags, encode_timestamp,
        generate_series_metrics, get_compressor, get_sketch_payload_sketches_field_number,
        max_compression_overhead_len, max_uncompressed_header_len, series_to_proto_message,
        sketch_to_proto_message, validate_payload_size_limits, write_payload_footer,
        write_payload_header, DatadogMetricsEncoder, EncoderError,
    };
    use crate::{
        common::datadog::DatadogMetricType,
        sinks::datadog::metrics::{
            config::{DatadogMetricsEndpoint, SeriesApiVersion},
            encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
        },
    };

    fn get_simple_counter() -> Metric {
        let value = MetricValue::Counter { value: 3.14 };
        Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts()))
    }

    fn get_simple_counter_with_metadata(metadata: EventMetadata) -> Metric {
        let value = MetricValue::Counter { value: 3.14 };
        Metric::new_with_metadata("basic_counter", MetricKind::Incremental, value, metadata)
            .with_timestamp(Some(ts()))
    }

    fn get_simple_rate_counter(value: f64, interval_ms: u32) -> Metric {
        let value = MetricValue::Counter { value };
        Metric::new("basic_counter", MetricKind::Incremental, value)
            .with_timestamp(Some(ts()))
            .with_interval_ms(NonZeroU32::new(interval_ms))
    }

    fn get_simple_sketch() -> Metric {
        let mut ddsketch = AgentDDSketch::with_agent_defaults();
        ddsketch.insert(3.14);
        Metric::new("basic_counter", MetricKind::Incremental, ddsketch.into())
            .with_timestamp(Some(ts()))
    }

    fn get_compressed_empty_series_payload() -> Bytes {
        let mut compressor = get_compressor();

        _ = write_payload_header(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
            &mut compressor,
        )
        .expect("should not fail");
        _ = write_payload_footer(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
            &mut compressor,
        )
        .expect("should not fail");

        compressor.finish().expect("should not fail").freeze()
    }

    fn get_compressed_empty_sketches_payload() -> Bytes {
        get_compressor().finish().expect("should not fail").freeze()
    }

    fn decompress_payload(payload: Bytes) -> io::Result<Bytes> {
        let mut decompressor = ZlibDecoder::new(&payload[..]);
        let mut decompressed = BytesMut::new().writer();
        let result = copy(&mut decompressor, &mut decompressed);
        result.map(|_| decompressed.into_inner().freeze())
    }

    fn ts() -> DateTime<Utc> {
        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
            .single()
            .and_then(|t| t.with_nanosecond(11))
            .expect("invalid timestamp")
    }

    fn tags() -> MetricTags {
        metric_tags! {
            "normal_tag" => "value",
            "true_tag" => "true",
            "empty_tag" => TagValue::Bare,
            "multi_value" => "one",
            "multi_value" => "two",
        }
    }

    fn encode_sketches_normal<B>(
        metrics: &[Metric],
        default_namespace: &Option<Arc<str>>,
        log_schema: &'static LogSchema,
        buf: &mut B,
    ) where
        B: BufMut,
    {
        let mut sketches = Vec::new();
        for metric in metrics {
            let MetricValue::Sketch { sketch } = metric.value() else {
                panic!("must be sketch")
            };
            match sketch {
                MetricSketch::AgentDDSketch(ddsketch) => {
                    if let Some(sketch) =
                        sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema, 14)
                    {
                        sketches.push(sketch);
                    }
                }
            }
        }

        let sketch_payload = ddmetric_proto::SketchPayload {
            metadata: None,
            sketches,
        };

        // Now try encoding this sketch payload, and then try to compress it.
        sketch_payload.encode(buf).unwrap()
    }

    #[test]
    fn test_encode_tags() {
        assert_eq!(
            encode_tags(&tags()),
            vec![
                "empty_tag",
                "multi_value:one",
                "multi_value:two",
                "normal_tag:value",
                "true_tag:true",
            ]
        );
    }

    #[test]
    fn test_encode_timestamp() {
        assert_eq!(encode_timestamp(None), Utc::now().timestamp());
        assert_eq!(encode_timestamp(Some(ts())), 1542182950);
    }

    #[test]
    fn incorrect_metric_for_endpoint_causes_error() {
        // Series metrics can't go to the sketches endpoint.
        let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
            .expect("default payload size limits should be valid");
        let series_result = sketch_encoder.try_encode(get_simple_counter());
        assert!(matches!(
            series_result.err(),
            Some(EncoderError::InvalidMetric { .. })
        ));

        // And sketches can't go to the series endpoint.
        let mut series_v1_encoder =
            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
                .expect("default payload size limits should be valid");
        let sketch_result = series_v1_encoder.try_encode(get_simple_sketch());
        assert!(matches!(
            sketch_result.err(),
            Some(EncoderError::InvalidMetric { .. })
        ));

        let mut series_v2_encoder =
            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
                .expect("default payload size limits should be valid");
        let sketch_result = series_v2_encoder.try_encode(get_simple_sketch());
        assert!(matches!(
            sketch_result.err(),
            Some(EncoderError::InvalidMetric { .. })
        ));
    }

    #[test]
    fn encode_counter_with_interval_as_rate() {
        // When a counter explicitly has an interval, we need to encode it as a rate. This means
        // dividing the value by the interval (in seconds) and setting the metric type so that when
        // it lands on the DD side, they can multiply the value by the interval (in seconds) and get
        // back the correct total value for that time period.

        let value = 423.1331;
        let interval_ms = 10000;
        let rate_counter = get_simple_rate_counter(value, interval_ms);
        let expected_value = value / (interval_ms / 1000) as f64;
        let expected_interval = interval_ms / 1000;

        // series v1
        {
            // Encode the metric and make sure we did the rate conversion correctly.
            let result = generate_series_metrics(
                &rate_counter,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            );
            assert!(result.is_ok());

            let metrics = result.unwrap();
            assert_eq!(metrics.len(), 1);

            let actual = &metrics[0];
            assert_eq!(actual.r#type, DatadogMetricType::Rate);
            assert_eq!(actual.interval, Some(expected_interval));
            assert_eq!(actual.points.len(), 1);
            assert_eq!(actual.points[0].1, expected_value);
        }

        // series v2
        {
            let series_proto = series_to_proto_message(
                &rate_counter,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            )
            .unwrap();
            assert_eq!(series_proto.r#type, 2);
            assert_eq!(series_proto.interval, expected_interval as i64);
            assert_eq!(series_proto.points.len(), 1);
            assert_eq!(series_proto.points[0].value, expected_value);
        }
    }

    #[test]
    fn encode_non_rate_metric_with_interval() {
        // It is possible that the Agent sends gauges with an interval set. This occurs when the
        // origin of the metric is DogStatsD, where the interval is set to 10.

        let value = 423.1331;
        let interval_ms = 10000;

        let gauge = Metric::new(
            "basic_gauge",
            MetricKind::Incremental,
            MetricValue::Gauge { value },
        )
        .with_timestamp(Some(ts()))
        .with_interval_ms(NonZeroU32::new(interval_ms));

        let expected_value = value; // For gauge, the value should not be modified by interval
        let expected_interval = interval_ms / 1000;

        // series v1
        {
            // Encode the metric and make sure the gauge value was not rate-converted.
            let result = generate_series_metrics(
                &gauge,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            );
            assert!(result.is_ok());

            let metrics = result.unwrap();
            assert_eq!(metrics.len(), 1);

            let actual = &metrics[0];
            assert_eq!(actual.r#type, DatadogMetricType::Gauge);
            assert_eq!(actual.interval, Some(expected_interval));
            assert_eq!(actual.points.len(), 1);
            assert_eq!(actual.points[0].1, expected_value);
        }

        // series v2
        {
            let series_proto = series_to_proto_message(
                &gauge,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            )
            .unwrap();
            assert_eq!(series_proto.r#type, 3);
            assert_eq!(series_proto.interval, expected_interval as i64);
            assert_eq!(series_proto.points.len(), 1);
            assert_eq!(series_proto.points[0].value, expected_value);
        }
    }

    #[test]
    fn encode_origin_metadata_pass_through() {
        let product = 10;
        let category = 11;
        let service = 9;

        let event_metadata = EventMetadata::default().with_origin_metadata(
            DatadogMetricOriginMetadata::new(Some(product), Some(category), Some(service)),
        );
        let counter = get_simple_counter_with_metadata(event_metadata);

        // series v1
        {
            let result = generate_series_metrics(
                &counter,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            );
            assert!(result.is_ok());

            let metrics = result.unwrap();
            assert_eq!(metrics.len(), 1);

            let actual = &metrics[0];
            let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();

            assert_eq!(generated_origin.product().unwrap(), product);
            assert_eq!(generated_origin.category().unwrap(), category);
            assert_eq!(generated_origin.service().unwrap(), service);
        }
        // series v2
        {
            let series_proto = series_to_proto_message(
                &counter,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            )
            .unwrap();

            let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
            assert_eq!(generated_origin.origin_product, product);
            assert_eq!(generated_origin.origin_category, category);
            assert_eq!(generated_origin.origin_service, service);
        }
    }

    #[test]
    fn encode_origin_metadata_vector_sourced() {
        let product = *ORIGIN_PRODUCT_VALUE;

        let category = 11;
        let service = 153;

        let mut counter = get_simple_counter();

        counter.metadata_mut().set_source_type("statsd");

        // series v1
        {
            let result = generate_series_metrics(&counter, &None, log_schema(), product);
            assert!(result.is_ok());

            let metrics = result.unwrap();
            assert_eq!(metrics.len(), 1);

            let actual = &metrics[0];
            let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();

            assert_eq!(generated_origin.product().unwrap(), product);
            assert_eq!(generated_origin.category().unwrap(), category);
            assert_eq!(generated_origin.service().unwrap(), service);
        }
        // series v2
        {
            let series_proto = series_to_proto_message(
                &counter,
                &None,
                log_schema(),
                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
            )
            .unwrap();

            let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
            assert_eq!(generated_origin.origin_product, product);
            assert_eq!(generated_origin.origin_category, category);
            assert_eq!(generated_origin.origin_service, service);
        }
    }

    #[test]
    fn encode_single_series_v1_metric_with_default_limits() {
        // This is a simple test where we ensure that a single metric, with the default limits, can
        // be encoded without hitting any errors.
        let mut encoder =
            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
                .expect("default payload size limits should be valid");
        let counter = get_simple_counter();
        let expected = counter.clone();

        // Encode the counter.
        let result = encoder.try_encode(counter);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Finish the payload, make sure we got what we came for.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (_payload, mut processed) = result.unwrap();
        assert_eq!(processed.len(), 1);
        assert_eq!(expected, processed.pop().unwrap());
    }

    #[test]
    fn encode_single_series_v2_metric_with_default_limits() {
        // This is a simple test where we ensure that a single metric, with the default limits, can
        // be encoded without hitting any errors.
        let mut encoder =
            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
                .expect("default payload size limits should be valid");
        let counter = get_simple_counter();
        let expected = counter.clone();

        // Encode the counter.
        let result = encoder.try_encode(counter);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Finish the payload, make sure we got what we came for.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (_payload, mut processed) = result.unwrap();
        assert_eq!(processed.len(), 1);
        assert_eq!(expected, processed.pop().unwrap());
    }

    #[test]
    fn encode_single_sketch_metric_with_default_limits() {
        // This is a simple test where we ensure that a single metric, with the default limits, can
        // be encoded without hitting any errors.
        let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
            .expect("default payload size limits should be valid");
        let sketch = get_simple_sketch();
        let expected = sketch.clone();

        // Encode the sketch.
        let result = encoder.try_encode(sketch);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Finish the payload, make sure we got what we came for.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (_payload, mut processed) = result.unwrap();
        assert_eq!(processed.len(), 1);
        assert_eq!(expected, processed.pop().unwrap());
    }

1447    #[test]
1448    fn encode_empty_sketch() {
        // Ensures that a sketch with no samples can still be encoded, with the default limits,
        // without hitting any errors.
        let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
            .expect("default payload size limits should be valid");
        let sketch = Metric::new(
            "empty",
            MetricKind::Incremental,
            AgentDDSketch::with_agent_defaults().into(),
        )
        .with_timestamp(Some(ts()));
        let expected = sketch.clone();

        // Encode the sketch.
        let result = encoder.try_encode(sketch);
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), None);

        // Finish the payload, make sure we got what we came for.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (_payload, mut processed) = result.unwrap();
        assert_eq!(processed.len(), 1);
        assert_eq!(expected, processed.pop().unwrap());
    }

    #[test]
    fn encode_multiple_sketch_metrics_normal_vs_incremental() {
        // This tests our incremental sketch encoding against the more straightforward approach of
        // just building/encoding a full `SketchPayload` message.
        let metrics = vec![
            get_simple_sketch(),
            get_simple_sketch(),
            get_simple_sketch(),
        ];

        let mut normal_buf = Vec::new();
        encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);

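        // Protocol Buffers encodes a repeated message field as independent, length-delimited
        // entries, so encoding each sketch on its own and concatenating the results should be
        // byte-for-byte identical to encoding the entire payload in one shot.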
        let mut incremental_buf = Vec::new();
        for metric in &metrics {
            match metric.value() {
                MetricValue::Sketch { sketch } => match sketch {
                    MetricSketch::AgentDDSketch(ddsketch) => {
                        if let Some(sketch_proto) = sketch_to_proto_message(
                            metric,
                            ddsketch,
                            &None,
                            log_schema(),
                            DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
                        ) {
                            encode_proto_key_and_message(
                                sketch_proto,
                                get_sketch_payload_sketches_field_number(),
                                &mut incremental_buf,
                            )
                            .unwrap();
                        }
                    }
                },
                _ => panic!("should be a sketch"),
            }
        }

        assert_eq!(normal_buf, incremental_buf);
    }

    #[test]
    fn payload_size_limits_series() {
        // Get the maximum length of the header/trailer data.
        let header_len = max_uncompressed_header_len();
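        // The uncompressed limit must be able to hold more than just the header/trailer data,
        // so `header_len` by itself is rejected below while `header_len + 1` is accepted.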

        // This is too small.
        let result = validate_payload_size_limits(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
            header_len,
            usize::MAX,
        );
        assert_eq!(result, None);

        // This is just right.
        let result = validate_payload_size_limits(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
            header_len + 1,
            usize::MAX,
        );
        assert_eq!(result, Some((header_len + 1, usize::MAX)));
        // Get the maximum compressed overhead length for our input uncompressed size. This
        // represents the worst-case overhead when the input data (of length usize::MAX, in this
        // case) is entirely incompressible.
        let compression_overhead_len = max_compression_overhead_len(usize::MAX);

        // This is too small.
        let result = validate_payload_size_limits(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
            usize::MAX,
            compression_overhead_len,
        );
        assert_eq!(result, None);

        // This is just right.
        let result = validate_payload_size_limits(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
            usize::MAX,
            compression_overhead_len + 1,
        );
        assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
    }

    #[test]
    fn payload_size_limits_sketches() {
        // There's no lower bound on uncompressed size for the sketches payload.
        let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX);
        assert_eq!(result, Some((0, usize::MAX)));

        // Get the maximum compressed overhead length for our input uncompressed size. This
        // represents the worst-case overhead when the input data (of length usize::MAX, in this
        // case) is entirely incompressible.
        let compression_overhead_len = max_compression_overhead_len(usize::MAX);

        // This is too small.
        let result = validate_payload_size_limits(
            DatadogMetricsEndpoint::Sketches,
            usize::MAX,
            compression_overhead_len,
        );
        assert_eq!(result, None);

        // This is just right.
        let result = validate_payload_size_limits(
            DatadogMetricsEndpoint::Sketches,
            usize::MAX,
            compression_overhead_len + 1,
        );
        assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
    }

    #[test]
    fn encode_series_breaks_out_when_limit_reached_uncompressed() {
        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
        // uncompressed payload would exceed the limit.
        let header_len = max_uncompressed_header_len();
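        // `header_len + 1` is the smallest uncompressed limit the encoder accepts: it fits the
        // empty header/footer envelope but leaves no room for an actual metric.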
        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
            None,
            header_len + 1,
            usize::MAX,
        )
        .expect("payload size limits should be valid");

        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
        // _not_ return an error from `try_encode`, but instead will simply return back the metric
        // as it could not be added.
        let counter = get_simple_counter();
        let result = encoder.try_encode(counter.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(counter));

        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
        // this payload, but it will be empty (effectively, only the header/footer) and no
        // processed metrics should be returned.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (payload, processed) = result.unwrap();
        assert_eq!(
            payload.uncompressed_byte_size,
            max_uncompressed_header_len()
        );
        assert_eq!(
            payload.into_payload(),
            get_compressed_empty_series_payload()
        );
        assert_eq!(processed.len(), 0);
    }

    #[test]
    fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
        // uncompressed payload would exceed the limit.
        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
            DatadogMetricsEndpoint::Sketches,
            None,
            1,
            usize::MAX,
        )
        .expect("payload size limits should be valid");

        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
        // _not_ return an error from `try_encode`, but instead will simply return back the metric
        // as it could not be added.
        let sketch = get_simple_sketch();
        let result = encoder.try_encode(sketch.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(sketch));

        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
        // this payload, but it will be empty and no processed metrics should be returned.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (payload, processed) = result.unwrap();
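        // Unlike series payloads, a sketches payload has no fixed header/footer, so an encoder
        // that never accepted a metric reports zero uncompressed bytes.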
        assert_eq!(payload.uncompressed_byte_size, 0);
        assert_eq!(
            payload.into_payload(),
            get_compressed_empty_sketches_payload()
        );
        assert_eq!(processed.len(), 0);
    }

    #[test]
    fn encode_series_breaks_out_when_limit_reached_compressed() {
        // We manually create the encoder with an arbitrarily low "compressed" limit but high
        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
        // compressed payload would exceed the limit.
        let uncompressed_limit = 128;
        let compressed_limit = 32;
        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
            None,
            uncompressed_limit,
            compressed_limit,
        )
        .expect("payload size limits should be valid");

        // Trying to encode a metric that would cause us to exceed our compressed limits will
        // _not_ return an error from `try_encode`, but instead will simply return back the metric
        // as it could not be added.
        let counter = get_simple_counter();
        let result = encoder.try_encode(counter.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(counter));

        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
        // this payload, but it will be empty (effectively, only the header/footer) and no
        // processed metrics should be returned.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (payload, processed) = result.unwrap();
        assert_eq!(
            payload.uncompressed_byte_size,
            max_uncompressed_header_len()
        );
        assert_eq!(
            payload.into_payload(),
            get_compressed_empty_series_payload()
        );
        assert_eq!(processed.len(), 0);
    }

    #[test]
    fn encode_sketches_breaks_out_when_limit_reached_compressed() {
        // We manually create the encoder with an arbitrarily low "compressed" limit but high
        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
        // compressed payload would exceed the limit.
        let uncompressed_limit = 128;
        let compressed_limit = 16;
        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
            DatadogMetricsEndpoint::Sketches,
            None,
            uncompressed_limit,
            compressed_limit,
        )
        .expect("payload size limits should be valid");

        // Trying to encode a metric that would cause us to exceed our compressed limits will
        // _not_ return an error from `try_encode`, but instead will simply return back the metric
        // as it could not be added.
        let sketch = get_simple_sketch();
        let result = encoder.try_encode(sketch.clone());
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), Some(sketch));

        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
        // this payload, but it will be empty and no processed metrics should be returned.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (payload, processed) = result.unwrap();
        assert_eq!(payload.uncompressed_byte_size, 0);
        assert_eq!(
            payload.into_payload(),
            get_compressed_empty_sketches_payload()
        );
        assert_eq!(processed.len(), 0);
    }

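    // Proptest strategy that generates counters with a random alphanumeric name, a positive
    // value, and up to 64 random key/value tags, where an empty tag map becomes `None`.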
    fn arb_counter_metric() -> impl Strategy<Value = Metric> {
        let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should be valid");
        let value = ARB_POSITIVE_F64;
        let tags = btree_map(
            any::<u64>().prop_map(|v| v.to_string()),
            any::<u64>().prop_map(|v| v.to_string()),
            0..64,
        )
        .prop_map(|tags| (!tags.is_empty()).then(|| MetricTags::from(tags)));

        (name, value, tags).prop_map(|(metric_name, metric_value, metric_tags)| {
            let metric_value = MetricValue::Counter {
                value: metric_value,
            };
            Metric::new(metric_name, MetricKind::Incremental, metric_value).with_tags(metric_tags)
        })
    }

    proptest! {
        #[test]
        fn encoding_check_for_payload_limit_edge_cases(
            uncompressed_limit in 0..64_000_000usize,
            compressed_limit in 0..10_000_000usize,
            metric in arb_counter_metric(),
        ) {
            // We simply try to encode a single metric into an encoder and make sure that, if
            // finishing the payload doesn't result in an error, the payload is under the
            // configured limits.
            //
            // We check this with targeted unit tests as well, but this is some cheap insurance to
            // show that we're hopefully not missing any particular corner cases.
            let result = DatadogMetricsEncoder::with_payload_limits(
                DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
                None,
                uncompressed_limit,
                compressed_limit,
            );
            if let Ok(mut encoder) = result {
                _ = encoder.try_encode(metric);

                if let Ok((payload, _processed)) = encoder.finish() {
                    let payload = payload.into_payload();
                    prop_assert!(payload.len() <= compressed_limit);

                    let result = decompress_payload(payload);
                    prop_assert!(result.is_ok());

                    let decompressed = result.unwrap();
                    prop_assert!(decompressed.len() <= uncompressed_limit);
                }
            }
        }
    }
}