vector_core/config/log_schema.rs

1use std::sync::{LazyLock, OnceLock};
2
3use lookup::{OwnedTargetPath, OwnedValuePath, lookup_v2::OptionalTargetPath};
4use vector_config::configurable_component;
5
6static LOG_SCHEMA: OnceLock<LogSchema> = OnceLock::new();
7static LOG_SCHEMA_DEFAULT: LazyLock<LogSchema> = LazyLock::new(LogSchema::default);
8
// Field names used by the built-in `LogSchema` defaults.

/// Default event field for the raw message.
const MESSAGE: &str = "message";
/// Default event field for the event timestamp.
const TIMESTAMP: &str = "timestamp";
/// Default event field for the originating host.
const HOST: &str = "host";
/// Default event field for the source type.
const SOURCE_TYPE: &str = "source_type";
/// Default event field for event metadata.
const METADATA: &str = "metadata";
14
15/// Loads Log Schema from configurations and sets global schema. Once this is
16/// done, configurations can be correctly loaded using configured log schema
17/// defaults.
18///
19/// # Errors
20///
21/// This function will fail if the `builder` fails.
22///
23/// # Panics
24///
25/// If deny is set, will panic if schema has already been set.
26pub fn init_log_schema(log_schema: LogSchema, deny_if_set: bool) {
27    assert!(
28        !(LOG_SCHEMA.set(log_schema).is_err() && deny_if_set),
29        "Couldn't set schema"
30    );
31}
32
33/// Components should use global `LogSchema` returned by this function.  The
34/// returned value can differ from `LogSchema::default()` which is unchanging.
35pub fn log_schema() -> &'static LogSchema {
36    LOG_SCHEMA.get().unwrap_or(&LOG_SCHEMA_DEFAULT)
37}
38
39/// Log schema.
40///
41/// A log schema is used by Vector not only to uniformly process the fields of an event, but also to
42/// specify which fields should hold specific data that is also set by Vector once an event is
43/// flowing through a topology.
44#[configurable_component]
45#[derive(Clone, Debug, Eq, PartialEq)]
46#[serde(default)]
47#[allow(clippy::struct_field_names)]
48pub struct LogSchema {
49    /// The name of the event field to treat as the event message.
50    ///
51    /// This would be the field that holds the raw message, such as a raw log line.
52    #[serde(default = "LogSchema::default_message_key")]
53    message_key: OptionalTargetPath,
54
55    /// The name of the event field to treat as the event timestamp.
56    #[serde(default = "LogSchema::default_timestamp_key")]
57    timestamp_key: OptionalTargetPath,
58
59    /// The name of the event field to treat as the host which sent the message.
60    ///
61    /// This field will generally represent a real host, or container, that generated the message,
62    /// but is somewhat source-dependent.
63    #[serde(default = "LogSchema::default_host_key")]
64    host_key: OptionalTargetPath,
65
66    /// The name of the event field to set the source identifier in.
67    ///
68    /// This field will be set by the Vector source that the event was created in.
69    #[serde(default = "LogSchema::default_source_type_key")]
70    source_type_key: OptionalTargetPath,
71
72    /// The name of the event field to set the event metadata in.
73    ///
74    /// Generally, this field will be set by Vector to hold event-specific metadata, such as
75    /// annotations by the `remap` transform when an error or abort is encountered.
76    #[serde(default = "LogSchema::default_metadata_key")]
77    metadata_key: OptionalTargetPath,
78}
79
80impl Default for LogSchema {
81    fn default() -> Self {
82        LogSchema {
83            message_key: Self::default_message_key(),
84            timestamp_key: Self::default_timestamp_key(),
85            host_key: Self::default_host_key(),
86            source_type_key: Self::default_source_type_key(),
87            metadata_key: Self::default_metadata_key(),
88        }
89    }
90}
91
92impl LogSchema {
93    fn default_message_key() -> OptionalTargetPath {
94        OptionalTargetPath::event(MESSAGE)
95    }
96
97    fn default_timestamp_key() -> OptionalTargetPath {
98        OptionalTargetPath::event(TIMESTAMP)
99    }
100
101    fn default_host_key() -> OptionalTargetPath {
102        OptionalTargetPath::event(HOST)
103    }
104
105    fn default_source_type_key() -> OptionalTargetPath {
106        OptionalTargetPath::event(SOURCE_TYPE)
107    }
108
109    fn default_metadata_key() -> OptionalTargetPath {
110        OptionalTargetPath::event(METADATA)
111    }
112
113    pub fn message_key(&self) -> Option<&OwnedValuePath> {
114        self.message_key.path.as_ref().map(|key| &key.path)
115    }
116
117    /// Returns an `OwnedTargetPath` of the message key.
118    /// This parses the path and will panic if it is invalid.
119    ///
120    /// This should only be used where the result will either be cached,
121    /// or performance isn't critical, since this requires memory allocation.
122    ///
123    /// # Panics
124    ///
125    /// Panics if the path in `self.message_key` is invalid.
126    pub fn owned_message_path(&self) -> OwnedTargetPath {
127        self.message_key
128            .path
129            .as_ref()
130            .expect("valid message key")
131            .clone()
132    }
133
134    pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
135        self.timestamp_key.as_ref().map(|key| &key.path)
136    }
137
138    pub fn host_key(&self) -> Option<&OwnedValuePath> {
139        self.host_key.as_ref().map(|key| &key.path)
140    }
141
142    pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
143        self.source_type_key.as_ref().map(|key| &key.path)
144    }
145
146    pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
147        self.metadata_key.as_ref().map(|key| &key.path)
148    }
149
150    pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
151        self.message_key.as_ref()
152    }
153
154    pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
155        self.timestamp_key.as_ref()
156    }
157
158    pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
159        self.host_key.as_ref()
160    }
161
162    pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
163        self.source_type_key.as_ref()
164    }
165
166    pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
167        self.metadata_key.as_ref()
168    }
169
170    pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
171        self.message_key = OptionalTargetPath { path };
172    }
173
174    pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
175        self.timestamp_key = OptionalTargetPath { path };
176    }
177
178    pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
179        self.host_key = OptionalTargetPath { path };
180    }
181
182    pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
183        self.source_type_key = OptionalTargetPath { path };
184    }
185
186    pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
187        self.metadata_key = OptionalTargetPath { path };
188    }
189
190    /// Merge two `LogSchema` instances together.
191    ///
192    /// # Errors
193    ///
194    /// This function will fail when the `LogSchema` to be merged contains
195    /// conflicting keys.
196    pub fn merge(&mut self, other: &LogSchema) -> Result<(), Vec<String>> {
197        let mut errors = Vec::new();
198
199        if *other != *LOG_SCHEMA_DEFAULT {
200            // If the set value is the default, override it. If it's already overridden, error.
201            if self.host_key() != LOG_SCHEMA_DEFAULT.host_key()
202                && self.host_key() != other.host_key()
203            {
204                errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
205            } else {
206                self.set_host_key(other.host_key_target_path().cloned());
207            }
208            if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
209                && self.message_key() != other.message_key()
210            {
211                errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
212            } else {
213                self.set_message_key(other.message_key_target_path().cloned());
214            }
215            if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
216                && self.timestamp_key() != other.timestamp_key()
217            {
218                errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
219            } else {
220                self.set_timestamp_key(other.timestamp_key_target_path().cloned());
221            }
222            if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
223                && self.source_type_key() != other.source_type_key()
224            {
225                errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
226            } else {
227                self.set_source_type_key(other.source_type_key_target_path().cloned());
228            }
229            if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
230                && self.metadata_key() != other.metadata_key()
231            {
232                errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
233            } else {
234                self.set_metadata_key(other.metadata_key_target_path().cloned());
235            }
236        }
237
238        if errors.is_empty() {
239            Ok(())
240        } else {
241            Err(errors)
242        }
243    }
244}
245
#[cfg(test)]
mod test {
    use super::*;

    #[test]
    fn partial_log_schema() {
        let toml = r#"
            message_key = "message"
            timestamp_key = "timestamp"
        "#;
        let schema = toml::from_str::<LogSchema>(toml).unwrap();

        // The configured keys match the built-in defaults and every omitted key must fall back
        // to its default, so the parsed schema should equal the default schema.
        assert_eq!(schema, LogSchema::default());
    }
}