vector/sources/util/
message_decoding.rs

1use std::iter;
2
3use bytes::{Bytes, BytesMut};
4use chrono::{DateTime, Utc};
5use tokio_util::codec::Decoder as _;
6use vector_lib::{
7    EstimatedJsonEncodedSizeOf,
8    codecs::StreamDecodingError,
9    config::LogNamespace,
10    internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _, Registered},
11    lookup::{PathPrefix, metadata_path, path},
12};
13
14use crate::{
15    codecs::Decoder,
16    config::log_schema,
17    event::{BatchNotifier, Event},
18};
19
20pub fn decode_message<'a>(
21    mut decoder: Decoder,
22    source_type: &'static str,
23    message: &[u8],
24    timestamp: Option<DateTime<Utc>>,
25    batch: &'a Option<BatchNotifier>,
26    log_namespace: LogNamespace,
27    events_received: &'a Registered<EventsReceived>,
28) -> impl Iterator<Item = Event> + 'a + use<'a> {
29    let schema = log_schema();
30
31    let mut buffer = BytesMut::with_capacity(message.len());
32    buffer.extend_from_slice(message);
33    let now = Utc::now();
34
35    iter::from_fn(move || {
36        loop {
37            break match decoder.decode_eof(&mut buffer) {
38                Ok(Some((events, _))) => Some(events.into_iter().map(move |mut event| {
39                    if let Event::Log(ref mut log) = event {
40                        log_namespace.insert_vector_metadata(
41                            log,
42                            schema.source_type_key(),
43                            path!("source_type"),
44                            Bytes::from(source_type),
45                        );
46                        match log_namespace {
47                            LogNamespace::Vector => {
48                                if let Some(timestamp) = timestamp {
49                                    log.try_insert(
50                                        metadata_path!(source_type, "timestamp"),
51                                        timestamp,
52                                    );
53                                }
54
55                                log.insert(metadata_path!("vector", "ingest_timestamp"), now);
56                            }
57                            LogNamespace::Legacy => {
58                                if let Some(timestamp) = timestamp
59                                    && let Some(timestamp_key) = schema.timestamp_key()
60                                {
61                                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
62                                }
63                            }
64                        }
65                    }
66                    events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
67                    event
68                })),
69                Err(error) => {
70                    // Error is logged by `crate::codecs::Decoder`, no further handling
71                    // is needed here.
72                    if error.can_continue() {
73                        continue;
74                    }
75                    None
76                }
77                Ok(None) => None,
78            };
79        }
80    })
81    .flatten()
82    .map(move |event| event.with_batch_notifier_option(batch))
83}