vector/components/validation/resources/
event.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
use std::collections::HashMap;

use bytes::BytesMut;
use serde::Deserialize;
use serde_json::Value;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::format::JsonSerializerOptions;

use crate::codecs::Encoder;
use vector_lib::codecs::{
    encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
    NewlineDelimitedEncoder,
};
use vector_lib::event::{Event, LogEvent};

/// A test case event for deserialization from yaml file.
/// This is an intermediary step to TestEvent.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum RawTestEvent {
    /// The event is used, as-is, without modification.
    Passthrough(EventData),

    /// The event is potentially modified by the external resource.
    ///
    /// The modification made is dependent on the external resource, but this mode is made available
    /// for when a test case wants to exercise the failure path, but cannot cause a failure simply
    /// by constructing the event in a certain way i.e. adding an invalid field, or removing a
    /// required field, or using an invalid field value, and so on.
    ///
    /// For transforms and sinks, generally, the only way to cause an error is if the event itself
    /// is malformed in some way, which can be achieved without this test event variant.
    AlternateEncoder { fail_encoding_of: EventData },

    /// The event will be rejected by the external resource.
    ResourceReject {
        external_resource_rejects: EventData,
    },
}

#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventData {
    /// A simple log event.
    Log(String),
    /// A log event built from key-value pairs
    LogBuilder(HashMap<String, Value>),
}

impl EventData {
    /// Converts this event data into an `Event`.
    pub fn into_event(self) -> Event {
        match self {
            Self::Log(message) => Event::Log(LogEvent::from_bytes_legacy(&message.into())),
            Self::LogBuilder(data) => {
                let mut log_event = LogEvent::default();
                for (k, v) in data {
                    log_event
                        .parse_path_and_insert(&k, v)
                        .unwrap_or_else(|_| panic!("Unable to build log event for {}", &k));
                }
                Event::Log(log_event)
            }
        }
    }
}

/// An event used in a test case.
/// It is important to have created the event with all fields, immediately after deserializing from the
/// test case definition yaml file. This ensures that the event data we are using in the expected/actual
/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
/// from the event data twice (once for the expected and once for actual), it can result in a timestamp in
/// the event which may or may not have the same millisecond precision as it's counterpart.
///
/// For transforms and sinks, generally, the only way to cause an error is if the event itself
/// is malformed in some way, which can be achieved without this test event variant.
#[derive(Clone, Debug, Deserialize)]
#[serde(from = "RawTestEvent")]
#[serde(untagged)]
pub enum TestEvent {
    /// The event is used, as-is, without modification.
    Passthrough(Event),

    /// The event is encoded using an encoding that differs from the component's
    /// configured encoding, which should cause an error when the event is decoded.
    FailWithAlternateEncoder(Event),

    /// The event encodes successfully but when the external resource receives that event, it should
    /// throw a failure.
    FailWithExternalResource(Event),
}

impl TestEvent {
    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
    pub fn into_event(self) -> Event {
        match self {
            Self::Passthrough(event) => event,
            Self::FailWithAlternateEncoder(event) => event,
            Self::FailWithExternalResource(event) => event,
        }
    }

    pub fn get_event(&mut self) -> &mut Event {
        match self {
            Self::Passthrough(event) => event,
            Self::FailWithAlternateEncoder(event) => event,
            Self::FailWithExternalResource(event) => event,
        }
    }

    /// (should_fail, event)
    pub fn get(self) -> (bool, Event) {
        match self {
            Self::Passthrough(event) => (false, event),
            Self::FailWithAlternateEncoder(event) => (true, event),
            Self::FailWithExternalResource(event) => (true, event),
        }
    }

    /// True if the event should fail, false otherwise.
    pub const fn should_fail(&self) -> bool {
        match self {
            Self::Passthrough(_) => false,
            Self::FailWithAlternateEncoder(_) | Self::FailWithExternalResource(_) => true,
        }
    }

    /// True if the event should be rejected by the external resource in order to
    /// trigger a failure path.
    pub const fn should_reject(&self) -> bool {
        match self {
            Self::Passthrough(_) | Self::FailWithAlternateEncoder(_) => false,
            Self::FailWithExternalResource(_) => true,
        }
    }
}

#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum RawTestEventParseError {}

impl From<RawTestEvent> for TestEvent {
    fn from(other: RawTestEvent) -> Self {
        match other {
            RawTestEvent::Passthrough(event_data) => {
                TestEvent::Passthrough(event_data.into_event())
            }
            RawTestEvent::AlternateEncoder {
                fail_encoding_of: event_data,
            } => TestEvent::FailWithAlternateEncoder(event_data.into_event()),
            RawTestEvent::ResourceReject {
                external_resource_rejects: event_data,
            } => TestEvent::FailWithExternalResource(event_data.into_event()),
        }
    }
}

pub fn encode_test_event(
    encoder: &mut Encoder<encoding::Framer>,
    buf: &mut BytesMut,
    event: TestEvent,
) {
    match event {
        TestEvent::Passthrough(event) | TestEvent::FailWithExternalResource(event) => {
            // Encode the event normally.
            encoder
                .encode(event, buf)
                .expect("should not fail to encode input event");
        }
        TestEvent::FailWithAlternateEncoder(event) => {
            // This is a little fragile, but we check what serializer this encoder uses, and based
            // on `Serializer::supports_json`, we choose an opposing codec. For example, if the
            // encoder supports JSON, we'll use a serializer that doesn't support JSON, and vise
            // versa.
            let mut alt_encoder = if encoder.serializer().supports_json() {
                Encoder::<encoding::Framer>::new(
                    LengthDelimitedEncoder::default().into(),
                    LogfmtSerializer.into(),
                )
            } else {
                Encoder::<encoding::Framer>::new(
                    NewlineDelimitedEncoder::default().into(),
                    JsonSerializer::new(
                        MetricTagValues::default(),
                        JsonSerializerOptions::default(),
                    )
                    .into(),
                )
            };

            alt_encoder
                .encode(event, buf)
                .expect("should not fail to encode input event");
        }
    }
}