vector/components/validation/resources/
mod.rs1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{Mutex, mpsc};
7use vector_lib::{
8 codecs::{
9 BytesEncoder,
10 decoding::{self, DeserializerConfig},
11 encoding::{
12 self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
13 TextSerializerConfig,
14 },
15 },
16 config::{DataType, LogNamespace},
17 event::Event,
18};
19
20use self::http::HttpResourceOutputContext;
21pub use self::{
22 event::{TestEvent, encode_test_event},
23 http::HttpResourceConfig,
24};
25use super::{
26 RunnerMetrics,
27 sync::{Configuring, TaskCoordinator},
28};
29use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
30
31#[derive(Clone)]
40pub enum ResourceCodec {
41 Encoding(EncodingConfig),
48
49 EncodingWithFraming(EncodingConfigWithFraming),
53
54 Decoding(DecodingConfig),
58}
59
60impl ResourceCodec {
61 pub fn allowed_event_data_types(self) -> DataType {
67 match self {
68 Self::Encoding(encoding) => encoding.config().input_type(),
69 Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
70 Self::Decoding(decoding) => decoding.config().output_type(),
71 }
72 }
73
74 pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
79 let (framer, serializer) = match self {
80 Self::Encoding(config) => (
81 Framer::Bytes(BytesEncoder),
82 config.build().expect("should not fail to build serializer"),
83 ),
84 Self::EncodingWithFraming(config) => {
85 let (maybe_framing, serializer) = config.config();
86 (
87 maybe_framing
88 .clone()
89 .unwrap_or(FramingConfig::Bytes)
90 .build(),
91 serializer
92 .build()
93 .expect("building serializer should never fail"),
94 )
95 }
96 Self::Decoding(config) => (
97 decoder_framing_to_encoding_framer(config.framing()),
98 deserializer_config_to_serializer(config.config()),
99 ),
100 };
101
102 Encoder::<encoding::Framer>::new(framer, serializer)
103 }
104
105 pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
110 let (framer, deserializer) = match self {
111 Self::Decoding(config) => return config.build(),
112 Self::Encoding(config) => (
113 encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
114 serializer_config_to_deserializer(config.config())?,
115 ),
116 Self::EncodingWithFraming(config) => {
117 let (maybe_framing, serializer) = config.config();
118 let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
119 (
120 encoder_framing_to_decoding_framer(framing),
121 serializer_config_to_deserializer(serializer)?,
122 )
123 }
124 };
125
126 Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
127 }
128}
129
130impl From<EncodingConfig> for ResourceCodec {
131 fn from(config: EncodingConfig) -> Self {
132 Self::Encoding(config)
133 }
134}
135
136impl From<EncodingConfigWithFraming> for ResourceCodec {
137 fn from(config: EncodingConfigWithFraming) -> Self {
138 Self::EncodingWithFraming(config)
139 }
140}
141
142impl From<DecodingConfig> for ResourceCodec {
143 fn from(config: DecodingConfig) -> Self {
144 Self::Decoding(config)
145 }
146}
147
148fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
149 let serializer_config = match config {
150 DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
154 DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
155 DeserializerConfig::Protobuf(config) => {
156 SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
157 protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
158 desc_file: config.protobuf.desc_file.clone(),
159 message_type: config.protobuf.message_type.clone(),
160 },
161 })
162 }
163 #[cfg(feature = "codecs-syslog")]
167 DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
168 DeserializerConfig::Native => SerializerConfig::Native,
169 DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
170 DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf(Default::default()),
171 DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
172 DeserializerConfig::Influxdb { .. } => todo!(),
174 DeserializerConfig::Vrl { .. } => unimplemented!(),
175 };
176
177 serializer_config
178 .build()
179 .expect("building serializer should never fail")
180}
181
182fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
183 let framing_config = match framing {
184 decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
185 decoding::FramingConfig::CharacterDelimited(config) => {
186 encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
187 character_delimited: encoding::CharacterDelimitedEncoderOptions {
188 delimiter: config.character_delimited.delimiter,
189 },
190 })
191 }
192 decoding::FramingConfig::LengthDelimited(config) => {
193 encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
194 length_delimited: config.length_delimited.clone(),
195 })
196 }
197 decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
198 decoding::FramingConfig::OctetCounting(_) => todo!(),
201 decoding::FramingConfig::ChunkedGelf(_) => todo!(),
203 decoding::FramingConfig::VarintLengthDelimited(config) => {
204 encoding::FramingConfig::VarintLengthDelimited(
205 encoding::VarintLengthDelimitedEncoderConfig {
206 max_frame_length: config.max_frame_length,
207 },
208 )
209 }
210 };
211
212 framing_config.build()
213}
214
215fn serializer_config_to_deserializer(
216 config: &SerializerConfig,
217) -> vector_lib::Result<decoding::Deserializer> {
218 let deserializer_config = match config {
219 SerializerConfig::Avro { .. } => todo!(),
220 SerializerConfig::Cef { .. } => todo!(),
221 SerializerConfig::Csv { .. } => todo!(),
222 SerializerConfig::Gelf { .. } => DeserializerConfig::Gelf(Default::default()),
223 SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
224 SerializerConfig::Logfmt => todo!(),
225 SerializerConfig::Native => DeserializerConfig::Native,
226 SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
227 SerializerConfig::Protobuf(config) => {
228 DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
229 protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
230 desc_file: config.protobuf.desc_file.clone(),
231 message_type: config.protobuf.message_type.clone(),
232 },
233 })
234 }
235 SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
236 };
237
238 deserializer_config.build()
239}
240
241fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
242 let framing_config = match framing {
243 encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
244 encoding::FramingConfig::CharacterDelimited(config) => {
245 decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
246 character_delimited: decoding::CharacterDelimitedDecoderOptions {
247 delimiter: config.character_delimited.delimiter,
248 max_length: None,
249 },
250 })
251 }
252 encoding::FramingConfig::LengthDelimited(config) => {
253 decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
254 length_delimited: config.length_delimited.clone(),
255 })
256 }
257 encoding::FramingConfig::NewlineDelimited => {
258 decoding::FramingConfig::NewlineDelimited(Default::default())
259 }
260 vector_lib::codecs::encoding::FramingConfig::VarintLengthDelimited(config) => {
261 decoding::FramingConfig::VarintLengthDelimited(
262 decoding::VarintLengthDelimitedDecoderConfig {
263 max_frame_length: config.max_frame_length,
264 },
265 )
266 }
267 };
268
269 framing_config.build()
270}
271
/// The direction in which an external resource operates.
///
/// The value is handed unchanged to the resource implementation when it is spawned.
// NOTE(review): which side initiates the transfer for each variant is decided by the resource
// implementation (e.g. the `http` module) and is not visible here -- confirm there.
#[derive(Clone)]
pub enum ResourceDirection {
    /// Pull-based operation.
    Pull,

    /// Push-based operation.
    Push,
}
299
300#[derive(Clone)]
307pub enum ResourceDefinition {
308 Http(HttpResourceConfig),
309}
310
311impl From<HttpResourceConfig> for ResourceDefinition {
312 fn from(config: HttpResourceConfig) -> Self {
313 Self::Http(config)
314 }
315}
316
317#[derive(Clone)]
332pub struct ExternalResource {
333 pub direction: ResourceDirection,
334 definition: ResourceDefinition,
335 pub codec: ResourceCodec,
336}
337
338impl ExternalResource {
339 pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
341 where
342 D: Into<ResourceDefinition>,
343 C: Into<ResourceCodec>,
344 {
345 Self {
346 direction,
347 definition: definition.into(),
348 codec: codec.into(),
349 }
350 }
351
352 pub fn spawn_as_input(
354 self,
355 input_rx: mpsc::Receiver<TestEvent>,
356 task_coordinator: &TaskCoordinator<Configuring>,
357 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
358 ) {
359 match self.definition {
360 ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
361 self.direction,
362 self.codec,
363 input_rx,
364 task_coordinator,
365 runner_metrics,
366 ),
367 }
368 }
369
370 pub fn spawn_as_output(
372 self,
373 output_tx: mpsc::Sender<Vec<Event>>,
374 task_coordinator: &TaskCoordinator<Configuring>,
375 input_events: Vec<TestEvent>,
376 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
377 log_namespace: LogNamespace,
378 ) -> vector_lib::Result<()> {
379 match self.definition {
380 ResourceDefinition::Http(http_config) => {
381 http_config.spawn_as_output(HttpResourceOutputContext {
382 direction: self.direction,
383 codec: self.codec,
384 output_tx,
385 task_coordinator,
386 input_events,
387 runner_metrics,
388 log_namespace,
389 })
390 }
391 }
392 }
393}