vector/sinks/influxdb/
mod.rs

1pub mod logs;
2pub mod metrics;
3
4use std::collections::HashMap;
5
6use bytes::{BufMut, BytesMut};
7use chrono::{DateTime, Utc};
8use futures::FutureExt;
9use http::{StatusCode, Uri};
10use snafu::{ResultExt, Snafu};
11use tower::Service;
12use vector_lib::configurable::configurable_component;
13use vector_lib::event::{KeyString, MetricTags};
14use vector_lib::sensitive_string::SensitiveString;
15
16use crate::http::HttpClient;
17
18pub(in crate::sinks) enum Field {
19    /// string
20    String(String),
21    /// float
22    Float(f64),
23    /// unsigned integer
24    /// Influx can support 64 bit integers if compiled with a flag, see:
25    /// <https://github.com/influxdata/influxdb/issues/7801#issuecomment-466801839>
26    UnsignedInt(u64),
27    /// integer
28    Int(i64),
29    /// boolean
30    Bool(bool),
31}
32
33#[derive(Clone, Copy, Debug)]
34pub(in crate::sinks) enum ProtocolVersion {
35    V1,
36    V2,
37}
38
39#[derive(Debug, Snafu)]
40enum ConfigError {
41    #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))]
42    MissingConfiguration,
43    #[snafu(display(
44        "Unclear settings. Both version configured v1: {:?}, v2: {:?}.",
45        v1_settings,
46        v2_settings
47    ))]
48    BothConfiguration {
49        v1_settings: InfluxDb1Settings,
50        v2_settings: InfluxDb2Settings,
51    },
52}
53
54/// Configuration settings for InfluxDB v0.x/v1.x.
55#[configurable_component]
56#[derive(Clone, Debug)]
57pub struct InfluxDb1Settings {
58    /// The name of the database to write into.
59    ///
60    /// Only relevant when using InfluxDB v0.x/v1.x.
61    #[configurable(metadata(docs::examples = "vector-database"))]
62    #[configurable(metadata(docs::examples = "iot-store"))]
63    database: String,
64
65    /// The consistency level to use for writes.
66    ///
67    /// Only relevant when using InfluxDB v0.x/v1.x.
68    #[configurable(metadata(docs::examples = "any"))]
69    #[configurable(metadata(docs::examples = "one"))]
70    #[configurable(metadata(docs::examples = "quorum"))]
71    #[configurable(metadata(docs::examples = "all"))]
72    consistency: Option<String>,
73
74    /// The target retention policy for writes.
75    ///
76    /// Only relevant when using InfluxDB v0.x/v1.x.
77    #[configurable(metadata(docs::examples = "autogen"))]
78    #[configurable(metadata(docs::examples = "one_day_only"))]
79    retention_policy_name: Option<String>,
80
81    /// The username to authenticate with.
82    ///
83    /// Only relevant when using InfluxDB v0.x/v1.x.
84    #[configurable(metadata(docs::examples = "todd"))]
85    #[configurable(metadata(docs::examples = "vector-source"))]
86    username: Option<String>,
87
88    /// The password to authenticate with.
89    ///
90    /// Only relevant when using InfluxDB v0.x/v1.x.
91    #[configurable(metadata(docs::examples = "${INFLUXDB_PASSWORD}"))]
92    #[configurable(metadata(docs::examples = "influxdb4ever"))]
93    password: Option<SensitiveString>,
94}
95
96/// Configuration settings for InfluxDB v2.x.
97#[configurable_component]
98#[derive(Clone, Debug)]
99pub struct InfluxDb2Settings {
100    /// The name of the organization to write into.
101    ///
102    /// Only relevant when using InfluxDB v2.x and above.
103    #[configurable(metadata(docs::examples = "my-org"))]
104    #[configurable(metadata(docs::examples = "33f2cff0a28e5b63"))]
105    org: String,
106
107    /// The name of the bucket to write into.
108    ///
109    /// Only relevant when using InfluxDB v2.x and above.
110    #[configurable(metadata(docs::examples = "vector-bucket"))]
111    #[configurable(metadata(docs::examples = "4d2225e4d3d49f75"))]
112    bucket: String,
113
114    /// The [token][token_docs] to authenticate with.
115    ///
116    /// Only relevant when using InfluxDB v2.x and above.
117    ///
118    /// [token_docs]: https://v2.docs.influxdata.com/v2.0/security/tokens/
119    #[configurable(metadata(docs::examples = "${INFLUXDB_TOKEN}"))]
120    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
121    token: SensitiveString,
122}
123
124trait InfluxDbSettings: std::fmt::Debug {
125    fn write_uri(&self, endpoint: String) -> crate::Result<Uri>;
126    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri>;
127    fn token(&self) -> SensitiveString;
128    fn protocol_version(&self) -> ProtocolVersion;
129}
130
131impl InfluxDbSettings for InfluxDb1Settings {
132    fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
133        encode_uri(
134            &endpoint,
135            "write",
136            &[
137                ("consistency", self.consistency.clone()),
138                ("db", Some(self.database.clone())),
139                ("rp", self.retention_policy_name.clone()),
140                ("p", self.password.as_ref().map(|v| v.inner().to_owned())),
141                ("u", self.username.clone()),
142                ("precision", Some("ns".to_owned())),
143            ],
144        )
145    }
146
147    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
148        encode_uri(&endpoint, "ping", &[])
149    }
150
151    fn token(&self) -> SensitiveString {
152        SensitiveString::default()
153    }
154
155    fn protocol_version(&self) -> ProtocolVersion {
156        ProtocolVersion::V1
157    }
158}
159
160impl InfluxDbSettings for InfluxDb2Settings {
161    fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
162        encode_uri(
163            &endpoint,
164            "api/v2/write",
165            &[
166                ("org", Some(self.org.clone())),
167                ("bucket", Some(self.bucket.clone())),
168                ("precision", Some("ns".to_owned())),
169            ],
170        )
171    }
172
173    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
174        encode_uri(&endpoint, "ping", &[])
175    }
176
177    fn token(&self) -> SensitiveString {
178        self.token.clone()
179    }
180
181    fn protocol_version(&self) -> ProtocolVersion {
182        ProtocolVersion::V2
183    }
184}
185
186fn influxdb_settings(
187    influxdb1_settings: Option<InfluxDb1Settings>,
188    influxdb2_settings: Option<InfluxDb2Settings>,
189) -> Result<Box<dyn InfluxDbSettings>, crate::Error> {
190    match (influxdb1_settings, influxdb2_settings) {
191        (Some(v1_settings), Some(v2_settings)) => Err(ConfigError::BothConfiguration {
192            v1_settings,
193            v2_settings,
194        }
195        .into()),
196        (None, None) => Err(ConfigError::MissingConfiguration.into()),
197        (Some(settings), _) => Ok(Box::new(settings)),
198        (_, Some(settings)) => Ok(Box::new(settings)),
199    }
200}
201
202// V1: https://docs.influxdata.com/influxdb/v1.7/tools/api/#ping-http-endpoint
203// V2: https://v2.docs.influxdata.com/v2.0/api/#operation/GetHealth
204fn healthcheck(
205    endpoint: String,
206    influxdb1_settings: Option<InfluxDb1Settings>,
207    influxdb2_settings: Option<InfluxDb2Settings>,
208    mut client: HttpClient,
209) -> crate::Result<super::Healthcheck> {
210    let settings = influxdb_settings(influxdb1_settings, influxdb2_settings)?;
211
212    let uri = settings.healthcheck_uri(endpoint)?;
213
214    let request = hyper::Request::get(uri).body(hyper::Body::empty()).unwrap();
215
216    Ok(async move {
217        client
218            .call(request)
219            .await
220            .map_err(|error| error.into())
221            .and_then(|response| match response.status() {
222                StatusCode::OK => Ok(()),
223                StatusCode::NO_CONTENT => Ok(()),
224                other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()),
225            })
226    }
227    .boxed())
228}
229
230// https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
231pub(in crate::sinks) fn influx_line_protocol(
232    protocol_version: ProtocolVersion,
233    measurement: &str,
234    tags: Option<MetricTags>,
235    fields: Option<HashMap<KeyString, Field>>,
236    timestamp: i64,
237    line_protocol: &mut BytesMut,
238) -> Result<(), &'static str> {
239    // Fields
240    let unwrapped_fields = fields.unwrap_or_default();
241    // LineProtocol should have a field
242    if unwrapped_fields.is_empty() {
243        return Err("fields must not be empty");
244    }
245
246    encode_string(measurement, line_protocol);
247
248    // Tags are optional
249    let unwrapped_tags = tags.unwrap_or_default();
250    if !unwrapped_tags.is_empty() {
251        line_protocol.put_u8(b',');
252        encode_tags(unwrapped_tags, line_protocol);
253    }
254    line_protocol.put_u8(b' ');
255
256    // Fields
257    encode_fields(protocol_version, unwrapped_fields, line_protocol);
258    line_protocol.put_u8(b' ');
259
260    // Timestamp
261    line_protocol.put_slice(&timestamp.to_string().into_bytes());
262    line_protocol.put_u8(b'\n');
263    Ok(())
264}
265
266fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
267    let original_len = output.len();
268    // `tags` is already sorted
269    for (key, value) in tags.iter_single() {
270        if key.is_empty() || value.is_empty() {
271            continue;
272        }
273        encode_string(key, output);
274        output.put_u8(b'=');
275        encode_string(value, output);
276        output.put_u8(b',');
277    }
278
279    // remove last ','
280    if output.len() > original_len {
281        output.truncate(output.len() - 1);
282    }
283}
284
285fn encode_fields(
286    protocol_version: ProtocolVersion,
287    fields: HashMap<KeyString, Field>,
288    output: &mut BytesMut,
289) {
290    let original_len = output.len();
291    for (key, value) in fields.into_iter() {
292        encode_string(&key, output);
293        output.put_u8(b'=');
294        match value {
295            Field::String(s) => {
296                output.put_u8(b'"');
297                for c in s.chars() {
298                    if "\\\"".contains(c) {
299                        output.put_u8(b'\\');
300                    }
301                    let mut c_buffer: [u8; 4] = [0; 4];
302                    output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
303                }
304                output.put_u8(b'"');
305            }
306            Field::Float(f) => output.put_slice(&f.to_string().into_bytes()),
307            Field::UnsignedInt(i) => {
308                output.put_slice(&i.to_string().into_bytes());
309                let c = match protocol_version {
310                    ProtocolVersion::V1 => 'i',
311                    ProtocolVersion::V2 => 'u',
312                };
313                let mut c_buffer: [u8; 4] = [0; 4];
314                output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
315            }
316            Field::Int(i) => {
317                output.put_slice(&i.to_string().into_bytes());
318                output.put_u8(b'i');
319            }
320            Field::Bool(b) => {
321                output.put_slice(&b.to_string().into_bytes());
322            }
323        };
324        output.put_u8(b',');
325    }
326
327    // remove last ','
328    if output.len() > original_len {
329        output.truncate(output.len() - 1);
330    }
331}
332
333fn encode_string(key: &str, output: &mut BytesMut) {
334    for c in key.chars() {
335        if "\\, =".contains(c) {
336            output.put_u8(b'\\');
337        }
338        let mut c_buffer: [u8; 4] = [0; 4];
339        output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
340    }
341}
342
343pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
344    if let Some(ts) = timestamp {
345        ts.timestamp_nanos_opt().unwrap()
346    } else {
347        encode_timestamp(Some(Utc::now()))
348    }
349}
350
351pub(in crate::sinks) fn encode_uri(
352    endpoint: &str,
353    path: &str,
354    pairs: &[(&str, Option<String>)],
355) -> crate::Result<Uri> {
356    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
357
358    for pair in pairs {
359        if let Some(v) = &pair.1 {
360            serializer.append_pair(pair.0, v);
361        }
362    }
363
364    let mut url = if endpoint.ends_with('/') {
365        format!("{}{}?{}", endpoint, path, serializer.finish())
366    } else {
367        format!("{}/{}?{}", endpoint, path, serializer.finish())
368    };
369
370    if url.ends_with('?') {
371        url.pop();
372    }
373
374    Ok(url.parse::<Uri>().context(super::UriParseSnafu)?)
375}
376
377#[cfg(test)]
378#[allow(dead_code)]
379pub mod test_util {
380    use std::{fs::File, io::Read};
381
382    use chrono::{offset::TimeZone, DateTime, SecondsFormat, Timelike, Utc};
383    use vector_lib::metric_tags;
384
385    use super::*;
386    use crate::tls;
387
388    pub(crate) const ORG: &str = "my-org";
389    pub(crate) const BUCKET: &str = "my-bucket";
390    pub(crate) const TOKEN: &str = "my-token";
391
392    pub(crate) fn next_database() -> String {
393        format!("testdb{}", Utc::now().timestamp_nanos_opt().unwrap())
394    }
395
396    pub(crate) fn ts() -> DateTime<Utc> {
397        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
398            .single()
399            .and_then(|t| t.with_nanosecond(11))
400            .expect("invalid timestamp")
401    }
402
403    pub(crate) fn tags() -> MetricTags {
404        metric_tags!(
405            "normal_tag" => "value",
406            "true_tag" => "true",
407            "empty_tag" => "",
408        )
409    }
410
411    pub(crate) fn assert_fields(value: String, fields: Vec<&str>) {
412        let encoded_fields: Vec<&str> = value.split(',').collect();
413
414        assert_eq!(fields.len(), encoded_fields.len());
415
416        for field in fields.into_iter() {
417            assert!(
418                encoded_fields.contains(&field),
419                "Fields: {value} has to have: {field}"
420            )
421        }
422    }
423
424    pub(crate) fn address_v1(secure: bool) -> String {
425        if secure {
426            std::env::var("INFLUXDB_V1_HTTPS_ADDRESS")
427                .unwrap_or_else(|_| "http://localhost:8087".into())
428        } else {
429            std::env::var("INFLUXDB_V1_HTTP_ADDRESS")
430                .unwrap_or_else(|_| "http://localhost:8086".into())
431        }
432    }
433
434    pub(crate) fn address_v2() -> String {
435        std::env::var("INFLUXDB_V2_ADDRESS").unwrap_or_else(|_| "http://localhost:9999".into())
436    }
437
438    // ns.requests,metric_type=distribution,normal_tag=value,true_tag=true avg=1.875,count=8,max=3,median=2,min=1,quantile_0.95=3,sum=15 1542182950000000011
439    //
440    // =>
441    //
442    // ns.requests
443    // metric_type=distribution,normal_tag=value,true_tag=true
444    // avg=1.875,count=8,max=3,median=2,min=1,quantile_0.95=3,sum=15
445    // 1542182950000000011
446    //
447    pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
448        let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
449        // tags and timestamp may not be present
450        let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
451        let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));
452
453        (measurement, tags, fields.to_string(), ts)
454    }
455
456    fn client() -> reqwest::Client {
457        let mut test_ca = Vec::<u8>::new();
458        File::open(tls::TEST_PEM_CA_PATH)
459            .unwrap()
460            .read_to_end(&mut test_ca)
461            .unwrap();
462        let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();
463
464        reqwest::Client::builder()
465            .add_root_certificate(test_ca)
466            .build()
467            .unwrap()
468    }
469
470    pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
471        client()
472            .get(format!("{endpoint}/query"))
473            .query(&[("q", query)])
474            .send()
475            .await
476            .unwrap()
477    }
478
479    pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
480        let database = next_database();
481        let status = query_v1(endpoint, &format!("create database {database}"))
482            .await
483            .status();
484        assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
485        // Some times InfluxDB will return OK before it can actually
486        // accept writes to the database, leading to test failures. Test
487        // this with empty writes and loop if it reports the database
488        // does not exist yet.
489        crate::test_util::wait_for(|| {
490            let write_url = format!("{}/write?db={}", endpoint, &database);
491            async move {
492                match client()
493                    .post(&write_url)
494                    .header("Content-Type", "text/plain")
495                    .header("Authorization", &format!("Token {TOKEN}"))
496                    .body("")
497                    .send()
498                    .await
499                    .unwrap()
500                    .status()
501                {
502                    http::StatusCode::NO_CONTENT => true,
503                    http::StatusCode::NOT_FOUND => false,
504                    status => panic!("Unexpected status: {status}"),
505                }
506            }
507        })
508        .await;
509        database
510    }
511
512    pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
513        let status = query_v1(endpoint, &format!("drop database {database}"))
514            .await
515            .status();
516        assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
517    }
518
519    pub(crate) async fn onboarding_v2(endpoint: &str) {
520        let mut body = std::collections::HashMap::new();
521        body.insert("username", "my-user");
522        body.insert("password", "my-password");
523        body.insert("org", ORG);
524        body.insert("bucket", BUCKET);
525        body.insert("token", TOKEN);
526
527        let client = reqwest::Client::builder()
528            .danger_accept_invalid_certs(true)
529            .build()
530            .unwrap();
531
532        let res = client
533            .post(format!("{endpoint}/api/v2/setup"))
534            .json(&body)
535            .header("accept", "application/json")
536            .send()
537            .await
538            .unwrap();
539
540        let status = res.status();
541
542        assert!(
543            status == StatusCode::CREATED || status == StatusCode::UNPROCESSABLE_ENTITY,
544            "UnexpectedStatus: {status}"
545        );
546    }
547
548    pub(crate) fn format_timestamp(timestamp: DateTime<Utc>, format: SecondsFormat) -> String {
549        strip_timestamp(timestamp.to_rfc3339_opts(format, true))
550    }
551
552    // InfluxDB strips off trailing zeros in timestamps in metrics
553    fn strip_timestamp(timestamp: String) -> String {
554        let strip_one = || format!("{}Z", &timestamp[..timestamp.len() - 2]);
555        match timestamp {
556            _ if timestamp.ends_with("0Z") => strip_timestamp(strip_one()),
557            _ if timestamp.ends_with(".Z") => strip_one(),
558            _ => timestamp,
559        }
560    }
561}
562
563#[cfg(test)]
564mod tests {
565    use serde::{Deserialize, Serialize};
566
567    use super::*;
568    use crate::sinks::influxdb::test_util::{assert_fields, tags, ts};
569
570    #[derive(Deserialize, Serialize, Debug, Clone, Default)]
571    #[serde(deny_unknown_fields)]
572    pub struct InfluxDbTestConfig {
573        #[serde(flatten)]
574        pub influxdb1_settings: Option<InfluxDb1Settings>,
575        #[serde(flatten)]
576        pub influxdb2_settings: Option<InfluxDb2Settings>,
577    }
578
579    #[test]
580    fn test_influxdb_settings_both() {
581        let config = r#"
582        bucket = "my-bucket"
583        org = "my-org"
584        token = "my-token"
585        database = "my-database"
586    "#;
587        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
588        let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
589        assert_eq!(
590            settings.expect_err("expected error").to_string(),
591            "Unclear settings. Both version configured v1: InfluxDb1Settings { database: \"my-database\", consistency: None, retention_policy_name: None, username: None, password: None }, v2: InfluxDb2Settings { org: \"my-org\", bucket: \"my-bucket\", token: \"**REDACTED**\" }.".to_owned()
592        );
593    }
594
595    #[test]
596    fn test_influxdb_settings_missing() {
597        let config = r"
598    ";
599        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
600        let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
601        assert_eq!(
602            settings.expect_err("expected error").to_string(),
603            "InfluxDB v1 or v2 should be configured as endpoint.".to_owned()
604        );
605    }
606
607    #[test]
608    fn test_influxdb1_settings() {
609        let config = r#"
610        database = "my-database"
611    "#;
612        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
613        _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
614    }
615
616    #[test]
617    fn test_influxdb2_settings() {
618        let config = r#"
619        bucket = "my-bucket"
620        org = "my-org"
621        token = "my-token"
622    "#;
623        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
624        _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
625    }
626
627    #[test]
628    fn test_influxdb1_test_write_uri() {
629        let settings = InfluxDb1Settings {
630            consistency: Some("quorum".to_owned()),
631            database: "vector_db".to_owned(),
632            retention_policy_name: Some("autogen".to_owned()),
633            username: Some("writer".to_owned()),
634            password: Some("secret".to_owned().into()),
635        };
636
637        let uri = settings
638            .write_uri("http://localhost:8086".to_owned())
639            .unwrap();
640        assert_eq!("http://localhost:8086/write?consistency=quorum&db=vector_db&rp=autogen&p=secret&u=writer&precision=ns", uri.to_string())
641    }
642
643    #[test]
644    fn test_influxdb2_test_write_uri() {
645        let settings = InfluxDb2Settings {
646            org: "my-org".to_owned(),
647            bucket: "my-bucket".to_owned(),
648            token: "my-token".to_owned().into(),
649        };
650
651        let uri = settings
652            .write_uri("http://localhost:9999".to_owned())
653            .unwrap();
654        assert_eq!(
655            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns",
656            uri.to_string()
657        )
658    }
659
660    #[test]
661    fn test_influxdb1_test_healthcheck_uri() {
662        let settings = InfluxDb1Settings {
663            consistency: Some("quorum".to_owned()),
664            database: "vector_db".to_owned(),
665            retention_policy_name: Some("autogen".to_owned()),
666            username: Some("writer".to_owned()),
667            password: Some("secret".to_owned().into()),
668        };
669
670        let uri = settings
671            .healthcheck_uri("http://localhost:8086".to_owned())
672            .unwrap();
673        assert_eq!("http://localhost:8086/ping", uri.to_string())
674    }
675
676    #[test]
677    fn test_influxdb2_test_healthcheck_uri() {
678        let settings = InfluxDb2Settings {
679            org: "my-org".to_owned(),
680            bucket: "my-bucket".to_owned(),
681            token: "my-token".to_owned().into(),
682        };
683
684        let uri = settings
685            .healthcheck_uri("http://localhost:9999".to_owned())
686            .unwrap();
687        assert_eq!("http://localhost:9999/ping", uri.to_string())
688    }
689
690    #[test]
691    fn test_encode_tags() {
692        let mut value = BytesMut::new();
693        encode_tags(tags(), &mut value);
694
695        assert_eq!(value, "normal_tag=value,true_tag=true");
696
697        let tags_to_escape = vec![
698            ("tag".to_owned(), "val=ue".to_owned()),
699            ("name escape".to_owned(), "true".to_owned()),
700            ("value_escape".to_owned(), "value escape".to_owned()),
701            ("a_first_place".to_owned(), "10".to_owned()),
702        ]
703        .into_iter()
704        .collect();
705
706        let mut value = BytesMut::new();
707        encode_tags(tags_to_escape, &mut value);
708        assert_eq!(
709            value,
710            "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape"
711        );
712    }
713
714    #[test]
715    fn tags_order() {
716        let mut value = BytesMut::new();
717        encode_tags(
718            vec![
719                ("a", "value"),
720                ("b", "value"),
721                ("c", "value"),
722                ("d", "value"),
723                ("e", "value"),
724            ]
725            .into_iter()
726            .map(|(k, v)| (k.to_owned(), v.to_owned()))
727            .collect(),
728            &mut value,
729        );
730        assert_eq!(value, "a=value,b=value,c=value,d=value,e=value");
731    }
732
733    #[test]
734    fn test_encode_fields_v1() {
735        let fields = vec![
736            ("field_string".into(), Field::String("string value".into())),
737            (
738                "field_string_escape".into(),
739                Field::String("string\\val\"ue".into()),
740            ),
741            ("field_float".into(), Field::Float(123.45)),
742            ("field_unsigned_int".into(), Field::UnsignedInt(657)),
743            ("field_int".into(), Field::Int(657646)),
744            ("field_bool_true".into(), Field::Bool(true)),
745            ("field_bool_false".into(), Field::Bool(false)),
746            ("escape key".into(), Field::Float(10.0)),
747        ]
748        .into_iter()
749        .collect();
750
751        let mut value = BytesMut::new();
752        encode_fields(ProtocolVersion::V1, fields, &mut value);
753        let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
754        assert_fields(
755            value,
756            [
757                "escape\\ key=10",
758                "field_float=123.45",
759                "field_string=\"string value\"",
760                "field_string_escape=\"string\\\\val\\\"ue\"",
761                "field_unsigned_int=657i",
762                "field_int=657646i",
763                "field_bool_true=true",
764                "field_bool_false=false",
765            ]
766            .to_vec(),
767        )
768    }
769
770    #[test]
771    fn test_encode_fields() {
772        let fields = vec![
773            ("field_string".into(), Field::String("string value".into())),
774            (
775                "field_string_escape".into(),
776                Field::String("string\\val\"ue".into()),
777            ),
778            ("field_float".into(), Field::Float(123.45)),
779            ("field_unsigned_int".into(), Field::UnsignedInt(657)),
780            ("field_int".into(), Field::Int(657646)),
781            ("field_bool_true".into(), Field::Bool(true)),
782            ("field_bool_false".into(), Field::Bool(false)),
783            ("escape key".into(), Field::Float(10.0)),
784        ]
785        .into_iter()
786        .collect();
787
788        let mut value = BytesMut::new();
789        encode_fields(ProtocolVersion::V2, fields, &mut value);
790        let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
791        assert_fields(
792            value,
793            [
794                "escape\\ key=10",
795                "field_float=123.45",
796                "field_string=\"string value\"",
797                "field_string_escape=\"string\\\\val\\\"ue\"",
798                "field_unsigned_int=657u",
799                "field_int=657646i",
800                "field_bool_true=true",
801                "field_bool_false=false",
802            ]
803            .to_vec(),
804        )
805    }
806
807    #[test]
808    fn test_encode_string() {
809        let mut value = BytesMut::new();
810        encode_string("measurement_name", &mut value);
811        assert_eq!(value, "measurement_name");
812
813        let mut value = BytesMut::new();
814        encode_string("measurement name", &mut value);
815        assert_eq!(value, "measurement\\ name");
816
817        let mut value = BytesMut::new();
818        encode_string("measurement=name", &mut value);
819        assert_eq!(value, "measurement\\=name");
820
821        let mut value = BytesMut::new();
822        encode_string("measurement,name", &mut value);
823        assert_eq!(value, "measurement\\,name");
824    }
825
826    #[test]
827    fn test_encode_timestamp() {
828        let start = Utc::now()
829            .timestamp_nanos_opt()
830            .expect("Timestamp out of range");
831        assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011);
832        assert!(encode_timestamp(None) >= start)
833    }
834
835    #[test]
836    fn test_encode_uri_valid() {
837        let uri = encode_uri(
838            "http://localhost:9999",
839            "api/v2/write",
840            &[
841                ("org", Some("my-org".to_owned())),
842                ("bucket", Some("my-bucket".to_owned())),
843                ("precision", Some("ns".to_owned())),
844            ],
845        )
846        .unwrap();
847        assert_eq!(
848            uri,
849            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns"
850        );
851
852        let uri = encode_uri(
853            "http://localhost:9999/",
854            "api/v2/write",
855            &[
856                ("org", Some("my-org".to_owned())),
857                ("bucket", Some("my-bucket".to_owned())),
858            ],
859        )
860        .unwrap();
861        assert_eq!(
862            uri,
863            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket"
864        );
865
866        let uri = encode_uri(
867            "http://localhost:9999",
868            "api/v2/write",
869            &[
870                ("org", Some("Organization name".to_owned())),
871                ("bucket", Some("Bucket=name".to_owned())),
872                ("none", None),
873            ],
874        )
875        .unwrap();
876        assert_eq!(
877            uri,
878            "http://localhost:9999/api/v2/write?org=Organization+name&bucket=Bucket%3Dname"
879        );
880    }
881
882    #[test]
883    fn test_encode_uri_invalid() {
884        encode_uri(
885            "localhost:9999",
886            "api/v2/write",
887            &[
888                ("org", Some("my-org".to_owned())),
889                ("bucket", Some("my-bucket".to_owned())),
890            ],
891        )
892        .unwrap_err();
893    }
894}
895
896#[cfg(feature = "influxdb-integration-tests")]
897#[cfg(test)]
898mod integration_tests {
899    use crate::{
900        config::ProxyConfig,
901        http::HttpClient,
902        sinks::influxdb::{
903            healthcheck,
904            test_util::{address_v1, address_v2, next_database, onboarding_v2, BUCKET, ORG, TOKEN},
905            InfluxDb1Settings, InfluxDb2Settings,
906        },
907    };
908
909    #[tokio::test]
910    async fn influxdb2_healthchecks_ok() {
911        let endpoint = address_v2();
912        onboarding_v2(&endpoint).await;
913
914        let endpoint = address_v2();
915        let influxdb1_settings = None;
916        let influxdb2_settings = Some(InfluxDb2Settings {
917            org: ORG.to_string(),
918            bucket: BUCKET.to_string(),
919            token: TOKEN.to_string().into(),
920        });
921        let proxy = ProxyConfig::default();
922        let client = HttpClient::new(None, &proxy).unwrap();
923
924        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
925            .unwrap()
926            .await
927            .unwrap()
928    }
929
930    #[tokio::test]
931    #[should_panic]
932    async fn influxdb2_healthchecks_fail() {
933        let endpoint = "http://127.0.0.1:9999".to_string();
934        onboarding_v2(&endpoint).await;
935
936        let influxdb1_settings = None;
937        let influxdb2_settings = Some(InfluxDb2Settings {
938            org: ORG.to_string(),
939            bucket: BUCKET.to_string(),
940            token: TOKEN.to_string().into(),
941        });
942        let proxy = ProxyConfig::default();
943        let client = HttpClient::new(None, &proxy).unwrap();
944
945        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
946            .unwrap()
947            .await
948            .unwrap();
949    }
950
951    #[tokio::test]
952    async fn influxdb1_healthchecks_ok() {
953        let endpoint = address_v1(false);
954
955        let influxdb1_settings = Some(InfluxDb1Settings {
956            database: next_database(),
957            consistency: None,
958            retention_policy_name: None,
959            username: None,
960            password: None,
961        });
962        let influxdb2_settings = None;
963        let proxy = ProxyConfig::default();
964        let client = HttpClient::new(None, &proxy).unwrap();
965
966        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
967            .unwrap()
968            .await
969            .unwrap();
970    }
971
972    #[tokio::test]
973    #[should_panic]
974    async fn influxdb1_healthchecks_fail() {
975        let endpoint = "http://127.0.0.1:8086".to_string();
976        let influxdb1_settings = Some(InfluxDb1Settings {
977            database: next_database(),
978            consistency: None,
979            retention_policy_name: None,
980            username: None,
981            password: None,
982        });
983        let influxdb2_settings = None;
984        let proxy = ProxyConfig::default();
985        let client = HttpClient::new(None, &proxy).unwrap();
986
987        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
988            .unwrap()
989            .await
990            .unwrap();
991    }
992}