vector/components/validation/resources/
event.rs1use std::collections::HashMap;
2
3use bytes::BytesMut;
4use serde::Deserialize;
5use serde_json::Value;
6use tokio_util::codec::Encoder as _;
7use vector_lib::{
8 codecs::{
9 JsonSerializer, LengthDelimitedEncoder, LogfmtSerializer, MetricTagValues,
10 NewlineDelimitedEncoder, encoding, encoding::format::JsonSerializerOptions,
11 },
12 event::{Event, LogEvent},
13};
14
15use crate::codecs::Encoder;
16
17#[derive(Clone, Debug, Deserialize)]
20#[serde(untagged)]
21pub enum RawTestEvent {
22 Passthrough(EventData),
24
25 AlternateEncoder { fail_encoding_of: EventData },
35
36 ResourceReject {
38 external_resource_rejects: EventData,
39 },
40}
41
42#[derive(Clone, Debug, Deserialize)]
43#[serde(rename_all = "snake_case")]
44pub enum EventData {
45 Log(String),
47 LogBuilder(HashMap<String, Value>),
49}
50
51impl EventData {
52 pub fn into_event(self) -> Event {
54 match self {
55 Self::Log(message) => Event::Log(LogEvent::from_bytes_legacy(&message.into())),
56 Self::LogBuilder(data) => {
57 let mut log_event = LogEvent::default();
58 for (k, v) in data {
59 log_event
60 .parse_path_and_insert(&k, v)
61 .unwrap_or_else(|_| panic!("Unable to build log event for {}", &k));
62 }
63 Event::Log(log_event)
64 }
65 }
66 }
67}
68
69#[derive(Clone, Debug, Deserialize)]
79#[serde(from = "RawTestEvent")]
80#[serde(untagged)]
81pub enum TestEvent {
82 Passthrough(Event),
84
85 FailWithAlternateEncoder(Event),
88
89 FailWithExternalResource(Event),
92}
93
94impl TestEvent {
95 #[allow(clippy::missing_const_for_fn)] pub fn into_event(self) -> Event {
97 match self {
98 Self::Passthrough(event) => event,
99 Self::FailWithAlternateEncoder(event) => event,
100 Self::FailWithExternalResource(event) => event,
101 }
102 }
103
104 pub const fn get_event(&mut self) -> &mut Event {
105 match self {
106 Self::Passthrough(event) => event,
107 Self::FailWithAlternateEncoder(event) => event,
108 Self::FailWithExternalResource(event) => event,
109 }
110 }
111
112 pub fn get(self) -> (bool, Event) {
114 match self {
115 Self::Passthrough(event) => (false, event),
116 Self::FailWithAlternateEncoder(event) => (true, event),
117 Self::FailWithExternalResource(event) => (true, event),
118 }
119 }
120
121 pub const fn should_fail(&self) -> bool {
123 match self {
124 Self::Passthrough(_) => false,
125 Self::FailWithAlternateEncoder(_) | Self::FailWithExternalResource(_) => true,
126 }
127 }
128
129 pub const fn should_reject(&self) -> bool {
132 match self {
133 Self::Passthrough(_) | Self::FailWithAlternateEncoder(_) => false,
134 Self::FailWithExternalResource(_) => true,
135 }
136 }
137}
138
139impl From<RawTestEvent> for TestEvent {
140 fn from(other: RawTestEvent) -> Self {
141 match other {
142 RawTestEvent::Passthrough(event_data) => {
143 TestEvent::Passthrough(event_data.into_event())
144 }
145 RawTestEvent::AlternateEncoder {
146 fail_encoding_of: event_data,
147 } => TestEvent::FailWithAlternateEncoder(event_data.into_event()),
148 RawTestEvent::ResourceReject {
149 external_resource_rejects: event_data,
150 } => TestEvent::FailWithExternalResource(event_data.into_event()),
151 }
152 }
153}
154
155pub fn encode_test_event(
156 encoder: &mut Encoder<encoding::Framer>,
157 buf: &mut BytesMut,
158 event: TestEvent,
159) {
160 match event {
161 TestEvent::Passthrough(event) | TestEvent::FailWithExternalResource(event) => {
162 encoder
164 .encode(event, buf)
165 .expect("should not fail to encode input event");
166 }
167 TestEvent::FailWithAlternateEncoder(event) => {
168 let mut alt_encoder = if encoder.serializer().supports_json() {
173 Encoder::<encoding::Framer>::new(
174 LengthDelimitedEncoder::default().into(),
175 LogfmtSerializer.into(),
176 )
177 } else {
178 Encoder::<encoding::Framer>::new(
179 NewlineDelimitedEncoder::default().into(),
180 JsonSerializer::new(
181 MetricTagValues::default(),
182 JsonSerializerOptions::default(),
183 )
184 .into(),
185 )
186 };
187
188 alt_encoder
189 .encode(event, buf)
190 .expect("should not fail to encode input event");
191 }
192 }
193}