vector/config/
transform.rs

1use std::{
2    cell::RefCell,
3    collections::{HashMap, HashSet},
4    path::PathBuf,
5};
6
7use async_trait::async_trait;
8use dyn_clone::DynClone;
9use serde::Serialize;
10use vector_lib::{
11    config::{GlobalOptions, Input, LogNamespace, TransformOutput},
12    configurable::{
13        Configurable, GenerateError, Metadata, NamedComponent,
14        attributes::CustomAttribute,
15        configurable_component,
16        schema::{SchemaGenerator, SchemaObject},
17    },
18    id::Inputs,
19    schema,
20    transform::Transform,
21};
22
23use super::{ComponentKey, OutputId, dot_graph::GraphConfig, schema::Options as SchemaOptions};
24use crate::extra_context::ExtraContext;
25
/// Heap-allocated, type-erased transform configuration.
///
/// Any concrete [`TransformConfig`] implementor can be turned into this alias through the
/// `From<T>` impl provided in this module.
pub type BoxedTransform = Box<dyn TransformConfig>;
27
28impl Configurable for BoxedTransform {
29    fn referenceable_name() -> Option<&'static str> {
30        Some("vector::transforms::Transforms")
31    }
32
33    fn metadata() -> Metadata {
34        let mut metadata = Metadata::default();
35        metadata.set_description("Configurable transforms in Vector.");
36        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
37        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
38        metadata
39    }
40
41    fn generate_schema(
42        generator: &RefCell<SchemaGenerator>,
43    ) -> Result<SchemaObject, GenerateError> {
44        vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
45    }
46}
47
48impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
49    fn from(that: T) -> Self {
50        Box::new(that)
51    }
52}
53
/// Fully resolved transform component.
// NOTE(review): `configurable_component` presumably turns `///` doc comments into schema
// descriptions, so the explanatory notes in this struct use plain `//` comments to leave the
// generated output untouched — confirm before converting any of them to doc comments.
//
// `T` is the element type of `inputs`.
#[configurable_component]
#[configurable(metadata(docs::component_base_type = "transform"))]
#[derive(Clone, Debug)]
pub struct TransformOuter<T>
where
    T: Configurable + Serialize + 'static,
{
    // Graph settings for this component (the type comes from the `dot_graph` module; presumably
    // used when rendering the topology as a graph — TODO confirm). Takes its `Default` value when
    // missing from the input and is skipped on serialization when `is_default` says so.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
    pub graph: GraphConfig,

    // The inputs of this transform.
    #[configurable(derived)]
    pub inputs: Inputs<T>,

    // The transform-specific configuration. `#[serde(flatten)]` puts its fields at the same level
    // as `graph` and `inputs`; the field itself is marked `docs::hidden`.
    #[configurable(metadata(docs::hidden))]
    #[serde(flatten)]
    pub inner: BoxedTransform,
}
73
74impl<T> TransformOuter<T>
75where
76    T: Configurable + Serialize,
77{
78    pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
79    where
80        I: IntoIterator<Item = T>,
81        IT: Into<BoxedTransform>,
82    {
83        let inputs = Inputs::from_iter(inputs);
84        let inner = inner.into();
85        TransformOuter {
86            inputs,
87            inner,
88            graph: Default::default(),
89        }
90    }
91
92    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
93    where
94        U: Configurable + Serialize,
95    {
96        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
97        self.with_inputs(inputs)
98    }
99
100    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
101    where
102        I: IntoIterator<Item = U>,
103        U: Configurable + Serialize,
104    {
105        TransformOuter {
106            inputs: Inputs::from_iter(inputs),
107            inner: self.inner,
108            graph: self.graph,
109        }
110    }
111}
112
/// Context made available to a transform while it is being built.
///
/// A reference to this struct is what [`TransformConfig::build`] receives.
pub struct TransformContext {
    /// Component key, when known (presumably the key of the transform this context was created
    /// for — TODO confirm).
    // This is optional because currently there are a lot of places we use `TransformContext` that
    // may not have the relevant data available (e.g. tests). In the future it'd be nice to make it
    // required somehow.
    pub key: Option<ComponentKey>,

    /// Global options; this is the only field set by [`TransformContext::new_with_globals`].
    pub globals: GlobalOptions,

    /// Registry of enrichment tables. The same registry type is also passed to
    /// [`TransformConfig::outputs`].
    pub enrichment_tables: vector_lib::enrichment::TableRegistry,

    /// Tracks the schema IDs assigned to schemas exposed by the transform.
    ///
    /// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
    /// that `TransformOutput`.
    ///
    /// The `Default` implementation seeds this map with a single empty entry under the `None`
    /// key (presumably the unnamed, default output — TODO confirm).
    pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,

    /// The schema definition created by merging all inputs of the transform.
    ///
    /// This information can be used by transforms that behave differently based on schema
    /// information, such as the `remap` transform, which passes this information along to the VRL
    /// compiler such that type coercion becomes less of a need for operators writing VRL programs.
    pub merged_schema_definition: schema::Definition,

    /// Schema options. [`TransformContext::log_namespace`] falls back to the `log_namespace`
    /// value stored here when the transform does not specify one.
    pub schema: SchemaOptions,

    /// Extra context data provided by the running app and shared across all components. This can be
    /// used to pass shared settings or other data from outside the components.
    pub extra_context: ExtraContext,
}
142
143impl Default for TransformContext {
144    fn default() -> Self {
145        Self {
146            key: Default::default(),
147            globals: Default::default(),
148            enrichment_tables: Default::default(),
149            schema_definitions: HashMap::from([(None, HashMap::new())]),
150            merged_schema_definition: schema::Definition::any(),
151            schema: SchemaOptions::default(),
152            extra_context: Default::default(),
153        }
154    }
155}
156
157impl TransformContext {
158    // clippy allow avoids an issue where vrl is flagged off and `globals` is
159    // the sole field in the struct
160    #[allow(clippy::needless_update)]
161    pub fn new_with_globals(globals: GlobalOptions) -> Self {
162        Self {
163            globals,
164            ..Default::default()
165        }
166    }
167
168    #[cfg(test)]
169    pub fn new_test(
170        schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
171    ) -> Self {
172        Self {
173            schema_definitions,
174            ..Default::default()
175        }
176    }
177
178    /// Gets the log namespacing to use. The passed in value is from the transform itself
179    /// and will override any global default if it's set.
180    ///
181    /// This should only be used for transforms that don't originate from a log (eg: `metric_to_log`)
182    /// Most transforms will keep the log_namespace value that already exists on the event.
183    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
184        namespace
185            .or(self.schema.log_namespace)
186            .unwrap_or(false)
187            .into()
188    }
189}
190
191/// Generalized interface for describing and building transform components.
192#[async_trait]
193#[typetag::serde(tag = "type")]
194pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
195    /// Builds the transform with the given context.
196    ///
197    /// If the transform is built successfully, `Ok(...)` is returned containing the transform.
198    ///
199    /// # Errors
200    ///
201    /// If an error occurs while building the transform, an error variant explaining the issue is
202    /// returned.
203    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;
204
205    /// Gets the input configuration for this transform.
206    fn input(&self) -> Input;
207
208    /// Gets the list of outputs exposed by this transform.
209    ///
210    /// The provided `merged_definition` can be used by transforms to understand the expected shape
211    /// of events flowing through the transform.
212    fn outputs(
213        &self,
214        enrichment_tables: vector_lib::enrichment::TableRegistry,
215        input_definitions: &[(OutputId, schema::Definition)],
216
217        // This only exists for transforms that create logs from non-logs, to know which namespace
218        // to use, such as `metric_to_log`
219        global_log_namespace: LogNamespace,
220    ) -> Vec<TransformOutput>;
221
222    /// Validates that the configuration of the transform is valid.
223    ///
224    /// This would generally be where logical conditions were checked, such as ensuring a transform
225    /// isn't using a named output that matches a reserved output name, and so on.
226    ///
227    /// # Errors
228    ///
229    /// If validation does not succeed, an error variant containing a list of all validation errors
230    /// is returned.
231    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
232        Ok(())
233    }
234
235    /// Whether or not concurrency should be enabled for this transform.
236    ///
237    /// When enabled, this transform may be run in parallel in order to attempt to maximize
238    /// throughput for this node in the topology. Transforms should generally not run concurrently
239    /// unless they are compute-heavy, as there is a cost/overhead associated with fanning out
240    /// events to the parallel transform tasks.
241    fn enable_concurrency(&self) -> bool {
242        false
243    }
244
245    /// Whether or not this transform can be nested, given the types of transforms it would be
246    /// nested within.
247    ///
248    /// For some transforms, they can expand themselves into a subtopology of nested transforms.
249    /// However, in order to prevent an infinite recursion of nested transforms, we may want to only
250    /// allow one layer of "expansion". Additionally, there may be known issues with a transform
251    /// that is nested under another specific transform interacting poorly, or incorrectly.
252    ///
253    /// This method allows a transform to report if it can or cannot function correctly if it is
254    /// nested under transforms of a specific type, or if such nesting is fundamentally disallowed.
255    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
256        true
257    }
258
259    /// Gets the files to watch to trigger reload
260    fn files_to_watch(&self) -> Vec<&PathBuf> {
261        Vec::new()
262    }
263}
264
265dyn_clone::clone_trait_object!(TransformConfig);
266
267/// Often we want to call outputs just to retrieve the OutputId's without needing
268/// the schema definitions.
269pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
270    transform: &T,
271    key: ComponentKey,
272    global_log_namespace: LogNamespace,
273) -> impl Iterator<Item = OutputId> + '_ {
274    transform
275        .outputs(
276            vector_lib::enrichment::TableRegistry::default(),
277            &[(key.clone().into(), schema::Definition::any())],
278            global_log_namespace,
279        )
280        .into_iter()
281        .map(move |output| OutputId {
282            component: key.clone(),
283            port: output.port,
284        })
285}