vector/sources/util/
message_decoding.rs

1use std::iter;
2
3use bytes::{Bytes, BytesMut};
4use chrono::{DateTime, Utc};
5use tokio_util::codec::Decoder as _;
6use vector_lib::codecs::StreamDecodingError;
7use vector_lib::internal_event::{
8    CountByteSize, EventsReceived, InternalEventHandle as _, Registered,
9};
10use vector_lib::lookup::{metadata_path, path, PathPrefix};
11use vector_lib::{config::LogNamespace, EstimatedJsonEncodedSizeOf};
12
13use crate::{codecs::Decoder, config::log_schema, event::BatchNotifier, event::Event};
14
15pub fn decode_message<'a>(
16    mut decoder: Decoder,
17    source_type: &'static str,
18    message: &[u8],
19    timestamp: Option<DateTime<Utc>>,
20    batch: &'a Option<BatchNotifier>,
21    log_namespace: LogNamespace,
22    events_received: &'a Registered<EventsReceived>,
23) -> impl Iterator<Item = Event> + 'a + use<'a> {
24    let schema = log_schema();
25
26    let mut buffer = BytesMut::with_capacity(message.len());
27    buffer.extend_from_slice(message);
28    let now = Utc::now();
29
30    iter::from_fn(move || loop {
31        break match decoder.decode_eof(&mut buffer) {
32            Ok(Some((events, _))) => Some(events.into_iter().map(move |mut event| {
33                if let Event::Log(ref mut log) = event {
34                    log_namespace.insert_vector_metadata(
35                        log,
36                        schema.source_type_key(),
37                        path!("source_type"),
38                        Bytes::from(source_type),
39                    );
40                    match log_namespace {
41                        LogNamespace::Vector => {
42                            if let Some(timestamp) = timestamp {
43                                log.try_insert(metadata_path!(source_type, "timestamp"), timestamp);
44                            }
45
46                            log.insert(metadata_path!("vector", "ingest_timestamp"), now);
47                        }
48                        LogNamespace::Legacy => {
49                            if let Some(timestamp) = timestamp {
50                                if let Some(timestamp_key) = schema.timestamp_key() {
51                                    log.try_insert((PathPrefix::Event, timestamp_key), timestamp);
52                                }
53                            }
54                        }
55                    }
56                }
57                events_received.emit(CountByteSize(1, event.estimated_json_encoded_size_of()));
58                event
59            })),
60            Err(error) => {
61                // Error is logged by `crate::codecs::Decoder`, no further handling
62                // is needed here.
63                if error.can_continue() {
64                    continue;
65                }
66                None
67            }
68            Ok(None) => None,
69        };
70    })
71    .flatten()
72    .map(move |event| event.with_batch_notifier_option(batch))
73}