vector/components/validation/resources/
event.rs

1use std::collections::HashMap;
2
3use bytes::BytesMut;
4use serde::Deserialize;
5use serde_json::Value;
6use snafu::Snafu;
7use tokio_util::codec::Encoder as _;
8use vector_lib::codecs::encoding::format::JsonSerializerOptions;
9
10use crate::codecs::Encoder;
11use vector_lib::codecs::{
12    encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
13    NewlineDelimitedEncoder,
14};
15use vector_lib::event::{Event, LogEvent};
16
17/// A test case event for deserialization from yaml file.
18/// This is an intermediary step to TestEvent.
19#[derive(Clone, Debug, Deserialize)]
20#[serde(untagged)]
21pub enum RawTestEvent {
22    /// The event is used, as-is, without modification.
23    Passthrough(EventData),
24
25    /// The event is potentially modified by the external resource.
26    ///
27    /// The modification made is dependent on the external resource, but this mode is made available
28    /// for when a test case wants to exercise the failure path, but cannot cause a failure simply
29    /// by constructing the event in a certain way i.e. adding an invalid field, or removing a
30    /// required field, or using an invalid field value, and so on.
31    ///
32    /// For transforms and sinks, generally, the only way to cause an error is if the event itself
33    /// is malformed in some way, which can be achieved without this test event variant.
34    AlternateEncoder { fail_encoding_of: EventData },
35
36    /// The event will be rejected by the external resource.
37    ResourceReject {
38        external_resource_rejects: EventData,
39    },
40}
41
42#[derive(Clone, Debug, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum EventData {
45    /// A simple log event.
46    Log(String),
47    /// A log event built from key-value pairs
48    LogBuilder(HashMap<String, Value>),
49}
50
51impl EventData {
52    /// Converts this event data into an `Event`.
53    pub fn into_event(self) -> Event {
54        match self {
55            Self::Log(message) => Event::Log(LogEvent::from_bytes_legacy(&message.into())),
56            Self::LogBuilder(data) => {
57                let mut log_event = LogEvent::default();
58                for (k, v) in data {
59                    log_event
60                        .parse_path_and_insert(&k, v)
61                        .unwrap_or_else(|_| panic!("Unable to build log event for {}", &k));
62                }
63                Event::Log(log_event)
64            }
65        }
66    }
67}
68
69/// An event used in a test case.
70/// It is important to have created the event with all fields, immediately after deserializing from the
71/// test case definition yaml file. This ensures that the event data we are using in the expected/actual
72/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
73/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in
74/// the event which may or may not have the same millisecond precision as it's counterpart.
75///
76/// For transforms and sinks, generally, the only way to cause an error is if the event itself
77/// is malformed in some way, which can be achieved without this test event variant.
78#[derive(Clone, Debug, Deserialize)]
79#[serde(from = "RawTestEvent")]
80#[serde(untagged)]
81pub enum TestEvent {
82    /// The event is used, as-is, without modification.
83    Passthrough(Event),
84
85    /// The event is encoded using an encoding that differs from the component's
86    /// configured encoding, which should cause an error when the event is decoded.
87    FailWithAlternateEncoder(Event),
88
89    /// The event encodes successfully but when the external resource receives that event, it should
90    /// throw a failure.
91    FailWithExternalResource(Event),
92}
93
94impl TestEvent {
95    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
96    pub fn into_event(self) -> Event {
97        match self {
98            Self::Passthrough(event) => event,
99            Self::FailWithAlternateEncoder(event) => event,
100            Self::FailWithExternalResource(event) => event,
101        }
102    }
103
104    pub const fn get_event(&mut self) -> &mut Event {
105        match self {
106            Self::Passthrough(event) => event,
107            Self::FailWithAlternateEncoder(event) => event,
108            Self::FailWithExternalResource(event) => event,
109        }
110    }
111
112    /// (should_fail, event)
113    pub fn get(self) -> (bool, Event) {
114        match self {
115            Self::Passthrough(event) => (false, event),
116            Self::FailWithAlternateEncoder(event) => (true, event),
117            Self::FailWithExternalResource(event) => (true, event),
118        }
119    }
120
121    /// True if the event should fail, false otherwise.
122    pub const fn should_fail(&self) -> bool {
123        match self {
124            Self::Passthrough(_) => false,
125            Self::FailWithAlternateEncoder(_) | Self::FailWithExternalResource(_) => true,
126        }
127    }
128
129    /// True if the event should be rejected by the external resource in order to
130    /// trigger a failure path.
131    pub const fn should_reject(&self) -> bool {
132        match self {
133            Self::Passthrough(_) | Self::FailWithAlternateEncoder(_) => false,
134            Self::FailWithExternalResource(_) => true,
135        }
136    }
137}
138
139#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
140pub enum RawTestEventParseError {}
141
142impl From<RawTestEvent> for TestEvent {
143    fn from(other: RawTestEvent) -> Self {
144        match other {
145            RawTestEvent::Passthrough(event_data) => {
146                TestEvent::Passthrough(event_data.into_event())
147            }
148            RawTestEvent::AlternateEncoder {
149                fail_encoding_of: event_data,
150            } => TestEvent::FailWithAlternateEncoder(event_data.into_event()),
151            RawTestEvent::ResourceReject {
152                external_resource_rejects: event_data,
153            } => TestEvent::FailWithExternalResource(event_data.into_event()),
154        }
155    }
156}
157
158pub fn encode_test_event(
159    encoder: &mut Encoder<encoding::Framer>,
160    buf: &mut BytesMut,
161    event: TestEvent,
162) {
163    match event {
164        TestEvent::Passthrough(event) | TestEvent::FailWithExternalResource(event) => {
165            // Encode the event normally.
166            encoder
167                .encode(event, buf)
168                .expect("should not fail to encode input event");
169        }
170        TestEvent::FailWithAlternateEncoder(event) => {
171            // This is a little fragile, but we check what serializer this encoder uses, and based
172            // on `Serializer::supports_json`, we choose an opposing codec. For example, if the
173            // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
174            // versa.
175            let mut alt_encoder = if encoder.serializer().supports_json() {
176                Encoder::<encoding::Framer>::new(
177                    LengthDelimitedEncoder::default().into(),
178                    LogfmtSerializer.into(),
179                )
180            } else {
181                Encoder::<encoding::Framer>::new(
182                    NewlineDelimitedEncoder::default().into(),
183                    JsonSerializer::new(
184                        MetricTagValues::default(),
185                        JsonSerializerOptions::default(),
186                    )
187                    .into(),
188                )
189            };
190
191            alt_encoder
192                .encode(event, buf)
193                .expect("should not fail to encode input event");
194        }
195    }
196}