vector/components/validation/resources/
mod.rs1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{Mutex, mpsc};
7use vector_lib::{
8 codecs::{
9 BytesEncoder, Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming,
10 decoding::{self, DeserializerConfig},
11 encoding::{
12 self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
13 TextSerializerConfig,
14 },
15 },
16 config::{DataType, LogNamespace},
17 event::Event,
18};
19
20use self::http::HttpResourceOutputContext;
21pub use self::{
22 event::{TestEvent, encode_test_event},
23 http::HttpResourceConfig,
24};
25use super::{
26 RunnerMetrics,
27 sync::{Configuring, TaskCoordinator},
28};
29
30#[derive(Clone)]
39pub enum ResourceCodec {
40 Encoding(EncodingConfig),
47
48 EncodingWithFraming(EncodingConfigWithFraming),
52
53 Decoding(DecodingConfig),
57}
58
59impl ResourceCodec {
60 pub fn allowed_event_data_types(self) -> DataType {
66 match self {
67 Self::Encoding(encoding) => encoding.config().input_type(),
68 Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
69 Self::Decoding(decoding) => decoding.config().output_type(),
70 }
71 }
72
73 pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
78 let (framer, serializer) = match self {
79 Self::Encoding(config) => (
80 Framer::Bytes(BytesEncoder),
81 config.build().expect("should not fail to build serializer"),
82 ),
83 Self::EncodingWithFraming(config) => {
84 let (maybe_framing, serializer) = config.config();
85 (
86 maybe_framing
87 .clone()
88 .unwrap_or(FramingConfig::Bytes)
89 .build(),
90 serializer
91 .build()
92 .expect("building serializer should never fail"),
93 )
94 }
95 Self::Decoding(config) => (
96 decoder_framing_to_encoding_framer(config.framing()),
97 deserializer_config_to_serializer(config.config()),
98 ),
99 };
100
101 Encoder::<encoding::Framer>::new(framer, serializer)
102 }
103
104 pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
109 let (framer, deserializer) = match self {
110 Self::Decoding(config) => return config.build(),
111 Self::Encoding(config) => (
112 encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
113 serializer_config_to_deserializer(config.config())?,
114 ),
115 Self::EncodingWithFraming(config) => {
116 let (maybe_framing, serializer) = config.config();
117 let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
118 (
119 encoder_framing_to_decoding_framer(framing),
120 serializer_config_to_deserializer(serializer)?,
121 )
122 }
123 };
124
125 Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
126 }
127}
128
129impl From<EncodingConfig> for ResourceCodec {
130 fn from(config: EncodingConfig) -> Self {
131 Self::Encoding(config)
132 }
133}
134
135impl From<EncodingConfigWithFraming> for ResourceCodec {
136 fn from(config: EncodingConfigWithFraming) -> Self {
137 Self::EncodingWithFraming(config)
138 }
139}
140
141impl From<DecodingConfig> for ResourceCodec {
142 fn from(config: DecodingConfig) -> Self {
143 Self::Decoding(config)
144 }
145}
146
147fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
148 let serializer_config = match config {
149 DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
153 DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
154 DeserializerConfig::Protobuf(config) => {
155 SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
156 protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
157 desc_file: config.protobuf.desc_file.clone(),
158 message_type: config.protobuf.message_type.clone(),
159 use_json_names: config.protobuf.use_json_names,
160 },
161 })
162 }
163 #[cfg(feature = "codecs-syslog")]
167 DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
168 DeserializerConfig::Native => SerializerConfig::Native,
169 DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
170 DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf(Default::default()),
171 DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
172 DeserializerConfig::Influxdb { .. } => todo!(),
174 DeserializerConfig::Vrl { .. } => unimplemented!(),
175 #[cfg(feature = "codecs-opentelemetry")]
176 DeserializerConfig::Otlp { .. } => SerializerConfig::Otlp,
177 };
178
179 serializer_config
180 .build()
181 .expect("building serializer should never fail")
182}
183
184fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
185 let framing_config = match framing {
186 decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
187 decoding::FramingConfig::CharacterDelimited(config) => {
188 encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
189 character_delimited: encoding::CharacterDelimitedEncoderOptions {
190 delimiter: config.character_delimited.delimiter,
191 },
192 })
193 }
194 decoding::FramingConfig::LengthDelimited(config) => {
195 encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
196 length_delimited: config.length_delimited.clone(),
197 })
198 }
199 decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
200 decoding::FramingConfig::OctetCounting(_) => todo!(),
203 decoding::FramingConfig::ChunkedGelf(_) => todo!(),
205 decoding::FramingConfig::VarintLengthDelimited(config) => {
206 encoding::FramingConfig::VarintLengthDelimited(
207 encoding::VarintLengthDelimitedEncoderConfig {
208 max_frame_length: config.max_frame_length,
209 },
210 )
211 }
212 };
213
214 framing_config.build()
215}
216
217fn serializer_config_to_deserializer(
218 config: &SerializerConfig,
219) -> vector_lib::Result<decoding::Deserializer> {
220 let deserializer_config = match config {
221 SerializerConfig::Avro { .. } => todo!(),
222 SerializerConfig::Cef { .. } => todo!(),
223 SerializerConfig::Csv { .. } => todo!(),
224 SerializerConfig::Gelf { .. } => DeserializerConfig::Gelf(Default::default()),
225 SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
226 SerializerConfig::Logfmt => todo!(),
227 SerializerConfig::Native => DeserializerConfig::Native,
228 SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
229 SerializerConfig::Protobuf(config) => {
230 DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
231 protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
232 desc_file: config.protobuf.desc_file.clone(),
233 message_type: config.protobuf.message_type.clone(),
234 use_json_names: config.protobuf.use_json_names,
235 },
236 })
237 }
238 SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
239 #[cfg(feature = "codecs-opentelemetry")]
240 SerializerConfig::Otlp => todo!(),
241 #[cfg(feature = "codecs-syslog")]
242 SerializerConfig::Syslog(_) => todo!(),
243 };
244
245 deserializer_config.build()
246}
247
248fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
249 let framing_config = match framing {
250 encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
251 encoding::FramingConfig::CharacterDelimited(config) => {
252 decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
253 character_delimited: decoding::CharacterDelimitedDecoderOptions {
254 delimiter: config.character_delimited.delimiter,
255 max_length: None,
256 },
257 })
258 }
259 encoding::FramingConfig::LengthDelimited(config) => {
260 decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
261 length_delimited: config.length_delimited.clone(),
262 })
263 }
264 encoding::FramingConfig::NewlineDelimited => {
265 decoding::FramingConfig::NewlineDelimited(Default::default())
266 }
267 vector_lib::codecs::encoding::FramingConfig::VarintLengthDelimited(config) => {
268 decoding::FramingConfig::VarintLengthDelimited(
269 decoding::VarintLengthDelimitedDecoderConfig {
270 max_frame_length: config.max_frame_length,
271 },
272 )
273 }
274 };
275
276 framing_config.build()
277}
278
/// The direction in which data moves between a component and its external resource.
///
/// NOTE(review): the exact meaning of each variant is decided by the resource implementation
/// (for example the `http` module) that receives this value; confirm the semantics there.
#[derive(Clone)]
pub enum ResourceDirection {
    /// Pull-style interaction.
    Pull,

    /// Push-style interaction.
    Push,
}
306
307#[derive(Clone)]
314pub enum ResourceDefinition {
315 Http(HttpResourceConfig),
316}
317
318impl From<HttpResourceConfig> for ResourceDefinition {
319 fn from(config: HttpResourceConfig) -> Self {
320 Self::Http(config)
321 }
322}
323
324#[derive(Clone)]
339pub struct ExternalResource {
340 pub direction: ResourceDirection,
341 definition: ResourceDefinition,
342 pub codec: ResourceCodec,
343}
344
345impl ExternalResource {
346 pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
348 where
349 D: Into<ResourceDefinition>,
350 C: Into<ResourceCodec>,
351 {
352 Self {
353 direction,
354 definition: definition.into(),
355 codec: codec.into(),
356 }
357 }
358
359 pub fn spawn_as_input(
361 self,
362 input_rx: mpsc::Receiver<TestEvent>,
363 task_coordinator: &TaskCoordinator<Configuring>,
364 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
365 ) {
366 match self.definition {
367 ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
368 self.direction,
369 self.codec,
370 input_rx,
371 task_coordinator,
372 runner_metrics,
373 ),
374 }
375 }
376
377 pub fn spawn_as_output(
379 self,
380 output_tx: mpsc::Sender<Vec<Event>>,
381 task_coordinator: &TaskCoordinator<Configuring>,
382 input_events: Vec<TestEvent>,
383 runner_metrics: &Arc<Mutex<RunnerMetrics>>,
384 log_namespace: LogNamespace,
385 ) -> vector_lib::Result<()> {
386 match self.definition {
387 ResourceDefinition::Http(http_config) => {
388 http_config.spawn_as_output(HttpResourceOutputContext {
389 direction: self.direction,
390 codec: self.codec,
391 output_tx,
392 task_coordinator,
393 input_events,
394 runner_metrics,
395 log_namespace,
396 })
397 }
398 }
399 }
400}