vrl/stdlib/
parse_aws_cloudwatch_log_subscription_message.rs

1use crate::compiler::prelude::*;
2use chrono::{DateTime, Utc, serde::ts_milliseconds};
3use serde::Deserialize;
4use std::collections::BTreeMap;
5
6#[derive(Debug, Deserialize, Clone, Copy)]
7#[serde(rename_all = "SCREAMING_SNAKE_CASE", deny_unknown_fields)]
8enum AwsCloudWatchLogsSubscriptionMessageType {
9    ControlMessage,
10    DataMessage,
11}
12
13#[derive(Debug, Deserialize)]
14#[serde(deny_unknown_fields)]
15struct AwsCloudWatchLogEvent {
16    id: String,
17    #[serde(with = "ts_milliseconds")]
18    timestamp: DateTime<Utc>,
19    message: String,
20}
21
22#[derive(Debug, Deserialize)]
23#[serde(rename_all = "camelCase", deny_unknown_fields)]
24struct AwsCloudWatchLogsSubscriptionMessage {
25    owner: String,
26    message_type: AwsCloudWatchLogsSubscriptionMessageType,
27    log_group: String,
28    log_stream: String,
29    subscription_filters: Vec<String>,
30    log_events: Vec<AwsCloudWatchLogEvent>,
31}
32
33impl AwsCloudWatchLogsSubscriptionMessageType {
34    #[must_use]
35    fn as_str(self) -> &'static str {
36        match self {
37            AwsCloudWatchLogsSubscriptionMessageType::ControlMessage => "CONTROL_MESSAGE",
38            AwsCloudWatchLogsSubscriptionMessageType::DataMessage => "DATA_MESSAGE",
39        }
40    }
41}
42
43fn parse_aws_cloudwatch_log_subscription_message(bytes: Value) -> Resolved {
44    let bytes = bytes.try_bytes()?;
45    let message = serde_json::from_slice::<AwsCloudWatchLogsSubscriptionMessage>(&bytes)
46        .map_err(|e| format!("unable to parse: {e}"))?;
47    let map = Value::from(BTreeMap::from([
48        (KeyString::from("owner"), Value::from(message.owner)),
49        (
50            KeyString::from("message_type"),
51            Value::from(message.message_type.as_str()),
52        ),
53        (KeyString::from("log_group"), Value::from(message.log_group)),
54        (
55            KeyString::from("log_stream"),
56            Value::from(message.log_stream),
57        ),
58        (
59            KeyString::from("subscription_filters"),
60            Value::from(message.subscription_filters),
61        ),
62        (
63            KeyString::from("log_events"),
64            Value::Array(
65                message
66                    .log_events
67                    .into_iter()
68                    .map(|event| {
69                        Value::from(BTreeMap::from([
70                            (KeyString::from("id"), Value::from(event.id)),
71                            (KeyString::from("timestamp"), Value::from(event.timestamp)),
72                            (KeyString::from("message"), Value::from(event.message)),
73                        ]))
74                    })
75                    .collect::<Vec<Value>>(),
76            ),
77        ),
78    ]));
79    Ok(map)
80}
81
/// Function definition for `parse_aws_cloudwatch_log_subscription_message`.
///
/// Carries no state; all behavior lives in its `Function` implementation.
#[derive(Debug, Clone, Copy)]
pub struct ParseAwsCloudWatchLogSubscriptionMessage;
84
85impl Function for ParseAwsCloudWatchLogSubscriptionMessage {
86    fn identifier(&self) -> &'static str {
87        "parse_aws_cloudwatch_log_subscription_message"
88    }
89
90    fn usage(&self) -> &'static str {
91        "Parses AWS CloudWatch Logs events (configured through AWS Cloudwatch subscriptions) from the `aws_kinesis_firehose` source."
92    }
93
94    fn category(&self) -> &'static str {
95        Category::Parse.as_ref()
96    }
97
98    fn internal_failure_reasons(&self) -> &'static [&'static str] {
99        &["`value` is not a properly formatted AWS CloudWatch Log subscription message."]
100    }
101
102    fn return_kind(&self) -> u16 {
103        kind::OBJECT
104    }
105
106    fn examples(&self) -> &'static [Example] {
107        &[example! {
108            title: "Parse AWS Cloudwatch Log subscription message",
109            source: indoc! {r#"
110                parse_aws_cloudwatch_log_subscription_message!(s'{
111                    "messageType": "DATA_MESSAGE",
112                    "owner": "111111111111",
113                    "logGroup": "test",
114                    "logStream": "test",
115                    "subscriptionFilters": [
116                        "Destination"
117                    ],
118                    "logEvents": [
119                        {
120                            "id": "35683658089614582423604394983260738922885519999578275840",
121                            "timestamp": 1600110569039,
122                            "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41-0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
123                        }
124                    ]
125                }')
126            "#},
127            result: Ok(indoc! {r#"{
128                "log_events": [{
129                    "id": "35683658089614582423604394983260738922885519999578275840",
130                    "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41-0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}",
131                    "timestamp": "2020-09-14T19:09:29.039Z"}
132                ],
133                "log_group": "test",
134                "log_stream": "test",
135                "message_type": "DATA_MESSAGE",
136                "owner": "111111111111",
137                "subscription_filters": ["Destination"]
138            }"#}),
139        }]
140    }
141
142    fn compile(
143        &self,
144        _state: &state::TypeState,
145        _ctx: &mut FunctionCompileContext,
146        arguments: ArgumentList,
147    ) -> Compiled {
148        let value = arguments.required("value");
149
150        Ok(ParseAwsCloudWatchLogSubscriptionMessageFn { value }.as_expr())
151    }
152
153    fn parameters(&self) -> &'static [Parameter] {
154        const PARAMETERS: &[Parameter] = &[Parameter::required(
155            "value",
156            kind::BYTES,
157            "The string representation of the message to parse.",
158        )];
159        PARAMETERS
160    }
161}
162
163#[derive(Debug, Clone)]
164struct ParseAwsCloudWatchLogSubscriptionMessageFn {
165    value: Box<dyn Expression>,
166}
167
168impl FunctionExpression for ParseAwsCloudWatchLogSubscriptionMessageFn {
169    fn resolve(&self, ctx: &mut Context) -> Resolved {
170        let bytes = self.value.resolve(ctx)?;
171        parse_aws_cloudwatch_log_subscription_message(bytes)
172    }
173
174    fn type_def(&self, _: &state::TypeState) -> TypeDef {
175        TypeDef::object(inner_kind()).fallible(/* message parsing error */)
176    }
177}
178
179fn inner_kind() -> BTreeMap<Field, Kind> {
180    BTreeMap::from([
181        (Field::from("owner"), Kind::bytes()),
182        (Field::from("message_type"), Kind::bytes()),
183        (Field::from("log_group"), Kind::bytes()),
184        (Field::from("log_stream"), Kind::bytes()),
185        (
186            Field::from("subscription_filters"),
187            Kind::array({
188                let mut v = Collection::any();
189                v.set_unknown(Kind::bytes());
190                v
191            }),
192        ),
193        (
194            Field::from("log_events"),
195            Kind::array(Collection::from_unknown(Kind::object(BTreeMap::from([
196                (Field::from("id"), Kind::bytes()),
197                (Field::from("timestamp"), Kind::timestamp()),
198                (Field::from("message"), Kind::bytes()),
199            ])))),
200        ),
201    ])
202}
203
#[cfg(test)]
mod tests {
    use chrono::{TimeZone, Utc};

    use super::*;

    test_function![
        parse_aws_cloudwatch_log_subscription_message => ParseAwsCloudWatchLogSubscriptionMessage;

        // Valid JSON, but a bare number instead of the message object.
        invalid_type {
            args: func_args![value: "42"],
            want: Err("unable to parse: invalid type: integer `42`, expected struct AwsCloudWatchLogsSubscriptionMessage at line 1 column 2"),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }

        // Not valid JSON at all.
        invalid_value {
            args: func_args![value: "{ INVALID }"],
            want: Err("unable to parse: key must be a string at line 1 column 3"),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }

        // A well-formed data message carrying two log events.
        string {
            args: func_args![value: r#"
     {
         "messageType": "DATA_MESSAGE",
         "owner": "071959437513",
         "logGroup": "/jesse/test",
         "logStream": "test",
         "subscriptionFilters": [
         "Destination"
         ],
         "logEvents": [
         {
             "id": "35683658089614582423604394983260738922885519999578275840",
             "timestamp": 1600110569039,
             "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
         },
         {
             "id": "35683658089659183914001456229543810359430816722590236673",
             "timestamp": 1600110569041,
             "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
         }
         ]
     }
     "#],
            want: Ok(Value::from_iter([
                (String::from("owner"), Value::from("071959437513")),
                (String::from("message_type"), Value::from("DATA_MESSAGE")),
                (String::from("log_group"), Value::from("/jesse/test")),
                (String::from("log_stream"), Value::from("test")),
                (
                    String::from("subscription_filters"),
                    Value::from(vec![Value::from("Destination")]),
                ),
                (
                    String::from("log_events"),
                    Value::from(vec![
                        Value::from_iter([
                            (
                                String::from("id"),
                                Value::from("35683658089614582423604394983260738922885519999578275840"),
                            ),
                            (
                                String::from("timestamp"),
                                Value::from(
                                    Utc.timestamp_opt(1_600_110_569, 39_000_000)
                                        .single()
                                        .expect("invalid timestamp"),
                                ),
                            ),
                            (
                                String::from("message"),
                                Value::from("{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"),
                            ),
                        ]),
                        Value::from_iter([
                            (
                                String::from("id"),
                                Value::from("35683658089659183914001456229543810359430816722590236673"),
                            ),
                            (
                                String::from("timestamp"),
                                Value::from(
                                    Utc.timestamp_opt(1_600_110_569, 41_000_000)
                                        .single()
                                        .expect("invalid timestamp"),
                                ),
                            ),
                            (
                                String::from("message"),
                                Value::from("{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"),
                            ),
                        ]),
                    ]),
                ),
            ])),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }
    ];
}