// vector/components/validation/resources/event.rs
use std::collections::HashMap;
use bytes::BytesMut;
use serde::Deserialize;
use serde_json::Value;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use vector_lib::codecs::encoding::format::JsonSerializerOptions;
use crate::codecs::Encoder;
use vector_lib::codecs::{
encoding, JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
NewlineDelimitedEncoder,
};
use vector_lib::event::{Event, LogEvent};
/// A test case event for deserialization from yaml file.
/// This is an intermediary step to TestEvent.
///
/// The enum is `#[serde(untagged)]`, so no variant name appears in the input: a bare
/// [`EventData`] is a `Passthrough`, while the two failure variants are recognised by their
/// single field name (`fail_encoding_of` / `external_resource_rejects`).
// NOTE(review): serde tries untagged variants in declaration order, so reordering the
// variants below may change which one an input deserializes to -- confirm before moving them.
#[derive(Clone, Debug, Deserialize)]
#[serde(untagged)]
pub enum RawTestEvent {
    /// The event is used, as-is, without modification.
    Passthrough(EventData),
    /// The event is potentially modified by the external resource.
    ///
    /// The modification made is dependent on the external resource, but this mode is made available
    /// for when a test case wants to exercise the failure path, but cannot cause a failure simply
    /// by constructing the event in a certain way i.e. adding an invalid field, or removing a
    /// required field, or using an invalid field value, and so on.
    ///
    /// For transforms and sinks, generally, the only way to cause an error is if the event itself
    /// is malformed in some way, which can be achieved without this test event variant.
    ///
    /// Converted into `TestEvent::FailWithAlternateEncoder`.
    AlternateEncoder { fail_encoding_of: EventData },
    /// The event will be rejected by the external resource.
    ///
    /// Converted into `TestEvent::FailWithExternalResource`.
    ResourceReject {
        external_resource_rejects: EventData,
    },
}
/// The payload of a test case event, as written in the test case definition.
///
/// Variant names are deserialized in `snake_case` (`log`, `log_builder`).
#[derive(Clone, Debug, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum EventData {
    /// A simple log event.
    ///
    /// The string is handed to `LogEvent::from_bytes_legacy` when the event is built.
    Log(String),
    /// A log event built from key-value pairs
    ///
    /// Each key is passed to `LogEvent::parse_path_and_insert` together with its value.
    LogBuilder(HashMap<String, Value>),
}
impl EventData {
    /// Consumes this event data and builds the corresponding log [`Event`].
    ///
    /// * `Log` – the message is converted with `LogEvent::from_bytes_legacy`.
    /// * `LogBuilder` – starts from `LogEvent::default()` and inserts every key/value pair
    ///   through `parse_path_and_insert`.
    ///
    /// # Panics
    ///
    /// Panics when `parse_path_and_insert` returns an error for one of the `LogBuilder` keys.
    pub fn into_event(self) -> Event {
        let log = match self {
            Self::Log(message) => LogEvent::from_bytes_legacy(&message.into()),
            Self::LogBuilder(fields) => {
                let mut log = LogEvent::default();
                for (path, value) in fields {
                    // The previous value (if any) returned on success is not needed.
                    if log.parse_path_and_insert(&path, value).is_err() {
                        panic!("Unable to build log event for {}", &path);
                    }
                }
                log
            }
        };

        Event::Log(log)
    }
}
/// An event used in a test case.
/// It is important to have created the event with all fields, immediately after deserializing from the
/// test case definition yaml file. This ensures that the event data we are using in the expected/actual
/// metrics collection is based on the same event. Namely, one issue that can arise from creating the event
/// from the event data twice (once for the expected and once for actual), is that it can result in a
/// timestamp in the event which may or may not have the same millisecond precision as its counterpart.
///
/// Deserialization goes through [`RawTestEvent`] (`#[serde(from = "RawTestEvent")]`): the raw
/// form is deserialized first and then converted with `From<RawTestEvent>`, which is where the
/// [`Event`] is built.
// NOTE(review): with `from = "RawTestEvent"` in place, the `untagged` attribute below is
// presumably not consulted by the derive -- confirm before relying on or removing it.
#[derive(Clone, Debug, Deserialize)]
#[serde(from = "RawTestEvent")]
#[serde(untagged)]
pub enum TestEvent {
    /// The event is used, as-is, without modification.
    Passthrough(Event),
    /// The event is encoded using an encoding that differs from the component's
    /// configured encoding, which should cause an error when the event is decoded.
    FailWithAlternateEncoder(Event),
    /// The event encodes successfully but when the external resource receives that event, it should
    /// throw a failure.
    FailWithExternalResource(Event),
}
impl TestEvent {
    /// Consumes the test event and returns the wrapped [`Event`], whichever variant it is.
    #[allow(clippy::missing_const_for_fn)] // const cannot run destructor
    pub fn into_event(self) -> Event {
        // Every variant carries exactly one `Event`, so the pattern is irrefutable.
        let (Self::Passthrough(inner)
        | Self::FailWithAlternateEncoder(inner)
        | Self::FailWithExternalResource(inner)) = self;
        inner
    }

    /// Returns a mutable reference to the wrapped [`Event`], whichever variant it is.
    pub fn get_event(&mut self) -> &mut Event {
        let (Self::Passthrough(inner)
        | Self::FailWithAlternateEncoder(inner)
        | Self::FailWithExternalResource(inner)) = self;
        inner
    }

    /// (should_fail, event)
    ///
    /// Consumes the test event; the flag is `true` for both failure variants.
    pub fn get(self) -> (bool, Event) {
        // The flag is read before `self` is moved into `into_event`.
        (self.should_fail(), self.into_event())
    }

    /// True if the event should fail, false otherwise.
    pub const fn should_fail(&self) -> bool {
        matches!(
            self,
            Self::FailWithAlternateEncoder(_) | Self::FailWithExternalResource(_)
        )
    }

    /// True if the event should be rejected by the external resource in order to
    /// trigger a failure path.
    pub const fn should_reject(&self) -> bool {
        matches!(self, Self::FailWithExternalResource(_))
    }
}
/// Error type associated with parsing a [`RawTestEvent`].
///
/// The enum currently has no variants, so no value of this type can be constructed.
// NOTE(review): nothing visible in this file produces or returns this type -- confirm it is
// still referenced elsewhere.
#[derive(Clone, Debug, Eq, PartialEq, Snafu)]
pub enum RawTestEventParseError {}
impl From<RawTestEvent> for TestEvent {
    /// Builds the concrete [`Event`] from the raw event data and wraps it in the matching
    /// `TestEvent` variant:
    ///
    /// * `Passthrough` -> `Passthrough`
    /// * `AlternateEncoder` -> `FailWithAlternateEncoder`
    /// * `ResourceReject` -> `FailWithExternalResource`
    fn from(other: RawTestEvent) -> Self {
        match other {
            RawTestEvent::Passthrough(data) => Self::Passthrough(data.into_event()),
            RawTestEvent::AlternateEncoder { fail_encoding_of } => {
                Self::FailWithAlternateEncoder(fail_encoding_of.into_event())
            }
            RawTestEvent::ResourceReject {
                external_resource_rejects,
            } => Self::FailWithExternalResource(external_resource_rejects.into_event()),
        }
    }
}
/// Encodes `event` into `buf`.
///
/// `Passthrough` and `FailWithExternalResource` events are written with the supplied `encoder`.
/// `FailWithAlternateEncoder` events are written with a freshly built encoder chosen to differ
/// from the supplied one, so that the encoded bytes do not match the configured encoding.
///
/// # Panics
///
/// Panics if the selected encoder fails to encode the event.
pub fn encode_test_event(
    encoder: &mut Encoder<encoding::Framer>,
    buf: &mut BytesMut,
    event: TestEvent,
) {
    let outcome = match event {
        TestEvent::FailWithAlternateEncoder(payload) => {
            // This is a little fragile: we look at the serializer the configured encoder uses
            // and, based on `Serializer::supports_json`, pick an opposing codec. If the encoder
            // supports JSON we use a serializer that does not, and vice versa.
            let configured_supports_json = encoder.serializer().supports_json();
            let mut opposing_encoder = if configured_supports_json {
                Encoder::<encoding::Framer>::new(
                    LengthDelimitedEncoder::default().into(),
                    LogfmtSerializer.into(),
                )
            } else {
                Encoder::<encoding::Framer>::new(
                    NewlineDelimitedEncoder::default().into(),
                    JsonSerializer::new(
                        MetricTagValues::default(),
                        JsonSerializerOptions::default(),
                    )
                    .into(),
                )
            };
            opposing_encoder.encode(payload, buf)
        }
        // Encode the event normally.
        TestEvent::Passthrough(payload) | TestEvent::FailWithExternalResource(payload) => {
            encoder.encode(payload, buf)
        }
    };

    outcome.expect("should not fail to encode input event");
}