vector/transforms/window/
config.rs

1use vector_lib::{
2    config::{LogNamespace, clone_input_definitions},
3    configurable::configurable_component,
4};
5
6use super::transform::Window;
7use crate::{
8    conditions::AnyCondition,
9    config::{
10        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
11        TransformOutput,
12    },
13    schema,
14    transforms::Transform,
15};
16
17/// Configuration for the `window` transform.
18#[configurable_component(transform(
19    "window",
20    "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
21))]
22#[derive(Clone, Debug)]
23#[serde(deny_unknown_fields)]
24pub struct WindowConfig {
25    /// A condition used to pass events through the transform without buffering.
26    ///
27    /// If the condition resolves to `true` for an event, the event is immediately forwarded without
28    /// buffering and without preserving the original order of events. Use with caution if the sink
29    /// cannot handle out of order events.
30    pub forward_when: Option<AnyCondition>,
31
32    /// A condition used to flush the events.
33    ///
34    /// If the condition resolves to `true` for an event, the whole window is immediately flushed,
35    /// including the event itself, and any following events if `num_events_after` is more than zero.
36    pub flush_when: AnyCondition,
37
38    /// The maximum number of events to keep before the event matched by the `flush_when` condition.
39    #[serde(default = "default_events_before")]
40    pub num_events_before: usize,
41
42    /// The maximum number of events to keep after the event matched by the `flush_when` condition.
43    #[serde(default = "default_events_after")]
44    pub num_events_after: usize,
45}
46
47impl GenerateConfig for WindowConfig {
48    fn generate_config() -> toml::Value {
49        toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap()
50    }
51}
52
53const fn default_events_before() -> usize {
54    100
55}
56
57const fn default_events_after() -> usize {
58    0
59}
60
61#[async_trait::async_trait]
62#[typetag::serde(name = "window")]
63impl TransformConfig for WindowConfig {
64    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
65        Ok(Transform::function(
66            Window::new(
67                self.forward_when
68                    .as_ref()
69                    .map(|condition| condition.build(&context.enrichment_tables))
70                    .transpose()?,
71                self.flush_when.build(&context.enrichment_tables)?,
72                self.num_events_before,
73                self.num_events_after,
74            )
75            .unwrap(),
76        ))
77    }
78
79    fn input(&self) -> Input {
80        Input::new(DataType::Log)
81    }
82
83    fn outputs(
84        &self,
85        _: vector_lib::enrichment::TableRegistry,
86        input_definitions: &[(OutputId, schema::Definition)],
87        _: LogNamespace,
88    ) -> Vec<TransformOutput> {
89        // The event is not modified, so the definition is passed through as-is
90        vec![TransformOutput::new(
91            DataType::Log,
92            clone_input_definitions(input_definitions),
93        )]
94    }
95}