1use crate::compiler::prelude::*;
2use chrono::{DateTime, Utc, serde::ts_milliseconds};
3use serde::Deserialize;
4use std::collections::BTreeMap;
5
6#[derive(Debug, Deserialize, Clone, Copy)]
7#[serde(rename_all = "SCREAMING_SNAKE_CASE", deny_unknown_fields)]
8enum AwsCloudWatchLogsSubscriptionMessageType {
9 ControlMessage,
10 DataMessage,
11}
12
13#[derive(Debug, Deserialize)]
14#[serde(deny_unknown_fields)]
15struct AwsCloudWatchLogEvent {
16 id: String,
17 #[serde(with = "ts_milliseconds")]
18 timestamp: DateTime<Utc>,
19 message: String,
20}
21
22#[derive(Debug, Deserialize)]
23#[serde(rename_all = "camelCase", deny_unknown_fields)]
24struct AwsCloudWatchLogsSubscriptionMessage {
25 owner: String,
26 message_type: AwsCloudWatchLogsSubscriptionMessageType,
27 log_group: String,
28 log_stream: String,
29 subscription_filters: Vec<String>,
30 log_events: Vec<AwsCloudWatchLogEvent>,
31}
32
33impl AwsCloudWatchLogsSubscriptionMessageType {
34 #[must_use]
35 fn as_str(self) -> &'static str {
36 match self {
37 AwsCloudWatchLogsSubscriptionMessageType::ControlMessage => "CONTROL_MESSAGE",
38 AwsCloudWatchLogsSubscriptionMessageType::DataMessage => "DATA_MESSAGE",
39 }
40 }
41}
42
43fn parse_aws_cloudwatch_log_subscription_message(bytes: Value) -> Resolved {
44 let bytes = bytes.try_bytes()?;
45 let message = serde_json::from_slice::<AwsCloudWatchLogsSubscriptionMessage>(&bytes)
46 .map_err(|e| format!("unable to parse: {e}"))?;
47 let map = Value::from(BTreeMap::from([
48 (KeyString::from("owner"), Value::from(message.owner)),
49 (
50 KeyString::from("message_type"),
51 Value::from(message.message_type.as_str()),
52 ),
53 (KeyString::from("log_group"), Value::from(message.log_group)),
54 (
55 KeyString::from("log_stream"),
56 Value::from(message.log_stream),
57 ),
58 (
59 KeyString::from("subscription_filters"),
60 Value::from(message.subscription_filters),
61 ),
62 (
63 KeyString::from("log_events"),
64 Value::Array(
65 message
66 .log_events
67 .into_iter()
68 .map(|event| {
69 Value::from(BTreeMap::from([
70 (KeyString::from("id"), Value::from(event.id)),
71 (KeyString::from("timestamp"), Value::from(event.timestamp)),
72 (KeyString::from("message"), Value::from(event.message)),
73 ]))
74 })
75 .collect::<Vec<Value>>(),
76 ),
77 ),
78 ]));
79 Ok(map)
80}
81
/// The `parse_aws_cloudwatch_log_subscription_message` VRL function
/// definition; see its `Function` impl for metadata and compilation.
#[derive(Debug, Clone, Copy)]
pub struct ParseAwsCloudWatchLogSubscriptionMessage;
84
85impl Function for ParseAwsCloudWatchLogSubscriptionMessage {
86 fn identifier(&self) -> &'static str {
87 "parse_aws_cloudwatch_log_subscription_message"
88 }
89
90 fn usage(&self) -> &'static str {
91 "Parses AWS CloudWatch Logs events (configured through AWS Cloudwatch subscriptions) from the `aws_kinesis_firehose` source."
92 }
93
94 fn category(&self) -> &'static str {
95 Category::Parse.as_ref()
96 }
97
98 fn internal_failure_reasons(&self) -> &'static [&'static str] {
99 &["`value` is not a properly formatted AWS CloudWatch Log subscription message."]
100 }
101
102 fn return_kind(&self) -> u16 {
103 kind::OBJECT
104 }
105
106 fn examples(&self) -> &'static [Example] {
107 &[example! {
108 title: "Parse AWS Cloudwatch Log subscription message",
109 source: indoc! {r#"
110 parse_aws_cloudwatch_log_subscription_message!(s'{
111 "messageType": "DATA_MESSAGE",
112 "owner": "111111111111",
113 "logGroup": "test",
114 "logStream": "test",
115 "subscriptionFilters": [
116 "Destination"
117 ],
118 "logEvents": [
119 {
120 "id": "35683658089614582423604394983260738922885519999578275840",
121 "timestamp": 1600110569039,
122 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41-0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
123 }
124 ]
125 }')
126 "#},
127 result: Ok(indoc! {r#"{
128 "log_events": [{
129 "id": "35683658089614582423604394983260738922885519999578275840",
130 "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41-0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}",
131 "timestamp": "2020-09-14T19:09:29.039Z"}
132 ],
133 "log_group": "test",
134 "log_stream": "test",
135 "message_type": "DATA_MESSAGE",
136 "owner": "111111111111",
137 "subscription_filters": ["Destination"]
138 }"#}),
139 }]
140 }
141
142 fn compile(
143 &self,
144 _state: &state::TypeState,
145 _ctx: &mut FunctionCompileContext,
146 arguments: ArgumentList,
147 ) -> Compiled {
148 let value = arguments.required("value");
149
150 Ok(ParseAwsCloudWatchLogSubscriptionMessageFn { value }.as_expr())
151 }
152
153 fn parameters(&self) -> &'static [Parameter] {
154 const PARAMETERS: &[Parameter] = &[Parameter::required(
155 "value",
156 kind::BYTES,
157 "The string representation of the message to parse.",
158 )];
159 PARAMETERS
160 }
161}
162
163#[derive(Debug, Clone)]
164struct ParseAwsCloudWatchLogSubscriptionMessageFn {
165 value: Box<dyn Expression>,
166}
167
168impl FunctionExpression for ParseAwsCloudWatchLogSubscriptionMessageFn {
169 fn resolve(&self, ctx: &mut Context) -> Resolved {
170 let bytes = self.value.resolve(ctx)?;
171 parse_aws_cloudwatch_log_subscription_message(bytes)
172 }
173
174 fn type_def(&self, _: &state::TypeState) -> TypeDef {
175 TypeDef::object(inner_kind()).fallible()
176 }
177}
178
179fn inner_kind() -> BTreeMap<Field, Kind> {
180 BTreeMap::from([
181 (Field::from("owner"), Kind::bytes()),
182 (Field::from("message_type"), Kind::bytes()),
183 (Field::from("log_group"), Kind::bytes()),
184 (Field::from("log_stream"), Kind::bytes()),
185 (
186 Field::from("subscription_filters"),
187 Kind::array({
188 let mut v = Collection::any();
189 v.set_unknown(Kind::bytes());
190 v
191 }),
192 ),
193 (
194 Field::from("log_events"),
195 Kind::array(Collection::from_unknown(Kind::object(BTreeMap::from([
196 (Field::from("id"), Kind::bytes()),
197 (Field::from("timestamp"), Kind::timestamp()),
198 (Field::from("message"), Kind::bytes()),
199 ])))),
200 ),
201 ])
202}
203
#[cfg(test)]
mod tests {
    // `TimeZone` brings `Utc.timestamp_opt` into scope for building the
    // expected timestamps below.
    use chrono::{TimeZone, Utc};

    use super::*;

    // Each case gives the `value` argument (`args`), the expected result
    // (`want`) and the expected type definition (`tdef`).
    test_function![
        parse_aws_cloudwatch_log_subscription_message => ParseAwsCloudWatchLogSubscriptionMessage;

        // Valid JSON, but a number rather than the expected object.
        invalid_type {
            args: func_args![value: "42"],
            want: Err("unable to parse: invalid type: integer `42`, expected struct AwsCloudWatchLogsSubscriptionMessage at line 1 column 2"),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }

        // A DATA_MESSAGE with two events. Each epoch-millisecond timestamp
        // (e.g. 1600110569039) becomes a UTC timestamp (1600110569 s + 39 ms).
        string {
            args: func_args![value: r#"
            {
                "messageType": "DATA_MESSAGE",
                "owner": "071959437513",
                "logGroup": "/jesse/test",
                "logStream": "test",
                "subscriptionFilters": [
                    "Destination"
                ],
                "logEvents": [
                    {
                        "id": "35683658089614582423604394983260738922885519999578275840",
                        "timestamp": 1600110569039,
                        "message": "{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}"
                    },
                    {
                        "id": "35683658089659183914001456229543810359430816722590236673",
                        "timestamp": 1600110569041,
                        "message": "{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}"
                    }
                ]
            }
            "#],
            want: Ok(Value::from_iter([
                (String::from("owner"), Value::from("071959437513")),
                (String::from("message_type"), Value::from("DATA_MESSAGE")),
                (String::from("log_group"), Value::from("/jesse/test")),
                (String::from("log_stream"), Value::from("test")),
                (String::from("subscription_filters"), Value::from(vec![Value::from("Destination")])),
                (String::from("log_events"), Value::from(vec![
                    Value::from_iter([
                        (String::from("id"), Value::from( "35683658089614582423604394983260738922885519999578275840")),
                        (String::from("timestamp"), Value::from(Utc.timestamp_opt(1_600_110_569, 39_000_000).single().expect("invalid timestamp"))),
                        (String::from("message"), Value::from("{\"bytes\":26780,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"157.130.216.193\",\"method\":\"PUT\",\"protocol\":\"HTTP/1.0\",\"referer\":\"https://www.principalcross-platform.io/markets/ubiquitous\",\"request\":\"/expedite/convergence\",\"source_type\":\"stdin\",\"status\":301,\"user-identifier\":\"-\"}")),
                    ]),
                    Value::from_iter([
                        (String::from("id"), Value::from("35683658089659183914001456229543810359430816722590236673")),
                        (String::from("timestamp"), Value::from(Utc.timestamp_opt(1_600_110_569, 41_000_000).single().expect("invalid timestamp"))),
                        (String::from("message"), Value::from("{\"bytes\":17707,\"datetime\":\"14/Sep/2020:11:45:41 -0400\",\"host\":\"109.81.244.252\",\"method\":\"GET\",\"protocol\":\"HTTP/2.0\",\"referer\":\"http://www.investormission-critical.io/24/7/vortals\",\"request\":\"/scale/functionalities/optimize\",\"source_type\":\"stdin\",\"status\":502,\"user-identifier\":\"feeney1708\"}")),
                    ])
                ])),
            ])),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }

        // Not valid JSON: object keys must be quoted strings.
        invalid_value {
            args: func_args![value: "{ INVALID }"],
            want: Err("unable to parse: key must be a string at line 1 column 3"),
            tdef: TypeDef::object(inner_kind()).fallible(),
        }
    ];
}