1use std::collections::{HashMap, HashSet};
2
3use bytes::{Bytes, BytesMut};
4use futures::SinkExt;
5use http::{Request, Uri};
6use indoc::indoc;
7use vrl::event_path;
8use vrl::path::OwnedValuePath;
9use vrl::value::Kind;
10
11use vector_lib::config::log_schema;
12use vector_lib::configurable::configurable_component;
13use vector_lib::lookup::lookup_v2::OptionalValuePath;
14use vector_lib::lookup::PathPrefix;
15use vector_lib::schema;
16
17use super::{
18 encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field,
19 InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion,
20};
21use crate::{
22 codecs::Transformer,
23 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
24 event::{Event, KeyString, MetricTags, Value},
25 http::HttpClient,
26 internal_events::InfluxdbEncodingError,
27 sinks::{
28 util::{
29 http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
30 BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig,
31 },
32 Healthcheck, VectorSink,
33 },
34 tls::{TlsConfig, TlsSettings},
35};
36
/// Marker type carrying the default batching behavior for the `influxdb_logs` sink.
#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbLogsDefaultBatchSettings;

impl SinkBatchSettings for InfluxDbLogsDefaultBatchSettings {
    // No cap on events per batch; batches are bounded by encoded size instead.
    const MAX_EVENTS: Option<usize> = None;
    // Flush once a batch reaches ~1 MB of encoded line protocol.
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    // Flush partially-filled batches after one second.
    const TIMEOUT_SECS: f64 = 1.0;
}
45
/// Configuration for the `influxdb_logs` sink.
#[configurable_component(sink("influxdb_logs", "Deliver log event data to InfluxDB."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbLogsConfig {
    /// The namespace of the measurement name to use.
    ///
    /// When set (and `measurement` is not), events are written to the
    /// `<namespace>.vector` measurement.
    #[configurable(
        deprecated = "This field is deprecated, and `measurement` should be used instead."
    )]
    #[configurable(metadata(docs::examples = "service"))]
    pub namespace: Option<String>,

    /// The name of the InfluxDB measurement that log events are written to.
    #[configurable(metadata(docs::examples = "vector-logs"))]
    pub measurement: Option<String>,

    /// The endpoint to send data to.
    #[configurable(metadata(docs::examples = "http://localhost:8086"))]
    pub endpoint: String,

    /// The list of names of log fields to add as tags (rather than fields) to
    /// each measurement.
    ///
    /// The `metric_type` tag, plus the host and source-type keys, are always
    /// added in addition to this list (see the encoder).
    #[serde(default)]
    #[configurable(metadata(docs::examples = "field1"))]
    #[configurable(metadata(docs::examples = "parent.child_field"))]
    pub tags: Vec<KeyString>,

    // Exactly one of the v1/v2 settings groups is expected; both are flattened
    // into this config's top level.
    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,

    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,

    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbLogsDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    /// Overrides the name of the log field used to retrieve the hostname.
    ///
    /// By default, the global `log_schema.host_key` option is used.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// Overrides the name of the log field used to retrieve the message.
    ///
    /// By default, the global `log_schema.message_key` option is used.
    #[configurable(metadata(docs::examples = "text"))]
    pub message_key: Option<OptionalValuePath>,

    /// Overrides the name of the log field used to retrieve the source type.
    ///
    /// By default, the global `log_schema.source_type_key` option is used.
    #[configurable(metadata(docs::examples = "source"))]
    pub source_type_key: Option<OptionalValuePath>,
}
131
/// Runtime state for the `influxdb_logs` sink, resolved from `InfluxDbLogsConfig`
/// at build time.
#[derive(Debug)]
struct InfluxDbLogsSink {
    // Fully-resolved write URI produced by `settings.write_uri(endpoint)`.
    uri: Uri,
    // Token sent as the `Authorization: Token …` request header.
    token: String,
    protocol_version: ProtocolVersion,
    // Measurement name every encoded line is written under.
    measurement: String,
    // Field names emitted as line-protocol tags instead of fields.
    tags: HashSet<KeyString>,
    transformer: Transformer,
    // Event paths the encoder normalizes host/message/source_type onto.
    host_key: OwnedValuePath,
    message_key: OwnedValuePath,
    source_type_key: OwnedValuePath,
}
144
impl GenerateConfig for InfluxDbLogsConfig {
    /// Example configuration (InfluxDB v2 style) shown by `vector generate`.
    fn generate_config() -> toml::Value {
        toml::from_str(indoc! {r#"
            endpoint = "http://localhost:8086/"
            namespace = "my-namespace"
            tags = []
            org = "my-org"
            bucket = "my-bucket"
            token = "${INFLUXDB_TOKEN}"
        "#})
        .unwrap()
    }
}
158
159#[async_trait::async_trait]
160#[typetag::serde(name = "influxdb_logs")]
161impl SinkConfig for InfluxDbLogsConfig {
162 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
163 let measurement = self.get_measurement()?;
164 let tags: HashSet<KeyString> = self.tags.iter().cloned().collect();
165
166 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
167 let client = HttpClient::new(tls_settings, cx.proxy())?;
168 let healthcheck = self.healthcheck(client.clone())?;
169
170 let batch = self.batch.into_batch_settings()?;
171 let request = self.request.into_settings();
172
173 let settings = influxdb_settings(
174 self.influxdb1_settings.clone(),
175 self.influxdb2_settings.clone(),
176 )
177 .unwrap();
178
179 let endpoint = self.endpoint.clone();
180 let uri = settings.write_uri(endpoint).unwrap();
181
182 let token = settings.token();
183 let protocol_version = settings.protocol_version();
184
185 let host_key = self
186 .host_key
187 .as_ref()
188 .and_then(|k| k.path.clone())
189 .or_else(|| log_schema().host_key().cloned())
190 .expect("global log_schema.host_key to be valid path");
191
192 let message_key = self
193 .message_key
194 .as_ref()
195 .and_then(|k| k.path.clone())
196 .or_else(|| log_schema().message_key().cloned())
197 .expect("global log_schema.message_key to be valid path");
198
199 let source_type_key = self
200 .source_type_key
201 .as_ref()
202 .and_then(|k| k.path.clone())
203 .or_else(|| log_schema().source_type_key().cloned())
204 .expect("global log_schema.source_type_key to be valid path");
205
206 let sink = InfluxDbLogsSink {
207 uri,
208 token: token.inner().to_owned(),
209 protocol_version,
210 measurement,
211 tags,
212 transformer: self.encoding.clone(),
213 host_key,
214 message_key,
215 source_type_key,
216 };
217
218 let sink = BatchedHttpSink::new(
219 sink,
220 Buffer::new(batch.size, Compression::None),
221 request,
222 batch.timeout,
223 client,
224 )
225 .sink_map_err(|error| error!(message = "Fatal influxdb_logs sink error.", %error));
226
227 #[allow(deprecated)]
228 Ok((VectorSink::from_event_sink(sink), healthcheck))
229 }
230
231 fn input(&self) -> Input {
232 let requirements = schema::Requirement::empty()
233 .optional_meaning("message", Kind::bytes())
234 .optional_meaning("host", Kind::bytes())
235 .optional_meaning("timestamp", Kind::timestamp());
236
237 Input::log().with_schema_requirement(requirements)
238 }
239
240 fn acknowledgements(&self) -> &AcknowledgementsConfig {
241 &self.acknowledgements
242 }
243}
244
/// Per-batch encoder; a snapshot of the sink's settings taken by
/// `InfluxDbLogsSink::build_encoder`.
struct InfluxDbLogsEncoder {
    protocol_version: ProtocolVersion,
    measurement: String,
    // Mutable per-encoder copy: `encode_event` adds host/source_type/metric_type.
    tags: HashSet<KeyString>,
    transformer: Transformer,
    host_key: OwnedValuePath,
    message_key: OwnedValuePath,
    source_type_key: OwnedValuePath,
}
254
impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
    /// Encode one log event as an InfluxDB line-protocol entry.
    ///
    /// Returns `None` (after emitting an `InfluxdbEncodingError`) when the
    /// line cannot be serialized.
    fn encode_event(&mut self, event: Event) -> Option<BytesMut> {
        let mut log = event.into_log();
        // Normalize the event's message onto the configured `message_key` path.
        if let Some(message_path) = log.message_path().cloned().as_ref() {
            log.rename_key(message_path, (PathPrefix::Event, &self.message_key));
        }
        // Promote the host to a tag and move it onto the configured path.
        // NOTE(review): the tag name registered here is the event's *original*
        // path string, while the field is renamed to `self.host_key`; these only
        // coincide when the configured key matches the event's path — confirm
        // behavior with a custom `host_key`.
        if let Some(host_path) = log.host_path().cloned().as_ref() {
            self.tags.replace(host_path.path.to_string().into());
            log.rename_key(host_path, (PathPrefix::Event, &self.host_key));
        }

        // Same treatment for the source type (see NOTE above).
        if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
            self.tags.replace(source_type_path.path.to_string().into());
            log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key));
        }

        // Tag every line with `metric_type=logs` so log-derived lines are
        // distinguishable in InfluxDB.
        self.tags.replace("metric_type".into());
        log.insert(event_path!("metric_type"), "logs");

        // Extract the timestamp before transformation; non-timestamp values
        // are dropped and left for `encode_timestamp` to handle.
        let timestamp = encode_timestamp(match log.remove_timestamp() {
            Some(Value::Timestamp(ts)) => Some(ts),
            _ => None,
        });

        // Apply the user-configured transformer (e.g. only/except fields).
        let log = {
            let mut event = Event::from(log);
            self.transformer.transform(&mut event);
            event.into_log()
        };

        // Partition the flattened key/value pairs: names in `self.tags` become
        // line-protocol tags, everything else becomes fields.
        let mut tags = MetricTags::default();
        let mut fields: HashMap<KeyString, Field> = HashMap::new();
        log.convert_to_fields().for_each(|(key, value)| {
            if self.tags.contains(&key[..]) {
                tags.replace(key.into(), value.to_string_lossy().into_owned());
            } else {
                fields.insert(key, to_field(value));
            }
        });

        let mut output = BytesMut::new();
        if let Err(error_message) = influx_line_protocol(
            self.protocol_version,
            &self.measurement,
            Some(tags),
            Some(fields),
            timestamp,
            &mut output,
        ) {
            emit!(InfluxdbEncodingError {
                error_message,
                count: 1
            });
            return None;
        };

        Some(output)
    }
}
322
impl HttpSink for InfluxDbLogsSink {
    type Input = BytesMut;
    type Output = BytesMut;
    type Encoder = InfluxDbLogsEncoder;

    /// Snapshot this sink's settings into a fresh per-batch encoder.
    fn build_encoder(&self) -> Self::Encoder {
        InfluxDbLogsEncoder {
            protocol_version: self.protocol_version,
            measurement: self.measurement.clone(),
            tags: self.tags.clone(),
            transformer: self.transformer.clone(),
            host_key: self.host_key.clone(),
            message_key: self.message_key.clone(),
            source_type_key: self.source_type_key.clone(),
        }
    }

    /// Build the HTTP POST carrying a batch of encoded line-protocol entries.
    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
        Request::post(&self.uri)
            .header("Content-Type", "text/plain")
            .header("Authorization", format!("Token {}", &self.token))
            .body(events.freeze())
            .map_err(Into::into)
    }
}
348
349impl InfluxDbLogsConfig {
350 fn get_measurement(&self) -> Result<String, &'static str> {
351 match (self.measurement.as_ref(), self.namespace.as_ref()) {
352 (Some(measure), Some(_)) => {
353 warn!("Option `namespace` has been superseded by `measurement`.");
354 Ok(measure.clone())
355 }
356 (Some(measure), None) => Ok(measure.clone()),
357 (None, Some(namespace)) => {
358 warn!(
359 "Option `namespace` has been deprecated. Use `measurement` instead. \
360 For example, you can use `measurement=<namespace>.vector` for the \
361 same effect."
362 );
363 Ok(format!("{namespace}.vector"))
364 }
365 (None, None) => Err("The `measurement` option is required."),
366 }
367 }
368
369 fn healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
370 let config = self.clone();
371
372 let healthcheck = healthcheck(
373 config.endpoint,
374 config.influxdb1_settings,
375 config.influxdb2_settings,
376 client,
377 )?;
378
379 Ok(healthcheck)
380 }
381}
382
383fn to_field(value: &Value) -> Field {
384 match value {
385 Value::Integer(num) => Field::Int(*num),
386 Value::Float(num) => Field::Float(num.into_inner()),
387 Value::Boolean(b) => Field::Bool(*b),
388 _ => Field::String(value.to_string_lossy().into_owned()),
389 }
390}
391
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Utc};
    use futures::{channel::mpsc, stream, StreamExt};
    use http::{request::Parts, StatusCode};
    use indoc::indoc;

    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use vector_lib::lookup::owned_value_path;

    use crate::{
        sinks::{
            influxdb::test_util::{assert_fields, split_line_protocol, ts},
            util::test::{build_test_server_status, load_sink},
        },
        test_util::{
            components::{
                run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
                HTTP_SINK_TAGS,
            },
            next_addr,
        },
    };

    use super::*;

    // Requests captured by the in-process test HTTP server: (head, body) pairs.
    type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbLogsConfig>();
    }

    // `tags` may be omitted entirely from the configuration.
    #[test]
    fn test_config_without_tags() {
        let config = indoc! {r#"
            namespace = "vector-logs"
            endpoint = "http://localhost:9999"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#};

        toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
    }

    // The deprecated `namespace` option resolves to `<namespace>.vector`.
    #[test]
    fn test_config_measurement_from_namespace() {
        let config = indoc! {r#"
            namespace = "ns"
            endpoint = "http://localhost:9999"
        "#};

        let sink_config = toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
        assert_eq!("ns.vector", sink_config.get_measurement().unwrap());
    }

    // The transformer's `except_fields` is applied after tag/field promotion,
    // so excluding `host` removes the tag and excluding `metric_type` keeps
    // only the host tag.
    #[test]
    fn test_encode_event_apply_rules() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("timestamp", ts());

        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["metric_type", "host"].to_vec(),
        );
        sink.transformer
            .set_except_fields(Some(vec!["host".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
        assert_eq!("1542182950000000011\n", line_protocol.3);

        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!(
            "host=aws.cloud.eur", line_protocol.1,
            "metric_type tag should be excluded"
        );
        assert_fields(line_protocol.2, ["message=\"hello\""].to_vec());
    }

    // Full encoding round-trip with protocol v1: tags sorted, typed fields.
    #[test]
    fn test_encode_event_v1() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    // Same round-trip as above but with protocol v2.
    #[test]
    fn test_encode_event() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    // With no configured tags and `metric_type` excluded, the line has an
    // empty tag set (measurement followed directly by a space).
    #[test]
    fn test_encode_event_without_tags() {
        let mut event = Event::Log(LogEvent::from("hello"));

        event.as_mut_log().insert("value", 100);
        event.as_mut_log().insert("timestamp", ts());

        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            [].to_vec(),
        );
        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let line = std::str::from_utf8(&bytes).unwrap();
        assert!(
            line.starts_with("vector "),
            "measurement (without tags) should ends with space ' '"
        );

        let line_protocol = split_line_protocol(line);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("", line_protocol.1, "tags should be empty");
        assert_fields(
            line_protocol.2,
            ["value=100i", "message=\"hello\""].to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    // Nested objects/arrays are flattened into dotted/indexed field names;
    // array holes are rendered as "<null>".
    #[test]
    fn test_encode_nested_fields() {
        let mut event = LogEvent::default();

        event.insert("a", 1);
        event.insert("nested.field", "2");
        event.insert("nested.bool", true);
        event.insert("nested.array[0]", "example-value");
        event.insert("nested.array[2]", "another-value");
        event.insert("nested.array[3]", 15);

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event.into()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(
            line_protocol.2,
            [
                "a=1i",
                "nested.array[0]=\"example-value\"",
                "nested.array[1]=\"<null>\"",
                "nested.array[2]=\"another-value\"",
                "nested.array[3]=15i",
                "nested.bool=true",
                "nested.field=\"2\"",
            ]
            .to_vec(),
        );
    }

    // Arbitrary event fields listed in `tags` become tags; names with no
    // matching field are simply absent from the output.
    #[test]
    fn test_add_tag() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("as_a_tag", 10);
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["as_a_tag", "not_exists_field", "source_type", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "as_a_tag=10,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    // v1 writes include database and precision query parameters.
    #[tokio::test]
    async fn smoke_v1() {
        let rx = smoke_test(
            r#"database = "my-database""#,
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;

        let query = receive_response(rx).await;
        assert!(query.contains("db=my-database"));
        assert!(query.contains("precision=ns"));
    }

    #[tokio::test]
    async fn smoke_v1_failure() {
        smoke_test(
            r#"database = "my-database""#,
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }

    // v2 writes include org, bucket, and precision query parameters.
    #[tokio::test]
    async fn smoke_v2() {
        let rx = smoke_test(
            indoc! {r#"
                bucket = "my-bucket"
                org = "my-org"
                token = "my-token"
            "#},
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;

        let query = receive_response(rx).await;
        assert!(query.contains("org=my-org"));
        assert!(query.contains("bucket=my-bucket"));
        assert!(query.contains("precision=ns"));
    }

    #[tokio::test]
    async fn smoke_v2_failure() {
        smoke_test(
            indoc! {r#"
                bucket = "my-bucket"
                org = "my-org"
                token = "my-token"
            "#},
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }

    /// Drive the full sink against a local test server returning
    /// `status_code`, assert the batch acknowledgement matches
    /// `batch_status`, and hand back the captured-request receiver.
    async fn smoke_test(
        config: &str,
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> Receiver {
        let config = format!(
            indoc! {r#"
                measurement = "vector"
                endpoint = "http://localhost:9999"
                {}
            "#},
            config
        );
        let (mut config, cx) = load_sink::<InfluxDbLogsConfig>(&config).unwrap();

        // Sanity-build once with the placeholder endpoint before retargeting
        // the config at the local test server.
        _ = config.build(cx.clone()).await.unwrap();

        let addr = next_addr();
        let host = format!("http://{addr}");
        config.endpoint = host;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        // Five events, each with a unique key/value and a deterministic
        // 1-second-apart timestamp.
        let lines = std::iter::repeat(())
            .map(move |_| "message_value")
            .take(5)
            .collect::<Vec<_>>();
        let mut events = Vec::new();

        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.to_string()).with_batch_notifier(&batch);
            event.insert(format!("key{i}").as_str(), format!("value{i}"));

            let timestamp = Utc
                .with_ymd_and_hms(1970, 1, 1, 0, 0, (i as u32) + 1)
                .single()
                .expect("invalid timestamp");
            event.insert("timestamp", timestamp);
            event.insert("source_type", "file");

            events.push(Event::Log(event));
        }
        drop(batch);

        if batch_status == BatchStatus::Delivered {
            run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
        } else {
            run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await;
        }

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        rx
    }

    /// Pull the first captured request, check it contains the five expected
    /// lines, and return its URI query string for further assertions.
    async fn receive_response(mut rx: Receiver) -> String {
        let output = rx.next().await.unwrap();

        let request = &output.0;
        let query = request.uri.query().unwrap();

        let body = std::str::from_utf8(&output.1[..]).unwrap();
        let mut lines = body.lines();

        assert_eq!(5, lines.clone().count());
        assert_line_protocol(0, lines.next());

        query.into()
    }

    /// Assert one line of the body matches the i-th event produced by
    /// `smoke_test` (tags, fields, and nanosecond timestamp).
    fn assert_line_protocol(i: i64, value: Option<&str>) {
        let line_protocol = split_line_protocol(value.unwrap());
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs,source_type=file", line_protocol.1);
        assert_fields(
            line_protocol.2.to_string(),
            [
                &*format!("key{i}=\"value{i}\""),
                "message=\"message_value\"",
            ]
            .to_vec(),
        );

        assert_eq!(((i + 1) * 1000000000).to_string(), line_protocol.3);
    }

    /// Construct an `InfluxDbLogsSink` directly (bypassing config build) with
    /// default host/message/source_type keys.
    fn create_sink(
        uri: &str,
        token: &str,
        protocol_version: ProtocolVersion,
        measurement: &str,
        tags: Vec<&str>,
    ) -> InfluxDbLogsSink {
        let uri = uri.parse::<Uri>().unwrap();
        let token = token.to_string();
        let measurement = measurement.to_string();
        let tags: HashSet<_> = tags.into_iter().map(|tag| tag.into()).collect();
        InfluxDbLogsSink {
            uri,
            token,
            protocol_version,
            measurement,
            tags,
            transformer: Default::default(),
            host_key: owned_value_path!("host"),
            message_key: owned_value_path!("message"),
            source_type_key: owned_value_path!("source_type"),
        }
    }
}
867
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::sync::Arc;

    use chrono::Utc;
    use futures::stream;
    use vrl::value;

    use vector_lib::codecs::BytesDeserializerConfig;
    use vector_lib::config::{LegacyKey, LogNamespace};
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use vector_lib::lookup::{owned_value_path, path};

    use crate::{
        config::SinkContext,
        sinks::influxdb::{
            logs::InfluxDbLogsConfig,
            test_util::{address_v2, onboarding_v2, BUCKET, ORG, TOKEN},
            InfluxDb2Settings,
        },
        test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
    };

    use super::*;

    // End-to-end test against a live InfluxDB v2: write three log events
    // (two legacy-namespaced, one Vector-namespaced), then query them back
    // via the Flux API and verify measurement, tags, and message fields.
    #[tokio::test]
    async fn influxdb2_logs_put_data() {
        let endpoint = address_v2();
        onboarding_v2(&endpoint).await;

        // Unique measurement name per run so repeated runs don't collide.
        let now = Utc::now();
        let measure = format!(
            "vector-{}",
            now.timestamp_nanos_opt().expect("Timestamp out of range")
        );

        let cx = SinkContext::default();

        let config = InfluxDbLogsConfig {
            namespace: None,
            measurement: Some(measure.clone()),
            endpoint: endpoint.clone(),
            tags: Default::default(),
            influxdb1_settings: None,
            influxdb2_settings: Some(InfluxDb2Settings {
                org: ORG.to_string(),
                bucket: BUCKET.to_string(),
                token: TOKEN.to_string().into(),
            }),
            encoding: Default::default(),
            batch: Default::default(),
            request: Default::default(),
            tls: None,
            acknowledgements: Default::default(),
            host_key: None,
            message_key: None,
            source_type_key: None,
        };

        let (sink, _) = config.build(cx).await.unwrap();

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
        event1.insert("host", "aws.cloud.eur");
        event1.insert("source_type", "file");

        let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
        event2.insert("host", "aws.cloud.eur");
        event2.insert("source_type", "file");

        // Build an event in the `Vector` log namespace: host/source_type live
        // in event metadata, with a schema definition mapping the metadata
        // host field onto the "host" meaning.
        let mut namespaced_log =
            LogEvent::from(value!("namespaced message")).with_batch_notifier(&batch);
        LogNamespace::Vector.insert_source_metadata(
            "file",
            &mut namespaced_log,
            Some(LegacyKey::Overwrite(path!("host"))),
            path!("host"),
            "aws.cloud.eur",
        );
        LogNamespace::Vector.insert_standard_vector_source_metadata(
            &mut namespaced_log,
            "file",
            now,
        );
        let schema = BytesDeserializerConfig
            .schema_definition(LogNamespace::Vector)
            .with_metadata_field(
                &owned_value_path!("file", "host"),
                Kind::bytes(),
                Some("host"),
            );
        namespaced_log
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));

        drop(batch);

        let events = vec![
            Event::Log(event1),
            Event::Log(event2),
            Event::Log(namespaced_log),
        ];

        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        // Query the written records back with Flux; the response is
        // annotated CSV (one header line followed by one line per record).
        let mut body = std::collections::HashMap::new();
        body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"{}\")", measure.clone()));
        body.insert("type", "flux".to_owned());

        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let res = client
            .post(format!("{endpoint}/api/v2/query?org=my-org"))
            .json(&body)
            .header("accept", "application/json")
            .header("Authorization", "Token my-token")
            .send()
            .await
            .unwrap();
        let string = res.text().await.unwrap();

        let lines = string.split('\n').collect::<Vec<&str>>();
        let header = lines[0].split(',').collect::<Vec<&str>>();
        let record1 = lines[1].split(',').collect::<Vec<&str>>();
        let record2 = lines[2].split(',').collect::<Vec<&str>>();
        let record_ns = lines[3].split(',').collect::<Vec<&str>>();

        // All three records belong to this run's measurement.
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );

        // The encoder's synthetic `metric_type=logs` tag is present on all.
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );

        // Each record's single field is the message with its original text.
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "message_1"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "message_2"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "namespaced message"
        );
    }
}
1116}