1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use vector_lib::{
6 event::ObjectMap,
7 event::Value,
8 internal_event::{ComponentEventsDropped, UNINTENTIONAL},
9 lookup::event_path,
10};
11use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
12
13use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
14use crate::{
15 common::datadog::{is_reserved_attribute, DDTAGS, DD_RESERVED_SEMANTIC_ATTRS, MESSAGE},
16 sinks::{
17 prelude::*,
18 util::{http::HttpJsonBatchSizer, Compressor},
19 },
20};
21#[derive(Default)]
22pub struct EventPartitioner;
23
24impl Partitioner for EventPartitioner {
25 type Item = Event;
26 type Key = Option<Arc<str>>;
27
28 fn partition(&self, item: &Self::Item) -> Self::Key {
29 item.metadata().datadog_api_key()
30 }
31}
32
/// Builder for a [`LogSink`].
///
/// Compression is optional here; [`LogSinkBuilder::build`] falls back to
/// `Compression::default()` when it was never set.
#[derive(Debug)]
pub struct LogSinkBuilder<S> {
    /// Transformer applied to every event before it is encoded.
    transformer: Transformer,
    /// Service that the built sink drives requests through.
    service: S,
    /// Batching limits used to group events into requests.
    batch_settings: BatcherSettings,
    /// Explicit compression, if one was configured via `compression()`.
    compression: Option<Compression>,
    /// API key used for events whose metadata carries no key of their own.
    default_api_key: Arc<str>,
    /// Protocol name handed to the request driver.
    protocol: String,
    /// When true, events are additionally reshaped with `normalize_as_agent_event`.
    conforms_as_agent: bool,
}
43
44impl<S> LogSinkBuilder<S> {
45 pub const fn new(
46 transformer: Transformer,
47 service: S,
48 default_api_key: Arc<str>,
49 batch_settings: BatcherSettings,
50 protocol: String,
51 conforms_as_agent: bool,
52 ) -> Self {
53 Self {
54 transformer,
55 service,
56 default_api_key,
57 batch_settings,
58 compression: None,
59 protocol,
60 conforms_as_agent,
61 }
62 }
63
64 pub const fn compression(mut self, compression: Compression) -> Self {
65 self.compression = Some(compression);
66 self
67 }
68
69 pub fn build(self) -> LogSink<S> {
70 LogSink {
71 default_api_key: self.default_api_key,
72 transformer: self.transformer,
73 service: self.service,
74 batch_settings: self.batch_settings,
75 compression: self.compression.unwrap_or_default(),
76 protocol: self.protocol,
77 conforms_as_agent: self.conforms_as_agent,
78 }
79 }
80}
81
82pub struct LogSink<S> {
83 default_api_key: Arc<str>,
90 service: S,
92 transformer: Transformer,
94 compression: Compression,
96 batch_settings: BatcherSettings,
98 protocol: String,
100 conforms_as_agent: bool,
102}
103
104pub fn normalize_event(event: &mut Event) {
108 let log = event.as_mut_log();
109
110 if !log.value().is_object() {
112 log.insert(MESSAGE, log.value().clone());
113 }
114
115 for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
118 if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
120 position_reserved_attr_event_root(log, ¤t_path, expected_field_name, meaning);
122 }
123 }
124
125 let ddtags_path = event_path!(DDTAGS);
129 if let Some(Value::Array(tags_arr)) = log.get(ddtags_path) {
130 if !tags_arr.is_empty() {
131 let all_tags: String = tags_arr
132 .iter()
133 .filter_map(|tag_kv| {
134 tag_kv
135 .as_bytes()
136 .map(|bytes| String::from_utf8_lossy(bytes))
137 })
138 .join(",");
139
140 log.insert(ddtags_path, all_tags);
141 }
142 }
143
144 let ts_path = event_path!("timestamp");
148 if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
149 log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
150 }
151}
152
153pub fn normalize_as_agent_event(event: &mut Event) {
160 let log = event.as_mut_log();
161 let Some(object_map) = log.as_map_mut() else {
163 return;
164 };
165 let mut local_root = ObjectMap::default();
167 let keys_to_move = object_map
168 .keys()
169 .filter(|ks| !is_reserved_attribute(ks.as_str()))
170 .cloned()
171 .collect::<Vec<_>>();
172 for key in keys_to_move {
173 if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
174 local_root.insert(entry_k, entry_v);
175 }
176 }
177 log.insert(MESSAGE, local_root);
179}
180
181pub fn position_reserved_attr_event_root(
184 log: &mut LogEvent,
185 current_path: &OwnedTargetPath,
186 expected_field_name: &str,
187 meaning: &str,
188) {
189 let desired_path = event_path!(expected_field_name);
191
192 if !path_is_field(current_path, expected_field_name) {
194 if log.contains(desired_path) {
197 let rename_attr = format!("_RESERVED_{meaning}");
198 let rename_path = event_path!(rename_attr.as_str());
199 warn!(
200 message = "Semantic meaning is defined, but the event path already exists. Renaming to not overwrite.",
201 meaning = meaning,
202 renamed = &rename_attr,
203 internal_log_rate_limit = true,
204 );
205 log.rename_key(desired_path, rename_path);
206 }
207
208 log.rename_key(current_path, desired_path);
209 }
210}
211
212pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
218 path.prefix == PathPrefix::Event
219 && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
220}
221
/// Errors that can occur while building Datadog log API requests.
#[derive(Debug, Snafu)]
pub enum RequestBuildError {
    /// The encoded payload exceeds the maximum size.
    // NOTE(review): not constructed in this section; `events_that_fit` presumably reports
    // how many events fit under the limit — confirm against its users.
    #[snafu(display("Encoded payload is greater than the max limit."))]
    PayloadTooBig { events_that_fit: usize },
    /// An I/O error while building the payload. Produced via `From<io::Error>`, e.g. by
    /// `?` on `serialize_with_capacity` or on writing into the compressor.
    #[snafu(display("Failed to build payload with error: {}", error))]
    Io { error: std::io::Error },
    /// A JSON serialization error, produced via `From<serde_json::Error>`.
    #[snafu(display("Failed to serialize payload with error: {}", error))]
    Json { error: serde_json::Error },
}
231
232impl From<io::Error> for RequestBuildError {
233 fn from(error: io::Error) -> RequestBuildError {
234 RequestBuildError::Io { error }
235 }
236}
237
238impl From<serde_json::Error> for RequestBuildError {
239 fn from(error: serde_json::Error) -> RequestBuildError {
240 RequestBuildError::Json { error }
241 }
242}
243
244struct LogRequestBuilder {
245 pub default_api_key: Arc<str>,
246 pub transformer: Transformer,
247 pub compression: Compression,
248 pub conforms_as_agent: bool,
249}
250
251impl LogRequestBuilder {
252 pub fn build_request(
253 &self,
254 events: Vec<Event>,
255 api_key: Arc<str>,
256 ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
257 let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
259 .into_iter()
260 .map(|mut event| {
261 normalize_event(&mut event);
262 if self.conforms_as_agent {
263 normalize_as_agent_event(&mut event);
264 }
265 self.transformer.transform(&mut event);
266 let estimated_json_size = event.estimated_json_encoded_size_of();
267 (event, estimated_json_size)
268 })
269 .collect();
270
271 let mut requests: Vec<LogApiRequest> = Vec::new();
273 while !events_with_estimated_size.is_empty() {
274 let (events_serialized, body, byte_size) =
275 serialize_with_capacity(&mut events_with_estimated_size)?;
276 if events_serialized.is_empty() {
277 let _too_big = events_with_estimated_size.pop_front();
279 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
280 count: 1,
281 reason: "Event too large to encode."
282 });
283 } else {
284 let request =
285 self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
286 requests.push(request);
287 }
288 }
289
290 Ok(requests)
291 }
292
293 fn finish_request(
294 &self,
295 buf: Vec<u8>,
296 mut events: Vec<Event>,
297 byte_size: GroupedCountByteSize,
298 api_key: Arc<str>,
299 ) -> Result<LogApiRequest, RequestBuildError> {
300 let n_events = events.len();
301 let uncompressed_size = buf.len();
302
303 let mut compressor = Compressor::from(self.compression);
305 write_all(&mut compressor, n_events, &buf)?;
306 let bytes = compressor.into_inner().freeze();
307
308 let finalizers = events.take_finalizers();
309 let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
310
311 let payload = if self.compression.is_compressed() {
312 EncodeResult::compressed(bytes, uncompressed_size, byte_size)
313 } else {
314 EncodeResult::uncompressed(bytes, byte_size)
315 };
316
317 Ok::<_, RequestBuildError>(LogApiRequest {
318 api_key,
319 finalizers,
320 compression: self.compression,
321 metadata: request_metadata_builder.build(&payload),
322 uncompressed_size: payload.uncompressed_byte_size,
323 body: payload.into_payload(),
324 })
325 }
326}
327
328pub fn serialize_with_capacity(
334 events: &mut VecDeque<(Event, JsonSize)>,
335) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
336 let total_estimated =
338 events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
339
340 let mut buf = Vec::with_capacity(total_estimated);
342 let mut byte_size = telemetry().create_request_count_byte_size();
343 let mut events_serialized = Vec::with_capacity(events.len());
344
345 buf.push(b'[');
347 let mut first = true;
348 while let Some((event, estimated_json_size)) = events.pop_front() {
349 let existing_len = buf.len();
351 if first {
352 first = false;
353 } else {
354 buf.push(b',');
355 }
356 serde_json::to_writer(&mut buf, event.as_log())?;
357 if buf.len() >= MAX_PAYLOAD_BYTES {
359 events.push_front((event, estimated_json_size));
360 buf.truncate(existing_len);
361 break;
362 }
363 byte_size.add_event(&event, estimated_json_size);
365 events_serialized.push(event);
366 }
367 buf.push(b']');
368
369 Ok((events_serialized, buf, byte_size))
370}
371
372impl<S> LogSink<S>
373where
374 S: Service<LogApiRequest> + Send + 'static,
375 S::Future: Send + 'static,
376 S::Response: DriverResponse + Send + 'static,
377 S::Error: Debug + Into<crate::Error> + Send,
378{
379 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
380 let default_api_key = Arc::clone(&self.default_api_key);
381
382 let partitioner = EventPartitioner;
383 let batch_settings = self.batch_settings;
384 let builder = Arc::new(LogRequestBuilder {
385 default_api_key,
386 transformer: self.transformer,
387 compression: self.compression,
388 conforms_as_agent: self.conforms_as_agent,
389 });
390
391 let input = input.batched_partitioned(partitioner, || {
392 batch_settings.as_item_size_config(HttpJsonBatchSizer)
393 });
394 input
395 .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
396 let builder = Arc::clone(&builder);
397
398 Box::pin(async move {
399 let (api_key, events) = input;
400 let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));
401
402 builder.build_request(events, api_key)
403 })
404 })
405 .filter_map(|request| async move {
406 match request {
407 Err(error) => {
408 emit!(SinkRequestBuildError { error });
409 None
410 }
411 Ok(reqs) => Some(futures::stream::iter(reqs)),
412 }
413 })
414 .flatten()
415 .into_driver(self.service)
416 .protocol(self.protocol)
417 .run()
418 .await
419 }
420}
421
422#[async_trait]
423impl<S> StreamSink<Event> for LogSink<S>
424where
425 S: Service<LogApiRequest> + Send + 'static,
426 S::Future: Send + 'static,
427 S::Response: DriverResponse + Send + 'static,
428 S::Error: Debug + Into<crate::Error> + Send,
429{
430 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
431 self.run_inner(input).await
432 }
433}
434
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{meaning, Definition},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{kind::Collection, Kind},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

    /// Checks the invariants shared by every normalized test event: `timestamp` is an
    /// integer, the core Datadog attributes are present at the root, and `ddtags` has
    /// been joined into a single comma-separated string.
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        assert!(log
            .get(event_path!("timestamp"))
            .expect("should have timestamp")
            .is_integer());

        for attr in [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ] {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        assert_eq!(
            log.get(event_path!("ddtags")).expect("should have tags"),
            &Value::Bytes("key1:value1,key2:value2".into())
        );
    }

    /// Builds event metadata whose schema definition mimics the `datadog_agent` source.
    /// Each agent field is registered with a semantic meaning, so `normalize_event` can
    /// find it via `find_key_by_meaning`.
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        EventMetadata::default().with_schema_definition(&Arc::new(
            definition
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                    &owned_value_path!("ddtags"),
                    Kind::bytes(),
                    Some(meaning::TAGS),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                    &owned_value_path!("hostname"),
                    Kind::bytes(),
                    Some(meaning::HOST),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                    &owned_value_path!("timestamp"),
                    Kind::timestamp(),
                    Some(meaning::TIMESTAMP),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                    &owned_value_path!("severity"),
                    Kind::bytes(),
                    Some(meaning::SEVERITY),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                    &owned_value_path!("service"),
                    Kind::bytes(),
                    Some(meaning::SERVICE),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                    &owned_value_path!("source"),
                    Kind::bytes(),
                    Some(meaning::SOURCE),
                ),
        ))
    }

    // An event with no semantic meanings must not gain `message`, `timestamp` or
    // `hostname` from normalization.
    #[test]
    fn normalize_event_doesnt_require() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        let log = event.as_log();

        assert!(!log.contains(event_path!("message")));
        assert!(!log.contains(event_path!("timestamp")));
        assert!(!log.contains(event_path!("hostname")));
    }

    // Legacy namespace: the agent fields are written directly at the event root.
    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        log.insert(event_path!("message"), "the_message");
        let namespace = log.namespace();

        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    // Vector namespace with a raw bytes root: after both normalization passes, the
    // original root value ends up nested as `message.message`.
    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
        assert_eq!(
            event.as_log().get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

    /// Builds a Vector-namespace event whose agent attributes live under the
    /// `datadog_agent` metadata prefix. `log_generator` supplies the event body.
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut log = log_generator(definition);

        log.insert(metadata_path!("vector", "foo"), "bar");

        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];
        log.insert(metadata_path!("datadog_agent", "ddtags"), tags);

        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Vector);
        Event::Log(log)
    }

    // Vector namespace with an object root: the metadata attributes must be surfaced
    // at the root and nothing else may remain there.
    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
            log.insert(event_path!("message"), "the_message");
            log
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
    }

    /// Builds a Legacy-namespace agent event whose `message` is a JSON-encoded string.
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("timestamp"), Utc::now());
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        let sample_message = value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        });
        log.insert(event_path!("message"), sample_message.to_string());
        log
    }

    /// Asserts that every root key is either `message` or one of the field names
    /// listed in `DD_RESERVED_SEMANTIC_ATTRS`.
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let objmap = log.as_map().unwrap();
        let reserved_fields = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .chain([("message", "message")])
            .collect::<Vec<(&str, &str)>>();
        for key in objmap.keys() {
            assert!(reserved_fields.iter().any(|(_, msg)| *msg == key.as_str()));
        }
    }

    // Root fields whose names also appear inside the JSON-string `message` are moved
    // under `message`; the original string is kept verbatim as `message.message`.
    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut log = prepare_agent_event();

        log.insert(event_path!("field_a"), "replaced_field_a_value");
        log.insert(event_path!("field_c"), "replaced_field_c_value");
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "field_a": "replaced_field_a_value",
                "field_c": "replaced_field_c_value",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
            }))
        );
    }

    // Non-reserved root fields, including nested objects, are moved under `message`
    // alongside the original string message.
    #[test]
    fn normalize_conforming_agent() {
        let mut log = prepare_agent_event();

        log.insert(event_path!("field_1"), "value_1");
        log.insert(event_path!("field_2"), "value_2");
        log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);

        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
                "field_1": "value_1",
                "field_2": "value_2",
                "field_3": {
                    "field_3_nested": "value_3"
                }
            }))
        );
    }
}