vector/sinks/influxdb/logs.rs

use std::collections::{HashMap, HashSet};

use bytes::{Bytes, BytesMut};
use futures::SinkExt;
use http::{Request, Uri};
use indoc::indoc;
use vector_lib::{
    config::log_schema,
    configurable::configurable_component,
    lookup::{PathPrefix, lookup_v2::OptionalValuePath},
    schema,
};
use vrl::{event_path, path::OwnedValuePath, value::Kind};

use super::{
    Field, InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion, encode_timestamp, healthcheck,
    influx_line_protocol, influxdb_settings,
};
use crate::{
    codecs::Transformer,
    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
    event::{Event, KeyString, MetricTags, Value},
    http::HttpClient,
    internal_events::InfluxdbEncodingError,
    sinks::{
        Healthcheck, VectorSink,
        util::{
            BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig,
            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
        },
    },
    tls::{TlsConfig, TlsSettings},
};

#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbLogsDefaultBatchSettings;

impl SinkBatchSettings for InfluxDbLogsDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    const TIMEOUT_SECS: f64 = 1.0;
}
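
// For reference, these defaults correspond roughly to the following user-facing batch
// settings (a sketch; the exact option names come from the shared `BatchConfig` options,
// and the component id `my_influxdb_logs` is a placeholder):
//
//   [sinks.my_influxdb_logs.batch]
//   max_bytes = 1000000
//   timeout_secs = 1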

/// Configuration for the `influxdb_logs` sink.
#[configurable_component(sink("influxdb_logs", "Deliver log event data to InfluxDB."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbLogsConfig {
    /// The namespace of the measurement name to use.
    ///
    /// When specified, the measurement name is `<namespace>.vector`.
    ///
    #[configurable(
        deprecated = "This field is deprecated, and `measurement` should be used instead."
    )]
    #[configurable(metadata(docs::examples = "service"))]
    pub namespace: Option<String>,

    /// The name of the InfluxDB measurement that is written to.
    #[configurable(metadata(docs::examples = "vector-logs"))]
    pub measurement: Option<String>,

    /// The endpoint to send data to.
    ///
    /// This should be a full HTTP URI, including the scheme, host, and port.
    #[configurable(metadata(docs::examples = "http://localhost:8086"))]
    pub endpoint: String,

    /// The list of names of log fields that should be added as tags to each measurement.
    ///
    /// By default Vector adds `metric_type` as well as the configured `log_schema.host_key` and
    /// `log_schema.source_type_key` options.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "field1"))]
    #[configurable(metadata(docs::examples = "parent.child_field"))]
    pub tags: Vec<KeyString>,

    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,

    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,

    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbLogsDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // `host_key`, `message_key`, and `source_type_key` are `Option` because we want `vector generate`
    // to produce a config with these as `None`, so as not to accidentally override a user's configured
    // `log_schema`. Config generation happens at build time and can't account for changes to the
    // default `log_schema`.
    /// Use this option to customize the key containing the hostname.
    ///
    /// The setting of `log_schema.host_key`, usually `host`, is used here by default.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// Use this option to customize the key containing the message.
    ///
    /// The setting of `log_schema.message_key`, usually `message`, is used here by default.
    #[configurable(metadata(docs::examples = "text"))]
    pub message_key: Option<OptionalValuePath>,

    /// Use this option to customize the key containing the source_type.
    ///
    /// The setting of `log_schema.source_type_key`, usually `source_type`, is used here by default.
    #[configurable(metadata(docs::examples = "source"))]
    pub source_type_key: Option<OptionalValuePath>,
}

#[derive(Debug)]
struct InfluxDbLogsSink {
    uri: Uri,
    token: String,
    protocol_version: ProtocolVersion,
    measurement: String,
    tags: HashSet<KeyString>,
    transformer: Transformer,
    host_key: OwnedValuePath,
    message_key: OwnedValuePath,
    source_type_key: OwnedValuePath,
}

impl GenerateConfig for InfluxDbLogsConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(indoc! {r#"
            endpoint = "http://localhost:8086/"
            namespace = "my-namespace"
            tags = []
            org = "my-org"
            bucket = "my-bucket"
            token = "${INFLUXDB_TOKEN}"
        "#})
        .unwrap()
    }
}

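// A fuller, illustrative configuration for this sink (a sketch; the component id and
// `inputs` entry are placeholders, and the InfluxDB v2 credentials shown are examples only):
//
//   [sinks.influxdb_logs_example]
//   type = "influxdb_logs"
//   inputs = ["some_source"]
//   endpoint = "http://localhost:8086"
//   measurement = "vector-logs"
//   org = "my-org"
//   bucket = "my-bucket"
//   token = "${INFLUXDB_TOKEN}"
//   tags = ["host", "source_type"]
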
#[async_trait::async_trait]
#[typetag::serde(name = "influxdb_logs")]
impl SinkConfig for InfluxDbLogsConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let measurement = self.get_measurement()?;
        let tags: HashSet<KeyString> = self.tags.iter().cloned().collect();

        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
        let client = HttpClient::new(tls_settings, cx.proxy())?;
        let healthcheck = self.healthcheck(client.clone())?;

        let batch = self.batch.into_batch_settings()?;
        let request = self.request.into_settings();

        let settings = influxdb_settings(
            self.influxdb1_settings.clone(),
            self.influxdb2_settings.clone(),
        )
        .unwrap();

        let endpoint = self.endpoint.clone();
        let uri = settings.write_uri(endpoint).unwrap();

        let token = settings.token();
        let protocol_version = settings.protocol_version();

        let host_key = self
            .host_key
            .as_ref()
            .and_then(|k| k.path.clone())
            .or_else(|| log_schema().host_key().cloned())
            .expect("global log_schema.host_key to be valid path");

        let message_key = self
            .message_key
            .as_ref()
            .and_then(|k| k.path.clone())
            .or_else(|| log_schema().message_key().cloned())
            .expect("global log_schema.message_key to be valid path");

        let source_type_key = self
            .source_type_key
            .as_ref()
            .and_then(|k| k.path.clone())
            .or_else(|| log_schema().source_type_key().cloned())
            .expect("global log_schema.source_type_key to be valid path");

        let sink = InfluxDbLogsSink {
            uri,
            token: token.inner().to_owned(),
            protocol_version,
            measurement,
            tags,
            transformer: self.encoding.clone(),
            host_key,
            message_key,
            source_type_key,
        };

        let sink = BatchedHttpSink::new(
            sink,
            Buffer::new(batch.size, Compression::None),
            request,
            batch.timeout,
            client,
        )
        .sink_map_err(|error| error!(message = "Fatal influxdb_logs sink error.", %error));

        #[allow(deprecated)]
        Ok((VectorSink::from_event_sink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        let requirements = schema::Requirement::empty()
            .optional_meaning("message", Kind::bytes())
            .optional_meaning("host", Kind::bytes())
            .optional_meaning("timestamp", Kind::timestamp());

        Input::log().with_schema_requirement(requirements)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

struct InfluxDbLogsEncoder {
    protocol_version: ProtocolVersion,
    measurement: String,
    tags: HashSet<KeyString>,
    transformer: Transformer,
    host_key: OwnedValuePath,
    message_key: OwnedValuePath,
    source_type_key: OwnedValuePath,
}

impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
    fn encode_event(&mut self, event: Event) -> Option<BytesMut> {
        let mut log = event.into_log();
        // If the event isn't an object (`. = "foo"`), inserting or renaming will result in losing
        // the original value that was assigned to the root. To avoid this, we intentionally rename
        // the path that points to "message" so that it has a dedicated key.
        // TODO: add a `TargetPath::is_event_root()` to conditionally rename?
        if let Some(message_path) = log.message_path().cloned().as_ref() {
            log.rename_key(message_path, (PathPrefix::Event, &self.message_key));
        }
        // Add the `host` and `source_type` to the HashSet of tags to include
        // Ensure those paths are on the event to be encoded, rather than metadata
        if let Some(host_path) = log.host_path().cloned().as_ref() {
            self.tags.replace(host_path.path.to_string().into());
            log.rename_key(host_path, (PathPrefix::Event, &self.host_key));
        }

        if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
            self.tags.replace(source_type_path.path.to_string().into());
            log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key));
        }

        self.tags.replace("metric_type".into());
        log.insert(event_path!("metric_type"), "logs");

        // Timestamp
        let timestamp = encode_timestamp(match log.remove_timestamp() {
            Some(Value::Timestamp(ts)) => Some(ts),
            _ => None,
        });

        let log = {
            let mut event = Event::from(log);
            self.transformer.transform(&mut event);
            event.into_log()
        };

        // Tags + Fields
        let mut tags = MetricTags::default();
        let mut fields: HashMap<KeyString, Field> = HashMap::new();
        log.convert_to_fields().for_each(|(key, value)| {
            if self.tags.contains(&key[..]) {
                tags.replace(key.into(), value.to_string_lossy().into_owned());
            } else {
                fields.insert(key, to_field(value));
            }
        });

        let mut output = BytesMut::new();
        if let Err(error_message) = influx_line_protocol(
            self.protocol_version,
            &self.measurement,
            Some(tags),
            Some(fields),
            timestamp,
            &mut output,
        ) {
            emit!(InfluxdbEncodingError {
                error_message,
                count: 1
            });
            return None;
        };

        Some(output)
    }
}

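// As an illustration of the encoder output: an event like the one used in the tests below
// (message "hello", host "aws.cloud.eur", source_type "file", measurement "vector", default
// tag set) encodes to a single line-protocol entry roughly like:
//
//   vector,host=aws.cloud.eur,metric_type=logs,source_type=file message="hello" 1542182950000000011
//
// Keys listed in `tags` become line-protocol tags; every other field becomes a line-protocol field.
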
impl HttpSink for InfluxDbLogsSink {
    type Input = BytesMut;
    type Output = BytesMut;
    type Encoder = InfluxDbLogsEncoder;

    fn build_encoder(&self) -> Self::Encoder {
        InfluxDbLogsEncoder {
            protocol_version: self.protocol_version,
            measurement: self.measurement.clone(),
            tags: self.tags.clone(),
            transformer: self.transformer.clone(),
            host_key: self.host_key.clone(),
            message_key: self.message_key.clone(),
            source_type_key: self.source_type_key.clone(),
        }
    }

    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
        Request::post(&self.uri)
            .header("Content-Type", "text/plain")
            .header("Authorization", format!("Token {}", &self.token))
            .body(events.freeze())
            .map_err(Into::into)
    }
}

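// Schematically, the request built above looks like the following (the write URI and token
// are whatever the settings resolved to; the body is the batched line-protocol payload):
//
//   POST <write_uri>
//   Content-Type: text/plain
//   Authorization: Token <token>
//
//   vector,metric_type=logs message="..." <timestamp_ns>
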
impl InfluxDbLogsConfig {
    fn get_measurement(&self) -> Result<String, &'static str> {
        match (self.measurement.as_ref(), self.namespace.as_ref()) {
            (Some(measure), Some(_)) => {
                warn!("Option `namespace` has been superseded by `measurement`.");
                Ok(measure.clone())
            }
            (Some(measure), None) => Ok(measure.clone()),
            (None, Some(namespace)) => {
                warn!(
                    "Option `namespace` has been deprecated. Use `measurement` instead. \
                       For example, you can use `measurement=<namespace>.vector` for the \
                       same effect."
                );
                Ok(format!("{namespace}.vector"))
            }
            (None, None) => Err("The `measurement` option is required."),
        }
    }

    fn healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
        let config = self.clone();

        let healthcheck = healthcheck(
            config.endpoint,
            config.influxdb1_settings,
            config.influxdb2_settings,
            client,
        )?;

        Ok(healthcheck)
    }
}

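// The conversion below maps VRL values onto line-protocol field types; per the encoding
// tests, an integer 4 is written as `4i`, a float 5.5 as `5.5`, a boolean as `true`/`false`,
// and any other value is stringified into a quoted field value.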
fn to_field(value: &Value) -> Field {
    match value {
        Value::Integer(num) => Field::Int(*num),
        Value::Float(num) => Field::Float(num.into_inner()),
        Value::Boolean(b) => Field::Bool(*b),
        _ => Field::String(value.to_string_lossy().into_owned()),
    }
}

#[cfg(test)]
mod tests {
    use chrono::{Utc, offset::TimeZone};
    use futures::{StreamExt, channel::mpsc, stream};
    use http::{StatusCode, request::Parts};
    use indoc::indoc;
    use vector_lib::{
        event::{BatchNotifier, BatchStatus, Event, LogEvent},
        lookup::owned_value_path,
    };

    use super::*;
    use crate::{
        sinks::{
            influxdb::test_util::{assert_fields, split_line_protocol, ts},
            util::test::{build_test_server_status, load_sink},
        },
        test_util::{
            components::{
                COMPONENT_ERROR_TAGS, HTTP_SINK_TAGS, run_and_assert_sink_compliance,
                run_and_assert_sink_error,
            },
            next_addr,
        },
    };

    type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbLogsConfig>();
    }

    #[test]
    fn test_config_without_tags() {
        let config = indoc! {r#"
            namespace = "vector-logs"
            endpoint = "http://localhost:9999"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#};

        toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
    }

    #[test]
    fn test_config_measurement_from_namespace() {
        let config = indoc! {r#"
            namespace = "ns"
            endpoint = "http://localhost:9999"
        "#};

        let sink_config = toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
        assert_eq!("ns.vector", sink_config.get_measurement().unwrap());
    }

    #[test]
    fn test_encode_event_apply_rules() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("timestamp", ts());

        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["metric_type", "host"].to_vec(),
        );
        sink.transformer
            .set_except_fields(Some(vec!["host".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
        assert_eq!("1542182950000000011\n", line_protocol.3);

        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!(
            "host=aws.cloud.eur", line_protocol.1,
            "metric_type tag should be excluded"
        );
        assert_fields(line_protocol.2, ["message=\"hello\""].to_vec());
    }

    #[test]
    fn test_encode_event_v1() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    #[test]
    fn test_encode_event() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    #[test]
    fn test_encode_event_without_tags() {
        let mut event = Event::Log(LogEvent::from("hello"));

        event.as_mut_log().insert("value", 100);
        event.as_mut_log().insert("timestamp", ts());

        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            [].to_vec(),
        );
        // Exclude the default `metric_type` tag so the encoded line has empty tags
        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let line = std::str::from_utf8(&bytes).unwrap();
        assert!(
            line.starts_with("vector "),
            "measurement (without tags) should end with a space ' '"
        );

        let line_protocol = split_line_protocol(line);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("", line_protocol.1, "tags should be empty");
        assert_fields(
            line_protocol.2,
            ["value=100i", "message=\"hello\""].to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    #[test]
    fn test_encode_nested_fields() {
        let mut event = LogEvent::default();

        event.insert("a", 1);
        event.insert("nested.field", "2");
        event.insert("nested.bool", true);
        event.insert("nested.array[0]", "example-value");
        event.insert("nested.array[2]", "another-value");
        event.insert("nested.array[3]", 15);

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event.into()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(
            line_protocol.2,
            [
                "a=1i",
                "nested.array[0]=\"example-value\"",
                "nested.array[1]=\"<null>\"",
                "nested.array[2]=\"another-value\"",
                "nested.array[3]=15i",
                "nested.bool=true",
                "nested.field=\"2\"",
            ]
            .to_vec(),
        );
    }

    #[test]
    fn test_add_tag() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("as_a_tag", 10);
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["as_a_tag", "not_exists_field", "source_type", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "as_a_tag=10,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

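    // A minimal additional sketch (not part of the original test suite) exercising the
    // `to_field` conversion directly; it only asserts the integer, boolean, and fallback
    // string arms, mirroring the `int=4i` / `bool=true` expectations in the tests above.
    #[test]
    fn test_to_field_conversion_sketch() {
        assert!(matches!(to_field(&Value::Integer(4)), Field::Int(4)));
        assert!(matches!(to_field(&Value::Boolean(true)), Field::Bool(true)));
        // Anything else (e.g. bytes) is stringified into a quoted field value.
        assert!(matches!(
            to_field(&Value::Bytes("hello".into())),
            Field::String(s) if s == "hello"
        ));
    }
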
    #[tokio::test]
    async fn smoke_v1() {
        let rx = smoke_test(
            r#"database = "my-database""#,
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;

        let query = receive_response(rx).await;
        assert!(query.contains("db=my-database"));
        assert!(query.contains("precision=ns"));
    }

    #[tokio::test]
    async fn smoke_v1_failure() {
        smoke_test(
            r#"database = "my-database""#,
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }

    #[tokio::test]
    async fn smoke_v2() {
        let rx = smoke_test(
            indoc! {r#"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#},
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;

        let query = receive_response(rx).await;
        assert!(query.contains("org=my-org"));
        assert!(query.contains("bucket=my-bucket"));
        assert!(query.contains("precision=ns"));
    }

    #[tokio::test]
    async fn smoke_v2_failure() {
        smoke_test(
            indoc! {r#"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#},
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }

    async fn smoke_test(
        config: &str,
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> Receiver {
        let config = format!(
            indoc! {r#"
            measurement = "vector"
            endpoint = "http://localhost:9999"
            {}
        "#},
            config
        );
        let (mut config, cx) = load_sink::<InfluxDbLogsConfig>(&config).unwrap();

        // Make sure we can build the config
        _ = config.build(cx.clone()).await.unwrap();

        let addr = next_addr();
        // Swap out the host so we can force it to send to our local server.
        let host = format!("http://{addr}");
        config.endpoint = host;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let lines = std::iter::repeat(())
            .map(move |_| "message_value")
            .take(5)
            .collect::<Vec<_>>();
        let mut events = Vec::new();

        // Create 5 events, each with a custom field
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.to_string()).with_batch_notifier(&batch);
            event.insert(format!("key{i}").as_str(), format!("value{i}"));

            let timestamp = Utc
                .with_ymd_and_hms(1970, 1, 1, 0, 0, (i as u32) + 1)
                .single()
                .expect("invalid timestamp");
            event.insert("timestamp", timestamp);
            event.insert("source_type", "file");

            events.push(Event::Log(event));
        }
        drop(batch);

        if batch_status == BatchStatus::Delivered {
            run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
        } else {
            run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await;
        }

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        rx
    }

    async fn receive_response(mut rx: Receiver) -> String {
        let output = rx.next().await.unwrap();

        let request = &output.0;
        let query = request.uri.query().unwrap();

        let body = std::str::from_utf8(&output.1[..]).unwrap();
        let mut lines = body.lines();

        assert_eq!(5, lines.clone().count());
        assert_line_protocol(0, lines.next());

        query.into()
    }

    fn assert_line_protocol(i: i64, value: Option<&str>) {
        // vector,metric_type=logs key0="value0",message="message_value" 1000000000
        let line_protocol = split_line_protocol(value.unwrap());
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs,source_type=file", line_protocol.1);
        assert_fields(
            line_protocol.2.to_string(),
            [
                &*format!("key{i}=\"value{i}\""),
                "message=\"message_value\"",
            ]
            .to_vec(),
        );

        assert_eq!(((i + 1) * 1000000000).to_string(), line_protocol.3);
    }

    fn create_sink(
        uri: &str,
        token: &str,
        protocol_version: ProtocolVersion,
        measurement: &str,
        tags: Vec<&str>,
    ) -> InfluxDbLogsSink {
        let uri = uri.parse::<Uri>().unwrap();
        let token = token.to_string();
        let measurement = measurement.to_string();
        let tags: HashSet<_> = tags.into_iter().map(|tag| tag.into()).collect();
        InfluxDbLogsSink {
            uri,
            token,
            protocol_version,
            measurement,
            tags,
            transformer: Default::default(),
            host_key: owned_value_path!("host"),
            message_key: owned_value_path!("message"),
            source_type_key: owned_value_path!("source_type"),
        }
    }
}

#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::sync::Arc;

    use chrono::Utc;
    use futures::stream;
    use vector_lib::{
        codecs::BytesDeserializerConfig,
        config::{LegacyKey, LogNamespace},
        event::{BatchNotifier, BatchStatus, Event, LogEvent},
        lookup::{owned_value_path, path},
    };
    use vrl::value;

    use super::*;
    use crate::{
        config::SinkContext,
        sinks::influxdb::{
            InfluxDb2Settings,
            logs::InfluxDbLogsConfig,
            test_util::{BUCKET, ORG, TOKEN, address_v2, onboarding_v2},
        },
        test_util::components::{HTTP_SINK_TAGS, run_and_assert_sink_compliance},
    };

    #[tokio::test]
    async fn influxdb2_logs_put_data() {
        let endpoint = address_v2();
        onboarding_v2(&endpoint).await;

        let now = Utc::now();
        let measure = format!(
            "vector-{}",
            now.timestamp_nanos_opt().expect("Timestamp out of range")
        );

        let cx = SinkContext::default();

        let config = InfluxDbLogsConfig {
            namespace: None,
            measurement: Some(measure.clone()),
            endpoint: endpoint.clone(),
            tags: Default::default(),
            influxdb1_settings: None,
            influxdb2_settings: Some(InfluxDb2Settings {
                org: ORG.to_string(),
                bucket: BUCKET.to_string(),
                token: TOKEN.to_string().into(),
            }),
            encoding: Default::default(),
            batch: Default::default(),
            request: Default::default(),
            tls: None,
            acknowledgements: Default::default(),
            host_key: None,
            message_key: None,
            source_type_key: None,
        };

        let (sink, _) = config.build(cx).await.unwrap();

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
        event1.insert("host", "aws.cloud.eur");
        event1.insert("source_type", "file");

        let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
        event2.insert("host", "aws.cloud.eur");
        event2.insert("source_type", "file");

        let mut namespaced_log =
            LogEvent::from(value!("namespaced message")).with_batch_notifier(&batch);
        LogNamespace::Vector.insert_source_metadata(
            "file",
            &mut namespaced_log,
            Some(LegacyKey::Overwrite(path!("host"))),
            path!("host"),
            "aws.cloud.eur",
        );
        LogNamespace::Vector.insert_standard_vector_source_metadata(
            &mut namespaced_log,
            "file",
            now,
        );
        let schema = BytesDeserializerConfig
            .schema_definition(LogNamespace::Vector)
            .with_metadata_field(
                &owned_value_path!("file", "host"),
                Kind::bytes(),
                Some("host"),
            );
        namespaced_log
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));

        drop(batch);

        let events = vec![
            Event::Log(event1),
            Event::Log(event2),
            Event::Log(namespaced_log),
        ];

        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let mut body = std::collections::HashMap::new();
        body.insert("query", format!("from(bucket:\"my-bucket\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"{}\")", measure.clone()));
        body.insert("type", "flux".to_owned());

        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let res = client
            .post(format!("{endpoint}/api/v2/query?org=my-org"))
            .json(&body)
            .header("accept", "application/json")
            .header("Authorization", "Token my-token")
            .send()
            .await
            .unwrap();
        let string = res.text().await.unwrap();

        let lines = string.split('\n').collect::<Vec<&str>>();
        let header = lines[0].split(',').collect::<Vec<&str>>();
        let record1 = lines[1].split(',').collect::<Vec<&str>>();
        let record2 = lines[2].split(',').collect::<Vec<&str>>();
        let record_ns = lines[3].split(',').collect::<Vec<&str>>();

        // measurement
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "_measurement")
                .unwrap()]
            .trim(),
            measure.clone()
        );

        // tags
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "metric_type")
                .unwrap()]
            .trim(),
            "logs"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "host").unwrap()].trim(),
            "aws.cloud.eur"
        );
        assert_eq!(
            record1[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record2[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );
        assert_eq!(
            record_ns[header
                .iter()
                .position(|&r| r.trim() == "source_type")
                .unwrap()]
            .trim(),
            "file"
        );

        // field
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "_field").unwrap()].trim(),
            "message"
        );
        assert_eq!(
            record1[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "message_1"
        );
        assert_eq!(
            record2[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "message_2"
        );
        assert_eq!(
            record_ns[header.iter().position(|&r| r.trim() == "_value").unwrap()].trim(),
            "namespaced message"
        );
    }
}