vector/transforms/window/
config.rs

1use vector_lib::config::{clone_input_definitions, LogNamespace};
2use vector_lib::configurable::configurable_component;
3
4use crate::{
5    conditions::AnyCondition,
6    config::{
7        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
8        TransformOutput,
9    },
10    schema,
11    transforms::Transform,
12};
13
14use super::transform::Window;
15
16/// Configuration for the `window` transform.
17#[configurable_component(transform(
18    "window",
19    "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
20))]
21#[derive(Clone, Debug)]
22#[serde(deny_unknown_fields)]
23pub struct WindowConfig {
24    /// A condition used to pass events through the transform without buffering.
25    ///
26    /// If the condition resolves to `true` for an event, the event is immediately forwarded without
27    /// buffering and without preserving the original order of events. Use with caution if the sink
28    /// cannot handle out of order events.
29    pub forward_when: Option<AnyCondition>,
30
31    /// A condition used to flush the events.
32    ///
33    /// If the condition resolves to `true` for an event, the whole window is immediately flushed,
34    /// including the event itself, and any following events if `num_events_after` is more than zero.
35    pub flush_when: AnyCondition,
36
37    /// The maximum number of events to keep before the event matched by the `flush_when` condition.
38    #[serde(default = "default_events_before")]
39    pub num_events_before: usize,
40
41    /// The maximum number of events to keep after the event matched by the `flush_when` condition.
42    #[serde(default = "default_events_after")]
43    pub num_events_after: usize,
44}
45
46impl GenerateConfig for WindowConfig {
47    fn generate_config() -> toml::Value {
48        toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap()
49    }
50}
51
/// Serde default for `WindowConfig::num_events_before`.
const fn default_events_before() -> usize {
    const EVENTS_BEFORE: usize = 100;
    EVENTS_BEFORE
}
55
/// Serde default for `WindowConfig::num_events_after`.
const fn default_events_after() -> usize {
    const EVENTS_AFTER: usize = 0;
    EVENTS_AFTER
}
59
60#[async_trait::async_trait]
61#[typetag::serde(name = "window")]
62impl TransformConfig for WindowConfig {
63    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
64        Ok(Transform::function(
65            Window::new(
66                self.forward_when
67                    .as_ref()
68                    .map(|condition| condition.build(&context.enrichment_tables))
69                    .transpose()?,
70                self.flush_when.build(&context.enrichment_tables)?,
71                self.num_events_before,
72                self.num_events_after,
73            )
74            .unwrap(),
75        ))
76    }
77
78    fn input(&self) -> Input {
79        Input::new(DataType::Log)
80    }
81
82    fn outputs(
83        &self,
84        _: vector_lib::enrichment::TableRegistry,
85        input_definitions: &[(OutputId, schema::Definition)],
86        _: LogNamespace,
87    ) -> Vec<TransformOutput> {
88        // The event is not modified, so the definition is passed through as-is
89        vec![TransformOutput::new(
90            DataType::Log,
91            clone_input_definitions(input_definitions),
92        )]
93    }
94}