vector/components/validation/resources/
mod.rs

1mod event;
2mod http;
3
4use std::sync::Arc;
5
6use tokio::sync::{Mutex, mpsc};
7use vector_lib::{
8    codecs::{
9        BytesEncoder, Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming,
10        decoding::{self, DeserializerConfig},
11        encoding::{
12            self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
13            TextSerializerConfig,
14        },
15    },
16    config::{DataType, LogNamespace},
17    event::Event,
18};
19
20use self::http::HttpResourceOutputContext;
21pub use self::{
22    event::{TestEvent, encode_test_event},
23    http::HttpResourceConfig,
24};
25use super::{
26    RunnerMetrics,
27    sync::{Configuring, TaskCoordinator},
28};
29
/// The codec used by the external resource.
///
/// This enum specifically exists to encapsulate the two main ways a component will configure the
/// codec it uses, which ends up being with directionally-specific codec configuration: "encoding"
/// when taking an `Event` and converting it to a raw output, and "decoding" when taking a raw
/// output and converting it to an `Event`.
///
/// Encoding and decoding are generally tied to sinks and sources, respectively.
#[derive(Clone)]
pub enum ResourceCodec {
    /// Component encodes events.
    ///
    /// As opposed to `EncodingWithFraming`, this variant carries no framing configuration of its
    /// own: `into_decoder` falls back to the serializer's default stream framing, while
    /// `into_encoder` uses a plain bytes framer.
    ///
    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
    Encoding(EncodingConfig),

    /// Component encodes events, with a specific framer.
    ///
    /// When the framing part of the configuration is absent, both `into_encoder` and
    /// `into_decoder` fall back to bytes framing.
    ///
    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
    EncodingWithFraming(EncodingConfigWithFraming),

    /// Component decodes events.
    ///
    /// Generally speaking, only sources decode: going from an encoded form to `Event`.
    Decoding(DecodingConfig),
}
58
59impl ResourceCodec {
60    /// Gets the allowed event data types for the configured codec.
61    ///
62    /// Not all codecs support all possible event types (i.e. a codec has no means to losslessly
63    /// represent the data in a particular event type) so we must check at runtime to ensure that
64    /// we're only generating event payloads that can be encoded/decoded for the given component.
65    pub fn allowed_event_data_types(self) -> DataType {
66        match self {
67            Self::Encoding(encoding) => encoding.config().input_type(),
68            Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
69            Self::Decoding(decoding) => decoding.config().output_type(),
70        }
71    }
72
73    /// Gets an encoder for this codec.
74    ///
75    /// The encoder is generated as an inverse to the input codec: if a decoding configuration was
76    /// given, we generate an encoder that satisfies that decoding configuration, and vice versa.
77    pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
78        let (framer, serializer) = match self {
79            Self::Encoding(config) => (
80                Framer::Bytes(BytesEncoder),
81                config.build().expect("should not fail to build serializer"),
82            ),
83            Self::EncodingWithFraming(config) => {
84                let (maybe_framing, serializer) = config.config();
85                (
86                    maybe_framing
87                        .clone()
88                        .unwrap_or(FramingConfig::Bytes)
89                        .build(),
90                    serializer
91                        .build()
92                        .expect("building serializer should never fail"),
93                )
94            }
95            Self::Decoding(config) => (
96                decoder_framing_to_encoding_framer(config.framing()),
97                deserializer_config_to_serializer(config.config()),
98            ),
99        };
100
101        Encoder::<encoding::Framer>::new(framer, serializer)
102    }
103
104    /// Gets a decoder for this codec.
105    ///
106    /// The decoder is generated as an inverse to the input codec: if an encoding configuration was
107    /// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
108    pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
109        let (framer, deserializer) = match self {
110            Self::Decoding(config) => return config.build(),
111            Self::Encoding(config) => (
112                encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
113                serializer_config_to_deserializer(config.config())?,
114            ),
115            Self::EncodingWithFraming(config) => {
116                let (maybe_framing, serializer) = config.config();
117                let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
118                (
119                    encoder_framing_to_decoding_framer(framing),
120                    serializer_config_to_deserializer(serializer)?,
121                )
122            }
123        };
124
125        Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
126    }
127}
128
129impl From<EncodingConfig> for ResourceCodec {
130    fn from(config: EncodingConfig) -> Self {
131        Self::Encoding(config)
132    }
133}
134
135impl From<EncodingConfigWithFraming> for ResourceCodec {
136    fn from(config: EncodingConfigWithFraming) -> Self {
137        Self::EncodingWithFraming(config)
138    }
139}
140
141impl From<DecodingConfig> for ResourceCodec {
142    fn from(config: DecodingConfig) -> Self {
143        Self::Decoding(config)
144    }
145}
146
147fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
148    let serializer_config = match config {
149        // TODO: This isn't necessarily a one-to-one conversion, at least not in the future when
150        // "bytes" can be a top-level field and we aren't implicitly decoding everything into the
151        // `message` field... but it's close enough for now.
152        DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
153        DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
154        DeserializerConfig::Protobuf(config) => {
155            SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
156                protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
157                    desc_file: config.protobuf.desc_file.clone(),
158                    message_type: config.protobuf.message_type.clone(),
159                    use_json_names: config.protobuf.use_json_names,
160                },
161            })
162        }
163        // TODO: We need to create an Avro serializer because, certainly, for any source decoding
164        // the data as Avro, we can't possibly send anything else without the source just
165        // immediately barfing.
166        #[cfg(feature = "codecs-syslog")]
167        DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
168        DeserializerConfig::Native => SerializerConfig::Native,
169        DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
170        DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf(Default::default()),
171        DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
172        // TODO: Influxdb has no serializer yet
173        DeserializerConfig::Influxdb { .. } => todo!(),
174        DeserializerConfig::Vrl { .. } => unimplemented!(),
175        #[cfg(feature = "codecs-opentelemetry")]
176        DeserializerConfig::Otlp { .. } => SerializerConfig::Otlp,
177    };
178
179    serializer_config
180        .build()
181        .expect("building serializer should never fail")
182}
183
184fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
185    let framing_config = match framing {
186        decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
187        decoding::FramingConfig::CharacterDelimited(config) => {
188            encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
189                character_delimited: encoding::CharacterDelimitedEncoderOptions {
190                    delimiter: config.character_delimited.delimiter,
191                },
192            })
193        }
194        decoding::FramingConfig::LengthDelimited(config) => {
195            encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
196                length_delimited: config.length_delimited.clone(),
197            })
198        }
199        decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
200        // TODO: There's no equivalent octet counting framer for encoding... although
201        // there's no particular reason that would make it hard to write.
202        decoding::FramingConfig::OctetCounting(_) => todo!(),
203        // TODO: chunked gelf is not supported yet in encoding
204        decoding::FramingConfig::ChunkedGelf(_) => todo!(),
205        decoding::FramingConfig::VarintLengthDelimited(config) => {
206            encoding::FramingConfig::VarintLengthDelimited(
207                encoding::VarintLengthDelimitedEncoderConfig {
208                    max_frame_length: config.max_frame_length,
209                },
210            )
211        }
212    };
213
214    framing_config.build()
215}
216
217fn serializer_config_to_deserializer(
218    config: &SerializerConfig,
219) -> vector_lib::Result<decoding::Deserializer> {
220    let deserializer_config = match config {
221        SerializerConfig::Avro { .. } => todo!(),
222        SerializerConfig::Cef { .. } => todo!(),
223        SerializerConfig::Csv { .. } => todo!(),
224        SerializerConfig::Gelf { .. } => DeserializerConfig::Gelf(Default::default()),
225        SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
226        SerializerConfig::Logfmt => todo!(),
227        SerializerConfig::Native => DeserializerConfig::Native,
228        SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
229        SerializerConfig::Protobuf(config) => {
230            DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
231                protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
232                    desc_file: config.protobuf.desc_file.clone(),
233                    message_type: config.protobuf.message_type.clone(),
234                    use_json_names: config.protobuf.use_json_names,
235                },
236            })
237        }
238        SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
239        #[cfg(feature = "codecs-opentelemetry")]
240        SerializerConfig::Otlp => todo!(),
241        #[cfg(feature = "codecs-syslog")]
242        SerializerConfig::Syslog(_) => todo!(),
243    };
244
245    deserializer_config.build()
246}
247
248fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
249    let framing_config = match framing {
250        encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
251        encoding::FramingConfig::CharacterDelimited(config) => {
252            decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
253                character_delimited: decoding::CharacterDelimitedDecoderOptions {
254                    delimiter: config.character_delimited.delimiter,
255                    max_length: None,
256                },
257            })
258        }
259        encoding::FramingConfig::LengthDelimited(config) => {
260            decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
261                length_delimited: config.length_delimited.clone(),
262            })
263        }
264        encoding::FramingConfig::NewlineDelimited => {
265            decoding::FramingConfig::NewlineDelimited(Default::default())
266        }
267        vector_lib::codecs::encoding::FramingConfig::VarintLengthDelimited(config) => {
268            decoding::FramingConfig::VarintLengthDelimited(
269                decoding::VarintLengthDelimitedDecoderConfig {
270                    max_frame_length: config.max_frame_length,
271                },
272            )
273        }
274    };
275
276    framing_config.build()
277}
278
/// Direction that the resource is operating in.
#[derive(Clone)]
pub enum ResourceDirection {
    /// Resource will have the component pull data from it, or pull data from the component.
    ///
    /// For a source, where an external resource functions in "input" mode, this would be the
    /// equivalent of the source calling out to the external resource (HTTP server, Kafka cluster,
    /// etc) and asking for data, or expecting it to be returned in the response.
    ///
    /// For a sink, where an external resource functions in "output" mode, this would be the
    /// equivalent of the sink exposing a network endpoint and having the external resource be
    /// responsible for connecting to the endpoint to grab the data.
    Pull,

    /// Resource will push data to the component, or have data pushed to it from the component.
    ///
    /// For a source, where an external resource functions in "input" mode, this would be the
    /// equivalent of the source waiting for data to be sent to it, whether it's listening on a
    /// network endpoint for traffic, or polling files on disk for updates, and the external
    /// resource would be responsible for initiating that communication, or writing to those files.
    ///
    /// For a sink, where an external resource functions in "output" mode, this would be the
    /// equivalent of the sink pushing its data to a network endpoint, or writing data to files,
    /// where the external resource would be responsible for aggregating that data, or reading
    /// from those files.
    Push,
}
306
/// A resource definition.
///
/// Resource definitions uniquely identify the resource, such as HTTP, or files, and so on. These
/// definitions generally include the bare minimum amount of information to allow the component
/// validation runner to create an instance of them, such as spawning an HTTP server if a source has
/// specified an HTTP resource in the "pull" direction.
#[derive(Clone)]
pub enum ResourceDefinition {
    /// An HTTP resource, described by an `HttpResourceConfig`.
    Http(HttpResourceConfig),
}
317
318impl From<HttpResourceConfig> for ResourceDefinition {
319    fn from(config: HttpResourceConfig) -> Self {
320        Self::Http(config)
321    }
322}
323
/// An external resource associated with a component.
///
/// External resources represent the hypothetical location where, depending on whether the component
/// is a source or sink, data would be generated from or collected at. This includes things like
/// network endpoints (raw sockets, HTTP servers, etc) as well as files on disk, and more. In other
/// words, an external resource is a data dependency associated with the component, whether the
/// component depends on data from the external resource, or the external resource depends on data
/// from the component.
///
/// An external resource includes a direction -- push or pull -- as well as the fundamental
/// definition of the resource, such as HTTP or file. The component type is used to further refine
/// the direction of the resource, such that a "pull" resource used with a source implies the source
/// will pull data from the external resource, whereas a "pull" resource used with a sink implies
/// the external resource must pull the data from the sink.
#[derive(Clone)]
pub struct ExternalResource {
    /// Whether the resource operates in push or pull mode.
    pub direction: ResourceDirection,
    /// The kind of resource and its configuration; private, set through `ExternalResource::new`.
    definition: ResourceDefinition,
    /// The codec used by the external resource.
    pub codec: ResourceCodec,
}
344
345impl ExternalResource {
346    /// Creates a new `ExternalResource` based on the given `direction`, `definition`, and `codec`.
347    pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
348    where
349        D: Into<ResourceDefinition>,
350        C: Into<ResourceCodec>,
351    {
352        Self {
353            direction,
354            definition: definition.into(),
355            codec: codec.into(),
356        }
357    }
358
359    /// Spawns this resource for use as an input to a source.
360    pub fn spawn_as_input(
361        self,
362        input_rx: mpsc::Receiver<TestEvent>,
363        task_coordinator: &TaskCoordinator<Configuring>,
364        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
365    ) {
366        match self.definition {
367            ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
368                self.direction,
369                self.codec,
370                input_rx,
371                task_coordinator,
372                runner_metrics,
373            ),
374        }
375    }
376
377    /// Spawns this resource for use as an output for a sink.
378    pub fn spawn_as_output(
379        self,
380        output_tx: mpsc::Sender<Vec<Event>>,
381        task_coordinator: &TaskCoordinator<Configuring>,
382        input_events: Vec<TestEvent>,
383        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
384        log_namespace: LogNamespace,
385    ) -> vector_lib::Result<()> {
386        match self.definition {
387            ResourceDefinition::Http(http_config) => {
388                http_config.spawn_as_output(HttpResourceOutputContext {
389                    direction: self.direction,
390                    codec: self.codec,
391                    output_tx,
392                    task_coordinator,
393                    input_events,
394                    runner_metrics,
395                    log_namespace,
396                })
397            }
398        }
399    }
400}