vector_core/config/
log_schema.rs

1use std::sync::{LazyLock, OnceLock};
2
3use lookup::lookup_v2::OptionalTargetPath;
4use lookup::{OwnedTargetPath, OwnedValuePath};
5use vector_config::configurable_component;
6
7static LOG_SCHEMA: OnceLock<LogSchema> = OnceLock::new();
8static LOG_SCHEMA_DEFAULT: LazyLock<LogSchema> = LazyLock::new(LogSchema::default);
9
10const MESSAGE: &str = "message";
11const TIMESTAMP: &str = "timestamp";
12const HOST: &str = "host";
13const SOURCE_TYPE: &str = "source_type";
14const METADATA: &str = "metadata";
15
16/// Loads Log Schema from configurations and sets global schema. Once this is
17/// done, configurations can be correctly loaded using configured log schema
18/// defaults.
19///
20/// # Errors
21///
22/// This function will fail if the `builder` fails.
23///
24/// # Panics
25///
26/// If deny is set, will panic if schema has already been set.
27pub fn init_log_schema(log_schema: LogSchema, deny_if_set: bool) {
28    assert!(
29        !(LOG_SCHEMA.set(log_schema).is_err() && deny_if_set),
30        "Couldn't set schema"
31    );
32}
33
34/// Components should use global `LogSchema` returned by this function.  The
35/// returned value can differ from `LogSchema::default()` which is unchanging.
36pub fn log_schema() -> &'static LogSchema {
37    LOG_SCHEMA.get().unwrap_or(&LOG_SCHEMA_DEFAULT)
38}
39
/// Log schema.
///
/// A log schema is used by Vector not only to uniformly process the fields of an event, but also to
/// specify which fields should hold specific data that is also set by Vector once an event is
/// flowing through a topology.
// NOTE(review): `configurable_component` presumably derives the generated configuration
// schema/docs from the `///` comments in this struct — confirm before rewording them.
#[configurable_component]
#[derive(Clone, Debug, Eq, PartialEq)]
// Container-level default: keys missing from the input are filled in. The per-field
// `default = "..."` attributes point at the same `default_*_key` constructors that
// `impl Default for LogSchema` uses, so both paths yield the same values.
#[serde(default)]
// Every field name ends in `_key`, which `clippy::struct_field_names` would otherwise flag.
#[allow(clippy::struct_field_names)]
pub struct LogSchema {
    // Each key is an `OptionalTargetPath`, so it can be unset; the `*_key` getters then return
    // `None`.

    /// The name of the event field to treat as the event message.
    ///
    /// This would be the field that holds the raw message, such as a raw log line.
    #[serde(default = "LogSchema::default_message_key")]
    message_key: OptionalTargetPath,

    /// The name of the event field to treat as the event timestamp.
    #[serde(default = "LogSchema::default_timestamp_key")]
    timestamp_key: OptionalTargetPath,

    /// The name of the event field to treat as the host which sent the message.
    ///
    /// This field will generally represent a real host, or container, that generated the message,
    /// but is somewhat source-dependent.
    #[serde(default = "LogSchema::default_host_key")]
    host_key: OptionalTargetPath,

    /// The name of the event field to set the source identifier in.
    ///
    /// This field will be set by the Vector source that the event was created in.
    #[serde(default = "LogSchema::default_source_type_key")]
    source_type_key: OptionalTargetPath,

    /// The name of the event field to set the event metadata in.
    ///
    /// Generally, this field will be set by Vector to hold event-specific metadata, such as
    /// annotations by the `remap` transform when an error or abort is encountered.
    #[serde(default = "LogSchema::default_metadata_key")]
    metadata_key: OptionalTargetPath,
}
80
81impl Default for LogSchema {
82    fn default() -> Self {
83        LogSchema {
84            message_key: Self::default_message_key(),
85            timestamp_key: Self::default_timestamp_key(),
86            host_key: Self::default_host_key(),
87            source_type_key: Self::default_source_type_key(),
88            metadata_key: Self::default_metadata_key(),
89        }
90    }
91}
92
93impl LogSchema {
94    fn default_message_key() -> OptionalTargetPath {
95        OptionalTargetPath::event(MESSAGE)
96    }
97
98    fn default_timestamp_key() -> OptionalTargetPath {
99        OptionalTargetPath::event(TIMESTAMP)
100    }
101
102    fn default_host_key() -> OptionalTargetPath {
103        OptionalTargetPath::event(HOST)
104    }
105
106    fn default_source_type_key() -> OptionalTargetPath {
107        OptionalTargetPath::event(SOURCE_TYPE)
108    }
109
110    fn default_metadata_key() -> OptionalTargetPath {
111        OptionalTargetPath::event(METADATA)
112    }
113
114    pub fn message_key(&self) -> Option<&OwnedValuePath> {
115        self.message_key.path.as_ref().map(|key| &key.path)
116    }
117
118    /// Returns an `OwnedTargetPath` of the message key.
119    /// This parses the path and will panic if it is invalid.
120    ///
121    /// This should only be used where the result will either be cached,
122    /// or performance isn't critical, since this requires memory allocation.
123    ///
124    /// # Panics
125    ///
126    /// Panics if the path in `self.message_key` is invalid.
127    pub fn owned_message_path(&self) -> OwnedTargetPath {
128        self.message_key
129            .path
130            .as_ref()
131            .expect("valid message key")
132            .clone()
133    }
134
135    pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
136        self.timestamp_key.as_ref().map(|key| &key.path)
137    }
138
139    pub fn host_key(&self) -> Option<&OwnedValuePath> {
140        self.host_key.as_ref().map(|key| &key.path)
141    }
142
143    pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
144        self.source_type_key.as_ref().map(|key| &key.path)
145    }
146
147    pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
148        self.metadata_key.as_ref().map(|key| &key.path)
149    }
150
151    pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
152        self.message_key.as_ref()
153    }
154
155    pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
156        self.timestamp_key.as_ref()
157    }
158
159    pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
160        self.host_key.as_ref()
161    }
162
163    pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
164        self.source_type_key.as_ref()
165    }
166
167    pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
168        self.metadata_key.as_ref()
169    }
170
171    pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
172        self.message_key = OptionalTargetPath { path };
173    }
174
175    pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
176        self.timestamp_key = OptionalTargetPath { path };
177    }
178
179    pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
180        self.host_key = OptionalTargetPath { path };
181    }
182
183    pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
184        self.source_type_key = OptionalTargetPath { path };
185    }
186
187    pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
188        self.metadata_key = OptionalTargetPath { path };
189    }
190
191    /// Merge two `LogSchema` instances together.
192    ///
193    /// # Errors
194    ///
195    /// This function will fail when the `LogSchema` to be merged contains
196    /// conflicting keys.
197    pub fn merge(&mut self, other: &LogSchema) -> Result<(), Vec<String>> {
198        let mut errors = Vec::new();
199
200        if *other != *LOG_SCHEMA_DEFAULT {
201            // If the set value is the default, override it. If it's already overridden, error.
202            if self.host_key() != LOG_SCHEMA_DEFAULT.host_key()
203                && self.host_key() != other.host_key()
204            {
205                errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
206            } else {
207                self.set_host_key(other.host_key_target_path().cloned());
208            }
209            if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
210                && self.message_key() != other.message_key()
211            {
212                errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
213            } else {
214                self.set_message_key(other.message_key_target_path().cloned());
215            }
216            if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
217                && self.timestamp_key() != other.timestamp_key()
218            {
219                errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
220            } else {
221                self.set_timestamp_key(other.timestamp_key_target_path().cloned());
222            }
223            if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
224                && self.source_type_key() != other.source_type_key()
225            {
226                errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
227            } else {
228                self.set_source_type_key(other.source_type_key_target_path().cloned());
229            }
230            if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
231                && self.metadata_key() != other.metadata_key()
232            {
233                errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
234            } else {
235                self.set_metadata_key(other.metadata_key_target_path().cloned());
236            }
237        }
238
239        if errors.is_empty() {
240            Ok(())
241        } else {
242            Err(errors)
243        }
244    }
245}
246
#[cfg(test)]
mod test {
    use super::*;

    /// A schema that sets only some keys must parse, and the omitted keys must fall back to
    /// their defaults. The keys that are set here use the default names too, so the result
    /// equals `LogSchema::default()`.
    #[test]
    fn partial_log_schema() {
        let toml = r#"
            message_key = "message"
            timestamp_key = "timestamp"
        "#;
        let schema = toml::from_str::<LogSchema>(toml).unwrap();
        assert_eq!(schema, LogSchema::default());
    }

    /// Two schemas that override the same key with different values must conflict on exactly
    /// that key.
    #[test]
    fn merge_reports_conflicting_key() {
        let mut left = LogSchema::default();
        left.set_host_key(OptionalTargetPath::event("left_host").path);
        let mut right = LogSchema::default();
        right.set_host_key(OptionalTargetPath::event("right_host").path);

        let errors = left.merge(&right).unwrap_err();
        assert_eq!(
            errors,
            vec!["conflicting values for 'log_schema.host_key' found".to_owned()]
        );
    }
}