vector_core/config/
log_schema.rs1use std::sync::{LazyLock, OnceLock};
2
3use lookup::{OwnedTargetPath, OwnedValuePath, lookup_v2::OptionalTargetPath};
4use vector_config::configurable_component;
5
6static LOG_SCHEMA: OnceLock<LogSchema> = OnceLock::new();
7static LOG_SCHEMA_DEFAULT: LazyLock<LogSchema> = LazyLock::new(LogSchema::default);
8
9const MESSAGE: &str = "message";
10const TIMESTAMP: &str = "timestamp";
11const HOST: &str = "host";
12const SOURCE_TYPE: &str = "source_type";
13const METADATA: &str = "metadata";
14
15pub fn init_log_schema(log_schema: LogSchema, deny_if_set: bool) {
27 assert!(
28 !(LOG_SCHEMA.set(log_schema).is_err() && deny_if_set),
29 "Couldn't set schema"
30 );
31}
32
33pub fn log_schema() -> &'static LogSchema {
36 LOG_SCHEMA.get().unwrap_or(&LOG_SCHEMA_DEFAULT)
37}
38
39#[configurable_component]
45#[derive(Clone, Debug, Eq, PartialEq)]
46#[serde(default)]
47#[allow(clippy::struct_field_names)]
48pub struct LogSchema {
49 #[serde(default = "LogSchema::default_message_key")]
53 message_key: OptionalTargetPath,
54
55 #[serde(default = "LogSchema::default_timestamp_key")]
57 timestamp_key: OptionalTargetPath,
58
59 #[serde(default = "LogSchema::default_host_key")]
64 host_key: OptionalTargetPath,
65
66 #[serde(default = "LogSchema::default_source_type_key")]
70 source_type_key: OptionalTargetPath,
71
72 #[serde(default = "LogSchema::default_metadata_key")]
77 metadata_key: OptionalTargetPath,
78}
79
80impl Default for LogSchema {
81 fn default() -> Self {
82 LogSchema {
83 message_key: Self::default_message_key(),
84 timestamp_key: Self::default_timestamp_key(),
85 host_key: Self::default_host_key(),
86 source_type_key: Self::default_source_type_key(),
87 metadata_key: Self::default_metadata_key(),
88 }
89 }
90}
91
92impl LogSchema {
93 fn default_message_key() -> OptionalTargetPath {
94 OptionalTargetPath::event(MESSAGE)
95 }
96
97 fn default_timestamp_key() -> OptionalTargetPath {
98 OptionalTargetPath::event(TIMESTAMP)
99 }
100
101 fn default_host_key() -> OptionalTargetPath {
102 OptionalTargetPath::event(HOST)
103 }
104
105 fn default_source_type_key() -> OptionalTargetPath {
106 OptionalTargetPath::event(SOURCE_TYPE)
107 }
108
109 fn default_metadata_key() -> OptionalTargetPath {
110 OptionalTargetPath::event(METADATA)
111 }
112
113 pub fn message_key(&self) -> Option<&OwnedValuePath> {
114 self.message_key.path.as_ref().map(|key| &key.path)
115 }
116
117 pub fn owned_message_path(&self) -> OwnedTargetPath {
127 self.message_key
128 .path
129 .as_ref()
130 .expect("valid message key")
131 .clone()
132 }
133
134 pub fn timestamp_key(&self) -> Option<&OwnedValuePath> {
135 self.timestamp_key.as_ref().map(|key| &key.path)
136 }
137
138 pub fn host_key(&self) -> Option<&OwnedValuePath> {
139 self.host_key.as_ref().map(|key| &key.path)
140 }
141
142 pub fn source_type_key(&self) -> Option<&OwnedValuePath> {
143 self.source_type_key.as_ref().map(|key| &key.path)
144 }
145
146 pub fn metadata_key(&self) -> Option<&OwnedValuePath> {
147 self.metadata_key.as_ref().map(|key| &key.path)
148 }
149
150 pub fn message_key_target_path(&self) -> Option<&OwnedTargetPath> {
151 self.message_key.as_ref()
152 }
153
154 pub fn timestamp_key_target_path(&self) -> Option<&OwnedTargetPath> {
155 self.timestamp_key.as_ref()
156 }
157
158 pub fn host_key_target_path(&self) -> Option<&OwnedTargetPath> {
159 self.host_key.as_ref()
160 }
161
162 pub fn source_type_key_target_path(&self) -> Option<&OwnedTargetPath> {
163 self.source_type_key.as_ref()
164 }
165
166 pub fn metadata_key_target_path(&self) -> Option<&OwnedTargetPath> {
167 self.metadata_key.as_ref()
168 }
169
170 pub fn set_message_key(&mut self, path: Option<OwnedTargetPath>) {
171 self.message_key = OptionalTargetPath { path };
172 }
173
174 pub fn set_timestamp_key(&mut self, path: Option<OwnedTargetPath>) {
175 self.timestamp_key = OptionalTargetPath { path };
176 }
177
178 pub fn set_host_key(&mut self, path: Option<OwnedTargetPath>) {
179 self.host_key = OptionalTargetPath { path };
180 }
181
182 pub fn set_source_type_key(&mut self, path: Option<OwnedTargetPath>) {
183 self.source_type_key = OptionalTargetPath { path };
184 }
185
186 pub fn set_metadata_key(&mut self, path: Option<OwnedTargetPath>) {
187 self.metadata_key = OptionalTargetPath { path };
188 }
189
190 pub fn merge(&mut self, other: &LogSchema) -> Result<(), Vec<String>> {
197 let mut errors = Vec::new();
198
199 if *other != *LOG_SCHEMA_DEFAULT {
200 if self.host_key() != LOG_SCHEMA_DEFAULT.host_key()
202 && self.host_key() != other.host_key()
203 {
204 errors.push("conflicting values for 'log_schema.host_key' found".to_owned());
205 } else {
206 self.set_host_key(other.host_key_target_path().cloned());
207 }
208 if self.message_key() != LOG_SCHEMA_DEFAULT.message_key()
209 && self.message_key() != other.message_key()
210 {
211 errors.push("conflicting values for 'log_schema.message_key' found".to_owned());
212 } else {
213 self.set_message_key(other.message_key_target_path().cloned());
214 }
215 if self.timestamp_key() != LOG_SCHEMA_DEFAULT.timestamp_key()
216 && self.timestamp_key() != other.timestamp_key()
217 {
218 errors.push("conflicting values for 'log_schema.timestamp_key' found".to_owned());
219 } else {
220 self.set_timestamp_key(other.timestamp_key_target_path().cloned());
221 }
222 if self.source_type_key() != LOG_SCHEMA_DEFAULT.source_type_key()
223 && self.source_type_key() != other.source_type_key()
224 {
225 errors.push("conflicting values for 'log_schema.source_type_key' found".to_owned());
226 } else {
227 self.set_source_type_key(other.source_type_key_target_path().cloned());
228 }
229 if self.metadata_key() != LOG_SCHEMA_DEFAULT.metadata_key()
230 && self.metadata_key() != other.metadata_key()
231 {
232 errors.push("conflicting values for 'log_schema.metadata_key' found".to_owned());
233 } else {
234 self.set_metadata_key(other.metadata_key_target_path().cloned());
235 }
236 }
237
238 if errors.is_empty() {
239 Ok(())
240 } else {
241 Err(errors)
242 }
243 }
244}
245
#[cfg(test)]
mod test {
    use super::*;

    /// A schema that lists only some of the keys must still deserialize.
    #[test]
    fn partial_log_schema() {
        let toml = r#"
            message_key = "message"
            timestamp_key = "timestamp"
        "#;
        let parsed: Result<LogSchema, _> = toml::from_str(toml);
        parsed.unwrap();
    }
}
258}