// vector/components/validation/resources/mod.rs

1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{Mutex, mpsc};
7use vector_lib::{
8    codecs::{
9        BytesEncoder,
10        decoding::{self, DeserializerConfig},
11        encoding::{
12            self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
13            TextSerializerConfig,
14        },
15    },
16    config::{DataType, LogNamespace},
17    event::Event,
18};
19
20use self::http::HttpResourceOutputContext;
21pub use self::{
22    event::{TestEvent, encode_test_event},
23    http::HttpResourceConfig,
24};
25use super::{
26    RunnerMetrics,
27    sync::{Configuring, TaskCoordinator},
28};
29use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
30
31/// The codec used by the external resource.
32///
33/// This enum specifically exists to encapsulate the two main ways a component will configure the
34/// codec it uses, which ends up being with directionally-specific codec configuration: "encoding"
35/// when taking an `Event` and convert it to a raw output, and "decoding" when taking a raw output
36/// and converting it to an `Event`.
37///
38/// Encoding and decoding is generally tied to sinks and sources, respectively.
39#[derive(Clone)]
40pub enum ResourceCodec {
41    /// Component encodes events.
42    ///
43    /// As opposed to `EncodingWithFramer`, this variant uses the default framing method defined by
44    /// the encoding itself.
45    ///
46    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
47    Encoding(EncodingConfig),
48
49    /// Component encodes events, with a specific framer.
50    ///
51    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
52    EncodingWithFraming(EncodingConfigWithFraming),
53
54    /// Component decodes events.
55    ///
56    /// Generally speaking, only sources decode: going from an encoded form to `Event`.
57    Decoding(DecodingConfig),
58}
59
60impl ResourceCodec {
61    /// Gets the allowed event data types for the configured codec.
62    ///
63    /// Not all codecs support all possible event types (i.e. a codec has no means to losslessly
64    /// represent the data in a particular event type) so we must check at runtime to ensure that
65    /// we're only generating event payloads that can be encoded/decoded for the given component.
66    pub fn allowed_event_data_types(self) -> DataType {
67        match self {
68            Self::Encoding(encoding) => encoding.config().input_type(),
69            Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
70            Self::Decoding(decoding) => decoding.config().output_type(),
71        }
72    }
73
74    /// Gets an encoder for this codec.
75    ///
76    /// The encoder is generated as an inverse to the input codec: if a decoding configuration was
77    /// given, we generate an encoder that satisfies that decoding configuration, and vice versa.
78    pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
79        let (framer, serializer) = match self {
80            Self::Encoding(config) => (
81                Framer::Bytes(BytesEncoder),
82                config.build().expect("should not fail to build serializer"),
83            ),
84            Self::EncodingWithFraming(config) => {
85                let (maybe_framing, serializer) = config.config();
86                (
87                    maybe_framing
88                        .clone()
89                        .unwrap_or(FramingConfig::Bytes)
90                        .build(),
91                    serializer
92                        .build()
93                        .expect("building serializer should never fail"),
94                )
95            }
96            Self::Decoding(config) => (
97                decoder_framing_to_encoding_framer(config.framing()),
98                deserializer_config_to_serializer(config.config()),
99            ),
100        };
101
102        Encoder::<encoding::Framer>::new(framer, serializer)
103    }
104
105    /// Gets a decoder for this codec.
106    ///
107    /// The decoder is generated as an inverse to the input codec: if an encoding configuration was
108    /// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
109    pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
110        let (framer, deserializer) = match self {
111            Self::Decoding(config) => return config.build(),
112            Self::Encoding(config) => (
113                encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
114                serializer_config_to_deserializer(config.config())?,
115            ),
116            Self::EncodingWithFraming(config) => {
117                let (maybe_framing, serializer) = config.config();
118                let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
119                (
120                    encoder_framing_to_decoding_framer(framing),
121                    serializer_config_to_deserializer(serializer)?,
122                )
123            }
124        };
125
126        Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
127    }
128}
129
130impl From<EncodingConfig> for ResourceCodec {
131    fn from(config: EncodingConfig) -> Self {
132        Self::Encoding(config)
133    }
134}
135
136impl From<EncodingConfigWithFraming> for ResourceCodec {
137    fn from(config: EncodingConfigWithFraming) -> Self {
138        Self::EncodingWithFraming(config)
139    }
140}
141
142impl From<DecodingConfig> for ResourceCodec {
143    fn from(config: DecodingConfig) -> Self {
144        Self::Decoding(config)
145    }
146}
147
148fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
149    let serializer_config = match config {
150        // TODO: This isn't necessarily a one-to-one conversion, at least not in the future when
151        // "bytes" can be a top-level field and we aren't implicitly decoding everything into the
152        // `message` field... but it's close enough for now.
153        DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
154        DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
155        DeserializerConfig::Protobuf(config) => {
156            SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
157                protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
158                    desc_file: config.protobuf.desc_file.clone(),
159                    message_type: config.protobuf.message_type.clone(),
160                    use_json_names: config.protobuf.use_json_names,
161                },
162            })
163        }
164        // TODO: We need to create an Avro serializer because, certainly, for any source decoding
165        // the data as Avro, we can't possibly send anything else without the source just
166        // immediately barfing.
167        #[cfg(feature = "codecs-syslog")]
168        DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
169        DeserializerConfig::Native => SerializerConfig::Native,
170        DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
171        DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf(Default::default()),
172        DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
173        // TODO: Influxdb has no serializer yet
174        DeserializerConfig::Influxdb { .. } => todo!(),
175        DeserializerConfig::Vrl { .. } => unimplemented!(),
176        #[cfg(feature = "codecs-opentelemetry")]
177        DeserializerConfig::Otlp { .. } => SerializerConfig::Otlp,
178    };
179
180    serializer_config
181        .build()
182        .expect("building serializer should never fail")
183}
184
185fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
186    let framing_config = match framing {
187        decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
188        decoding::FramingConfig::CharacterDelimited(config) => {
189            encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
190                character_delimited: encoding::CharacterDelimitedEncoderOptions {
191                    delimiter: config.character_delimited.delimiter,
192                },
193            })
194        }
195        decoding::FramingConfig::LengthDelimited(config) => {
196            encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
197                length_delimited: config.length_delimited.clone(),
198            })
199        }
200        decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
201        // TODO: There's no equivalent octet counting framer for encoding... although
202        // there's no particular reason that would make it hard to write.
203        decoding::FramingConfig::OctetCounting(_) => todo!(),
204        // TODO: chunked gelf is not supported yet in encoding
205        decoding::FramingConfig::ChunkedGelf(_) => todo!(),
206        decoding::FramingConfig::VarintLengthDelimited(config) => {
207            encoding::FramingConfig::VarintLengthDelimited(
208                encoding::VarintLengthDelimitedEncoderConfig {
209                    max_frame_length: config.max_frame_length,
210                },
211            )
212        }
213    };
214
215    framing_config.build()
216}
217
218fn serializer_config_to_deserializer(
219    config: &SerializerConfig,
220) -> vector_lib::Result<decoding::Deserializer> {
221    let deserializer_config = match config {
222        SerializerConfig::Avro { .. } => todo!(),
223        SerializerConfig::Cef { .. } => todo!(),
224        SerializerConfig::Csv { .. } => todo!(),
225        SerializerConfig::Gelf { .. } => DeserializerConfig::Gelf(Default::default()),
226        SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
227        SerializerConfig::Logfmt => todo!(),
228        SerializerConfig::Native => DeserializerConfig::Native,
229        SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
230        SerializerConfig::Protobuf(config) => {
231            DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
232                protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
233                    desc_file: config.protobuf.desc_file.clone(),
234                    message_type: config.protobuf.message_type.clone(),
235                    use_json_names: config.protobuf.use_json_names,
236                },
237            })
238        }
239        SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
240        #[cfg(feature = "codecs-opentelemetry")]
241        SerializerConfig::Otlp => todo!(),
242    };
243
244    deserializer_config.build()
245}
246
247fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
248    let framing_config = match framing {
249        encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
250        encoding::FramingConfig::CharacterDelimited(config) => {
251            decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
252                character_delimited: decoding::CharacterDelimitedDecoderOptions {
253                    delimiter: config.character_delimited.delimiter,
254                    max_length: None,
255                },
256            })
257        }
258        encoding::FramingConfig::LengthDelimited(config) => {
259            decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
260                length_delimited: config.length_delimited.clone(),
261            })
262        }
263        encoding::FramingConfig::NewlineDelimited => {
264            decoding::FramingConfig::NewlineDelimited(Default::default())
265        }
266        vector_lib::codecs::encoding::FramingConfig::VarintLengthDelimited(config) => {
267            decoding::FramingConfig::VarintLengthDelimited(
268                decoding::VarintLengthDelimitedDecoderConfig {
269                    max_frame_length: config.max_frame_length,
270                },
271            )
272        }
273    };
274
275    framing_config.build()
276}
277
278/// Direction that the resource is operating in.
279#[derive(Clone)]
280pub enum ResourceDirection {
281    /// Resource will have the component pull data from it, or pull data from the component.
282    ///
283    /// For a source, where an external resource functions in "input" mode, this would be the
284    /// equivalent of the source calling out to the external resource (HTTP server, Kafka cluster,
285    /// etc) and asking for data, or expecting it to be returned in the response.
286    ///
287    /// For a sink, where an external resource functions in "output" mode, this would be the
288    /// equivalent of the sink exposing a network endpoint and having the external resource be
289    /// responsible for connecting to the endpoint to grab the data.
290    Pull,
291
292    /// Resource will push data to the component, or have data pushed to it from the component.
293    ///
294    /// For a source, where an external resource functions in "input" mode, this would be the
295    /// equivalent of the source waiting for data to be sent to either, whether it's listening on a
296    /// network endpoint for traffic, or polling files on disks for updates, and the external
297    /// resource would be responsible for initiating that communication, or writing to those files.
298    ///
299    /// For a sink, where an external resource functions in "output" mode, this would be the
300    /// equivalent of the sink pushing its data to a network endpoint, or writing data to files,
301    /// where the external resource would be responsible for aggregating that data, or read from
302    /// those files.
303    Push,
304}
305
306/// A resource definition.
307///
308/// Resource definitions uniquely identify the resource, such as HTTP, or files, and so on. These
309/// definitions generally include the bare minimum amount of information to allow the component
310/// validation runner to create an instance of them, such as spawning an HTTP server if a source has
311/// specified an HTTP resource in the "pull" direction.
312#[derive(Clone)]
313pub enum ResourceDefinition {
314    Http(HttpResourceConfig),
315}
316
317impl From<HttpResourceConfig> for ResourceDefinition {
318    fn from(config: HttpResourceConfig) -> Self {
319        Self::Http(config)
320    }
321}
322
323/// An external resource associated with a component.
324///
325/// External resources represent the hypothetical location where, depending on whether the component
326/// is a source or sink, data would be generated from or collected at. This includes things like
327/// network endpoints (raw sockets, HTTP servers, etc) as well as files on disk, and more. In other
328/// words, an external resource is a data dependency associated with the component, whether the
329/// component depends on data from the external resource, or the external resource depends on data
330/// from the component.
331///
332/// An external resource includes a direction -- push or pull -- as well as the fundamental
333/// definition of the resource, such as HTTP or file. The component type is used to further refine
334/// the direction of the resource, such that a "pull" resource used with a source implies the source
335/// will pull data from the external resource, whereas a "pull" resource used with a sink implies
336/// the external resource must pull the data from the sink.
337#[derive(Clone)]
338pub struct ExternalResource {
339    pub direction: ResourceDirection,
340    definition: ResourceDefinition,
341    pub codec: ResourceCodec,
342}
343
344impl ExternalResource {
345    /// Creates a new `ExternalResource` based on the given `direction`, `definition`, and `codec`.
346    pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
347    where
348        D: Into<ResourceDefinition>,
349        C: Into<ResourceCodec>,
350    {
351        Self {
352            direction,
353            definition: definition.into(),
354            codec: codec.into(),
355        }
356    }
357
358    /// Spawns this resource for use as an input to a source.
359    pub fn spawn_as_input(
360        self,
361        input_rx: mpsc::Receiver<TestEvent>,
362        task_coordinator: &TaskCoordinator<Configuring>,
363        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
364    ) {
365        match self.definition {
366            ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
367                self.direction,
368                self.codec,
369                input_rx,
370                task_coordinator,
371                runner_metrics,
372            ),
373        }
374    }
375
376    /// Spawns this resource for use as an output for a sink.
377    pub fn spawn_as_output(
378        self,
379        output_tx: mpsc::Sender<Vec<Event>>,
380        task_coordinator: &TaskCoordinator<Configuring>,
381        input_events: Vec<TestEvent>,
382        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
383        log_namespace: LogNamespace,
384    ) -> vector_lib::Result<()> {
385        match self.definition {
386            ResourceDefinition::Http(http_config) => {
387                http_config.spawn_as_output(HttpResourceOutputContext {
388                    direction: self.direction,
389                    codec: self.codec,
390                    output_tx,
391                    task_coordinator,
392                    input_events,
393                    runner_metrics,
394                    log_namespace,
395                })
396            }
397        }
398    }
399}