vector/sinks/datadog/metrics/encoder.rs

1use std::{
2    cmp,
3    io::{self, Write},
4    mem,
5    sync::{Arc, LazyLock, OnceLock},
6};
7
8use bytes::{BufMut, Bytes};
9use chrono::{DateTime, Utc};
10use snafu::{ResultExt, Snafu};
11use vector_lib::{
12    EstimatedJsonEncodedSizeOf,
13    config::{LogSchema, log_schema, telemetry},
14    event::{DatadogMetricOriginMetadata, Metric, MetricTags, MetricValue, metric::MetricSketch},
15    metrics::AgentDDSketch,
16    request_metadata::GroupedCountByteSize,
17};
18
19use super::config::{DatadogMetricsEndpoint, SeriesApiVersion};
20use crate::{
21    common::datadog::{
22        DatadogMetricType, DatadogPoint, DatadogSeriesMetric, DatadogSeriesMetricMetadata,
23    },
24    proto::fds::protobuf_descriptors,
25    sinks::util::{Compression, Compressor, encode_namespace, request_builder::EncodeResult},
26};
27
28const SERIES_PAYLOAD_HEADER: &[u8] = b"{\"series\":[";
29const SERIES_PAYLOAD_FOOTER: &[u8] = b"]}";
30const SERIES_PAYLOAD_DELIMITER: &[u8] = b",";
31
32pub(super) const ORIGIN_CATEGORY_VALUE: u32 = 11;
33
34const DEFAULT_DD_ORIGIN_PRODUCT_VALUE: u32 = 14;
35
36pub(super) static ORIGIN_PRODUCT_VALUE: LazyLock<u32> = LazyLock::new(|| {
37    option_env!("DD_ORIGIN_PRODUCT")
38        .map(|p| {
39            p.parse::<u32>()
40                .expect("Env var DD_ORIGIN_PRODUCT must be an unsigned 32 bit integer.")
41        })
42        .unwrap_or(DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
43});
44
45#[allow(warnings, clippy::pedantic, clippy::nursery)]
46mod ddmetric_proto {
47    include!(concat!(env!("OUT_DIR"), "/datadog.agentpayload.rs"));
48}
49
50#[derive(Debug, Snafu)]
51pub enum CreateError {
52    #[snafu(display("Invalid compressed/uncompressed payload size limits were given"))]
53    InvalidLimits,
54}
55
56impl CreateError {
57    /// Gets the telemetry-friendly string version of this error.
58    ///
59    /// The value will be a short string with only lowercase letters and underscores.
60    pub const fn as_error_type(&self) -> &'static str {
61        match self {
62            Self::InvalidLimits => "invalid_payload_limits",
63        }
64    }
65}
66
67#[derive(Debug, Snafu)]
68pub enum EncoderError {
69    #[snafu(display(
70        "Invalid metric value '{}' was given; '{}' expected",
71        metric_value,
72        expected
73    ))]
74    InvalidMetric {
75        expected: &'static str,
76        metric_value: &'static str,
77    },
78
79    #[snafu(
80        context(false),
81        display("Failed to encode series metric to JSON: {source}")
82    )]
83    JsonEncodingFailed { source: serde_json::Error },
84
85    // Currently, the only time `prost` ever emits `EncodeError` is when there is insufficient
86    // buffer capacity, so we don't need to hold on to the error, and we can just hardcode this.
87    #[snafu(display(
88        "Failed to encode sketch metric to Protocol Buffers: insufficient buffer capacity."
89    ))]
90    ProtoEncodingFailed,
91}
92
93impl EncoderError {
94    /// Gets the telemetry-friendly string version of this error.
95    ///
96    /// The value will be a short string with only lowercase letters and underscores.
97    pub const fn as_error_type(&self) -> &'static str {
98        match self {
99            Self::InvalidMetric { .. } => "invalid_metric",
100            Self::JsonEncodingFailed { .. } => "failed_to_encode_series",
101            Self::ProtoEncodingFailed => "failed_to_encode_sketch",
102        }
103    }
104}
105
106#[derive(Debug, Snafu)]
107pub enum FinishError {
108    #[snafu(display(
109        "Failure occurred during writing to or finalizing the compressor: {}",
110        source
111    ))]
112    CompressionFailed { source: io::Error },
113
114    #[snafu(display("Finished payload exceeded the (un)compressed size limits"))]
115    TooLarge {
116        metrics: Vec<Metric>,
117        recommended_splits: usize,
118    },
119}
120
121impl FinishError {
122    /// Gets the telemetry-friendly string version of this error.
123    ///
124    /// The value will be a short string with only lowercase letters and underscores.
125    pub const fn as_error_type(&self) -> &'static str {
126        match self {
127            Self::CompressionFailed { .. } => "compression_failed",
128            Self::TooLarge { .. } => "too_large",
129        }
130    }
131}
132
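/// Mutable state for a single in-progress payload: the zlib compressor, the number of
/// uncompressed bytes written so far, a scratch buffer used to encode one metric at a time, the
/// metrics successfully encoded into this payload, and their grouped byte-size telemetry.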
133struct EncoderState {
134    writer: Compressor,
135    written: usize,
136    buf: Vec<u8>,
137    processed: Vec<Metric>,
138    byte_size: GroupedCountByteSize,
139}
140
141impl Default for EncoderState {
142    fn default() -> Self {
143        Self {
144            writer: get_compressor(),
145            written: 0,
146            buf: Vec::with_capacity(1024),
147            processed: Vec::new(),
148            byte_size: telemetry().create_request_count_byte_size(),
149        }
150    }
151}
152
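/// Incremental encoder for Datadog metrics payloads (series as JSON or Protocol Buffers, sketches
/// as Protocol Buffers), compressing as it goes and enforcing both the uncompressed and
/// compressed payload size limits for the configured endpoint.
///
/// A minimal usage sketch (not compiled here; `metric` stands in for any supported `Metric`):
///
/// ```ignore
/// let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)?;
/// if let Some(unencoded) = encoder.try_encode(metric)? {
///     // The current payload is full: finalize it, send it, and retry `unencoded` with a fresh
///     // encoder.
///     let (payload, processed) = encoder.finish()?;
/// }
/// ```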
153pub struct DatadogMetricsEncoder {
154    endpoint: DatadogMetricsEndpoint,
155    default_namespace: Option<Arc<str>>,
156    uncompressed_limit: usize,
157    compressed_limit: usize,
158
159    state: EncoderState,
160    log_schema: &'static LogSchema,
161
162    origin_product_value: u32,
163}
164
165impl DatadogMetricsEncoder {
166    /// Creates a new `DatadogMetricsEncoder` for the given endpoint.
167    pub fn new(
168        endpoint: DatadogMetricsEndpoint,
169        default_namespace: Option<String>,
170    ) -> Result<Self, CreateError> {
171        let payload_limits = endpoint.payload_limits();
172        Self::with_payload_limits(
173            endpoint,
174            default_namespace,
175            payload_limits.uncompressed,
176            payload_limits.compressed,
177        )
178    }
179
180    /// Creates a new `DatadogMetricsEncoder` for the given endpoint, with specific payload limits.
181    pub fn with_payload_limits(
182        endpoint: DatadogMetricsEndpoint,
183        default_namespace: Option<String>,
184        uncompressed_limit: usize,
185        compressed_limit: usize,
186    ) -> Result<Self, CreateError> {
187        let (uncompressed_limit, compressed_limit) =
188            validate_payload_size_limits(endpoint, uncompressed_limit, compressed_limit)
189                .ok_or(CreateError::InvalidLimits)?;
190
191        Ok(Self {
192            endpoint,
193            default_namespace: default_namespace.map(Arc::from),
194            uncompressed_limit,
195            compressed_limit,
196            state: EncoderState::default(),
197            log_schema: log_schema(),
198            origin_product_value: *ORIGIN_PRODUCT_VALUE,
199        })
200    }
201}
202
203impl DatadogMetricsEncoder {
204    fn reset_state(&mut self) -> EncoderState {
205        mem::take(&mut self.state)
206    }
207
208    fn encode_single_metric(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
209        // We take special care in this method to distinguish errors that aren't the metric's fault,
210        // so that we can hand the metric back to the caller. The contract of the encoder is that when
211        // an encoding attempt fails for normal reasons, like running out of room, we give back the
212        // metric so the caller can finalize the previously encoded metrics, then reset and try encoding
213        // it again.
214        //
215        // If the encoder is in a persistently bad state, the caller will get a proper error when
216        // calling `finish`, so they still get an error eventually; we just make sure they can tidy up
217        // before that and avoid needlessly dropping metrics due to unrelated errors.
218
219        // Clear our temporary buffer before any encoding.
220        self.state.buf.clear();
221
222        self.state
223            .byte_size
224            .add_event(&metric, metric.estimated_json_encoded_size_of());
225
226        // For V2 Series metrics and Sketches: we encode a single Series or Sketch metric incrementally,
227        // which means that we specifically write it as if we were writing a single field entry in the
228        // overall `SketchPayload` message or `MetricPayload` type.
229        //
230        // By doing so, we can encode multiple metrics and concatenate all the buffers, and have the
231        // resulting buffer appear as if it's a normal `<>Payload` message with a bunch of repeats
232        // of the `sketches` / `series` field.
233        //
234        // Crucially, this code works because `SketchPayload` has two fields -- metadata and sketches --
235        // and we never actually set the metadata field... so the overall message generated for
236        // `SketchPayload` with a single sketch looks exactly as if we had literally written out a
237        // single value for the given field.
238        //
239        // Similarly, `MetricPayload` has a single repeated `series` field.
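        //
        // As a rough illustration of why the concatenation trick works (the field numbers here are
        // placeholders, not the ones looked up from the descriptors): for a message like
        // `message SketchPayload { Metadata metadata = 1; repeated Sketch sketches = 2; }`, every
        // repeated entry is encoded on the wire as its own field key plus length-delimited bytes,
        // and an unset `metadata` field contributes no bytes at all. Writing N such entries
        // back-to-back is therefore byte-for-byte identical to encoding one `SketchPayload`
        // containing N sketches.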
240
241        match self.endpoint {
242            // V1 Series metrics are encoded via JSON, in an incremental fashion.
243            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => {
244                // A single `Metric` might generate multiple Datadog series metrics.
245                let all_series = generate_series_metrics(
246                    &metric,
247                    &self.default_namespace,
248                    self.log_schema,
249                    self.origin_product_value,
250                )?;
251
252                // We handle adding the JSON array separator (comma) manually since the encoding is
253                // happening incrementally.
254                let has_processed = !self.state.processed.is_empty();
255                for (i, series) in all_series.iter().enumerate() {
256                    // Add an array delimiter if we already have other metrics encoded.
257                    if (has_processed || i > 0)
258                        && write_payload_delimiter(self.endpoint, &mut self.state.buf).is_err()
259                    {
260                        return Ok(Some(metric));
261                    }
262                    serde_json::to_writer(&mut self.state.buf, series)?;
263                }
264            }
265            // V2 Series metrics are encoded via ProtoBuf, in an incremental fashion.
266            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2) => match metric.value() {
267                MetricValue::Counter { .. }
268                | MetricValue::Gauge { .. }
269                | MetricValue::Set { .. }
270                | MetricValue::AggregatedSummary { .. } => {
271                    let series_proto = series_to_proto_message(
272                        &metric,
273                        &self.default_namespace,
274                        self.log_schema,
275                        self.origin_product_value,
276                    )?;
277
278                    encode_proto_key_and_message(
279                        series_proto,
280                        get_series_payload_series_field_number(),
281                        &mut self.state.buf,
282                    )?;
283                }
284                value => {
285                    return Err(EncoderError::InvalidMetric {
286                        expected: "series",
287                        metric_value: value.as_name(),
288                    });
289                }
290            },
291            // Sketches are encoded via ProtoBuf, also in an incremental fashion.
292            DatadogMetricsEndpoint::Sketches => match metric.value() {
293                MetricValue::Sketch { sketch } => match sketch {
294                    MetricSketch::AgentDDSketch(ddsketch) => {
295                        if let Some(sketch_proto) = sketch_to_proto_message(
296                            &metric,
297                            ddsketch,
298                            &self.default_namespace,
299                            self.log_schema,
300                            self.origin_product_value,
301                        ) {
302                            encode_proto_key_and_message(
303                                sketch_proto,
304                                get_sketch_payload_sketches_field_number(),
305                                &mut self.state.buf,
306                            )?;
307                        } else {
308                            // If the sketch was empty, that's fine too
309                        }
310                    }
311                },
312                value => {
313                    return Err(EncoderError::InvalidMetric {
314                        expected: "sketches",
315                        metric_value: value.as_name(),
316                    });
317                }
318            },
319        }
320
321        // Check whether our temporary buffer can be written to the compressor.
322        match self.try_compress_buffer() {
323            Err(_) | Ok(false) => Ok(Some(metric)),
324            Ok(true) => {
325                self.state.processed.push(metric);
326                Ok(None)
327            }
328        }
329    }
330
331    fn try_compress_buffer(&mut self) -> io::Result<bool> {
332        let n = self.state.buf.len();
333
334        // If we're over our uncompressed size limit with this metric, inform the caller.
335        if self.state.written + n > self.uncompressed_limit {
336            return Ok(false);
337        }
338
339        // Calculating the compressed size is slightly trickier, because we can only speculate
340        // about how many bytes it would take when compressed.  If we write into the compressor, we
341        // can't back out that write, even if we manually modify the underlying Vec<u8>, as the
342        // compressor might have internal state around checksums, etc., that can't be similarly
343        // rolled back.
344        //
345        // Our strategy is thus: simply take the encoded-but-decompressed size and see if it would
346        // fit within the compressed limit.  In `get_endpoint_payload_size_limits`, we calculate the
347        // expected maximum overhead of zlib based on all input data being incompressible, which
348        // zlib ensures will be the worst case as it can figure out whether a compressed or
349        // uncompressed block would take up more space _before_ choosing which strategy to go with.
350        //
351        // Thus, simply put, we've already accounted for the uncertainty by making our check here
352        // assume the worst case while our limits assume the worst case _overhead_.  Maybe our
353        // numbers are technically off in the end, but `finish` catches that for us, too.
354        let compressed_len = self.state.writer.get_ref().len();
355        let max_compressed_metric_len = n + max_compressed_overhead_len(n);
356        if compressed_len + max_compressed_metric_len > self.compressed_limit {
357            return Ok(false);
358        }
359
360        // We should be safe to write our holding buffer to the compressor and store the metric.
361        self.state.writer.write_all(&self.state.buf)?;
362        self.state.written += n;
363        Ok(true)
364    }
365
366    /// Attempts to encode a single metric into this encoder.
367    ///
368    /// The metric is encoded and then written to the compressor immediately: series metrics are
369    /// encoded incrementally as JSON (V1) or Protocol Buffers (V2), and sketches are encoded
370    /// incrementally as Protocol Buffers.
371    ///
372    /// If the metric could not be encoded into this encoder due to hitting the limits on the size
373    /// of the encoded/compressed payload, it will be returned via `Ok(Some(Metric))`, otherwise `Ok(None)`
374    /// will be returned.
375    ///
376    /// If `Ok(Some(Metric))` is returned, callers must call `finish` to finalize the payload.
377    /// Further calls to `try_encode` without first calling `finish` may or may not succeed.
378    ///
379    /// # Errors
380    ///
381    /// If an error is encountered while attempting to encode the metric, an error variant will be returned.
382    pub fn try_encode(&mut self, metric: Metric) -> Result<Option<Metric>, EncoderError> {
383        // Make sure we've written our header already.
384        if self.state.written == 0 {
385            match write_payload_header(self.endpoint, &mut self.state.writer) {
386                Ok(n) => self.state.written += n,
387                Err(_) => return Ok(Some(metric)),
388            }
389        }
390
391        self.encode_single_metric(metric)
392    }
393
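    /// Finalizes the payload: writes any endpoint-specific footer, finishes the compressor, and
    /// returns the compressed payload along with the metrics that were encoded into it.
    ///
    /// # Errors
    ///
    /// Returns `FinishError::CompressionFailed` if writing the footer or finalizing the compressor
    /// fails, and `FinishError::TooLarge` (with a recommended number of splits) if the finished
    /// payload exceeds the configured uncompressed or compressed size limits.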
394    pub fn finish(&mut self) -> Result<(EncodeResult<Bytes>, Vec<Metric>), FinishError> {
395        // Write any payload footer necessary for the configured endpoint.
396        let n = write_payload_footer(self.endpoint, &mut self.state.writer)
397            .context(CompressionFailedSnafu)?;
398        self.state.written += n;
399
400        let raw_bytes_written = self.state.written;
401        let byte_size = self.state.byte_size.clone();
402
403        // Consume the encoder state so we can do our final checks and return the necessary data.
404        let state = self.reset_state();
405        let payload = state
406            .writer
407            .finish()
408            .context(CompressionFailedSnafu)?
409            .freeze();
410        let processed = state.processed;
411
412        // We should have configured our limits such that if all calls to `try_compress_buffer` have
413        // succeeded up until this point, then our payload should be within the limits after writing
414        // the footer and finishing the compressor.
415        //
416        // We're not only double checking that here, but we're figuring out how much bigger than the
417        // limit the payload is, if it is indeed bigger, so that we can recommend how many splits
418        // should be used to bring the given set of metrics down to chunks that can be encoded
419        // without hitting the limits.
420        let compressed_splits = payload.len() / self.compressed_limit;
421        let uncompressed_splits = state.written / self.uncompressed_limit;
422        let recommended_splits = cmp::max(compressed_splits, uncompressed_splits) + 1;
423
424        if recommended_splits == 1 {
425            // "One" split means no splits needed: our payload didn't exceed either of the limits.
426            Ok((
427                EncodeResult::compressed(payload, raw_bytes_written, byte_size),
428                processed,
429            ))
430        } else {
431            Err(FinishError::TooLarge {
432                metrics: processed,
433                recommended_splits,
434            })
435        }
436    }
437}
438
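/// Wraps the computed origin metadata (see `generate_origin_metadata`) into the Protocol Buffers
/// `Metadata`/`Origin` messages shared by the series V2 and sketch payloads, warning if any of
/// the product/category/service fields ended up unset.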
439fn generate_proto_metadata(
440    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
441    maybe_source_type: Option<&str>,
442    origin_product_value: u32,
443) -> Option<ddmetric_proto::Metadata> {
444    generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
445        |origin| {
446            if origin.product().is_none()
447                || origin.category().is_none()
448                || origin.service().is_none()
449            {
450                warn!(
451                    message = "Generated origin metadata should have each field set.",
452                    product = origin.product(),
453                    category = origin.category(),
454                    service = origin.service()
455                );
456            }
457            ddmetric_proto::Metadata {
458                origin: Some(ddmetric_proto::Origin {
459                    origin_product: origin.product().unwrap_or_default(),
460                    origin_category: origin.category().unwrap_or_default(),
461                    origin_service: origin.service().unwrap_or_default(),
462                }),
463            }
464        },
465    )
466}
467
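// The `sketches` and `series` field numbers below are looked up from the compiled protobuf
// descriptors at runtime (and memoized in a `OnceLock`) rather than hardcoded, so the incremental
// key-and-message encoding stays in sync with the generated message definitions.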
468fn get_sketch_payload_sketches_field_number() -> u32 {
469    static SKETCH_PAYLOAD_SKETCHES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
470    *SKETCH_PAYLOAD_SKETCHES_FIELD_NUM.get_or_init(|| {
471        let descriptors = protobuf_descriptors();
472        let descriptor = descriptors
473            .get_message_by_name("datadog.agentpayload.SketchPayload")
474            .expect("should not fail to find `SketchPayload` message in descriptor pool");
475
476        descriptor
477            .get_field_by_name("sketches")
478            .map(|field| field.number())
479            .expect("`sketches` field must exist in `SketchPayload` message")
480    })
481}
482
483fn get_series_payload_series_field_number() -> u32 {
484    static SERIES_PAYLOAD_SERIES_FIELD_NUM: OnceLock<u32> = OnceLock::new();
485    *SERIES_PAYLOAD_SERIES_FIELD_NUM.get_or_init(|| {
486        let descriptors = protobuf_descriptors();
487        let descriptor = descriptors
488            .get_message_by_name("datadog.agentpayload.MetricPayload")
489            .expect("should not fail to find `MetricPayload` message in descriptor pool");
490
491        descriptor
492            .get_field_by_name("series")
493            .map(|field| field.number())
494            .expect("`series` field must exist in `MetricPayload` message")
495    })
496}
497
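/// Converts a Vector sketch metric into a `SketchPayload::Sketch` Protocol Buffers message,
/// returning `None` if the sketch is empty. The host tag (per the log schema's host key) is
/// lifted out of the tag set and into the dedicated `host` field.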
498fn sketch_to_proto_message(
499    metric: &Metric,
500    ddsketch: &AgentDDSketch,
501    default_namespace: &Option<Arc<str>>,
502    log_schema: &'static LogSchema,
503    origin_product_value: u32,
504) -> Option<ddmetric_proto::sketch_payload::Sketch> {
505    if ddsketch.is_empty() {
506        return None;
507    }
508
509    let name = get_namespaced_name(metric, default_namespace);
510    let ts = encode_timestamp(metric.timestamp());
511    let mut tags = metric.tags().cloned().unwrap_or_default();
512    let host = log_schema
513        .host_key()
514        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
515        .unwrap_or_default();
516    let tags = encode_tags(&tags);
517
518    let cnt = ddsketch.count() as i64;
519    let min = ddsketch
520        .min()
521        .expect("min should be present for non-empty sketch");
522    let max = ddsketch
523        .max()
524        .expect("max should be present for non-empty sketch");
525    let avg = ddsketch
526        .avg()
527        .expect("avg should be present for non-empty sketch");
528    let sum = ddsketch
529        .sum()
530        .expect("sum should be present for non-empty sketch");
531
532    let (bins, counts) = ddsketch.bin_map().into_parts();
533    let k = bins.into_iter().map(Into::into).collect();
534    let n = counts.into_iter().map(Into::into).collect();
535
536    let event_metadata = metric.metadata();
537    let metadata = generate_proto_metadata(
538        event_metadata.datadog_origin_metadata(),
539        event_metadata.source_type(),
540        origin_product_value,
541    );
542
543    trace!(?metadata, "Generated sketch metadata.");
544
545    Some(ddmetric_proto::sketch_payload::Sketch {
546        metric: name,
547        tags,
548        host,
549        distributions: Vec::new(),
550        dogsketches: vec![ddmetric_proto::sketch_payload::sketch::Dogsketch {
551            ts,
552            cnt,
553            min,
554            max,
555            avg,
556            sum,
557            k,
558            n,
559        }],
560        metadata,
561    })
562}
563
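/// Converts a Vector metric into a `MetricPayload::MetricSeries` Protocol Buffers message for the
/// V2 series endpoint: the host and device tags become resources, the `source_type_name` tag
/// becomes a dedicated field, counters with an interval are converted to rates, and sets become
/// gauges of their cardinality.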
564fn series_to_proto_message(
565    metric: &Metric,
566    default_namespace: &Option<Arc<str>>,
567    log_schema: &'static LogSchema,
568    origin_product_value: u32,
569) -> Result<ddmetric_proto::metric_payload::MetricSeries, EncoderError> {
570    let metric_name = get_namespaced_name(metric, default_namespace);
571    let mut tags = metric.tags().cloned().unwrap_or_default();
572
573    let mut resources = vec![];
574
575    if let Some(host) = log_schema
576        .host_key()
577        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default())
578    {
579        resources.push(ddmetric_proto::metric_payload::Resource {
580            r#type: "host".to_string(),
581            name: host,
582        });
583    }
584
585    // In the `datadog_agent` source, the tag is added as `device` for the V1 endpoint
586    // and `resource.device` for the V2 endpoint.
587    if let Some(device) = tags.remove("device").or(tags.remove("resource.device")) {
588        resources.push(ddmetric_proto::metric_payload::Resource {
589            r#type: "device".to_string(),
590            name: device,
591        });
592    }
593
594    let source_type_name = tags.remove("source_type_name").unwrap_or_default();
595
596    let tags = encode_tags(&tags);
597
598    let event_metadata = metric.metadata();
599    let metadata = generate_proto_metadata(
600        event_metadata.datadog_origin_metadata(),
601        event_metadata.source_type(),
602        origin_product_value,
603    );
604    trace!(?metadata, "Generated MetricSeries metadata.");
605
606    let timestamp = encode_timestamp(metric.timestamp());
607
608    // our internal representation is in milliseconds but the expected output is in seconds
609    let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
610
611    let (points, metric_type) = match metric.value() {
612        MetricValue::Counter { value } => {
613            if let Some(interval) = maybe_interval {
614                // When an interval is defined, it implies the value should be in a per-second form,
615                // so we need to get back to seconds from our milliseconds-based interval, and then
616                // divide our value by that amount as well.
617                let value = *value / (interval as f64);
618                (
619                    vec![ddmetric_proto::metric_payload::MetricPoint { value, timestamp }],
620                    ddmetric_proto::metric_payload::MetricType::Rate,
621                )
622            } else {
623                (
624                    vec![ddmetric_proto::metric_payload::MetricPoint {
625                        value: *value,
626                        timestamp,
627                    }],
628                    ddmetric_proto::metric_payload::MetricType::Count,
629                )
630            }
631        }
632        MetricValue::Set { values } => (
633            vec![ddmetric_proto::metric_payload::MetricPoint {
634                value: values.len() as f64,
635                timestamp,
636            }],
637            ddmetric_proto::metric_payload::MetricType::Gauge,
638        ),
639        MetricValue::Gauge { value } => (
640            vec![ddmetric_proto::metric_payload::MetricPoint {
641                value: *value,
642                timestamp,
643            }],
644            ddmetric_proto::metric_payload::MetricType::Gauge,
645        ),
646        // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
647        value => {
648            // this case should have already been surfaced by encode_single_metric() so this should never be reached
649            return Err(EncoderError::InvalidMetric {
650                expected: "series",
651                metric_value: value.as_name(),
652            });
653        }
654    };
655
656    Ok(ddmetric_proto::metric_payload::MetricSeries {
657        resources,
658        metric: metric_name,
659        tags,
660        points,
661        r#type: metric_type.into(),
662        // unit is omitted
663        unit: "".to_string(),
664        source_type_name,
665        interval: maybe_interval.unwrap_or(0) as i64,
666        metadata,
667    })
668}
669
670// Manually write the field tag and then encode the Message payload directly as a length-delimited message.
671fn encode_proto_key_and_message<T, B>(msg: T, tag: u32, buf: &mut B) -> Result<(), EncoderError>
672where
673    T: prost::Message,
674    B: BufMut,
675{
676    prost::encoding::encode_key(tag, prost::encoding::WireType::LengthDelimited, buf);
677
678    msg.encode_length_delimited(buf)
679        .map_err(|_| EncoderError::ProtoEncodingFailed)
680}
681
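/// Builds the fully namespaced metric name, preferring the metric's own namespace and falling
/// back to the encoder's default namespace, joined with a `.` separator.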
682fn get_namespaced_name(metric: &Metric, default_namespace: &Option<Arc<str>>) -> String {
683    encode_namespace(
684        metric
685            .namespace()
686            .or_else(|| default_namespace.as_ref().map(|s| s.as_ref())),
687        '.',
688        metric.name(),
689    )
690}
691
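/// Flattens the metric tags into sorted `key:value` strings, emitting bare tags as just the key.
/// For example, a tag set with `env => "prod"` and a bare `debug` tag encodes as
/// `["debug", "env:prod"]`.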
692fn encode_tags(tags: &MetricTags) -> Vec<String> {
693    let mut pairs: Vec<_> = tags
694        .iter_all()
695        .map(|(name, value)| match value {
696            Some(value) => format!("{name}:{value}"),
697            None => name.into(),
698        })
699        .collect();
700    pairs.sort();
701    pairs
702}
703
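/// Returns the metric timestamp as Unix seconds, defaulting to the current time when the metric
/// has no timestamp.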
704fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
705    if let Some(ts) = timestamp {
706        ts.timestamp()
707    } else {
708        Utc::now().timestamp()
709    }
710}
711
712// Given the Vector source type, return the OriginService value associated with that integration, if any.
713fn source_type_to_service(source_type: &str) -> Option<u32> {
714    match source_type {
715        // In order to preserve consistent behavior, we intentionally don't set origin metadata
716        // for the case where the Datadog Agent did not set it.
717        "datadog_agent" => None,
718
719        // These are the sources for which metrics truly originated from this Vector instance.
720        "apache_metrics" => Some(17),
721        "aws_ecs_metrics" => Some(209),
722        "eventstoredb_metrics" => Some(210),
723        "host_metrics" => Some(211),
724        "internal_metrics" => Some(212),
725        "mongodb_metrics" => Some(111),
726        "nginx_metrics" => Some(117),
727        "open_telemetry" => Some(213),
728        "postgresql_metrics" => Some(128),
729        "prometheus_remote_write" => Some(214),
730        "prometheus_scrape" => Some(215),
731        "statsd" => Some(153),
732
733        // These sources are only capable of receiving metrics with the `native` or `native_json` codec.
734        // Generally that means the Origin Metadata will have been set as a pass through.
735        // However, if the upstream Vector instance did not set Origin Metadata (for example if it is an
736        // older version), we will at least set the OriginProduct and OriginCategory.
737        "kafka" | "nats" | "redis" | "gcp_pubsub" | "http_client" | "http_server" | "vector"
738        | "pulsar" => Some(0),
739
740        // This scenario should not occur; if it does, it means we added a source that deals with
741        // metrics and did not update this function.
742        // But if it does occur, by setting the Service value to be undefined, we at least populate the
743        // OriginProduct and OriginCategory.
744        _ => {
745            debug!(
746                "Source {source_type} OriginService value is undefined! This source needs to be properly mapped to a Service value."
747            );
748            Some(0)
749        }
750    }
751}
752
753/// Determine the correct Origin metadata values to use depending on whether they have already
754/// been set upstream. The generalized struct `DatadogMetricOriginMetadata` is used here so that
755/// both the series and sketch encoders can call this function and map the result appropriately
756/// for the protocol they operate on.
757fn generate_origin_metadata(
758    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
759    maybe_source_type: Option<&str>,
760    origin_product_value: u32,
761) -> Option<DatadogMetricOriginMetadata> {
762    let no_value = 0;
763
764    // An upstream Vector source or a transform has set the origin metadata already.
765    // Currently this is only possible in these scenarios:
766    //     - `datadog_agent` source receiving the metadata on ingested metrics
767    //     - `vector` source receiving events with EventMetadata that already has the origins set
768    //     - A metrics source configured with the `native` or `native_json` codecs, where the upstream
769    //       Vector instance enriched the EventMetadata with Origin metadata.
770    //     - `log_to_metric` transform setting the OriginService in the EventMetadata when it creates
771    //       the new metric.
772    if let Some(pass_through) = maybe_pass_through {
773        Some(DatadogMetricOriginMetadata::new(
774            pass_through.product().or(Some(origin_product_value)),
775            pass_through.category().or(Some(ORIGIN_CATEGORY_VALUE)),
776            pass_through.service().or(Some(no_value)),
777        ))
778
779    // No metadata has been set upstream
780    } else {
781        maybe_source_type.and_then(|source_type| {
782            // Only set the metadata if the source is a metric source we should set it for.
783            // In order to preserve consistent behavior, we intentionally don't set origin metadata
784            // for the case where the Datadog Agent did not set it.
785            source_type_to_service(source_type).map(|origin_service_value| {
786                DatadogMetricOriginMetadata::new(
787                    Some(origin_product_value),
788                    Some(ORIGIN_CATEGORY_VALUE),
789                    Some(origin_service_value),
790                )
791            })
792        })
793    }
794}
795
796fn generate_series_metadata(
797    maybe_pass_through: Option<&DatadogMetricOriginMetadata>,
798    maybe_source_type: Option<&str>,
799    origin_product_value: u32,
800) -> Option<DatadogSeriesMetricMetadata> {
801    generate_origin_metadata(maybe_pass_through, maybe_source_type, origin_product_value).map(
802        |origin| DatadogSeriesMetricMetadata {
803            origin: Some(origin),
804        },
805    )
806}
807
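/// Generates the Datadog series metric(s) for the V1 JSON endpoint, applying the same
/// host/device/`source_type_name` tag handling and counter-with-interval-to-rate conversion as
/// the V2 Protocol Buffers path.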
808fn generate_series_metrics(
809    metric: &Metric,
810    default_namespace: &Option<Arc<str>>,
811    log_schema: &'static LogSchema,
812    origin_product_value: u32,
813) -> Result<Vec<DatadogSeriesMetric>, EncoderError> {
814    let name = get_namespaced_name(metric, default_namespace);
815
816    let mut tags = metric.tags().cloned().unwrap_or_default();
817    let host = log_schema
818        .host_key()
819        .map(|key| tags.remove(key.to_string().as_str()).unwrap_or_default());
820
821    let source_type_name = tags.remove("source_type_name");
822    let device = tags.remove("device");
823    let ts = encode_timestamp(metric.timestamp());
824    let tags = Some(encode_tags(&tags));
825
826    // our internal representation is in milliseconds but the expected output is in seconds
827    let maybe_interval = metric.interval_ms().map(|i| i.get() / 1000);
828
829    let event_metadata = metric.metadata();
830    let metadata = generate_series_metadata(
831        event_metadata.datadog_origin_metadata(),
832        event_metadata.source_type(),
833        origin_product_value,
834    );
835
836    trace!(?metadata, "Generated series metadata.");
837
838    let (points, metric_type) = match metric.value() {
839        MetricValue::Counter { value } => {
840            if let Some(interval) = maybe_interval {
841                // When an interval is defined, it implies the value should be in a per-second form,
842                // so we need to get back to seconds from our milliseconds-based interval, and then
843                // divide our value by that amount as well.
844                let value = *value / (interval as f64);
845                (vec![DatadogPoint(ts, value)], DatadogMetricType::Rate)
846            } else {
847                (vec![DatadogPoint(ts, *value)], DatadogMetricType::Count)
848            }
849        }
850        MetricValue::Set { values } => (
851            vec![DatadogPoint(ts, values.len() as f64)],
852            DatadogMetricType::Gauge,
853        ),
854        MetricValue::Gauge { value } => (vec![DatadogPoint(ts, *value)], DatadogMetricType::Gauge),
855        // NOTE: AggregatedSummary will have been previously split into counters and gauges during normalization
856        value => {
857            return Err(EncoderError::InvalidMetric {
858                expected: "series",
859                metric_value: value.as_name(),
860            });
861        }
862    };
863
864    Ok(vec![DatadogSeriesMetric {
865        metric: name,
866        r#type: metric_type,
867        interval: maybe_interval,
868        points,
869        tags,
870        host,
871        source_type_name,
872        device,
873        metadata,
874    }])
875}
876
877fn get_compressor() -> Compressor {
878    // We use the "zlib default" compressor because it's all Datadog supports, and adding it
879    // generically to `Compression` would make things a little weird because the conversion trait
880    // implementations also only handle none vs. gzip.
881    Compression::zlib_default().into()
882}
883
884const fn max_uncompressed_header_len() -> usize {
885    SERIES_PAYLOAD_HEADER.len() + SERIES_PAYLOAD_FOOTER.len()
886}
887
888// Datadog ingest APIs accept zlib, which is what we're accounting for here. By default, zlib
889// has a 2 byte header and 4 byte CRC trailer. [1]
890//
891// [1] https://www.zlib.net/zlib_tech.html
892const ZLIB_HEADER_TRAILER: usize = 6;
893
894const fn max_compression_overhead_len(compressed_limit: usize) -> usize {
895    // We calculate the overhead as the zlib header/trailer plus the worst-case overhead of
896    // compressing `compressed_limit` bytes, assuming that none of the data we write can be
897    // compressed at all.
898    ZLIB_HEADER_TRAILER + max_compressed_overhead_len(compressed_limit)
899}
900
901const fn max_compressed_overhead_len(len: usize) -> usize {
902    // Datadog ingest APIs accept zlib, which is what we're accounting for here.
903    //
904    // Deflate, the underlying compression algorithm, has a technique to ensure that input data
905    // can't be encoded in such a way that it's expanded by a meaningful amount. This technique
906    // allows storing blocks of uncompressed data with only 5 bytes of overhead per block.
907    // Technically, the blocks can be up to 65KB in Deflate, but modern zlib implementations use
908    // block sizes of 16KB. [1][2]
909    //
910    // We calculate the overhead of compressing a given `len` bytes as the worst case of that many
911    // bytes being written to the compressor and being unable to be compressed at all.
912    //
913    // [1] https://www.zlib.net/zlib_tech.html
914    // [2] https://www.bolet.org/~pornin/deflate-flush-fr.html
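    //
    // As a worked example (the input size here is illustrative, not a configured limit): for a
    // `len` of roughly 3 MB of incompressible input, the overhead charged would be
    // (1 + (3_000_000 - 6) / 16_384) * 5 = (1 + 183) * 5 = 920 bytes.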
915    const STORED_BLOCK_SIZE: usize = 16384;
916    (1 + len.saturating_sub(ZLIB_HEADER_TRAILER) / STORED_BLOCK_SIZE) * 5
917}
918
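/// Sanity-checks the configured payload size limits against the fixed overheads (the series JSON
/// header/footer and the worst-case zlib expansion), returning `None` when a limit is too small
/// to ever hold a payload.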
919const fn validate_payload_size_limits(
920    endpoint: DatadogMetricsEndpoint,
921    uncompressed_limit: usize,
922    compressed_limit: usize,
923) -> Option<(usize, usize)> {
924    if endpoint.is_series() {
925        // For series, we need to make sure the uncompressed limit can account for the header/footer
926        // we would add that wraps the encoded metrics up in the expected JSON object. This does
927        // imply that a limit of one byte above the header/footer length would be accepted, and
928        // obviously we can't encode a series metric in a single byte, but this is just a simple
929        // sanity check, not an exhaustive search for the absolute bare minimum size.
930        let header_len = max_uncompressed_header_len();
931        if uncompressed_limit <= header_len {
932            return None;
933        }
934    }
935
936    // Get the maximum possible overhead of the compression container, based on the incoming
937    // _uncompressed_ limit. We use the uncompressed limit because we're calculating the maximum
938    // overhead in the case that, theoretically, none of the input data was compressible.  This is
939    // essentially impossible in practice, but serves as a proper worst-case-scenario check.
940    let max_compression_overhead = max_compression_overhead_len(uncompressed_limit);
941    if compressed_limit <= max_compression_overhead {
942        return None;
943    }
944
945    Some((uncompressed_limit, compressed_limit))
946}
947
948fn write_payload_header(
949    endpoint: DatadogMetricsEndpoint,
950    writer: &mut dyn io::Write,
951) -> io::Result<usize> {
952    match endpoint {
953        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
954            .write_all(SERIES_PAYLOAD_HEADER)
955            .map(|_| SERIES_PAYLOAD_HEADER.len()),
956        _ => Ok(0),
957    }
958}
959
960fn write_payload_delimiter(
961    endpoint: DatadogMetricsEndpoint,
962    writer: &mut dyn io::Write,
963) -> io::Result<usize> {
964    match endpoint {
965        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
966            .write_all(SERIES_PAYLOAD_DELIMITER)
967            .map(|_| SERIES_PAYLOAD_DELIMITER.len()),
968        _ => Ok(0),
969    }
970}
971
972fn write_payload_footer(
973    endpoint: DatadogMetricsEndpoint,
974    writer: &mut dyn io::Write,
975) -> io::Result<usize> {
976    match endpoint {
977        DatadogMetricsEndpoint::Series(SeriesApiVersion::V1) => writer
978            .write_all(SERIES_PAYLOAD_FOOTER)
979            .map(|_| SERIES_PAYLOAD_FOOTER.len()),
980        _ => Ok(0),
981    }
982}
983
984#[cfg(test)]
985mod tests {
986    use std::{
987        io::{self, copy},
988        num::NonZeroU32,
989        sync::Arc,
990    };
991
992    use bytes::{BufMut, Bytes, BytesMut};
993    use chrono::{DateTime, TimeZone, Timelike, Utc};
994    use flate2::read::ZlibDecoder;
995    use proptest::{
996        arbitrary::any, collection::btree_map, num::f64::POSITIVE as ARB_POSITIVE_F64, prop_assert,
997        proptest, strategy::Strategy, string::string_regex,
998    };
999    use prost::Message;
1000    use vector_lib::{
1001        config::{LogSchema, log_schema},
1002        event::{
1003            DatadogMetricOriginMetadata, EventMetadata, Metric, MetricKind, MetricTags,
1004            MetricValue,
1005            metric::{MetricSketch, TagValue},
1006        },
1007        metric_tags,
1008        metrics::AgentDDSketch,
1009    };
1010
1011    use super::{
1012        DatadogMetricsEncoder, EncoderError, ddmetric_proto, encode_proto_key_and_message,
1013        encode_tags, encode_timestamp, generate_series_metrics, get_compressor,
1014        get_sketch_payload_sketches_field_number, max_compression_overhead_len,
1015        max_uncompressed_header_len, series_to_proto_message, sketch_to_proto_message,
1016        validate_payload_size_limits, write_payload_footer, write_payload_header,
1017    };
1018    use crate::{
1019        common::datadog::DatadogMetricType,
1020        sinks::datadog::metrics::{
1021            config::{DatadogMetricsEndpoint, SeriesApiVersion},
1022            encoder::{DEFAULT_DD_ORIGIN_PRODUCT_VALUE, ORIGIN_PRODUCT_VALUE},
1023        },
1024    };
1025
1026    fn get_simple_counter() -> Metric {
1027        let value = MetricValue::Counter { value: 3.14 };
1028        Metric::new("basic_counter", MetricKind::Incremental, value).with_timestamp(Some(ts()))
1029    }
1030
1031    fn get_simple_counter_with_metadata(metadata: EventMetadata) -> Metric {
1032        let value = MetricValue::Counter { value: 3.14 };
1033        Metric::new_with_metadata("basic_counter", MetricKind::Incremental, value, metadata)
1034            .with_timestamp(Some(ts()))
1035    }
1036
1037    fn get_simple_rate_counter(value: f64, interval_ms: u32) -> Metric {
1038        let value = MetricValue::Counter { value };
1039        Metric::new("basic_counter", MetricKind::Incremental, value)
1040            .with_timestamp(Some(ts()))
1041            .with_interval_ms(NonZeroU32::new(interval_ms))
1042    }
1043
1044    fn get_simple_sketch() -> Metric {
1045        let mut ddsketch = AgentDDSketch::with_agent_defaults();
1046        ddsketch.insert(3.14);
1047        Metric::new("basic_counter", MetricKind::Incremental, ddsketch.into())
1048            .with_timestamp(Some(ts()))
1049    }
1050
1051    fn get_compressed_empty_series_payload() -> Bytes {
1052        let mut compressor = get_compressor();
1053
1054        _ = write_payload_header(
1055            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1056            &mut compressor,
1057        )
1058        .expect("should not fail");
1059        _ = write_payload_footer(
1060            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1061            &mut compressor,
1062        )
1063        .expect("should not fail");
1064
1065        compressor.finish().expect("should not fail").freeze()
1066    }
1067
1068    fn get_compressed_empty_sketches_payload() -> Bytes {
1069        get_compressor().finish().expect("should not fail").freeze()
1070    }
1071
1072    fn decompress_payload(payload: Bytes) -> io::Result<Bytes> {
1073        let mut decompressor = ZlibDecoder::new(&payload[..]);
1074        let mut decompressed = BytesMut::new().writer();
1075        let result = copy(&mut decompressor, &mut decompressed);
1076        result.map(|_| decompressed.into_inner().freeze())
1077    }
1078
1079    fn ts() -> DateTime<Utc> {
1080        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
1081            .single()
1082            .and_then(|t| t.with_nanosecond(11))
1083            .expect("invalid timestamp")
1084    }
1085
1086    fn tags() -> MetricTags {
1087        metric_tags! {
1088            "normal_tag" => "value",
1089            "true_tag" => "true",
1090            "empty_tag" => TagValue::Bare,
1091            "multi_value" => "one",
1092            "multi_value" => "two",
1093        }
1094    }
1095
1096    fn encode_sketches_normal<B>(
1097        metrics: &[Metric],
1098        default_namespace: &Option<Arc<str>>,
1099        log_schema: &'static LogSchema,
1100        buf: &mut B,
1101    ) where
1102        B: BufMut,
1103    {
1104        let mut sketches = Vec::new();
1105        for metric in metrics {
1106            let MetricValue::Sketch { sketch } = metric.value() else {
1107                panic!("must be sketch")
1108            };
1109            match sketch {
1110                MetricSketch::AgentDDSketch(ddsketch) => {
1111                    if let Some(sketch) =
1112                        sketch_to_proto_message(metric, ddsketch, default_namespace, log_schema, 14)
1113                    {
1114                        sketches.push(sketch);
1115                    }
1116                }
1117            }
1118        }
1119
1120        let sketch_payload = ddmetric_proto::SketchPayload {
1121            metadata: None,
1122            sketches,
1123        };
1124
1125        // Now try encoding this sketch payload, and then try to compress it.
1126        sketch_payload.encode(buf).unwrap()
1127    }
1128
1129    #[test]
1130    fn test_encode_tags() {
1131        assert_eq!(
1132            encode_tags(&tags()),
1133            vec![
1134                "empty_tag",
1135                "multi_value:one",
1136                "multi_value:two",
1137                "normal_tag:value",
1138                "true_tag:true",
1139            ]
1140        );
1141    }
1142
1143    #[test]
1144    fn test_encode_timestamp() {
1145        assert_eq!(encode_timestamp(None), Utc::now().timestamp());
1146        assert_eq!(encode_timestamp(Some(ts())), 1542182950);
1147    }
1148
1149    #[test]
1150    fn incorrect_metric_for_endpoint_causes_error() {
1151        // Series metrics can't go to the sketches endpoint.
1152        let mut sketch_encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1153            .expect("default payload size limits should be valid");
1154        let series_result = sketch_encoder.try_encode(get_simple_counter());
1155        assert!(matches!(
1156            series_result.err(),
1157            Some(EncoderError::InvalidMetric { .. })
1158        ));
1159
1160        // And sketches can't go to the series endpoint.
1161        let mut series_v1_encoder =
1162            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
1163                .expect("default payload size limits should be valid");
1164        let sketch_result = series_v1_encoder.try_encode(get_simple_sketch());
1165        assert!(matches!(
1166            sketch_result.err(),
1167            Some(EncoderError::InvalidMetric { .. })
1168        ));
1169
1170        let mut series_v2_encoder =
1171            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
1172                .expect("default payload size limits should be valid");
1173        let sketch_result = series_v2_encoder.try_encode(get_simple_sketch());
1174        assert!(matches!(
1175            sketch_result.err(),
1176            Some(EncoderError::InvalidMetric { .. })
1177        ));
1178    }
1179
1180    #[test]
1181    fn encode_counter_with_interval_as_rate() {
1182        // When a counter explicitly has an interval, we need to encode it as a rate. This means
1183        // dividing the value by the interval (in seconds) and setting the metric type so that when
1184        // it lands on the DD side, they can multiply the value by the interval (in seconds) and get
1185        // back the correct total value for that time period.
1186
1187        let value = 423.1331;
1188        let interval_ms = 10000;
1189        let rate_counter = get_simple_rate_counter(value, interval_ms);
1190        let expected_value = value / (interval_ms / 1000) as f64;
1191        let expected_interval = interval_ms / 1000;
1192
1193        // series v1
1194        {
1195            // Encode the metric and make sure we did the rate conversion correctly.
1196            let result = generate_series_metrics(
1197                &rate_counter,
1198                &None,
1199                log_schema(),
1200                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1201            );
1202            assert!(result.is_ok());
1203
1204            let metrics = result.unwrap();
1205            assert_eq!(metrics.len(), 1);
1206
1207            let actual = &metrics[0];
1208            assert_eq!(actual.r#type, DatadogMetricType::Rate);
1209            assert_eq!(actual.interval, Some(expected_interval));
1210            assert_eq!(actual.points.len(), 1);
1211            assert_eq!(actual.points[0].1, expected_value);
1212        }
1213
1214        // series v2
1215        {
1216            let series_proto = series_to_proto_message(
1217                &rate_counter,
1218                &None,
1219                log_schema(),
1220                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1221            )
1222            .unwrap();
1223            assert_eq!(series_proto.r#type, 2);
1224            assert_eq!(series_proto.interval, expected_interval as i64);
1225            assert_eq!(series_proto.points.len(), 1);
1226            assert_eq!(series_proto.points[0].value, expected_value);
1227        }
1228    }
1229
1230    #[test]
1231    fn encode_non_rate_metric_with_interval() {
1232        // It is possible that the Agent sends Gauges with an interval set. This
1233        // occurs when the origin of the metric is DogStatsD, where the interval
1234        // is set to 10.
1235
1236        let value = 423.1331;
1237        let interval_ms = 10000;
1238
1239        let gauge = Metric::new(
1240            "basic_gauge",
1241            MetricKind::Incremental,
1242            MetricValue::Gauge { value },
1243        )
1244        .with_timestamp(Some(ts()))
1245        .with_interval_ms(NonZeroU32::new(interval_ms));
1246
1247        let expected_value = value; // For a gauge, the value should not be modified by the interval
1248        let expected_interval = interval_ms / 1000;
1249
1250        // series v1
1251        {
1252            // Encode the metric and make sure we did the rate conversion correctly.
1253            let result = generate_series_metrics(
1254                &gauge,
1255                &None,
1256                log_schema(),
1257                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1258            );
1259            assert!(result.is_ok());
1260
1261            let metrics = result.unwrap();
1262            assert_eq!(metrics.len(), 1);
1263
1264            let actual = &metrics[0];
1265            assert_eq!(actual.r#type, DatadogMetricType::Gauge);
1266            assert_eq!(actual.interval, Some(expected_interval));
1267            assert_eq!(actual.points.len(), 1);
1268            assert_eq!(actual.points[0].1, expected_value);
1269        }
1270
1271        // series v2
1272        {
1273            let series_proto = series_to_proto_message(
1274                &gauge,
1275                &None,
1276                log_schema(),
1277                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1278            )
1279            .unwrap();
1280            assert_eq!(series_proto.r#type, 3);
1281            assert_eq!(series_proto.interval, expected_interval as i64);
1282            assert_eq!(series_proto.points.len(), 1);
1283            assert_eq!(series_proto.points[0].value, expected_value);
1284        }
1285    }
1286
1287    #[test]
1288    fn encode_origin_metadata_pass_through() {
1289        let product = 10;
1290        let category = 11;
1291        let service = 9;
1292
1293        let event_metadata = EventMetadata::default().with_origin_metadata(
1294            DatadogMetricOriginMetadata::new(Some(product), Some(category), Some(service)),
1295        );
1296        let counter = get_simple_counter_with_metadata(event_metadata);
1297
1298        // series v1
1299        {
1300            let result = generate_series_metrics(
1301                &counter,
1302                &None,
1303                log_schema(),
1304                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1305            );
1306            assert!(result.is_ok());
1307
1308            let metrics = result.unwrap();
1309            assert_eq!(metrics.len(), 1);
1310
1311            let actual = &metrics[0];
1312            let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1313
1314            assert_eq!(generated_origin.product().unwrap(), product);
1315            assert_eq!(generated_origin.category().unwrap(), category);
1316            assert_eq!(generated_origin.service().unwrap(), service);
1317        }
1318        // series v2
1319        {
1320            let series_proto = series_to_proto_message(
1321                &counter,
1322                &None,
1323                log_schema(),
1324                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1325            )
1326            .unwrap();
1327
1328            let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1329            assert_eq!(generated_origin.origin_product, product);
1330            assert_eq!(generated_origin.origin_category, category);
1331            assert_eq!(generated_origin.origin_service, service);
1332        }
1333    }
1334
1335    #[test]
1336    fn encode_origin_metadata_vector_sourced() {
1337        let product = *ORIGIN_PRODUCT_VALUE;
1338
1339        let category = 11;
1340        let service = 153;
1341
1342        let mut counter = get_simple_counter();
1343
1344        counter.metadata_mut().set_source_type("statsd");
1345
1346        // series v1
1347        {
1348            let result = generate_series_metrics(&counter, &None, log_schema(), product);
1349            assert!(result.is_ok());
1350
1351            let metrics = result.unwrap();
1352            assert_eq!(metrics.len(), 1);
1353
1354            let actual = &metrics[0];
1355            let generated_origin = actual.metadata.as_ref().unwrap().origin.as_ref().unwrap();
1356
1357            assert_eq!(generated_origin.product().unwrap(), product);
1358            assert_eq!(generated_origin.category().unwrap(), category);
1359            assert_eq!(generated_origin.service().unwrap(), service);
1360        }
1361        // series v2
1362        {
1363            let series_proto = series_to_proto_message(
1364                &counter,
1365                &None,
1366                log_schema(),
1367                DEFAULT_DD_ORIGIN_PRODUCT_VALUE,
1368            )
1369            .unwrap();
1370
1371            let generated_origin = series_proto.metadata.unwrap().origin.unwrap();
1372            assert_eq!(generated_origin.origin_product, product);
1373            assert_eq!(generated_origin.origin_category, category);
1374            assert_eq!(generated_origin.origin_service, service);
1375        }
1376    }
1377
1378    #[test]
1379    fn encode_single_series_v1_metric_with_default_limits() {
1380        // This is a simple test where we ensure that a single metric, with the default limits, can
1381        // be encoded without hitting any errors.
1382        let mut encoder =
1383            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V1), None)
1384                .expect("default payload size limits should be valid");
1385        let counter = get_simple_counter();
1386        let expected = counter.clone();
1387
1388        // Encode the counter.
1389        let result = encoder.try_encode(counter);
1390        assert!(result.is_ok());
1391        assert_eq!(result.unwrap(), None);
1392
1393        // Finish the payload, make sure we got what we came for.
1394        let result = encoder.finish();
1395        assert!(result.is_ok());
1396
1397        let (_payload, mut processed) = result.unwrap();
1398        assert_eq!(processed.len(), 1);
1399        assert_eq!(expected, processed.pop().unwrap());
1400    }
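
    // Across these tests, `try_encode` returning `Ok(None)` means the metric was accepted
    // into the current payload, while `Ok(Some(metric))` (exercised by the limit tests
    // further down) means the metric did not fit and is handed back to the caller untouched.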
1401
1402    #[test]
1403    fn encode_single_series_v2_metric_with_default_limits() {
1404        // This is a simple test where we ensure that a single metric, with the default limits, can
1405        // be encoded without hitting any errors.
1406        let mut encoder =
1407            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
1408                .expect("default payload size limits should be valid");
1409        let counter = get_simple_counter();
1410        let expected = counter.clone();
1411
1412        // Encode the counter.
1413        let result = encoder.try_encode(counter);
1414        assert!(result.is_ok());
1415        assert_eq!(result.unwrap(), None);
1416
1417        // Finish the payload, make sure we got what we came for.
1418        let result = encoder.finish();
1419        assert!(result.is_ok());
1420
1421        let (_payload, mut processed) = result.unwrap();
1422        assert_eq!(processed.len(), 1);
1423        assert_eq!(expected, processed.pop().unwrap());
1424    }
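
    // A small extra check, sketched along the same lines as the tests above and reusing the
    // same helpers and default limits: several metrics should fit into a single payload, and
    // `finish` should report each of them as processed.
    #[test]
    fn encode_multiple_series_v2_metrics_with_default_limits() {
        let mut encoder =
            DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Series(SeriesApiVersion::V2), None)
                .expect("default payload size limits should be valid");

        // Encode a handful of counters; none of them should be handed back.
        for _ in 0..3 {
            let result = encoder.try_encode(get_simple_counter());
            assert!(result.is_ok());
            assert_eq!(result.unwrap(), None);
        }

        // Finish the payload and make sure every counter was accounted for.
        let result = encoder.finish();
        assert!(result.is_ok());

        let (_payload, processed) = result.unwrap();
        assert_eq!(processed.len(), 3);
    }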
1425
1426    #[test]
1427    fn encode_single_sketch_metric_with_default_limits() {
1428        // This is a simple test where we ensure that a single metric, with the default limits, can
1429        // be encoded without hitting any errors.
1430        let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1431            .expect("default payload size limits should be valid");
1432        let sketch = get_simple_sketch();
1433        let expected = sketch.clone();
1434
1435        // Encode the sketch.
1436        let result = encoder.try_encode(sketch);
1437        assert!(result.is_ok());
1438        assert_eq!(result.unwrap(), None);
1439
1440        // Finish the payload, make sure we got what we came for.
1441        let result = encoder.finish();
1442        assert!(result.is_ok());
1443
1444        let (_payload, mut processed) = result.unwrap();
1445        assert_eq!(processed.len(), 1);
1446        assert_eq!(expected, processed.pop().unwrap());
1447    }
1448
1449    #[test]
1450    fn encode_empty_sketch() {
1451        // This is a simple test where we ensure that an empty sketch, with the default limits, can
1452        // be encoded without hitting any errors.
1453        let mut encoder = DatadogMetricsEncoder::new(DatadogMetricsEndpoint::Sketches, None)
1454            .expect("default payload size limits should be valid");
1455        let sketch = Metric::new(
1456            "empty",
1457            MetricKind::Incremental,
1458            AgentDDSketch::with_agent_defaults().into(),
1459        )
1460        .with_timestamp(Some(ts()));
1461        let expected = sketch.clone();
1462
1463        // Encode the sketch.
1464        let result = encoder.try_encode(sketch);
1465        assert!(result.is_ok());
1466        assert_eq!(result.unwrap(), None);
1467
1468        // Finish the payload, make sure we got what we came for.
1469        let result = encoder.finish();
1470        assert!(result.is_ok());
1471
1472        let (_payload, mut processed) = result.unwrap();
1473        assert_eq!(processed.len(), 1);
1474        assert_eq!(expected, processed.pop().unwrap());
1475    }
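
    // The `AgentDDSketch` above contains no samples, so this test mostly guards the encoder
    // against erroring (or dropping the metric from the processed list) on the empty-sketch
    // edge case.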
1476
1477    #[test]
1478    fn encode_multiple_sketch_metrics_normal_vs_incremental() {
1479        // This tests our incremental sketch encoding against the more straightforward approach of
1480        // just building/encoding a full `SketchPayload` message.
1481        let metrics = vec![
1482            get_simple_sketch(),
1483            get_simple_sketch(),
1484            get_simple_sketch(),
1485        ];
1486
1487        let mut normal_buf = Vec::new();
1488        encode_sketches_normal(&metrics, &None, log_schema(), &mut normal_buf);
1489
1490        let mut incremental_buf = Vec::new();
1491        for metric in &metrics {
1492            match metric.value() {
1493                MetricValue::Sketch { sketch } => match sketch {
1494                    MetricSketch::AgentDDSketch(ddsketch) => {
1495                        if let Some(sketch_proto) =
1496                            sketch_to_proto_message(metric, ddsketch, &None, log_schema(), DEFAULT_DD_ORIGIN_PRODUCT_VALUE)
1497                        {
1498                            encode_proto_key_and_message(
1499                                sketch_proto,
1500                                get_sketch_payload_sketches_field_number(),
1501                                &mut incremental_buf,
1502                            )
1503                            .unwrap();
1504                        }
1505                    }
1506                },
1507                _ => panic!("should be a sketch"),
1508            }
1509        }
1510
1511        assert_eq!(normal_buf, incremental_buf);
1512    }
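
    // Why the two buffers match: a protobuf `repeated` message field is encoded as a
    // sequence of (field key, length-delimited message) pairs, so appending each sketch
    // under the `sketches` field number is byte-for-byte identical to encoding one big
    // `SketchPayload` in a single pass. A rough sketch of what the per-sketch call above
    // amounts to, assuming `prost::encoding::{encode_key, WireType}`:
    //
    //     encode_key(
    //         get_sketch_payload_sketches_field_number(),
    //         WireType::LengthDelimited,
    //         &mut incremental_buf,
    //     );
    //     sketch_proto.encode_length_delimited(&mut incremental_buf).unwrap();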
1513
1514    #[test]
1515    fn payload_size_limits_series() {
1516        // Get the maximum length of the header/trailer data.
1517        let header_len = max_uncompressed_header_len();
1518
1519        // This is too small.
1520        let result = validate_payload_size_limits(
1521            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1522            header_len,
1523            usize::MAX,
1524        );
1525        assert_eq!(result, None);
1526
1527        // This is just right.
1528        let result = validate_payload_size_limits(
1529            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1530            header_len + 1,
1531            usize::MAX,
1532        );
1533        assert_eq!(result, Some((header_len + 1, usize::MAX)));
1534
1535        // Get the maximum compressed overhead length, based on our input uncompressed size.  This
1536        // represents the worst case overhead based on the input data (of length usize::MAX, in this
1537        // case) being entirely incompressible.
1538        let compression_overhead_len = max_compression_overhead_len(usize::MAX);
1539
1540        // This is too small.
1541        let result = validate_payload_size_limits(
1542            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1543            usize::MAX,
1544            compression_overhead_len,
1545        );
1546        assert_eq!(result, None);
1547
1548        // This is just right.
1549        let result = validate_payload_size_limits(
1550            DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1551            usize::MAX,
1552            compression_overhead_len + 1,
1553        );
1554        assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
1555    }
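
    // Concretely, `max_uncompressed_header_len()` is currently 13 bytes (the 11-byte JSON
    // array header plus the 2-byte footer written around every series payload), so an
    // uncompressed limit of 13 leaves no room for a single metric and is rejected, while 14
    // is the smallest limit that passes validation.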
1556
1557    #[test]
1558    fn payload_size_limits_sketches() {
1559        // There's no lower bound on uncompressed size for the sketches payload.
1560        let result = validate_payload_size_limits(DatadogMetricsEndpoint::Sketches, 0, usize::MAX);
1561        assert_eq!(result, Some((0, usize::MAX)));
1562
1563        // Get the maximum compressed overhead length, based on our input uncompressed size.  This
1564        // represents the worst case overhead based on the input data (of length usize::MAX, in this
1565        // case) being entirely incompressible.
1566        let compression_overhead_len = max_compression_overhead_len(usize::MAX);
1567
1568        // This is too small.
1569        let result = validate_payload_size_limits(
1570            DatadogMetricsEndpoint::Sketches,
1571            usize::MAX,
1572            compression_overhead_len,
1573        );
1574        assert_eq!(result, None);
1575
1576        // This is just right.
1577        let result = validate_payload_size_limits(
1578            DatadogMetricsEndpoint::Sketches,
1579            usize::MAX,
1580            compression_overhead_len + 1,
1581        );
1582        assert_eq!(result, Some((usize::MAX, compression_overhead_len + 1)));
1583    }
1584
1585    #[test]
1586    fn encode_series_breaks_out_when_limit_reached_uncompressed() {
1587        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
1588        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
1589        // uncompressed payload would exceed the limit.
1590        let header_len = max_uncompressed_header_len();
1591        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1592            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1593            None,
1594            header_len + 1,
1595            usize::MAX,
1596        )
1597        .expect("payload size limits should be valid");
1598
1599        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
1600        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1601        // as it could not be added.
1602        let counter = get_simple_counter();
1603        let result = encoder.try_encode(counter.clone());
1604        assert!(result.is_ok());
1605        assert_eq!(result.unwrap(), Some(counter));
1606
1607        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1608        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1609        // processed metrics should be returned.
1610        let result = encoder.finish();
1611        assert!(result.is_ok());
1612
1613        let (payload, processed) = result.unwrap();
1614        assert_eq!(
1615            payload.uncompressed_byte_size,
1616            max_uncompressed_header_len()
1617        );
1618        assert_eq!(
1619            payload.into_payload(),
1620            get_compressed_empty_series_payload()
1621        );
1622        assert_eq!(processed.len(), 0);
1623    }
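
    // `get_compressed_empty_series_payload()` is presumably just the compressed form of the
    // bare header/footer wrapper, which is also why `uncompressed_byte_size` comes out equal
    // to `max_uncompressed_header_len()` even though nothing was encoded.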
1624
1625    #[test]
1626    fn encode_sketches_breaks_out_when_limit_reached_uncompressed() {
1627        // We manually create the encoder with an arbitrarily low "uncompressed" limit but high
1628        // "compressed" limit to exercise the codepath that should avoid encoding a metric when the
1629        // uncompressed payload would exceed the limit.
1630        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1631            DatadogMetricsEndpoint::Sketches,
1632            None,
1633            1,
1634            usize::MAX,
1635        )
1636        .expect("payload size limits should be valid");
1637
1638        // Trying to encode a metric that would cause us to exceed our uncompressed limits will
1639        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1640        // as it could not be added.
1641        let sketch = get_simple_sketch();
1642        let result = encoder.try_encode(sketch.clone());
1643        assert!(result.is_ok());
1644        assert_eq!(result.unwrap(), Some(sketch));
1645
1646        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1647        // this payload, but it will be empty and no processed metrics should be returned.
1648        let result = encoder.finish();
1649        assert!(result.is_ok());
1650
1651        let (payload, processed) = result.unwrap();
1652        assert_eq!(payload.uncompressed_byte_size, 0);
1653        assert_eq!(
1654            payload.into_payload(),
1655            get_compressed_empty_sketches_payload()
1656        );
1657        assert_eq!(processed.len(), 0);
1658    }
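
    // Taken together, these limit tests describe the intended calling pattern: when
    // `try_encode` hands a metric back, the current payload is full and should be finished
    // before retrying. A rough sketch of that loop (`ship` is a placeholder, not a function
    // in this module):
    //
    //     let mut encoder = DatadogMetricsEncoder::new(endpoint, None)?;
    //     for metric in metrics {
    //         if let Some(unfit) = encoder.try_encode(metric)? {
    //             let (payload, _processed) = encoder.finish()?;
    //             ship(payload);
    //             encoder = DatadogMetricsEncoder::new(endpoint, None)?;
    //             // Re-offer the metric that didn't fit to the fresh encoder.
    //             let _ = encoder.try_encode(unfit)?;
    //         }
    //     }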
1659
1660    #[test]
1661    fn encode_series_breaks_out_when_limit_reached_compressed() {
1662        // We manually create the encoder with an arbitrarily low "compressed" limit but high
1663        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
1664        // compressed payload would exceed the limit.
1665        let uncompressed_limit = 128;
1666        let compressed_limit = 32;
1667        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1668            DatadogMetricsEndpoint::Series(SeriesApiVersion::V1),
1669            None,
1670            uncompressed_limit,
1671            compressed_limit,
1672        )
1673        .expect("payload size limits should be valid");
1674
1675        // Trying to encode a metric that would cause us to exceed our compressed limits will
1676        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1677        // as it could not be added.
1678        let counter = get_simple_counter();
1679        let result = encoder.try_encode(counter.clone());
1680        assert!(result.is_ok());
1681        assert_eq!(result.unwrap(), Some(counter));
1682
1683        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1684        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1685        // processed metrics should be returned.
1686        let result = encoder.finish();
1687        assert!(result.is_ok());
1688
1689        let (payload, processed) = result.unwrap();
1690        assert_eq!(
1691            payload.uncompressed_byte_size,
1692            max_uncompressed_header_len()
1693        );
1694        assert_eq!(
1695            payload.into_payload(),
1696            get_compressed_empty_series_payload()
1697        );
1698        assert_eq!(processed.len(), 0);
1699    }
1700
1701    #[test]
1702    fn encode_sketches_breaks_out_when_limit_reached_compressed() {
1703        // We manually create the encoder with an arbitrarily low "compressed" limit but high
1704        // "uncompressed" limit to exercise the codepath that should avoid encoding a metric when the
1705        // compressed payload would exceed the limit.
1706        let uncompressed_limit = 128;
1707        let compressed_limit = 16;
1708        let mut encoder = DatadogMetricsEncoder::with_payload_limits(
1709            DatadogMetricsEndpoint::Sketches,
1710            None,
1711            uncompressed_limit,
1712            compressed_limit,
1713        )
1714        .expect("payload size limits should be valid");
1715
1716        // Trying to encode a metric that would cause us to exceed our compressed limits will
1717        // _not_ return an error from `try_encode`, but instead will simply return back the metric
1718        // as it could not be added.
1719        let sketch = get_simple_sketch();
1720        let result = encoder.try_encode(sketch.clone());
1721        assert!(result.is_ok());
1722        assert_eq!(result.unwrap(), Some(sketch));
1723
1724        // And similarly, since we didn't actually encode a metric, we _should_ be able to finish
1725        // this payload, but it will be empty (effectively, the header/footer will exist) and no
1726        // processed metrics should be returned.
1727        let result = encoder.finish();
1728        assert!(result.is_ok());
1729
1730        let (payload, processed) = result.unwrap();
1731        assert_eq!(payload.uncompressed_byte_size, 0);
1732        assert_eq!(
1733            payload.into_payload(),
1734            get_compressed_empty_sketches_payload()
1735        );
1736        assert_eq!(processed.len(), 0);
1737    }
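
    // Unlike the series case, an empty sketches payload really is zero bytes before
    // compression: there is no header/footer framing, which lines up with the "no lower
    // bound" behavior checked in `payload_size_limits_sketches` above.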
1738
1739    fn arb_counter_metric() -> impl Strategy<Value = Metric> {
1740        let name = string_regex("[a-zA-Z][a-zA-Z0-9_]{8,96}").expect("regex should be valid");
1741        let value = ARB_POSITIVE_F64;
1742        let tags = btree_map(
1743            any::<u64>().prop_map(|v| v.to_string()),
1744            any::<u64>().prop_map(|v| v.to_string()),
1745            0..64,
1746        )
1747        .prop_map(|tags| (!tags.is_empty()).then(|| MetricTags::from(tags)));
1748
1749        (name, value, tags).prop_map(|(metric_name, metric_value, metric_tags)| {
1750            let metric_value = MetricValue::Counter {
1751                value: metric_value,
1752            };
1753            Metric::new(metric_name, MetricKind::Incremental, metric_value).with_tags(metric_tags)
1754        })
1755    }
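
    // The strategy above produces incremental counters with names of 9 to 97 alphanumeric or
    // underscore characters, values drawn from `ARB_POSITIVE_F64`, and up to 63 tags whose
    // keys and values are stringified `u64`s; an empty tag map is mapped to `None` so the
    // untagged case is exercised as well.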
1756
1757    proptest! {
1758        #[test]
1759        fn encoding_check_for_payload_limit_edge_cases(
1760            uncompressed_limit in 0..64_000_000usize,
1761            compressed_limit in 0..10_000_000usize,
1762            metric in arb_counter_metric(),
1763        ) {
1764            // We simply try to encode a single metric into an encoder and, if finishing the
1765            // payload doesn't result in an error, make sure the resulting payload is under the
1766            // configured limits.
1767            //
1768            // We check this with targeted unit tests as well but this is some cheap insurance to
1769            // show that we're hopefully not missing any particular corner cases.
1770            let result = DatadogMetricsEncoder::with_payload_limits(
1771                DatadogMetricsEndpoint::Series(SeriesApiVersion::V2),
1772                None,
1773                uncompressed_limit,
1774                compressed_limit,
1775            );
1776            if let Ok(mut encoder) = result {
1777                _ = encoder.try_encode(metric);
1778
1779                if let Ok((payload, _processed)) = encoder.finish() {
1780                    let payload = payload.into_payload();
1781                    prop_assert!(payload.len() <= compressed_limit);
1782
1783                    let result = decompress_payload(payload);
1784                    prop_assert!(result.is_ok());
1785
1786                    let decompressed = result.unwrap();
1787                    prop_assert!(decompressed.len() <= uncompressed_limit);
1788                }
1789            }
1790        }
1791    }
1792}