vector/components/validation/resources/
mod.rs

1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{Mutex, mpsc};
7use vector_lib::{
8    codecs::{
9        BytesEncoder,
10        decoding::{self, DeserializerConfig},
11        encoding::{
12            self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
13            TextSerializerConfig,
14        },
15    },
16    config::{DataType, LogNamespace},
17    event::Event,
18};
19
20use self::http::HttpResourceOutputContext;
21pub use self::{
22    event::{TestEvent, encode_test_event},
23    http::HttpResourceConfig,
24};
25use super::{
26    RunnerMetrics,
27    sync::{Configuring, TaskCoordinator},
28};
29use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
30
31/// The codec used by the external resource.
32///
33/// This enum specifically exists to encapsulate the two main ways a component will configure the
34/// codec it uses, which ends up being with directionally-specific codec configuration: "encoding"
35/// when taking an `Event` and convert it to a raw output, and "decoding" when taking a raw output
36/// and converting it to an `Event`.
37///
38/// Encoding and decoding is generally tied to sinks and sources, respectively.
39#[derive(Clone)]
40pub enum ResourceCodec {
41    /// Component encodes events.
42    ///
43    /// As opposed to `EncodingWithFramer`, this variant uses the default framing method defined by
44    /// the encoding itself.
45    ///
46    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
47    Encoding(EncodingConfig),
48
49    /// Component encodes events, with a specific framer.
50    ///
51    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
52    EncodingWithFraming(EncodingConfigWithFraming),
53
54    /// Component decodes events.
55    ///
56    /// Generally speaking, only sources decode: going from an encoded form to `Event`.
57    Decoding(DecodingConfig),
58}
59
60impl ResourceCodec {
61    /// Gets the allowed event data types for the configured codec.
62    ///
63    /// Not all codecs support all possible event types (i.e. a codec has no means to losslessly
64    /// represent the data in a particular event type) so we must check at runtime to ensure that
65    /// we're only generating event payloads that can be encoded/decoded for the given component.
66    pub fn allowed_event_data_types(self) -> DataType {
67        match self {
68            Self::Encoding(encoding) => encoding.config().input_type(),
69            Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
70            Self::Decoding(decoding) => decoding.config().output_type(),
71        }
72    }
73
74    /// Gets an encoder for this codec.
75    ///
76    /// The encoder is generated as an inverse to the input codec: if a decoding configuration was
77    /// given, we generate an encoder that satisfies that decoding configuration, and vice versa.
78    pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
79        let (framer, serializer) = match self {
80            Self::Encoding(config) => (
81                Framer::Bytes(BytesEncoder),
82                config.build().expect("should not fail to build serializer"),
83            ),
84            Self::EncodingWithFraming(config) => {
85                let (maybe_framing, serializer) = config.config();
86                (
87                    maybe_framing
88                        .clone()
89                        .unwrap_or(FramingConfig::Bytes)
90                        .build(),
91                    serializer
92                        .build()
93                        .expect("building serializer should never fail"),
94                )
95            }
96            Self::Decoding(config) => (
97                decoder_framing_to_encoding_framer(config.framing()),
98                deserializer_config_to_serializer(config.config()),
99            ),
100        };
101
102        Encoder::<encoding::Framer>::new(framer, serializer)
103    }
104
105    /// Gets a decoder for this codec.
106    ///
107    /// The decoder is generated as an inverse to the input codec: if an encoding configuration was
108    /// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
109    pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
110        let (framer, deserializer) = match self {
111            Self::Decoding(config) => return config.build(),
112            Self::Encoding(config) => (
113                encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
114                serializer_config_to_deserializer(config.config())?,
115            ),
116            Self::EncodingWithFraming(config) => {
117                let (maybe_framing, serializer) = config.config();
118                let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
119                (
120                    encoder_framing_to_decoding_framer(framing),
121                    serializer_config_to_deserializer(serializer)?,
122                )
123            }
124        };
125
126        Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
127    }
128}
129
130impl From<EncodingConfig> for ResourceCodec {
131    fn from(config: EncodingConfig) -> Self {
132        Self::Encoding(config)
133    }
134}
135
136impl From<EncodingConfigWithFraming> for ResourceCodec {
137    fn from(config: EncodingConfigWithFraming) -> Self {
138        Self::EncodingWithFraming(config)
139    }
140}
141
142impl From<DecodingConfig> for ResourceCodec {
143    fn from(config: DecodingConfig) -> Self {
144        Self::Decoding(config)
145    }
146}
147
148fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
149    let serializer_config = match config {
150        // TODO: This isn't necessarily a one-to-one conversion, at least not in the future when
151        // "bytes" can be a top-level field and we aren't implicitly decoding everything into the
152        // `message` field... but it's close enough for now.
153        DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
154        DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
155        DeserializerConfig::Protobuf(config) => {
156            SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
157                protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
158                    desc_file: config.protobuf.desc_file.clone(),
159                    message_type: config.protobuf.message_type.clone(),
160                },
161            })
162        }
163        // TODO: We need to create an Avro serializer because, certainly, for any source decoding
164        // the data as Avro, we can't possibly send anything else without the source just
165        // immediately barfing.
166        #[cfg(feature = "codecs-syslog")]
167        DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
168        DeserializerConfig::Native => SerializerConfig::Native,
169        DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
170        DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf(Default::default()),
171        DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
172        // TODO: Influxdb has no serializer yet
173        DeserializerConfig::Influxdb { .. } => todo!(),
174        DeserializerConfig::Vrl { .. } => unimplemented!(),
175    };
176
177    serializer_config
178        .build()
179        .expect("building serializer should never fail")
180}
181
182fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
183    let framing_config = match framing {
184        decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
185        decoding::FramingConfig::CharacterDelimited(config) => {
186            encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
187                character_delimited: encoding::CharacterDelimitedEncoderOptions {
188                    delimiter: config.character_delimited.delimiter,
189                },
190            })
191        }
192        decoding::FramingConfig::LengthDelimited(config) => {
193            encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
194                length_delimited: config.length_delimited.clone(),
195            })
196        }
197        decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
198        // TODO: There's no equivalent octet counting framer for encoding... although
199        // there's no particular reason that would make it hard to write.
200        decoding::FramingConfig::OctetCounting(_) => todo!(),
201        // TODO: chunked gelf is not supported yet in encoding
202        decoding::FramingConfig::ChunkedGelf(_) => todo!(),
203        decoding::FramingConfig::VarintLengthDelimited(config) => {
204            encoding::FramingConfig::VarintLengthDelimited(
205                encoding::VarintLengthDelimitedEncoderConfig {
206                    max_frame_length: config.max_frame_length,
207                },
208            )
209        }
210    };
211
212    framing_config.build()
213}
214
215fn serializer_config_to_deserializer(
216    config: &SerializerConfig,
217) -> vector_lib::Result<decoding::Deserializer> {
218    let deserializer_config = match config {
219        SerializerConfig::Avro { .. } => todo!(),
220        SerializerConfig::Cef { .. } => todo!(),
221        SerializerConfig::Csv { .. } => todo!(),
222        SerializerConfig::Gelf { .. } => DeserializerConfig::Gelf(Default::default()),
223        SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
224        SerializerConfig::Logfmt => todo!(),
225        SerializerConfig::Native => DeserializerConfig::Native,
226        SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
227        SerializerConfig::Protobuf(config) => {
228            DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
229                protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
230                    desc_file: config.protobuf.desc_file.clone(),
231                    message_type: config.protobuf.message_type.clone(),
232                },
233            })
234        }
235        SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
236    };
237
238    deserializer_config.build()
239}
240
241fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
242    let framing_config = match framing {
243        encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
244        encoding::FramingConfig::CharacterDelimited(config) => {
245            decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
246                character_delimited: decoding::CharacterDelimitedDecoderOptions {
247                    delimiter: config.character_delimited.delimiter,
248                    max_length: None,
249                },
250            })
251        }
252        encoding::FramingConfig::LengthDelimited(config) => {
253            decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
254                length_delimited: config.length_delimited.clone(),
255            })
256        }
257        encoding::FramingConfig::NewlineDelimited => {
258            decoding::FramingConfig::NewlineDelimited(Default::default())
259        }
260        vector_lib::codecs::encoding::FramingConfig::VarintLengthDelimited(config) => {
261            decoding::FramingConfig::VarintLengthDelimited(
262                decoding::VarintLengthDelimitedDecoderConfig {
263                    max_frame_length: config.max_frame_length,
264                },
265            )
266        }
267    };
268
269    framing_config.build()
270}
271
/// The direction in which data flows between a component and its external resource.
#[derive(Clone)]
pub enum ResourceDirection {
    /// The component pulls data from the resource, or the resource pulls data from the component.
    ///
    /// For a source, where the external resource acts as an "input", the source calls out to the
    /// external resource (HTTP server, Kafka cluster, etc) and asks for data, or expects the data
    /// to come back in the response.
    ///
    /// For a sink, where the external resource acts as an "output", the sink exposes a network
    /// endpoint and the external resource is responsible for connecting to that endpoint to grab
    /// the data.
    Pull,

    /// The resource pushes data to the component, or the component pushes data to the resource.
    ///
    /// For a source, where the external resource acts as an "input", the source waits for data
    /// to be sent to it, whether by listening on a network endpoint for traffic or by polling
    /// files on disk for updates, and the external resource is responsible for initiating that
    /// communication or writing to those files.
    ///
    /// For a sink, where the external resource acts as an "output", the sink pushes its data to
    /// a network endpoint or writes it to files, and the external resource is responsible for
    /// aggregating that data or reading from those files.
    Push,
}
299
300/// A resource definition.
301///
302/// Resource definitions uniquely identify the resource, such as HTTP, or files, and so on. These
303/// definitions generally include the bare minimum amount of information to allow the component
304/// validation runner to create an instance of them, such as spawning an HTTP server if a source has
305/// specified an HTTP resource in the "pull" direction.
306#[derive(Clone)]
307pub enum ResourceDefinition {
308    Http(HttpResourceConfig),
309}
310
311impl From<HttpResourceConfig> for ResourceDefinition {
312    fn from(config: HttpResourceConfig) -> Self {
313        Self::Http(config)
314    }
315}
316
317/// An external resource associated with a component.
318///
319/// External resources represent the hypothetical location where, depending on whether the component
320/// is a source or sink, data would be generated from or collected at. This includes things like
321/// network endpoints (raw sockets, HTTP servers, etc) as well as files on disk, and more. In other
322/// words, an external resource is a data dependency associated with the component, whether the
323/// component depends on data from the external resource, or the external resource depends on data
324/// from the component.
325///
326/// An external resource includes a direction -- push or pull -- as well as the fundamental
327/// definition of the resource, such as HTTP or file. The component type is used to further refine
328/// the direction of the resource, such that a "pull" resource used with a source implies the source
329/// will pull data from the external resource, whereas a "pull" resource used with a sink implies
330/// the external resource must pull the data from the sink.
331#[derive(Clone)]
332pub struct ExternalResource {
333    pub direction: ResourceDirection,
334    definition: ResourceDefinition,
335    pub codec: ResourceCodec,
336}
337
338impl ExternalResource {
339    /// Creates a new `ExternalResource` based on the given `direction`, `definition`, and `codec`.
340    pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
341    where
342        D: Into<ResourceDefinition>,
343        C: Into<ResourceCodec>,
344    {
345        Self {
346            direction,
347            definition: definition.into(),
348            codec: codec.into(),
349        }
350    }
351
352    /// Spawns this resource for use as an input to a source.
353    pub fn spawn_as_input(
354        self,
355        input_rx: mpsc::Receiver<TestEvent>,
356        task_coordinator: &TaskCoordinator<Configuring>,
357        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
358    ) {
359        match self.definition {
360            ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
361                self.direction,
362                self.codec,
363                input_rx,
364                task_coordinator,
365                runner_metrics,
366            ),
367        }
368    }
369
370    /// Spawns this resource for use as an output for a sink.
371    pub fn spawn_as_output(
372        self,
373        output_tx: mpsc::Sender<Vec<Event>>,
374        task_coordinator: &TaskCoordinator<Configuring>,
375        input_events: Vec<TestEvent>,
376        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
377        log_namespace: LogNamespace,
378    ) -> vector_lib::Result<()> {
379        match self.definition {
380            ResourceDefinition::Http(http_config) => {
381                http_config.spawn_as_output(HttpResourceOutputContext {
382                    direction: self.direction,
383                    codec: self.codec,
384                    output_tx,
385                    task_coordinator,
386                    input_events,
387                    runner_metrics,
388                    log_namespace,
389                })
390            }
391        }
392    }
393}