vector/sinks/greptimedb/metrics/
config.rs

1use crate::sinks::{
2    greptimedb::{
3        default_dbname,
4        metrics::{
5            request::GreptimeDBGrpcRetryLogic,
6            request_builder::RequestBuilderOptions,
7            service::{healthcheck, GreptimeDBGrpcService},
8            sink,
9        },
10        GreptimeDBDefaultBatchSettings,
11    },
12    prelude::*,
13};
14use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
15
16/// Configuration for the `greptimedb` sink.
17#[configurable_component(sink("greptimedb", "Ingest metrics data into GreptimeDB."))]
18#[configurable(metadata(
19    deprecated = "The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead."
20))]
21#[derive(Clone, Debug, Derivative)]
22pub struct GreptimeDBConfig(GreptimeDBMetricsConfig);
23
24impl GenerateConfig for GreptimeDBConfig {
25    fn generate_config() -> toml::Value {
26        <GreptimeDBMetricsConfig as GenerateConfig>::generate_config()
27    }
28}
29
30#[async_trait::async_trait]
31#[typetag::serde(name = "greptimedb")]
32impl SinkConfig for GreptimeDBConfig {
33    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
34        warn!("DEPRECATED: The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead.");
35        self.0.build(cx).await
36    }
37
38    fn input(&self) -> Input {
39        self.0.input()
40    }
41
42    fn acknowledgements(&self) -> &AcknowledgementsConfig {
43        self.0.acknowledgements()
44    }
45}
46
47/// Configuration items for GreptimeDB
48#[configurable_component(sink("greptimedb_metrics", "Ingest metrics data into GreptimeDB."))]
49#[derive(Clone, Debug, Derivative)]
50#[derivative(Default)]
51#[serde(deny_unknown_fields)]
52pub struct GreptimeDBMetricsConfig {
53    /// The [GreptimeDB database][database] name to connect.
54    ///
55    /// Default to `public`, the default database of GreptimeDB.
56    ///
57    /// Database can be created via `create database` statement on
58    /// GreptimeDB. If you are using GreptimeCloud, use `dbname` from the
59    /// connection information of your instance.
60    ///
61    /// [database]: https://docs.greptime.com/user-guide/concepts/key-concepts#database
62    #[configurable(metadata(docs::examples = "public"))]
63    #[derivative(Default(value = "default_dbname()"))]
64    #[serde(default = "default_dbname")]
65    pub dbname: String,
66    /// The host and port of GreptimeDB gRPC service.
67    ///
68    /// This sink uses GreptimeDB's gRPC interface for data ingestion. By
69    /// default, GreptimeDB listens to port 4001 for gRPC protocol.
70    ///
71    /// The address _must_ include a port.
72    #[configurable(metadata(docs::examples = "example.com:4001"))]
73    pub endpoint: String,
74    /// The username for your GreptimeDB instance.
75    ///
76    /// This is required if your instance has authentication enabled.
77    #[configurable(metadata(docs::examples = "username"))]
78    #[serde(default)]
79    pub username: Option<String>,
80    /// The password for your GreptimeDB instance.
81    ///
82    /// This is required if your instance has authentication enabled.
83    #[configurable(metadata(docs::examples = "password"))]
84    #[serde(default)]
85    pub password: Option<SensitiveString>,
86    /// Set gRPC compression encoding for the request
87    /// Default to none, `gzip` or `zstd` is supported.
88    #[configurable(metadata(docs::examples = "gzip"))]
89    #[serde(default)]
90    pub grpc_compression: Option<String>,
91
92    #[configurable(derived)]
93    #[serde(default)]
94    pub request: TowerRequestConfig,
95
96    #[configurable(derived)]
97    #[serde(default)]
98    pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
99
100    #[configurable(derived)]
101    #[serde(
102        default,
103        deserialize_with = "crate::serde::bool_or_struct",
104        skip_serializing_if = "crate::serde::is_default"
105    )]
106    pub acknowledgements: AcknowledgementsConfig,
107
108    #[configurable(derived)]
109    pub tls: Option<TlsConfig>,
110
111    /// Use Greptime's prefixed naming for time index and value columns.
112    ///
113    /// This is to keep consistency with GreptimeDB's naming pattern. By
114    /// default, this sink will use `val` for value column name, and `ts` for
115    /// time index name. When turned on, `greptime_value` and
116    /// `greptime_timestamp` will be used for these names.
117    ///
118    /// If you are using this Vector sink together with other data ingestion
119    /// sources of GreptimeDB, like Prometheus Remote Write and Influxdb Line
120    /// Protocol, it is highly recommended to turn on this.
121    ///
122    /// Also if there is a tag name conflict from your data source, for
123    /// example, you have a tag named as `val` or `ts`, you need to turn on
124    /// this option to avoid the conflict.
125    ///
126    /// Default to `false` for compatibility.
127    #[configurable]
128    pub new_naming: Option<bool>,
129}
130
131impl_generate_config_from_default!(GreptimeDBMetricsConfig);
132
133#[typetag::serde(name = "greptimedb_metrics")]
134#[async_trait::async_trait]
135impl SinkConfig for GreptimeDBMetricsConfig {
136    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
137        let request_settings = self.request.into_settings();
138        let service = ServiceBuilder::new()
139            .settings(request_settings, GreptimeDBGrpcRetryLogic)
140            .service(GreptimeDBGrpcService::try_new(self)?);
141        let sink = sink::GreptimeDBGrpcSink {
142            service,
143            batch_settings: self.batch.into_batcher_settings()?,
144            request_builder_options: RequestBuilderOptions {
145                use_new_naming: self.new_naming.unwrap_or(false),
146            },
147        };
148
149        let healthcheck = healthcheck(self)?;
150        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
151    }
152
153    fn input(&self) -> Input {
154        Input::metric()
155    }
156
157    fn acknowledgements(&self) -> &AcknowledgementsConfig {
158        &self.acknowledgements
159    }
160}
161
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Runs the crate's shared generate-config check against the metrics config.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GreptimeDBMetricsConfig>();
    }

    /// A config carrying credentials must parse and expose them on the struct.
    #[test]
    fn test_config_with_username() {
        let config = indoc! {r#"
            endpoint = "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
            dbname = "foo-bar"
            username = "foo"
            password = "bar"
        "#};

        let config = toml::from_str::<GreptimeDBMetricsConfig>(config).unwrap();
        assert_eq!(
            config.endpoint,
            "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
        );
        assert_eq!(config.dbname, "foo-bar");
        assert_eq!(config.username.as_deref(), Some("foo"));
        assert!(config.password.is_some());
    }

    /// Credentials are optional: a config with only `endpoint` and `dbname`
    /// must still parse, leaving both credential fields unset.
    #[test]
    fn test_config_without_credentials() {
        let config = indoc! {r#"
            endpoint = "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
            dbname = "foo-bar"
        "#};

        let config = toml::from_str::<GreptimeDBMetricsConfig>(config).unwrap();
        assert!(config.username.is_none());
        assert!(config.password.is_none());
    }
}