1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
mod event;
mod http;

use std::sync::Arc;

use tokio::sync::{mpsc, Mutex};
use vector_lib::{
    codecs::{
        decoding::{self, DeserializerConfig},
        encoding::{
            self, Framer, FramingConfig, JsonSerializerConfig, SerializerConfig,
            TextSerializerConfig,
        },
        BytesEncoder,
    },
    config::LogNamespace,
};
use vector_lib::{config::DataType, event::Event};

use crate::codecs::{Decoder, DecodingConfig, Encoder, EncodingConfig, EncodingConfigWithFraming};

pub use self::event::{encode_test_event, TestEvent};
pub use self::http::HttpResourceConfig;
use self::http::HttpResourceOutputContext;

use super::{
    sync::{Configuring, TaskCoordinator},
    RunnerMetrics,
};

/// The codec used by the external resource.
///
/// This enum specifically exists to encapsulate the two main ways a component will configure the
/// codec it uses, which ends up being with directionally-specific codec configuration: "encoding"
/// when taking an `Event` and convert it to a raw output, and "decoding" when taking a raw output
/// and converting it to an `Event`.
///
/// Encoding and decoding is generally tied to sinks and sources, respectively.
#[derive(Clone)]
pub enum ResourceCodec {
    /// Component encodes events.
    ///
    /// As opposed to `EncodingWithFramer`, this variant uses the default framing method defined by
    /// the encoding itself.
    ///
    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
    Encoding(EncodingConfig),

    /// Component encodes events, with a specific framer.
    ///
    /// Generally speaking, only sinks encode: going from `Event` to an encoded form.
    EncodingWithFraming(EncodingConfigWithFraming),

    /// Component decodes events.
    ///
    /// Generally speaking, only sources decode: going from an encoded form to `Event`.
    Decoding(DecodingConfig),
}

impl ResourceCodec {
    /// Gets the allowed event data types for the configured codec.
    ///
    /// Not all codecs support all possible event types (i.e. a codec has no means to losslessly
    /// represent the data in a particular event type) so we must check at runtime to ensure that
    /// we're only generating event payloads that can be encoded/decoded for the given component.
    pub fn allowed_event_data_types(self) -> DataType {
        match self {
            Self::Encoding(encoding) => encoding.config().input_type(),
            Self::EncodingWithFraming(encoding) => encoding.config().1.input_type(),
            Self::Decoding(decoding) => decoding.config().output_type(),
        }
    }

    /// Gets an encoder for this codec.
    ///
    /// The encoder is generated as an inverse to the input codec: if a decoding configuration was
    /// given, we generate an encoder that satisfies that decoding configuration, and vice versa.
    pub fn into_encoder(&self) -> Encoder<encoding::Framer> {
        let (framer, serializer) = match self {
            Self::Encoding(config) => (
                Framer::Bytes(BytesEncoder),
                config.build().expect("should not fail to build serializer"),
            ),
            Self::EncodingWithFraming(config) => {
                let (maybe_framing, serializer) = config.config();
                (
                    maybe_framing
                        .clone()
                        .unwrap_or(FramingConfig::Bytes)
                        .build(),
                    serializer
                        .build()
                        .expect("building serializer should never fail"),
                )
            }
            Self::Decoding(config) => (
                decoder_framing_to_encoding_framer(config.framing()),
                deserializer_config_to_serializer(config.config()),
            ),
        };

        Encoder::<encoding::Framer>::new(framer, serializer)
    }

    /// Gets a decoder for this codec.
    ///
    /// The decoder is generated as an inverse to the input codec: if an encoding configuration was
    /// given, we generate a decoder that satisfies that encoding configuration, and vice versa.
    pub fn into_decoder(&self, log_namespace: LogNamespace) -> vector_lib::Result<Decoder> {
        let (framer, deserializer) = match self {
            Self::Decoding(config) => return config.build(),
            Self::Encoding(config) => (
                encoder_framing_to_decoding_framer(config.config().default_stream_framing()),
                serializer_config_to_deserializer(config.config())?,
            ),
            Self::EncodingWithFraming(config) => {
                let (maybe_framing, serializer) = config.config();
                let framing = maybe_framing.clone().unwrap_or(FramingConfig::Bytes);
                (
                    encoder_framing_to_decoding_framer(framing),
                    serializer_config_to_deserializer(serializer)?,
                )
            }
        };

        Ok(Decoder::new(framer, deserializer).with_log_namespace(log_namespace))
    }
}

impl From<EncodingConfig> for ResourceCodec {
    fn from(config: EncodingConfig) -> Self {
        Self::Encoding(config)
    }
}

impl From<EncodingConfigWithFraming> for ResourceCodec {
    fn from(config: EncodingConfigWithFraming) -> Self {
        Self::EncodingWithFraming(config)
    }
}

impl From<DecodingConfig> for ResourceCodec {
    fn from(config: DecodingConfig) -> Self {
        Self::Decoding(config)
    }
}

fn deserializer_config_to_serializer(config: &DeserializerConfig) -> encoding::Serializer {
    let serializer_config = match config {
        // TODO: This isn't necessarily a one-to-one conversion, at least not in the future when
        // "bytes" can be a top-level field and we aren't implicitly decoding everything into the
        // `message` field... but it's close enough for now.
        DeserializerConfig::Bytes => SerializerConfig::Text(TextSerializerConfig::default()),
        DeserializerConfig::Json { .. } => SerializerConfig::Json(JsonSerializerConfig::default()),
        DeserializerConfig::Protobuf(config) => {
            SerializerConfig::Protobuf(vector_lib::codecs::encoding::ProtobufSerializerConfig {
                protobuf: vector_lib::codecs::encoding::ProtobufSerializerOptions {
                    desc_file: config.protobuf.desc_file.clone(),
                    message_type: config.protobuf.message_type.clone(),
                },
            })
        }
        // TODO: We need to create an Avro serializer because, certainly, for any source decoding
        // the data as Avro, we can't possibly send anything else without the source just
        // immediately barfing.
        #[cfg(feature = "codecs-syslog")]
        DeserializerConfig::Syslog { .. } => SerializerConfig::Logfmt,
        DeserializerConfig::Native => SerializerConfig::Native,
        DeserializerConfig::NativeJson { .. } => SerializerConfig::NativeJson,
        DeserializerConfig::Gelf { .. } => SerializerConfig::Gelf,
        DeserializerConfig::Avro { avro } => SerializerConfig::Avro { avro: avro.into() },
        // TODO: Influxdb has no serializer yet
        DeserializerConfig::Influxdb { .. } => todo!(),
        DeserializerConfig::Vrl { .. } => unimplemented!(),
    };

    serializer_config
        .build()
        .expect("building serializer should never fail")
}

fn decoder_framing_to_encoding_framer(framing: &decoding::FramingConfig) -> encoding::Framer {
    let framing_config = match framing {
        decoding::FramingConfig::Bytes => encoding::FramingConfig::Bytes,
        decoding::FramingConfig::CharacterDelimited(config) => {
            encoding::FramingConfig::CharacterDelimited(encoding::CharacterDelimitedEncoderConfig {
                character_delimited: encoding::CharacterDelimitedEncoderOptions {
                    delimiter: config.character_delimited.delimiter,
                },
            })
        }
        decoding::FramingConfig::LengthDelimited(config) => {
            encoding::FramingConfig::LengthDelimited(encoding::LengthDelimitedEncoderConfig {
                length_delimited: config.length_delimited.clone(),
            })
        }
        decoding::FramingConfig::NewlineDelimited(_) => encoding::FramingConfig::NewlineDelimited,
        // TODO: There's no equivalent octet counting framer for encoding... although
        // there's no particular reason that would make it hard to write.
        decoding::FramingConfig::OctetCounting(_) => todo!(),
    };

    framing_config.build()
}

fn serializer_config_to_deserializer(
    config: &SerializerConfig,
) -> vector_lib::Result<decoding::Deserializer> {
    let deserializer_config = match config {
        SerializerConfig::Avro { .. } => todo!(),
        SerializerConfig::Csv { .. } => todo!(),
        SerializerConfig::Gelf => DeserializerConfig::Gelf(Default::default()),
        SerializerConfig::Json(_) => DeserializerConfig::Json(Default::default()),
        SerializerConfig::Logfmt => todo!(),
        SerializerConfig::Native => DeserializerConfig::Native,
        SerializerConfig::NativeJson => DeserializerConfig::NativeJson(Default::default()),
        SerializerConfig::Protobuf(config) => {
            DeserializerConfig::Protobuf(vector_lib::codecs::decoding::ProtobufDeserializerConfig {
                protobuf: vector_lib::codecs::decoding::ProtobufDeserializerOptions {
                    desc_file: config.protobuf.desc_file.clone(),
                    message_type: config.protobuf.message_type.clone(),
                },
            })
        }
        SerializerConfig::RawMessage | SerializerConfig::Text(_) => DeserializerConfig::Bytes,
    };

    deserializer_config.build()
}

fn encoder_framing_to_decoding_framer(framing: encoding::FramingConfig) -> decoding::Framer {
    let framing_config = match framing {
        encoding::FramingConfig::Bytes => decoding::FramingConfig::Bytes,
        encoding::FramingConfig::CharacterDelimited(config) => {
            decoding::FramingConfig::CharacterDelimited(decoding::CharacterDelimitedDecoderConfig {
                character_delimited: decoding::CharacterDelimitedDecoderOptions {
                    delimiter: config.character_delimited.delimiter,
                    max_length: None,
                },
            })
        }
        encoding::FramingConfig::LengthDelimited(config) => {
            decoding::FramingConfig::LengthDelimited(decoding::LengthDelimitedDecoderConfig {
                length_delimited: config.length_delimited.clone(),
            })
        }
        encoding::FramingConfig::NewlineDelimited => {
            decoding::FramingConfig::NewlineDelimited(Default::default())
        }
    };

    framing_config.build()
}

/// Direction that the resource is operating in.
#[derive(Clone)]
pub enum ResourceDirection {
    /// Resource will have the component pull data from it, or pull data from the component.
    ///
    /// For a source, where an external resource functions in "input" mode, this would be the
    /// equivalent of the source calling out to the external resource (HTTP server, Kafka cluster,
    /// etc) and asking for data, or expecting it to be returned in the response.
    ///
    /// For a sink, where an external resource functions in "output" mode, this would be the
    /// equivalent of the sink exposing a network endpoint and having the external resource be
    /// responsible for connecting to the endpoint to grab the data.
    Pull,

    /// Resource will push data to the component, or have data pushed to it from the component.
    ///
    /// For a source, where an external resource functions in "input" mode, this would be the
    /// equivalent of the source waiting for data to be sent to either, whether it's listening on a
    /// network endpoint for traffic, or polling files on disks for updates, and the external
    /// resource would be responsible for initiating that communication, or writing to those files.
    ///
    /// For a sink, where an external resource functions in "output" mode, this would be the
    /// equivalent of the sink pushing its data to a network endpoint, or writing data to files,
    /// where the external resource would be responsible for aggregating that data, or read from
    /// those files.
    Push,
}

/// A resource definition.
///
/// Resource definitions uniquely identify the resource, such as HTTP, or files, and so on. These
/// definitions generally include the bare minimum amount of information to allow the component
/// validation runner to create an instance of them, such as spawning an HTTP server if a source has
/// specified an HTTP resource in the "pull" direction.
#[derive(Clone)]
pub enum ResourceDefinition {
    Http(HttpResourceConfig),
}

impl From<HttpResourceConfig> for ResourceDefinition {
    fn from(config: HttpResourceConfig) -> Self {
        Self::Http(config)
    }
}

/// An external resource associated with a component.
///
/// External resources represent the hypothetical location where, depending on whether the component
/// is a source or sink, data would be generated from or collected at. This includes things like
/// network endpoints (raw sockets, HTTP servers, etc) as well as files on disk, and more. In other
/// words, an external resource is a data dependency associated with the component, whether the
/// component depends on data from the external resource, or the external resource depends on data
/// from the component.
///
/// An external resource includes a direction -- push or pull -- as well as the fundamental
/// definition of the resource, such as HTTP or file. The component type is used to further refine
/// the direction of the resource, such that a "pull" resource used with a source implies the source
/// will pull data from the external resource, whereas a "pull" resource used with a sink implies
/// the external resource must pull the data from the sink.
#[derive(Clone)]
pub struct ExternalResource {
    pub direction: ResourceDirection,
    definition: ResourceDefinition,
    pub codec: ResourceCodec,
}

impl ExternalResource {
    /// Creates a new `ExternalResource` based on the given `direction`, `definition`, and `codec`.
    pub fn new<D, C>(direction: ResourceDirection, definition: D, codec: C) -> Self
    where
        D: Into<ResourceDefinition>,
        C: Into<ResourceCodec>,
    {
        Self {
            direction,
            definition: definition.into(),
            codec: codec.into(),
        }
    }

    /// Spawns this resource for use as an input to a source.
    pub fn spawn_as_input(
        self,
        input_rx: mpsc::Receiver<TestEvent>,
        task_coordinator: &TaskCoordinator<Configuring>,
        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
    ) {
        match self.definition {
            ResourceDefinition::Http(http_config) => http_config.spawn_as_input(
                self.direction,
                self.codec,
                input_rx,
                task_coordinator,
                runner_metrics,
            ),
        }
    }

    /// Spawns this resource for use as an output for a sink.
    pub fn spawn_as_output(
        self,
        output_tx: mpsc::Sender<Vec<Event>>,
        task_coordinator: &TaskCoordinator<Configuring>,
        input_events: Vec<TestEvent>,
        runner_metrics: &Arc<Mutex<RunnerMetrics>>,
        log_namespace: LogNamespace,
    ) -> vector_lib::Result<()> {
        match self.definition {
            ResourceDefinition::Http(http_config) => {
                http_config.spawn_as_output(HttpResourceOutputContext {
                    direction: self.direction,
                    codec: self.codec,
                    output_tx,
                    task_coordinator,
                    input_events,
                    runner_metrics,
                    log_namespace,
                })
            }
        }
    }
}