vector/sinks/datadog/logs/
sink.rs

1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use vector_lib::{
6    event::{ObjectMap, Value},
7    internal_event::{ComponentEventsDropped, UNINTENTIONAL},
8    lookup::event_path,
9};
10use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
11
12use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
13use crate::{
14    common::datadog::{DD_RESERVED_SEMANTIC_ATTRS, DDTAGS, MESSAGE, is_reserved_attribute},
15    sinks::{
16        prelude::*,
17        util::{Compressor, http::HttpJsonBatchSizer},
18    },
19};
/// Stateless partitioner used to group incoming events into batches.
///
/// Its `Partitioner` implementation keys every event by the Datadog API key
/// found in the event's metadata.
#[derive(Default)]
pub struct EventPartitioner;
22
23impl Partitioner for EventPartitioner {
24    type Item = Event;
25    type Key = Option<Arc<str>>;
26
27    fn partition(&self, item: &Self::Item) -> Self::Key {
28        item.metadata().datadog_api_key()
29    }
30}
31
/// Builder that collects everything needed to construct a [`LogSink`].
#[derive(Debug)]
pub struct LogSinkBuilder<S> {
    /// Transformer handed over to the sink unchanged.
    transformer: Transformer,
    /// The API service the built sink will drive.
    service: S,
    /// Batch settings handed over to the sink unchanged.
    batch_settings: BatcherSettings,
    /// Explicit compression choice; when `None`, `build` falls back to
    /// `Compression`'s default.
    compression: Option<Compression>,
    /// API key handed over to the sink as its default key.
    default_api_key: Arc<str>,
    /// The protocol name handed over to the sink.
    protocol: String,
    /// Whether the sink should additionally normalize events to the agent format.
    conforms_as_agent: bool,
}
42
43impl<S> LogSinkBuilder<S> {
44    pub const fn new(
45        transformer: Transformer,
46        service: S,
47        default_api_key: Arc<str>,
48        batch_settings: BatcherSettings,
49        protocol: String,
50        conforms_as_agent: bool,
51    ) -> Self {
52        Self {
53            transformer,
54            service,
55            default_api_key,
56            batch_settings,
57            compression: None,
58            protocol,
59            conforms_as_agent,
60        }
61    }
62
63    pub const fn compression(mut self, compression: Compression) -> Self {
64        self.compression = Some(compression);
65        self
66    }
67
68    pub fn build(self) -> LogSink<S> {
69        LogSink {
70            default_api_key: self.default_api_key,
71            transformer: self.transformer,
72            service: self.service,
73            batch_settings: self.batch_settings,
74            compression: self.compression.unwrap_or_default(),
75            protocol: self.protocol,
76            conforms_as_agent: self.conforms_as_agent,
77        }
78    }
79}
80
/// Sink that batches log events per API key, builds size-limited requests
/// from each batch and drives them through `service`.
pub struct LogSink<S> {
    /// The default Datadog API key to use
    ///
    /// In some instances an `Event` will come in on the stream with an
    /// associated API key. That API key is the one it'll get batched up by but
    /// otherwise we will see `Event` instances with no associated key. In that
    /// case we batch them by this default.
    default_api_key: Arc<str>,
    /// The API service
    service: S,
    /// The encoding of payloads
    transformer: Transformer,
    /// The compression technique to use when building the request body
    compression: Compression,
    /// Batch settings: timeout, max events, max bytes, etc.
    batch_settings: BatcherSettings,
    /// The protocol name
    protocol: String,
    /// Normalize events to agent standard and attach associated HTTP header to request
    conforms_as_agent: bool,
}
102
103// The Datadog logs intake does not require the fields that are set in this
104// function. But if they are present in the event, we normalize the paths
105// (and value in the case of timestamp) to something that intake understands.
106pub fn normalize_event(event: &mut Event) {
107    let log = event.as_mut_log();
108
109    // Will cast the internal value to an object if it already isn't
110    if !log.value().is_object() {
111        log.insert(MESSAGE, log.value().clone());
112    }
113
114    // Upstream Sources may have semantically defined Datadog reserved attributes outside of their
115    // expected location by DD logs intake (root of the event). Move them if needed.
116    for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
117        // check if there is a semantic meaning for the reserved attribute
118        if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
119            // move it to the desired location
120            position_reserved_attr_event_root(log, &current_path, expected_field_name, meaning);
121        }
122    }
123
124    // if the tags value is an array we need to reconstruct it to a comma delimited string for DD logs intake.
125    // NOTE: we don't access by semantic meaning here because in the prior step
126    // we ensured reserved attributes are in expected locations.
127    let ddtags_path = event_path!(DDTAGS);
128    if let Some(Value::Array(tags_arr)) = log.get(ddtags_path)
129        && !tags_arr.is_empty()
130    {
131        let all_tags: String = tags_arr
132            .iter()
133            .filter_map(|tag_kv| {
134                tag_kv
135                    .as_bytes()
136                    .map(|bytes| String::from_utf8_lossy(bytes))
137            })
138            .join(",");
139
140        log.insert(ddtags_path, all_tags);
141    }
142
143    // ensure the timestamp is in expected format
144    // NOTE: we don't access by semantic meaning here because in the prior step
145    // we ensured reserved attributes are in expected locations.
146    let ts_path = event_path!("timestamp");
147    if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
148        log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
149    }
150}
151
152// Optionally for all other non-reserved fields, nest these under the `message` key. This is the
153// final step in having the event conform to the standard that the logs intake expects when an
154// event originates from an agent. Normalizing the events to the format prepared by the datadog
155// agent resolves any inconsistencies that would be observed when data flows through vector
156// before being ingested by the logs intake. This is because the logs intake interprets the
157// request with slight differences when this header and format are observed.
158pub fn normalize_as_agent_event(event: &mut Event) {
159    let log = event.as_mut_log();
160    // Should never occur since normalize_event forces a conversion of the log value to an Object type
161    let Some(object_map) = log.as_map_mut() else {
162        return;
163    };
164    // Move all non reserved fields into a new object
165    let mut local_root = ObjectMap::default();
166    let keys_to_move = object_map
167        .keys()
168        .filter(|ks| !is_reserved_attribute(ks.as_str()))
169        .cloned()
170        .collect::<Vec<_>>();
171    for key in keys_to_move {
172        if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
173            local_root.insert(entry_k, entry_v);
174        }
175    }
176    // .. nest this object at the root under the reserved key named 'message'
177    log.insert(MESSAGE, local_root);
178}
179
180// If an expected reserved attribute is not located in the event root, rename it and handle
181// any potential conflicts by preserving the conflicting one with a _RESERVED_ prefix.
182pub fn position_reserved_attr_event_root(
183    log: &mut LogEvent,
184    current_path: &OwnedTargetPath,
185    expected_field_name: &str,
186    meaning: &str,
187) {
188    // the path that DD archives expects this reserved attribute to be in.
189    let desired_path = event_path!(expected_field_name);
190
191    // if not already be at the expected location
192    if !path_is_field(current_path, expected_field_name) {
193        // if an existing attribute exists here already, move it so to not overwrite it.
194        // yes, technically the rename path could exist, but technically that could always be the case.
195        if log.contains(desired_path) {
196            let rename_attr = format!("_RESERVED_{meaning}");
197            let rename_path = event_path!(rename_attr.as_str());
198            warn!(
199                message = "Semantic meaning is defined, but the event path already exists. Renaming to not overwrite.",
200                meaning = meaning,
201                renamed = &rename_attr,
202                internal_log_rate_limit = true,
203            );
204            log.rename_key(desired_path, rename_path);
205        }
206
207        log.rename_key(current_path, desired_path);
208    }
209}
210
211// Test if the named path consists of the single named field. This is rather a hack and should
212// hypothetically be solvable in the `vrl` crate with an implementation of
213// `PartialEq<BorrowedTargetPath<'_>>`. The alternative is doing a comparison against another
214// `OwnedTargetPath`, but the naïve implementation of that requires multiple allocations and copies
215// just to test equality.
216pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
217    path.prefix == PathPrefix::Event
218        && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
219}
220
/// Errors that can occur while turning a batch of events into API requests.
#[derive(Debug, Snafu)]
pub enum RequestBuildError {
    /// The encoded payload exceeds the maximum allowed size.
    ///
    /// NOTE(review): this variant is not constructed in the code visible here;
    /// `events_that_fit` presumably records how many events fit under the
    /// limit — confirm against callers.
    #[snafu(display("Encoded payload is greater than the max limit."))]
    PayloadTooBig { events_that_fit: usize },
    /// An I/O error occurred while building the payload.
    #[snafu(display("Failed to build payload with error: {}", error))]
    Io { error: std::io::Error },
    /// JSON serialization of the payload failed.
    #[snafu(display("Failed to serialize payload with error: {}", error))]
    Json { error: serde_json::Error },
}
230
231impl From<io::Error> for RequestBuildError {
232    fn from(error: io::Error) -> RequestBuildError {
233        RequestBuildError::Io { error }
234    }
235}
236
237impl From<serde_json::Error> for RequestBuildError {
238    fn from(error: serde_json::Error) -> RequestBuildError {
239        RequestBuildError::Json { error }
240    }
241}
242
/// Turns batches of events into `LogApiRequest`s.
struct LogRequestBuilder {
    /// API key used for batches whose partition key is `None`.
    pub default_api_key: Arc<str>,
    /// Transformer applied to each event after normalization.
    pub transformer: Transformer,
    /// Compression applied to each request body.
    pub compression: Compression,
    /// When set, events are additionally normalized to the agent format.
    pub conforms_as_agent: bool,
}
249
250impl LogRequestBuilder {
251    pub fn build_request(
252        &self,
253        events: Vec<Event>,
254        api_key: Arc<str>,
255    ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
256        // Transform events and pre-compute their estimated size.
257        let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
258            .into_iter()
259            .map(|mut event| {
260                normalize_event(&mut event);
261                if self.conforms_as_agent {
262                    normalize_as_agent_event(&mut event);
263                }
264                self.transformer.transform(&mut event);
265                let estimated_json_size = event.estimated_json_encoded_size_of();
266                (event, estimated_json_size)
267            })
268            .collect();
269
270        // Construct requests respecting the max payload size.
271        let mut requests: Vec<LogApiRequest> = Vec::new();
272        while !events_with_estimated_size.is_empty() {
273            let (events_serialized, body, byte_size) =
274                serialize_with_capacity(&mut events_with_estimated_size)?;
275            if events_serialized.is_empty() {
276                // first event was too large for whole request
277                let _too_big = events_with_estimated_size.pop_front();
278                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
279                    count: 1,
280                    reason: "Event too large to encode."
281                });
282            } else {
283                let request =
284                    self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
285                requests.push(request);
286            }
287        }
288
289        Ok(requests)
290    }
291
292    fn finish_request(
293        &self,
294        buf: Vec<u8>,
295        mut events: Vec<Event>,
296        byte_size: GroupedCountByteSize,
297        api_key: Arc<str>,
298    ) -> Result<LogApiRequest, RequestBuildError> {
299        let n_events = events.len();
300        let uncompressed_size = buf.len();
301
302        // Now just compress it like normal.
303        let mut compressor = Compressor::from(self.compression);
304        write_all(&mut compressor, n_events, &buf)?;
305        let bytes = compressor.into_inner().freeze();
306
307        let finalizers = events.take_finalizers();
308        let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
309
310        let payload = if self.compression.is_compressed() {
311            EncodeResult::compressed(bytes, uncompressed_size, byte_size)
312        } else {
313            EncodeResult::uncompressed(bytes, byte_size)
314        };
315
316        Ok::<_, RequestBuildError>(LogApiRequest {
317            api_key,
318            finalizers,
319            compression: self.compression,
320            metadata: request_metadata_builder.build(&payload),
321            uncompressed_size: payload.uncompressed_byte_size,
322            body: payload.into_payload(),
323        })
324    }
325}
326
327/// Serialize events into a buffer as a JSON array that has a maximum size of
328/// `MAX_PAYLOAD_BYTES`.
329///
330/// Returns the serialized events, the buffer, and the byte size of the events.
331/// Events that are not serialized remain in the `events` parameter.
332pub fn serialize_with_capacity(
333    events: &mut VecDeque<(Event, JsonSize)>,
334) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
335    // Compute estimated size, accounting for the size of the brackets and commas.
336    let total_estimated =
337        events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
338
339    // Initialize state.
340    let mut buf = Vec::with_capacity(total_estimated);
341    let mut byte_size = telemetry().create_request_count_byte_size();
342    let mut events_serialized = Vec::with_capacity(events.len());
343
344    // Write entries until the buffer is full.
345    buf.push(b'[');
346    let mut first = true;
347    while let Some((event, estimated_json_size)) = events.pop_front() {
348        // Track the existing length of the buffer so we can truncate it if we need to.
349        let existing_len = buf.len();
350        if first {
351            first = false;
352        } else {
353            buf.push(b',');
354        }
355        serde_json::to_writer(&mut buf, event.as_log())?;
356        // If the buffer is too big, truncate it and break out of the loop.
357        if buf.len() >= MAX_PAYLOAD_BYTES {
358            events.push_front((event, estimated_json_size));
359            buf.truncate(existing_len);
360            break;
361        }
362        // Otherwise, track the size of the event and continue.
363        byte_size.add_event(&event, estimated_json_size);
364        events_serialized.push(event);
365    }
366    buf.push(b']');
367
368    Ok((events_serialized, buf, byte_size))
369}
370
371impl<S> LogSink<S>
372where
373    S: Service<LogApiRequest> + Send + 'static,
374    S::Future: Send + 'static,
375    S::Response: DriverResponse + Send + 'static,
376    S::Error: Debug + Into<crate::Error> + Send,
377{
378    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
379        let default_api_key = Arc::clone(&self.default_api_key);
380
381        let partitioner = EventPartitioner;
382        let batch_settings = self.batch_settings;
383        let builder = Arc::new(LogRequestBuilder {
384            default_api_key,
385            transformer: self.transformer,
386            compression: self.compression,
387            conforms_as_agent: self.conforms_as_agent,
388        });
389
390        let input = input.batched_partitioned(partitioner, || {
391            batch_settings.as_item_size_config(HttpJsonBatchSizer)
392        });
393        input
394            .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
395                let builder = Arc::clone(&builder);
396
397                Box::pin(async move {
398                    let (api_key, events) = input;
399                    let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
400
401                    builder.build_request(events, api_key)
402                })
403            })
404            .filter_map(|request| async move {
405                match request {
406                    Err(error) => {
407                        emit!(SinkRequestBuildError { error });
408                        None
409                    }
410                    Ok(reqs) => Some(futures::stream::iter(reqs)),
411                }
412            })
413            .flatten()
414            .into_driver(self.service)
415            .protocol(self.protocol)
416            .run()
417            .await
418    }
419}
420
/// `StreamSink` entry point for the sink; all work is delegated to `run_inner`.
#[async_trait]
impl<S> StreamSink<Event> for LogSink<S>
where
    S: Service<LogApiRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    /// Consumes the sink and processes `input` until `run_inner` returns.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
433
// Unit tests for `normalize_event` and `normalize_as_agent_event`.
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{Definition, meaning},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{Kind, kind::Collection},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

    /// Asserts the shape shared by all normalized fixtures: an integer
    /// `timestamp`, all six reserved attributes present at the root, and
    /// `ddtags` flattened to the string `key1:value1,key2:value2`.
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        assert!(
            log.get(event_path!("timestamp"))
                .expect("should have timestamp")
                .is_integer()
        );

        for attr in [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ] {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        assert_eq!(
            log.get(event_path!("ddtags")).expect("should have tags"),
            &Value::Bytes("key1:value1,key2:value2".into())
        );
    }

    /// Builds event metadata whose schema definition registers the
    /// `datadog_agent` source fields (`ddtags`, `hostname`, `timestamp`,
    /// `severity`, `service`, `source`) together with their semantic meanings.
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        EventMetadata::default().with_schema_definition(&Arc::new(
            definition
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                    &owned_value_path!("ddtags"),
                    Kind::bytes(),
                    Some(meaning::TAGS),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                    &owned_value_path!("hostname"),
                    Kind::bytes(),
                    Some(meaning::HOST),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                    &owned_value_path!("timestamp"),
                    Kind::timestamp(),
                    Some(meaning::TIMESTAMP),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                    &owned_value_path!("severity"),
                    Kind::bytes(),
                    Some(meaning::SEVERITY),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                    &owned_value_path!("service"),
                    Kind::bytes(),
                    Some(meaning::SERVICE),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                    &owned_value_path!("source"),
                    Kind::bytes(),
                    Some(meaning::SOURCE),
                ),
        ))
    }

    /// Normalization must not invent reserved fields that the event lacks.
    #[test]
    fn normalize_event_doesnt_require() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        let log = event.as_log();

        assert!(!log.contains(event_path!("message")));
        assert!(!log.contains(event_path!("timestamp")));
        assert!(!log.contains(event_path!("hostname")));
    }

    /// A Legacy-namespaced agent event ends up with all reserved attributes
    /// at the root after normalization.
    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        log.insert(event_path!("message"), "the_message");
        let namespace = log.namespace();

        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    /// A Vector-namespaced event whose value is a raw (non-object) string:
    /// the string is first moved under `message`, then nested once more by
    /// the agent normalization.
    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
        assert_eq!(
            event.as_log().get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

    /// Builds a Vector-namespaced event from the log produced by
    /// `log_generator`, with the agent attributes stored in event metadata
    /// under `datadog_agent`.
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut log = log_generator(definition);

        // insert an arbitrary metadata field such that the log becomes Vector namespaced
        log.insert(metadata_path!("vector", "foo"), "bar");

        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];
        log.insert(metadata_path!("datadog_agent", "ddtags"), tags);

        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Vector);
        Event::Log(log)
    }

    /// A Vector-namespaced event with an object value keeps only reserved
    /// fields at the root after both normalization passes.
    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
            log.insert(event_path!("message"), "the_message");
            log
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
    }

    /// Builds a Legacy-namespaced agent event whose `message` is a
    /// stringified object containing `message`, `field_a`, `field_b` and
    /// `field_c`.
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        // insert mandatory fields
        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("timestamp"), Utc::now());
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        let sample_message = value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        });
        log.insert(event_path!("message"), sample_message.to_string());
        log
    }

    /// Asserts that every root key is either `message` or one of the field
    /// names listed in `DD_RESERVED_SEMANTIC_ATTRS`.
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let objmap = log.as_map().unwrap();
        let reserved_fields = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .chain([("message", "message")])
            .collect::<Vec<(&str, &str)>>();
        for key in objmap.keys() {
            assert!(reserved_fields.iter().any(|(_, msg)| *msg == key.as_str()));
        }
    }

    /// Root fields that share names with keys inside the stringified
    /// `message` are nested next to it without touching the string's content.
    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut log = prepare_agent_event();

        // insert random fields at root which will collide with sample data at 'message'
        log.insert(event_path!("field_a"), "replaced_field_a_value");
        log.insert(event_path!("field_c"), "replaced_field_c_value");
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "field_a": "replaced_field_a_value",
                "field_c": "replaced_field_c_value",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
            }))
        );
    }

    /// Arbitrary non-reserved root fields (including nested ones) are moved
    /// underneath `message` by the agent normalization.
    #[test]
    fn normalize_conforming_agent() {
        let mut log = prepare_agent_event();

        // insert random fields at root
        log.insert(event_path!("field_1"), "value_1");
        log.insert(event_path!("field_2"), "value_2");
        log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        // normalize and validate...
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        // that all fields placed at the root no longer exist there
        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);

        // .. and that they were nested properly underneath message
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
                "field_1": "value_1",
                "field_2": "value_2",
                "field_3": {
                    "field_3_nested": "value_3"
                }
            }))
        );
    }
}