vrl/stdlib/
parse_aws_vpc_flow_log.rs

1use crate::compiler::prelude::*;
2use std::collections::BTreeMap;
3
4fn parse_aws_vpc_flow_log(value: Value, format: Option<Value>) -> Resolved {
5    let bytes = value.try_bytes()?;
6    let input = String::from_utf8_lossy(&bytes);
7    if let Some(expr) = format {
8        let bytes = expr.try_bytes()?;
9        parse_log(&input, Some(&String::from_utf8_lossy(&bytes)))
10    } else {
11        parse_log(&input, None)
12    }
13    .map_err(Into::into)
14}
15
/// Marker type for the `parse_aws_vpc_flow_log` function; it carries no state.
#[derive(Debug, Clone, Copy)]
pub struct ParseAwsVpcFlowLog;
18
19impl Function for ParseAwsVpcFlowLog {
20    fn identifier(&self) -> &'static str {
21        "parse_aws_vpc_flow_log"
22    }
23
24    fn usage(&self) -> &'static str {
25        "Parses `value` in the [VPC Flow Logs format](https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs.html)."
26    }
27
28    fn category(&self) -> &'static str {
29        Category::Parse.as_ref()
30    }
31
32    fn internal_failure_reasons(&self) -> &'static [&'static str] {
33        &["`value` is not a properly formatted AWS VPC Flow log."]
34    }
35
36    fn return_kind(&self) -> u16 {
37        kind::OBJECT
38    }
39
40    fn examples(&self) -> &'static [Example] {
41        &[
42            example! {
43                title: "Parse AWS VPC Flow log (default format)",
44                source: r#"parse_aws_vpc_flow_log!("2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA")"#,
45                result: Ok(indoc! { r#"{
46                    "version": 2,
47                    "account_id": "123456789010",
48                    "interface_id": "eni-1235b8ca123456789",
49                    "srcaddr": null,
50                    "dstaddr": null,
51                    "srcport": null,
52                    "dstport": null,
53                    "protocol": null,
54                    "packets": null,
55                    "bytes": null,
56                    "start": 1431280876,
57                    "end": 1431280934,
58                    "action": null,
59                    "log_status":"NODATA"
60                }"# }),
61            },
62            example! {
63                title: "Parse AWS VPC Flow log (custom format)",
64                source: indoc! {r#"
65                    parse_aws_vpc_flow_log!(
66                        "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5",
67                        "instance_id interface_id srcaddr dstaddr pkt_srcaddr pkt_dstaddr"
68                    )
69                "#},
70                result: Ok(indoc! { r#"{
71                    "instance_id": null,
72                    "interface_id": "eni-1235b8ca123456789",
73                    "srcaddr": "10.0.1.5",
74                    "dstaddr": "10.0.0.220",
75                    "pkt_srcaddr": "10.0.1.5",
76                    "pkt_dstaddr": "203.0.113.5"
77                }"# }),
78            },
79            example! {
80                title: "Parse AWS VPC Flow log including v5 fields",
81                source: indoc! {r#"
82                    parse_aws_vpc_flow_log!(
83                        "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK",
84                        format: "version srcaddr dstaddr srcport dstport protocol start end type packets bytes account_id vpc_id subnet_id instance_id interface_id region az_id sublocation_type sublocation_id action tcp_flags pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path flow_direction log_status"
85                    )
86                "#},
87                result: Ok(indoc! { r#"{
88                    "account_id": "123456789012",
89                    "action": "ACCEPT",
90                    "az_id": "apse2-az3",
91                    "bytes": 15044,
92                    "dstaddr": "10.0.0.71",
93                    "dstport": 34210,
94                    "end": 1616729349,
95                    "flow_direction": "ingress",
96                    "instance_id": "i-0c50d5961bcb2d47b",
97                    "interface_id": "eni-1235b8ca123456789",
98                    "log_status": "OK",
99                    "packets": 14,
100                    "pkt_dst_aws_service": null,
101                    "pkt_dstaddr": "10.0.0.71",
102                    "pkt_src_aws_service": "S3",
103                    "pkt_srcaddr": "52.95.128.179",
104                    "protocol": 6,
105                    "region": "ap-southeast-2",
106                    "srcaddr": "52.95.128.179",
107                    "srcport": 80,
108                    "start": 1616729292,
109                    "sublocation_id": null,
110                    "sublocation_type": null,
111                    "subnet_id": "subnet-aaaaaaaa012345678",
112                    "tcp_flags": 19,
113                    "traffic_path": null,
114                    "type": "IPv4",
115                    "version": 5,
116                    "vpc_id": "vpc-abcdefab012345678"
117                }"# }),
118            },
119        ]
120    }
121
122    fn compile(
123        &self,
124        _state: &state::TypeState,
125        _ctx: &mut FunctionCompileContext,
126        arguments: ArgumentList,
127    ) -> Compiled {
128        let value = arguments.required("value");
129        let format = arguments.optional("format");
130
131        Ok(ParseAwsVpcFlowLogFn::new(value, format).as_expr())
132    }
133
134    fn parameters(&self) -> &'static [Parameter] {
135        const PARAMETERS: &[Parameter] = &[
136            Parameter::required("value", kind::BYTES, "VPC Flow Log."),
137            Parameter::optional("format", kind::BYTES, "VPC Flow Log format."),
138        ];
139        PARAMETERS
140    }
141}
142
143#[derive(Debug, Clone)]
144struct ParseAwsVpcFlowLogFn {
145    value: Box<dyn Expression>,
146    format: Option<Box<dyn Expression>>,
147}
148
149impl ParseAwsVpcFlowLogFn {
150    fn new(value: Box<dyn Expression>, format: Option<Box<dyn Expression>>) -> Self {
151        Self { value, format }
152    }
153}
154
155impl FunctionExpression for ParseAwsVpcFlowLogFn {
156    fn resolve(&self, ctx: &mut Context) -> Resolved {
157        let value = self.value.resolve(ctx)?;
158        let format = self
159            .format
160            .as_ref()
161            .map(|expr| expr.resolve(ctx))
162            .transpose()?;
163
164        parse_aws_vpc_flow_log(value, format)
165    }
166
167    fn type_def(&self, _: &state::TypeState) -> TypeDef {
168        TypeDef::object(inner_kind()).fallible(/* log parsing error */)
169    }
170}
171
172fn inner_kind() -> BTreeMap<Field, Kind> {
173    BTreeMap::from([
174        (Field::from("account_id"), Kind::bytes() | Kind::null()),
175        (Field::from("action"), Kind::bytes() | Kind::null()),
176        (Field::from("az_id"), Kind::bytes() | Kind::null()),
177        (Field::from("bytes"), Kind::integer() | Kind::null()),
178        (Field::from("dstaddr"), Kind::bytes() | Kind::null()),
179        (Field::from("dstport"), Kind::integer() | Kind::null()),
180        (Field::from("end"), Kind::integer() | Kind::null()),
181        (Field::from("flow_direction"), Kind::bytes() | Kind::null()),
182        (Field::from("instance_id"), Kind::bytes() | Kind::null()),
183        (Field::from("interface_id"), Kind::bytes() | Kind::null()),
184        (Field::from("log_status"), Kind::bytes() | Kind::null()),
185        (Field::from("packets"), Kind::integer() | Kind::null()),
186        (Field::from("pkt_dstaddr"), Kind::bytes() | Kind::null()),
187        (
188            Field::from("pkt_dst_aws_service"),
189            Kind::bytes() | Kind::null(),
190        ),
191        (Field::from("pkt_srcaddr"), Kind::bytes() | Kind::null()),
192        (
193            Field::from("pkt_src_aws_service"),
194            Kind::bytes() | Kind::null(),
195        ),
196        (Field::from("protocol"), Kind::integer() | Kind::null()),
197        (Field::from("region"), Kind::bytes() | Kind::null()),
198        (Field::from("srcaddr"), Kind::bytes() | Kind::null()),
199        (Field::from("srcport"), Kind::integer() | Kind::null()),
200        (Field::from("start"), Kind::integer() | Kind::null()),
201        (Field::from("sublocation_id"), Kind::bytes() | Kind::null()),
202        (
203            Field::from("sublocation_type"),
204            Kind::bytes() | Kind::null(),
205        ),
206        (Field::from("subnet_id"), Kind::bytes() | Kind::null()),
207        (Field::from("tcp_flags"), Kind::integer() | Kind::null()),
208        (Field::from("traffic_path"), Kind::integer() | Kind::null()),
209        (Field::from("type"), Kind::bytes() | Kind::null()),
210        (Field::from("version"), Kind::integer() | Kind::null()),
211        (Field::from("vpc_id"), Kind::bytes() | Kind::null()),
212    ])
213}
214
/// Result type used by the flow-log parsing helpers; errors are plain messages.
type ParseResult<T> = std::result::Result<T, String>;

/// Field transform that returns `value` unchanged.
///
/// The key is ignored and the call never fails.
#[allow(clippy::unnecessary_wraps)] // same signature as the other field transforms
fn identity<'a>(_key: &'a str, value: &'a str) -> ParseResult<&'a str> {
    ParseResult::Ok(value)
}
221
222fn parse_i64(key: &str, value: &str) -> ParseResult<i64> {
223    value
224        .parse()
225        .map_err(|_| format!("failed to parse value as i64 (key: `{key}`): `{value}`"))
226}
227
/// Expands to a `match` on `$key` with one arm per `name => transform` pair.
///
/// In the matching arm, a `$value` of `"-"` becomes `Value::Null`; any other value
/// is passed through `transform(name, value)` (errors propagate with `?`) and
/// converted with `.into()`. The result is inserted into `$log` under `name`.
///
/// The expansion returns `Err` from the enclosing function when the key was
/// already present in `$log` or when `$key` matches none of the names.
macro_rules! create_match {
    ($log:expr_2021, $key:expr_2021, $value:expr_2021, $($name:expr_2021 => $transform:expr_2021),+) => {
        match $key {
            $($name => {
                let parsed = match $value {
                    "-" => Value::Null,
                    raw => $transform($name, raw)?.into(),
                };
                // `insert` hands back the previous entry, so `Some` means a duplicate key.
                let previous = $log.insert($name.into(), parsed);
                if previous.is_some() {
                    return Err(format!("value already exists for key: `{}`", $key));
                }
            })+
            unknown => return Err(format!("unknown key: `{}`", unknown))
        };
    };
}
244
245fn parse_log(input: &str, format: Option<&str>) -> ParseResult<Value> {
246    let mut log = BTreeMap::new();
247
248    let mut input = input.split(' ');
249    let mut format = format
250        .unwrap_or("version account_id interface_id srcaddr dstaddr srcport dstport protocol packets bytes start end action log_status")
251        .split(' ');
252
253    loop {
254        return match (format.next(), input.next()) {
255            (Some(key), Some(value)) => {
256                create_match!(
257                    log, key, value,
258                    "account_id" => identity,
259                    "action" => identity,
260                    "az_id" => identity,
261                    "bytes" => parse_i64,
262                    "dstaddr" => identity,
263                    "dstport" => parse_i64,
264                    "end" => parse_i64,
265                    "flow_direction" => identity,
266                    "instance_id" => identity,
267                    "interface_id" => identity,
268                    "log_status" => identity,
269                    "packets" => parse_i64,
270                    "pkt_dstaddr" => identity,
271                    "pkt_dst_aws_service" => identity,
272                    "pkt_srcaddr" => identity,
273                    "pkt_src_aws_service" => identity,
274                    "protocol" => parse_i64,
275                    "region" => identity,
276                    "srcaddr" => identity,
277                    "srcport" => parse_i64,
278                    "start" => parse_i64,
279                    "sublocation_id" => identity,
280                    "sublocation_type" => identity,
281                    "subnet_id" => identity,
282                    "tcp_flags" => parse_i64,
283                    "traffic_path" => parse_i64,
284                    "type" => identity,
285                    "version" => parse_i64,
286                    "vpc_id" => identity
287                );
288
289                continue;
290            }
291            (None, Some(value)) => Err(format!("no key for value: `{value}`")),
292            (Some(key), None) => Err(format!("no item for key: `{key}`")),
293            (None, None) => Ok(log.into()),
294        };
295    }
296}
297
#[cfg(test)]
mod tests {
    use super::*;
    use crate::value;

    /// Every sample record must parse without error under its format.
    #[test]
    fn parse_aws_vpc_flow_log() {
        // Sample records come from
        // https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-records-examples.html
        let extended_format = "version vpc_id subnet_id instance_id interface_id account_id type srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr protocol bytes packets start end action tcp_flags log_status";
        let addresses_format = "instance_id interface_id srcaddr dstaddr pkt_srcaddr pkt_dstaddr";
        let tcp_flags_format = "version interface_id account_id vpc_id subnet_id instance_id srcaddr dstaddr srcport dstport protocol tcp_flags type pkt_srcaddr pkt_dstaddr action log_status";
        let v5_format = "version srcaddr dstaddr srcport dstport protocol start end type packets bytes account_id vpc_id subnet_id instance_id interface_id region az_id sublocation_type sublocation_id action tcp_flags pkt_srcaddr pkt_dstaddr pkt_src_aws_service pkt_dst_aws_service traffic_path flow_direction log_status";

        // Each case pairs an optional format with records written in that format;
        // `None` exercises the default format.
        let cases = vec![
            (
                None,
                vec![
                    "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK",
                    "2 123456789010 eni-1235b8ca123456789 172.31.9.69 172.31.9.12 49761 3389 6 20 4249 1418530010 1418530070 REJECT OK",
                    "2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA",
                    "2 123456789010 eni-11111111aaaaaaaaa - - - - - - - 1431280876 1431280934 - SKIPDATA",
                    "2 123456789010 eni-1235b8ca123456789 203.0.113.12 172.31.16.139 0 0 1 4 336 1432917027 1432917142 ACCEPT OK",
                    "2 123456789010 eni-1235b8ca123456789 172.31.16.139 203.0.113.12 0 0 1 4 336 1432917094 1432917142 REJECT OK",
                    "2 123456789010 eni-1235b8ca123456789 2001:db8:1234:a100:8d6e:3477:df66:f105 2001:db8:1234:a102:3304:8879:34cf:4071 34892 22 6 54 8855 1477913708 1477913820 ACCEPT OK",
                ],
            ),
            (
                Some(extended_format),
                vec![
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43416 10.0.0.62 52.213.180.42 6 376 7 1566848875 1566848933 ACCEPT 18 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 100701 70 1566848875 1566848933 ACCEPT 2 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 632 12 1566848875 1566848933 ACCEPT 18 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43418 10.0.0.62 52.213.180.42 6 63388 1219 1566848933 1566849113 ACCEPT 1 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43418 5001 52.213.180.42 10.0.0.62 6 23294588 15774 1566848933 1566849113 ACCEPT 1 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43638 5001 52.213.180.42 10.0.0.62 6 1260 17 1566933133 1566933193 ACCEPT 3 OK",
                    "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 10.0.0.62 52.213.180.42 5001 43638 10.0.0.62 52.213.180.42 6 967 14 1566933133 1566933193 ACCEPT 19 OK",
                ],
            ),
            (
                Some(addresses_format),
                vec![
                    "- eni-1235b8ca123456789 10.0.1.5 10.0.0.220 10.0.1.5 203.0.113.5",
                    "- eni-1235b8ca123456789 10.0.0.220 203.0.113.5 10.0.0.220 203.0.113.5",
                    "- eni-1235b8ca123456789 203.0.113.5 10.0.0.220 203.0.113.5 10.0.0.220",
                    "- eni-1235b8ca123456789 10.0.0.220 10.0.1.5 203.0.113.5 10.0.1.5",
                    "i-01234567890123456 eni-1111aaaa2222bbbb3 10.0.1.5 203.0.113.5 10.0.1.5 203.0.113.5",
                    "i-01234567890123456 eni-1111aaaa2222bbbb3 203.0.113.5 10.0.1.5 203.0.113.5 10.0.1.5",
                ],
            ),
            (
                Some(tcp_flags_format),
                vec![
                    "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.20.33.164 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK",
                    "3 eni-33333333333333333 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb i-01234567890123456 10.40.2.236 10.20.33.164 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK",
                    "3 eni-11111111111111111 123456789010 vpc-abcdefab012345678 subnet-11111111aaaaaaaaa - 10.40.1.175 10.40.2.236 39812 80 6 3 IPv4 10.20.33.164 10.40.2.236 ACCEPT OK",
                    "3 eni-22222222222222222 123456789010 vpc-abcdefab012345678 subnet-22222222bbbbbbbbb - 10.40.2.236 10.40.2.31 80 39812 6 19 IPv4 10.40.2.236 10.20.33.164 ACCEPT OK",
                ],
            ),
            (
                Some(v5_format),
                vec![
                    "5 52.95.128.179 10.0.0.71 80 34210 6 1616729292 1616729349 IPv4 14 15044 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 19 52.95.128.179 10.0.0.71 S3 - - ingress OK",
                    "5 10.0.0.71 52.95.128.179 34210 80 6 1616729292 1616729349 IPv4 7 471 123456789012 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-0c50d5961bcb2d47b eni-1235b8ca123456789 ap-southeast-2 apse2-az3 - - ACCEPT 3 10.0.0.71 52.95.128.179 - S3 8 egress OK",
                ],
            ),
        ];

        for (format, records) in cases {
            for record in records {
                let outcome = parse_log(record, format);
                assert!(outcome.is_ok());
            }
        }
    }

    // End-to-end checks through the function interface, including the type definition.
    test_function![
        parse_aws_vpc_flow_log => ParseAwsVpcFlowLog;

        default {
            args: func_args![value: "2 123456789010 eni-1235b8ca123456789 172.31.16.139 172.31.16.21 20641 22 6 20 4249 1418530010 1418530070 ACCEPT OK"],
            want: Ok(value!({
                "account_id": "123456789010",
                "action": "ACCEPT",
                "bytes": 4249,
                "dstaddr": "172.31.16.21",
                "dstport": 22,
                "end": 1_418_530_070,
                "interface_id": "eni-1235b8ca123456789",
                "log_status": "OK",
                "packets": 20,
                "protocol": 6,
                "srcaddr": "172.31.16.139",
                "srcport": 20641,
                "start": 1_418_530_010,
                "version": 2
            })),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }

        fields {
            args: func_args![value: "3 vpc-abcdefab012345678 subnet-aaaaaaaa012345678 i-01234567890123456 eni-1235b8ca123456789 123456789010 IPv4 52.213.180.42 10.0.0.62 43416 5001 52.213.180.42 10.0.0.62 6 568 8 1566848875 1566848933 ACCEPT 2 OK",
                             format: "version vpc_id subnet_id instance_id interface_id account_id type srcaddr dstaddr srcport dstport pkt_srcaddr pkt_dstaddr protocol bytes packets start end action tcp_flags log_status"],
            want: Ok(value!({
                "account_id": "123456789010",
                "action": "ACCEPT",
                "bytes": 568,
                "dstaddr": "10.0.0.62",
                "dstport": 5001,
                "end": 1_566_848_933,
                "instance_id": "i-01234567890123456",
                "interface_id": "eni-1235b8ca123456789",
                "log_status": "OK",
                "packets": 8,
                "pkt_dstaddr": "10.0.0.62",
                "pkt_srcaddr": "52.213.180.42",
                "protocol": 6,
                "srcaddr": "52.213.180.42",
                "srcport": 43416,
                "start": 1_566_848_875,
                "subnet_id": "subnet-aaaaaaaa012345678",
                "tcp_flags": 2,
                "type": "IPv4",
                "version": 3,
                "vpc_id": "vpc-abcdefab012345678"
            })),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }
    ];
}