vector/sources/util/
message_decoding.rs1use std::iter;
2
3use bytes::{Bytes, BytesMut};
4use chrono::{DateTime, Utc};
5use tokio_util::codec::Decoder as _;
6use vector_lib::codecs::StreamDecodingError;
7use vector_lib::internal_event::{
8 CountByteSize, EventsReceived, InternalEventHandle as _, Registered,
9};
10use vector_lib::lookup::{metadata_path, path, PathPrefix};
11use vector_lib::{config::LogNamespace, EstimatedJsonEncodedSizeOf};
12
13use crate::{codecs::Decoder, config::log_schema, event::BatchNotifier, event::Event};
14
15pub fn decode_message<'a>(
16 mut decoder: Decoder,
17 source_type: &'static str,
18 message: &[u8],
19 timestamp: Option<DateTime<Utc>>,
20 batch: &'a Option<BatchNotifier>,
21 log_namespace: LogNamespace,
22 events_received: &'a Registered<EventsReceived>,
23) -> impl Iterator<Item = Event> + 'a + use<'a> {
24 let schema = log_schema();
25
26 let mut buffer = BytesMut::with_capacity(message.len());
27 buffer.extend_from_slice(message);
28 let now = Utc::now();
29
30 iter::from_fn(move || loop {
31 break match decoder.decode_eof(&mut buffer) {
32 Ok(Some((events, _))) => Some(events.into_iter().map(move |mut event| {
33 if let Event::Log(ref mut log) = event {
34 log_namespace.insert_vector_metadata(
35 log,
36 schema.source_type_key(),
37 path!("source_type"),
38 Bytes::from(source_type),
39 );
40 match log_namespace {
41 LogNamespace::Vector => {
42 if let Some(timestamp) = timestamp {
43 log.try_insert(metadata_path!(source_type, "timestamp"), timestamp);
44 }
45
46 log.insert(metadata_path!("vector", "ingest_timestamp"), now);
47 }
48 LogNamespace::Legacy => {
49 if let Some(timestamp) = timestamp {
50 if let Some(timestamp_key) = schema.timestamp_key() {
51 log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
52 }
53 }
54 }
55 }
56 }
57 events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
58 event
59 })),
60 Err(error) => {
61 if error.can_continue() {
64 continue;
65 }
66 None
67 }
68 Ok(None) => None,
69 };
70 })
71 .flatten()
72 .map(move |event| event.with_batch_notifier_option(batch))
73}