vector/sinks/datadog/logs/
sink.rs

1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use vector_lib::{
6    event::ObjectMap,
7    event::Value,
8    internal_event::{ComponentEventsDropped, UNINTENTIONAL},
9    lookup::event_path,
10};
11use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
12
13use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
14use crate::{
15    common::datadog::{is_reserved_attribute, DDTAGS, DD_RESERVED_SEMANTIC_ATTRS, MESSAGE},
16    sinks::{
17        prelude::*,
18        util::{http::HttpJsonBatchSizer, Compressor},
19    },
20};
21#[derive(Default)]
22pub struct EventPartitioner;
23
24impl Partitioner for EventPartitioner {
25    type Item = Event;
26    type Key = Option<Arc<str>>;
27
28    fn partition(&self, item: &Self::Item) -> Self::Key {
29        item.metadata().datadog_api_key()
30    }
31}
32
33#[derive(Debug)]
34pub struct LogSinkBuilder<S> {
35    transformer: Transformer,
36    service: S,
37    batch_settings: BatcherSettings,
38    compression: Option<Compression>,
39    default_api_key: Arc<str>,
40    protocol: String,
41    conforms_as_agent: bool,
42}
43
44impl<S> LogSinkBuilder<S> {
45    pub const fn new(
46        transformer: Transformer,
47        service: S,
48        default_api_key: Arc<str>,
49        batch_settings: BatcherSettings,
50        protocol: String,
51        conforms_as_agent: bool,
52    ) -> Self {
53        Self {
54            transformer,
55            service,
56            default_api_key,
57            batch_settings,
58            compression: None,
59            protocol,
60            conforms_as_agent,
61        }
62    }
63
64    pub const fn compression(mut self, compression: Compression) -> Self {
65        self.compression = Some(compression);
66        self
67    }
68
69    pub fn build(self) -> LogSink<S> {
70        LogSink {
71            default_api_key: self.default_api_key,
72            transformer: self.transformer,
73            service: self.service,
74            batch_settings: self.batch_settings,
75            compression: self.compression.unwrap_or_default(),
76            protocol: self.protocol,
77            conforms_as_agent: self.conforms_as_agent,
78        }
79    }
80}
81
82pub struct LogSink<S> {
83    /// The default Datadog API key to use
84    ///
85    /// In some instances an `Event` will come in on the stream with an
86    /// associated API key. That API key is the one it'll get batched up by but
87    /// otherwise we will see `Event` instances with no associated key. In that
88    /// case we batch them by this default.
89    default_api_key: Arc<str>,
90    /// The API service
91    service: S,
92    /// The encoding of payloads
93    transformer: Transformer,
94    /// The compression technique to use when building the request body
95    compression: Compression,
96    /// Batch settings: timeout, max events, max bytes, etc.
97    batch_settings: BatcherSettings,
98    /// The protocol name
99    protocol: String,
100    /// Normalize events to agent standard and attach associated HTTP header to request
101    conforms_as_agent: bool,
102}
103
104// The Datadog logs intake does not require the fields that are set in this
105// function. But if they are present in the event, we normalize the paths
106// (and value in the case of timestamp) to something that intake understands.
107pub fn normalize_event(event: &mut Event) {
108    let log = event.as_mut_log();
109
110    // Will cast the internal value to an object if it already isn't
111    if !log.value().is_object() {
112        log.insert(MESSAGE, log.value().clone());
113    }
114
115    // Upstream Sources may have semantically defined Datadog reserved attributes outside of their
116    // expected location by DD logs intake (root of the event). Move them if needed.
117    for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
118        // check if there is a semantic meaning for the reserved attribute
119        if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
120            // move it to the desired location
121            position_reserved_attr_event_root(log, &current_path, expected_field_name, meaning);
122        }
123    }
124
125    // if the tags value is an array we need to reconstruct it to a comma delimited string for DD logs intake.
126    // NOTE: we don't access by semantic meaning here because in the prior step
127    // we ensured reserved attributes are in expected locations.
128    let ddtags_path = event_path!(DDTAGS);
129    if let Some(Value::Array(tags_arr)) = log.get(ddtags_path) {
130        if !tags_arr.is_empty() {
131            let all_tags: String = tags_arr
132                .iter()
133                .filter_map(|tag_kv| {
134                    tag_kv
135                        .as_bytes()
136                        .map(|bytes| String::from_utf8_lossy(bytes))
137                })
138                .join(",");
139
140            log.insert(ddtags_path, all_tags);
141        }
142    }
143
144    // ensure the timestamp is in expected format
145    // NOTE: we don't access by semantic meaning here because in the prior step
146    // we ensured reserved attributes are in expected locations.
147    let ts_path = event_path!("timestamp");
148    if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
149        log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
150    }
151}
152
153// Optionally for all other non-reserved fields, nest these under the `message` key. This is the
154// final step in having the event conform to the standard that the logs intake expects when an
155// event originates from an agent. Normalizing the events to the format prepared by the datadog
156// agent resolves any inconsistencies that would be observed when data flows through vector
157// before being ingested by the logs intake. This is because the logs intake interprets the
158// request with slight differences when this header and format are observed.
159pub fn normalize_as_agent_event(event: &mut Event) {
160    let log = event.as_mut_log();
161    // Should never occur since normalize_event forces a conversion of the log value to an Object type
162    let Some(object_map) = log.as_map_mut() else {
163        return;
164    };
165    // Move all non reserved fields into a new object
166    let mut local_root = ObjectMap::default();
167    let keys_to_move = object_map
168        .keys()
169        .filter(|ks| !is_reserved_attribute(ks.as_str()))
170        .cloned()
171        .collect::<Vec<_>>();
172    for key in keys_to_move {
173        if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
174            local_root.insert(entry_k, entry_v);
175        }
176    }
177    // .. nest this object at the root under the reserved key named 'message'
178    log.insert(MESSAGE, local_root);
179}
180
181// If an expected reserved attribute is not located in the event root, rename it and handle
182// any potential conflicts by preserving the conflicting one with a _RESERVED_ prefix.
183pub fn position_reserved_attr_event_root(
184    log: &mut LogEvent,
185    current_path: &OwnedTargetPath,
186    expected_field_name: &str,
187    meaning: &str,
188) {
189    // the path that DD archives expects this reserved attribute to be in.
190    let desired_path = event_path!(expected_field_name);
191
192    // if not already be at the expected location
193    if !path_is_field(current_path, expected_field_name) {
194        // if an existing attribute exists here already, move it so to not overwrite it.
195        // yes, technically the rename path could exist, but technically that could always be the case.
196        if log.contains(desired_path) {
197            let rename_attr = format!("_RESERVED_{meaning}");
198            let rename_path = event_path!(rename_attr.as_str());
199            warn!(
200                message = "Semantic meaning is defined, but the event path already exists. Renaming to not overwrite.",
201                meaning = meaning,
202                renamed = &rename_attr,
203                internal_log_rate_limit = true,
204            );
205            log.rename_key(desired_path, rename_path);
206        }
207
208        log.rename_key(current_path, desired_path);
209    }
210}
211
212// Test if the named path consists of the single named field. This is rather a hack and should
213// hypothetically be solvable in the `vrl` crate with an implementation of
214// `PartialEq<BorrowedTargetPath<'_>>`. The alternative is doing a comparison against another
215// `OwnedTargetPath`, but the naïve implementation of that requires multiple allocations and copies
216// just to test equality.
217pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
218    path.prefix == PathPrefix::Event
219        && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
220}
221
222#[derive(Debug, Snafu)]
223pub enum RequestBuildError {
224    #[snafu(display("Encoded payload is greater than the max limit."))]
225    PayloadTooBig { events_that_fit: usize },
226    #[snafu(display("Failed to build payload with error: {}", error))]
227    Io { error: std::io::Error },
228    #[snafu(display("Failed to serialize payload with error: {}", error))]
229    Json { error: serde_json::Error },
230}
231
232impl From<io::Error> for RequestBuildError {
233    fn from(error: io::Error) -> RequestBuildError {
234        RequestBuildError::Io { error }
235    }
236}
237
238impl From<serde_json::Error> for RequestBuildError {
239    fn from(error: serde_json::Error) -> RequestBuildError {
240        RequestBuildError::Json { error }
241    }
242}
243
244struct LogRequestBuilder {
245    pub default_api_key: Arc<str>,
246    pub transformer: Transformer,
247    pub compression: Compression,
248    pub conforms_as_agent: bool,
249}
250
251impl LogRequestBuilder {
252    pub fn build_request(
253        &self,
254        events: Vec<Event>,
255        api_key: Arc<str>,
256    ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
257        // Transform events and pre-compute their estimated size.
258        let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
259            .into_iter()
260            .map(|mut event| {
261                normalize_event(&mut event);
262                if self.conforms_as_agent {
263                    normalize_as_agent_event(&mut event);
264                }
265                self.transformer.transform(&mut event);
266                let estimated_json_size = event.estimated_json_encoded_size_of();
267                (event, estimated_json_size)
268            })
269            .collect();
270
271        // Construct requests respecting the max payload size.
272        let mut requests: Vec<LogApiRequest> = Vec::new();
273        while !events_with_estimated_size.is_empty() {
274            let (events_serialized, body, byte_size) =
275                serialize_with_capacity(&mut events_with_estimated_size)?;
276            if events_serialized.is_empty() {
277                // first event was too large for whole request
278                let _too_big = events_with_estimated_size.pop_front();
279                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
280                    count: 1,
281                    reason: "Event too large to encode."
282                });
283            } else {
284                let request =
285                    self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
286                requests.push(request);
287            }
288        }
289
290        Ok(requests)
291    }
292
293    fn finish_request(
294        &self,
295        buf: Vec<u8>,
296        mut events: Vec<Event>,
297        byte_size: GroupedCountByteSize,
298        api_key: Arc<str>,
299    ) -> Result<LogApiRequest, RequestBuildError> {
300        let n_events = events.len();
301        let uncompressed_size = buf.len();
302
303        // Now just compress it like normal.
304        let mut compressor = Compressor::from(self.compression);
305        write_all(&mut compressor, n_events, &buf)?;
306        let bytes = compressor.into_inner().freeze();
307
308        let finalizers = events.take_finalizers();
309        let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
310
311        let payload = if self.compression.is_compressed() {
312            EncodeResult::compressed(bytes, uncompressed_size, byte_size)
313        } else {
314            EncodeResult::uncompressed(bytes, byte_size)
315        };
316
317        Ok::<_, RequestBuildError>(LogApiRequest {
318            api_key,
319            finalizers,
320            compression: self.compression,
321            metadata: request_metadata_builder.build(&payload),
322            uncompressed_size: payload.uncompressed_byte_size,
323            body: payload.into_payload(),
324        })
325    }
326}
327
328/// Serialize events into a buffer as a JSON array that has a maximum size of
329/// `MAX_PAYLOAD_BYTES`.
330///
331/// Returns the serialized events, the buffer, and the byte size of the events.
332/// Events that are not serialized remain in the `events` parameter.
333pub fn serialize_with_capacity(
334    events: &mut VecDeque<(Event, JsonSize)>,
335) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
336    // Compute estimated size, accounting for the size of the brackets and commas.
337    let total_estimated =
338        events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
339
340    // Initialize state.
341    let mut buf = Vec::with_capacity(total_estimated);
342    let mut byte_size = telemetry().create_request_count_byte_size();
343    let mut events_serialized = Vec::with_capacity(events.len());
344
345    // Write entries until the buffer is full.
346    buf.push(b'[');
347    let mut first = true;
348    while let Some((event, estimated_json_size)) = events.pop_front() {
349        // Track the existing length of the buffer so we can truncate it if we need to.
350        let existing_len = buf.len();
351        if first {
352            first = false;
353        } else {
354            buf.push(b',');
355        }
356        serde_json::to_writer(&mut buf, event.as_log())?;
357        // If the buffer is too big, truncate it and break out of the loop.
358        if buf.len() >= MAX_PAYLOAD_BYTES {
359            events.push_front((event, estimated_json_size));
360            buf.truncate(existing_len);
361            break;
362        }
363        // Otherwise, track the size of the event and continue.
364        byte_size.add_event(&event, estimated_json_size);
365        events_serialized.push(event);
366    }
367    buf.push(b']');
368
369    Ok((events_serialized, buf, byte_size))
370}
371
372impl<S> LogSink<S>
373where
374    S: Service<LogApiRequest> + Send + 'static,
375    S::Future: Send + 'static,
376    S::Response: DriverResponse + Send + 'static,
377    S::Error: Debug + Into<crate::Error> + Send,
378{
379    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
380        let default_api_key = Arc::clone(&self.default_api_key);
381
382        let partitioner = EventPartitioner;
383        let batch_settings = self.batch_settings;
384        let builder = Arc::new(LogRequestBuilder {
385            default_api_key,
386            transformer: self.transformer,
387            compression: self.compression,
388            conforms_as_agent: self.conforms_as_agent,
389        });
390
391        let input = input.batched_partitioned(partitioner, || {
392            batch_settings.as_item_size_config(HttpJsonBatchSizer)
393        });
394        input
395            .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
396                let builder = Arc::clone(&builder);
397
398                Box::pin(async move {
399                    let (api_key, events) = input;
400                    let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
401
402                    builder.build_request(events, api_key)
403                })
404            })
405            .filter_map(|request| async move {
406                match request {
407                    Err(error) => {
408                        emit!(SinkRequestBuildError { error });
409                        None
410                    }
411                    Ok(reqs) => Some(futures::stream::iter(reqs)),
412                }
413            })
414            .flatten()
415            .into_driver(self.service)
416            .protocol(self.protocol)
417            .run()
418            .await
419    }
420}
421
422#[async_trait]
423impl<S> StreamSink<Event> for LogSink<S>
424where
425    S: Service<LogApiRequest> + Send + 'static,
426    S::Future: Send + 'static,
427    S::Response: DriverResponse + Send + 'static,
428    S::Error: Debug + Into<crate::Error> + Send,
429{
430    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
431        self.run_inner(input).await
432    }
433}
434
435#[cfg(test)]
436mod tests {
437
438    use std::sync::Arc;
439
440    use chrono::Utc;
441    use vector_lib::{
442        config::{LegacyKey, LogNamespace},
443        event::{Event, EventMetadata, LogEvent},
444        schema::{meaning, Definition},
445    };
446    use vrl::{
447        core::Value,
448        event_path, metadata_path, owned_value_path, value,
449        value::{kind::Collection, Kind},
450    };
451
452    use super::{normalize_as_agent_event, normalize_event};
453    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;
454
455    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
456        assert!(log
457            .get(event_path!("timestamp"))
458            .expect("should have timestamp")
459            .is_integer());
460
461        for attr in [
462            "message",
463            "timestamp",
464            "hostname",
465            "ddtags",
466            "service",
467            "status",
468        ] {
469            assert!(log.contains(event_path!(attr)), "missing {attr}");
470        }
471
472        assert_eq!(
473            log.get(event_path!("ddtags")).expect("should have tags"),
474            &Value::Bytes("key1:value1,key2:value2".into())
475        );
476    }
477
478    fn agent_event_metadata(definition: Definition) -> EventMetadata {
479        EventMetadata::default().with_schema_definition(&Arc::new(
480            definition
481                .with_source_metadata(
482                    "datadog_agent",
483                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
484                    &owned_value_path!("ddtags"),
485                    Kind::bytes(),
486                    Some(meaning::TAGS),
487                )
488                .with_source_metadata(
489                    "datadog_agent",
490                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
491                    &owned_value_path!("hostname"),
492                    Kind::bytes(),
493                    Some(meaning::HOST),
494                )
495                .with_source_metadata(
496                    "datadog_agent",
497                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
498                    &owned_value_path!("timestamp"),
499                    Kind::timestamp(),
500                    Some(meaning::TIMESTAMP),
501                )
502                .with_source_metadata(
503                    "datadog_agent",
504                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
505                    &owned_value_path!("severity"),
506                    Kind::bytes(),
507                    Some(meaning::SEVERITY),
508                )
509                .with_source_metadata(
510                    "datadog_agent",
511                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
512                    &owned_value_path!("service"),
513                    Kind::bytes(),
514                    Some(meaning::SERVICE),
515                )
516                .with_source_metadata(
517                    "datadog_agent",
518                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
519                    &owned_value_path!("source"),
520                    Kind::bytes(),
521                    Some(meaning::SOURCE),
522                ),
523        ))
524    }
525
526    #[test]
527    fn normalize_event_doesnt_require() {
528        let mut log = LogEvent::default();
529        log.insert(event_path!("foo"), "bar");
530
531        let mut event = Event::Log(log);
532        normalize_event(&mut event);
533
534        let log = event.as_log();
535
536        assert!(!log.contains(event_path!("message")));
537        assert!(!log.contains(event_path!("timestamp")));
538        assert!(!log.contains(event_path!("hostname")));
539    }
540
541    #[test]
542    fn normalize_event_normalizes_legacy_namespace() {
543        let definition = Definition::new_with_default_metadata(
544            Kind::object(Collection::empty()),
545            [LogNamespace::Legacy],
546        );
547        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
548        log.insert(event_path!("message"), "the_message");
549        let namespace = log.namespace();
550
551        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());
552
553        let tags = vec![
554            Value::Bytes("key1:value1".into()),
555            Value::Bytes("key2:value2".into()),
556        ];
557
558        log.insert(event_path!("ddtags"), tags);
559        log.insert(event_path!("hostname"), "the_host");
560        log.insert(event_path!("service"), "the_service");
561        log.insert(event_path!("source"), "the_source");
562        log.insert(event_path!("severity"), "the_severity");
563
564        assert!(log.namespace() == LogNamespace::Legacy);
565
566        let mut event = Event::Log(log);
567        normalize_event(&mut event);
568
569        assert_normalized_log_has_expected_attrs(event.as_log());
570    }
571
572    #[test]
573    fn normalize_event_normalizes_vector_namespace_raw_field() {
574        let mut event = prepare_event_vector_namespace(|definition| {
575            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
576        });
577
578        normalize_event(&mut event);
579        normalize_as_agent_event(&mut event);
580
581        assert_normalized_log_has_expected_attrs(event.as_log());
582        assert_only_reserved_fields_at_root(event.as_log());
583        assert_eq!(
584            event.as_log().get("message"),
585            Some(&value!({"message": "the_message"}))
586        );
587    }
588
589    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
590        let definition =
591            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
592        let mut log = log_generator(definition);
593
594        // insert an arbitrary metadata field such that the log becomes Vector namespaced
595        log.insert(metadata_path!("vector", "foo"), "bar");
596
597        let namespace = log.namespace();
598        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());
599
600        let tags = vec![
601            Value::Bytes("key1:value1".into()),
602            Value::Bytes("key2:value2".into()),
603        ];
604        log.insert(metadata_path!("datadog_agent", "ddtags"), tags);
605
606        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
607        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
608        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
609        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
610        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");
611
612        assert!(log.namespace() == LogNamespace::Vector);
613        Event::Log(log)
614    }
615
616    #[test]
617    fn normalize_event_normalizes_vector_namespace() {
618        let mut event = prepare_event_vector_namespace(|definition| {
619            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
620            log.insert(event_path!("message"), "the_message");
621            log
622        });
623
624        normalize_event(&mut event);
625        normalize_as_agent_event(&mut event);
626
627        assert_normalized_log_has_expected_attrs(event.as_log());
628        assert_only_reserved_fields_at_root(event.as_log());
629    }
630
631    fn prepare_agent_event() -> LogEvent {
632        let definition = Definition::new_with_default_metadata(
633            Kind::object(Collection::empty()),
634            [LogNamespace::Legacy],
635        );
636        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
637        let namespace = log.namespace();
638        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());
639
640        let tags = vec![
641            Value::Bytes("key1:value1".into()),
642            Value::Bytes("key2:value2".into()),
643        ];
644
645        // insert mandatory fields
646        log.insert(event_path!("ddtags"), tags);
647        log.insert(event_path!("hostname"), "the_host");
648        log.insert(event_path!("service"), "the_service");
649        log.insert(event_path!("timestamp"), Utc::now());
650        log.insert(event_path!("source"), "the_source");
651        log.insert(event_path!("severity"), "the_severity");
652
653        let sample_message = value!({
654            "message": "hello world",
655            "field_a": "field_a_value",
656            "field_b": "field_b_value",
657            "field_c": { "field_c_nested" : "field_c_value" },
658        });
659        log.insert(event_path!("message"), sample_message.to_string());
660        log
661    }
662
663    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
664        let objmap = log.as_map().unwrap();
665        let reserved_fields = DD_RESERVED_SEMANTIC_ATTRS
666            .into_iter()
667            .chain([("message", "message")])
668            .collect::<Vec<(&str, &str)>>();
669        for key in objmap.keys() {
670            assert!(reserved_fields.iter().any(|(_, msg)| *msg == key.as_str()));
671        }
672    }
673
674    #[test]
675    fn normalize_conforming_agent_with_collisions() {
676        let mut log = prepare_agent_event();
677
678        // insert random fields at root which will collide with sample data at 'message'
679        log.insert(event_path!("field_a"), "replaced_field_a_value");
680        log.insert(event_path!("field_c"), "replaced_field_c_value");
681        let mut event = Event::Log(log);
682        normalize_event(&mut event);
683        normalize_as_agent_event(&mut event);
684
685        let log = event.as_log();
686        assert_normalized_log_has_expected_attrs(log);
687        assert_only_reserved_fields_at_root(log);
688        assert_eq!(
689            log.get(event_path!("message")),
690            Some(&value!({
691                "source_type": "datadog_agent",
692                "field_a": "replaced_field_a_value",
693                "field_c": "replaced_field_c_value",
694                "message": (value!({
695                    "message": "hello world",
696                    "field_a": "field_a_value",
697                    "field_b": "field_b_value",
698                    "field_c": { "field_c_nested" : "field_c_value" },
699                }).to_string()),
700            }))
701        );
702    }
703
704    #[test]
705    fn normalize_conforming_agent() {
706        let mut log = prepare_agent_event();
707
708        // insert random fields at root
709        log.insert(event_path!("field_1"), "value_1");
710        log.insert(event_path!("field_2"), "value_2");
711        log.insert(event_path!("field_3", "field_3_nested"), "value_3");
712
713        // normalize and validate...
714        let mut event = Event::Log(log);
715        normalize_event(&mut event);
716        normalize_as_agent_event(&mut event);
717
718        // that all fields placed at the root no longer exist there
719        let log = event.as_log();
720        assert_normalized_log_has_expected_attrs(log);
721        assert_only_reserved_fields_at_root(log);
722
723        // .. and that they were nested properly underneath message
724        assert_eq!(
725            log.get(event_path!("message")),
726            Some(&value!({
727                "source_type": "datadog_agent",
728                "message": (value!({
729                    "message": "hello world",
730                    "field_a": "field_a_value",
731                    "field_b": "field_b_value",
732                    "field_c": { "field_c_nested" : "field_c_value" },
733                }).to_string()),
734                "field_1": "value_1",
735                "field_2": "value_2",
736                "field_3": {
737                    "field_3_nested": "value_3"
738                }
739            }))
740        );
741    }
742}