use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};

use itertools::Itertools;
use snafu::Snafu;
use vector_lib::{
    event::{ObjectMap, Value},
    internal_event::{ComponentEventsDropped, UNINTENTIONAL},
    lookup::event_path,
};
use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};

use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
use crate::{
    common::datadog::{DD_RESERVED_SEMANTIC_ATTRS, DDTAGS, MESSAGE, is_reserved_attribute},
    sinks::{
        prelude::*,
        util::{Compressor, http::HttpJsonBatchSizer},
    },
};
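
/// Partitions events by the Datadog API key attached to their metadata.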
#[derive(Default)]
pub struct EventPartitioner;

impl Partitioner for EventPartitioner {
    type Item = Event;
    type Key = Option<Arc<str>>;

    fn partition(&self, item: &Self::Item) -> Self::Key {
        item.metadata().datadog_api_key()
    }
}

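/// Builder for [`LogSink`].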
#[derive(Debug)]
pub struct LogSinkBuilder<S> {
    transformer: Transformer,
    service: S,
    batch_settings: BatcherSettings,
    compression: Option<Compression>,
    default_api_key: Arc<str>,
    protocol: String,
    conforms_as_agent: bool,
}

impl<S> LogSinkBuilder<S> {
    pub const fn new(
        transformer: Transformer,
        service: S,
        default_api_key: Arc<str>,
        batch_settings: BatcherSettings,
        protocol: String,
        conforms_as_agent: bool,
    ) -> Self {
        Self {
            transformer,
            service,
            default_api_key,
            batch_settings,
            compression: None,
            protocol,
            conforms_as_agent,
        }
    }

    pub const fn compression(mut self, compression: Compression) -> Self {
        self.compression = Some(compression);
        self
    }

    pub fn build(self) -> LogSink<S> {
        LogSink {
            default_api_key: self.default_api_key,
            transformer: self.transformer,
            service: self.service,
            batch_settings: self.batch_settings,
            compression: self.compression.unwrap_or_default(),
            protocol: self.protocol,
            conforms_as_agent: self.conforms_as_agent,
        }
    }
}

pub struct LogSink<S> {
    /// The API key to use when an event carries none in its metadata.
    default_api_key: Arc<str>,
    /// The service that handles sending the actual HTTP requests.
    service: S,
    /// The transformer applied to each event before encoding.
    transformer: Transformer,
    /// The compression scheme applied to the encoded payload.
    compression: Compression,
    /// Settings that control batching: maximum bytes, maximum events, timeout, etc.
    batch_settings: BatcherSettings,
    /// The protocol name reported to the request driver.
    protocol: String,
    /// Whether events should be restructured to match the Datadog Agent's payload shape.
    conforms_as_agent: bool,
}

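/// Normalizes a log event into the shape expected by the Datadog logs intake:
/// a non-object event value is nested under the `message` key, attributes with a
/// reserved semantic meaning are repositioned at the event root, the `ddtags`
/// array is collapsed into a single comma-delimited string, and the timestamp is
/// converted to epoch milliseconds.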
pub fn normalize_event(event: &mut Event) {
    let log = event.as_mut_log();

    // If the event value is not an object, nest it under the `message` key.
    if !log.value().is_object() {
        log.insert(MESSAGE, log.value().clone());
    }

    // Move fields that carry a reserved semantic meaning to the attribute name
    // Datadog expects at the event root.
    for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
        if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
            position_reserved_attr_event_root(log, &current_path, expected_field_name, meaning);
        }
    }

    // `ddtags` can arrive as an array of `key:value` strings, but the logs intake
    // expects a single comma-delimited string.
    let ddtags_path = event_path!(DDTAGS);
    if let Some(Value::Array(tags_arr)) = log.get(ddtags_path)
        && !tags_arr.is_empty()
    {
        let all_tags: String = tags_arr
            .iter()
            .filter_map(|tag_kv| {
                tag_kv
                    .as_bytes()
                    .map(|bytes| String::from_utf8_lossy(bytes))
            })
            .join(",");

        log.insert(ddtags_path, all_tags);
    }

    // The logs intake expects the timestamp as integer epoch milliseconds.
    let ts_path = event_path!("timestamp");
    if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
        log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
    }
}

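/// Restructures an event to match the payload shape produced by the Datadog Agent:
/// every non-reserved attribute at the event root is moved into a nested object
/// under the `message` key, leaving only reserved attributes at the root.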
pub fn normalize_as_agent_event(event: &mut Event) {
    let log = event.as_mut_log();
    let Some(object_map) = log.as_map_mut() else {
        return;
    };

    // Move every non-reserved root attribute into a local map, which is then
    // re-inserted under the `message` key.
    let mut local_root = ObjectMap::default();
    let keys_to_move = object_map
        .keys()
        .filter(|ks| !is_reserved_attribute(ks.as_str()))
        .cloned()
        .collect::<Vec<_>>();
    for key in keys_to_move {
        if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
            local_root.insert(entry_k, entry_v);
        }
    }
    log.insert(MESSAGE, local_root);
}

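/// Moves the field that carries the given semantic `meaning` to the event root
/// under `expected_field_name`. If a field with that name already exists at the
/// root, it is first renamed to `_RESERVED_<meaning>` so it is not overwritten.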
pub fn position_reserved_attr_event_root(
    log: &mut LogEvent,
    current_path: &OwnedTargetPath,
    expected_field_name: &str,
    meaning: &str,
) {
    let desired_path = event_path!(expected_field_name);

    if !path_is_field(current_path, expected_field_name) {
        if log.contains(desired_path) {
            let rename_attr = format!("_RESERVED_{meaning}");
            let rename_path = event_path!(rename_attr.as_str());
            warn!(
                message = "Semantic meaning is defined, but the event path already exists. Renaming to avoid overwriting.",
                meaning = meaning,
                renamed = &rename_attr,
            );
            log.rename_key(desired_path, rename_path);
        }

        log.rename_key(current_path, desired_path);
    }
}

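/// Returns `true` if `path` refers to a single top-level event field named `field`.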
pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
    path.prefix == PathPrefix::Event
        && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
}

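/// Errors that can occur while building a request for the Datadog logs API.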
#[derive(Debug, Snafu)]
pub enum RequestBuildError {
    #[snafu(display("Encoded payload is greater than the max limit."))]
    PayloadTooBig { events_that_fit: usize },
    #[snafu(display("Failed to build payload with error: {}", error))]
    Io { error: std::io::Error },
    #[snafu(display("Failed to serialize payload with error: {}", error))]
    Json { error: serde_json::Error },
}

impl From<io::Error> for RequestBuildError {
    fn from(error: io::Error) -> RequestBuildError {
        RequestBuildError::Io { error }
    }
}

impl From<serde_json::Error> for RequestBuildError {
    fn from(error: serde_json::Error) -> RequestBuildError {
        RequestBuildError::Json { error }
    }
}

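/// Builds `LogApiRequest`s from batches of events that share the same API key.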
struct LogRequestBuilder {
    pub default_api_key: Arc<str>,
    pub transformer: Transformer,
    pub compression: Compression,
    pub conforms_as_agent: bool,
}

impl LogRequestBuilder {
    pub fn build_request(
        &self,
        events: Vec<Event>,
        api_key: Arc<str>,
    ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
        // Normalize each event and pre-compute its estimated JSON-encoded size.
        let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
            .into_iter()
            .map(|mut event| {
                normalize_event(&mut event);
                if self.conforms_as_agent {
                    normalize_as_agent_event(&mut event);
                }
                self.transformer.transform(&mut event);
                let estimated_json_size = event.estimated_json_encoded_size_of();
                (event, estimated_json_size)
            })
            .collect();

        // Repeatedly serialize as many events as fit under the maximum payload
        // size, producing one request per chunk.
        let mut requests: Vec<LogApiRequest> = Vec::new();
        while !events_with_estimated_size.is_empty() {
            let (events_serialized, body, byte_size) =
                serialize_with_capacity(&mut events_with_estimated_size)?;
            if events_serialized.is_empty() {
                // A single event was too large to fit in a payload on its own; drop it.
                let _too_big = events_with_estimated_size.pop_front();
                emit!(ComponentEventsDropped::<UNINTENTIONAL> {
                    count: 1,
                    reason: "Event too large to encode."
                });
            } else {
                let request =
                    self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
                requests.push(request);
            }
        }

        Ok(requests)
    }

    fn finish_request(
        &self,
        buf: Vec<u8>,
        mut events: Vec<Event>,
        byte_size: GroupedCountByteSize,
        api_key: Arc<str>,
    ) -> Result<LogApiRequest, RequestBuildError> {
        let n_events = events.len();
        let uncompressed_size = buf.len();

        // Compress the serialized body with the configured compression scheme.
        let mut compressor = Compressor::from(self.compression);
        write_all(&mut compressor, n_events, &buf)?;
        let bytes = compressor.into_inner().freeze();

        let finalizers = events.take_finalizers();
        let request_metadata_builder = RequestMetadataBuilder::from_events(&events);

        let payload = if self.compression.is_compressed() {
            EncodeResult::compressed(bytes, uncompressed_size, byte_size)
        } else {
            EncodeResult::uncompressed(bytes, byte_size)
        };

        Ok::<_, RequestBuildError>(LogApiRequest {
            api_key,
            finalizers,
            compression: self.compression,
            metadata: request_metadata_builder.build(&payload),
            uncompressed_size: payload.uncompressed_byte_size,
            body: payload.into_payload(),
        })
    }
}

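/// Serializes events from the front of the queue into a JSON array, stopping once
/// an event pushes the buffer past `MAX_PAYLOAD_BYTES`. An event that does not fit
/// is returned to the front of the queue for the next call. Returns the events that
/// were serialized, the encoded buffer, and the grouped byte size telemetry.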
pub fn serialize_with_capacity(
    events: &mut VecDeque<(Event, JsonSize)>,
) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
    // Pre-allocate the buffer from the estimated JSON size of the events, plus room
    // for the array brackets and separating commas.
    let total_estimated =
        events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;

    let mut buf = Vec::with_capacity(total_estimated);
    let mut byte_size = telemetry().create_request_count_byte_size();
    let mut events_serialized = Vec::with_capacity(events.len());

    buf.push(b'[');
    let mut first = true;
    while let Some((event, estimated_json_size)) = events.pop_front() {
        let existing_len = buf.len();
        if first {
            first = false;
        } else {
            buf.push(b',');
        }
        serde_json::to_writer(&mut buf, event.as_log())?;
        // If this event pushed the buffer over the limit, roll the buffer back and
        // return the event to the front of the queue for the next request.
        if buf.len() >= MAX_PAYLOAD_BYTES {
            events.push_front((event, estimated_json_size));
            buf.truncate(existing_len);
            break;
        }
        byte_size.add_event(&event, estimated_json_size);
        events_serialized.push(event);
    }
    buf.push(b']');

    Ok((events_serialized, buf, byte_size))
}

impl<S> LogSink<S>
where
    S: Service<LogApiRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let default_api_key = Arc::clone(&self.default_api_key);

        let partitioner = EventPartitioner;
        let batch_settings = self.batch_settings;
        let builder = Arc::new(LogRequestBuilder {
            default_api_key,
            transformer: self.transformer,
            compression: self.compression,
            conforms_as_agent: self.conforms_as_agent,
        });

        // Batch incoming events partitioned by API key, sizing batches by the
        // estimated JSON-encoded size of each event.
        let input = input.batched_partitioned(partitioner, || {
            batch_settings.as_item_size_config(HttpJsonBatchSizer)
        });
        input
            .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
                let builder = Arc::clone(&builder);

                Box::pin(async move {
                    let (api_key, events) = input;
                    // Fall back to the globally configured API key when the batch
                    // does not carry one in its metadata.
                    let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));

                    builder.build_request(events, api_key)
                })
            })
            .filter_map(|request| async move {
                match request {
                    Err(error) => {
                        emit!(SinkRequestBuildError { error });
                        None
                    }
                    Ok(reqs) => Some(futures::stream::iter(reqs)),
                }
            })
            .flatten()
            .into_driver(self.service)
            .protocol(self.protocol)
            .run()
            .await
    }
}

#[async_trait]
impl<S> StreamSink<Event> for LogSink<S>
where
    S: Service<LogApiRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}

#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{Definition, meaning},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{Kind, kind::Collection},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

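    // Checks that a normalized log has all of the reserved attributes at the event
    // root and that `ddtags` was collapsed into a comma-separated string.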
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        assert!(
            log.get(event_path!("timestamp"))
                .expect("should have timestamp")
                .is_integer()
        );

        for attr in [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ] {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        assert_eq!(
            log.get(event_path!("ddtags")).expect("should have tags"),
            &Value::Bytes("key1:value1,key2:value2".into())
        );
    }

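    // Builds event metadata whose schema definition mirrors the semantic meanings
    // (tags, host, timestamp, severity, service, source) attached by the
    // `datadog_agent` source.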
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        EventMetadata::default().with_schema_definition(&Arc::new(
            definition
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                    &owned_value_path!("ddtags"),
                    Kind::bytes(),
                    Some(meaning::TAGS),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                    &owned_value_path!("hostname"),
                    Kind::bytes(),
                    Some(meaning::HOST),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                    &owned_value_path!("timestamp"),
                    Kind::timestamp(),
                    Some(meaning::TIMESTAMP),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                    &owned_value_path!("severity"),
                    Kind::bytes(),
                    Some(meaning::SEVERITY),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                    &owned_value_path!("service"),
                    Kind::bytes(),
                    Some(meaning::SERVICE),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                    &owned_value_path!("source"),
                    Kind::bytes(),
                    Some(meaning::SOURCE),
                ),
        ))
    }

    #[test]
    fn normalize_event_doesnt_require_attributes() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        let log = event.as_log();

        assert!(!log.contains(event_path!("message")));
        assert!(!log.contains(event_path!("timestamp")));
        assert!(!log.contains(event_path!("hostname")));
    }

    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        log.insert(event_path!("message"), "the_message");
        let namespace = log.namespace();

        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
        assert_eq!(
            event.as_log().get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

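    // Builds a Vector-namespaced event carrying `datadog_agent` source metadata,
    // using the provided closure to construct the initial log.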
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut log = log_generator(definition);

        log.insert(metadata_path!("vector", "foo"), "bar");

        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];
        log.insert(metadata_path!("datadog_agent", "ddtags"), tags);

        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Vector);
        Event::Log(log)
    }

    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
            log.insert(event_path!("message"), "the_message");
            log
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
    }

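    // Builds a Legacy-namespaced event resembling a Datadog Agent payload, with the
    // reserved attributes at the root and a JSON-encoded string under `message`.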
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("timestamp"), Utc::now());
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        let sample_message = value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        });
        log.insert(event_path!("message"), sample_message.to_string());
        log
    }

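    // Asserts that only reserved attributes (plus `message`) remain at the event root.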
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let objmap = log.as_map().unwrap();
        let reserved_fields = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .chain([("message", "message")])
            .collect::<Vec<(&str, &str)>>();
        for key in objmap.keys() {
            assert!(reserved_fields.iter().any(|(_, msg)| *msg == key.as_str()));
        }
    }

    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut log = prepare_agent_event();

        log.insert(event_path!("field_a"), "replaced_field_a_value");
        log.insert(event_path!("field_c"), "replaced_field_c_value");
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "field_a": "replaced_field_a_value",
                "field_c": "replaced_field_c_value",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
            }))
        );
    }

    #[test]
    fn normalize_conforming_agent() {
        let mut log = prepare_agent_event();

        log.insert(event_path!("field_1"), "value_1");
        log.insert(event_path!("field_2"), "value_2");
        log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);

        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
                "field_1": "value_1",
                "field_2": "value_2",
                "field_3": {
                    "field_3_nested": "value_3"
                }
            }))
        );
    }
}