1use crate::compiler::prelude::*;
2use std::collections::BTreeMap;
3
4fn parse_aws_vpc_flow_log(value: Value, format: Option<Value>) -> Resolved {
5 let bytes = value.try_bytes()?;
6 let input = String::from_utf8_lossy(&bytes);
7 if let Some(expr) = format {
8 let bytes = expr.try_bytes()?;
9 parse_log(&input, Some(&String::from_utf8_lossy(&bytes)))
10 } else {
11 parse_log(&input, None)
12 }
13 .map_err(Into::into)
14}
15
/// Zero-sized handle for the `parse_aws_vpc_flow_log` function.
///
/// It carries no data; all behaviour comes from its `Function` implementation.
#[derive(Debug, Clone, Copy)]
pub struct ParseAwsVpcFlowLog;
18
// Metadata, examples and compilation entry point for `parse_aws_vpc_flow_log`.
impl Function for ParseAwsVpcFlowLog {
    /// Name of the function.
    fn identifier(&self) -> &'static str {
        "parse_aws_vpc_flow_log"
    }

    /// One-line description of the function, linking to the AWS VPC Flow Logs guide.
    fn usage(&self) -> &'static str {
        "Parses `value` in the [VPC Flow Logs format](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html)."
    }

    /// Reports the function under the `Parse` category.
    fn category(&self) -> &'static str {
        Category::Parse.as_ref()
    }

    /// Lists the documented reasons the function can fail.
    fn internal_failure_reasons(&self) -> &'static [&'static str] {
        &["`value` is not a properly formatted AWS VPC Flow log."]
    }

    /// The function returns an object.
    fn return_kind(&self) -> u16 {
        kind::OBJECT
    }

    /// Example invocations with their expected results:
    /// the default format, a custom format passed positionally, and a custom
    /// format (passed as `format:`) that includes the version 5 fields.
    /// In every example a `-` in the input appears as `null` in the result.
    fn examples(&self) -> &'static [Example] {
        &[
            example! {
                title: "Parse AWS VPC Flow log (default format)",
                source: r#"parse_aws_vpc_flow_log!("2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA")"#,
                result: Ok(indoc! { r#"{
                    "version": 2,
                    "account_id": "123456789010",
                    "interface_id": "eni-1235b8ca123456789",
                    "srcaddr": null,
                    "dstaddr": null,
                    "srcport": null,
                    "dstport": null,
                    "protocol": null,
                    "packets": null,
                    "bytes": null,
                    "start": 1431280876,
                    "end": 1431280934,
                    "action": null,
                    "log_status":"NODATA"
                }"# }),
            },
            example! {
                title: "Parse AWS VPC Flow log (custom format)",
                source: indoc! {r#"
                    parse_aws_vpc_flow_log!(
                        "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5",
                        "instance_id interface_id srcaddr dstaddr pkt_srcaddr pkt_dstaddr"
                    )
                "#},
                result: Ok(indoc! { r#"{
                    "instance_id": null,
                    "interface_id": "eni-1235b8ca123456789",
                    "srcaddr": "10.0.1.5",
                    "dstaddr": "10.0.0.220",
                    "pkt_srcaddr": "10.0.1.5",
                    "pkt_dstaddr": "203.0.113.5"
                }"# }),
            },
            example! {
                title: "Parse AWS VPC Flow log including v5 fields",
                source: indoc! {r#"
                    parse_aws_vpc_flow_log!(
                        "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK",
                        format: "version srcaddr dstaddr srcport dstport protocol start end type packets bytes account_id vpc_id subnet_id instance_id interface_id region az_id sublocation_type sublocation_id action tcp_flags pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path flow_direction log_status"
                    )
                "#},
                result: Ok(indoc! { r#"{
                    "account_id": "123456789012",
                    "action": "ACCEPT",
                    "az_id": "apse2-az3",
                    "bytes": 15044,
                    "dstaddr": "10.0.0.71",
                    "dstport": 34210,
                    "end": 1616729349,
                    "flow_direction": "ingress",
                    "instance_id": "i-0c50d5961bcb2d47b",
                    "interface_id": "eni-1235b8ca123456789",
                    "log_status": "OK",
                    "packets": 14,
                    "pkt_dst_aws_service": null,
                    "pkt_dstaddr": "10.0.0.71",
                    "pkt_src_aws_service": "S3",
                    "pkt_srcaddr": "52.95.128.179",
                    "protocol": 6,
                    "region": "ap-southeast-2",
                    "srcaddr": "52.95.128.179",
                    "srcport": 80,
                    "start": 1616729292,
                    "sublocation_id": null,
                    "sublocation_type": null,
                    "subnet_id": "subnet-aaaaaaaa012345678",
                    "tcp_flags": 19,
                    "traffic_path": null,
                    "type": "IPv4",
                    "version": 5,
                    "vpc_id": "vpc-abcdefab012345678"
                }"# }),
            },
        ]
    }

    /// Builds the runtime expression from the call arguments.
    ///
    /// Takes the required `value` argument and the optional `format` argument
    /// and wraps them in a `ParseAwsVpcFlowLogFn`. The type state and compile
    /// context are not consulted.
    fn compile(
        &self,
        _state: &state::TypeState,
        _ctx: &mut FunctionCompileContext,
        arguments: ArgumentList,
    ) -> Compiled {
        let value = arguments.required("value");
        let format = arguments.optional("format");

        Ok(ParseAwsVpcFlowLogFn::new(value, format).as_expr())
    }

    /// Declares the accepted parameters: a required bytes `value` and an
    /// optional bytes `format`.
    fn parameters(&self) -> &'static [Parameter] {
        const PARAMETERS: &[Parameter] = &[
            Parameter::required("value", kind::BYTES, "VPC Flow Log."),
            Parameter::optional("format", kind::BYTES, "VPC Flow Log format."),
        ];
        PARAMETERS
    }
}
142
143#[derive(Debug, Clone)]
144struct ParseAwsVpcFlowLogFn {
145 value: Box<dyn Expression>,
146 format: Option<Box<dyn Expression>>,
147}
148
149impl ParseAwsVpcFlowLogFn {
150 fn new(value: Box<dyn Expression>, format: Option<Box<dyn Expression>>) -> Self {
151 Self { value, format }
152 }
153}
154
155impl FunctionExpression for ParseAwsVpcFlowLogFn {
156 fn resolve(&self, ctx: &mut Context) -> Resolved {
157 let value = self.value.resolve(ctx)?;
158 let format = self
159 .format
160 .as_ref()
161 .map(|expr| expr.resolve(ctx))
162 .transpose()?;
163
164 parse_aws_vpc_flow_log(value, format)
165 }
166
167 fn type_def(&self, _: &state::TypeState) -> TypeDef {
168 TypeDef::object(inner_kind()).fallible()
169 }
170}
171
172fn inner_kind() -> BTreeMap<Field, Kind> {
173 BTreeMap::from([
174 (Field::from("account_id"), Kind::bytes() | Kind::null()),
175 (Field::from("action"), Kind::bytes() | Kind::null()),
176 (Field::from("az_id"), Kind::bytes() | Kind::null()),
177 (Field::from("bytes"), Kind::integer() | Kind::null()),
178 (Field::from("dstaddr"), Kind::bytes() | Kind::null()),
179 (Field::from("dstport"), Kind::integer() | Kind::null()),
180 (Field::from("end"), Kind::integer() | Kind::null()),
181 (Field::from("flow_direction"), Kind::bytes() | Kind::null()),
182 (Field::from("instance_id"), Kind::bytes() | Kind::null()),
183 (Field::from("interface_id"), Kind::bytes() | Kind::null()),
184 (Field::from("log_status"), Kind::bytes() | Kind::null()),
185 (Field::from("packets"), Kind::integer() | Kind::null()),
186 (Field::from("pkt_dstaddr"), Kind::bytes() | Kind::null()),
187 (
188 Field::from("pkt_dst_aws_service"),
189 Kind::bytes() | Kind::null(),
190 ),
191 (Field::from("pkt_srcaddr"), Kind::bytes() | Kind::null()),
192 (
193 Field::from("pkt_src_aws_service"),
194 Kind::bytes() | Kind::null(),
195 ),
196 (Field::from("protocol"), Kind::integer() | Kind::null()),
197 (Field::from("region"), Kind::bytes() | Kind::null()),
198 (Field::from("srcaddr"), Kind::bytes() | Kind::null()),
199 (Field::from("srcport"), Kind::integer() | Kind::null()),
200 (Field::from("start"), Kind::integer() | Kind::null()),
201 (Field::from("sublocation_id"), Kind::bytes() | Kind::null()),
202 (
203 Field::from("sublocation_type"),
204 Kind::bytes() | Kind::null(),
205 ),
206 (Field::from("subnet_id"), Kind::bytes() | Kind::null()),
207 (Field::from("tcp_flags"), Kind::integer() | Kind::null()),
208 (Field::from("traffic_path"), Kind::integer() | Kind::null()),
209 (Field::from("type"), Kind::bytes() | Kind::null()),
210 (Field::from("version"), Kind::integer() | Kind::null()),
211 (Field::from("vpc_id"), Kind::bytes() | Kind::null()),
212 ])
213}
214
/// Result type of the parsing helpers in this file; errors are plain `String` messages.
type ParseResult<T> = std::result::Result<T, String>;
216
217#[allow(clippy::unnecessary_wraps)] fn identity<'a>(_key: &'a str, value: &'a str) -> ParseResult<&'a str> {
219 Ok(value)
220}
221
222fn parse_i64(key: &str, value: &str) -> ParseResult<i64> {
223 value
224 .parse()
225 .map_err(|_| format!("failed to parse value as i64 (key: `{key}`): `{value}`"))
226}
227
// Stores one parsed field in `$log`.
//
// Expands to a `match` on `$key` with one arm per `$name => $transform` pair.
// The arm that matches converts `$value` and inserts it under `$name`.
//
// The expansion contains `return Err(..)` and `?`, so it can only be used
// inside a function returning a `Result` with a `String` error:
//   * a failing `$transform` propagates its error through `?`;
//   * inserting a key that is already present returns "value already exists";
//   * a `$key` matching none of the names returns "unknown key".
macro_rules! create_match {
    ($log:expr_2021, $key:expr_2021, $value:expr_2021, $($name:expr_2021 => $transform:expr_2021),+) => {
        match $key {
            $($name => {
                let value = match $value {
                    // A lone "-" is stored as null and never reaches the transform.
                    "-" => Value::Null,
                    value => $transform($name, value)?.into(),
                };
                // `insert` returns the previous value, so `Some` means the key was repeated.
                if $log.insert($name.into(), value).is_some() {
                    return Err(format!("value already exists for key: `{}`", $key));
                }
            })+
            // Fallback arm: the key is not one of the listed names.
            key => return Err(format!("unknown key: `{}`", key))
        };
    };
}
244
245fn parse_log(input: &str, format: Option<&str>) -> ParseResult<Value> {
246 let mut log = BTreeMap::new();
247
248 let mut input = input.split(' ');
249 let mut format = format
250 .unwrap_or("version account_id interface_id srcaddr dstaddr srcport dstport protocol packets bytes start end action log_status")
251 .split(' ');
252
253 loop {
254 return match (format.next(), input.next()) {
255 (Some(key), Some(value)) => {
256 create_match!(
257 log, key, value,
258 "account_id" => identity,
259 "action" => identity,
260 "az_id" => identity,
261 "bytes" => parse_i64,
262 "dstaddr" => identity,
263 "dstport" => parse_i64,
264 "end" => parse_i64,
265 "flow_direction" => identity,
266 "instance_id" => identity,
267 "interface_id" => identity,
268 "log_status" => identity,
269 "packets" => parse_i64,
270 "pkt_dstaddr" => identity,
271 "pkt_dst_aws_service" => identity,
272 "pkt_srcaddr" => identity,
273 "pkt_src_aws_service" => identity,
274 "protocol" => parse_i64,
275 "region" => identity,
276 "srcaddr" => identity,
277 "srcport" => parse_i64,
278 "start" => parse_i64,
279 "sublocation_id" => identity,
280 "sublocation_type" => identity,
281 "subnet_id" => identity,
282 "tcp_flags" => parse_i64,
283 "traffic_path" => parse_i64,
284 "type" => identity,
285 "version" => parse_i64,
286 "vpc_id" => identity
287 );
288
289 continue;
290 }
291 (None, Some(value)) => Err(format!("no key for value: `{value}`")),
292 (Some(key), None) => Err(format!("no item for key: `{key}`")),
293 (None, None) => Ok(log.into()),
294 };
295 }
296}
297
// Unit tests: a smoke test over many sample lines, plus two exact-output checks.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value;

    // Checks only that every sample line parses under its format; the parsed
    // contents are not inspected here.
    #[test]
    fn parse_aws_vpc_flow_log() {
        // Each entry pairs an optional custom format (`None` = default format)
        // with the log lines expected to parse under it.
        let logs = vec![
            (
                None,
                vec![
                    "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK",
                    "2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK",
                    "2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA",
                    "2 123456789010 eni-11111111aaaaaaaaa - - - - - - - 1431280876 1431280934 - SKIPDATA",
                    "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK",
                    "2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK",
                    "2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK",
                ],
            ),
            (
                Some(
                    "version vpc_id subnet_id instance_id interface_id account_id type srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr protocol bytes packets start end action tcp_flags log_status",
                ),
                vec![
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 100701 70 1566848875 1566848933 ACCEPT 2 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 632 12 1566848875 1566848933 ACCEPT 18 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 63388 1219 1566848933 1566849113 ACCEPT 1 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 23294588 15774 1566848933 1566849113 ACCEPT 1 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43638 5001 52.213.180.42 10.0.0.62 6 1260 17 1566933133 1566933193 ACCEPT 3 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43638 10.0.0.62 52.213.180.42 6 967 14 1566933133 1566933193 ACCEPT 19 OK",
                ],
            ),
            (
                Some("instance_id interface_id srcaddr dstaddr pkt_srcaddr pkt_dstaddr"),
                vec![
                    "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5",
                    "- eni-1235b8ca123456789 10.0.0.220 203.0.113.5 10.0.0.220 203.0.113.5",
                    "- eni-1235b8ca123456789 203.0.113.5 10.0.0.220 203.0.113.5 10.0.0.220",
                    "- eni-1235b8ca123456789 10.0.0.220 10.0.1.5 203.0.113.5 10.0.1.5",
                    "i-01234567890123456 eni-1111aaaa2222bbbb3 10.0.1.5 203.0.113.5 10.0.1.5 203.0.113.5",
                    "i-01234567890123456 eni-1111aaaa2222bbbb3 203.0.113.5 10.0.1.5 203.0.113.5 10.0.1.5",
                ],
            ),
            (
                Some(
                    "version interface_id account_id vpc_id subnet_id instance_id srcaddr dstaddr srcport dstport protocol tcp_flags type pkt_srcaddr pkt_dstaddr action log_status",
                ),
                vec![
                    "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.20.33.164 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK",
                    "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.40.2.236 10.20.33.164 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK",
                    "3 eni-11111111111111111 123456789010 vpc-abcdefab012345678 subnet-11111111aaaaaaaaa - 10.40.1.175 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK",
                    "3 eni-22222222222222222 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb - 10.40.2.236 10.40.2.31 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK",
                ],
            ),
            (
                Some(
                    "version srcaddr dstaddr srcport dstport protocol start end type packets bytes account_id vpc_id subnet_id instance_id interface_id region az_id sublocation_type sublocation_id action tcp_flags pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path flow_direction log_status",
                ),
                vec![
                    "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK",
                    "5 10.0.0.71 52.95.128.179 34210 80 6 1616729292 1616729349 IPv4 7 471 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 3 10.0.0.71 52.95.128.179 - S3 8 egress OK",
                ],
            ),
        ];

        for (format, logs) in logs {
            for log in logs {
                assert!(parse_log(log, format).is_ok());
            }
        }
    }

    // Exact-output checks through the function interface.
    test_function![
        parse_aws_vpc_flow_log => ParseAwsVpcFlowLog;

        // No `format` argument: the default field list applies.
        default {
            args: func_args![value: "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK"],
            want: Ok(value!({
                "account_id": "123456789010",
                "action": "ACCEPT",
                "bytes": 4249,
                "dstaddr": "172.31.16.21",
                "dstport": 22,
                "end": 1_418_530_070,
                "interface_id": "eni-1235b8ca123456789",
                "log_status": "OK",
                "packets": 20,
                "protocol": 6,
                "srcaddr": "172.31.16.139",
                "srcport": 20641,
                "start": 1_418_530_010,
                "version": 2
            })),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }

        // Explicit `format` argument listing a custom field order.
        fields {
            args: func_args![value: "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK",
                             format: "version vpc_id subnet_id instance_id interface_id account_id type srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr protocol bytes packets start end action tcp_flags log_status"],
            want: Ok(value!({
                "account_id": "123456789010",
                "action": "ACCEPT",
                "bytes": 568,
                "dstaddr": "10.0.0.62",
                "dstport": 5001,
                "end": 1_566_848_933,
                "instance_id": "i-01234567890123456",
                "interface_id": "eni-1235b8ca123456789",
                "log_status": "OK",
                "packets": 8,
                "pkt_dstaddr": "10.0.0.62",
                "pkt_srcaddr": "52.213.180.42",
                "protocol": 6,
                "srcaddr": "52.213.180.42",
                "srcport": 43416,
                "start": 1_566_848_875,
                "subnet_id": "subnet-aaaaaaaa012345678",
                "tcp_flags": 2,
                "type": "IPv4",
                "version": 3,
                "vpc_id": "vpc-abcdefab012345678"
            })),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }
    ];
}