vector/enrichment_tables/
mmdb.rs1use std::{fs, net::IpAddr, path::PathBuf, sync::Arc, time::SystemTime};
6
7use maxminddb::Reader;
8use vector_lib::{
9 configurable::configurable_component,
10 enrichment::{Case, Condition, IndexHandle, Table},
11};
12use vrl::value::{ObjectMap, Value};
13
14use crate::config::{EnrichmentTableConfig, GenerateConfig};
15
16#[derive(Clone, Debug, Eq, PartialEq)]
18#[configurable_component(enrichment_table("mmdb"))]
19pub struct MmdbConfig {
20 pub path: PathBuf,
24}
25
26impl GenerateConfig for MmdbConfig {
27 fn generate_config() -> toml::Value {
28 toml::Value::try_from(Self {
29 path: "/path/to/GeoLite2-City.mmdb".into(),
30 })
31 .unwrap()
32 }
33}
34
35impl EnrichmentTableConfig for MmdbConfig {
36 async fn build(
37 &self,
38 _: &crate::config::GlobalOptions,
39 ) -> crate::Result<Box<dyn Table + Send + Sync>> {
40 Ok(Box::new(Mmdb::new(self.clone())?))
41 }
42}
43
44#[derive(Clone)]
45pub struct Mmdb {
47 config: MmdbConfig,
48 dbreader: Arc<maxminddb::Reader<Vec<u8>>>,
49 last_modified: SystemTime,
50}
51
52impl Mmdb {
53 pub fn new(config: MmdbConfig) -> crate::Result<Self> {
55 let dbreader = Arc::new(Reader::open_readfile(&config.path)?);
56
57 let ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
59 let result = dbreader.lookup::<ObjectMap>(ip).map(|_| ());
60
61 match result {
62 Ok(_) => Ok(Mmdb {
63 last_modified: fs::metadata(&config.path)?.modified()?,
64 dbreader,
65 config,
66 }),
67 Err(error) => Err(error.into()),
68 }
69 }
70
71 fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option<ObjectMap> {
72 let data = self.dbreader.lookup::<ObjectMap>(ip).ok()??;
73
74 if let Some(fields) = select {
75 let mut filtered = Value::from(ObjectMap::new());
76 let mut data_value = Value::from(data);
77 for field in fields {
78 filtered.insert(
79 field.as_str(),
80 data_value
81 .remove(field.as_str(), false)
82 .unwrap_or(Value::Null),
83 );
84 }
85 filtered.into_object()
86 } else {
87 Some(data)
88 }
89 }
90}
91
92impl Table for Mmdb {
93 fn find_table_row<'a>(
99 &self,
100 case: Case,
101 condition: &'a [Condition<'a>],
102 select: Option<&[String]>,
103 wildcard: Option<&Value>,
104 index: Option<IndexHandle>,
105 ) -> Result<ObjectMap, String> {
106 let mut rows = self.find_table_rows(case, condition, select, wildcard, index)?;
107
108 match rows.pop() {
109 Some(row) if rows.is_empty() => Ok(row),
110 Some(_) => Err("More than 1 row found".to_string()),
111 None => Err("IP not found".to_string()),
112 }
113 }
114
115 fn find_table_rows<'a>(
119 &self,
120 _: Case,
121 condition: &'a [Condition<'a>],
122 select: Option<&[String]>,
123 _wildcard: Option<&Value>,
124 _: Option<IndexHandle>,
125 ) -> Result<Vec<ObjectMap>, String> {
126 match condition.first() {
127 Some(_) if condition.len() > 1 => Err("Only one condition is allowed".to_string()),
128 Some(Condition::Equals { value, .. }) => {
129 let ip = value
130 .to_string_lossy()
131 .parse::<IpAddr>()
132 .map_err(|_| "Invalid IP address".to_string())?;
133 Ok(self
134 .lookup(ip, select)
135 .map(|values| vec![values])
136 .unwrap_or_default())
137 }
138 Some(_) => Err("Only equality condition is allowed".to_string()),
139 None => Err("IP condition must be specified".to_string()),
140 }
141 }
142
143 fn add_index(&mut self, _: Case, fields: &[&str]) -> Result<IndexHandle, String> {
149 match fields.len() {
150 0 => Err("IP field is required".to_string()),
151 1 => Ok(IndexHandle(0)),
152 _ => Err("Only one field is allowed".to_string()),
153 }
154 }
155
156 fn index_fields(&self) -> Vec<(Case, Vec<String>)> {
158 Vec::new()
159 }
160
161 fn needs_reload(&self) -> bool {
163 matches!(fs::metadata(&self.config.path)
164 .and_then(|metadata| metadata.modified()),
165 Ok(modified) if modified > self.last_modified)
166 }
167}
168
169impl std::fmt::Debug for Mmdb {
170 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
171 write!(f, "Maxmind database {})", self.config.path.display())
172 }
173}
174
175#[cfg(test)]
176mod tests {
177 use vrl::value::Value;
178
179 use super::*;
180
181 #[test]
182 fn city_partial_lookup() {
183 let values = find_select(
184 "2.125.160.216",
185 "tests/data/GeoIP2-City-Test.mmdb",
186 Some(&[
187 "location.latitude".to_string(),
188 "location.longitude".to_string(),
189 ]),
190 )
191 .unwrap();
192
193 let mut expected = ObjectMap::new();
194 expected.insert(
195 "location".into(),
196 ObjectMap::from([
197 ("latitude".into(), Value::from(51.75)),
198 ("longitude".into(), Value::from(-1.25)),
199 ])
200 .into(),
201 );
202
203 assert_eq!(values, expected);
204 }
205
206 #[test]
207 fn isp_lookup() {
208 let values = find("208.192.1.2", "tests/data/GeoIP2-ISP-Test.mmdb").unwrap();
209
210 let mut expected = ObjectMap::new();
211 expected.insert("autonomous_system_number".into(), 701i64.into());
212 expected.insert(
213 "autonomous_system_organization".into(),
214 "MCI Communications Services, Inc. d/b/a Verizon Business".into(),
215 );
216 expected.insert("isp".into(), "Verizon Business".into());
217 expected.insert("organization".into(), "Verizon Business".into());
218
219 assert_eq!(values, expected);
220 }
221
222 #[test]
223 fn connection_type_lookup_success() {
224 let values = find(
225 "201.243.200.1",
226 "tests/data/GeoIP2-Connection-Type-Test.mmdb",
227 )
228 .unwrap();
229
230 let mut expected = ObjectMap::new();
231 expected.insert("connection_type".into(), "Corporate".into());
232
233 assert_eq!(values, expected);
234 }
235
236 #[test]
237 fn lookup_missing() {
238 let values = find("10.1.12.1", "tests/data/custom-type.mmdb");
239
240 assert!(values.is_none());
241 }
242
243 #[test]
244 fn custom_mmdb_type() {
245 let values = find("208.192.1.2", "tests/data/custom-type.mmdb").unwrap();
246
247 let mut expected = ObjectMap::new();
248 expected.insert("hostname".into(), "custom".into());
249 expected.insert(
250 "nested".into(),
251 ObjectMap::from([
252 ("hostname".into(), "custom".into()),
253 ("original_cidr".into(), "208.192.1.2/24".into()),
254 ])
255 .into(),
256 );
257
258 assert_eq!(values, expected);
259 }
260
261 fn find(ip: &str, database: &str) -> Option<ObjectMap> {
262 find_select(ip, database, None)
263 }
264
265 fn find_select(ip: &str, database: &str, select: Option<&[String]>) -> Option<ObjectMap> {
266 Mmdb::new(MmdbConfig {
267 path: database.into(),
268 })
269 .unwrap()
270 .find_table_rows(
271 Case::Insensitive,
272 &[Condition::Equals {
273 field: "ip",
274 value: ip.into(),
275 }],
276 select,
277 None,
278 None,
279 )
280 .unwrap()
281 .pop()
282 }
283}