vector/components/validation/resources/
mod.rs

1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{mpsc, Mutex};
7use vector_lib::{
8    codecs::{
9        decoding::{self, DeserializerConfig},
10        encoding::{
11            self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
12            TextSerializerConfig,
13        },
14        BytesEncoder,
15    },
16    config::LogNamespace,
17};
18use vector_lib::{config::DataType, event::Event};
19
20use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};
21
22pub use self::event::{encode_test_event, TestEvent};
23pub use self::http::HttpResourceConfig;
24use self::http::HttpResourceOutputContext;
25
26use super::{
27    sync::{Configuring, TaskCoordinator},
28    RunnerMetrics,
29};
30
31/// The codec used by the external resource.
32///
33/// This enum specifically exists to encapsulate the two main ways a component will configure the
34/// codec it uses, which ends up being with directionally-specific codec configuration: "encoding"
35/// when taking an `Event` and convert it to a raw output, and "decoding" when taking a raw output
36/// and converting it to an `Event`.
37///
38/// Encoding and decoding is generally tied to sinks and sources, respectively.
39#[derive(Clone)]
40pub enum ResourceCodec {
41    /// Component encodes events.
42    ///
43    /// As opposed to `EncodingWithFramer`, this variant uses the default framing method defined by
44    /// the encoding itself.
45    ///
46    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
47    Encoding(EncodingConfig),
48
49    /// Component encodes events, with a specific framer.
50    ///
51    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
52    EncodingWithFraming(EncodingConfigWithFraming),
53
54    /// Component decodes events.
55    ///
56    /// Generally speaking, only sources decode: going from an encoded form to `Event`.
57    Decoding(DecodingConfig),
58}
59
60impl ResourceCodec {
61    /// Gets the allowed event data types for the configured codec.
62    ///
63    /// Not all codecs support all possible event types (i.e. a codec has no means to losslessly
64    /// represent the data in a particular event type) so we must check at runtime to ensure that
65    /// we're only generating event payloads that can be encoded/decoded for the given component.
66    pub fn allowed_event_data_types(self) -> DataType {
67        match self {
68            Self::Encoding(encoding) => encoding.config().input_type(),
69            Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
70            Self::Decoding(decoding) => decoding.config().output_type(),
71        }
72    }
73
74    /// Gets an encoder for this codec.
75    ///
76    /// The encoder is generated as an inverse to the input codec: if a decoding configuration was
77    /// given, we generate an encoder that satisfies that decoding configuration, and vice versa.
78    pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
79        let (framer, serializer) = match self {
80            Self::Encoding(config) => (
81                Framer::Bytes(BytesEncoder),
82                config.build().expect("should not fail to build serializer"),
83            ),
84            Self::EncodingWithFraming(config) => {
85                let (maybe_framing, serializer) = config.config();
86                (
87                    maybe_framing
88                        .clone()
89                        .unwrap_or(FramingConfig::Bytes)
90                        .build(),
91                    serializer
92                        .build()
93                        .expect("building serializer should never fail"),
94                )
95            }
96            Self::Decoding(config) => (
97                decoder_framing_to_encoding_framer(config.framing()),
98                deserializer_config_to_serializer(config.config()),
99            ),
100        };
101
102        Encoder::<encoding::Framer>::new(framer, serializer)
103    }
104
105    /// Gets a decoder for this codec.
106    ///
107    /// The decoder is generated as an inverse to the input codec: if an encoding configuration was
108    /// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
109    pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
110        let (framer, deserializer) = match self {
111            Self::Decoding(config) => return config.build(),
112            Self::Encoding(config) => (
113                encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
114                serializer_config_to_deserializer(config.config())?,
115            ),
116            Self::EncodingWithFraming(config) => {
117                let (maybe_framing, serializer) = config.config();
118                let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
119                (
120                    encoder_framing_to_decoding_framer(framing),
121                    serializer_config_to_deserializer(serializer)?,
122                )
123            }
124        };
125
126        Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
127    }
128}
129
130impl From<EncodingConfig> for ResourceCodec {
131    fn from(config: EncodingConfig) -> Self {
132        Self::Encoding(config)
133    }
134}
135
136impl From<EncodingConfigWithFraming> for ResourceCodec {
137    fn from(config: EncodingConfigWithFraming) -> Self {
138        Self::EncodingWithFraming(config)
139    }
140}
141
142impl From<DecodingConfig> for ResourceCodec {
143    fn from(config: DecodingConfig) -> Self {
144        Self::Decoding(config)
145    }
146}
147
148fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
149    let serializer_config = match config {
150        // TODO: This isn't necessarily a one-to-one conversion, at least not in the future when
151        // "bytes" can be a top-level field and we aren't implicitly decoding everything into the
152        // `message` field... but it's close enough for now.
153        DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
154        DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
155        DeserializerConfig::Protobuf(config) => {
156            SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
157                protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
158                    desc_file: config.protobuf.desc_file.clone(),
159                    message_type: config.protobuf.message_type.clone(),
160                },
161            })
162        }
163        // TODO: We need to create an Avro serializer because, certainly, for any source decoding
164        // the data as Avro, we can't possibly send anything else without the source just
165        // immediately barfing.
166        #[cfg(feature = "codecs-syslog")]
167        DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
168        DeserializerConfig::Native => SerializerConfig::Native,
169        DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
170        DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf,
171        DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
172        // TODO: Influxdb has no serializer yet
173        DeserializerConfig::Influxdb { .. } => todo!(),
174        DeserializerConfig::Vrl { .. } => unimplemented!(),
175    };
176
177    serializer_config
178        .build()
179        .expect("building serializer should never fail")
180}
181
182fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
183    let framing_config = match framing {
184        decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
185        decoding::FramingConfig::CharacterDelimited(config) => {
186            encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
187                character_delimited: encoding::CharacterDelimitedEncoderOptions {
188                    delimiter: config.character_delimited.delimiter,
189                },
190            })
191        }
192        decoding::FramingConfig::LengthDelimited(config) => {
193            encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
194                length_delimited: config.length_delimited.clone(),
195            })
196        }
197        decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
198        // TODO: There's no equivalent octet counting framer for encoding... although
199        // there's no particular reason that would make it hard to write.
200        decoding::FramingConfig::OctetCounting(_) => todo!(),
201        // TODO: chunked gelf is not supported yet in encoding
202        decoding::FramingConfig::ChunkedGelf(_) => todo!(),
203    };
204
205    framing_config.build()
206}
207
208fn serializer_config_to_deserializer(
209    config: &SerializerConfig,
210) -> vector_lib::Result<decoding::Deserializer> {
211    let deserializer_config = match config {
212        SerializerConfig::Avro { .. } => todo!(),
213        SerializerConfig::Cef { .. } => todo!(),
214        SerializerConfig::Csv { .. } => todo!(),
215        SerializerConfig::Gelf => DeserializerConfig::Gelf(Default::default()),
216        SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
217        SerializerConfig::Logfmt => todo!(),
218        SerializerConfig::Native => DeserializerConfig::Native,
219        SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
220        SerializerConfig::Protobuf(config) => {
221            DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
222                protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
223                    desc_file: config.protobuf.desc_file.clone(),
224                    message_type: config.protobuf.message_type.clone(),
225                },
226            })
227        }
228        SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
229    };
230
231    deserializer_config.build()
232}
233
234fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
235    let framing_config = match framing {
236        encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
237        encoding::FramingConfig::CharacterDelimited(config) => {
238            decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
239                character_delimited: decoding::CharacterDelimitedDecoderOptions {
240                    delimiter: config.character_delimited.delimiter,
241                    max_length: None,
242                },
243            })
244        }
245        encoding::FramingConfig::LengthDelimited(config) => {
246            decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
247                length_delimited: config.length_delimited.clone(),
248            })
249        }
250        encoding::FramingConfig::NewlineDelimited => {
251            decoding::FramingConfig::NewlineDelimited(Default::default())
252        }
253    };
254
255    framing_config.build()
256}
257
/// The direction in which data flows between a component and its external resource.
#[derive(Clone)]
pub enum ResourceDirection {
    /// The component pulls data from the resource, or the resource pulls data from the component.
    ///
    /// For a source, where the external resource functions in "input" mode, this means the source
    /// calls out to the external resource (HTTP server, Kafka cluster, etc) and asks for data, or
    /// expects data to be returned in the response.
    ///
    /// For a sink, where the external resource functions in "output" mode, this means the sink
    /// exposes a network endpoint and the external resource is responsible for connecting to
    /// that endpoint to grab the data.
    Pull,

    /// The resource pushes data to the component, or the component pushes data to the resource.
    ///
    /// For a source, where the external resource functions in "input" mode, this means the source
    /// waits for data to be sent to it -- whether by listening on a network endpoint for traffic
    /// or by polling files on disk for updates -- and the external resource is responsible for
    /// initiating that communication, or for writing to those files.
    ///
    /// For a sink, where the external resource functions in "output" mode, this means the sink
    /// pushes its data to a network endpoint, or writes data to files, and the external resource
    /// is responsible for aggregating that data, or for reading from those files.
    Push,
}
285
286/// A resource definition.
287///
288/// Resource definitions uniquely identify the resource, such as HTTP, or files, and so on. These
289/// definitions generally include the bare minimum amount of information to allow the component
290/// validation runner to create an instance of them, such as spawning an HTTP server if a source has
291/// specified an HTTP resource in the "pull" direction.
292#[derive(Clone)]
293pub enum ResourceDefinition {
294    Http(HttpResourceConfig),
295}
296
297impl From<HttpResourceConfig> for ResourceDefinition {
298    fn from(config: HttpResourceConfig) -> Self {
299        Self::Http(config)
300    }
301}
302
303/// An external resource associated with a component.
304///
305/// External resources represent the hypothetical location where, depending on whether the component
306/// is a source or sink, data would be generated from or collected at. This includes things like
307/// network endpoints (raw sockets, HTTP servers, etc) as well as files on disk, and more. In other
308/// words, an external resource is a data dependency associated with the component, whether the
309/// component depends on data from the external resource, or the external resource depends on data
310/// from the component.
311///
312/// An external resource includes a direction -- push or pull -- as well as the fundamental
313/// definition of the resource, such as HTTP or file. The component type is used to further refine
314/// the direction of the resource, such that a "pull" resource used with a source implies the source
315/// will pull data from the external resource, whereas a "pull" resource used with a sink implies
316/// the external resource must pull the data from the sink.
317#[derive(Clone)]
318pub struct ExternalResource {
319    pub direction: ResourceDirection,
320    definition: ResourceDefinition,
321    pub codec: ResourceCodec,
322}
323
324impl ExternalResource {
325    /// Creates a new `ExternalResource` based on the given `direction`, `definition`, and `codec`.
326    pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
327    where
328        D: Into<ResourceDefinition>,
329        C: Into<ResourceCodec>,
330    {
331        Self {
332            direction,
333            definition: definition.into(),
334            codec: codec.into(),
335        }
336    }
337
338    /// Spawns this resource for use as an input to a source.
339    pub fn spawn_as_input(
340        self,
341        input_rx: mpsc::Receiver<TestEvent>,
342        task_coordinator: &TaskCoordinator<Configuring>,
343        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
344    ) {
345        match self.definition {
346            ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
347                self.direction,
348                self.codec,
349                input_rx,
350                task_coordinator,
351                runner_metrics,
352            ),
353        }
354    }
355
356    /// Spawns this resource for use as an output for a sink.
357    pub fn spawn_as_output(
358        self,
359        output_tx: mpsc::Sender<Vec<Event>>,
360        task_coordinator: &TaskCoordinator<Configuring>,
361        input_events: Vec<TestEvent>,
362        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
363        log_namespace: LogNamespace,
364    ) -> vector_lib::Result<()> {
365        match self.definition {
366            ResourceDefinition::Http(http_config) => {
367                http_config.spawn_as_output(HttpResourceOutputContext {
368                    direction: self.direction,
369                    codec: self.codec,
370                    output_tx,
371                    task_coordinator,
372                    input_events,
373                    runner_metrics,
374                    log_namespace,
375                })
376            }
377        }
378    }
379}