vector/sources/util/
message_decoding.rs1use std::iter;
2
3use bytes::{Bytes, BytesMut};
4use chrono::{DateTime, Utc};
5use tokio_util::codec::Decoder as _;
6use vector_lib::{
7 EstimatedJsonEncodedSizeOf,
8 codecs::StreamDecodingError,
9 config::LogNamespace,
10 internal_event::{CountByteSize, EventsReceived, InternalEventHandle as _, Registered},
11 lookup::{PathPrefix, metadata_path, path},
12};
13
14use crate::{
15 codecs::Decoder,
16 config::log_schema,
17 event::{BatchNotifier, Event},
18};
19
20pub fn decode_message<'a>(
21 mut decoder: Decoder,
22 source_type: &'static str,
23 message: &[u8],
24 timestamp: Option<DateTime<Utc>>,
25 batch: &'a Option<BatchNotifier>,
26 log_namespace: LogNamespace,
27 events_received: &'a Registered<EventsReceived>,
28) -> impl Iterator<Item = Event> + 'a + use<'a> {
29 let schema = log_schema();
30
31 let mut buffer = BytesMut::with_capacity(message.len());
32 buffer.extend_from_slice(message);
33 let now = Utc::now();
34
35 iter::from_fn(move || {
36 loop {
37 break match decoder.decode_eof(&mut buffer) {
38 Ok(Some((events, _))) => Some(events.into_iter().map(move |mut event| {
39 if let Event::Log(ref mut log) = event {
40 log_namespace.insert_vector_metadata(
41 log,
42 schema.source_type_key(),
43 path!("source_type"),
44 Bytes::from(source_type),
45 );
46 match log_namespace {
47 LogNamespace::Vector => {
48 if let Some(timestamp) = timestamp {
49 log.try_insert(
50 metadata_path!(source_type, "timestamp"),
51 timestamp,
52 );
53 }
54
55 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
56 }
57 LogNamespace::Legacy => {
58 if let Some(timestamp) = timestamp
59 && let Some(timestamp_key) = schema.timestamp_key()
60 {
61 log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
62 }
63 }
64 }
65 }
66 events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
67 event
68 })),
69 Err(error) => {
70 if error.can_continue() {
73 continue;
74 }
75 None
76 }
77 Ok(None) => None,
78 };
79 }
80 })
81 .flatten()
82 .map(move |event| event.with_batch_notifier_option(batch))
83}