1use std::collections::{HashMap, HashSet};
2
3use bytes::{Bytes, BytesMut};
4use futures::SinkExt;
5use http::{Request, Uri};
6use indoc::indoc;
7use vector_lib::{
8 config::log_schema,
9 configurable::configurable_component,
10 lookup::{PathPrefix, lookup_v2::OptionalValuePath},
11 schema,
12};
13use vrl::{event_path, path::OwnedValuePath, value::Kind};
14
15use super::{
16 Field, InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, encode_timestamp, healthcheck,
17 influx_line_protocol, influxdb_settings,
18};
19use crate::{
20 codecs::Transformer,
21 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
22 event::{Event, KeyString, MetricTags, Value},
23 http::HttpClient,
24 internal_events::InfluxdbEncodingError,
25 sinks::{
26 Healthcheck, VectorSink,
27 util::{
28 BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig,
29 http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
30 },
31 },
32 tls::{TlsConfig, TlsSettings},
33};
34
/// Marker type that carries the default batching limits for the `influxdb_logs` sink.
///
/// It holds no data; the limits themselves live in its `SinkBatchSettings` implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbLogsDefaultBatchSettings;
37
/// Default batch limits used when the user does not override `batch` in the sink config.
impl SinkBatchSettings for InfluxDbLogsDefaultBatchSettings {
    // No default cap on the number of events in a batch.
    const MAX_EVENTS: Option<usize> = None;
    // Default cap on the encoded size of a batch, in bytes.
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    // Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
43
#[configurable_component(sink("influxdb_logs", "Deliver log event data to InfluxDB."))]
/// Configuration for the `influxdb_logs` sink.
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbLogsConfig {
    /// The namespace of the measurement name to use.
    ///
    /// When only `namespace` is set, the measurement name is `<namespace>.vector`.
    /// It is ignored (with a warning) when `measurement` is also set.
    #[configurable(
        deprecated = "This field is deprecated, and `measurement` should be used instead."
    )]
    #[configurable(metadata(docs::examples = "service"))]
    pub namespace: Option<String>,

    /// The name of the InfluxDB measurement that is written to.
    ///
    /// Either this option or the deprecated `namespace` option must be set.
    #[configurable(metadata(docs::examples = "vector-logs"))]
    pub measurement: Option<String>,

    /// The endpoint to send data to.
    ///
    /// The write URI and the healthcheck are both derived from this value.
    #[configurable(metadata(docs::examples = "http://localhost:8086"))]
    pub endpoint: String,

    /// The list of names of log fields that should be added as tags to each measurement.
    ///
    /// In addition to the names listed here, the encoder always treats `metric_type` and the
    /// event's host and source type fields (when present) as tags.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "field1"))]
    #[configurable(metadata(docs::examples = "parent.child_field"))]
    pub tags: Vec<KeyString>,

    /// Connection settings for InfluxDB v1, flattened into the sink configuration.
    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,

    /// Connection settings for InfluxDB v2, flattened into the sink configuration.
    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,

    // Transformations applied to every event before it is converted to line protocol.
    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,

    // Batching limits; defaults come from `InfluxDbLogsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbLogsDefaultBatchSettings>,

    // Request settings handed to the batched HTTP sink.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Optional TLS options used to build the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    // End-to-end acknowledgement settings; accepts either a bare boolean or a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    /// Use this option to customize the key containing the hostname.
    ///
    /// The event's host field is renamed to this key before encoding. When unset, the global
    /// `log_schema.host_key` is used.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// Use this option to customize the key containing the message.
    ///
    /// The event's message field is renamed to this key before encoding. When unset, the
    /// global `log_schema.message_key` is used.
    #[configurable(metadata(docs::examples = "text"))]
    pub message_key: Option<OptionalValuePath>,

    /// Use this option to customize the key containing the source type.
    ///
    /// The event's source type field is renamed to this key before encoding. When unset, the
    /// global `log_schema.source_type_key` is used.
    #[configurable(metadata(docs::examples = "source"))]
    pub source_type_key: Option<OptionalValuePath>,
}
129
/// Resolved runtime state of the sink, built from `InfluxDbLogsConfig`.
///
/// It produces one encoder per `build_encoder` call and turns each encoded batch into an
/// HTTP `POST` request.
#[derive(Debug)]
struct InfluxDbLogsSink {
    // Write URI that every batch is posted to.
    uri: Uri,
    // Token sent in the `Authorization: Token ...` header.
    token: String,
    // Protocol version forwarded to the line-protocol encoder.
    protocol_version: ProtocolVersion,
    // Measurement name written on every line.
    measurement: String,
    // Field names that are emitted as tags instead of fields.
    tags: HashSet<KeyString>,
    // Event transformations applied before encoding.
    transformer: Transformer,
    // Destination key for the event's host field.
    host_key: OwnedValuePath,
    // Destination key for the event's message field.
    message_key: OwnedValuePath,
    // Destination key for the event's source type field.
    source_type_key: OwnedValuePath,
}
142
143impl GenerateConfig for InfluxDbLogsConfig {
144 fn generate_config() -> toml::Value {
145 toml::from_str(indoc! {r#"
146 endpoint = "http://localhost:8086/"
147 namespace = "my-namespace"
148 tags = []
149 org = "my-org"
150 bucket = "my-bucket"
151 token = "${INFLUXDB_TOKEN}"
152 "#})
153 .unwrap()
154 }
155}
156
157#[async_trait::async_trait]
158#[typetag::serde(name = "influxdb_logs")]
159impl SinkConfig for InfluxDbLogsConfig {
160 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
161 let measurement = self.get_measurement()?;
162 let tags: HashSet<KeyString> = self.tags.iter().cloned().collect();
163
164 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
165 let client = HttpClient::new(tls_settings, cx.proxy())?;
166 let healthcheck = self.healthcheck(client.clone())?;
167
168 let batch = self.batch.into_batch_settings()?;
169 let request = self.request.into_settings();
170
171 let settings = influxdb_settings(
172 self.influxdb1_settings.clone(),
173 self.influxdb2_settings.clone(),
174 )
175 .unwrap();
176
177 let endpoint = self.endpoint.clone();
178 let uri = settings.write_uri(endpoint).unwrap();
179
180 let token = settings.token();
181 let protocol_version = settings.protocol_version();
182
183 let host_key = self
184 .host_key
185 .as_ref()
186 .and_then(|k| k.path.clone())
187 .or_else(|| log_schema().host_key().cloned())
188 .expect("global log_schema.host_key to be valid path");
189
190 let message_key = self
191 .message_key
192 .as_ref()
193 .and_then(|k| k.path.clone())
194 .or_else(|| log_schema().message_key().cloned())
195 .expect("global log_schema.message_key to be valid path");
196
197 let source_type_key = self
198 .source_type_key
199 .as_ref()
200 .and_then(|k| k.path.clone())
201 .or_else(|| log_schema().source_type_key().cloned())
202 .expect("global log_schema.source_type_key to be valid path");
203
204 let sink = InfluxDbLogsSink {
205 uri,
206 token: token.inner().to_owned(),
207 protocol_version,
208 measurement,
209 tags,
210 transformer: self.encoding.clone(),
211 host_key,
212 message_key,
213 source_type_key,
214 };
215
216 let sink = BatchedHttpSink::new(
217 sink,
218 Buffer::new(batch.size, Compression::None),
219 request,
220 batch.timeout,
221 client,
222 )
223 .sink_map_err(|error| error!(message = "Fatal influxdb_logs sink error.", %error));
224
225 #[allow(deprecated)]
226 Ok((VectorSink::from_event_sink(sink), healthcheck))
227 }
228
229 fn input(&self) -> Input {
230 let requirements = schema::Requirement::empty()
231 .optional_meaning("message", Kind::bytes())
232 .optional_meaning("host", Kind::bytes())
233 .optional_meaning("timestamp", Kind::timestamp());
234
235 Input::log().with_schema_requirement(requirements)
236 }
237
238 fn acknowledgements(&self) -> &AcknowledgementsConfig {
239 &self.acknowledgements
240 }
241}
242
/// Per-sink encoder that converts one log event into one InfluxDB line-protocol record.
///
/// Its fields are copies of the matching `InfluxDbLogsSink` fields, taken in `build_encoder`.
struct InfluxDbLogsEncoder {
    // Protocol version forwarded to the line-protocol encoder.
    protocol_version: ProtocolVersion,
    // Measurement name written on every line.
    measurement: String,
    // Field names emitted as tags; the encoder adds to this set while encoding events.
    tags: HashSet<KeyString>,
    // Event transformations applied before encoding.
    transformer: Transformer,
    // Destination key for the event's host field.
    host_key: OwnedValuePath,
    // Destination key for the event's message field.
    message_key: OwnedValuePath,
    // Destination key for the event's source type field.
    source_type_key: OwnedValuePath,
}
252
253impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
254 fn encode_event(&mut self, event: Event) -> Option<BytesMut> {
255 let mut log = event.into_log();
256 if let Some(message_path) = log.message_path().cloned().as_ref() {
261 log.rename_key(message_path, (PathPrefix::Event, &self.message_key));
262 }
263 if let Some(host_path) = log.host_path().cloned().as_ref() {
266 self.tags.replace(host_path.path.to_string().into());
267 log.rename_key(host_path, (PathPrefix::Event, &self.host_key));
268 }
269
270 if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
271 self.tags.replace(source_type_path.path.to_string().into());
272 log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key));
273 }
274
275 self.tags.replace("metric_type".into());
276 log.insert(event_path!("metric_type"), "logs");
277
278 let timestamp = encode_timestamp(match log.remove_timestamp() {
280 Some(Value::Timestamp(ts)) => Some(ts),
281 _ => None,
282 });
283
284 let log = {
285 let mut event = Event::from(log);
286 self.transformer.transform(&mut event);
287 event.into_log()
288 };
289
290 let mut tags = MetricTags::default();
292 let mut fields: HashMap<KeyString, Field> = HashMap::new();
293 log.convert_to_fields().for_each(|(key, value)| {
294 if self.tags.contains(&key[..]) {
295 tags.replace(key.into(), value.to_string_lossy().into_owned());
296 } else {
297 fields.insert(key, to_field(value));
298 }
299 });
300
301 let mut output = BytesMut::new();
302 if let Err(error_message) = influx_line_protocol(
303 self.protocol_version,
304 &self.measurement,
305 Some(tags),
306 Some(fields),
307 timestamp,
308 &mut output,
309 ) {
310 emit!(InfluxdbEncodingError {
311 error_message,
312 count: 1
313 });
314 return None;
315 };
316
317 Some(output)
318 }
319}
320
321impl HttpSink for InfluxDbLogsSink {
322 type Input = BytesMut;
323 type Output = BytesMut;
324 type Encoder = InfluxDbLogsEncoder;
325
326 fn build_encoder(&self) -> Self::Encoder {
327 InfluxDbLogsEncoder {
328 protocol_version: self.protocol_version,
329 measurement: self.measurement.clone(),
330 tags: self.tags.clone(),
331 transformer: self.transformer.clone(),
332 host_key: self.host_key.clone(),
333 message_key: self.message_key.clone(),
334 source_type_key: self.source_type_key.clone(),
335 }
336 }
337
338 async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
339 Request::post(&self.uri)
340 .header("Content-Type", "text/plain")
341 .header("Authorization", format!("Token {}", &self.token))
342 .body(events.freeze())
343 .map_err(Into::into)
344 }
345}
346
347impl InfluxDbLogsConfig {
348 fn get_measurement(&self) -> Result<String, &'static str> {
349 match (self.measurement.as_ref(), self.namespace.as_ref()) {
350 (Some(measure), Some(_)) => {
351 warn!("Option `namespace` has been superseded by `measurement`.");
352 Ok(measure.clone())
353 }
354 (Some(measure), None) => Ok(measure.clone()),
355 (None, Some(namespace)) => {
356 warn!(
357 "Option `namespace` has been deprecated. Use `measurement` instead. \
358 For example, you can use `measurement=<namespace>.vector` for the \
359 same effect."
360 );
361 Ok(format!("{namespace}.vector"))
362 }
363 (None, None) => Err("The `measurement` option is required."),
364 }
365 }
366
367 fn healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
368 let config = self.clone();
369
370 let healthcheck = healthcheck(
371 config.endpoint,
372 config.influxdb1_settings,
373 config.influxdb2_settings,
374 client,
375 )?;
376
377 Ok(healthcheck)
378 }
379}
380
381fn to_field(value: &Value) -> Field {
382 match value {
383 Value::Integer(num) => Field::Int(*num),
384 Value::Float(num) => Field::Float(num.into_inner()),
385 Value::Boolean(b) => Field::Bool(*b),
386 _ => Field::String(value.to_string_lossy().into_owned()),
387 }
388}
389
390#[cfg(test)]
391mod tests {
392 use chrono::{Utc, offset::TimeZone};
393 use futures::{StreamExt, channel::mpsc, stream};
394 use http::{StatusCode, request::Parts};
395 use indoc::indoc;
396 use vector_lib::{
397 event::{BatchNotifier, BatchStatus, Event, LogEvent},
398 lookup::owned_value_path,
399 };
400
401 use super::*;
402 use crate::{
403 sinks::{
404 influxdb::test_util::{assert_fields, split_line_protocol, ts},
405 util::test::{build_test_server_status, load_sink},
406 },
407 test_util::{
408 components::{
409 COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS, run_and_assert_sink_compliance,
410 run_and_assert_sink_error,
411 },
412 next_addr,
413 },
414 };
415
    /// Receiving end of the test server channel: one `(request head, body)` pair per request.
    type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;
417
    // Runs the shared generated-config check against this sink's sample configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbLogsConfig>();
    }
422
423 #[test]
424 fn test_config_without_tags() {
425 let config = indoc! {r#"
426 namespace = "vector-logs"
427 endpoint = "http://localhost:9999"
428 bucket = "my-bucket"
429 org = "my-org"
430 token = "my-token"
431 "#};
432
433 toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
434 }
435
436 #[test]
437 fn test_config_measurement_from_namespace() {
438 let config = indoc! {r#"
439 namespace = "ns"
440 endpoint = "http://localhost:9999"
441 "#};
442
443 let sink_config = toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
444 assert_eq!("ns.vector", sink_config.get_measurement().unwrap());
445 }
446
447 #[test]
448 fn test_encode_event_apply_rules() {
449 let mut event = Event::Log(LogEvent::from("hello"));
450 event.as_mut_log().insert("host", "aws.cloud.eur");
451 event.as_mut_log().insert("timestamp", ts());
452
453 let mut sink = create_sink(
454 "http://localhost:9999",
455 "my-token",
456 ProtocolVersion::V1,
457 "vector",
458 ["metric_type", "host"].to_vec(),
459 );
460 sink.transformer
461 .set_except_fields(Some(vec!["host".into()]))
462 .unwrap();
463 let mut encoder = sink.build_encoder();
464
465 let bytes = encoder.encode_event(event.clone()).unwrap();
466 let string = std::str::from_utf8(&bytes).unwrap();
467
468 let line_protocol = split_line_protocol(string);
469 assert_eq!("vector", line_protocol.0);
470 assert_eq!("metric_type=logs", line_protocol.1);
471 assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
472 assert_eq!("1542182950000000011\n", line_protocol.3);
473
474 sink.transformer
475 .set_except_fields(Some(vec!["metric_type".into()]))
476 .unwrap();
477 let mut encoder = sink.build_encoder();
478 let bytes = encoder.encode_event(event.clone()).unwrap();
479 let string = std::str::from_utf8(&bytes).unwrap();
480 let line_protocol = split_line_protocol(string);
481 assert_eq!(
482 "host=aws.cloud.eur", line_protocol.1,
483 "metric_type tag should be excluded"
484 );
485 assert_fields(line_protocol.2, ["message=\"hello\""].to_vec());
486 }
487
488 #[test]
489 fn test_encode_event_v1() {
490 let mut event = Event::Log(LogEvent::from("hello"));
491 event.as_mut_log().insert("host", "aws.cloud.eur");
492 event.as_mut_log().insert("source_type", "file");
493
494 event.as_mut_log().insert("int", 4i32);
495 event.as_mut_log().insert("float", 5.5);
496 event.as_mut_log().insert("bool", true);
497 event.as_mut_log().insert("string", "thisisastring");
498 event.as_mut_log().insert("timestamp", ts());
499
500 let sink = create_sink(
501 "http://localhost:9999",
502 "my-token",
503 ProtocolVersion::V1,
504 "vector",
505 ["source_type", "host", "metric_type"].to_vec(),
506 );
507 let mut encoder = sink.build_encoder();
508
509 let bytes = encoder.encode_event(event).unwrap();
510 let string = std::str::from_utf8(&bytes).unwrap();
511
512 let line_protocol = split_line_protocol(string);
513 assert_eq!("vector", line_protocol.0);
514 assert_eq!(
515 "host=aws.cloud.eur,metric_type=logs,source_type=file",
516 line_protocol.1
517 );
518 assert_fields(
519 line_protocol.2.to_string(),
520 [
521 "int=4i",
522 "float=5.5",
523 "bool=true",
524 "string=\"thisisastring\"",
525 "message=\"hello\"",
526 ]
527 .to_vec(),
528 );
529
530 assert_eq!("1542182950000000011\n", line_protocol.3);
531 }
532
533 #[test]
534 fn test_encode_event() {
535 let mut event = Event::Log(LogEvent::from("hello"));
536 event.as_mut_log().insert("host", "aws.cloud.eur");
537 event.as_mut_log().insert("source_type", "file");
538
539 event.as_mut_log().insert("int", 4i32);
540 event.as_mut_log().insert("float", 5.5);
541 event.as_mut_log().insert("bool", true);
542 event.as_mut_log().insert("string", "thisisastring");
543 event.as_mut_log().insert("timestamp", ts());
544
545 let sink = create_sink(
546 "http://localhost:9999",
547 "my-token",
548 ProtocolVersion::V2,
549 "vector",
550 ["source_type", "host", "metric_type"].to_vec(),
551 );
552 let mut encoder = sink.build_encoder();
553
554 let bytes = encoder.encode_event(event).unwrap();
555 let string = std::str::from_utf8(&bytes).unwrap();
556
557 let line_protocol = split_line_protocol(string);
558 assert_eq!("vector", line_protocol.0);
559 assert_eq!(
560 "host=aws.cloud.eur,metric_type=logs,source_type=file",
561 line_protocol.1
562 );
563 assert_fields(
564 line_protocol.2.to_string(),
565 [
566 "int=4i",
567 "float=5.5",
568 "bool=true",
569 "string=\"thisisastring\"",
570 "message=\"hello\"",
571 ]
572 .to_vec(),
573 );
574
575 assert_eq!("1542182950000000011\n", line_protocol.3);
576 }
577
578 #[test]
579 fn test_encode_event_without_tags() {
580 let mut event = Event::Log(LogEvent::from("hello"));
581
582 event.as_mut_log().insert("value", 100);
583 event.as_mut_log().insert("timestamp", ts());
584
585 let mut sink = create_sink(
586 "http://localhost:9999",
587 "my-token",
588 ProtocolVersion::V2,
589 "vector",
590 [].to_vec(),
591 );
592 sink.transformer
594 .set_except_fields(Some(vec!["metric_type".into()]))
595 .unwrap();
596 let mut encoder = sink.build_encoder();
597
598 let bytes = encoder.encode_event(event).unwrap();
599 let line = std::str::from_utf8(&bytes).unwrap();
600 assert!(
601 line.starts_with("vector "),
602 "measurement (without tags) should ends with space ' '"
603 );
604
605 let line_protocol = split_line_protocol(line);
606 assert_eq!("vector", line_protocol.0);
607 assert_eq!("", line_protocol.1, "tags should be empty");
608 assert_fields(
609 line_protocol.2,
610 ["value=100i", "message=\"hello\""].to_vec(),
611 );
612
613 assert_eq!("1542182950000000011\n", line_protocol.3);
614 }
615
616 #[test]
617 fn test_encode_nested_fields() {
618 let mut event = LogEvent::default();
619
620 event.insert("a", 1);
621 event.insert("nested.field", "2");
622 event.insert("nested.bool", true);
623 event.insert("nested.array[0]", "example-value");
624 event.insert("nested.array[2]", "another-value");
625 event.insert("nested.array[3]", 15);
626
627 let sink = create_sink(
628 "http://localhost:9999",
629 "my-token",
630 ProtocolVersion::V2,
631 "vector",
632 ["metric_type"].to_vec(),
633 );
634 let mut encoder = sink.build_encoder();
635
636 let bytes = encoder.encode_event(event.into()).unwrap();
637 let string = std::str::from_utf8(&bytes).unwrap();
638
639 let line_protocol = split_line_protocol(string);
640 assert_eq!("vector", line_protocol.0);
641 assert_eq!("metric_type=logs", line_protocol.1);
642 assert_fields(
643 line_protocol.2,
644 [
645 "a=1i",
646 "nested.array[0]=\"example-value\"",
647 "nested.array[1]=\"<null>\"",
648 "nested.array[2]=\"another-value\"",
649 "nested.array[3]=15i",
650 "nested.bool=true",
651 "nested.field=\"2\"",
652 ]
653 .to_vec(),
654 );
655 }
656
657 #[test]
658 fn test_add_tag() {
659 let mut event = Event::Log(LogEvent::from("hello"));
660 event.as_mut_log().insert("source_type", "file");
661
662 event.as_mut_log().insert("as_a_tag", 10);
663 event.as_mut_log().insert("timestamp", ts());
664
665 let sink = create_sink(
666 "http://localhost:9999",
667 "my-token",
668 ProtocolVersion::V2,
669 "vector",
670 ["as_a_tag", "not_exists_field", "source_type", "metric_type"].to_vec(),
671 );
672 let mut encoder = sink.build_encoder();
673
674 let bytes = encoder.encode_event(event).unwrap();
675 let string = std::str::from_utf8(&bytes).unwrap();
676
677 let line_protocol = split_line_protocol(string);
678 assert_eq!("vector", line_protocol.0);
679 assert_eq!(
680 "as_a_tag=10,metric_type=logs,source_type=file",
681 line_protocol.1
682 );
683 assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
684
685 assert_eq!("1542182950000000011\n", line_protocol.3);
686 }
687
688 #[tokio::test]
689 async fn smoke_v1() {
690 let rx = smoke_test(
691 r#"database = "my-database""#,
692 StatusCode::OK,
693 BatchStatus::Delivered,
694 )
695 .await;
696
697 let query = receive_response(rx).await;
698 assert!(query.contains("db=my-database"));
699 assert!(query.contains("precision=ns"));
700 }
701
702 #[tokio::test]
703 async fn smoke_v1_failure() {
704 smoke_test(
705 r#"database = "my-database""#,
706 StatusCode::BAD_REQUEST,
707 BatchStatus::Rejected,
708 )
709 .await;
710 }
711
712 #[tokio::test]
713 async fn smoke_v2() {
714 let rx = smoke_test(
715 indoc! {r#"
716 bucket = "my-bucket"
717 org = "my-org"
718 token = "my-token"
719 "#},
720 StatusCode::OK,
721 BatchStatus::Delivered,
722 )
723 .await;
724
725 let query = receive_response(rx).await;
726 assert!(query.contains("org=my-org"));
727 assert!(query.contains("bucket=my-bucket"));
728 assert!(query.contains("precision=ns"));
729 }
730
731 #[tokio::test]
732 async fn smoke_v2_failure() {
733 smoke_test(
734 indoc! {r#"
735 bucket = "my-bucket"
736 org = "my-org"
737 token = "my-token"
738 "#},
739 StatusCode::BAD_REQUEST,
740 BatchStatus::Rejected,
741 )
742 .await;
743 }
744
745 async fn smoke_test(
746 config: &str,
747 status_code: StatusCode,
748 batch_status: BatchStatus,
749 ) -> Receiver {
750 let config = format!(
751 indoc! {r#"
752 measurement = "vector"
753 endpoint = "http://localhost:9999"
754 {}
755 "#},
756 config
757 );
758 let (mut config, cx) = load_sink::<InfluxDbLogsConfig>(&config).unwrap();
759
760 _ = config.build(cx.clone()).await.unwrap();
762
763 let addr = next_addr();
764 let host = format!("http://{addr}");
767 config.endpoint = host;
768
769 let (sink, _) = config.build(cx).await.unwrap();
770
771 let (rx, _trigger, server) = build_test_server_status(addr, status_code);
772 tokio::spawn(server);
773
774 let (batch, mut receiver) = BatchNotifier::new_with_receiver();
775
776 let lines = std::iter::repeat(())
777 .map(move |_| "message_value")
778 .take(5)
779 .collect::<Vec<_>>();
780 let mut events = Vec::new();
781
782 for (i, line) in lines.iter().enumerate() {
784 let mut event = LogEvent::from(line.to_string()).with_batch_notifier(&batch);
785 event.insert(format!("key{i}").as_str(), format!("value{i}"));
786
787 let timestamp = Utc
788 .with_ymd_and_hms(1970, 1, 1, 0, 0, (i as u32) + 1)
789 .single()
790 .expect("invalid timestamp");
791 event.insert("timestamp", timestamp);
792 event.insert("source_type", "file");
793
794 events.push(Event::Log(event));
795 }
796 drop(batch);
797
798 if batch_status == BatchStatus::Delivered {
799 run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
800 } else {
801 run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await;
802 }
803
804 assert_eq!(receiver.try_recv(), Ok(batch_status));
805
806 rx
807 }
808
809 async fn receive_response(mut rx: Receiver) -> String {
810 let output = rx.next().await.unwrap();
811
812 let request = &output.0;
813 let query = request.uri.query().unwrap();
814
815 let body = std::str::from_utf8(&output.1[..]).unwrap();
816 let mut lines = body.lines();
817
818 assert_eq!(5, lines.clone().count());
819 assert_line_protocol(0, lines.next());
820
821 query.into()
822 }
823
824 fn assert_line_protocol(i: i64, value: Option<&str>) {
825 let line_protocol = split_line_protocol(value.unwrap());
827 assert_eq!("vector", line_protocol.0);
828 assert_eq!("metric_type=logs,source_type=file", line_protocol.1);
829 assert_fields(
830 line_protocol.2.to_string(),
831 [
832 &*format!("key{i}=\"value{i}\""),
833 "message=\"message_value\"",
834 ]
835 .to_vec(),
836 );
837
838 assert_eq!(((i + 1) * 1000000000).to_string(), line_protocol.3);
839 }
840
841 fn create_sink(
842 uri: &str,
843 token: &str,
844 protocol_version: ProtocolVersion,
845 measurement: &str,
846 tags: Vec<&str>,
847 ) -> InfluxDbLogsSink {
848 let uri = uri.parse::<Uri>().unwrap();
849 let token = token.to_string();
850 let measurement = measurement.to_string();
851 let tags: HashSet<_> = tags.into_iter().map(|tag| tag.into()).collect();
852 InfluxDbLogsSink {
853 uri,
854 token,
855 protocol_version,
856 measurement,
857 tags,
858 transformer: Default::default(),
859 host_key: owned_value_path!("host"),
860 message_key: owned_value_path!("message"),
861 source_type_key: owned_value_path!("source_type"),
862 }
863 }
864}
865
866#[cfg(feature = "influxdb-integration-tests")]
867#[cfg(test)]
868mod integration_tests {
869 use std::sync::Arc;
870
871 use chrono::Utc;
872 use futures::stream;
873 use vector_lib::{
874 codecs::BytesDeserializerConfig,
875 config::{LegacyKey, LogNamespace},
876 event::{BatchNotifier, BatchStatus, Event, LogEvent},
877 lookup::{owned_value_path, path},
878 };
879 use vrl::value;
880
881 use super::*;
882 use crate::{
883 config::SinkContext,
884 sinks::influxdb::{
885 InfluxDb2Settings,
886 logs::InfluxDbLogsConfig,
887 test_util::{BUCKET, ORG, TOKEN, address_v2, onboarding_v2},
888 },
889 test_util::components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
890 };
891
892 #[tokio::test]
893 async fn influxdb2_logs_put_data() {
894 let endpoint = address_v2();
895 onboarding_v2(&endpoint).await;
896
897 let now = Utc::now();
898 let measure = format!(
899 "vector-{}",
900 now.timestamp_nanos_opt().expect("Timestamp out of range")
901 );
902
903 let cx = SinkContext::default();
904
905 let config = InfluxDbLogsConfig {
906 namespace: None,
907 measurement: Some(measure.clone()),
908 endpoint: endpoint.clone(),
909 tags: Default::default(),
910 influxdb1_settings: None,
911 influxdb2_settings: Some(InfluxDb2Settings {
912 org: ORG.to_string(),
913 bucket: BUCKET.to_string(),
914 token: TOKEN.to_string().into(),
915 }),
916 encoding: Default::default(),
917 batch: Default::default(),
918 request: Default::default(),
919 tls: None,
920 acknowledgements: Default::default(),
921 host_key: None,
922 message_key: None,
923 source_type_key: None,
924 };
925
926 let (sink, _) = config.build(cx).await.unwrap();
927
928 let (batch, mut receiver) = BatchNotifier::new_with_receiver();
929
930 let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
931 event1.insert("host", "aws.cloud.eur");
932 event1.insert("source_type", "file");
933
934 let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
935 event2.insert("host", "aws.cloud.eur");
936 event2.insert("source_type", "file");
937
938 let mut namespaced_log =
939 LogEvent::from(value!("namespaced message")).with_batch_notifier(&batch);
940 LogNamespace::Vector.insert_source_metadata(
941 "file",
942 &mut namespaced_log,
943 Some(LegacyKey::Overwrite(path!("host"))),
944 path!("host"),
945 "aws.cloud.eur",
946 );
947 LogNamespace::Vector.insert_standard_vector_source_metadata(
948 &mut namespaced_log,
949 "file",
950 now,
951 );
952 let schema = BytesDeserializerConfig
953 .schema_definition(LogNamespace::Vector)
954 .with_metadata_field(
955 &owned_value_path!("file", "host"),
956 Kind::bytes(),
957 Some("host"),
958 );
959 namespaced_log
960 .metadata_mut()
961 .set_schema_definition(&Arc::new(schema));
962
963 drop(batch);
964
965 let events = vec![
966 Event::Log(event1),
967 Event::Log(event2),
968 Event::Log(namespaced_log),
969 ];
970
971 run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
972
973 assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));
974
975 let mut body = std::collections::HashMap::new();
976 body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"{}\")", measure.clone()));
977 body.insert("type", "flux".to_owned());
978
979 let client = reqwest::Client::builder()
980 .danger_accept_invalid_certs(true)
981 .build()
982 .unwrap();
983
984 let res = client
985 .post(format!("{endpoint}/api/v2/query?org=my-org"))
986 .json(&body)
987 .header("accept", "application/json")
988 .header("Authorization", "Token my-token")
989 .send()
990 .await
991 .unwrap();
992 let string = res.text().await.unwrap();
993
994 let lines = string.split('\n').collect::<Vec<&str>>();
995 let header = lines[0].split(',').collect::<Vec<&str>>();
996 let record1 = lines[1].split(',').collect::<Vec<&str>>();
997 let record2 = lines[2].split(',').collect::<Vec<&str>>();
998 let record_ns = lines[3].split(',').collect::<Vec<&str>>();
999
1000 assert_eq!(
1002 record1[header
1003 .iter()
1004 .position(|&r| r.trim() == "_measurement")
1005 .unwrap()]
1006 .trim(),
1007 measure.clone()
1008 );
1009 assert_eq!(
1010 record2[header
1011 .iter()
1012 .position(|&r| r.trim() == "_measurement")
1013 .unwrap()]
1014 .trim(),
1015 measure.clone()
1016 );
1017 assert_eq!(
1018 record_ns[header
1019 .iter()
1020 .position(|&r| r.trim() == "_measurement")
1021 .unwrap()]
1022 .trim(),
1023 measure.clone()
1024 );
1025
1026 assert_eq!(
1028 record1[header
1029 .iter()
1030 .position(|&r| r.trim() == "metric_type")
1031 .unwrap()]
1032 .trim(),
1033 "logs"
1034 );
1035 assert_eq!(
1036 record2[header
1037 .iter()
1038 .position(|&r| r.trim() == "metric_type")
1039 .unwrap()]
1040 .trim(),
1041 "logs"
1042 );
1043 assert_eq!(
1044 record_ns[header
1045 .iter()
1046 .position(|&r| r.trim() == "metric_type")
1047 .unwrap()]
1048 .trim(),
1049 "logs"
1050 );
1051 assert_eq!(
1052 record1[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
1053 "aws.cloud.eur"
1054 );
1055 assert_eq!(
1056 record2[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
1057 "aws.cloud.eur"
1058 );
1059 assert_eq!(
1060 record_ns[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
1061 "aws.cloud.eur"
1062 );
1063 assert_eq!(
1064 record1[header
1065 .iter()
1066 .position(|&r| r.trim() == "source_type")
1067 .unwrap()]
1068 .trim(),
1069 "file"
1070 );
1071 assert_eq!(
1072 record2[header
1073 .iter()
1074 .position(|&r| r.trim() == "source_type")
1075 .unwrap()]
1076 .trim(),
1077 "file"
1078 );
1079 assert_eq!(
1080 record_ns[header
1081 .iter()
1082 .position(|&r| r.trim() == "source_type")
1083 .unwrap()]
1084 .trim(),
1085 "file"
1086 );
1087
1088 assert_eq!(
1090 record1[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
1091 "message"
1092 );
1093 assert_eq!(
1094 record2[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
1095 "message"
1096 );
1097 assert_eq!(
1098 record_ns[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
1099 "message"
1100 );
1101 assert_eq!(
1102 record1[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
1103 "message_1"
1104 );
1105 assert_eq!(
1106 record2[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
1107 "message_2"
1108 );
1109 assert_eq!(
1110 record_ns[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
1111 "namespaced message"
1112 );
1113 }
1114}