vector/components/validation/resources/
mod.rs1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{mpsc, Mutex};
7use vector_lib::{
8 codecs::{
9 decoding::{self, DeserializerConfig},
10 encoding::{
11 self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
12 TextSerializerConfig,
13 },
14 BytesEncoder,
15 },
16 config::LogNamespace,
17};
18use vector_lib::{config::DataType, event::Event};
19
20use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
21
22pub use self::event::{encode_test_event, TestEvent};
23pub use self::http::HttpResourceConfig;
24use self::http::HttpResourceOutputContext;
25
26use super::{
27 sync::{Configuring, TaskCoordinator},
28 RunnerMetrics,
29};
30
31#[derive(Clone)]
40pub enum ResourceCodec {
41 Encoding(EncodingConfig),
48
49 EncodingWithFraming(EncodingConfigWithFraming),
53
54 Decoding(DecodingConfig),
58}
59
60impl ResourceCodec {
61 pub fn allowed_event_data_types(self) -> DataType {
67 match self {
68 Self::Encoding(encoding) => encoding.config().input_type(),
69 Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
70 Self::Decoding(decoding) => decoding.config().output_type(),
71 }
72 }
73
74 pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
79 let (framer, serializer) = match self {
80 Self::Encoding(config) => (
81 Framer::Bytes(BytesEncoder),
82 config.build().expect("should not fail to build serializer"),
83 ),
84 Self::EncodingWithFraming(config) => {
85 let (maybe_framing, serializer) = config.config();
86 (
87 maybe_framing
88 .clone()
89 .unwrap_or(FramingConfig::Bytes)
90 .build(),
91 serializer
92 .build()
93 .expect("building serializer should never fail"),
94 )
95 }
96 Self::Decoding(config) => (
97 decoder_framing_to_encoding_framer(config.framing()),
98 deserializer_config_to_serializer(config.config()),
99 ),
100 };
101
102 Encoder::<encoding::Framer>::new(framer, serializer)
103 }
104
105 pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
110 let (framer, deserializer) = match self {
111 Self::Decoding(config) => return config.build(),
112 Self::Encoding(config) => (
113 encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
114 serializer_config_to_deserializer(config.config())?,
115 ),
116 Self::EncodingWithFraming(config) => {
117 let (maybe_framing, serializer) = config.config();
118 let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
119 (
120 encoder_framing_to_decoding_framer(framing),
121 serializer_config_to_deserializer(serializer)?,
122 )
123 }
124 };
125
126 Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
127 }
128}
129
130impl From<EncodingConfig> for ResourceCodec {
131 fn from(config: EncodingConfig) -> Self {
132 Self::Encoding(config)
133 }
134}
135
136impl From<EncodingConfigWithFraming> for ResourceCodec {
137 fn from(config: EncodingConfigWithFraming) -> Self {
138 Self::EncodingWithFraming(config)
139 }
140}
141
142impl From<DecodingConfig> for ResourceCodec {
143 fn from(config: DecodingConfig) -> Self {
144 Self::Decoding(config)
145 }
146}
147
148fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
149 let serializer_config = match config {
150 DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
154 DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
155 DeserializerConfig::Protobuf(config) => {
156 SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
157 protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
158 desc_file: config.protobuf.desc_file.clone(),
159 message_type: config.protobuf.message_type.clone(),
160 },
161 })
162 }
163 #[cfg(feature = "codecs-syslog")]
167 DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
168 DeserializerConfig::Native => SerializerConfig::Native,
169 DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
170 DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf,
171 DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
172 DeserializerConfig::Influxdb { .. } => todo!(),
174 DeserializerConfig::Vrl { .. } => unimplemented!(),
175 };
176
177 serializer_config
178 .build()
179 .expect("building serializer should never fail")
180}
181
182fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
183 let framing_config = match framing {
184 decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
185 decoding::FramingConfig::CharacterDelimited(config) => {
186 encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
187 character_delimited: encoding::CharacterDelimitedEncoderOptions {
188 delimiter: config.character_delimited.delimiter,
189 },
190 })
191 }
192 decoding::FramingConfig::LengthDelimited(config) => {
193 encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
194 length_delimited: config.length_delimited.clone(),
195 })
196 }
197 decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
198 decoding::FramingConfig::OctetCounting(_) => todo!(),
201 decoding::FramingConfig::ChunkedGelf(_) => todo!(),
203 };
204
205 framing_config.build()
206}
207
208fn serializer_config_to_deserializer(
209 config: &SerializerConfig,
210) -> vector_lib::Result<decoding::Deserializer> {
211 let deserializer_config = match config {
212 SerializerConfig::Avro { .. } => todo!(),
213 SerializerConfig::Cef { .. } => todo!(),
214 SerializerConfig::Csv { .. } => todo!(),
215 SerializerConfig::Gelf => DeserializerConfig::Gelf(Default::default()),
216 SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
217 SerializerConfig::Logfmt => todo!(),
218 SerializerConfig::Native => DeserializerConfig::Native,
219 SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
220 SerializerConfig::Protobuf(config) => {
221 DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
222 protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
223 desc_file: config.protobuf.desc_file.clone(),
224 message_type: config.protobuf.message_type.clone(),
225 },
226 })
227 }
228 SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
229 };
230
231 deserializer_config.build()
232}
233
234fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
235 let framing_config = match framing {
236 encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
237 encoding::FramingConfig::CharacterDelimited(config) => {
238 decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
239 character_delimited: decoding::CharacterDelimitedDecoderOptions {
240 delimiter: config.character_delimited.delimiter,
241 max_length: None,
242 },
243 })
244 }
245 encoding::FramingConfig::LengthDelimited(config) => {
246 decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
247 length_delimited: config.length_delimited.clone(),
248 })
249 }
250 encoding::FramingConfig::NewlineDelimited => {
251 decoding::FramingConfig::NewlineDelimited(Default::default())
252 }
253 };
254
255 framing_config.build()
256}
257
/// Direction in which an external resource operates.
///
/// NOTE(review): the precise meaning of each direction is determined by the resource
/// implementation that receives it (e.g. the `http` module) and is not visible in this file;
/// confirm against those implementations.
#[derive(Clone)]
pub enum ResourceDirection {
    /// Pull-based operation.
    Pull,

    /// Push-based operation.
    Push,
}
285
286#[derive(Clone)]
293pub enum ResourceDefinition {
294 Http(HttpResourceConfig),
295}
296
297impl From<HttpResourceConfig> for ResourceDefinition {
298 fn from(config: HttpResourceConfig) -> Self {
299 Self::Http(config)
300 }
301}
302
303#[derive(Clone)]
318pub struct ExternalResource {
319 pub direction: ResourceDirection,
320 definition: ResourceDefinition,
321 pub codec: ResourceCodec,
322}
323
324impl ExternalResource {
325 pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
327 where
328 D: Into<ResourceDefinition>,
329 C: Into<ResourceCodec>,
330 {
331 Self {
332 direction,
333 definition: definition.into(),
334 codec: codec.into(),
335 }
336 }
337
338 pub fn spawn_as_input(
340 self,
341 input_rx: mpsc::Receiver<TestEvent>,
342 task_coordinator: &TaskCoordinator<Configuring>,
343 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
344 ) {
345 match self.definition {
346 ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
347 self.direction,
348 self.codec,
349 input_rx,
350 task_coordinator,
351 runner_metrics,
352 ),
353 }
354 }
355
356 pub fn spawn_as_output(
358 self,
359 output_tx: mpsc::Sender<Vec<Event>>,
360 task_coordinator: &TaskCoordinator<Configuring>,
361 input_events: Vec<TestEvent>,
362 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
363 log_namespace: LogNamespace,
364 ) -> vector_lib::Result<()> {
365 match self.definition {
366 ResourceDefinition::Http(http_config) => {
367 http_config.spawn_as_output(HttpResourceOutputContext {
368 direction: self.direction,
369 codec: self.codec,
370 output_tx,
371 task_coordinator,
372 input_events,
373 runner_metrics,
374 log_namespace,
375 })
376 }
377 }
378 }
379}