vector/sinks/greptimedb/logs/
config.rs

1use std::collections::HashMap;
2
3use vector_lib::{
4    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
5    configurable::configurable_component,
6    sensitive_string::SensitiveString,
7};
8
9use crate::{
10    http::{Auth, HttpClient},
11    sinks::{
12        greptimedb::{
13            GreptimeDBDefaultBatchSettings, default_dbname_template, default_pipeline_template,
14            logs::{
15                http_request_builder::{
16                    GreptimeDBHttpRetryLogic, GreptimeDBLogsHttpRequestBuilder, PartitionKey,
17                    http_healthcheck,
18                },
19                sink::{GreptimeDBLogsHttpSink, LogsSinkSetting},
20            },
21        },
22        prelude::*,
23        util::http::HttpService,
24    },
25};
26
27fn extra_params_examples() -> HashMap<String, String> {
28    HashMap::<_, _>::from_iter([("source".to_owned(), "vector".to_owned())])
29}
30
31/// Configuration for the `greptimedb_logs` sink.
32#[configurable_component(sink("greptimedb_logs", "Ingest logs data into GreptimeDB."))]
33#[derive(Clone, Debug, Default, Derivative)]
34#[serde(deny_unknown_fields)]
35pub struct GreptimeDBLogsConfig {
36    /// The endpoint of the GreptimeDB server.
37    #[serde(alias = "host")]
38    #[configurable(metadata(docs::examples = "http://localhost:4000"))]
39    pub endpoint: String,
40
41    /// The table that data is inserted into.
42    #[configurable(metadata(docs::examples = "mytable"))]
43    pub table: Template,
44
45    /// The [GreptimeDB database][database] name to connect.
46    ///
47    /// Default to `public`, the default database of GreptimeDB.
48    ///
49    /// Database can be created via `create database` statement on
50    /// GreptimeDB. If you are using GreptimeCloud, use `dbname` from the
51    /// connection information of your instance.
52    ///
53    /// [database]: https://docs.greptime.com/user-guide/concepts/key-concepts#database
54    #[configurable(metadata(docs::examples = "public"))]
55    #[derivative(Default(value = "default_dbname_template()"))]
56    #[serde(default = "default_dbname_template")]
57    pub dbname: Template,
58
59    /// Pipeline name to be used for the logs.
60    ///
61    /// Default to `greptime_identity`, use the original log structure
62    #[configurable(metadata(docs::examples = "pipeline_name"))]
63    #[derivative(Default(value = "default_pipeline_template()"))]
64    #[serde(default = "default_pipeline_template")]
65    pub pipeline_name: Template,
66
67    /// Pipeline version to be used for the logs.
68    #[configurable(metadata(docs::examples = "2024-06-07 06:46:23.858293"))]
69    pub pipeline_version: Option<Template>,
70
71    /// The username for your GreptimeDB instance.
72    ///
73    /// This is required if your instance has authentication enabled.
74    #[configurable(metadata(docs::examples = "username"))]
75    #[serde(default)]
76    pub username: Option<String>,
77    /// The password for your GreptimeDB instance.
78    ///
79    /// This is required if your instance has authentication enabled.
80    #[configurable(metadata(docs::examples = "password"))]
81    #[serde(default)]
82    pub password: Option<SensitiveString>,
83    /// Set http compression encoding for the request
84    /// Default to none, `gzip` or `zstd` is supported.
85    #[configurable(derived)]
86    #[serde(default = "Compression::gzip_default")]
87    pub compression: Compression,
88
89    #[configurable(derived)]
90    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
91    pub encoding: Transformer,
92
93    /// Custom parameters to add to the query string for each HTTP request sent to GreptimeDB.
94    #[serde(default)]
95    #[configurable(metadata(docs::advanced))]
96    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
97    #[configurable(metadata(docs::examples = "extra_params_examples()"))]
98    pub extra_params: Option<HashMap<String, String>>,
99
100    /// Custom headers to add to the HTTP request sent to GreptimeDB.
101    /// Note that these headers will override the existing headers.
102    #[serde(default)]
103    #[configurable(metadata(docs::advanced))]
104    #[configurable(metadata(
105        docs::additional_props_description = "Extra header key-value pairs."
106    ))]
107    pub extra_headers: Option<HashMap<String, String>>,
108
109    #[configurable(derived)]
110    #[serde(default)]
111    pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
112
113    #[configurable(derived)]
114    #[serde(default)]
115    pub request: TowerRequestConfig,
116
117    #[configurable(derived)]
118    pub tls: Option<TlsConfig>,
119
120    #[configurable(derived)]
121    #[serde(
122        default,
123        deserialize_with = "crate::serde::bool_or_struct",
124        skip_serializing_if = "crate::serde::is_default"
125    )]
126    pub acknowledgements: AcknowledgementsConfig,
127}
128
129impl_generate_config_from_default!(GreptimeDBLogsConfig);
130
131#[async_trait::async_trait]
132#[typetag::serde(name = "greptimedb_logs")]
133impl SinkConfig for GreptimeDBLogsConfig {
134    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
135        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
136        let client = HttpClient::new(tls_settings, &cx.proxy)?;
137
138        let auth = match (self.username.clone(), self.password.clone()) {
139            (Some(username), Some(password)) => Some(Auth::Basic {
140                user: username,
141                password,
142            }),
143            _ => None,
144        };
145        let request_builder = GreptimeDBLogsHttpRequestBuilder {
146            endpoint: self.endpoint.clone(),
147            auth: auth.clone(),
148            encoder: (
149                self.encoding.clone(),
150                Encoder::<Framer>::new(
151                    NewlineDelimitedEncoderConfig.build().into(),
152                    JsonSerializerConfig::default().build().into(),
153                ),
154            ),
155            compression: self.compression,
156            extra_params: self.extra_params.clone(),
157            extra_headers: self.extra_headers.clone(),
158        };
159
160        let service: HttpService<GreptimeDBLogsHttpRequestBuilder, PartitionKey> =
161            HttpService::new(client.clone(), request_builder.clone());
162
163        let request_limits = self.request.into_settings();
164
165        let service = ServiceBuilder::new()
166            .settings(request_limits, GreptimeDBHttpRetryLogic::default())
167            .service(service);
168
169        let logs_sink_setting = LogsSinkSetting {
170            dbname: self.dbname.clone(),
171            table: self.table.clone(),
172            pipeline_name: self.pipeline_name.clone(),
173            pipeline_version: self.pipeline_version.clone(),
174        };
175
176        let sink = GreptimeDBLogsHttpSink::new(
177            self.batch.into_batcher_settings()?,
178            service,
179            request_builder,
180            logs_sink_setting,
181        );
182
183        let healthcheck = Box::pin(http_healthcheck(
184            client,
185            self.endpoint.clone(),
186            auth.clone(),
187        ));
188        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
189    }
190
191    fn input(&self) -> Input {
192        Input::log()
193    }
194
195    fn acknowledgements(&self) -> &AcknowledgementsConfig {
196        &self.acknowledgements
197    }
198}