use crate::sinks::{
greptimedb::{
default_dbname,
metrics::{
request::GreptimeDBGrpcRetryLogic,
request_builder::RequestBuilderOptions,
service::{healthcheck, GreptimeDBGrpcService},
sink,
},
GreptimeDBDefaultBatchSettings,
},
prelude::*,
};
use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
/// Configuration for the deprecated `greptimedb` sink.
///
/// This is a newtype around [`GreptimeDBMetricsConfig`] that is registered under the old
/// `greptimedb` component name. As the deprecation metadata states, the sink has been renamed and
/// `greptimedb_metrics` should be used instead.
#[configurable_component(sink("greptimedb", "Ingest metrics data into GreptimeDB."))]
#[configurable(metadata(
deprecated = "The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead."
))]
#[derive(Clone, Debug, Derivative)]
pub struct GreptimeDBConfig(GreptimeDBMetricsConfig);
impl GenerateConfig for GreptimeDBConfig {
fn generate_config() -> toml::Value {
<GreptimeDBMetricsConfig as GenerateConfig>::generate_config()
}
}
// `SinkConfig` for the deprecated `greptimedb` name: every method forwards to the wrapped
// `GreptimeDBMetricsConfig`.
#[async_trait::async_trait]
#[typetag::serde(name = "greptimedb")]
impl SinkConfig for GreptimeDBConfig {
    /// Emits a deprecation warning, then builds the sink and healthcheck through the wrapped
    /// configuration.
    ///
    /// # Errors
    ///
    /// Returns whatever error the wrapped configuration's `build` returns.
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        warn!("DEPRECATED: The `greptimedb` sink has been renamed. Please use `greptimedb_metrics` instead.");

        let inner = &self.0;
        inner.build(cx).await
    }

    /// Input accepted by this sink, as reported by the wrapped configuration.
    fn input(&self) -> Input {
        let inner = &self.0;
        inner.input()
    }

    /// Acknowledgement settings of the wrapped configuration.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        let inner = &self.0;
        inner.acknowledgements()
    }
}
/// Configuration for the `greptimedb_metrics` sink.
///
/// Unknown fields are rejected during deserialization (`deny_unknown_fields`).
#[configurable_component(sink("greptimedb_metrics", "Ingest metrics data into GreptimeDB."))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct GreptimeDBMetricsConfig {
/// The name of the GreptimeDB database to use.
///
/// When omitted, the value returned by `default_dbname()` is used.
#[configurable(metadata(docs::examples = "public"))]
#[derivative(Default(value = "default_dbname()"))]
#[serde(default = "default_dbname")]
pub dbname: String,
/// The address of the GreptimeDB endpoint, for example `example.com:4001`.
// NOTE(review): the example suggests a `host:port` pair for the gRPC service — confirm the exact
// format expected by `GreptimeDBGrpcService`.
#[configurable(metadata(docs::examples = "example.com:4001"))]
pub endpoint: String,
/// The username for the GreptimeDB instance, if any.
// NOTE(review): presumably only needed when authentication is enabled — confirm against the
// service implementation.
#[configurable(metadata(docs::examples = "username"))]
#[serde(default)]
pub username: Option<String>,
/// The password for the GreptimeDB instance, if any.
#[configurable(metadata(docs::examples = "password"))]
#[serde(default)]
pub password: Option<SensitiveString>,
/// The gRPC compression to use, given as a string such as `gzip`.
// NOTE(review): the set of accepted values is not visible in this file — confirm in the service
// implementation.
#[configurable(metadata(docs::examples = "gzip"))]
#[serde(default)]
pub grpc_compression: Option<String>,
// Request settings; converted with `into_settings()` when the sink is built. The user-facing
// description is derived from `TowerRequestConfig`.
#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,
// Batching settings; converted with `into_batcher_settings()` when the sink is built. The
// user-facing description is derived from `BatchConfig`.
#[configurable(derived)]
#[serde(default)]
pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
// Acknowledgement settings. Deserialized through `bool_or_struct`, and skipped on serialization
// when equal to the default.
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
pub acknowledgements: AcknowledgementsConfig,
// Optional TLS settings. How they are applied is not visible in this file; the user-facing
// description is derived from `TlsConfig`.
#[configurable(derived)]
pub tls: Option<TlsConfig>,
/// Whether to use the new naming in the request builder.
///
/// Treated as `false` when unset.
// NOTE(review): the concrete names affected by this switch are decided by the request builder
// (`RequestBuilderOptions::use_new_naming`) — document them here once confirmed.
#[configurable]
pub new_naming: Option<bool>,
}
// NOTE(review): by its name, this macro implements `GenerateConfig` from the type's `Default`
// value — confirm against the macro definition.
impl_generate_config_from_default!(GreptimeDBMetricsConfig);
// `SinkConfig` for the `greptimedb_metrics` sink: wires the gRPC service, the batching settings and
// the healthcheck together.
#[typetag::serde(name = "greptimedb_metrics")]
#[async_trait::async_trait]
impl SinkConfig for GreptimeDBMetricsConfig {
    /// Builds the GreptimeDB gRPC sink together with its healthcheck.
    ///
    /// The sink context is not used.
    ///
    /// # Errors
    ///
    /// Propagates a failure from creating the gRPC service, from converting the batch
    /// configuration, or from creating the healthcheck, in that order.
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        // Wrap the gRPC service with the request settings and the retry logic.
        let tower_settings = self.request.into_settings();
        let builder = ServiceBuilder::new().settings(tower_settings, GreptimeDBGrpcRetryLogic);
        let service = builder.service(GreptimeDBGrpcService::try_new(self)?);

        let batch_settings = self.batch.into_batcher_settings()?;
        // An unset `new_naming` behaves like `false`.
        let request_builder_options = RequestBuilderOptions {
            use_new_naming: self.new_naming.unwrap_or_default(),
        };

        let grpc_sink = sink::GreptimeDBGrpcSink {
            service,
            batch_settings,
            request_builder_options,
        };

        // The healthcheck is created before the sink is wrapped, so a healthcheck error is
        // returned without constructing the `VectorSink`.
        let health = healthcheck(self)?;
        let vector_sink = VectorSink::from_event_streamsink(grpc_sink);
        Ok((vector_sink, health))
    }

    /// This sink accepts metric input only.
    fn input(&self) -> Input {
        Input::metric()
    }

    /// Returns the configured acknowledgement settings.
    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        let Self {
            acknowledgements, ..
        } = self;
        acknowledgements
    }
}
#[cfg(test)]
mod tests {
    use indoc::indoc;

    use super::*;

    /// Runs the shared `test_generate_config` helper against the metrics sink configuration.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GreptimeDBMetricsConfig>();
    }

    /// A configuration with only the endpoint parses, leaves the credentials unset and falls
    /// back to the default database name.
    #[test]
    fn test_config_without_credentials() {
        let config = indoc! {r#"
            endpoint = "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
        "#};

        let parsed = toml::from_str::<GreptimeDBMetricsConfig>(config)
            .expect("a config with only an endpoint should deserialize");

        assert_eq!(
            parsed.endpoint,
            "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
        );
        assert_eq!(parsed.dbname, default_dbname());
        assert!(parsed.username.is_none());
        assert!(parsed.password.is_none());
    }

    /// A configuration that sets credentials parses and keeps them.
    ///
    /// Previously this test did not set a username at all and discarded the parsed value, so the
    /// credential fields were never exercised.
    #[test]
    fn test_config_with_username() {
        let config = indoc! {r#"
            endpoint = "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
            dbname = "foo-bar"
            username = "username"
            password = "password"
        "#};

        let parsed = toml::from_str::<GreptimeDBMetricsConfig>(config)
            .expect("a config with credentials should deserialize");

        assert_eq!(
            parsed.endpoint,
            "foo-bar.ap-southeast-1.aws.greptime.cloud:4001"
        );
        assert_eq!(parsed.dbname, "foo-bar");
        assert_eq!(parsed.username.as_deref(), Some("username"));
        // Only presence is checked, so the test does not depend on how `SensitiveString` exposes
        // its contents.
        assert!(parsed.password.is_some());
    }
}