vector/sinks/greptimedb/logs/
config.rs1use std::collections::HashMap;
2
3use vector_lib::{
4    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
5    configurable::configurable_component,
6    sensitive_string::SensitiveString,
7};
8
9use crate::{
10    http::{Auth, HttpClient},
11    sinks::{
12        greptimedb::{
13            GreptimeDBDefaultBatchSettings, default_dbname_template, default_pipeline_template,
14            logs::{
15                http_request_builder::{
16                    GreptimeDBHttpRetryLogic, GreptimeDBLogsHttpRequestBuilder, PartitionKey,
17                    http_healthcheck,
18                },
19                sink::{GreptimeDBLogsHttpSink, LogsSinkSetting},
20            },
21        },
22        prelude::*,
23        util::http::HttpService,
24    },
25};
26
/// Produces the example map referenced by the `docs::examples` metadata of
/// `extra_params`: a single `source` -> `vector` entry.
fn extra_params_examples() -> HashMap<String, String> {
    [("source", "vector")]
        .into_iter()
        .map(|(key, value)| (key.to_owned(), value.to_owned()))
        .collect()
}
30
/// Produces the example map referenced by the `docs::examples` metadata of
/// `extra_headers`; the example is an empty map.
fn extra_headers_examples() -> HashMap<String, String> {
    HashMap::default()
}
34
35#[configurable_component(sink("greptimedb_logs", "Ingest logs data into GreptimeDB."))]
37#[derive(Clone, Debug, Default, Derivative)]
38#[serde(deny_unknown_fields)]
39pub struct GreptimeDBLogsConfig {
40    #[serde(alias = "host")]
42    #[configurable(metadata(docs::examples = "http://localhost:4000"))]
43    pub endpoint: String,
44
45    #[configurable(metadata(docs::examples = "mytable"))]
47    pub table: Template,
48
49    #[configurable(metadata(docs::examples = "public"))]
59    #[derivative(Default(value = "default_dbname_template()"))]
60    #[serde(default = "default_dbname_template")]
61    pub dbname: Template,
62
63    #[configurable(metadata(docs::examples = "pipeline_name"))]
67    #[derivative(Default(value = "default_pipeline_template()"))]
68    #[serde(default = "default_pipeline_template")]
69    pub pipeline_name: Template,
70
71    #[configurable(metadata(docs::examples = "2024-06-07 06:46:23.858293"))]
73    pub pipeline_version: Option<Template>,
74
75    #[configurable(metadata(docs::examples = "username"))]
79    #[serde(default)]
80    pub username: Option<String>,
81    #[configurable(metadata(docs::examples = "password"))]
85    #[serde(default)]
86    pub password: Option<SensitiveString>,
87    #[configurable(derived)]
90    #[serde(default = "Compression::gzip_default")]
91    pub compression: Compression,
92
93    #[configurable(derived)]
94    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
95    pub encoding: Transformer,
96
97    #[serde(default)]
99    #[configurable(metadata(docs::advanced))]
100    #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
101    #[configurable(metadata(docs::examples = "extra_params_examples()"))]
102    pub extra_params: Option<HashMap<String, String>>,
103
104    #[serde(default)]
107    #[configurable(metadata(docs::advanced))]
108    #[configurable(metadata(
109        docs::additional_props_description = "Extra header key-value pairs."
110    ))]
111    #[configurable(metadata(docs::examples = "extra_headers_examples()"))]
112    pub extra_headers: Option<HashMap<String, String>>,
113
114    #[configurable(derived)]
115    #[serde(default)]
116    pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
117
118    #[configurable(derived)]
119    #[serde(default)]
120    pub request: TowerRequestConfig,
121
122    #[configurable(derived)]
123    pub tls: Option<TlsConfig>,
124
125    #[configurable(derived)]
126    #[serde(
127        default,
128        deserialize_with = "crate::serde::bool_or_struct",
129        skip_serializing_if = "crate::serde::is_default"
130    )]
131    pub acknowledgements: AcknowledgementsConfig,
132}
133
134impl_generate_config_from_default!(GreptimeDBLogsConfig);
135
136#[async_trait::async_trait]
137#[typetag::serde(name = "greptimedb_logs")]
138impl SinkConfig for GreptimeDBLogsConfig {
139    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
140        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
141        let client = HttpClient::new(tls_settings, &cx.proxy)?;
142
143        let auth = match (self.username.clone(), self.password.clone()) {
144            (Some(username), Some(password)) => Some(Auth::Basic {
145                user: username,
146                password,
147            }),
148            _ => None,
149        };
150        let request_builder = GreptimeDBLogsHttpRequestBuilder {
151            endpoint: self.endpoint.clone(),
152            auth: auth.clone(),
153            encoder: (
154                self.encoding.clone(),
155                Encoder::<Framer>::new(
156                    NewlineDelimitedEncoderConfig.build().into(),
157                    JsonSerializerConfig::default().build().into(),
158                ),
159            ),
160            compression: self.compression,
161            extra_params: self.extra_params.clone(),
162            extra_headers: self.extra_headers.clone(),
163        };
164
165        let service: HttpService<GreptimeDBLogsHttpRequestBuilder, PartitionKey> =
166            HttpService::new(client.clone(), request_builder.clone());
167
168        let request_limits = self.request.into_settings();
169
170        let service = ServiceBuilder::new()
171            .settings(request_limits, GreptimeDBHttpRetryLogic::default())
172            .service(service);
173
174        let logs_sink_setting = LogsSinkSetting {
175            dbname: self.dbname.clone(),
176            table: self.table.clone(),
177            pipeline_name: self.pipeline_name.clone(),
178            pipeline_version: self.pipeline_version.clone(),
179        };
180
181        let sink = GreptimeDBLogsHttpSink::new(
182            self.batch.into_batcher_settings()?,
183            service,
184            request_builder,
185            logs_sink_setting,
186        );
187
188        let healthcheck = Box::pin(http_healthcheck(
189            client,
190            self.endpoint.clone(),
191            auth.clone(),
192        ));
193        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
194    }
195
196    fn input(&self) -> Input {
197        Input::log()
198    }
199
200    fn acknowledgements(&self) -> &AcknowledgementsConfig {
201        &self.acknowledgements
202    }
203}