vector/transforms/window/
config.rs

1use vector_lib::{config::clone_input_definitions, configurable::configurable_component};
2
3use super::transform::Window;
4use crate::{
5    conditions::AnyCondition,
6    config::{
7        DataType, GenerateConfig, Input, OutputId, TransformConfig, TransformContext,
8        TransformOutput,
9    },
10    schema,
11    transforms::Transform,
12};
13
14/// Configuration for the `window` transform.
15#[configurable_component(transform(
16    "window",
17    "Apply a buffered sliding window over the stream of events and flush it based on supplied criteria"
18))]
19#[derive(Clone, Debug)]
20#[serde(deny_unknown_fields)]
21pub struct WindowConfig {
22    /// A condition used to pass events through the transform without buffering.
23    ///
24    /// If the condition resolves to `true` for an event, the event is immediately forwarded without
25    /// buffering and without preserving the original order of events. Use with caution if the sink
26    /// cannot handle out of order events.
27    pub forward_when: Option<AnyCondition>,
28
29    /// A condition used to flush the events.
30    ///
31    /// If the condition resolves to `true` for an event, the whole window is immediately flushed,
32    /// including the event itself, and any following events if `num_events_after` is more than zero.
33    pub flush_when: AnyCondition,
34
35    /// The maximum number of events to keep before the event matched by the `flush_when` condition.
36    #[serde(default = "default_events_before")]
37    pub num_events_before: usize,
38
39    /// The maximum number of events to keep after the event matched by the `flush_when` condition.
40    #[serde(default = "default_events_after")]
41    pub num_events_after: usize,
42}
43
44impl GenerateConfig for WindowConfig {
45    fn generate_config() -> toml::Value {
46        toml::from_str(r#"flush_when = ".message == \"value\"""#).unwrap()
47    }
48}
49
/// Serde default for `num_events_before`.
const fn default_events_before() -> usize {
    const DEFAULT_EVENTS_BEFORE: usize = 100;

    DEFAULT_EVENTS_BEFORE
}
53
/// Serde default for `num_events_after`.
const fn default_events_after() -> usize {
    const DEFAULT_EVENTS_AFTER: usize = 0;

    DEFAULT_EVENTS_AFTER
}
57
58#[async_trait::async_trait]
59#[typetag::serde(name = "window")]
60impl TransformConfig for WindowConfig {
61    async fn build(&self, context: &TransformContext) -> crate::Result<Transform> {
62        Ok(Transform::function(
63            Window::new(
64                self.forward_when
65                    .as_ref()
66                    .map(|condition| {
67                        condition.build(&context.enrichment_tables, &context.metrics_storage)
68                    })
69                    .transpose()?,
70                self.flush_when
71                    .build(&context.enrichment_tables, &context.metrics_storage)?,
72                self.num_events_before,
73                self.num_events_after,
74            )
75            .unwrap(),
76        ))
77    }
78
79    fn input(&self) -> Input {
80        Input::new(DataType::Log)
81    }
82
83    fn outputs(
84        &self,
85        _: &TransformContext,
86        input_definitions: &[(OutputId, schema::Definition)],
87    ) -> Vec<TransformOutput> {
88        // The event is not modified, so the definition is passed through as-is
89        vec![TransformOutput::new(
90            DataType::Log,
91            clone_input_definitions(input_definitions),
92        )]
93    }
94}