1pub mod logs;
2pub mod metrics;
3
4use std::collections::HashMap;
5
6use bytes::{BufMut, BytesMut};
7use chrono::{DateTime, Utc};
8use futures::FutureExt;
9use http::{StatusCode, Uri};
10use snafu::{ResultExt, Snafu};
11use tower::Service;
12use vector_lib::configurable::configurable_component;
13use vector_lib::event::{KeyString, MetricTags};
14use vector_lib::sensitive_string::SensitiveString;
15
16use crate::http::HttpClient;
17
18pub(in crate::sinks) enum Field {
19 String(String),
21 Float(f64),
23 UnsignedInt(u64),
27 Int(i64),
29 Bool(bool),
31}
32
33#[derive(Clone, Copy, Debug)]
34pub(in crate::sinks) enum ProtocolVersion {
35 V1,
36 V2,
37}
38
39#[derive(Debug, Snafu)]
40enum ConfigError {
41 #[snafu(display("InfluxDB v1 or v2 should be configured as endpoint."))]
42 MissingConfiguration,
43 #[snafu(display(
44 "Unclear settings. Both version configured v1: {:?}, v2: {:?}.",
45 v1_settings,
46 v2_settings
47 ))]
48 BothConfiguration {
49 v1_settings: InfluxDb1Settings,
50 v2_settings: InfluxDb2Settings,
51 },
52}
53
54#[configurable_component]
56#[derive(Clone, Debug)]
57pub struct InfluxDb1Settings {
58 #[configurable(metadata(docs::examples = "vector-database"))]
62 #[configurable(metadata(docs::examples = "iot-store"))]
63 database: String,
64
65 #[configurable(metadata(docs::examples = "any"))]
69 #[configurable(metadata(docs::examples = "one"))]
70 #[configurable(metadata(docs::examples = "quorum"))]
71 #[configurable(metadata(docs::examples = "all"))]
72 consistency: Option<String>,
73
74 #[configurable(metadata(docs::examples = "autogen"))]
78 #[configurable(metadata(docs::examples = "one_day_only"))]
79 retention_policy_name: Option<String>,
80
81 #[configurable(metadata(docs::examples = "todd"))]
85 #[configurable(metadata(docs::examples = "vector-source"))]
86 username: Option<String>,
87
88 #[configurable(metadata(docs::examples = "${INFLUXDB_PASSWORD}"))]
92 #[configurable(metadata(docs::examples = "influxdb4ever"))]
93 password: Option<SensitiveString>,
94}
95
96#[configurable_component]
98#[derive(Clone, Debug)]
99pub struct InfluxDb2Settings {
100 #[configurable(metadata(docs::examples = "my-org"))]
104 #[configurable(metadata(docs::examples = "33f2cff0a28e5b63"))]
105 org: String,
106
107 #[configurable(metadata(docs::examples = "vector-bucket"))]
111 #[configurable(metadata(docs::examples = "4d2225e4d3d49f75"))]
112 bucket: String,
113
114 #[configurable(metadata(docs::examples = "${INFLUXDB_TOKEN}"))]
120 #[configurable(metadata(docs::examples = "ef8d5de700e7989468166c40fc8a0ccd"))]
121 token: SensitiveString,
122}
123
124trait InfluxDbSettings: std::fmt::Debug {
125 fn write_uri(&self, endpoint: String) -> crate::Result<Uri>;
126 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri>;
127 fn token(&self) -> SensitiveString;
128 fn protocol_version(&self) -> ProtocolVersion;
129}
130
131impl InfluxDbSettings for InfluxDb1Settings {
132 fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
133 encode_uri(
134 &endpoint,
135 "write",
136 &[
137 ("consistency", self.consistency.clone()),
138 ("db", Some(self.database.clone())),
139 ("rp", self.retention_policy_name.clone()),
140 ("p", self.password.as_ref().map(|v| v.inner().to_owned())),
141 ("u", self.username.clone()),
142 ("precision", Some("ns".to_owned())),
143 ],
144 )
145 }
146
147 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
148 encode_uri(&endpoint, "ping", &[])
149 }
150
151 fn token(&self) -> SensitiveString {
152 SensitiveString::default()
153 }
154
155 fn protocol_version(&self) -> ProtocolVersion {
156 ProtocolVersion::V1
157 }
158}
159
160impl InfluxDbSettings for InfluxDb2Settings {
161 fn write_uri(&self, endpoint: String) -> crate::Result<Uri> {
162 encode_uri(
163 &endpoint,
164 "api/v2/write",
165 &[
166 ("org", Some(self.org.clone())),
167 ("bucket", Some(self.bucket.clone())),
168 ("precision", Some("ns".to_owned())),
169 ],
170 )
171 }
172
173 fn healthcheck_uri(&self, endpoint: String) -> crate::Result<Uri> {
174 encode_uri(&endpoint, "ping", &[])
175 }
176
177 fn token(&self) -> SensitiveString {
178 self.token.clone()
179 }
180
181 fn protocol_version(&self) -> ProtocolVersion {
182 ProtocolVersion::V2
183 }
184}
185
186fn influxdb_settings(
187 influxdb1_settings: Option<InfluxDb1Settings>,
188 influxdb2_settings: Option<InfluxDb2Settings>,
189) -> Result<Box<dyn InfluxDbSettings>, crate::Error> {
190 match (influxdb1_settings, influxdb2_settings) {
191 (Some(v1_settings), Some(v2_settings)) => Err(ConfigError::BothConfiguration {
192 v1_settings,
193 v2_settings,
194 }
195 .into()),
196 (None, None) => Err(ConfigError::MissingConfiguration.into()),
197 (Some(settings), _) => Ok(Box::new(settings)),
198 (_, Some(settings)) => Ok(Box::new(settings)),
199 }
200}
201
202fn healthcheck(
205 endpoint: String,
206 influxdb1_settings: Option<InfluxDb1Settings>,
207 influxdb2_settings: Option<InfluxDb2Settings>,
208 mut client: HttpClient,
209) -> crate::Result<super::Healthcheck> {
210 let settings = influxdb_settings(influxdb1_settings, influxdb2_settings)?;
211
212 let uri = settings.healthcheck_uri(endpoint)?;
213
214 let request = hyper::Request::get(uri).body(hyper::Body::empty()).unwrap();
215
216 Ok(async move {
217 client
218 .call(request)
219 .await
220 .map_err(|error| error.into())
221 .and_then(|response| match response.status() {
222 StatusCode::OK => Ok(()),
223 StatusCode::NO_CONTENT => Ok(()),
224 other => Err(super::HealthcheckError::UnexpectedStatus { status: other }.into()),
225 })
226 }
227 .boxed())
228}
229
230pub(in crate::sinks) fn influx_line_protocol(
232 protocol_version: ProtocolVersion,
233 measurement: &str,
234 tags: Option<MetricTags>,
235 fields: Option<HashMap<KeyString, Field>>,
236 timestamp: i64,
237 line_protocol: &mut BytesMut,
238) -> Result<(), &'static str> {
239 let unwrapped_fields = fields.unwrap_or_default();
241 if unwrapped_fields.is_empty() {
243 return Err("fields must not be empty");
244 }
245
246 encode_string(measurement, line_protocol);
247
248 let unwrapped_tags = tags.unwrap_or_default();
250 if !unwrapped_tags.is_empty() {
251 line_protocol.put_u8(b',');
252 encode_tags(unwrapped_tags, line_protocol);
253 }
254 line_protocol.put_u8(b' ');
255
256 encode_fields(protocol_version, unwrapped_fields, line_protocol);
258 line_protocol.put_u8(b' ');
259
260 line_protocol.put_slice(×tamp.to_string().into_bytes());
262 line_protocol.put_u8(b'\n');
263 Ok(())
264}
265
266fn encode_tags(tags: MetricTags, output: &mut BytesMut) {
267 let original_len = output.len();
268 for (key, value) in tags.iter_single() {
270 if key.is_empty() || value.is_empty() {
271 continue;
272 }
273 encode_string(key, output);
274 output.put_u8(b'=');
275 encode_string(value, output);
276 output.put_u8(b',');
277 }
278
279 if output.len() > original_len {
281 output.truncate(output.len() - 1);
282 }
283}
284
285fn encode_fields(
286 protocol_version: ProtocolVersion,
287 fields: HashMap<KeyString, Field>,
288 output: &mut BytesMut,
289) {
290 let original_len = output.len();
291 for (key, value) in fields.into_iter() {
292 encode_string(&key, output);
293 output.put_u8(b'=');
294 match value {
295 Field::String(s) => {
296 output.put_u8(b'"');
297 for c in s.chars() {
298 if "\\\"".contains(c) {
299 output.put_u8(b'\\');
300 }
301 let mut c_buffer: [u8; 4] = [0; 4];
302 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
303 }
304 output.put_u8(b'"');
305 }
306 Field::Float(f) => output.put_slice(&f.to_string().into_bytes()),
307 Field::UnsignedInt(i) => {
308 output.put_slice(&i.to_string().into_bytes());
309 let c = match protocol_version {
310 ProtocolVersion::V1 => 'i',
311 ProtocolVersion::V2 => 'u',
312 };
313 let mut c_buffer: [u8; 4] = [0; 4];
314 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
315 }
316 Field::Int(i) => {
317 output.put_slice(&i.to_string().into_bytes());
318 output.put_u8(b'i');
319 }
320 Field::Bool(b) => {
321 output.put_slice(&b.to_string().into_bytes());
322 }
323 };
324 output.put_u8(b',');
325 }
326
327 if output.len() > original_len {
329 output.truncate(output.len() - 1);
330 }
331}
332
333fn encode_string(key: &str, output: &mut BytesMut) {
334 for c in key.chars() {
335 if "\\, =".contains(c) {
336 output.put_u8(b'\\');
337 }
338 let mut c_buffer: [u8; 4] = [0; 4];
339 output.put_slice(c.encode_utf8(&mut c_buffer).as_bytes());
340 }
341}
342
343pub(in crate::sinks) fn encode_timestamp(timestamp: Option<DateTime<Utc>>) -> i64 {
344 if let Some(ts) = timestamp {
345 ts.timestamp_nanos_opt().unwrap()
346 } else {
347 encode_timestamp(Some(Utc::now()))
348 }
349}
350
351pub(in crate::sinks) fn encode_uri(
352 endpoint: &str,
353 path: &str,
354 pairs: &[(&str, Option<String>)],
355) -> crate::Result<Uri> {
356 let mut serializer = url::form_urlencoded::Serializer::new(String::new());
357
358 for pair in pairs {
359 if let Some(v) = &pair.1 {
360 serializer.append_pair(pair.0, v);
361 }
362 }
363
364 let mut url = if endpoint.ends_with('/') {
365 format!("{}{}?{}", endpoint, path, serializer.finish())
366 } else {
367 format!("{}/{}?{}", endpoint, path, serializer.finish())
368 };
369
370 if url.ends_with('?') {
371 url.pop();
372 }
373
374 Ok(url.parse::<Uri>().context(super::UriParseSnafu)?)
375}
376
377#[cfg(test)]
378#[allow(dead_code)]
379pub mod test_util {
380 use std::{fs::File, io::Read};
381
382 use chrono::{offset::TimeZone, DateTime, SecondsFormat, Timelike, Utc};
383 use vector_lib::metric_tags;
384
385 use super::*;
386 use crate::tls;
387
388 pub(crate) const ORG: &str = "my-org";
389 pub(crate) const BUCKET: &str = "my-bucket";
390 pub(crate) const TOKEN: &str = "my-token";
391
392 pub(crate) fn next_database() -> String {
393 format!("testdb{}", Utc::now().timestamp_nanos_opt().unwrap())
394 }
395
396 pub(crate) fn ts() -> DateTime<Utc> {
397 Utc.with_ymd_and_hms(2018, 11, 14, 8, 9, 10)
398 .single()
399 .and_then(|t| t.with_nanosecond(11))
400 .expect("invalid timestamp")
401 }
402
403 pub(crate) fn tags() -> MetricTags {
404 metric_tags!(
405 "normal_tag" => "value",
406 "true_tag" => "true",
407 "empty_tag" => "",
408 )
409 }
410
411 pub(crate) fn assert_fields(value: String, fields: Vec<&str>) {
412 let encoded_fields: Vec<&str> = value.split(',').collect();
413
414 assert_eq!(fields.len(), encoded_fields.len());
415
416 for field in fields.into_iter() {
417 assert!(
418 encoded_fields.contains(&field),
419 "Fields: {value} has to have: {field}"
420 )
421 }
422 }
423
424 pub(crate) fn address_v1(secure: bool) -> String {
425 if secure {
426 std::env::var("INFLUXDB_V1_HTTPS_ADDRESS")
427 .unwrap_or_else(|_| "http://localhost:8087".into())
428 } else {
429 std::env::var("INFLUXDB_V1_HTTP_ADDRESS")
430 .unwrap_or_else(|_| "http://localhost:8086".into())
431 }
432 }
433
434 pub(crate) fn address_v2() -> String {
435 std::env::var("INFLUXDB_V2_ADDRESS").unwrap_or_else(|_| "http://localhost:9999".into())
436 }
437
438 pub(crate) fn split_line_protocol(line_protocol: &str) -> (&str, &str, String, &str) {
448 let (name, fields) = line_protocol.split_once(' ').unwrap_or_default();
449 let (measurement, tags) = name.split_once(',').unwrap_or((name, ""));
451 let (fields, ts) = fields.split_once(' ').unwrap_or((fields, ""));
452
453 (measurement, tags, fields.to_string(), ts)
454 }
455
456 fn client() -> reqwest::Client {
457 let mut test_ca = Vec::<u8>::new();
458 File::open(tls::TEST_PEM_CA_PATH)
459 .unwrap()
460 .read_to_end(&mut test_ca)
461 .unwrap();
462 let test_ca = reqwest::Certificate::from_pem(&test_ca).unwrap();
463
464 reqwest::Client::builder()
465 .add_root_certificate(test_ca)
466 .build()
467 .unwrap()
468 }
469
470 pub(crate) async fn query_v1(endpoint: &str, query: &str) -> reqwest::Response {
471 client()
472 .get(format!("{endpoint}/query"))
473 .query(&[("q", query)])
474 .send()
475 .await
476 .unwrap()
477 }
478
479 pub(crate) async fn onboarding_v1(endpoint: &str) -> String {
480 let database = next_database();
481 let status = query_v1(endpoint, &format!("create database {database}"))
482 .await
483 .status();
484 assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
485 crate::test_util::wait_for(|| {
490 let write_url = format!("{}/write?db={}", endpoint, &database);
491 async move {
492 match client()
493 .post(&write_url)
494 .header("Content-Type", "text/plain")
495 .header("Authorization", &format!("Token {TOKEN}"))
496 .body("")
497 .send()
498 .await
499 .unwrap()
500 .status()
501 {
502 http::StatusCode::NO_CONTENT => true,
503 http::StatusCode::NOT_FOUND => false,
504 status => panic!("Unexpected status: {status}"),
505 }
506 }
507 })
508 .await;
509 database
510 }
511
512 pub(crate) async fn cleanup_v1(endpoint: &str, database: &str) {
513 let status = query_v1(endpoint, &format!("drop database {database}"))
514 .await
515 .status();
516 assert_eq!(status, http::StatusCode::OK, "UnexpectedStatus: {status}");
517 }
518
519 pub(crate) async fn onboarding_v2(endpoint: &str) {
520 let mut body = std::collections::HashMap::new();
521 body.insert("username", "my-user");
522 body.insert("password", "my-password");
523 body.insert("org", ORG);
524 body.insert("bucket", BUCKET);
525 body.insert("token", TOKEN);
526
527 let client = reqwest::Client::builder()
528 .danger_accept_invalid_certs(true)
529 .build()
530 .unwrap();
531
532 let res = client
533 .post(format!("{endpoint}/api/v2/setup"))
534 .json(&body)
535 .header("accept", "application/json")
536 .send()
537 .await
538 .unwrap();
539
540 let status = res.status();
541
542 assert!(
543 status == StatusCode::CREATED || status == StatusCode::UNPROCESSABLE_ENTITY,
544 "UnexpectedStatus: {status}"
545 );
546 }
547
548 pub(crate) fn format_timestamp(timestamp: DateTime<Utc>, format: SecondsFormat) -> String {
549 strip_timestamp(timestamp.to_rfc3339_opts(format, true))
550 }
551
552 fn strip_timestamp(timestamp: String) -> String {
554 let strip_one = || format!("{}Z", ×tamp[..timestamp.len() - 2]);
555 match timestamp {
556 _ if timestamp.ends_with("0Z") => strip_timestamp(strip_one()),
557 _ if timestamp.ends_with(".Z") => strip_one(),
558 _ => timestamp,
559 }
560 }
561}
562
563#[cfg(test)]
564mod tests {
565 use serde::{Deserialize, Serialize};
566
567 use super::*;
568 use crate::sinks::influxdb::test_util::{assert_fields, tags, ts};
569
570 #[derive(Deserialize, Serialize, Debug, Clone, Default)]
571 #[serde(deny_unknown_fields)]
572 pub struct InfluxDbTestConfig {
573 #[serde(flatten)]
574 pub influxdb1_settings: Option<InfluxDb1Settings>,
575 #[serde(flatten)]
576 pub influxdb2_settings: Option<InfluxDb2Settings>,
577 }
578
579 #[test]
580 fn test_influxdb_settings_both() {
581 let config = r#"
582 bucket = "my-bucket"
583 org = "my-org"
584 token = "my-token"
585 database = "my-database"
586 "#;
587 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
588 let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
589 assert_eq!(
590 settings.expect_err("expected error").to_string(),
591 "Unclear settings. Both version configured v1: InfluxDb1Settings { database: \"my-database\", consistency: None, retention_policy_name: None, username: None, password: None }, v2: InfluxDb2Settings { org: \"my-org\", bucket: \"my-bucket\", token: \"**REDACTED**\" }.".to_owned()
592 );
593 }
594
595 #[test]
596 fn test_influxdb_settings_missing() {
597 let config = r"
598 ";
599 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
600 let settings = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings);
601 assert_eq!(
602 settings.expect_err("expected error").to_string(),
603 "InfluxDB v1 or v2 should be configured as endpoint.".to_owned()
604 );
605 }
606
607 #[test]
608 fn test_influxdb1_settings() {
609 let config = r#"
610 database = "my-database"
611 "#;
612 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
613 _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
614 }
615
616 #[test]
617 fn test_influxdb2_settings() {
618 let config = r#"
619 bucket = "my-bucket"
620 org = "my-org"
621 token = "my-token"
622 "#;
623 let config: InfluxDbTestConfig = toml::from_str(config).unwrap();
624 _ = influxdb_settings(config.influxdb1_settings, config.influxdb2_settings).unwrap();
625 }
626
627 #[test]
628 fn test_influxdb1_test_write_uri() {
629 let settings = InfluxDb1Settings {
630 consistency: Some("quorum".to_owned()),
631 database: "vector_db".to_owned(),
632 retention_policy_name: Some("autogen".to_owned()),
633 username: Some("writer".to_owned()),
634 password: Some("secret".to_owned().into()),
635 };
636
637 let uri = settings
638 .write_uri("http://localhost:8086".to_owned())
639 .unwrap();
640 assert_eq!("http://localhost:8086/write?consistency=quorum&db=vector_db&rp=autogen&p=secret&u=writer&precision=ns", uri.to_string())
641 }
642
643 #[test]
644 fn test_influxdb2_test_write_uri() {
645 let settings = InfluxDb2Settings {
646 org: "my-org".to_owned(),
647 bucket: "my-bucket".to_owned(),
648 token: "my-token".to_owned().into(),
649 };
650
651 let uri = settings
652 .write_uri("http://localhost:9999".to_owned())
653 .unwrap();
654 assert_eq!(
655 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns",
656 uri.to_string()
657 )
658 }
659
660 #[test]
661 fn test_influxdb1_test_healthcheck_uri() {
662 let settings = InfluxDb1Settings {
663 consistency: Some("quorum".to_owned()),
664 database: "vector_db".to_owned(),
665 retention_policy_name: Some("autogen".to_owned()),
666 username: Some("writer".to_owned()),
667 password: Some("secret".to_owned().into()),
668 };
669
670 let uri = settings
671 .healthcheck_uri("http://localhost:8086".to_owned())
672 .unwrap();
673 assert_eq!("http://localhost:8086/ping", uri.to_string())
674 }
675
676 #[test]
677 fn test_influxdb2_test_healthcheck_uri() {
678 let settings = InfluxDb2Settings {
679 org: "my-org".to_owned(),
680 bucket: "my-bucket".to_owned(),
681 token: "my-token".to_owned().into(),
682 };
683
684 let uri = settings
685 .healthcheck_uri("http://localhost:9999".to_owned())
686 .unwrap();
687 assert_eq!("http://localhost:9999/ping", uri.to_string())
688 }
689
690 #[test]
691 fn test_encode_tags() {
692 let mut value = BytesMut::new();
693 encode_tags(tags(), &mut value);
694
695 assert_eq!(value, "normal_tag=value,true_tag=true");
696
697 let tags_to_escape = vec![
698 ("tag".to_owned(), "val=ue".to_owned()),
699 ("name escape".to_owned(), "true".to_owned()),
700 ("value_escape".to_owned(), "value escape".to_owned()),
701 ("a_first_place".to_owned(), "10".to_owned()),
702 ]
703 .into_iter()
704 .collect();
705
706 let mut value = BytesMut::new();
707 encode_tags(tags_to_escape, &mut value);
708 assert_eq!(
709 value,
710 "a_first_place=10,name\\ escape=true,tag=val\\=ue,value_escape=value\\ escape"
711 );
712 }
713
714 #[test]
715 fn tags_order() {
716 let mut value = BytesMut::new();
717 encode_tags(
718 vec![
719 ("a", "value"),
720 ("b", "value"),
721 ("c", "value"),
722 ("d", "value"),
723 ("e", "value"),
724 ]
725 .into_iter()
726 .map(|(k, v)| (k.to_owned(), v.to_owned()))
727 .collect(),
728 &mut value,
729 );
730 assert_eq!(value, "a=value,b=value,c=value,d=value,e=value");
731 }
732
733 #[test]
734 fn test_encode_fields_v1() {
735 let fields = vec![
736 ("field_string".into(), Field::String("string value".into())),
737 (
738 "field_string_escape".into(),
739 Field::String("string\\val\"ue".into()),
740 ),
741 ("field_float".into(), Field::Float(123.45)),
742 ("field_unsigned_int".into(), Field::UnsignedInt(657)),
743 ("field_int".into(), Field::Int(657646)),
744 ("field_bool_true".into(), Field::Bool(true)),
745 ("field_bool_false".into(), Field::Bool(false)),
746 ("escape key".into(), Field::Float(10.0)),
747 ]
748 .into_iter()
749 .collect();
750
751 let mut value = BytesMut::new();
752 encode_fields(ProtocolVersion::V1, fields, &mut value);
753 let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
754 assert_fields(
755 value,
756 [
757 "escape\\ key=10",
758 "field_float=123.45",
759 "field_string=\"string value\"",
760 "field_string_escape=\"string\\\\val\\\"ue\"",
761 "field_unsigned_int=657i",
762 "field_int=657646i",
763 "field_bool_true=true",
764 "field_bool_false=false",
765 ]
766 .to_vec(),
767 )
768 }
769
770 #[test]
771 fn test_encode_fields() {
772 let fields = vec![
773 ("field_string".into(), Field::String("string value".into())),
774 (
775 "field_string_escape".into(),
776 Field::String("string\\val\"ue".into()),
777 ),
778 ("field_float".into(), Field::Float(123.45)),
779 ("field_unsigned_int".into(), Field::UnsignedInt(657)),
780 ("field_int".into(), Field::Int(657646)),
781 ("field_bool_true".into(), Field::Bool(true)),
782 ("field_bool_false".into(), Field::Bool(false)),
783 ("escape key".into(), Field::Float(10.0)),
784 ]
785 .into_iter()
786 .collect();
787
788 let mut value = BytesMut::new();
789 encode_fields(ProtocolVersion::V2, fields, &mut value);
790 let value = String::from_utf8(value.freeze().as_ref().to_owned()).unwrap();
791 assert_fields(
792 value,
793 [
794 "escape\\ key=10",
795 "field_float=123.45",
796 "field_string=\"string value\"",
797 "field_string_escape=\"string\\\\val\\\"ue\"",
798 "field_unsigned_int=657u",
799 "field_int=657646i",
800 "field_bool_true=true",
801 "field_bool_false=false",
802 ]
803 .to_vec(),
804 )
805 }
806
807 #[test]
808 fn test_encode_string() {
809 let mut value = BytesMut::new();
810 encode_string("measurement_name", &mut value);
811 assert_eq!(value, "measurement_name");
812
813 let mut value = BytesMut::new();
814 encode_string("measurement name", &mut value);
815 assert_eq!(value, "measurement\\ name");
816
817 let mut value = BytesMut::new();
818 encode_string("measurement=name", &mut value);
819 assert_eq!(value, "measurement\\=name");
820
821 let mut value = BytesMut::new();
822 encode_string("measurement,name", &mut value);
823 assert_eq!(value, "measurement\\,name");
824 }
825
826 #[test]
827 fn test_encode_timestamp() {
828 let start = Utc::now()
829 .timestamp_nanos_opt()
830 .expect("Timestamp out of range");
831 assert_eq!(encode_timestamp(Some(ts())), 1542182950000000011);
832 assert!(encode_timestamp(None) >= start)
833 }
834
835 #[test]
836 fn test_encode_uri_valid() {
837 let uri = encode_uri(
838 "http://localhost:9999",
839 "api/v2/write",
840 &[
841 ("org", Some("my-org".to_owned())),
842 ("bucket", Some("my-bucket".to_owned())),
843 ("precision", Some("ns".to_owned())),
844 ],
845 )
846 .unwrap();
847 assert_eq!(
848 uri,
849 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket&precision=ns"
850 );
851
852 let uri = encode_uri(
853 "http://localhost:9999/",
854 "api/v2/write",
855 &[
856 ("org", Some("my-org".to_owned())),
857 ("bucket", Some("my-bucket".to_owned())),
858 ],
859 )
860 .unwrap();
861 assert_eq!(
862 uri,
863 "http://localhost:9999/api/v2/write?org=my-org&bucket=my-bucket"
864 );
865
866 let uri = encode_uri(
867 "http://localhost:9999",
868 "api/v2/write",
869 &[
870 ("org", Some("Organization name".to_owned())),
871 ("bucket", Some("Bucket=name".to_owned())),
872 ("none", None),
873 ],
874 )
875 .unwrap();
876 assert_eq!(
877 uri,
878 "http://localhost:9999/api/v2/write?org=Organization+name&bucket=Bucket%3Dname"
879 );
880 }
881
882 #[test]
883 fn test_encode_uri_invalid() {
884 encode_uri(
885 "localhost:9999",
886 "api/v2/write",
887 &[
888 ("org", Some("my-org".to_owned())),
889 ("bucket", Some("my-bucket".to_owned())),
890 ],
891 )
892 .unwrap_err();
893 }
894}
895
896#[cfg(feature = "influxdb-integration-tests")]
897#[cfg(test)]
898mod integration_tests {
899 use crate::{
900 config::ProxyConfig,
901 http::HttpClient,
902 sinks::influxdb::{
903 healthcheck,
904 test_util::{address_v1, address_v2, next_database, onboarding_v2, BUCKET, ORG, TOKEN},
905 InfluxDb1Settings, InfluxDb2Settings,
906 },
907 };
908
909 #[tokio::test]
910 async fn influxdb2_healthchecks_ok() {
911 let endpoint = address_v2();
912 onboarding_v2(&endpoint).await;
913
914 let endpoint = address_v2();
915 let influxdb1_settings = None;
916 let influxdb2_settings = Some(InfluxDb2Settings {
917 org: ORG.to_string(),
918 bucket: BUCKET.to_string(),
919 token: TOKEN.to_string().into(),
920 });
921 let proxy = ProxyConfig::default();
922 let client = HttpClient::new(None, &proxy).unwrap();
923
924 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
925 .unwrap()
926 .await
927 .unwrap()
928 }
929
930 #[tokio::test]
931 #[should_panic]
932 async fn influxdb2_healthchecks_fail() {
933 let endpoint = "http://127.0.0.1:9999".to_string();
934 onboarding_v2(&endpoint).await;
935
936 let influxdb1_settings = None;
937 let influxdb2_settings = Some(InfluxDb2Settings {
938 org: ORG.to_string(),
939 bucket: BUCKET.to_string(),
940 token: TOKEN.to_string().into(),
941 });
942 let proxy = ProxyConfig::default();
943 let client = HttpClient::new(None, &proxy).unwrap();
944
945 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
946 .unwrap()
947 .await
948 .unwrap();
949 }
950
951 #[tokio::test]
952 async fn influxdb1_healthchecks_ok() {
953 let endpoint = address_v1(false);
954
955 let influxdb1_settings = Some(InfluxDb1Settings {
956 database: next_database(),
957 consistency: None,
958 retention_policy_name: None,
959 username: None,
960 password: None,
961 });
962 let influxdb2_settings = None;
963 let proxy = ProxyConfig::default();
964 let client = HttpClient::new(None, &proxy).unwrap();
965
966 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
967 .unwrap()
968 .await
969 .unwrap();
970 }
971
972 #[tokio::test]
973 #[should_panic]
974 async fn influxdb1_healthchecks_fail() {
975 let endpoint = "http://127.0.0.1:8086".to_string();
976 let influxdb1_settings = Some(InfluxDb1Settings {
977 database: next_database(),
978 consistency: None,
979 retention_policy_name: None,
980 username: None,
981 password: None,
982 });
983 let influxdb2_settings = None;
984 let proxy = ProxyConfig::default();
985 let client = HttpClient::new(None, &proxy).unwrap();
986
987 healthcheck(endpoint, influxdb1_settings, influxdb2_settings, client)
988 .unwrap()
989 .await
990 .unwrap();
991 }
992}