vector/config/transform.rs

1use std::{
2    cell::RefCell,
3    collections::{HashMap, HashSet},
4    path::PathBuf,
5};
6
7use async_trait::async_trait;
8use dyn_clone::DynClone;
9use serde::Serialize;
10use vector_lib::{
11    config::{GlobalOptions, Input, LogNamespace, TransformOutput},
12    configurable::{
13        Configurable, GenerateError, Metadata, NamedComponent,
14        attributes::CustomAttribute,
15        configurable_component,
16        schema::{SchemaGenerator, SchemaObject},
17    },
18    id::Inputs,
19    schema,
20    transform::Transform,
21};
22use vector_vrl_metrics::MetricsStorage;
23
24use super::{ComponentKey, OutputId, dot_graph::GraphConfig, schema::Options as SchemaOptions};
25use crate::extra_context::ExtraContext;
26
27pub type BoxedTransform = Box<dyn TransformConfig>;
28
29impl Configurable for BoxedTransform {
30    fn referenceable_name() -> Option<&'static str> {
31        Some("vector::transforms::Transforms")
32    }
33
34    fn metadata() -> Metadata {
35        let mut metadata = Metadata::default();
36        metadata.set_description("Configurable transforms in Vector.");
37        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
38        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
39        metadata
40    }
41
42    fn generate_schema(
43        generator: &RefCell<SchemaGenerator>,
44    ) -> Result<SchemaObject, GenerateError> {
45        vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
46    }
47}
48
49impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
50    fn from(that: T) -> Self {
51        Box::new(that)
52    }
53}
54
/// Fully resolved transform component.
// Serialized form: `graph` (omitted when it equals its default), `inputs`, and the fields of
// `inner` flattened into the same level.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "transform"))]
#[derive(Clone, Debug)]
pub struct TransformOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // Graph-rendering settings (`dot_graph::GraphConfig`). Deserializes to the default when absent;
    // presumably skipped on serialization when equal to the default (`is_default`) — confirm.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // The components feeding this transform. Generic over `T` so the same wrapper can carry
    // different input identifier types; `map_inputs`/`with_inputs` convert between them.
    #[configurable(derived)]
    pub inputs: Inputs<T>,

    // The concrete transform configuration. `#[serde(flatten)]` merges its fields (including the
    // `type` tag from `TransformConfig`'s `typetag` attribute) into this struct's serialized form;
    // `docs::hidden` presumably keeps it out of generated docs.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub inner: BoxedTransform,
}
74
75impl<T> TransformOuter<T>
76where
77    T: Configurable + Serialize,
78{
79    pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
80    where
81        I: IntoIterator<Item = T>,
82        IT: Into<BoxedTransform>,
83    {
84        let inputs = Inputs::from_iter(inputs);
85        let inner = inner.into();
86        TransformOuter {
87            inputs,
88            inner,
89            graph: Default::default(),
90        }
91    }
92
93    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
94    where
95        U: Configurable + Serialize,
96    {
97        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
98        self.with_inputs(inputs)
99    }
100
101    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
102    where
103        I: IntoIterator<Item = U>,
104        U: Configurable + Serialize,
105    {
106        TransformOuter {
107            inputs: Inputs::from_iter(inputs),
108            inner: self.inner,
109            graph: self.graph,
110        }
111    }
112}
113
114pub struct TransformContext {
115    // This is optional because currently there are a lot of places we use `TransformContext` that
116    // may not have the relevant data available (e.g. tests). In the future it'd be nice to make it
117    // required somehow.
118    pub key: Option<ComponentKey>,
119
120    pub globals: GlobalOptions,
121
122    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
123
124    pub metrics_storage: MetricsStorage,
125
126    /// Tracks the schema IDs assigned to schemas exposed by the transform.
127    ///
128    /// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
129    /// that `TransformOutput`.
130    pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
131
132    /// The schema definition created by merging all inputs of the transform.
133    ///
134    /// This information can be used by transforms that behave differently based on schema
135    /// information, such as the `remap` transform, which passes this information along to the VRL
136    /// compiler such that type coercion becomes less of a need for operators writing VRL programs.
137    pub merged_schema_definition: schema::Definition,
138
139    pub schema: SchemaOptions,
140
141    /// Extra context data provided by the running app and shared across all components. This can be
142    /// used to pass shared settings or other data from outside the components.
143    pub extra_context: ExtraContext,
144}
145
146impl Default for TransformContext {
147    fn default() -> Self {
148        Self {
149            key: Default::default(),
150            globals: Default::default(),
151            enrichment_tables: Default::default(),
152            metrics_storage: Default::default(),
153            schema_definitions: HashMap::from([(None, HashMap::new())]),
154            merged_schema_definition: schema::Definition::any(),
155            schema: SchemaOptions::default(),
156            extra_context: Default::default(),
157        }
158    }
159}
160
161impl TransformContext {
162    // clippy allow avoids an issue where vrl is flagged off and `globals` is
163    // the sole field in the struct
164    #[allow(clippy::needless_update)]
165    pub fn new_with_globals(globals: GlobalOptions) -> Self {
166        Self {
167            globals,
168            ..Default::default()
169        }
170    }
171
172    #[cfg(test)]
173    pub fn new_test(
174        schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
175    ) -> Self {
176        Self {
177            schema_definitions,
178            ..Default::default()
179        }
180    }
181
182    /// Gets the log namespacing to use. The passed in value is from the transform itself
183    /// and will override any global default if it's set.
184    ///
185    /// This should only be used for transforms that don't originate from a log (eg: `metric_to_log`)
186    /// Most transforms will keep the log_namespace value that already exists on the event.
187    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
188        namespace
189            .or(self.schema.log_namespace)
190            .unwrap_or(false)
191            .into()
192    }
193}
194
195/// Generalized interface for describing and building transform components.
196#[async_trait]
197#[typetag::serde(tag = "type")]
198pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
199    /// Builds the transform with the given context.
200    ///
201    /// If the transform is built successfully, `Ok(...)` is returned containing the transform.
202    ///
203    /// # Errors
204    ///
205    /// If an error occurs while building the transform, an error variant explaining the issue is
206    /// returned.
207    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;
208
209    /// Gets the input configuration for this transform.
210    fn input(&self) -> Input;
211
212    /// Gets the list of outputs exposed by this transform.
213    ///
214    /// The provided `merged_definition` can be used by transforms to understand the expected shape
215    /// of events flowing through the transform.
216    fn outputs(
217        &self,
218        globals: &TransformContext,
219        input_definitions: &[(OutputId, schema::Definition)],
220    ) -> Vec<TransformOutput>;
221
222    /// Validates that the configuration of the transform is valid.
223    ///
224    /// This would generally be where logical conditions were checked, such as ensuring a transform
225    /// isn't using a named output that matches a reserved output name, and so on.
226    ///
227    /// # Errors
228    ///
229    /// If validation does not succeed, an error variant containing a list of all validation errors
230    /// is returned.
231    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
232        Ok(())
233    }
234
235    /// Whether or not concurrency should be enabled for this transform.
236    ///
237    /// When enabled, this transform may be run in parallel in order to attempt to maximize
238    /// throughput for this node in the topology. Transforms should generally not run concurrently
239    /// unless they are compute-heavy, as there is a cost/overhead associated with fanning out
240    /// events to the parallel transform tasks.
241    fn enable_concurrency(&self) -> bool {
242        false
243    }
244
245    /// Whether or not this transform can be nested, given the types of transforms it would be
246    /// nested within.
247    ///
248    /// For some transforms, they can expand themselves into a subtopology of nested transforms.
249    /// However, in order to prevent an infinite recursion of nested transforms, we may want to only
250    /// allow one layer of "expansion". Additionally, there may be known issues with a transform
251    /// that is nested under another specific transform interacting poorly, or incorrectly.
252    ///
253    /// This method allows a transform to report if it can or cannot function correctly if it is
254    /// nested under transforms of a specific type, or if such nesting is fundamentally disallowed.
255    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
256        true
257    }
258
259    /// Gets the files to watch to trigger reload
260    fn files_to_watch(&self) -> Vec<&PathBuf> {
261        Vec::new()
262    }
263}
264
265dyn_clone::clone_trait_object!(TransformConfig);
266
267/// Often we want to call outputs just to retrieve the OutputId's without needing
268/// the schema definitions.
269pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
270    transform: &T,
271    key: ComponentKey,
272    global_log_namespace: LogNamespace,
273) -> impl Iterator<Item = OutputId> + '_ {
274    transform
275        .outputs(
276            &TransformContext {
277                schema: SchemaOptions {
278                    log_namespace: Some(global_log_namespace.into()),
279                    ..Default::default()
280                },
281                ..Default::default()
282            },
283            &[(key.clone().into(), schema::Definition::any())],
284        )
285        .into_iter()
286        .map(move |output| OutputId {
287            component: key.clone(),
288            port: output.port,
289        })
290}