vector/sinks/greptimedb/metrics/
config.rs

1use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
2
3use crate::sinks::{
4    greptimedb::{
5        GreptimeDBDefaultBatchSettings, default_dbname,
6        metrics::{
7            request::GreptimeDBGrpcRetryLogic,
8            request_builder::RequestBuilderOptions,
9            service::{GreptimeDBGrpcService, healthcheck},
10            sink,
11        },
12    },
13    prelude::*,
14};
15
16/// Configuration for the `greptimedb` sink.
17#[configurable_component(sink("greptimedb", "Ingest metrics data into GreptimeDB."))]
18#[configurable(metadata(
19    deprecated = "The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead."
20))]
21#[derive(Clone, Debug, Derivative)]
22pub struct GreptimeDBConfig(GreptimeDBMetricsConfig);
23
24impl GenerateConfig for GreptimeDBConfig {
25    fn generate_config() -> toml::Value {
26        <GreptimeDBMetricsConfig as GenerateConfig>::generate_config()
27    }
28}
29
30#[async_trait::async_trait]
31#[typetag::serde(name = "greptimedb")]
32impl SinkConfig for GreptimeDBConfig {
33    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
34        warn!(
35            "DEPRECATED: The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead."
36        );
37        self.0.build(cx).await
38    }
39
40    fn input(&self) -> Input {
41        self.0.input()
42    }
43
44    fn acknowledgements(&self) -> &AcknowledgementsConfig {
45        self.0.acknowledgements()
46    }
47}
48
49/// Configuration items for GreptimeDB
50#[configurable_component(sink("greptimedb_metrics", "Ingest metrics data into GreptimeDB."))]
51#[derive(Clone, Debug, Derivative)]
52#[derivative(Default)]
53#[serde(deny_unknown_fields)]
54pub struct GreptimeDBMetricsConfig {
55    /// The [GreptimeDB database][database] name to connect.
56    ///
57    /// Default to `public`, the default database of GreptimeDB.
58    ///
59    /// Database can be created via `create database` statement on
60    /// GreptimeDB. If you are using GreptimeCloud, use `dbname` from the
61    /// connection information of your instance.
62    ///
63    /// [database]: https://docs.greptime.com/user-guide/concepts/key-concepts#database
64    #[configurable(metadata(docs::examples = "public"))]
65    #[derivative(Default(value = "default_dbname()"))]
66    #[serde(default = "default_dbname")]
67    pub dbname: String,
68    /// The host and port of GreptimeDB gRPC service.
69    ///
70    /// This sink uses GreptimeDB's gRPC interface for data ingestion. By
71    /// default, GreptimeDB listens to port 4001 for gRPC protocol.
72    ///
73    /// The address _must_ include a port.
74    #[configurable(metadata(docs::examples = "example.com:4001"))]
75    pub endpoint: String,
76    /// The username for your GreptimeDB instance.
77    ///
78    /// This is required if your instance has authentication enabled.
79    #[configurable(metadata(docs::examples = "username"))]
80    #[serde(default)]
81    pub username: Option<String>,
82    /// The password for your GreptimeDB instance.
83    ///
84    /// This is required if your instance has authentication enabled.
85    #[configurable(metadata(docs::examples = "password"))]
86    #[serde(default)]
87    pub password: Option<SensitiveString>,
88    /// Set gRPC compression encoding for the request
89    /// Default to none, `gzip` or `zstd` is supported.
90    #[configurable(metadata(docs::examples = "gzip"))]
91    #[serde(default)]
92    pub grpc_compression: Option<String>,
93
94    #[configurable(derived)]
95    #[serde(default)]
96    pub request: TowerRequestConfig,
97
98    #[configurable(derived)]
99    #[serde(default)]
100    pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
101
102    #[configurable(derived)]
103    #[serde(
104        default,
105        deserialize_with = "crate::serde::bool_or_struct",
106        skip_serializing_if = "crate::serde::is_default"
107    )]
108    pub acknowledgements: AcknowledgementsConfig,
109
110    #[configurable(derived)]
111    pub tls: Option<TlsConfig>,
112
113    /// Use Greptime's prefixed naming for time index and value columns.
114    ///
115    /// This is to keep consistency with GreptimeDB's naming pattern. By
116    /// default, this sink will use `val` for value column name, and `ts` for
117    /// time index name. When turned on, `greptime_value` and
118    /// `greptime_timestamp` will be used for these names.
119    ///
120    /// If you are using this Vector sink together with other data ingestion
121    /// sources of GreptimeDB, like Prometheus Remote Write and Influxdb Line
122    /// Protocol, it is highly recommended to turn on this.
123    ///
124    /// Also if there is a tag name conflict from your data source, for
125    /// example, you have a tag named as `val` or `ts`, you need to turn on
126    /// this option to avoid the conflict.
127    ///
128    /// Default to `false` for compatibility.
129    #[configurable]
130    pub new_naming: Option<bool>,
131}
132
133impl_generate_config_from_default!(GreptimeDBMetricsConfig);
134
135#[typetag::serde(name = "greptimedb_metrics")]
136#[async_trait::async_trait]
137impl SinkConfig for GreptimeDBMetricsConfig {
138    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
139        let request_settings = self.request.into_settings();
140        let service = ServiceBuilder::new()
141            .settings(request_settings, GreptimeDBGrpcRetryLogic)
142            .service(GreptimeDBGrpcService::try_new(self)?);
143        let sink = sink::GreptimeDBGrpcSink {
144            service,
145            batch_settings: self.batch.into_batcher_settings()?,
146            request_builder_options: RequestBuilderOptions {
147                use_new_naming: self.new_naming.unwrap_or(false),
148            },
149        };
150
151        let healthcheck = healthcheck(self)?;
152        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
153    }
154
155    fn input(&self) -> Input {
156        Input::metric()
157    }
158
159    fn acknowledgements(&self) -> &AcknowledgementsConfig {
160        &self.acknowledgements
161    }
162}
163
164#[cfg(test)]
165mod tests {
166    use indoc::indoc;
167
168    use super::*;
169
170    #[test]
171    fn generate_config() {
172        crate::test_util::test_generate_config::<GreptimeDBMetricsConfig>();
173    }
174
175    #[test]
176    fn test_config_with_username() {
177        let config = indoc! {r#"
178            endpoint = "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
179            dbname = "foo-bar"
180        "#};
181
182        toml::from_str::<GreptimeDBMetricsConfig>(config).unwrap();
183    }
184}