vector/sinks/influxdb/
mod.rs

1pub mod logs;
2pub mod metrics;
3
4use std::collections::HashMap;
5
6use bytes::{BufMut, BytesMut};
7use chrono::{DateTime, Utc};
8use futures::FutureExt;
9use http::{StatusCode, Uri};
10use snafu::{ResultExt, Snafu};
11use tower::Service;
12use vector_lib::{
13    configurable::configurable_component,
14    event::{KeyString, MetricTags},
15    sensitive_string::SensitiveString,
16};
17
18use crate::http::HttpClient;
19
20pub(in crate::sinks) enum Field {
21    /// string
22    String(String),
23    /// float
24    Float(f64),
25    /// unsigned integer
26    /// Influx can support 64 bit integers if compiled with a flag, see:
27    /// <https://github.com/influxdata/influxdb/issues/7801#issuecomment-466801839>
28    UnsignedInt(u64),
29    /// integer
30    Int(i64),
31    /// boolean
32    Bool(bool),
33}
34
35#[derive(Clone, Copy, Debug)]
36pub(in crate::sinks) enum ProtocolVersion {
37    V1,
38    V2,
39}
40
41#[derive(Debug, Snafu)]
42enum ConfigError {
43    #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))]
44    MissingConfiguration,
45    #[snafu(display(
46        "Unclear settings. Both version configured v1: {:?}, v2: {:?}.",
47        v1_settings,
48        v2_settings
49    ))]
50    BothConfiguration {
51        v1_settings: InfluxDb1Settings,
52        v2_settings: InfluxDb2Settings,
53    },
54}
55
56/// Configuration settings for InfluxDB v0.x/v1.x.
57#[configurable_component]
58#[derive(Clone, Debug)]
59pub struct InfluxDb1Settings {
60    /// The name of the database to write into.
61    ///
62    /// Only relevant when using InfluxDB v0.x/v1.x.
63    #[configurable(metadata(docs::examples = "vector-database"))]
64    #[configurable(metadata(docs::examples = "iot-store"))]
65    database: String,
66
67    /// The consistency level to use for writes.
68    ///
69    /// Only relevant when using InfluxDB v0.x/v1.x.
70    #[configurable(metadata(docs::examples = "any"))]
71    #[configurable(metadata(docs::examples = "one"))]
72    #[configurable(metadata(docs::examples = "quorum"))]
73    #[configurable(metadata(docs::examples = "all"))]
74    consistency: Option<String>,
75
76    /// The target retention policy for writes.
77    ///
78    /// Only relevant when using InfluxDB v0.x/v1.x.
79    #[configurable(metadata(docs::examples = "autogen"))]
80    #[configurable(metadata(docs::examples = "one_day_only"))]
81    retention_policy_name: Option<String>,
82
83    /// The username to authenticate with.
84    ///
85    /// Only relevant when using InfluxDB v0.x/v1.x.
86    #[configurable(metadata(docs::examples = "todd"))]
87    #[configurable(metadata(docs::examples = "vector-source"))]
88    username: Option<String>,
89
90    /// The password to authenticate with.
91    ///
92    /// Only relevant when using InfluxDB v0.x/v1.x.
93    #[configurable(metadata(docs::examples = "${INFLUXDB_PASSWORD}"))]
94    #[configurable(metadata(docs::examples = "influxdb4ever"))]
95    password: Option<SensitiveString>,
96}
97
98/// Configuration settings for InfluxDB v2.x.
99#[configurable_component]
100#[derive(Clone, Debug)]
101pub struct InfluxDb2Settings {
102    /// The name of the organization to write into.
103    ///
104    /// Only relevant when using InfluxDB v2.x and above.
105    #[configurable(metadata(docs::examples = "my-org"))]
106    #[configurable(metadata(docs::examples = "33f2cff0a28e5b63"))]
107    org: String,
108
109    /// The name of the bucket to write into.
110    ///
111    /// Only relevant when using InfluxDB v2.x and above.
112    #[configurable(metadata(docs::examples = "vector-bucket"))]
113    #[configurable(metadata(docs::examples = "4d2225e4d3d49f75"))]
114    bucket: String,
115
116    /// The [token][token_docs] to authenticate with.
117    ///
118    /// Only relevant when using InfluxDB v2.x and above.
119    ///
120    /// [token_docs]: https://v2.docs.influxdata.com/v2.0/security/tokens/
121    #[configurable(metadata(docs::examples = "${INFLUXDB_TOKEN}"))]
122    #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
123    token: SensitiveString,
124}
125
126trait InfluxDbSettings: std::fmt::Debug {
127    fn write_uri(&self, endpoint: String) -> crate::Result<Uri>;
128    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri>;
129    fn token(&self) -> SensitiveString;
130    fn protocol_version(&self) -> ProtocolVersion;
131}
132
133impl InfluxDbSettings for InfluxDb1Settings {
134    fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
135        encode_uri(
136            &endpoint,
137            "write",
138            &[
139                ("consistency", self.consistency.clone()),
140                ("db", Some(self.database.clone())),
141                ("rp", self.retention_policy_name.clone()),
142                ("p", self.password.as_ref().map(|v| v.inner().to_owned())),
143                ("u", self.username.clone()),
144                ("precision", Some("ns".to_owned())),
145            ],
146        )
147    }
148
149    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
150        encode_uri(&endpoint, "ping", &[])
151    }
152
153    fn token(&self) -> SensitiveString {
154        SensitiveString::default()
155    }
156
157    fn protocol_version(&self) -> ProtocolVersion {
158        ProtocolVersion::V1
159    }
160}
161
162impl InfluxDbSettings for InfluxDb2Settings {
163    fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
164        encode_uri(
165            &endpoint,
166            "api/v2/write",
167            &[
168                ("org", Some(self.org.clone())),
169                ("bucket", Some(self.bucket.clone())),
170                ("precision", Some("ns".to_owned())),
171            ],
172        )
173    }
174
175    fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
176        encode_uri(&endpoint, "ping", &[])
177    }
178
179    fn token(&self) -> SensitiveString {
180        self.token.clone()
181    }
182
183    fn protocol_version(&self) -> ProtocolVersion {
184        ProtocolVersion::V2
185    }
186}
187
188fn influxdb_settings(
189    influxdb1_settings: Option<InfluxDb1Settings>,
190    influxdb2_settings: Option<InfluxDb2Settings>,
191) -> Result<Box<dyn InfluxDbSettings>, crate::Error> {
192    match (influxdb1_settings, influxdb2_settings) {
193        (Some(v1_settings), Some(v2_settings)) => Err(ConfigError::BothConfiguration {
194            v1_settings,
195            v2_settings,
196        }
197        .into()),
198        (None, None) => Err(ConfigError::MissingConfiguration.into()),
199        (Some(settings), _) => Ok(Box::new(settings)),
200        (_, Some(settings)) => Ok(Box::new(settings)),
201    }
202}
203
204// V1: https://docs.influxdata.com/influxdb/v1.7/tools/api/#ping-http-endpoint
205// V2: https://v2.docs.influxdata.com/v2.0/api/#operation/GetHealth
206fn healthcheck(
207    endpoint: String,
208    influxdb1_settings: Option<InfluxDb1Settings>,
209    influxdb2_settings: Option<InfluxDb2Settings>,
210    mut client: HttpClient,
211) -> crate::Result<super::Healthcheck> {
212    let settings = influxdb_settings(influxdb1_settings, influxdb2_settings)?;
213
214    let uri = settings.healthcheck_uri(endpoint)?;
215
216    let request = hyper::Request::get(uri).body(hyper::Body::empty()).unwrap();
217
218    Ok(async move {
219        client
220            .call(request)
221            .await
222            .map_err(|error| error.into())
223            .and_then(|response| match response.status() {
224                StatusCode::OK => Ok(()),
225                StatusCode::NO_CONTENT => Ok(()),
226                other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()),
227            })
228    }
229    .boxed())
230}
231
232// https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/
233pub(in crate::sinks) fn influx_line_protocol(
234    protocol_version: ProtocolVersion,
235    measurement: &str,
236    tags: Option<MetricTags>,
237    fields: Option<HashMap<KeyString, Field>>,
238    timestamp: i64,
239    line_protocol: &mut BytesMut,
240) -> Result<(), &'static str> {
241    // Fields
242    let unwrapped_fields = fields.unwrap_or_default();
243    // LineProtocol should have a field
244    if unwrapped_fields.is_empty() {
245        return Err("fields must not be empty");
246    }
247
248    encode_string(measurement, line_protocol);
249
250    // Tags are optional
251    let unwrapped_tags = tags.unwrap_or_default();
252    if !unwrapped_tags.is_empty() {
253        line_protocol.put_u8(b',');
254        encode_tags(unwrapped_tags, line_protocol);
255    }
256    line_protocol.put_u8(b' ');
257
258    // Fields
259    encode_fields(protocol_version, unwrapped_fields, line_protocol);
260    line_protocol.put_u8(b' ');
261
262    // Timestamp
263    line_protocol.put_slice(&timestamp.to_string().into_bytes());
264    line_protocol.put_u8(b'\n');
265    Ok(())
266}
267
268fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
269    let original_len = output.len();
270    // `tags` is already sorted
271    for (key, value) in tags.iter_single() {
272        if key.is_empty() || value.is_empty() {
273            continue;
274        }
275        encode_string(key, output);
276        output.put_u8(b'=');
277        encode_string(value, output);
278        output.put_u8(b',');
279    }
280
281    // remove last ','
282    if output.len() > original_len {
283        output.truncate(output.len() - 1);
284    }
285}
286
287fn encode_fields(
288    protocol_version: ProtocolVersion,
289    fields: HashMap<KeyString, Field>,
290    output: &mut BytesMut,
291) {
292    let original_len = output.len();
293    for (key, value) in fields.into_iter() {
294        encode_string(&key, output);
295        output.put_u8(b'=');
296        match value {
297            Field::String(s) => {
298                output.put_u8(b'"');
299                for c in s.chars() {
300                    if "\\\"".contains(c) {
301                        output.put_u8(b'\\');
302                    }
303                    let mut c_buffer: [u8; 4] = [0; 4];
304                    output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
305                }
306                output.put_u8(b'"');
307            }
308            Field::Float(f) => output.put_slice(&f.to_string().into_bytes()),
309            Field::UnsignedInt(i) => {
310                output.put_slice(&i.to_string().into_bytes());
311                let c = match protocol_version {
312                    ProtocolVersion::V1 => 'i',
313                    ProtocolVersion::V2 => 'u',
314                };
315                let mut c_buffer: [u8; 4] = [0; 4];
316                output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
317            }
318            Field::Int(i) => {
319                output.put_slice(&i.to_string().into_bytes());
320                output.put_u8(b'i');
321            }
322            Field::Bool(b) => {
323                output.put_slice(&b.to_string().into_bytes());
324            }
325        };
326        output.put_u8(b',');
327    }
328
329    // remove last ','
330    if output.len() > original_len {
331        output.truncate(output.len() - 1);
332    }
333}
334
335fn encode_string(key: &str, output: &mut BytesMut) {
336    for c in key.chars() {
337        if "\\, =".contains(c) {
338            output.put_u8(b'\\');
339        }
340        let mut c_buffer: [u8; 4] = [0; 4];
341        output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
342    }
343}
344
345pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
346    if let Some(ts) = timestamp {
347        ts.timestamp_nanos_opt().unwrap()
348    } else {
349        encode_timestamp(Some(Utc::now()))
350    }
351}
352
353pub(in crate::sinks) fn encode_uri(
354    endpoint: &str,
355    path: &str,
356    pairs: &[(&str, Option<String>)],
357) -> crate::Result<Uri> {
358    let mut serializer = url::form_urlencoded::Serializer::new(String::new());
359
360    for pair in pairs {
361        if let Some(v) = &pair.1 {
362            serializer.append_pair(pair.0, v);
363        }
364    }
365
366    let mut url = if endpoint.ends_with('/') {
367        format!("{}{}?{}", endpoint, path, serializer.finish())
368    } else {
369        format!("{}/{}?{}", endpoint, path, serializer.finish())
370    };
371
372    if url.ends_with('?') {
373        url.pop();
374    }
375
376    Ok(url.parse::<Uri>().context(super::UriParseSnafu)?)
377}
378
379#[cfg(test)]
380#[allow(dead_code)]
381pub mod test_util {
382    use std::{fs::File, io::Read};
383
384    use chrono::{DateTime, SecondsFormat, Timelike, Utc, offset::TimeZone};
385    use vector_lib::metric_tags;
386
387    use super::*;
388    use crate::tls;
389
390    pub(crate) const ORG: &str = "my-org";
391    pub(crate) const BUCKET: &str = "my-bucket";
392    pub(crate) const TOKEN: &str = "my-token";
393
394    pub(crate) fn next_database() -> String {
395        format!("testdb{}", Utc::now().timestamp_nanos_opt().unwrap())
396    }
397
398    pub(crate) fn ts() -> DateTime<Utc> {
399        Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
400            .single()
401            .and_then(|t| t.with_nanosecond(11))
402            .expect("invalid timestamp")
403    }
404
405    pub(crate) fn tags() -> MetricTags {
406        metric_tags!(
407            "normal_tag" => "value",
408            "true_tag" => "true",
409            "empty_tag" => "",
410        )
411    }
412
413    pub(crate) fn assert_fields(value: String, fields: Vec<&str>) {
414        let encoded_fields: Vec<&str> = value.split(',').collect();
415
416        assert_eq!(fields.len(), encoded_fields.len());
417
418        for field in fields.into_iter() {
419            assert!(
420                encoded_fields.contains(&field),
421                "Fields: {value} has to have: {field}"
422            )
423        }
424    }
425
426    pub(crate) fn address_v1(secure: bool) -> String {
427        if secure {
428            std::env::var("INFLUXDB_V1_HTTPS_ADDRESS")
429                .unwrap_or_else(|_| "http://localhost:8087".into())
430        } else {
431            std::env::var("INFLUXDB_V1_HTTP_ADDRESS")
432                .unwrap_or_else(|_| "http://localhost:8086".into())
433        }
434    }
435
436    pub(crate) fn address_v2() -> String {
437        std::env::var("INFLUXDB_V2_ADDRESS").unwrap_or_else(|_| "http://localhost:9999".into())
438    }
439
440    // ns.requests,metric_type=distribution,normal_tag=value,true_tag=true avg=1.875,count=8,max=3,median=2,min=1,quantile_0.95=3,sum=15 1542182950000000011
441    //
442    // =>
443    //
444    // ns.requests
445    // metric_type=distribution,normal_tag=value,true_tag=true
446    // avg=1.875,count=8,max=3,median=2,min=1,quantile_0.95=3,sum=15
447    // 1542182950000000011
448    //
449    pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
450        let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
451        // tags and timestamp may not be present
452        let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
453        let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));
454
455        (measurement, tags, fields.to_string(), ts)
456    }
457
458    fn client() -> reqwest::Client {
459        let mut test_ca = Vec::<u8>::new();
460        File::open(tls::TEST_PEM_CA_PATH)
461            .unwrap()
462            .read_to_end(&mut test_ca)
463            .unwrap();
464        let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();
465
466        reqwest::Client::builder()
467            .add_root_certificate(test_ca)
468            .build()
469            .unwrap()
470    }
471
472    pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
473        client()
474            .get(format!("{endpoint}/query"))
475            .query(&[("q", query)])
476            .send()
477            .await
478            .unwrap()
479    }
480
481    pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
482        let database = next_database();
483        let status = query_v1(endpoint, &format!("create database {database}"))
484            .await
485            .status();
486        assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
487        // Some times InfluxDB will return OK before it can actually
488        // accept writes to the database, leading to test failures. Test
489        // this with empty writes and loop if it reports the database
490        // does not exist yet.
491        crate::test_util::wait_for(|| {
492            let write_url = format!("{}/write?db={}", endpoint, &database);
493            async move {
494                match client()
495                    .post(&write_url)
496                    .header("Content-Type", "text/plain")
497                    .header("Authorization", &format!("Token {TOKEN}"))
498                    .body("")
499                    .send()
500                    .await
501                    .unwrap()
502                    .status()
503                {
504                    http::StatusCode::NO_CONTENT => true,
505                    http::StatusCode::NOT_FOUND => false,
506                    status => panic!("Unexpected status: {status}"),
507                }
508            }
509        })
510        .await;
511        database
512    }
513
514    pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
515        let status = query_v1(endpoint, &format!("drop database {database}"))
516            .await
517            .status();
518        assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
519    }
520
521    pub(crate) async fn onboarding_v2(endpoint: &str) {
522        let mut body = std::collections::HashMap::new();
523        body.insert("username", "my-user");
524        body.insert("password", "my-password");
525        body.insert("org", ORG);
526        body.insert("bucket", BUCKET);
527        body.insert("token", TOKEN);
528
529        let client = reqwest::Client::builder()
530            .danger_accept_invalid_certs(true)
531            .build()
532            .unwrap();
533
534        let res = client
535            .post(format!("{endpoint}/api/v2/setup"))
536            .json(&body)
537            .header("accept", "application/json")
538            .send()
539            .await
540            .unwrap();
541
542        let status = res.status();
543
544        assert!(
545            status == StatusCode::CREATED || status == StatusCode::UNPROCESSABLE_ENTITY,
546            "UnexpectedStatus: {status}"
547        );
548    }
549
550    pub(crate) fn format_timestamp(timestamp: DateTime<Utc>, format: SecondsFormat) -> String {
551        strip_timestamp(timestamp.to_rfc3339_opts(format, true))
552    }
553
554    // InfluxDB strips off trailing zeros in timestamps in metrics
555    fn strip_timestamp(timestamp: String) -> String {
556        let strip_one = || format!("{}Z", &timestamp[..timestamp.len() - 2]);
557        match timestamp {
558            _ if timestamp.ends_with("0Z") => strip_timestamp(strip_one()),
559            _ if timestamp.ends_with(".Z") => strip_one(),
560            _ => timestamp,
561        }
562    }
563}
564
565#[cfg(test)]
566mod tests {
567    use serde::{Deserialize, Serialize};
568
569    use super::*;
570    use crate::sinks::influxdb::test_util::{assert_fields, tags, ts};
571
572    #[derive(Deserialize, Serialize, Debug, Clone, Default)]
573    #[serde(deny_unknown_fields)]
574    pub struct InfluxDbTestConfig {
575        #[serde(flatten)]
576        pub influxdb1_settings: Option<InfluxDb1Settings>,
577        #[serde(flatten)]
578        pub influxdb2_settings: Option<InfluxDb2Settings>,
579    }
580
581    #[test]
582    fn test_influxdb_settings_both() {
583        let config = r#"
584        bucket = "my-bucket"
585        org = "my-org"
586        token = "my-token"
587        database = "my-database"
588    "#;
589        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
590        let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
591        assert_eq!(
592            settings.expect_err("expected error").to_string(),
593            "Unclear settings. Both version configured v1: InfluxDb1Settings { database: \"my-database\", consistency: None, retention_policy_name: None, username: None, password: None }, v2: InfluxDb2Settings { org: \"my-org\", bucket: \"my-bucket\", token: \"**REDACTED**\" }.".to_owned()
594        );
595    }
596
597    #[test]
598    fn test_influxdb_settings_missing() {
599        let config = r"
600    ";
601        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
602        let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
603        assert_eq!(
604            settings.expect_err("expected error").to_string(),
605            "InfluxDB v1 or v2 should be configured as endpoint.".to_owned()
606        );
607    }
608
609    #[test]
610    fn test_influxdb1_settings() {
611        let config = r#"
612        database = "my-database"
613    "#;
614        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
615        _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
616    }
617
618    #[test]
619    fn test_influxdb2_settings() {
620        let config = r#"
621        bucket = "my-bucket"
622        org = "my-org"
623        token = "my-token"
624    "#;
625        let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
626        _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
627    }
628
629    #[test]
630    fn test_influxdb1_test_write_uri() {
631        let settings = InfluxDb1Settings {
632            consistency: Some("quorum".to_owned()),
633            database: "vector_db".to_owned(),
634            retention_policy_name: Some("autogen".to_owned()),
635            username: Some("writer".to_owned()),
636            password: Some("secret".to_owned().into()),
637        };
638
639        let uri = settings
640            .write_uri("http://localhost:8086".to_owned())
641            .unwrap();
642        assert_eq!(
643            "http://localhost:8086/write?consistency=quorum&db=vector_db&rp=autogen&p=secret&u=writer&precision=ns",
644            uri.to_string()
645        )
646    }
647
648    #[test]
649    fn test_influxdb2_test_write_uri() {
650        let settings = InfluxDb2Settings {
651            org: "my-org".to_owned(),
652            bucket: "my-bucket".to_owned(),
653            token: "my-token".to_owned().into(),
654        };
655
656        let uri = settings
657            .write_uri("http://localhost:9999".to_owned())
658            .unwrap();
659        assert_eq!(
660            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns",
661            uri.to_string()
662        )
663    }
664
665    #[test]
666    fn test_influxdb1_test_healthcheck_uri() {
667        let settings = InfluxDb1Settings {
668            consistency: Some("quorum".to_owned()),
669            database: "vector_db".to_owned(),
670            retention_policy_name: Some("autogen".to_owned()),
671            username: Some("writer".to_owned()),
672            password: Some("secret".to_owned().into()),
673        };
674
675        let uri = settings
676            .healthcheck_uri("http://localhost:8086".to_owned())
677            .unwrap();
678        assert_eq!("http://localhost:8086/ping", uri.to_string())
679    }
680
681    #[test]
682    fn test_influxdb2_test_healthcheck_uri() {
683        let settings = InfluxDb2Settings {
684            org: "my-org".to_owned(),
685            bucket: "my-bucket".to_owned(),
686            token: "my-token".to_owned().into(),
687        };
688
689        let uri = settings
690            .healthcheck_uri("http://localhost:9999".to_owned())
691            .unwrap();
692        assert_eq!("http://localhost:9999/ping", uri.to_string())
693    }
694
695    #[test]
696    fn test_encode_tags() {
697        let mut value = BytesMut::new();
698        encode_tags(tags(), &mut value);
699
700        assert_eq!(value, "normal_tag=value,true_tag=true");
701
702        let tags_to_escape = vec![
703            ("tag".to_owned(), "val=ue".to_owned()),
704            ("name escape".to_owned(), "true".to_owned()),
705            ("value_escape".to_owned(), "value escape".to_owned()),
706            ("a_first_place".to_owned(), "10".to_owned()),
707        ]
708        .into_iter()
709        .collect();
710
711        let mut value = BytesMut::new();
712        encode_tags(tags_to_escape, &mut value);
713        assert_eq!(
714            value,
715            "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape"
716        );
717    }
718
719    #[test]
720    fn tags_order() {
721        let mut value = BytesMut::new();
722        encode_tags(
723            vec![
724                ("a", "value"),
725                ("b", "value"),
726                ("c", "value"),
727                ("d", "value"),
728                ("e", "value"),
729            ]
730            .into_iter()
731            .map(|(k, v)| (k.to_owned(), v.to_owned()))
732            .collect(),
733            &mut value,
734        );
735        assert_eq!(value, "a=value,b=value,c=value,d=value,e=value");
736    }
737
738    #[test]
739    fn test_encode_fields_v1() {
740        let fields = vec![
741            ("field_string".into(), Field::String("string value".into())),
742            (
743                "field_string_escape".into(),
744                Field::String("string\\val\"ue".into()),
745            ),
746            ("field_float".into(), Field::Float(123.45)),
747            ("field_unsigned_int".into(), Field::UnsignedInt(657)),
748            ("field_int".into(), Field::Int(657646)),
749            ("field_bool_true".into(), Field::Bool(true)),
750            ("field_bool_false".into(), Field::Bool(false)),
751            ("escape key".into(), Field::Float(10.0)),
752        ]
753        .into_iter()
754        .collect();
755
756        let mut value = BytesMut::new();
757        encode_fields(ProtocolVersion::V1, fields, &mut value);
758        let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
759        assert_fields(
760            value,
761            [
762                "escape\\ key=10",
763                "field_float=123.45",
764                "field_string=\"string value\"",
765                "field_string_escape=\"string\\\\val\\\"ue\"",
766                "field_unsigned_int=657i",
767                "field_int=657646i",
768                "field_bool_true=true",
769                "field_bool_false=false",
770            ]
771            .to_vec(),
772        )
773    }
774
775    #[test]
776    fn test_encode_fields() {
777        let fields = vec![
778            ("field_string".into(), Field::String("string value".into())),
779            (
780                "field_string_escape".into(),
781                Field::String("string\\val\"ue".into()),
782            ),
783            ("field_float".into(), Field::Float(123.45)),
784            ("field_unsigned_int".into(), Field::UnsignedInt(657)),
785            ("field_int".into(), Field::Int(657646)),
786            ("field_bool_true".into(), Field::Bool(true)),
787            ("field_bool_false".into(), Field::Bool(false)),
788            ("escape key".into(), Field::Float(10.0)),
789        ]
790        .into_iter()
791        .collect();
792
793        let mut value = BytesMut::new();
794        encode_fields(ProtocolVersion::V2, fields, &mut value);
795        let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
796        assert_fields(
797            value,
798            [
799                "escape\\ key=10",
800                "field_float=123.45",
801                "field_string=\"string value\"",
802                "field_string_escape=\"string\\\\val\\\"ue\"",
803                "field_unsigned_int=657u",
804                "field_int=657646i",
805                "field_bool_true=true",
806                "field_bool_false=false",
807            ]
808            .to_vec(),
809        )
810    }
811
812    #[test]
813    fn test_encode_string() {
814        let mut value = BytesMut::new();
815        encode_string("measurement_name", &mut value);
816        assert_eq!(value, "measurement_name");
817
818        let mut value = BytesMut::new();
819        encode_string("measurement name", &mut value);
820        assert_eq!(value, "measurement\\ name");
821
822        let mut value = BytesMut::new();
823        encode_string("measurement=name", &mut value);
824        assert_eq!(value, "measurement\\=name");
825
826        let mut value = BytesMut::new();
827        encode_string("measurement,name", &mut value);
828        assert_eq!(value, "measurement\\,name");
829    }
830
831    #[test]
832    fn test_encode_timestamp() {
833        let start = Utc::now()
834            .timestamp_nanos_opt()
835            .expect("Timestamp out of range");
836        assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011);
837        assert!(encode_timestamp(None) >= start)
838    }
839
840    #[test]
841    fn test_encode_uri_valid() {
842        let uri = encode_uri(
843            "http://localhost:9999",
844            "api/v2/write",
845            &[
846                ("org", Some("my-org".to_owned())),
847                ("bucket", Some("my-bucket".to_owned())),
848                ("precision", Some("ns".to_owned())),
849            ],
850        )
851        .unwrap();
852        assert_eq!(
853            uri,
854            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns"
855        );
856
857        let uri = encode_uri(
858            "http://localhost:9999/",
859            "api/v2/write",
860            &[
861                ("org", Some("my-org".to_owned())),
862                ("bucket", Some("my-bucket".to_owned())),
863            ],
864        )
865        .unwrap();
866        assert_eq!(
867            uri,
868            "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket"
869        );
870
871        let uri = encode_uri(
872            "http://localhost:9999",
873            "api/v2/write",
874            &[
875                ("org", Some("Organization name".to_owned())),
876                ("bucket", Some("Bucket=name".to_owned())),
877                ("none", None),
878            ],
879        )
880        .unwrap();
881        assert_eq!(
882            uri,
883            "http://localhost:9999/api/v2/write?org=Organization+name&bucket=Bucket%3Dname"
884        );
885    }
886
887    #[test]
888    fn test_encode_uri_invalid() {
889        encode_uri(
890            "localhost:9999",
891            "api/v2/write",
892            &[
893                ("org", Some("my-org".to_owned())),
894                ("bucket", Some("my-bucket".to_owned())),
895            ],
896        )
897        .unwrap_err();
898    }
899}
900
901#[cfg(feature = "influxdb-integration-tests")]
902#[cfg(test)]
903mod integration_tests {
904    use crate::{
905        config::ProxyConfig,
906        http::HttpClient,
907        sinks::influxdb::{
908            InfluxDb1Settings, InfluxDb2Settings, healthcheck,
909            test_util::{BUCKET, ORG, TOKEN, address_v1, address_v2, next_database, onboarding_v2},
910        },
911    };
912
913    #[tokio::test]
914    async fn influxdb2_healthchecks_ok() {
915        let endpoint = address_v2();
916        onboarding_v2(&endpoint).await;
917
918        let endpoint = address_v2();
919        let influxdb1_settings = None;
920        let influxdb2_settings = Some(InfluxDb2Settings {
921            org: ORG.to_string(),
922            bucket: BUCKET.to_string(),
923            token: TOKEN.to_string().into(),
924        });
925        let proxy = ProxyConfig::default();
926        let client = HttpClient::new(None, &proxy).unwrap();
927
928        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
929            .unwrap()
930            .await
931            .unwrap()
932    }
933
934    #[tokio::test]
935    #[should_panic]
936    async fn influxdb2_healthchecks_fail() {
937        let endpoint = "http://127.0.0.1:9999".to_string();
938        onboarding_v2(&endpoint).await;
939
940        let influxdb1_settings = None;
941        let influxdb2_settings = Some(InfluxDb2Settings {
942            org: ORG.to_string(),
943            bucket: BUCKET.to_string(),
944            token: TOKEN.to_string().into(),
945        });
946        let proxy = ProxyConfig::default();
947        let client = HttpClient::new(None, &proxy).unwrap();
948
949        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
950            .unwrap()
951            .await
952            .unwrap();
953    }
954
955    #[tokio::test]
956    async fn influxdb1_healthchecks_ok() {
957        let endpoint = address_v1(false);
958
959        let influxdb1_settings = Some(InfluxDb1Settings {
960            database: next_database(),
961            consistency: None,
962            retention_policy_name: None,
963            username: None,
964            password: None,
965        });
966        let influxdb2_settings = None;
967        let proxy = ProxyConfig::default();
968        let client = HttpClient::new(None, &proxy).unwrap();
969
970        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
971            .unwrap()
972            .await
973            .unwrap();
974    }
975
976    #[tokio::test]
977    #[should_panic]
978    async fn influxdb1_healthchecks_fail() {
979        let endpoint = "http://127.0.0.1:8086".to_string();
980        let influxdb1_settings = Some(InfluxDb1Settings {
981            database: next_database(),
982            consistency: None,
983            retention_policy_name: None,
984            username: None,
985            password: None,
986        });
987        let influxdb2_settings = None;
988        let proxy = ProxyConfig::default();
989        let client = HttpClient::new(None, &proxy).unwrap();
990
991        healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
992            .unwrap()
993            .await
994            .unwrap();
995    }
996}