vector/sinks/influxdb/
logs.rs

1use std::collections::{HashMap, HashSet};
2
3use bytes::{Bytes, BytesMut};
4use futures::SinkExt;
5use http::{Request, Uri};
6use indoc::indoc;
7use vrl::event_path;
8use vrl::path::OwnedValuePath;
9use vrl::value::Kind;
10
11use vector_lib::config::log_schema;
12use vector_lib::configurable::configurable_component;
13use vector_lib::lookup::lookup_v2::OptionalValuePath;
14use vector_lib::lookup::PathPrefix;
15use vector_lib::schema;
16
17use super::{
18    encode_timestamp, healthcheck, influx_line_protocol, influxdb_settings, Field,
19    InfluxDb1Settings, InfluxDb2Settings, ProtocolVersion,
20};
21use crate::{
22    codecs::Transformer,
23    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
24    event::{Event, KeyString, MetricTags, Value},
25    http::HttpClient,
26    internal_events::InfluxdbEncodingError,
27    sinks::{
28        util::{
29            http::{BatchedHttpSink, HttpEventEncoder, HttpSink},
30            BatchConfig, Buffer, Compression, SinkBatchSettings, TowerRequestConfig,
31        },
32        Healthcheck, VectorSink,
33    },
34    tls::{TlsConfig, TlsSettings},
35};
36
/// Default batch settings for the `influxdb_logs` sink.
///
/// Used as the type parameter of `BatchConfig` in `InfluxDbLogsConfig::batch`, so these
/// constants are the values in effect when the user does not configure `batch`.
#[derive(Clone, Copy, Debug, Default)]
pub struct InfluxDbLogsDefaultBatchSettings;

impl SinkBatchSettings for InfluxDbLogsDefaultBatchSettings {
    // `None`: no default is set for the maximum number of events per batch.
    const MAX_EVENTS: Option<usize> = None;
    // Default maximum batch size: 1,000,000 bytes.
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    // Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
45
// NOTE(review): the `///` comments in this struct sit under `configurable_component` and
// presumably feed the generated configuration documentation — confirm before rewording them.
/// Configuration for the `influxdb_logs` sink.
#[configurable_component(sink("influxdb_logs", "Deliver log event data to InfluxDB."))]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct InfluxDbLogsConfig {
    /// The namespace of the measurement name to use.
    ///
    /// When specified, the measurement name is `<namespace>.vector`.
    ///
    #[configurable(
        deprecated = "This field is deprecated, and `measurement` should be used instead."
    )]
    #[configurable(metadata(docs::examples = "service"))]
    pub namespace: Option<String>,

    /// The name of the InfluxDB measurement that is written to.
    #[configurable(metadata(docs::examples = "vector-logs"))]
    pub measurement: Option<String>,

    /// The endpoint to send data to.
    ///
    /// This should be a full HTTP URI, including the scheme, host, and port.
    #[configurable(metadata(docs::examples = "http://localhost:8086"))]
    pub endpoint: String,

    /// The list of names of log fields that should be added as tags to each measurement.
    ///
    /// By default Vector adds `metric_type` as well as the configured `log_schema.host_key` and
    /// `log_schema.source_type_key` options.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "field1"))]
    #[configurable(metadata(docs::examples = "parent.child_field"))]
    pub tags: Vec<KeyString>,

    // Both settings groups below are optional and flattened into the top level of the sink
    // options. `build` hands both to `influxdb_settings`, whose result supplies the write URI,
    // the token and the protocol version.
    #[serde(flatten)]
    pub influxdb1_settings: Option<InfluxDb1Settings>,

    #[serde(flatten)]
    pub influxdb2_settings: Option<InfluxDb2Settings>,

    #[configurable(derived)]
    #[serde(skip_serializing_if = "crate::serde::is_default", default)]
    pub encoding: Transformer,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<InfluxDbLogsDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // `host_key`, `message_key`, and `source_type_key` are `Option` as we want `vector generate`
    // to produce a config with these as `None`, so as not to accidentally override a user's
    // configured `log_schema`. Generating is constrained by build-time and can't account for
    // changes to the default `log_schema`.
    /// Use this option to customize the key containing the hostname.
    ///
    /// The setting of `log_schema.host_key`, usually `host`, is used here by default.
    #[configurable(metadata(docs::examples = "hostname"))]
    pub host_key: Option<OptionalValuePath>,

    /// Use this option to customize the key containing the message.
    ///
    /// The setting of `log_schema.message_key`, usually `message`, is used here by default.
    #[configurable(metadata(docs::examples = "text"))]
    pub message_key: Option<OptionalValuePath>,

    /// Use this option to customize the key containing the source_type.
    ///
    /// The setting of `log_schema.source_type_key`, usually `source_type`, is used here by default.
    #[configurable(metadata(docs::examples = "source"))]
    pub source_type_key: Option<OptionalValuePath>,
}
131
/// Runtime state of the `influxdb_logs` sink, assembled by `InfluxDbLogsConfig::build`.
///
/// Implements `HttpSink`: it creates `InfluxDbLogsEncoder`s from its fields and turns encoded
/// batches into `POST` requests against `uri`.
// NOTE(review): `Debug` is derived, so `token` is included verbatim in debug output.
#[derive(Debug)]
struct InfluxDbLogsSink {
    /// Target of the write requests; produced by `write_uri` on the resolved settings.
    uri: Uri,
    /// Value sent in the `Authorization: Token …` request header.
    token: String,
    /// Protocol version handed to `influx_line_protocol` when encoding.
    protocol_version: ProtocolVersion,
    /// Measurement name written on every encoded line.
    measurement: String,
    /// Names of flattened log fields that are emitted as tags instead of fields.
    tags: HashSet<KeyString>,
    /// Transformer (the `encoding` option) applied to each event before encoding.
    transformer: Transformer,
    /// Event path the semantic `host` value is renamed to before encoding.
    host_key: OwnedValuePath,
    /// Event path the semantic `message` value is renamed to before encoding.
    message_key: OwnedValuePath,
    /// Event path the semantic `source_type` value is renamed to before encoding.
    source_type_key: OwnedValuePath,
}
144
145impl GenerateConfig for InfluxDbLogsConfig {
146    fn generate_config() -> toml::Value {
147        toml::from_str(indoc! {r#"
148            endpoint = "http://localhost:8086/"
149            namespace = "my-namespace"
150            tags = []
151            org = "my-org"
152            bucket = "my-bucket"
153            token = "${INFLUXDB_TOKEN}"
154        "#})
155        .unwrap()
156    }
157}
158
159#[async_trait::async_trait]
160#[typetag::serde(name = "influxdb_logs")]
161impl SinkConfig for InfluxDbLogsConfig {
162    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
163        let measurement = self.get_measurement()?;
164        let tags: HashSet<KeyString> = self.tags.iter().cloned().collect();
165
166        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
167        let client = HttpClient::new(tls_settings, cx.proxy())?;
168        let healthcheck = self.healthcheck(client.clone())?;
169
170        let batch = self.batch.into_batch_settings()?;
171        let request = self.request.into_settings();
172
173        let settings = influxdb_settings(
174            self.influxdb1_settings.clone(),
175            self.influxdb2_settings.clone(),
176        )
177        .unwrap();
178
179        let endpoint = self.endpoint.clone();
180        let uri = settings.write_uri(endpoint).unwrap();
181
182        let token = settings.token();
183        let protocol_version = settings.protocol_version();
184
185        let host_key = self
186            .host_key
187            .as_ref()
188            .and_then(|k| k.path.clone())
189            .or_else(|| log_schema().host_key().cloned())
190            .expect("global log_schema.host_key to be valid path");
191
192        let message_key = self
193            .message_key
194            .as_ref()
195            .and_then(|k| k.path.clone())
196            .or_else(|| log_schema().message_key().cloned())
197            .expect("global log_schema.message_key to be valid path");
198
199        let source_type_key = self
200            .source_type_key
201            .as_ref()
202            .and_then(|k| k.path.clone())
203            .or_else(|| log_schema().source_type_key().cloned())
204            .expect("global log_schema.source_type_key to be valid path");
205
206        let sink = InfluxDbLogsSink {
207            uri,
208            token: token.inner().to_owned(),
209            protocol_version,
210            measurement,
211            tags,
212            transformer: self.encoding.clone(),
213            host_key,
214            message_key,
215            source_type_key,
216        };
217
218        let sink = BatchedHttpSink::new(
219            sink,
220            Buffer::new(batch.size, Compression::None),
221            request,
222            batch.timeout,
223            client,
224        )
225        .sink_map_err(|error| error!(message = "Fatal influxdb_logs sink error.", %error));
226
227        #[allow(deprecated)]
228        Ok((VectorSink::from_event_sink(sink), healthcheck))
229    }
230
231    fn input(&self) -> Input {
232        let requirements = schema::Requirement::empty()
233            .optional_meaning("message", Kind::bytes())
234            .optional_meaning("host", Kind::bytes())
235            .optional_meaning("timestamp", Kind::timestamp());
236
237        Input::log().with_schema_requirement(requirements)
238    }
239
240    fn acknowledgements(&self) -> &AcknowledgementsConfig {
241        &self.acknowledgements
242    }
243}
244
/// Encoder state created by `InfluxDbLogsSink::build_encoder`; every field is a copy of the
/// sink field of the same name.
struct InfluxDbLogsEncoder {
    /// Protocol version handed to `influx_line_protocol`.
    protocol_version: ProtocolVersion,
    /// Measurement name written on every encoded line.
    measurement: String,
    /// Names of flattened log fields emitted as tags. `encode_event` adds the event's host and
    /// source type paths and `metric_type` to this set while encoding.
    tags: HashSet<KeyString>,
    /// Transformer applied to each event before it is split into tags and fields.
    transformer: Transformer,
    /// Event path the semantic `host` value is renamed to.
    host_key: OwnedValuePath,
    /// Event path the semantic `message` value is renamed to.
    message_key: OwnedValuePath,
    /// Event path the semantic `source_type` value is renamed to.
    source_type_key: OwnedValuePath,
}
254
255impl HttpEventEncoder<BytesMut> for InfluxDbLogsEncoder {
256    fn encode_event(&mut self, event: Event) -> Option<BytesMut> {
257        let mut log = event.into_log();
258        // If the event isn't an object (`. = "foo"`), inserting or renaming will result in losing
259        // the original value that was assigned to the root. To avoid this we intentionally rename
260        // the path that points to "message" such that it has a dedicated key.
261        // TODO: add a `TargetPath::is_event_root()` to conditionally rename?
262        if let Some(message_path) = log.message_path().cloned().as_ref() {
263            log.rename_key(message_path, (PathPrefix::Event, &self.message_key));
264        }
265        // Add the `host` and `source_type` to the HashSet of tags to include
266        // Ensure those paths are on the event to be encoded, rather than metadata
267        if let Some(host_path) = log.host_path().cloned().as_ref() {
268            self.tags.replace(host_path.path.to_string().into());
269            log.rename_key(host_path, (PathPrefix::Event, &self.host_key));
270        }
271
272        if let Some(source_type_path) = log.source_type_path().cloned().as_ref() {
273            self.tags.replace(source_type_path.path.to_string().into());
274            log.rename_key(source_type_path, (PathPrefix::Event, &self.source_type_key));
275        }
276
277        self.tags.replace("metric_type".into());
278        log.insert(event_path!("metric_type"), "logs");
279
280        // Timestamp
281        let timestamp = encode_timestamp(match log.remove_timestamp() {
282            Some(Value::Timestamp(ts)) => Some(ts),
283            _ => None,
284        });
285
286        let log = {
287            let mut event = Event::from(log);
288            self.transformer.transform(&mut event);
289            event.into_log()
290        };
291
292        // Tags + Fields
293        let mut tags = MetricTags::default();
294        let mut fields: HashMap<KeyString, Field> = HashMap::new();
295        log.convert_to_fields().for_each(|(key, value)| {
296            if self.tags.contains(&key[..]) {
297                tags.replace(key.into(), value.to_string_lossy().into_owned());
298            } else {
299                fields.insert(key, to_field(value));
300            }
301        });
302
303        let mut output = BytesMut::new();
304        if let Err(error_message) = influx_line_protocol(
305            self.protocol_version,
306            &self.measurement,
307            Some(tags),
308            Some(fields),
309            timestamp,
310            &mut output,
311        ) {
312            emit!(InfluxdbEncodingError {
313                error_message,
314                count: 1
315            });
316            return None;
317        };
318
319        Some(output)
320    }
321}
322
323impl HttpSink for InfluxDbLogsSink {
324    type Input = BytesMut;
325    type Output = BytesMut;
326    type Encoder = InfluxDbLogsEncoder;
327
328    fn build_encoder(&self) -> Self::Encoder {
329        InfluxDbLogsEncoder {
330            protocol_version: self.protocol_version,
331            measurement: self.measurement.clone(),
332            tags: self.tags.clone(),
333            transformer: self.transformer.clone(),
334            host_key: self.host_key.clone(),
335            message_key: self.message_key.clone(),
336            source_type_key: self.source_type_key.clone(),
337        }
338    }
339
340    async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> {
341        Request::post(&self.uri)
342            .header("Content-Type", "text/plain")
343            .header("Authorization", format!("Token {}", &self.token))
344            .body(events.freeze())
345            .map_err(Into::into)
346    }
347}
348
349impl InfluxDbLogsConfig {
350    fn get_measurement(&self) -> Result<String, &'static str> {
351        match (self.measurement.as_ref(), self.namespace.as_ref()) {
352            (Some(measure), Some(_)) => {
353                warn!("Option `namespace` has been superseded by `measurement`.");
354                Ok(measure.clone())
355            }
356            (Some(measure), None) => Ok(measure.clone()),
357            (None, Some(namespace)) => {
358                warn!(
359                    "Option `namespace` has been deprecated. Use `measurement` instead. \
360                       For example, you can use `measurement=<namespace>.vector` for the \
361                       same effect."
362                );
363                Ok(format!("{namespace}.vector"))
364            }
365            (None, None) => Err("The `measurement` option is required."),
366        }
367    }
368
369    fn healthcheck(&self, client: HttpClient) -> crate::Result<Healthcheck> {
370        let config = self.clone();
371
372        let healthcheck = healthcheck(
373            config.endpoint,
374            config.influxdb1_settings,
375            config.influxdb2_settings,
376            client,
377        )?;
378
379        Ok(healthcheck)
380    }
381}
382
383fn to_field(value: &Value) -> Field {
384    match value {
385        Value::Integer(num) => Field::Int(*num),
386        Value::Float(num) => Field::Float(num.into_inner()),
387        Value::Boolean(b) => Field::Bool(*b),
388        _ => Field::String(value.to_string_lossy().into_owned()),
389    }
390}
391
// Unit tests for configuration parsing, event encoding, and request delivery against a local
// test HTTP server.
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Utc};
    use futures::{channel::mpsc, stream, StreamExt};
    use http::{request::Parts, StatusCode};
    use indoc::indoc;

    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use vector_lib::lookup::owned_value_path;

    use crate::{
        sinks::{
            influxdb::test_util::{assert_fields, split_line_protocol, ts},
            util::test::{build_test_server_status, load_sink},
        },
        test_util::{
            components::{
                run_and_assert_sink_compliance, run_and_assert_sink_error, COMPONENT_ERROR_TAGS,
                HTTP_SINK_TAGS,
            },
            next_addr,
        },
    };

    use super::*;

    /// Channel on which the test server hands over each request it received (head and body).
    type Receiver = mpsc::Receiver<(Parts, bytes::Bytes)>;

    /// Runs the shared `test_generate_config` check against this sink's config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<InfluxDbLogsConfig>();
    }

    /// A config that omits `tags` must still deserialize.
    #[test]
    fn test_config_without_tags() {
        let config = indoc! {r#"
            namespace = "vector-logs"
            endpoint = "http://localhost:9999"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#};

        toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
    }

    /// With only `namespace` set, the measurement resolves to `<namespace>.vector`.
    #[test]
    fn test_config_measurement_from_namespace() {
        let config = indoc! {r#"
            namespace = "ns"
            endpoint = "http://localhost:9999"
        "#};

        let sink_config = toml::from_str::<InfluxDbLogsConfig>(config).unwrap();
        assert_eq!("ns.vector", sink_config.get_measurement().unwrap());
    }

    /// Fields excluded through the transformer's `except_fields` must not show up in the
    /// encoded line, even when they are listed as tags.
    #[test]
    fn test_encode_event_apply_rules() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("timestamp", ts());

        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["metric_type", "host"].to_vec(),
        );
        // First pass: drop `host`, so only the `metric_type` tag remains.
        sink.transformer
            .set_except_fields(Some(vec!["host".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());
        assert_eq!("1542182950000000011\n", line_protocol.3);

        // Second pass: drop `metric_type` instead, so only the `host` tag remains.
        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();
        let bytes = encoder.encode_event(event.clone()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();
        let line_protocol = split_line_protocol(string);
        assert_eq!(
            "host=aws.cloud.eur", line_protocol.1,
            "metric_type tag should be excluded"
        );
        assert_fields(line_protocol.2, ["message=\"hello\""].to_vec());
    }

    /// Encodes an event with mixed value types using `ProtocolVersion::V1`.
    #[test]
    fn test_encode_event_v1() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V1,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    /// Same event and expectations as `test_encode_event_v1`, using `ProtocolVersion::V2`.
    #[test]
    fn test_encode_event() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("host", "aws.cloud.eur");
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("int", 4i32);
        event.as_mut_log().insert("float", 5.5);
        event.as_mut_log().insert("bool", true);
        event.as_mut_log().insert("string", "thisisastring");
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["source_type", "host", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "host=aws.cloud.eur,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(
            line_protocol.2.to_string(),
            [
                "int=4i",
                "float=5.5",
                "bool=true",
                "string=\"thisisastring\"",
                "message=\"hello\"",
            ]
            .to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    /// With no tags left on the event, the measurement is followed directly by a space.
    #[test]
    fn test_encode_event_without_tags() {
        let mut event = Event::Log(LogEvent::from("hello"));

        event.as_mut_log().insert("value", 100);
        event.as_mut_log().insert("timestamp", ts());

        let mut sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            [].to_vec(),
        );
        // exclude default metric_type tag so to emit empty tags
        sink.transformer
            .set_except_fields(Some(vec!["metric_type".into()]))
            .unwrap();
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let line = std::str::from_utf8(&bytes).unwrap();
        assert!(
            line.starts_with("vector "),
            "measurement (without tags) should ends with space ' '"
        );

        let line_protocol = split_line_protocol(line);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("", line_protocol.1, "tags should be empty");
        assert_fields(
            line_protocol.2,
            ["value=100i", "message=\"hello\""].to_vec(),
        );

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    /// Nested objects and arrays are flattened into dotted / indexed field names; the
    /// unassigned array slot `[1]` is expected to be encoded as `"<null>"`.
    #[test]
    fn test_encode_nested_fields() {
        let mut event = LogEvent::default();

        event.insert("a", 1);
        event.insert("nested.field", "2");
        event.insert("nested.bool", true);
        event.insert("nested.array[0]", "example-value");
        event.insert("nested.array[2]", "another-value");
        event.insert("nested.array[3]", 15);

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event.into()).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs", line_protocol.1);
        assert_fields(
            line_protocol.2,
            [
                "a=1i",
                "nested.array[0]=\"example-value\"",
                "nested.array[1]=\"<null>\"",
                "nested.array[2]=\"another-value\"",
                "nested.array[3]=15i",
                "nested.bool=true",
                "nested.field=\"2\"",
            ]
            .to_vec(),
        );
    }

    /// A configured tag that exists on the event becomes a tag (and not a field); a configured
    /// tag that does not exist on the event is simply absent from the output.
    #[test]
    fn test_add_tag() {
        let mut event = Event::Log(LogEvent::from("hello"));
        event.as_mut_log().insert("source_type", "file");

        event.as_mut_log().insert("as_a_tag", 10);
        event.as_mut_log().insert("timestamp", ts());

        let sink = create_sink(
            "http://localhost:9999",
            "my-token",
            ProtocolVersion::V2,
            "vector",
            ["as_a_tag", "not_exists_field", "source_type", "metric_type"].to_vec(),
        );
        let mut encoder = sink.build_encoder();

        let bytes = encoder.encode_event(event).unwrap();
        let string = std::str::from_utf8(&bytes).unwrap();

        let line_protocol = split_line_protocol(string);
        assert_eq!("vector", line_protocol.0);
        assert_eq!(
            "as_a_tag=10,metric_type=logs,source_type=file",
            line_protocol.1
        );
        assert_fields(line_protocol.2.to_string(), ["message=\"hello\""].to_vec());

        assert_eq!("1542182950000000011\n", line_protocol.3);
    }

    /// Successful delivery with only `database` configured: the request query must carry the
    /// database name and nanosecond precision.
    #[tokio::test]
    async fn smoke_v1() {
        let rx = smoke_test(
            r#"database = "my-database""#,
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;

        let query = receive_response(rx).await;
        assert!(query.contains("db=my-database"));
        assert!(query.contains("precision=ns"));
    }

    /// A `400 Bad Request` response must reject the batch (`database` configuration).
    #[tokio::test]
    async fn smoke_v1_failure() {
        smoke_test(
            r#"database = "my-database""#,
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }

    /// Successful delivery with `bucket`/`org`/`token` configured: the request query must
    /// carry the org, the bucket and nanosecond precision.
    #[tokio::test]
    async fn smoke_v2() {
        let rx = smoke_test(
            indoc! {r#"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#},
            StatusCode::OK,
            BatchStatus::Delivered,
        )
        .await;

        let query = receive_response(rx).await;
        assert!(query.contains("org=my-org"));
        assert!(query.contains("bucket=my-bucket"));
        assert!(query.contains("precision=ns"));
    }

    /// A `400 Bad Request` response must reject the batch (`bucket`/`org`/`token`
    /// configuration).
    #[tokio::test]
    async fn smoke_v2_failure() {
        smoke_test(
            indoc! {r#"
            bucket = "my-bucket"
            org = "my-org"
            token = "my-token"
        "#},
            StatusCode::BAD_REQUEST,
            BatchStatus::Rejected,
        )
        .await;
    }

    /// Shared driver for the smoke tests.
    ///
    /// Builds the sink from `config` (appended to a fixed `measurement`/`endpoint` preamble),
    /// points it at a local test server answering with `status_code`, sends five log events,
    /// and asserts that the batch notifier reports `batch_status`. Returns the receiver on
    /// which the test server publishes the requests it received.
    async fn smoke_test(
        config: &str,
        status_code: StatusCode,
        batch_status: BatchStatus,
    ) -> Receiver {
        let config = format!(
            indoc! {r#"
            measurement = "vector"
            endpoint = "http://localhost:9999"
            {}
        "#},
            config
        );
        let (mut config, cx) = load_sink::<InfluxDbLogsConfig>(&config).unwrap();

        // Make sure we can build the config
        _ = config.build(cx.clone()).await.unwrap();

        let addr = next_addr();
        // Swap out the host so we can force send it
        // to our local server
        let host = format!("http://{addr}");
        config.endpoint = host;

        let (sink, _) = config.build(cx).await.unwrap();

        let (rx, _trigger, server) = build_test_server_status(addr, status_code);
        tokio::spawn(server);

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let lines = std::iter::repeat(())
            .map(move |_| "message_value")
            .take(5)
            .collect::<Vec<_>>();
        let mut events = Vec::new();

        // Create 5 events with custom field
        for (i, line) in lines.iter().enumerate() {
            let mut event = LogEvent::from(line.to_string()).with_batch_notifier(&batch);
            event.insert(format!("key{i}").as_str(), format!("value{i}"));

            // Event `i` is stamped `i + 1` seconds after the Unix epoch.
            let timestamp = Utc
                .with_ymd_and_hms(1970, 1, 1, 0, 0, (i as u32) + 1)
                .single()
                .expect("invalid timestamp");
            event.insert("timestamp", timestamp);
            event.insert("source_type", "file");

            events.push(Event::Log(event));
        }
        // Drop our own handle to the notifier; the events still hold theirs.
        drop(batch);

        if batch_status == BatchStatus::Delivered {
            run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;
        } else {
            run_and_assert_sink_error(sink, stream::iter(events), &COMPONENT_ERROR_TAGS).await;
        }

        assert_eq!(receiver.try_recv(), Ok(batch_status));

        rx
    }

    /// Takes the first request off the test server's channel, checks that its body holds five
    /// lines and that the first line is the encoding of event 0, and returns the request's
    /// query string.
    async fn receive_response(mut rx: Receiver) -> String {
        let output = rx.next().await.unwrap();

        let request = &output.0;
        let query = request.uri.query().unwrap();

        let body = std::str::from_utf8(&output.1[..]).unwrap();
        let mut lines = body.lines();

        assert_eq!(5, lines.clone().count());
        assert_line_protocol(0, lines.next());

        query.into()
    }

    /// Asserts that `value` is the line-protocol encoding of smoke-test event number `i`.
    fn assert_line_protocol(i: i64, value: Option<&str>) {
        // e.g. vector,metric_type=logs,source_type=file key0="value0",message="message_value" 1000000000
        let line_protocol = split_line_protocol(value.unwrap());
        assert_eq!("vector", line_protocol.0);
        assert_eq!("metric_type=logs,source_type=file", line_protocol.1);
        assert_fields(
            line_protocol.2.to_string(),
            [
                &*format!("key{i}=\"value{i}\""),
                "message=\"message_value\"",
            ]
            .to_vec(),
        );

        // Timestamps are in nanoseconds: event `i` was stamped at `i + 1` seconds.
        assert_eq!(((i + 1) * 1000000000).to_string(), line_protocol.3);
    }

    /// Builds an `InfluxDbLogsSink` directly, with a default transformer and the `host`,
    /// `message` and `source_type` keys at their conventional names.
    fn create_sink(
        uri: &str,
        token: &str,
        protocol_version: ProtocolVersion,
        measurement: &str,
        tags: Vec<&str>,
    ) -> InfluxDbLogsSink {
        let uri = uri.parse::<Uri>().unwrap();
        let token = token.to_string();
        let measurement = measurement.to_string();
        let tags: HashSet<_> = tags.into_iter().map(|tag| tag.into()).collect();
        InfluxDbLogsSink {
            uri,
            token,
            protocol_version,
            measurement,
            tags,
            transformer: Default::default(),
            host_key: owned_value_path!("host"),
            message_key: owned_value_path!("message"),
            source_type_key: owned_value_path!("source_type"),
        }
    }
}
867
#[cfg(feature = "influxdb-integration-tests")]
#[cfg(test)]
mod integration_tests {
    use std::sync::Arc;

    use chrono::Utc;
    use futures::stream;
    use vrl::value;

    use vector_lib::codecs::BytesDeserializerConfig;
    use vector_lib::config::{LegacyKey, LogNamespace};
    use vector_lib::event::{BatchNotifier, BatchStatus, Event, LogEvent};
    use vector_lib::lookup::{owned_value_path, path};

    use crate::{
        config::SinkContext,
        sinks::influxdb::{
            logs::InfluxDbLogsConfig,
            test_util::{address_v2, onboarding_v2, BUCKET, ORG, TOKEN},
            InfluxDb2Settings,
        },
        test_util::components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS},
    };

    use super::*;

    /// Returns the trimmed value stored under `column` in a single response `record`.
    ///
    /// `header` is the comma-split header row of the query response and `record`
    /// is a comma-split data row; the column is located by comparing trimmed
    /// header names against `column`.
    ///
    /// # Panics
    ///
    /// Panics if `column` is not present in `header`, or if `record` has fewer
    /// fields than the column's position.
    fn column_value<'a>(header: &[&str], record: &[&'a str], column: &str) -> &'a str {
        let index = header
            .iter()
            .position(|name| name.trim() == column)
            .unwrap_or_else(|| panic!("column `{column}` missing from query response header"));
        record[index].trim()
    }

    /// Sends two legacy-namespace logs and one Vector-namespace log through the
    /// sink, then queries them back and checks measurement, tags and fields.
    #[tokio::test]
    async fn influxdb2_logs_put_data() {
        let endpoint = address_v2();
        onboarding_v2(&endpoint).await;

        // A nanosecond-timestamped measurement name keeps this run's data
        // separate from whatever earlier runs left in the bucket.
        let now = Utc::now();
        let measure = format!(
            "vector-{}",
            now.timestamp_nanos_opt().expect("Timestamp out of range")
        );

        let cx = SinkContext::default();

        let config = InfluxDbLogsConfig {
            namespace: None,
            measurement: Some(measure.clone()),
            endpoint: endpoint.clone(),
            tags: Default::default(),
            influxdb1_settings: None,
            influxdb2_settings: Some(InfluxDb2Settings {
                org: ORG.to_string(),
                bucket: BUCKET.to_string(),
                token: TOKEN.to_string().into(),
            }),
            encoding: Default::default(),
            batch: Default::default(),
            request: Default::default(),
            tls: None,
            acknowledgements: Default::default(),
            host_key: None,
            message_key: None,
            source_type_key: None,
        };

        let (sink, _) = config.build(cx).await.unwrap();

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();

        let mut event1 = LogEvent::from("message_1").with_batch_notifier(&batch);
        event1.insert("host", "aws.cloud.eur");
        event1.insert("source_type", "file");

        let mut event2 = LogEvent::from("message_2").with_batch_notifier(&batch);
        event2.insert("host", "aws.cloud.eur");
        event2.insert("source_type", "file");

        // Third event uses the Vector log namespace: host and source type live
        // in metadata and are resolved through the attached schema definition.
        let mut namespaced_log =
            LogEvent::from(value!("namespaced message")).with_batch_notifier(&batch);
        LogNamespace::Vector.insert_source_metadata(
            "file",
            &mut namespaced_log,
            Some(LegacyKey::Overwrite(path!("host"))),
            path!("host"),
            "aws.cloud.eur",
        );
        LogNamespace::Vector.insert_standard_vector_source_metadata(
            &mut namespaced_log,
            "file",
            now,
        );
        let schema = BytesDeserializerConfig
            .schema_definition(LogNamespace::Vector)
            .with_metadata_field(
                &owned_value_path!("file", "host"),
                Kind::bytes(),
                Some("host"),
            );
        namespaced_log
            .metadata_mut()
            .set_schema_definition(&Arc::new(schema));

        // Release our own handle so that only the events reference the batch.
        // NOTE(review): presumably the receiver resolves once they are finalized.
        drop(batch);

        let events = vec![
            Event::Log(event1),
            Event::Log(event2),
            Event::Log(namespaced_log),
        ];

        run_and_assert_sink_compliance(sink, stream::iter(events), &HTTP_SINK_TAGS).await;

        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        // Query with the same bucket/org/token constants the sink was
        // configured with, so the read path cannot drift from the write path.
        let query = format!(
            "from(bucket:\"{BUCKET}\") |> range(start: 0) |> filter(fn: (r) => r._measurement == \"{measure}\")"
        );
        let mut body = std::collections::HashMap::new();
        body.insert("query", query);
        body.insert("type", "flux".to_owned());

        let client = reqwest::Client::builder()
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();

        let res = client
            .post(format!("{endpoint}/api/v2/query?org={ORG}"))
            .json(&body)
            .header("accept", "application/json")
            .header("Authorization", format!("Token {TOKEN}"))
            .send()
            .await
            .unwrap();
        let string = res.text().await.unwrap();

        // The response is treated as line-oriented, comma-separated text:
        // the first line is the header, followed by one line per record.
        let lines = string.split('\n').collect::<Vec<&str>>();
        let header = lines[0].split(',').collect::<Vec<&str>>();

        // Each data row paired with the message it is expected to carry.
        let expected_rows = [
            (lines[1], "message_1"),
            (lines[2], "message_2"),
            (lines[3], "namespaced message"),
        ];

        for (line, expected_message) in expected_rows {
            let record = line.split(',').collect::<Vec<&str>>();

            // measurement
            assert_eq!(
                column_value(&header, &record, "_measurement"),
                measure.as_str()
            );

            // tags
            assert_eq!(column_value(&header, &record, "metric_type"), "logs");
            assert_eq!(column_value(&header, &record, "host"), "aws.cloud.eur");
            assert_eq!(column_value(&header, &record, "source_type"), "file");

            // field
            assert_eq!(column_value(&header, &record, "_field"), "message");
            assert_eq!(column_value(&header, &record, "_value"), expected_message);
        }
    }
}