vector/components/validation/resources/
mod.rs1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{Mutex, mpsc};
7use vector_lib::{
8 codecs::{
9 BytesEncoder,
10 decoding::{self, DeserializerConfig},
11 encoding::{
12 self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
13 TextSerializerConfig,
14 },
15 },
16 config::{DataType, LogNamespace},
17 event::Event,
18};
19
20use self::http::HttpResourceOutputContext;
21pub use self::{
22 event::{TestEvent, encode_test_event},
23 http::HttpResourceConfig,
24};
25use super::{
26 RunnerMetrics,
27 sync::{Configuring, TaskCoordinator},
28};
29use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
30
31#[derive(Clone)]
40pub enum ResourceCodec {
41 Encoding(EncodingConfig),
48
49 EncodingWithFraming(EncodingConfigWithFraming),
53
54 Decoding(DecodingConfig),
58}
59
60impl ResourceCodec {
61 pub fn allowed_event_data_types(self) -> DataType {
67 match self {
68 Self::Encoding(encoding) => encoding.config().input_type(),
69 Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
70 Self::Decoding(decoding) => decoding.config().output_type(),
71 }
72 }
73
74 pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
79 let (framer, serializer) = match self {
80 Self::Encoding(config) => (
81 Framer::Bytes(BytesEncoder),
82 config.build().expect("should not fail to build serializer"),
83 ),
84 Self::EncodingWithFraming(config) => {
85 let (maybe_framing, serializer) = config.config();
86 (
87 maybe_framing
88 .clone()
89 .unwrap_or(FramingConfig::Bytes)
90 .build(),
91 serializer
92 .build()
93 .expect("building serializer should never fail"),
94 )
95 }
96 Self::Decoding(config) => (
97 decoder_framing_to_encoding_framer(config.framing()),
98 deserializer_config_to_serializer(config.config()),
99 ),
100 };
101
102 Encoder::<encoding::Framer>::new(framer, serializer)
103 }
104
105 pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
110 let (framer, deserializer) = match self {
111 Self::Decoding(config) => return config.build(),
112 Self::Encoding(config) => (
113 encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
114 serializer_config_to_deserializer(config.config())?,
115 ),
116 Self::EncodingWithFraming(config) => {
117 let (maybe_framing, serializer) = config.config();
118 let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
119 (
120 encoder_framing_to_decoding_framer(framing),
121 serializer_config_to_deserializer(serializer)?,
122 )
123 }
124 };
125
126 Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
127 }
128}
129
130impl From<EncodingConfig> for ResourceCodec {
131 fn from(config: EncodingConfig) -> Self {
132 Self::Encoding(config)
133 }
134}
135
136impl From<EncodingConfigWithFraming> for ResourceCodec {
137 fn from(config: EncodingConfigWithFraming) -> Self {
138 Self::EncodingWithFraming(config)
139 }
140}
141
142impl From<DecodingConfig> for ResourceCodec {
143 fn from(config: DecodingConfig) -> Self {
144 Self::Decoding(config)
145 }
146}
147
148fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
149 let serializer_config = match config {
150 DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
154 DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
155 DeserializerConfig::Protobuf(config) => {
156 SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
157 protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
158 desc_file: config.protobuf.desc_file.clone(),
159 message_type: config.protobuf.message_type.clone(),
160 use_json_names: config.protobuf.use_json_names,
161 },
162 })
163 }
164 #[cfg(feature = "codecs-syslog")]
168 DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
169 DeserializerConfig::Native => SerializerConfig::Native,
170 DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
171 DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf(Default::default()),
172 DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
173 DeserializerConfig::Influxdb { .. } => todo!(),
175 DeserializerConfig::Vrl { .. } => unimplemented!(),
176 #[cfg(feature = "codecs-opentelemetry")]
177 DeserializerConfig::Otlp { .. } => SerializerConfig::Otlp,
178 };
179
180 serializer_config
181 .build()
182 .expect("building serializer should never fail")
183}
184
185fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
186 let framing_config = match framing {
187 decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
188 decoding::FramingConfig::CharacterDelimited(config) => {
189 encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
190 character_delimited: encoding::CharacterDelimitedEncoderOptions {
191 delimiter: config.character_delimited.delimiter,
192 },
193 })
194 }
195 decoding::FramingConfig::LengthDelimited(config) => {
196 encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
197 length_delimited: config.length_delimited.clone(),
198 })
199 }
200 decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
201 decoding::FramingConfig::OctetCounting(_) => todo!(),
204 decoding::FramingConfig::ChunkedGelf(_) => todo!(),
206 decoding::FramingConfig::VarintLengthDelimited(config) => {
207 encoding::FramingConfig::VarintLengthDelimited(
208 encoding::VarintLengthDelimitedEncoderConfig {
209 max_frame_length: config.max_frame_length,
210 },
211 )
212 }
213 };
214
215 framing_config.build()
216}
217
218fn serializer_config_to_deserializer(
219 config: &SerializerConfig,
220) -> vector_lib::Result<decoding::Deserializer> {
221 let deserializer_config = match config {
222 SerializerConfig::Avro { .. } => todo!(),
223 SerializerConfig::Cef { .. } => todo!(),
224 SerializerConfig::Csv { .. } => todo!(),
225 SerializerConfig::Gelf { .. } => DeserializerConfig::Gelf(Default::default()),
226 SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
227 SerializerConfig::Logfmt => todo!(),
228 SerializerConfig::Native => DeserializerConfig::Native,
229 SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
230 SerializerConfig::Protobuf(config) => {
231 DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
232 protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
233 desc_file: config.protobuf.desc_file.clone(),
234 message_type: config.protobuf.message_type.clone(),
235 use_json_names: config.protobuf.use_json_names,
236 },
237 })
238 }
239 SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
240 #[cfg(feature = "codecs-opentelemetry")]
241 SerializerConfig::Otlp => todo!(),
242 };
243
244 deserializer_config.build()
245}
246
247fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
248 let framing_config = match framing {
249 encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
250 encoding::FramingConfig::CharacterDelimited(config) => {
251 decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
252 character_delimited: decoding::CharacterDelimitedDecoderOptions {
253 delimiter: config.character_delimited.delimiter,
254 max_length: None,
255 },
256 })
257 }
258 encoding::FramingConfig::LengthDelimited(config) => {
259 decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
260 length_delimited: config.length_delimited.clone(),
261 })
262 }
263 encoding::FramingConfig::NewlineDelimited => {
264 decoding::FramingConfig::NewlineDelimited(Default::default())
265 }
266 vector_lib::codecs::encoding::FramingConfig::VarintLengthDelimited(config) => {
267 decoding::FramingConfig::VarintLengthDelimited(
268 decoding::VarintLengthDelimitedDecoderConfig {
269 max_frame_length: config.max_frame_length,
270 },
271 )
272 }
273 };
274
275 framing_config.build()
276}
277
278#[derive(Clone)]
280pub enum ResourceDirection {
281 Pull,
291
292 Push,
304}
305
306#[derive(Clone)]
313pub enum ResourceDefinition {
314 Http(HttpResourceConfig),
315}
316
317impl From<HttpResourceConfig> for ResourceDefinition {
318 fn from(config: HttpResourceConfig) -> Self {
319 Self::Http(config)
320 }
321}
322
323#[derive(Clone)]
338pub struct ExternalResource {
339 pub direction: ResourceDirection,
340 definition: ResourceDefinition,
341 pub codec: ResourceCodec,
342}
343
344impl ExternalResource {
345 pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
347 where
348 D: Into<ResourceDefinition>,
349 C: Into<ResourceCodec>,
350 {
351 Self {
352 direction,
353 definition: definition.into(),
354 codec: codec.into(),
355 }
356 }
357
358 pub fn spawn_as_input(
360 self,
361 input_rx: mpsc::Receiver<TestEvent>,
362 task_coordinator: &TaskCoordinator<Configuring>,
363 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
364 ) {
365 match self.definition {
366 ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
367 self.direction,
368 self.codec,
369 input_rx,
370 task_coordinator,
371 runner_metrics,
372 ),
373 }
374 }
375
376 pub fn spawn_as_output(
378 self,
379 output_tx: mpsc::Sender<Vec<Event>>,
380 task_coordinator: &TaskCoordinator<Configuring>,
381 input_events: Vec<TestEvent>,
382 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
383 log_namespace: LogNamespace,
384 ) -> vector_lib::Result<()> {
385 match self.definition {
386 ResourceDefinition::Http(http_config) => {
387 http_config.spawn_as_output(HttpResourceOutputContext {
388 direction: self.direction,
389 codec: self.codec,
390 output_tx,
391 task_coordinator,
392 input_events,
393 runner_metrics,
394 log_namespace,
395 })
396 }
397 }
398 }
399}