1pub mod logs;
2pub mod metrics;
3
4use std::collections::HashMap;
5
6use bytes::{BufMut, BytesMut};
7use chrono::{DateTime, Utc};
8use futures::FutureExt;
9use http::{StatusCode, Uri};
10use snafu::{ResultExt, Snafu};
11use tower::Service;
12use vector_lib::{
13 configurable::configurable_component,
14 event::{KeyString, MetricTags},
15 sensitive_string::SensitiveString,
16};
17
18use crate::http::HttpClient;
19
20pub(in crate::sinks) enum Field {
21 String(String),
23 Float(f64),
25 UnsignedInt(u64),
29 Int(i64),
31 Bool(bool),
33}
34
35#[derive(Clone, Copy, Debug)]
36pub(in crate::sinks) enum ProtocolVersion {
37 V1,
38 V2,
39}
40
41#[derive(Debug, Snafu)]
42enum ConfigError {
43 #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))]
44 MissingConfiguration,
45 #[snafu(display(
46 "Unclear settings. Both version configured v1: {:?}, v2: {:?}.",
47 v1_settings,
48 v2_settings
49 ))]
50 BothConfiguration {
51 v1_settings: InfluxDb1Settings,
52 v2_settings: InfluxDb2Settings,
53 },
54}
55
56#[configurable_component]
58#[derive(Clone, Debug)]
59pub struct InfluxDb1Settings {
60 #[configurable(metadata(docs::examples = "vector-database"))]
64 #[configurable(metadata(docs::examples = "iot-store"))]
65 database: String,
66
67 #[configurable(metadata(docs::examples = "any"))]
71 #[configurable(metadata(docs::examples = "one"))]
72 #[configurable(metadata(docs::examples = "quorum"))]
73 #[configurable(metadata(docs::examples = "all"))]
74 consistency: Option<String>,
75
76 #[configurable(metadata(docs::examples = "autogen"))]
80 #[configurable(metadata(docs::examples = "one_day_only"))]
81 retention_policy_name: Option<String>,
82
83 #[configurable(metadata(docs::examples = "todd"))]
87 #[configurable(metadata(docs::examples = "vector-source"))]
88 username: Option<String>,
89
90 #[configurable(metadata(docs::examples = "${INFLUXDB_PASSWORD}"))]
94 #[configurable(metadata(docs::examples = "influxdb4ever"))]
95 password: Option<SensitiveString>,
96}
97
98#[configurable_component]
100#[derive(Clone, Debug)]
101pub struct InfluxDb2Settings {
102 #[configurable(metadata(docs::examples = "my-org"))]
106 #[configurable(metadata(docs::examples = "33f2cff0a28e5b63"))]
107 org: String,
108
109 #[configurable(metadata(docs::examples = "vector-bucket"))]
113 #[configurable(metadata(docs::examples = "4d2225e4d3d49f75"))]
114 bucket: String,
115
116 #[configurable(metadata(docs::examples = "${INFLUXDB_TOKEN}"))]
122 #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
123 token: SensitiveString,
124}
125
126trait InfluxDbSettings: std::fmt::Debug {
127 fn write_uri(&self, endpoint: String) -> crate::Result<Uri>;
128 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri>;
129 fn token(&self) -> SensitiveString;
130 fn protocol_version(&self) -> ProtocolVersion;
131}
132
133impl InfluxDbSettings for InfluxDb1Settings {
134 fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
135 encode_uri(
136 &endpoint,
137 "write",
138 &[
139 ("consistency", self.consistency.clone()),
140 ("db", Some(self.database.clone())),
141 ("rp", self.retention_policy_name.clone()),
142 ("p", self.password.as_ref().map(|v| v.inner().to_owned())),
143 ("u", self.username.clone()),
144 ("precision", Some("ns".to_owned())),
145 ],
146 )
147 }
148
149 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
150 encode_uri(&endpoint, "ping", &[])
151 }
152
153 fn token(&self) -> SensitiveString {
154 SensitiveString::default()
155 }
156
157 fn protocol_version(&self) -> ProtocolVersion {
158 ProtocolVersion::V1
159 }
160}
161
162impl InfluxDbSettings for InfluxDb2Settings {
163 fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
164 encode_uri(
165 &endpoint,
166 "api/v2/write",
167 &[
168 ("org", Some(self.org.clone())),
169 ("bucket", Some(self.bucket.clone())),
170 ("precision", Some("ns".to_owned())),
171 ],
172 )
173 }
174
175 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
176 encode_uri(&endpoint, "ping", &[])
177 }
178
179 fn token(&self) -> SensitiveString {
180 self.token.clone()
181 }
182
183 fn protocol_version(&self) -> ProtocolVersion {
184 ProtocolVersion::V2
185 }
186}
187
188fn influxdb_settings(
189 influxdb1_settings: Option<InfluxDb1Settings>,
190 influxdb2_settings: Option<InfluxDb2Settings>,
191) -> Result<Box<dyn InfluxDbSettings>, crate::Error> {
192 match (influxdb1_settings, influxdb2_settings) {
193 (Some(v1_settings), Some(v2_settings)) => Err(ConfigError::BothConfiguration {
194 v1_settings,
195 v2_settings,
196 }
197 .into()),
198 (None, None) => Err(ConfigError::MissingConfiguration.into()),
199 (Some(settings), _) => Ok(Box::new(settings)),
200 (_, Some(settings)) => Ok(Box::new(settings)),
201 }
202}
203
204fn healthcheck(
207 endpoint: String,
208 influxdb1_settings: Option<InfluxDb1Settings>,
209 influxdb2_settings: Option<InfluxDb2Settings>,
210 mut client: HttpClient,
211) -> crate::Result<super::Healthcheck> {
212 let settings = influxdb_settings(influxdb1_settings, influxdb2_settings)?;
213
214 let uri = settings.healthcheck_uri(endpoint)?;
215
216 let request = hyper::Request::get(uri).body(hyper::Body::empty()).unwrap();
217
218 Ok(async move {
219 client
220 .call(request)
221 .await
222 .map_err(|error| error.into())
223 .and_then(|response| match response.status() {
224 StatusCode::OK => Ok(()),
225 StatusCode::NO_CONTENT => Ok(()),
226 other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()),
227 })
228 }
229 .boxed())
230}
231
232pub(in crate::sinks) fn influx_line_protocol(
234 protocol_version: ProtocolVersion,
235 measurement: &str,
236 tags: Option<MetricTags>,
237 fields: Option<HashMap<KeyString, Field>>,
238 timestamp: i64,
239 line_protocol: &mut BytesMut,
240) -> Result<(), &'static str> {
241 let unwrapped_fields = fields.unwrap_or_default();
243 if unwrapped_fields.is_empty() {
245 return Err("fields must not be empty");
246 }
247
248 encode_string(measurement, line_protocol);
249
250 let unwrapped_tags = tags.unwrap_or_default();
252 if !unwrapped_tags.is_empty() {
253 line_protocol.put_u8(b',');
254 encode_tags(unwrapped_tags, line_protocol);
255 }
256 line_protocol.put_u8(b' ');
257
258 encode_fields(protocol_version, unwrapped_fields, line_protocol);
260 line_protocol.put_u8(b' ');
261
262 line_protocol.put_slice(×tamp.to_string().into_bytes());
264 line_protocol.put_u8(b'\n');
265 Ok(())
266}
267
268fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
269 let original_len = output.len();
270 for (key, value) in tags.iter_single() {
272 if key.is_empty() || value.is_empty() {
273 continue;
274 }
275 encode_string(key, output);
276 output.put_u8(b'=');
277 encode_string(value, output);
278 output.put_u8(b',');
279 }
280
281 if output.len() > original_len {
283 output.truncate(output.len() - 1);
284 }
285}
286
287fn encode_fields(
288 protocol_version: ProtocolVersion,
289 fields: HashMap<KeyString, Field>,
290 output: &mut BytesMut,
291) {
292 let original_len = output.len();
293 for (key, value) in fields.into_iter() {
294 encode_string(&key, output);
295 output.put_u8(b'=');
296 match value {
297 Field::String(s) => {
298 output.put_u8(b'"');
299 for c in s.chars() {
300 if "\\\"".contains(c) {
301 output.put_u8(b'\\');
302 }
303 let mut c_buffer: [u8; 4] = [0; 4];
304 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
305 }
306 output.put_u8(b'"');
307 }
308 Field::Float(f) => output.put_slice(&f.to_string().into_bytes()),
309 Field::UnsignedInt(i) => {
310 output.put_slice(&i.to_string().into_bytes());
311 let c = match protocol_version {
312 ProtocolVersion::V1 => 'i',
313 ProtocolVersion::V2 => 'u',
314 };
315 let mut c_buffer: [u8; 4] = [0; 4];
316 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
317 }
318 Field::Int(i) => {
319 output.put_slice(&i.to_string().into_bytes());
320 output.put_u8(b'i');
321 }
322 Field::Bool(b) => {
323 output.put_slice(&b.to_string().into_bytes());
324 }
325 };
326 output.put_u8(b',');
327 }
328
329 if output.len() > original_len {
331 output.truncate(output.len() - 1);
332 }
333}
334
335fn encode_string(key: &str, output: &mut BytesMut) {
336 for c in key.chars() {
337 if "\\, =".contains(c) {
338 output.put_u8(b'\\');
339 }
340 let mut c_buffer: [u8; 4] = [0; 4];
341 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
342 }
343}
344
345pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
346 if let Some(ts) = timestamp {
347 ts.timestamp_nanos_opt().unwrap()
348 } else {
349 encode_timestamp(Some(Utc::now()))
350 }
351}
352
353pub(in crate::sinks) fn encode_uri(
354 endpoint: &str,
355 path: &str,
356 pairs: &[(&str, Option<String>)],
357) -> crate::Result<Uri> {
358 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
359
360 for pair in pairs {
361 if let Some(v) = &pair.1 {
362 serializer.append_pair(pair.0, v);
363 }
364 }
365
366 let mut url = if endpoint.ends_with('/') {
367 format!("{}{}?{}", endpoint, path, serializer.finish())
368 } else {
369 format!("{}/{}?{}", endpoint, path, serializer.finish())
370 };
371
372 if url.ends_with('?') {
373 url.pop();
374 }
375
376 Ok(url.parse::<Uri>().context(super::UriParseSnafu)?)
377}
378
379#[cfg(test)]
380#[allow(dead_code)]
381pub mod test_util {
382 use std::{fs::File, io::Read};
383
384 use chrono::{DateTime, SecondsFormat, Timelike, Utc, offset::TimeZone};
385 use vector_lib::metric_tags;
386
387 use super::*;
388 use crate::tls;
389
390 pub(crate) const ORG: &str = "my-org";
391 pub(crate) const BUCKET: &str = "my-bucket";
392 pub(crate) const TOKEN: &str = "my-token";
393
394 pub(crate) fn next_database() -> String {
395 format!("testdb{}", Utc::now().timestamp_nanos_opt().unwrap())
396 }
397
398 pub(crate) fn ts() -> DateTime<Utc> {
399 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
400 .single()
401 .and_then(|t| t.with_nanosecond(11))
402 .expect("invalid timestamp")
403 }
404
405 pub(crate) fn tags() -> MetricTags {
406 metric_tags!(
407 "normal_tag" => "value",
408 "true_tag" => "true",
409 "empty_tag" => "",
410 )
411 }
412
413 pub(crate) fn assert_fields(value: String, fields: Vec<&str>) {
414 let encoded_fields: Vec<&str> = value.split(',').collect();
415
416 assert_eq!(fields.len(), encoded_fields.len());
417
418 for field in fields.into_iter() {
419 assert!(
420 encoded_fields.contains(&field),
421 "Fields: {value} has to have: {field}"
422 )
423 }
424 }
425
426 pub(crate) fn address_v1(secure: bool) -> String {
427 if secure {
428 std::env::var("INFLUXDB_V1_HTTPS_ADDRESS")
429 .unwrap_or_else(|_| "http://localhost:8087".into())
430 } else {
431 std::env::var("INFLUXDB_V1_HTTP_ADDRESS")
432 .unwrap_or_else(|_| "http://localhost:8086".into())
433 }
434 }
435
436 pub(crate) fn address_v2() -> String {
437 std::env::var("INFLUXDB_V2_ADDRESS").unwrap_or_else(|_| "http://localhost:9999".into())
438 }
439
440 pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
450 let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
451 let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
453 let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));
454
455 (measurement, tags, fields.to_string(), ts)
456 }
457
458 fn client() -> reqwest::Client {
459 let mut test_ca = Vec::<u8>::new();
460 File::open(tls::TEST_PEM_CA_PATH)
461 .unwrap()
462 .read_to_end(&mut test_ca)
463 .unwrap();
464 let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();
465
466 reqwest::Client::builder()
467 .add_root_certificate(test_ca)
468 .build()
469 .unwrap()
470 }
471
472 pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
473 client()
474 .get(format!("{endpoint}/query"))
475 .query(&[("q", query)])
476 .send()
477 .await
478 .unwrap()
479 }
480
481 pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
482 let database = next_database();
483 let status = query_v1(endpoint, &format!("create database {database}"))
484 .await
485 .status();
486 assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
487 crate::test_util::wait_for(|| {
492 let write_url = format!("{}/write?db={}", endpoint, &database);
493 async move {
494 match client()
495 .post(&write_url)
496 .header("Content-Type", "text/plain")
497 .header("Authorization", &format!("Token {TOKEN}"))
498 .body("")
499 .send()
500 .await
501 .unwrap()
502 .status()
503 {
504 http::StatusCode::NO_CONTENT => true,
505 http::StatusCode::NOT_FOUND => false,
506 status => panic!("Unexpected status: {status}"),
507 }
508 }
509 })
510 .await;
511 database
512 }
513
514 pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
515 let status = query_v1(endpoint, &format!("drop database {database}"))
516 .await
517 .status();
518 assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
519 }
520
521 pub(crate) async fn onboarding_v2(endpoint: &str) {
522 let mut body = std::collections::HashMap::new();
523 body.insert("username", "my-user");
524 body.insert("password", "my-password");
525 body.insert("org", ORG);
526 body.insert("bucket", BUCKET);
527 body.insert("token", TOKEN);
528
529 let client = reqwest::Client::builder()
530 .danger_accept_invalid_certs(true)
531 .build()
532 .unwrap();
533
534 let res = client
535 .post(format!("{endpoint}/api/v2/setup"))
536 .json(&body)
537 .header("accept", "application/json")
538 .send()
539 .await
540 .unwrap();
541
542 let status = res.status();
543
544 assert!(
545 status == StatusCode::CREATED || status == StatusCode::UNPROCESSABLE_ENTITY,
546 "UnexpectedStatus: {status}"
547 );
548 }
549
550 pub(crate) fn format_timestamp(timestamp: DateTime<Utc>, format: SecondsFormat) -> String {
551 strip_timestamp(timestamp.to_rfc3339_opts(format, true))
552 }
553
554 fn strip_timestamp(timestamp: String) -> String {
556 let strip_one = || format!("{}Z", ×tamp[..timestamp.len() - 2]);
557 match timestamp {
558 _ if timestamp.ends_with("0Z") => strip_timestamp(strip_one()),
559 _ if timestamp.ends_with(".Z") => strip_one(),
560 _ => timestamp,
561 }
562 }
563}
564
565#[cfg(test)]
566mod tests {
567 use serde::{Deserialize, Serialize};
568
569 use super::*;
570 use crate::sinks::influxdb::test_util::{assert_fields, tags, ts};
571
572 #[derive(Deserialize, Serialize, Debug, Clone, Default)]
573 #[serde(deny_unknown_fields)]
574 pub struct InfluxDbTestConfig {
575 #[serde(flatten)]
576 pub influxdb1_settings: Option<InfluxDb1Settings>,
577 #[serde(flatten)]
578 pub influxdb2_settings: Option<InfluxDb2Settings>,
579 }
580
581 #[test]
582 fn test_influxdb_settings_both() {
583 let config = r#"
584 bucket = "my-bucket"
585 org = "my-org"
586 token = "my-token"
587 database = "my-database"
588 "#;
589 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
590 let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
591 assert_eq!(
592 settings.expect_err("expected error").to_string(),
593 "Unclear settings. Both version configured v1: InfluxDb1Settings { database: \"my-database\", consistency: None, retention_policy_name: None, username: None, password: None }, v2: InfluxDb2Settings { org: \"my-org\", bucket: \"my-bucket\", token: \"**REDACTED**\" }.".to_owned()
594 );
595 }
596
597 #[test]
598 fn test_influxdb_settings_missing() {
599 let config = r"
600 ";
601 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
602 let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
603 assert_eq!(
604 settings.expect_err("expected error").to_string(),
605 "InfluxDB v1 or v2 should be configured as endpoint.".to_owned()
606 );
607 }
608
609 #[test]
610 fn test_influxdb1_settings() {
611 let config = r#"
612 database = "my-database"
613 "#;
614 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
615 _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
616 }
617
618 #[test]
619 fn test_influxdb2_settings() {
620 let config = r#"
621 bucket = "my-bucket"
622 org = "my-org"
623 token = "my-token"
624 "#;
625 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
626 _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
627 }
628
629 #[test]
630 fn test_influxdb1_test_write_uri() {
631 let settings = InfluxDb1Settings {
632 consistency: Some("quorum".to_owned()),
633 database: "vector_db".to_owned(),
634 retention_policy_name: Some("autogen".to_owned()),
635 username: Some("writer".to_owned()),
636 password: Some("secret".to_owned().into()),
637 };
638
639 let uri = settings
640 .write_uri("http://localhost:8086".to_owned())
641 .unwrap();
642 assert_eq!(
643 "http://localhost:8086/write?consistency=quorum&db=vector_db&rp=autogen&p=secret&u=writer&precision=ns",
644 uri.to_string()
645 )
646 }
647
648 #[test]
649 fn test_influxdb2_test_write_uri() {
650 let settings = InfluxDb2Settings {
651 org: "my-org".to_owned(),
652 bucket: "my-bucket".to_owned(),
653 token: "my-token".to_owned().into(),
654 };
655
656 let uri = settings
657 .write_uri("http://localhost:9999".to_owned())
658 .unwrap();
659 assert_eq!(
660 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns",
661 uri.to_string()
662 )
663 }
664
665 #[test]
666 fn test_influxdb1_test_healthcheck_uri() {
667 let settings = InfluxDb1Settings {
668 consistency: Some("quorum".to_owned()),
669 database: "vector_db".to_owned(),
670 retention_policy_name: Some("autogen".to_owned()),
671 username: Some("writer".to_owned()),
672 password: Some("secret".to_owned().into()),
673 };
674
675 let uri = settings
676 .healthcheck_uri("http://localhost:8086".to_owned())
677 .unwrap();
678 assert_eq!("http://localhost:8086/ping", uri.to_string())
679 }
680
681 #[test]
682 fn test_influxdb2_test_healthcheck_uri() {
683 let settings = InfluxDb2Settings {
684 org: "my-org".to_owned(),
685 bucket: "my-bucket".to_owned(),
686 token: "my-token".to_owned().into(),
687 };
688
689 let uri = settings
690 .healthcheck_uri("http://localhost:9999".to_owned())
691 .unwrap();
692 assert_eq!("http://localhost:9999/ping", uri.to_string())
693 }
694
695 #[test]
696 fn test_encode_tags() {
697 let mut value = BytesMut::new();
698 encode_tags(tags(), &mut value);
699
700 assert_eq!(value, "normal_tag=value,true_tag=true");
701
702 let tags_to_escape = vec![
703 ("tag".to_owned(), "val=ue".to_owned()),
704 ("name escape".to_owned(), "true".to_owned()),
705 ("value_escape".to_owned(), "value escape".to_owned()),
706 ("a_first_place".to_owned(), "10".to_owned()),
707 ]
708 .into_iter()
709 .collect();
710
711 let mut value = BytesMut::new();
712 encode_tags(tags_to_escape, &mut value);
713 assert_eq!(
714 value,
715 "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape"
716 );
717 }
718
719 #[test]
720 fn tags_order() {
721 let mut value = BytesMut::new();
722 encode_tags(
723 vec![
724 ("a", "value"),
725 ("b", "value"),
726 ("c", "value"),
727 ("d", "value"),
728 ("e", "value"),
729 ]
730 .into_iter()
731 .map(|(k, v)| (k.to_owned(), v.to_owned()))
732 .collect(),
733 &mut value,
734 );
735 assert_eq!(value, "a=value,b=value,c=value,d=value,e=value");
736 }
737
738 #[test]
739 fn test_encode_fields_v1() {
740 let fields = vec![
741 ("field_string".into(), Field::String("string value".into())),
742 (
743 "field_string_escape".into(),
744 Field::String("string\\val\"ue".into()),
745 ),
746 ("field_float".into(), Field::Float(123.45)),
747 ("field_unsigned_int".into(), Field::UnsignedInt(657)),
748 ("field_int".into(), Field::Int(657646)),
749 ("field_bool_true".into(), Field::Bool(true)),
750 ("field_bool_false".into(), Field::Bool(false)),
751 ("escape key".into(), Field::Float(10.0)),
752 ]
753 .into_iter()
754 .collect();
755
756 let mut value = BytesMut::new();
757 encode_fields(ProtocolVersion::V1, fields, &mut value);
758 let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
759 assert_fields(
760 value,
761 [
762 "escape\\ key=10",
763 "field_float=123.45",
764 "field_string=\"string value\"",
765 "field_string_escape=\"string\\\\val\\\"ue\"",
766 "field_unsigned_int=657i",
767 "field_int=657646i",
768 "field_bool_true=true",
769 "field_bool_false=false",
770 ]
771 .to_vec(),
772 )
773 }
774
775 #[test]
776 fn test_encode_fields() {
777 let fields = vec![
778 ("field_string".into(), Field::String("string value".into())),
779 (
780 "field_string_escape".into(),
781 Field::String("string\\val\"ue".into()),
782 ),
783 ("field_float".into(), Field::Float(123.45)),
784 ("field_unsigned_int".into(), Field::UnsignedInt(657)),
785 ("field_int".into(), Field::Int(657646)),
786 ("field_bool_true".into(), Field::Bool(true)),
787 ("field_bool_false".into(), Field::Bool(false)),
788 ("escape key".into(), Field::Float(10.0)),
789 ]
790 .into_iter()
791 .collect();
792
793 let mut value = BytesMut::new();
794 encode_fields(ProtocolVersion::V2, fields, &mut value);
795 let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
796 assert_fields(
797 value,
798 [
799 "escape\\ key=10",
800 "field_float=123.45",
801 "field_string=\"string value\"",
802 "field_string_escape=\"string\\\\val\\\"ue\"",
803 "field_unsigned_int=657u",
804 "field_int=657646i",
805 "field_bool_true=true",
806 "field_bool_false=false",
807 ]
808 .to_vec(),
809 )
810 }
811
812 #[test]
813 fn test_encode_string() {
814 let mut value = BytesMut::new();
815 encode_string("measurement_name", &mut value);
816 assert_eq!(value, "measurement_name");
817
818 let mut value = BytesMut::new();
819 encode_string("measurement name", &mut value);
820 assert_eq!(value, "measurement\\ name");
821
822 let mut value = BytesMut::new();
823 encode_string("measurement=name", &mut value);
824 assert_eq!(value, "measurement\\=name");
825
826 let mut value = BytesMut::new();
827 encode_string("measurement,name", &mut value);
828 assert_eq!(value, "measurement\\,name");
829 }
830
831 #[test]
832 fn test_encode_timestamp() {
833 let start = Utc::now()
834 .timestamp_nanos_opt()
835 .expect("Timestamp out of range");
836 assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011);
837 assert!(encode_timestamp(None) >= start)
838 }
839
840 #[test]
841 fn test_encode_uri_valid() {
842 let uri = encode_uri(
843 "http://localhost:9999",
844 "api/v2/write",
845 &[
846 ("org", Some("my-org".to_owned())),
847 ("bucket", Some("my-bucket".to_owned())),
848 ("precision", Some("ns".to_owned())),
849 ],
850 )
851 .unwrap();
852 assert_eq!(
853 uri,
854 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns"
855 );
856
857 let uri = encode_uri(
858 "http://localhost:9999/",
859 "api/v2/write",
860 &[
861 ("org", Some("my-org".to_owned())),
862 ("bucket", Some("my-bucket".to_owned())),
863 ],
864 )
865 .unwrap();
866 assert_eq!(
867 uri,
868 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket"
869 );
870
871 let uri = encode_uri(
872 "http://localhost:9999",
873 "api/v2/write",
874 &[
875 ("org", Some("Organization name".to_owned())),
876 ("bucket", Some("Bucket=name".to_owned())),
877 ("none", None),
878 ],
879 )
880 .unwrap();
881 assert_eq!(
882 uri,
883 "http://localhost:9999/api/v2/write?org=Organization+name&bucket=Bucket%3Dname"
884 );
885 }
886
887 #[test]
888 fn test_encode_uri_invalid() {
889 encode_uri(
890 "localhost:9999",
891 "api/v2/write",
892 &[
893 ("org", Some("my-org".to_owned())),
894 ("bucket", Some("my-bucket".to_owned())),
895 ],
896 )
897 .unwrap_err();
898 }
899}
900
901#[cfg(feature = "influxdb-integration-tests")]
902#[cfg(test)]
903mod integration_tests {
904 use crate::{
905 config::ProxyConfig,
906 http::HttpClient,
907 sinks::influxdb::{
908 InfluxDb1Settings, InfluxDb2Settings, healthcheck,
909 test_util::{BUCKET, ORG, TOKEN, address_v1, address_v2, next_database, onboarding_v2},
910 },
911 };
912
913 #[tokio::test]
914 async fn influxdb2_healthchecks_ok() {
915 let endpoint = address_v2();
916 onboarding_v2(&endpoint).await;
917
918 let endpoint = address_v2();
919 let influxdb1_settings = None;
920 let influxdb2_settings = Some(InfluxDb2Settings {
921 org: ORG.to_string(),
922 bucket: BUCKET.to_string(),
923 token: TOKEN.to_string().into(),
924 });
925 let proxy = ProxyConfig::default();
926 let client = HttpClient::new(None, &proxy).unwrap();
927
928 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
929 .unwrap()
930 .await
931 .unwrap()
932 }
933
934 #[tokio::test]
935 #[should_panic]
936 async fn influxdb2_healthchecks_fail() {
937 let endpoint = "http://127.0.0.1:9999".to_string();
938 onboarding_v2(&endpoint).await;
939
940 let influxdb1_settings = None;
941 let influxdb2_settings = Some(InfluxDb2Settings {
942 org: ORG.to_string(),
943 bucket: BUCKET.to_string(),
944 token: TOKEN.to_string().into(),
945 });
946 let proxy = ProxyConfig::default();
947 let client = HttpClient::new(None, &proxy).unwrap();
948
949 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
950 .unwrap()
951 .await
952 .unwrap();
953 }
954
955 #[tokio::test]
956 async fn influxdb1_healthchecks_ok() {
957 let endpoint = address_v1(false);
958
959 let influxdb1_settings = Some(InfluxDb1Settings {
960 database: next_database(),
961 consistency: None,
962 retention_policy_name: None,
963 username: None,
964 password: None,
965 });
966 let influxdb2_settings = None;
967 let proxy = ProxyConfig::default();
968 let client = HttpClient::new(None, &proxy).unwrap();
969
970 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
971 .unwrap()
972 .await
973 .unwrap();
974 }
975
976 #[tokio::test]
977 #[should_panic]
978 async fn influxdb1_healthchecks_fail() {
979 let endpoint = "http://127.0.0.1:8086".to_string();
980 let influxdb1_settings = Some(InfluxDb1Settings {
981 database: next_database(),
982 consistency: None,
983 retention_policy_name: None,
984 username: None,
985 password: None,
986 });
987 let influxdb2_settings = None;
988 let proxy = ProxyConfig::default();
989 let client = HttpClient::new(None, &proxy).unwrap();
990
991 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
992 .unwrap()
993 .await
994 .unwrap();
995 }
996}