vector/config/
transform.rs

1use std::cell::RefCell;
2use std::collections::{HashMap, HashSet};
3use std::path::PathBuf;
4
5use async_trait::async_trait;
6use dyn_clone::DynClone;
7use serde::Serialize;
8use vector_lib::configurable::attributes::CustomAttribute;
9use vector_lib::configurable::{
10    configurable_component,
11    schema::{SchemaGenerator, SchemaObject},
12    Configurable, GenerateError, Metadata, NamedComponent,
13};
14use vector_lib::{
15    config::{GlobalOptions, Input, LogNamespace, TransformOutput},
16    id::Inputs,
17    schema,
18    transform::Transform,
19};
20
21use super::dot_graph::GraphConfig;
22use super::schema::Options as SchemaOptions;
23use super::ComponentKey;
24use super::OutputId;
25use crate::extra_context::ExtraContext;
26
27pub type BoxedTransform = Box<dyn TransformConfig>;
28
29impl Configurable for BoxedTransform {
30    fn referenceable_name() -> Option<&'static str> {
31        Some("vector::transforms::Transforms")
32    }
33
34    fn metadata() -> Metadata {
35        let mut metadata = Metadata::default();
36        metadata.set_description("Configurable transforms in Vector.");
37        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tagging", "internal"));
38        metadata.add_custom_attribute(CustomAttribute::kv("docs::enum_tag_field", "type"));
39        metadata
40    }
41
42    fn generate_schema(
43        generator: &RefCell<SchemaGenerator>,
44    ) -> Result<SchemaObject, GenerateError> {
45        vector_lib::configurable::component::TransformDescription::generate_schemas(generator)
46    }
47}
48
49impl<T: TransformConfig + 'static> From<T> for BoxedTransform {
50    fn from(that: T) -> Self {
51        Box::new(that)
52    }
53}
54
55/// Fully resolved transform component.
56#[configurable_component]
57#[configurable(metadata(docs::component_base_type = "transform"))]
58#[derive(Clone, Debug)]
59pub struct TransformOuter<T>
60where
61    T: Configurable + Serialize + 'static,
62{
63    #[configurable(derived)]
64    #[serde(default, skip_serializing_if = "vector_lib::serde::is_default")]
65    pub graph: GraphConfig,
66
67    #[configurable(derived)]
68    pub inputs: Inputs<T>,
69
70    #[configurable(metadata(docs::hidden))]
71    #[serde(flatten)]
72    pub inner: BoxedTransform,
73}
74
75impl<T> TransformOuter<T>
76where
77    T: Configurable + Serialize,
78{
79    pub(crate) fn new<I, IT>(inputs: I, inner: IT) -> Self
80    where
81        I: IntoIterator<Item = T>,
82        IT: Into<BoxedTransform>,
83    {
84        let inputs = Inputs::from_iter(inputs);
85        let inner = inner.into();
86        TransformOuter {
87            inputs,
88            inner,
89            graph: Default::default(),
90        }
91    }
92
93    pub(super) fn map_inputs<U>(self, f: impl Fn(&T) -> U) -> TransformOuter<U>
94    where
95        U: Configurable + Serialize,
96    {
97        let inputs = self.inputs.iter().map(f).collect::<Vec<_>>();
98        self.with_inputs(inputs)
99    }
100
101    pub(crate) fn with_inputs<I, U>(self, inputs: I) -> TransformOuter<U>
102    where
103        I: IntoIterator<Item = U>,
104        U: Configurable + Serialize,
105    {
106        TransformOuter {
107            inputs: Inputs::from_iter(inputs),
108            inner: self.inner,
109            graph: self.graph,
110        }
111    }
112}
113
114pub struct TransformContext {
115    // This is optional because currently there are a lot of places we use `TransformContext` that
116    // may not have the relevant data available (e.g. tests). In the future it'd be nice to make it
117    // required somehow.
118    pub key: Option<ComponentKey>,
119
120    pub globals: GlobalOptions,
121
122    pub enrichment_tables: vector_lib::enrichment::TableRegistry,
123
124    /// Tracks the schema IDs assigned to schemas exposed by the transform.
125    ///
126    /// Given a transform can expose multiple [`TransformOutput`] channels, the ID is tied to the identifier of
127    /// that `TransformOutput`.
128    pub schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
129
130    /// The schema definition created by merging all inputs of the transform.
131    ///
132    /// This information can be used by transforms that behave differently based on schema
133    /// information, such as the `remap` transform, which passes this information along to the VRL
134    /// compiler such that type coercion becomes less of a need for operators writing VRL programs.
135    pub merged_schema_definition: schema::Definition,
136
137    pub schema: SchemaOptions,
138
139    /// Extra context data provided by the running app and shared across all components. This can be
140    /// used to pass shared settings or other data from outside the components.
141    pub extra_context: ExtraContext,
142}
143
144impl Default for TransformContext {
145    fn default() -> Self {
146        Self {
147            key: Default::default(),
148            globals: Default::default(),
149            enrichment_tables: Default::default(),
150            schema_definitions: HashMap::from([(None, HashMap::new())]),
151            merged_schema_definition: schema::Definition::any(),
152            schema: SchemaOptions::default(),
153            extra_context: Default::default(),
154        }
155    }
156}
157
158impl TransformContext {
159    // clippy allow avoids an issue where vrl is flagged off and `globals` is
160    // the sole field in the struct
161    #[allow(clippy::needless_update)]
162    pub fn new_with_globals(globals: GlobalOptions) -> Self {
163        Self {
164            globals,
165            ..Default::default()
166        }
167    }
168
169    #[cfg(test)]
170    pub fn new_test(
171        schema_definitions: HashMap<Option<String>, HashMap<OutputId, schema::Definition>>,
172    ) -> Self {
173        Self {
174            schema_definitions,
175            ..Default::default()
176        }
177    }
178
179    /// Gets the log namespacing to use. The passed in value is from the transform itself
180    /// and will override any global default if it's set.
181    ///
182    /// This should only be used for transforms that don't originate from a log (eg: `metric_to_log`)
183    /// Most transforms will keep the log_namespace value that already exists on the event.
184    pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
185        namespace
186            .or(self.schema.log_namespace)
187            .unwrap_or(false)
188            .into()
189    }
190}
191
192/// Generalized interface for describing and building transform components.
193#[async_trait]
194#[typetag::serde(tag = "type")]
195pub trait TransformConfig: DynClone + NamedComponent + core::fmt::Debug + Send + Sync {
196    /// Builds the transform with the given context.
197    ///
198    /// If the transform is built successfully, `Ok(...)` is returned containing the transform.
199    ///
200    /// # Errors
201    ///
202    /// If an error occurs while building the transform, an error variant explaining the issue is
203    /// returned.
204    async fn build(&self, globals: &TransformContext) -> crate::Result<Transform>;
205
206    /// Gets the input configuration for this transform.
207    fn input(&self) -> Input;
208
209    /// Gets the list of outputs exposed by this transform.
210    ///
211    /// The provided `merged_definition` can be used by transforms to understand the expected shape
212    /// of events flowing through the transform.
213    fn outputs(
214        &self,
215        enrichment_tables: vector_lib::enrichment::TableRegistry,
216        input_definitions: &[(OutputId, schema::Definition)],
217
218        // This only exists for transforms that create logs from non-logs, to know which namespace
219        // to use, such as `metric_to_log`
220        global_log_namespace: LogNamespace,
221    ) -> Vec<TransformOutput>;
222
223    /// Validates that the configuration of the transform is valid.
224    ///
225    /// This would generally be where logical conditions were checked, such as ensuring a transform
226    /// isn't using a named output that matches a reserved output name, and so on.
227    ///
228    /// # Errors
229    ///
230    /// If validation does not succeed, an error variant containing a list of all validation errors
231    /// is returned.
232    fn validate(&self, _merged_definition: &schema::Definition) -> Result<(), Vec<String>> {
233        Ok(())
234    }
235
236    /// Whether or not concurrency should be enabled for this transform.
237    ///
238    /// When enabled, this transform may be run in parallel in order to attempt to maximize
239    /// throughput for this node in the topology. Transforms should generally not run concurrently
240    /// unless they are compute-heavy, as there is a cost/overhead associated with fanning out
241    /// events to the parallel transform tasks.
242    fn enable_concurrency(&self) -> bool {
243        false
244    }
245
246    /// Whether or not this transform can be nested, given the types of transforms it would be
247    /// nested within.
248    ///
249    /// For some transforms, they can expand themselves into a subtopology of nested transforms.
250    /// However, in order to prevent an infinite recursion of nested transforms, we may want to only
251    /// allow one layer of "expansion". Additionally, there may be known issues with a transform
252    /// that is nested under another specific transform interacting poorly, or incorrectly.
253    ///
254    /// This method allows a transform to report if it can or cannot function correctly if it is
255    /// nested under transforms of a specific type, or if such nesting is fundamentally disallowed.
256    fn nestable(&self, _parents: &HashSet<&'static str>) -> bool {
257        true
258    }
259
260    /// Gets the files to watch to trigger reload
261    fn files_to_watch(&self) -> Vec<&PathBuf> {
262        Vec::new()
263    }
264}
265
266dyn_clone::clone_trait_object!(TransformConfig);
267
268/// Often we want to call outputs just to retrieve the OutputId's without needing
269/// the schema definitions.
270pub fn get_transform_output_ids<T: TransformConfig + ?Sized>(
271    transform: &T,
272    key: ComponentKey,
273    global_log_namespace: LogNamespace,
274) -> impl Iterator<Item = OutputId> + '_ {
275    transform
276        .outputs(
277            vector_lib::enrichment::TableRegistry::default(),
278            &[(key.clone().into(), schema::Definition::any())],
279            global_log_namespace,
280        )
281        .into_iter()
282        .map(move |output| OutputId {
283            component: key.clone(),
284            port: output.port,
285        })
286}