1use std::{
2 collections::{HashMap, HashSet},
3 fmt::Debug,
4 num::{ParseFloatError, ParseIntError},
5};
6
7use bytes::Bytes;
8use chrono::{DateTime, LocalResult, ParseError as ChronoParseError, TimeZone as _, Utc};
9use ordered_float::NotNan;
10use snafu::{ResultExt, Snafu};
11
12use super::datetime::{TimeZone, datetime_to_utc};
13
14#[cfg(test)]
15mod tests;
16
17#[allow(clippy::module_name_repetitions)]
18#[derive(Debug, Snafu)]
19pub enum ConversionError {
20 #[snafu(display("Unknown conversion name {:?}", name))]
21 UnknownConversion { name: String },
22}
23
24#[derive(Clone, Debug)]
28pub enum Conversion {
29 Bytes,
30 Integer,
31 Float,
32 Boolean,
33 Timestamp(TimeZone),
34 TimestampFmt(String, TimeZone),
35 TimestampTzFmt(String),
36}
37
38#[derive(Debug, Eq, PartialEq, Snafu)]
39pub enum Error {
40 #[snafu(display("Invalid boolean value {:?}", s))]
41 BoolParse { s: String },
42 #[snafu(display("Invalid integer {:?}: {}", s, source))]
43 IntParse { s: String, source: ParseIntError },
44 #[snafu(display("NaN number not supported {:?}", s))]
45 NanFloat { s: String },
46 #[snafu(display("Invalid floating point number {:?}: {}", s, source))]
47 FloatParse { s: String, source: ParseFloatError },
48 #[snafu(
49 display("Invalid timestamp {:?}: {}", s, source),
50 visibility(pub(super))
51 )]
52 TimestampParse { s: String, source: ChronoParseError },
53 #[snafu(display("No matching timestamp format found for {:?}", s))]
54 AutoTimestampParse { s: String },
55}
56
57#[allow(clippy::implicit_hasher)]
63pub fn parse_check_conversion_map(
64 types: &HashMap<String, String>,
65 names: &[impl AsRef<str>],
66 tz: TimeZone,
67) -> Result<HashMap<String, Conversion>, ConversionError> {
68 let names = names
70 .iter()
71 .map(std::convert::AsRef::as_ref)
72 .collect::<HashSet<_>>();
73 for name in types.keys() {
74 if !names.contains(name.as_str()) {
75 tracing::warn!(
76 message = "Field was specified in the types but is not a valid field name.",
77 field = &name[..]
78 );
79 }
80 }
81
82 parse_conversion_map(types, tz)
83}
84
85#[allow(clippy::implicit_hasher)]
91pub fn parse_conversion_map(
92 types: &HashMap<String, String>,
93 tz: TimeZone,
94) -> Result<HashMap<String, Conversion>, ConversionError> {
95 types
96 .iter()
97 .map(|(field, typename)| Conversion::parse(typename, tz).map(|conv| (field.clone(), conv)))
98 .collect()
99}
100
101impl Conversion {
102 pub fn parse(s: impl AsRef<str>, tz: TimeZone) -> Result<Self, ConversionError> {
116 let s = s.as_ref();
117 let mut split = s.splitn(2, '|').map(str::trim);
118 match (split.next(), split.next()) {
119 (Some("asis" | "bytes" | "string"), None) => Ok(Self::Bytes),
120 (Some("integer" | "int"), None) => Ok(Self::Integer),
121 (Some("float"), None) => Ok(Self::Float),
122 (Some("bool" | "boolean"), None) => Ok(Self::Boolean),
123 (Some("timestamp"), None) => Ok(Self::Timestamp(tz)),
124 (Some("timestamp"), Some(fmt)) => Ok(Self::timestamp(fmt, tz)),
125 _ => Err(ConversionError::UnknownConversion { name: s.into() }),
126 }
127 }
128
129 #[must_use]
131 pub fn timestamp(fmt: &str, tz: TimeZone) -> Self {
132 if format_has_zone(fmt) {
137 Self::TimestampTzFmt(fmt.into())
138 } else {
139 Self::TimestampFmt(fmt.into(), tz)
140 }
141 }
142
143 #[allow(clippy::trait_duplication_in_bounds)] pub fn convert<T>(&self, bytes: Bytes) -> Result<T, Error>
150 where
151 T: From<Bytes> + From<i64> + From<NotNan<f64>> + From<bool> + From<DateTime<Utc>>,
152 {
153 Ok(match self {
154 Self::Bytes => bytes.into(),
155 Self::Integer => {
156 let s = String::from_utf8_lossy(&bytes);
157 s.parse::<i64>()
158 .with_context(|_| IntParseSnafu { s })?
159 .into()
160 }
161 Self::Float => {
162 let s = String::from_utf8_lossy(&bytes);
163 let parsed = s
164 .parse::<f64>()
165 .with_context(|_| FloatParseSnafu { s: s.clone() })?;
166 let f = NotNan::new(parsed).map_err(|_| Error::NanFloat { s: s.to_string() })?;
167 f.into()
168 }
169 Self::Boolean => parse_bool(&String::from_utf8_lossy(&bytes))?.into(),
170 Self::Timestamp(tz) => parse_timestamp(*tz, &String::from_utf8_lossy(&bytes))?.into(),
171 Self::TimestampFmt(format, tz) => {
172 let s = String::from_utf8_lossy(&bytes);
173 let dt = tz
174 .datetime_from_str(&s, format)
175 .context(TimestampParseSnafu { s })?;
176
177 datetime_to_utc(&dt).into()
178 }
179 Self::TimestampTzFmt(format) => {
180 let s = String::from_utf8_lossy(&bytes);
181 let dt = DateTime::parse_from_str(&s, format)
182 .with_context(|_| TimestampParseSnafu { s })?;
183
184 datetime_to_utc(&dt).into()
185 }
186 })
187 }
188}
189
190fn parse_bool(s: &str) -> Result<bool, Error> {
205 match s {
206 "true" | "t" | "yes" | "y" => Ok(true),
207 "false" | "f" | "no" | "n" | "0" => Ok(false),
208 _ => {
209 if let Ok(n) = s.parse::<isize>() {
210 Ok(n != 0)
211 } else {
212 match s.to_lowercase().as_str() {
215 "true" | "t" | "yes" | "y" => Ok(true),
216 "false" | "f" | "no" | "n" => Ok(false),
217 _ => Err(Error::BoolParse { s: s.into() }),
218 }
219 }
220 }
221 }
222}
223
/// Reports whether the strftime-style format `fmt` contains a specifier that
/// carries time zone information.
///
/// Recognised specifiers: `%Z`, `%z`, `%:z`, `%::z`, `%:::z`, `%#z` and `%+`.
///
/// The format is scanned specifier by specifier rather than searched for
/// substrings, so an escaped percent sign is honoured: `%%Z` is a literal
/// `%Z` in the text and is *not* treated as a zone specifier. The
/// multi-colon offset forms (`%::z`, `%:::z`) are detected as well; a plain
/// substring search for `%:z` misses them.
fn format_has_zone(fmt: &str) -> bool {
    let mut rest = fmt;
    while let Some(idx) = rest.find('%') {
        // `%` is a single byte, so `idx + 1` is always a char boundary.
        let spec = &rest[idx + 1..];

        // `%%` is a literal percent sign: skip both characters so the second
        // one is not mistaken for the start of a specifier.
        if let Some(after_escape) = spec.strip_prefix('%') {
            rest = after_escape;
            continue;
        }

        if spec.starts_with('Z') || spec.starts_with('+') || spec.starts_with("#z") {
            return true;
        }

        // `%z`, `%:z`, `%::z` and `%:::z`: up to three colons before the `z`.
        let after_colons = spec.trim_start_matches(':');
        let colons = spec.len() - after_colons.len();
        if colons <= 3 && after_colons.starts_with('z') {
            return true;
        }

        rest = spec;
    }
    false
}
232
/// Formats without any zone information that `parse_timestamp` tries, in
/// order, against the configured time zone. The first match wins, so the
/// order of the entries matters.
const TIMESTAMP_LOCAL_FORMATS: &[&str] = &[
    // YYYY-MM-DD HH:MM:SS
    "%F %T",
    // D-Mmm-YYYY HH:MM:SS
    "%v %T",
    // ISO 8601 / RFC 3339 layout without a zone
    "%FT%T",
    // MM/DD/YYYY:HH:MM:SS
    "%m/%d/%Y:%T",
    // RFC 2822 layout without a zone
    "%a, %d %b %Y %T",
    // abbreviated weekday and month names, year last
    "%a %d %b %T %Y",
    // full weekday and month names, year last
    "%A %d %B %T %Y",
    // ctime-like layout: weekday, month, space-padded day
    "%a %b %e %T %Y",
];
244
/// Formats that carry their own zone information. `parse_timestamp` tries
/// them, in order, after every other strategy has failed.
const TIMESTAMP_TZ_FORMATS: &[&str] = &[
    // ISO 8601 / RFC 3339 date and time
    "%+",
    // weekday, day, month, time, zone name, year
    "%a %d %b %T %Z %Y",
    // same layout with a numeric offset
    "%a %d %b %T %z %Y",
    // same layout with the permissive numeric offset form
    "%a %d %b %T %#z %Y",
    // DD/Mmm/YYYY:HH:MM:SS followed by a numeric offset
    "%d/%b/%Y:%T %z",
];
253
254fn parse_unix_timestamp(timestamp_str: &str) -> LocalResult<DateTime<Utc>> {
255 if let Ok(seconds_since_epoch) = timestamp_str.parse::<i64>() {
256 Utc.timestamp_opt(seconds_since_epoch, 0)
257 } else {
258 LocalResult::None
259 }
260}
261
262fn parse_timestamp(tz: TimeZone, s: &str) -> Result<DateTime<Utc>, Error> {
269 for format in TIMESTAMP_LOCAL_FORMATS {
270 if let Ok(result) = tz.datetime_from_str(s, format) {
271 return Ok(result);
272 }
273 }
274
275 if let LocalResult::Single(result) = parse_unix_timestamp(s) {
277 return Ok(result);
278 }
279
280 if let Ok(result) = DateTime::parse_from_rfc3339(s) {
282 return Ok(datetime_to_utc(&result));
283 }
284
285 if let Ok(result) = DateTime::parse_from_rfc2822(s) {
286 return Ok(datetime_to_utc(&result));
287 }
288
289 for format in TIMESTAMP_TZ_FORMATS {
290 if let Ok(result) = DateTime::parse_from_str(s, format) {
291 return Ok(datetime_to_utc(&result));
292 }
293 }
294
295 Err(Error::AutoTimestampParse { s: s.into() })
296}