1use std::{collections::VecDeque, fmt::Debug, io, sync::Arc};
2
3use itertools::Itertools;
4use snafu::Snafu;
5use vector_lib::{
6 event::{ObjectMap, Value},
7 internal_event::{ComponentEventsDropped, UNINTENTIONAL},
8 lookup::event_path,
9};
10use vrl::path::{OwnedSegment, OwnedTargetPath, PathPrefix};
11
12use super::{config::MAX_PAYLOAD_BYTES, service::LogApiRequest};
13use crate::{
14 common::datadog::{DD_RESERVED_SEMANTIC_ATTRS, DDTAGS, MESSAGE, is_reserved_attribute},
15 sinks::{
16 prelude::*,
17 util::{Compressor, http::HttpJsonBatchSizer},
18 },
19};
/// Partitioner that keys events by the Datadog API key found in their metadata.
#[derive(Default)]
pub struct EventPartitioner;
22
23impl Partitioner for EventPartitioner {
24 type Item = Event;
25 type Key = Option<Arc<str>>;
26
27 fn partition(&self, item: &Self::Item) -> Self::Key {
28 item.metadata().datadog_api_key()
29 }
30}
31
/// Builder that collects the settings needed to construct a [`LogSink`].
#[derive(Debug)]
pub struct LogSinkBuilder<S> {
    /// Transformer handed to the built sink.
    transformer: Transformer,
    /// Service handed to the built sink.
    service: S,
    /// Batching settings handed to the built sink.
    batch_settings: BatcherSettings,
    /// Optional compression override; `build` falls back to the `Compression` default when unset.
    compression: Option<Compression>,
    /// API key handed to the built sink as its default key.
    default_api_key: Arc<str>,
    /// Protocol label handed to the built sink.
    protocol: String,
    /// Flag handed to the built sink; see `LogSink::conforms_as_agent`.
    conforms_as_agent: bool,
}
42
43impl<S> LogSinkBuilder<S> {
44 pub const fn new(
45 transformer: Transformer,
46 service: S,
47 default_api_key: Arc<str>,
48 batch_settings: BatcherSettings,
49 protocol: String,
50 conforms_as_agent: bool,
51 ) -> Self {
52 Self {
53 transformer,
54 service,
55 default_api_key,
56 batch_settings,
57 compression: None,
58 protocol,
59 conforms_as_agent,
60 }
61 }
62
63 pub const fn compression(mut self, compression: Compression) -> Self {
64 self.compression = Some(compression);
65 self
66 }
67
68 pub fn build(self) -> LogSink<S> {
69 LogSink {
70 default_api_key: self.default_api_key,
71 transformer: self.transformer,
72 service: self.service,
73 batch_settings: self.batch_settings,
74 compression: self.compression.unwrap_or_default(),
75 protocol: self.protocol,
76 conforms_as_agent: self.conforms_as_agent,
77 }
78 }
79}
80
/// Sink that batches log events and submits them as `LogApiRequest`s through the service `S`.
pub struct LogSink<S> {
    /// API key used for batches whose partition key is `None`.
    default_api_key: Arc<str>,
    /// Service that every built request is driven through.
    service: S,
    /// Transformer applied to each event while a request is being built.
    transformer: Transformer,
    /// Compression applied to each request payload.
    compression: Compression,
    /// Batching settings used when grouping the input stream.
    batch_settings: BatcherSettings,
    /// Protocol label handed to the driver.
    protocol: String,
    /// When `true`, events are additionally passed through `normalize_as_agent_event`.
    conforms_as_agent: bool,
}
102
103pub fn normalize_event(event: &mut Event) {
107 let log = event.as_mut_log();
108
109 if !log.value().is_object() {
111 log.insert(MESSAGE, log.value().clone());
112 }
113
114 for (meaning, expected_field_name) in DD_RESERVED_SEMANTIC_ATTRS {
117 if let Some(current_path) = log.find_key_by_meaning(meaning).cloned() {
119 position_reserved_attr_event_root(log, ¤t_path, expected_field_name, meaning);
121 }
122 }
123
124 let ddtags_path = event_path!(DDTAGS);
128 if let Some(Value::Array(tags_arr)) = log.get(ddtags_path)
129 && !tags_arr.is_empty()
130 {
131 let all_tags: String = tags_arr
132 .iter()
133 .filter_map(|tag_kv| {
134 tag_kv
135 .as_bytes()
136 .map(|bytes| String::from_utf8_lossy(bytes))
137 })
138 .join(",");
139
140 log.insert(ddtags_path, all_tags);
141 }
142
143 let ts_path = event_path!("timestamp");
147 if let Some(Value::Timestamp(ts)) = log.remove(ts_path) {
148 log.insert(ts_path, Value::Integer(ts.timestamp_millis()));
149 }
150}
151
152pub fn normalize_as_agent_event(event: &mut Event) {
159 let log = event.as_mut_log();
160 let Some(object_map) = log.as_map_mut() else {
162 return;
163 };
164 let mut local_root = ObjectMap::default();
166 let keys_to_move = object_map
167 .keys()
168 .filter(|ks| !is_reserved_attribute(ks.as_str()))
169 .cloned()
170 .collect::<Vec<_>>();
171 for key in keys_to_move {
172 if let Some((entry_k, entry_v)) = object_map.remove_entry(key.as_str()) {
173 local_root.insert(entry_k, entry_v);
174 }
175 }
176 log.insert(MESSAGE, local_root);
178}
179
180pub fn position_reserved_attr_event_root(
183 log: &mut LogEvent,
184 current_path: &OwnedTargetPath,
185 expected_field_name: &str,
186 meaning: &str,
187) {
188 let desired_path = event_path!(expected_field_name);
190
191 if !path_is_field(current_path, expected_field_name) {
193 if log.contains(desired_path) {
196 let rename_attr = format!("_RESERVED_{meaning}");
197 let rename_path = event_path!(rename_attr.as_str());
198 warn!(
199 message = "Semantic meaning is defined, but the event path already exists. Renaming to not overwrite.",
200 meaning = meaning,
201 renamed = &rename_attr,
202 internal_log_rate_limit = true,
203 );
204 log.rename_key(desired_path, rename_path);
205 }
206
207 log.rename_key(current_path, desired_path);
208 }
209}
210
211pub fn path_is_field(path: &OwnedTargetPath, field: &str) -> bool {
217 path.prefix == PathPrefix::Event
218 && matches!(&path.path.segments[..], [OwnedSegment::Field(f)] if f.as_str() == field)
219}
220
/// Errors that can occur while turning a batch of events into API requests.
#[derive(Debug, Snafu)]
pub enum RequestBuildError {
    /// The encoded payload exceeded the maximum size.
    // NOTE(review): `events_that_fit` presumably counts the events that did fit; this variant
    // is not constructed in the visible code, so confirm against its producers.
    #[snafu(display("Encoded payload is greater than the max limit."))]
    PayloadTooBig { events_that_fit: usize },
    /// An I/O error occurred while building the payload.
    #[snafu(display("Failed to build payload with error: {}", error))]
    Io { error: std::io::Error },
    /// JSON serialization of the payload failed.
    #[snafu(display("Failed to serialize payload with error: {}", error))]
    Json { error: serde_json::Error },
}
230
231impl From<io::Error> for RequestBuildError {
232 fn from(error: io::Error) -> RequestBuildError {
233 RequestBuildError::Io { error }
234 }
235}
236
237impl From<serde_json::Error> for RequestBuildError {
238 fn from(error: serde_json::Error) -> RequestBuildError {
239 RequestBuildError::Json { error }
240 }
241}
242
/// Turns batches of events into `LogApiRequest`s.
struct LogRequestBuilder {
    /// API key used by the sink for batches whose partition key is `None`.
    pub default_api_key: Arc<str>,
    /// Transformer applied to each event after normalization.
    pub transformer: Transformer,
    /// Compression applied to each request body.
    pub compression: Compression,
    /// When `true`, events are additionally passed through `normalize_as_agent_event`.
    pub conforms_as_agent: bool,
}
249
250impl LogRequestBuilder {
251 pub fn build_request(
252 &self,
253 events: Vec<Event>,
254 api_key: Arc<str>,
255 ) -> Result<Vec<LogApiRequest>, RequestBuildError> {
256 let mut events_with_estimated_size: VecDeque<(Event, JsonSize)> = events
258 .into_iter()
259 .map(|mut event| {
260 normalize_event(&mut event);
261 if self.conforms_as_agent {
262 normalize_as_agent_event(&mut event);
263 }
264 self.transformer.transform(&mut event);
265 let estimated_json_size = event.estimated_json_encoded_size_of();
266 (event, estimated_json_size)
267 })
268 .collect();
269
270 let mut requests: Vec<LogApiRequest> = Vec::new();
272 while !events_with_estimated_size.is_empty() {
273 let (events_serialized, body, byte_size) =
274 serialize_with_capacity(&mut events_with_estimated_size)?;
275 if events_serialized.is_empty() {
276 let _too_big = events_with_estimated_size.pop_front();
278 emit!(ComponentEventsDropped::<UNINTENTIONAL> {
279 count: 1,
280 reason: "Event too large to encode."
281 });
282 } else {
283 let request =
284 self.finish_request(body, events_serialized, byte_size, Arc::clone(&api_key))?;
285 requests.push(request);
286 }
287 }
288
289 Ok(requests)
290 }
291
292 fn finish_request(
293 &self,
294 buf: Vec<u8>,
295 mut events: Vec<Event>,
296 byte_size: GroupedCountByteSize,
297 api_key: Arc<str>,
298 ) -> Result<LogApiRequest, RequestBuildError> {
299 let n_events = events.len();
300 let uncompressed_size = buf.len();
301
302 let mut compressor = Compressor::from(self.compression);
304 write_all(&mut compressor, n_events, &buf)?;
305 let bytes = compressor.into_inner().freeze();
306
307 let finalizers = events.take_finalizers();
308 let request_metadata_builder = RequestMetadataBuilder::from_events(&events);
309
310 let payload = if self.compression.is_compressed() {
311 EncodeResult::compressed(bytes, uncompressed_size, byte_size)
312 } else {
313 EncodeResult::uncompressed(bytes, byte_size)
314 };
315
316 Ok::<_, RequestBuildError>(LogApiRequest {
317 api_key,
318 finalizers,
319 compression: self.compression,
320 metadata: request_metadata_builder.build(&payload),
321 uncompressed_size: payload.uncompressed_byte_size,
322 body: payload.into_payload(),
323 })
324 }
325}
326
327pub fn serialize_with_capacity(
333 events: &mut VecDeque<(Event, JsonSize)>,
334) -> Result<(Vec<Event>, Vec<u8>, GroupedCountByteSize), io::Error> {
335 let total_estimated =
337 events.iter().map(|(_, size)| size.get()).sum::<usize>() + events.len() * 2;
338
339 let mut buf = Vec::with_capacity(total_estimated);
341 let mut byte_size = telemetry().create_request_count_byte_size();
342 let mut events_serialized = Vec::with_capacity(events.len());
343
344 buf.push(b'[');
346 let mut first = true;
347 while let Some((event, estimated_json_size)) = events.pop_front() {
348 let existing_len = buf.len();
350 if first {
351 first = false;
352 } else {
353 buf.push(b',');
354 }
355 serde_json::to_writer(&mut buf, event.as_log())?;
356 if buf.len() >= MAX_PAYLOAD_BYTES {
358 events.push_front((event, estimated_json_size));
359 buf.truncate(existing_len);
360 break;
361 }
362 byte_size.add_event(&event, estimated_json_size);
364 events_serialized.push(event);
365 }
366 buf.push(b']');
367
368 Ok((events_serialized, buf, byte_size))
369}
370
impl<S> LogSink<S>
where
    S: Service<LogApiRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    /// Runs the sink over `input`: events are partitioned by API key and batched, each batch
    /// is turned into requests by a shared `LogRequestBuilder`, and the resulting requests
    /// are handed to the driver built around `self.service`.
    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        let default_api_key = Arc::clone(&self.default_api_key);

        let partitioner = EventPartitioner;
        let batch_settings = self.batch_settings;
        // The builder is shared between the concurrently running request-building tasks.
        let builder = Arc::new(LogRequestBuilder {
            default_api_key,
            transformer: self.transformer,
            compression: self.compression,
            conforms_as_agent: self.conforms_as_agent,
        });

        let input = input.batched_partitioned(partitioner, || {
            batch_settings.as_item_size_config(HttpJsonBatchSizer)
        });
        input
            .concurrent_map(default_request_builder_concurrency_limit(), move |input| {
                let builder = Arc::clone(&builder);

                Box::pin(async move {
                    let (api_key, events) = input;
                    // Batches without a per-event key fall back to the sink's default key.
                    let api_key = api_key.unwrap_or_else(|| Arc::clone(&builder.default_api_key));

                    builder.build_request(events, api_key)
                })
            })
            .filter_map(|request| async move {
                match request {
                    // A failed build is reported and the batch is skipped.
                    Err(error) => {
                        emit!(SinkRequestBuildError { error });
                        None
                    }
                    // One batch can yield several requests; expose them as a stream so they
                    // can be flattened below.
                    Ok(reqs) => Some(futures::stream::iter(reqs)),
                }
            })
            .flatten()
            .into_driver(self.service)
            .protocol(self.protocol)
            .run()
            .await
    }
}
420
#[async_trait]
impl<S> StreamSink<Event> for LogSink<S>
where
    S: Service<LogApiRequest> + Send + 'static,
    S::Future: Send + 'static,
    S::Response: DriverResponse + Send + 'static,
    S::Error: Debug + Into<crate::Error> + Send,
{
    /// Trait entry point; delegates to `run_inner`.
    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
        self.run_inner(input).await
    }
}
433
// Unit tests for `normalize_event` and `normalize_as_agent_event`.
#[cfg(test)]
mod tests {

    use std::sync::Arc;

    use chrono::Utc;
    use vector_lib::{
        config::{LegacyKey, LogNamespace},
        event::{Event, EventMetadata, LogEvent},
        schema::{Definition, meaning},
    };
    use vrl::{
        core::Value,
        event_path, metadata_path, owned_value_path, value,
        value::{Kind, kind::Collection},
    };

    use super::{normalize_as_agent_event, normalize_event};
    use crate::common::datadog::DD_RESERVED_SEMANTIC_ATTRS;

    /// Asserts that a normalized log has an integer `timestamp`, contains all of the listed
    /// root attributes, and has its tags joined into `key1:value1,key2:value2`.
    fn assert_normalized_log_has_expected_attrs(log: &LogEvent) {
        assert!(
            log.get(event_path!("timestamp"))
                .expect("should have timestamp")
                .is_integer()
        );

        for attr in [
            "message",
            "timestamp",
            "hostname",
            "ddtags",
            "service",
            "status",
        ] {
            assert!(log.contains(event_path!(attr)), "missing {attr}");
        }

        assert_eq!(
            log.get(event_path!("ddtags")).expect("should have tags"),
            &Value::Bytes("key1:value1,key2:value2".into())
        );
    }

    /// Builds event metadata whose schema definition registers `datadog_agent` source
    /// metadata for the tags, host, timestamp, severity, service and source meanings.
    fn agent_event_metadata(definition: Definition) -> EventMetadata {
        EventMetadata::default().with_schema_definition(&Arc::new(
            definition
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("ddtags"))),
                    &owned_value_path!("ddtags"),
                    Kind::bytes(),
                    Some(meaning::TAGS),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("hostname"))),
                    &owned_value_path!("hostname"),
                    Kind::bytes(),
                    Some(meaning::HOST),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("timestamp"))),
                    &owned_value_path!("timestamp"),
                    Kind::timestamp(),
                    Some(meaning::TIMESTAMP),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("severity"))),
                    &owned_value_path!("severity"),
                    Kind::bytes(),
                    Some(meaning::SEVERITY),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("service"))),
                    &owned_value_path!("service"),
                    Kind::bytes(),
                    Some(meaning::SERVICE),
                )
                .with_source_metadata(
                    "datadog_agent",
                    Some(LegacyKey::InsertIfEmpty(owned_value_path!("source"))),
                    &owned_value_path!("source"),
                    Kind::bytes(),
                    Some(meaning::SOURCE),
                ),
        ))
    }

    /// Normalizing an event without reserved attributes must not create them.
    #[test]
    fn normalize_event_doesnt_require() {
        let mut log = LogEvent::default();
        log.insert(event_path!("foo"), "bar");

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        let log = event.as_log();

        assert!(!log.contains(event_path!("message")));
        assert!(!log.contains(event_path!("timestamp")));
        assert!(!log.contains(event_path!("hostname")));
    }

    /// Legacy-namespace events with attributes at the event root normalize to the expected
    /// layout.
    #[test]
    fn normalize_event_normalizes_legacy_namespace() {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        log.insert(event_path!("message"), "the_message");
        let namespace = log.namespace();

        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Legacy);

        let mut event = Event::Log(log);
        normalize_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
    }

    /// A Vector-namespace event whose value is a raw string ends up with that string nested
    /// under `message.message` after both normalization passes.
    #[test]
    fn normalize_event_normalizes_vector_namespace_raw_field() {
        let mut event = prepare_event_vector_namespace(|definition| {
            LogEvent::from_parts(value!("the_message"), agent_event_metadata(definition))
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
        assert_eq!(
            event.as_log().get("message"),
            Some(&value!({"message": "the_message"}))
        );
    }

    /// Builds a Vector-namespace event from `log_generator` and fills in the `datadog_agent`
    /// attributes under the metadata paths.
    fn prepare_event_vector_namespace(log_generator: fn(Definition) -> LogEvent) -> Event {
        let definition =
            Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector]);
        let mut log = log_generator(definition);

        log.insert(metadata_path!("vector", "foo"), "bar");

        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];
        log.insert(metadata_path!("datadog_agent", "ddtags"), tags);

        log.insert(metadata_path!("datadog_agent", "hostname"), "the_host");
        log.insert(metadata_path!("datadog_agent", "timestamp"), Utc::now());
        log.insert(metadata_path!("datadog_agent", "service"), "the_service");
        log.insert(metadata_path!("datadog_agent", "source"), "the_source");
        log.insert(metadata_path!("datadog_agent", "severity"), "the_severity");

        assert!(log.namespace() == LogNamespace::Vector);
        Event::Log(log)
    }

    /// A Vector-namespace event with an object value normalizes to the expected layout with
    /// only reserved fields left at the root.
    #[test]
    fn normalize_event_normalizes_vector_namespace() {
        let mut event = prepare_event_vector_namespace(|definition| {
            let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
            log.insert(event_path!("message"), "the_message");
            log
        });

        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        assert_normalized_log_has_expected_attrs(event.as_log());
        assert_only_reserved_fields_at_root(event.as_log());
    }

    /// Builds a Legacy-namespace log carrying the agent attributes at the event root and a
    /// `message` holding the string form of a JSON-like object.
    fn prepare_agent_event() -> LogEvent {
        let definition = Definition::new_with_default_metadata(
            Kind::object(Collection::empty()),
            [LogNamespace::Legacy],
        );
        let mut log = LogEvent::new_with_metadata(agent_event_metadata(definition));
        let namespace = log.namespace();
        namespace.insert_standard_vector_source_metadata(&mut log, "datadog_agent", Utc::now());

        let tags = vec![
            Value::Bytes("key1:value1".into()),
            Value::Bytes("key2:value2".into()),
        ];

        log.insert(event_path!("ddtags"), tags);
        log.insert(event_path!("hostname"), "the_host");
        log.insert(event_path!("service"), "the_service");
        log.insert(event_path!("timestamp"), Utc::now());
        log.insert(event_path!("source"), "the_source");
        log.insert(event_path!("severity"), "the_severity");

        let sample_message = value!({
            "message": "hello world",
            "field_a": "field_a_value",
            "field_b": "field_b_value",
            "field_c": { "field_c_nested" : "field_c_value" },
        });
        log.insert(event_path!("message"), sample_message.to_string());
        log
    }

    /// Asserts that every root key of `log` is either `message` or one of the field names
    /// listed in `DD_RESERVED_SEMANTIC_ATTRS`.
    fn assert_only_reserved_fields_at_root(log: &LogEvent) {
        let objmap = log.as_map().unwrap();
        let reserved_fields = DD_RESERVED_SEMANTIC_ATTRS
            .into_iter()
            .chain([("message", "message")])
            .collect::<Vec<(&str, &str)>>();
        for key in objmap.keys() {
            assert!(reserved_fields.iter().any(|(_, msg)| *msg == key.as_str()));
        }
    }

    /// Root fields whose names also appear inside the stringified message are nested next to
    /// the original message string without touching it.
    #[test]
    fn normalize_conforming_agent_with_collisions() {
        let mut log = prepare_agent_event();

        log.insert(event_path!("field_a"), "replaced_field_a_value");
        log.insert(event_path!("field_c"), "replaced_field_c_value");
        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);
        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "field_a": "replaced_field_a_value",
                "field_c": "replaced_field_c_value",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
            }))
        );
    }

    /// Extra non-reserved root fields, including a nested one, are moved under `message`.
    #[test]
    fn normalize_conforming_agent() {
        let mut log = prepare_agent_event();

        log.insert(event_path!("field_1"), "value_1");
        log.insert(event_path!("field_2"), "value_2");
        log.insert(event_path!("field_3", "field_3_nested"), "value_3");

        let mut event = Event::Log(log);
        normalize_event(&mut event);
        normalize_as_agent_event(&mut event);

        let log = event.as_log();
        assert_normalized_log_has_expected_attrs(log);
        assert_only_reserved_fields_at_root(log);

        assert_eq!(
            log.get(event_path!("message")),
            Some(&value!({
                "source_type": "datadog_agent",
                "message": (value!({
                    "message": "hello world",
                    "field_a": "field_a_value",
                    "field_b": "field_b_value",
                    "field_c": { "field_c_nested" : "field_c_value" },
                }).to_string()),
                "field_1": "value_1",
                "field_2": "value_2",
                "field_3": {
                    "field_3_nested": "value_3"
                }
            }))
        );
    }
}