vector_core/config/
log_schema.rs1use std::sync::{LazyLock, OnceLock};
2
3use lookup::lookup_v2::OptionalTargetPath;
4use lookup::{OwnedTargetPath, OwnedValuePath};
5use vector_config::configurable_component;
6
7static LOG_SCHEMA: OnceLock<LogSchema> = OnceLock::new();
8static LOG_SCHEMA_DEFAULT: LazyLock<LogSchema> = LazyLock::new(LogSchema::default);
9
10const MESSAGE: &str = "message";
11const TIMESTAMP: &str = "timestamp";
12const HOST: &str = "host";
13const SOURCE_TYPE: &str = "source_type";
14const METADATA: &str = "metadata";
15
16pub fn init_log_schema(log_schema: LogSchema, deny_if_set: bool) {
28 assert!(
29 !(LOG_SCHEMA.set(log_schema).is_err() && deny_if_set),
30 "Couldn't set schema"
31 );
32}
33
34pub fn log_schema() -> &'static LogSchema {
37 LOG_SCHEMA.get().unwrap_or(&LOG_SCHEMA_DEFAULT)
38}
39
40#[configurable_component]
46#[derive(Clone, Debug, Eq, PartialEq)]
47#[serde(default)]
48#[allow(clippy::struct_field_names)]
49pub struct LogSchema {
50 #[serde(default = "LogSchema::default_message_key")]
54 message_key: OptionalTargetPath,
55
56 #[serde(default = "LogSchema::default_timestamp_key")]
58 timestamp_key: OptionalTargetPath,
59
60 #[serde(default = "LogSchema::default_host_key")]
65 host_key: OptionalTargetPath,
66
67 #[serde(default = "LogSchema::default_source_type_key")]
71 source_type_key: OptionalTargetPath,
72
73 #[serde(default = "LogSchema::default_metadata_key")]
78 metadata_key: OptionalTargetPath,
79}
80
81impl Default for LogSchema {
82 fn default() -> Self {
83 LogSchema {
84 message_key: Self::default_message_key(),
85 timestamp_key: Self::default_timestamp_key(),
86 host_key: Self::default_host_key(),
87 source_type_key: Self::default_source_type_key(),
88 metadata_key: Self::default_metadata_key(),
89 }
90 }
91}
92
93impl LogSchema {
94 fn default_message_key() -> OptionalTargetPath {
95 OptionalTargetPath::event(MESSAGE)
96 }
97
98 fn default_timestamp_key() -> OptionalTargetPath {
99 OptionalTargetPath::event(TIMESTAMP)
100 }
101
102 fn default_host_key() -> OptionalTargetPath {
103 OptionalTargetPath::event(HOST)
104 }
105
106 fn default_source_type_key() -> OptionalTargetPath {
107 OptionalTargetPath::event(SOURCE_TYPE)
108 }
109
110 fn default_metadata_key() -> OptionalTargetPath {
111 OptionalTargetPath::event(METADATA)
112 }
113
114 pub fn message_key(&self) -> Option<&OwnedValuePath> {
115 self.message_key.path.as_ref().map(|key| &key.path)
116 }
117
118 pub fn owned_message_path(&self) -> OwnedTargetPath {
128 self.message_key
129 .path
130 .as_ref()
131 .expect("valid message key")
132 .clone()
133 }
134
135 pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
136 self.timestamp_key.as_ref().map(|key| &key.path)
137 }
138
139 pub fn host_key(&self) -> Option<&OwnedValuePath> {
140 self.host_key.as_ref().map(|key| &key.path)
141 }
142
143 pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
144 self.source_type_key.as_ref().map(|key| &key.path)
145 }
146
147 pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
148 self.metadata_key.as_ref().map(|key| &key.path)
149 }
150
151 pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
152 self.message_key.as_ref()
153 }
154
155 pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
156 self.timestamp_key.as_ref()
157 }
158
159 pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
160 self.host_key.as_ref()
161 }
162
163 pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
164 self.source_type_key.as_ref()
165 }
166
167 pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
168 self.metadata_key.as_ref()
169 }
170
171 pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
172 self.message_key = OptionalTargetPath { path };
173 }
174
175 pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
176 self.timestamp_key = OptionalTargetPath { path };
177 }
178
179 pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
180 self.host_key = OptionalTargetPath { path };
181 }
182
183 pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
184 self.source_type_key = OptionalTargetPath { path };
185 }
186
187 pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
188 self.metadata_key = OptionalTargetPath { path };
189 }
190
191 pub fn merge(&mut self, other: &LogSchema) -> Result<(), Vec<String>> {
198 let mut errors = Vec::new();
199
200 if *other != *LOG_SCHEMA_DEFAULT {
201 if self.host_key() != LOG_SCHEMA_DEFAULT.host_key()
203 && self.host_key() != other.host_key()
204 {
205 errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
206 } else {
207 self.set_host_key(other.host_key_target_path().cloned());
208 }
209 if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
210 && self.message_key() != other.message_key()
211 {
212 errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
213 } else {
214 self.set_message_key(other.message_key_target_path().cloned());
215 }
216 if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
217 && self.timestamp_key() != other.timestamp_key()
218 {
219 errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
220 } else {
221 self.set_timestamp_key(other.timestamp_key_target_path().cloned());
222 }
223 if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
224 && self.source_type_key() != other.source_type_key()
225 {
226 errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
227 } else {
228 self.set_source_type_key(other.source_type_key_target_path().cloned());
229 }
230 if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
231 && self.metadata_key() != other.metadata_key()
232 {
233 errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
234 } else {
235 self.set_metadata_key(other.metadata_key_target_path().cloned());
236 }
237 }
238
239 if errors.is_empty() {
240 Ok(())
241 } else {
242 Err(errors)
243 }
244 }
245}
246
247#[cfg(test)]
248mod test {
249 use super::*;
250
251 #[test]
252 fn partial_log_schema() {
253 let toml = r#"
254 message_key = "message"
255 timestamp_key = "timestamp"
256 "#;
257 toml::from_str::<LogSchema>(toml).unwrap();
258 }
259}