vector/sinks/datadog/logs/
sink.rs

1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use vector_lib::{
6    event::{ObjectMap, Value},
7    internal_event::{ComponentEventsDropped, UNINTENTIONAL},
8    lookup::event_path,
9};
10use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
11
12use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
13use crate::{
14    common::datadog::{DD_RESERVED_SEMANTIC_ATTRS, DDTAGS, MESSAGE, is_reserved_attribute},
15    sinks::{
16        prelude::*,
17        util::{Compressor, http::HttpJsonBatchSizer},
18    },
19};
20#[derive(Default)]
21pub struct EventPartitioner;
22
23impl Partitioner for EventPartitioner {
24    type Item = Event;
25    type Key = Option<Arc<str>>;
26
27    fn partition(&self, item: &Self::Item) -> Self::Key {
28        item.metadata().datadog_api_key()
29    }
30}
31
32#[derive(Debug)]
33pub struct LogSinkBuilder<S> {
34    transformer: Transformer,
35    service: S,
36    batch_settings: BatcherSettings,
37    compression: Option<Compression>,
38    default_api_key: Arc<str>,
39    protocol: String,
40    conforms_as_agent: bool,
41}
42
43impl<S> LogSinkBuilder<S> {
44    pub const fn new(
45        transformer: Transformer,
46        service: S,
47        default_api_key: Arc<str>,
48        batch_settings: BatcherSettings,
49        protocol: String,
50        conforms_as_agent: bool,
51    ) -> Self {
52        Self {
53            transformer,
54            service,
55            default_api_key,
56            batch_settings,
57            compression: None,
58            protocol,
59            conforms_as_agent,
60        }
61    }
62
63    pub const fn compression(mut self, compression: Compression) -> Self {
64        self.compression = Some(compression);
65        self
66    }
67
68    pub fn build(self) -> LogSink<S> {
69        LogSink {
70            default_api_key: self.default_api_key,
71            transformer: self.transformer,
72            service: self.service,
73            batch_settings: self.batch_settings,
74            compression: self.compression.unwrap_or_default(),
75            protocol: self.protocol,
76            conforms_as_agent: self.conforms_as_agent,
77        }
78    }
79}
80
/// Datadog logs sink: partitions incoming events by API key, batches them, builds
/// size-bounded JSON requests and hands those requests to `service`.
pub struct LogSink<S> {
    /// The default Datadog API key to use.
    ///
    /// Some `Event`s arrive on the stream with an associated API key, and they are
    /// batched by that key. Events without an associated key are batched together
    /// (under a `None` partition) and sent with this default key instead.
    default_api_key: Arc<str>,
    /// The API service that requests are driven through.
    service: S,
    /// Transformer applied to each event (after normalization) before it is encoded.
    transformer: Transformer,
    /// The compression technique to use when building the request body.
    compression: Compression,
    /// Batch settings: timeout, max events, max bytes, etc.
    batch_settings: BatcherSettings,
    /// The protocol name passed to the request driver.
    protocol: String,
    /// Normalize events to the Datadog Agent standard (`normalize_as_agent_event` runs
    /// after `normalize_event`) and attach the associated HTTP header to requests.
    /// NOTE(review): the header is not attached in this file; presumably done by the
    /// service/request layer — confirm.
    conforms_as_agent: bool,
}
102
103// The Datadog logs intake does not require the fields that are set in this
104// function. But if they are present in the event, we normalize the paths
105// (and value in the case of timestamp) to something that intake understands.
106pub fn normalize_event(event: &mut Event) {
107    let log = event.as_mut_log();
108
109    // Will cast the internal value to an object if it already isn't
110    if !log.value().is_object() {
111        log.insert(MESSAGE, log.value().clone());
112    }
113
114    // Upstream Sources may have semantically defined Datadog reserved attributes outside of their
115    // expected location by DD logs intake (root of the event). Move them if needed.
116    for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
117        // check if there is a semantic meaning for the reserved attribute
118        if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
119            // move it to the desired location
120            position_reserved_attr_event_root(log, &current_path, expected_field_name, meaning);
121        }
122    }
123
124    // if the tags value is an array we need to reconstruct it to a comma delimited string for DD logs intake.
125    // NOTE: we don't access by semantic meaning here because in the prior step
126    // we ensured reserved attributes are in expected locations.
127    let ddtags_path = event_path!(DDTAGS);
128    if let Some(Value::Array(tags_arr)) = log.get(ddtags_path)
129        && !tags_arr.is_empty()
130    {
131        let all_tags: String = tags_arr
132            .iter()
133            .filter_map(|tag_kv| {
134                tag_kv
135                    .as_bytes()
136                    .map(|bytes| String::from_utf8_lossy(bytes))
137            })
138            .join(",");
139
140        log.insert(ddtags_path, all_tags);
141    }
142
143    // ensure the timestamp is in expected format
144    // NOTE: we don't access by semantic meaning here because in the prior step
145    // we ensured reserved attributes are in expected locations.
146    let ts_path = event_path!("timestamp");
147    if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
148        log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
149    }
150}
151
152// Optionally for all other non-reserved fields, nest these under the `message` key. This is the
153// final step in having the event conform to the standard that the logs intake expects when an
154// event originates from an agent. Normalizing the events to the format prepared by the datadog
155// agent resolves any inconsistencies that would be observed when data flows through vector
156// before being ingested by the logs intake. This is because the logs intake interprets the
157// request with slight differences when this header and format are observed.
158pub fn normalize_as_agent_event(event: &mut Event) {
159    let log = event.as_mut_log();
160    // Should never occur since normalize_event forces a conversion of the log value to an Object type
161    let Some(object_map) = log.as_map_mut() else {
162        return;
163    };
164    // Move all non reserved fields into a new object
165    let mut local_root = ObjectMap::default();
166    let keys_to_move = object_map
167        .keys()
168        .filter(|ks| !is_reserved_attribute(ks.as_str()))
169        .cloned()
170        .collect::<Vec<_>>();
171    for key in keys_to_move {
172        if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
173            local_root.insert(entry_k, entry_v);
174        }
175    }
176    // .. nest this object at the root under the reserved key named 'message'
177    log.insert(MESSAGE, local_root);
178}
179
180// If an expected reserved attribute is not located in the event root, rename it and handle
181// any potential conflicts by preserving the conflicting one with a _RESERVED_ prefix.
182pub fn position_reserved_attr_event_root(
183    log: &mut LogEvent,
184    current_path: &OwnedTargetPath,
185    expected_field_name: &str,
186    meaning: &str,
187) {
188    // the path that DD archives expects this reserved attribute to be in.
189    let desired_path = event_path!(expected_field_name);
190
191    // if not already be at the expected location
192    if !path_is_field(current_path, expected_field_name) {
193        // if an existing attribute exists here already, move it so to not overwrite it.
194        // yes, technically the rename path could exist, but technically that could always be the case.
195        if log.contains(desired_path) {
196            let rename_attr = format!("_RESERVED_{meaning}");
197            let rename_path = event_path!(rename_attr.as_str());
198            warn!(
199                message = "Semantic meaning is defined, but the event path already exists. Renaming to not overwrite.",
200                meaning = meaning,
201                renamed = &rename_attr,
202            );
203            log.rename_key(desired_path, rename_path);
204        }
205
206        log.rename_key(current_path, desired_path);
207    }
208}
209
210// Test if the named path consists of the single named field. This is rather a hack and should
211// hypothetically be solvable in the `vrl` crate with an implementation of
212// `PartialEq<BorrowedTargetPath<'_>>`. The alternative is doing a comparison against another
213// `OwnedTargetPath`, but the naïve implementation of that requires multiple allocations and copies
214// just to test equality.
215pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
216    path.prefix == PathPrefix::Event
217        && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
218}
219
220#[derive(Debug, Snafu)]
221pub enum RequestBuildError {
222    #[snafu(display("Encoded payload is greater than the max limit."))]
223    PayloadTooBig { events_that_fit: usize },
224    #[snafu(display("Failed to build payload with error: {}", error))]
225    Io { error: std::io::Error },
226    #[snafu(display("Failed to serialize payload with error: {}", error))]
227    Json { error: serde_json::Error },
228}
229
230impl From<io::Error> for RequestBuildError {
231    fn from(error: io::Error) -> RequestBuildError {
232        RequestBuildError::Io { error }
233    }
234}
235
236impl From<serde_json::Error> for RequestBuildError {
237    fn from(error: serde_json::Error) -> RequestBuildError {
238        RequestBuildError::Json { error }
239    }
240}
241
/// State shared (behind an `Arc`) by all concurrent request-building tasks of a sink.
struct LogRequestBuilder {
    /// API key used for partitions whose events carry no key of their own.
    pub default_api_key: Arc<str>,
    /// Transformer applied to every event after normalization.
    pub transformer: Transformer,
    /// Compression applied to each request body.
    pub compression: Compression,
    /// When set, `normalize_as_agent_event` runs after `normalize_event`.
    pub conforms_as_agent: bool,
}
248
249impl LogRequestBuilder {
250    pub fn build_request(
251        &self,
252        events: Vec<Event>,
253        api_key: Arc<str>,
254    ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
255        // Transform events and pre-compute their estimated size.
256        let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
257            .into_iter()
258            .map(|mut event| {
259                normalize_event(&mut event);
260                if self.conforms_as_agent {
261                    normalize_as_agent_event(&mut event);
262                }
263                self.transformer.transform(&mut event);
264                let estimated_json_size = event.estimated_json_encoded_size_of();
265                (event, estimated_json_size)
266            })
267            .collect();
268
269        // Construct requests respecting the max payload size.
270        let mut requests: Vec<LogApiRequest> = Vec::new();
271        while !events_with_estimated_size.is_empty() {
272            let (events_serialized, body, byte_size) =
273                serialize_with_capacity(&mut events_with_estimated_size)?;
274            if events_serialized.is_empty() {
275                // first event was too large for whole request
276                let _too_big = events_with_estimated_size.pop_front();
277                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
278                    count: 1,
279                    reason: "Event too large to encode."
280                });
281            } else {
282                let request =
283                    self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
284                requests.push(request);
285            }
286        }
287
288        Ok(requests)
289    }
290
291    fn finish_request(
292        &self,
293        buf: Vec<u8>,
294        mut events: Vec<Event>,
295        byte_size: GroupedCountByteSize,
296        api_key: Arc<str>,
297    ) -> Result<LogApiRequest, RequestBuildError> {
298        let n_events = events.len();
299        let uncompressed_size = buf.len();
300
301        // Now just compress it like normal.
302        let mut compressor = Compressor::from(self.compression);
303        write_all(&mut compressor, n_events, &buf)?;
304        let bytes = compressor.into_inner().freeze();
305
306        let finalizers = events.take_finalizers();
307        let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
308
309        let payload = if self.compression.is_compressed() {
310            EncodeResult::compressed(bytes, uncompressed_size, byte_size)
311        } else {
312            EncodeResult::uncompressed(bytes, byte_size)
313        };
314
315        Ok::<_, RequestBuildError>(LogApiRequest {
316            api_key,
317            finalizers,
318            compression: self.compression,
319            metadata: request_metadata_builder.build(&payload),
320            uncompressed_size: payload.uncompressed_byte_size,
321            body: payload.into_payload(),
322        })
323    }
324}
325
326/// Serialize events into a buffer as a JSON array that has a maximum size of
327/// `MAX_PAYLOAD_BYTES`.
328///
329/// Returns the serialized events, the buffer, and the byte size of the events.
330/// Events that are not serialized remain in the `events` parameter.
331pub fn serialize_with_capacity(
332    events: &mut VecDeque<(Event, JsonSize)>,
333) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
334    // Compute estimated size, accounting for the size of the brackets and commas.
335    let total_estimated =
336        events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
337
338    // Initialize state.
339    let mut buf = Vec::with_capacity(total_estimated);
340    let mut byte_size = telemetry().create_request_count_byte_size();
341    let mut events_serialized = Vec::with_capacity(events.len());
342
343    // Write entries until the buffer is full.
344    buf.push(b'[');
345    let mut first = true;
346    while let Some((event, estimated_json_size)) = events.pop_front() {
347        // Track the existing length of the buffer so we can truncate it if we need to.
348        let existing_len = buf.len();
349        if first {
350            first = false;
351        } else {
352            buf.push(b',');
353        }
354        serde_json::to_writer(&mut buf, event.as_log())?;
355        // If the buffer is too big, truncate it and break out of the loop.
356        if buf.len() >= MAX_PAYLOAD_BYTES {
357            events.push_front((event, estimated_json_size));
358            buf.truncate(existing_len);
359            break;
360        }
361        // Otherwise, track the size of the event and continue.
362        byte_size.add_event(&event, estimated_json_size);
363        events_serialized.push(event);
364    }
365    buf.push(b']');
366
367    Ok((events_serialized, buf, byte_size))
368}
369
370impl<S> LogSink<S>
371where
372    S: Service<LogApiRequest> + Send + 'static,
373    S::Future: Send + 'static,
374    S::Response: DriverResponse + Send + 'static,
375    S::Error: Debug + Into<crate::Error> + Send,
376{
377    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
378        let default_api_key = Arc::clone(&self.default_api_key);
379
380        let partitioner = EventPartitioner;
381        let batch_settings = self.batch_settings;
382        let builder = Arc::new(LogRequestBuilder {
383            default_api_key,
384            transformer: self.transformer,
385            compression: self.compression,
386            conforms_as_agent: self.conforms_as_agent,
387        });
388
389        let input = input.batched_partitioned(partitioner, || {
390            batch_settings.as_item_size_config(HttpJsonBatchSizer)
391        });
392        input
393            .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
394                let builder = Arc::clone(&builder);
395
396                Box::pin(async move {
397                    let (api_key, events) = input;
398                    let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
399
400                    builder.build_request(events, api_key)
401                })
402            })
403            .filter_map(|request| async move {
404                match request {
405                    Err(error) => {
406                        emit!(SinkRequestBuildError { error });
407                        None
408                    }
409                    Ok(reqs) => Some(futures::stream::iter(reqs)),
410                }
411            })
412            .flatten()
413            .into_driver(self.service)
414            .protocol(self.protocol)
415            .run()
416            .await
417    }
418}
419
420#[async_trait]
421impl<S> StreamSink<Event> for LogSink<S>
422where
423    S: Service<LogApiRequest> + Send + 'static,
424    S::Future: Send + 'static,
425    S::Response: DriverResponse + Send + 'static,
426    S::Error: Debug + Into<crate::Error> + Send,
427{
428    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
429        self.run_inner(input).await
430    }
431}
432
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{Definition, meaning},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{Kind, kind::Collection},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

    /// Asserts the invariants every fully normalized log in these tests must satisfy:
    /// an integer `timestamp`, presence of the core reserved root attributes, and
    /// `ddtags` collapsed into a comma-delimited string.
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        assert!(
            log.get(event_path!("timestamp"))
                .expect("should have timestamp")
                .is_integer()
        );

        // NOTE(review): the fixtures insert `severity`, yet `status` is expected here;
        // presumably DD_RESERVED_SEMANTIC_ATTRS maps the SEVERITY meaning to `status`.
        for attr in [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ] {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        assert_eq!(
            log.get(event_path!("ddtags")).expect("should have tags"),
            &Value::Bytes("key1:value1,key2:value2".into())
        );
    }

    /// Builds event metadata whose schema definition mirrors a `datadog_agent` source:
    /// each reserved field gets a legacy root-level key and a semantic meaning, which is
    /// what `normalize_event` uses (via `find_key_by_meaning`) to locate the attributes.
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        EventMetadata::default().with_schema_definition(&Arc::new(
            definition
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                    &owned_value_path!("ddtags"),
                    Kind::bytes(),
                    Some(meaning::TAGS),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                    &owned_value_path!("hostname"),
                    Kind::bytes(),
                    Some(meaning::HOST),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                    &owned_value_path!("timestamp"),
                    Kind::timestamp(),
                    Some(meaning::TIMESTAMP),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                    &owned_value_path!("severity"),
                    Kind::bytes(),
                    Some(meaning::SEVERITY),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                    &owned_value_path!("service"),
                    Kind::bytes(),
                    Some(meaning::SERVICE),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                    &owned_value_path!("source"),
                    Kind::bytes(),
                    Some(meaning::SOURCE),
                ),
        ))
    }

    /// An object event with none of the reserved attributes passes through without any
    /// of them being synthesized.
    #[test]
    fn normalize_event_doesnt_require() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        let log = event.as_log();

        assert!(!log.contains(event_path!("message")));
        assert!(!log.contains(event_path!("timestamp")));
        assert!(!log.contains(event_path!("hostname")));
    }

    /// Legacy namespace: the reserved attributes are inserted at the root, so
    /// normalization mostly has to fix up values (tags array, timestamp).
    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        log.insert(event_path!("message"), "the_message");
        let namespace = log.namespace();

        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    /// Vector namespace with a raw (non-object) root value: after both normalization
    /// steps the raw value ends up nested as `message.message`, and only reserved
    /// attributes remain at the root.
    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
        assert_eq!(
            event.as_log().get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

    /// Builds a Vector-namespaced event whose reserved attributes live under the
    /// `datadog_agent` metadata rather than the event root.
    ///
    /// Statement order matters: the arbitrary metadata insert at the top is what makes
    /// the subsequent `log.namespace()` report `LogNamespace::Vector`.
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut log = log_generator(definition);

        // insert an arbitrary metadata field such that the log becomes Vector namespaced
        log.insert(metadata_path!("vector", "foo"), "bar");

        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];
        log.insert(metadata_path!("datadog_agent", "ddtags"), tags);

        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Vector);
        Event::Log(log)
    }

    /// Vector namespace with an object root: reserved attributes are pulled out of
    /// metadata to the root and everything else is nested under `message`.
    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
            log.insert(event_path!("message"), "the_message");
            log
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
    }

    /// Builds a Legacy-namespaced agent event with all reserved attributes at the root
    /// and a `message` that is a JSON-encoded *string* (not an object).
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        // insert mandatory fields
        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("timestamp"), Utc::now());
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        let sample_message = value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        });
        log.insert(event_path!("message"), sample_message.to_string());
        log
    }

    /// Asserts that every root key is either the expected field name of a reserved
    /// attribute (the second element of each `DD_RESERVED_SEMANTIC_ATTRS` pair) or
    /// `message`.
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let objmap = log.as_map().unwrap();
        let reserved_fields = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .chain([("message", "message")])
            .collect::<Vec<(&str, &str)>>();
        for key in objmap.keys() {
            assert!(reserved_fields.iter().any(|(_, msg)| *msg == key.as_str()));
        }
    }

    /// Root fields that share names with keys *inside* the JSON `message` string must
    /// not clash: the string is nested as-is and the root fields sit next to it.
    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut log = prepare_agent_event();

        // insert random fields at root which will collide with sample data at 'message'
        log.insert(event_path!("field_a"), "replaced_field_a_value");
        log.insert(event_path!("field_c"), "replaced_field_c_value");
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);
        // `source_type` presumably comes from insert_standard_vector_source_metadata.
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "field_a": "replaced_field_a_value",
                "field_c": "replaced_field_c_value",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
            }))
        );
    }

    /// Arbitrary (including nested) root fields are moved under `message`, leaving only
    /// reserved attributes at the root.
    #[test]
    fn normalize_conforming_agent() {
        let mut log = prepare_agent_event();

        // insert random fields at root
        log.insert(event_path!("field_1"), "value_1");
        log.insert(event_path!("field_2"), "value_2");
        log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        // normalize and validate...
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        // that all fields placed at the root no longer exist there
        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);

        // .. and that they were nested properly underneath message
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
                "field_1": "value_1",
                "field_2": "value_2",
                "field_3": {
                    "field_3_nested": "value_3"
                }
            }))
        );
    }
}