// vector/sinks/greptimedb/logs/config.rs
use std::collections::HashMap;

use vector_lib::{
    codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
    configurable::configurable_component,
    sensitive_string::SensitiveString,
};

use crate::{
    http::{Auth, HttpClient},
    sinks::{
        greptimedb::{
            GreptimeDBDefaultBatchSettings, default_dbname_template, default_pipeline_template,
            logs::{
                http_request_builder::{
                    GreptimeDBHttpRetryLogic, GreptimeDBLogsHttpRequestBuilder, PartitionKey,
                    http_healthcheck,
                },
                sink::{GreptimeDBLogsHttpSink, LogsSinkSetting},
            },
        },
        prelude::*,
        util::http::HttpService,
    },
};
26
/// Example value shown in the generated documentation for the `extra_params`
/// option: a single `source` -> `vector` pair.
fn extra_params_examples() -> HashMap<String, String> {
    HashMap::from([("source".to_owned(), "vector".to_owned())])
}
30
/// Example value shown in the generated documentation for the `extra_headers`
/// option. The example is intentionally an empty map.
fn extra_headers_examples() -> HashMap<String, String> {
    HashMap::new()
}
34
35#[configurable_component(sink("greptimedb_logs", "Ingest logs data into GreptimeDB."))]
37#[derive(Clone, Debug, Default, Derivative)]
38#[serde(deny_unknown_fields)]
39pub struct GreptimeDBLogsConfig {
40 #[serde(alias = "host")]
42 #[configurable(metadata(docs::examples = "http://localhost:4000"))]
43 pub endpoint: String,
44
45 #[configurable(metadata(docs::examples = "mytable"))]
47 pub table: Template,
48
49 #[configurable(metadata(docs::examples = "public"))]
59 #[derivative(Default(value = "default_dbname_template()"))]
60 #[serde(default = "default_dbname_template")]
61 pub dbname: Template,
62
63 #[configurable(metadata(docs::examples = "pipeline_name"))]
67 #[derivative(Default(value = "default_pipeline_template()"))]
68 #[serde(default = "default_pipeline_template")]
69 pub pipeline_name: Template,
70
71 #[configurable(metadata(docs::examples = "2024-06-07 06:46:23.858293"))]
73 pub pipeline_version: Option<Template>,
74
75 #[configurable(metadata(docs::examples = "username"))]
79 #[serde(default)]
80 pub username: Option<String>,
81 #[configurable(metadata(docs::examples = "password"))]
85 #[serde(default)]
86 pub password: Option<SensitiveString>,
87 #[configurable(derived)]
90 #[serde(default = "Compression::gzip_default")]
91 pub compression: Compression,
92
93 #[configurable(derived)]
94 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
95 pub encoding: Transformer,
96
97 #[serde(default)]
99 #[configurable(metadata(docs::advanced))]
100 #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
101 #[configurable(metadata(docs::examples = "extra_params_examples()"))]
102 pub extra_params: Option<HashMap<String, String>>,
103
104 #[serde(default)]
107 #[configurable(metadata(docs::advanced))]
108 #[configurable(metadata(
109 docs::additional_props_description = "Extra header key-value pairs."
110 ))]
111 #[configurable(metadata(docs::examples = "extra_headers_examples()"))]
112 pub extra_headers: Option<HashMap<String, String>>,
113
114 #[configurable(derived)]
115 #[serde(default)]
116 pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
117
118 #[configurable(derived)]
119 #[serde(default)]
120 pub request: TowerRequestConfig,
121
122 #[configurable(derived)]
123 pub tls: Option<TlsConfig>,
124
125 #[configurable(derived)]
126 #[serde(
127 default,
128 deserialize_with = "crate::serde::bool_or_struct",
129 skip_serializing_if = "crate::serde::is_default"
130 )]
131 pub acknowledgements: AcknowledgementsConfig,
132}
133
134impl_generate_config_from_default!(GreptimeDBLogsConfig);
135
136#[async_trait::async_trait]
137#[typetag::serde(name = "greptimedb_logs")]
138impl SinkConfig for GreptimeDBLogsConfig {
139 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
140 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
141 let client = HttpClient::new(tls_settings, &cx.proxy)?;
142
143 let auth = match (self.username.clone(), self.password.clone()) {
144 (Some(username), Some(password)) => Some(Auth::Basic {
145 user: username,
146 password,
147 }),
148 _ => None,
149 };
150 let request_builder = GreptimeDBLogsHttpRequestBuilder {
151 endpoint: self.endpoint.clone(),
152 auth: auth.clone(),
153 encoder: (
154 self.encoding.clone(),
155 Encoder::<Framer>::new(
156 NewlineDelimitedEncoderConfig.build().into(),
157 JsonSerializerConfig::default().build().into(),
158 ),
159 ),
160 compression: self.compression,
161 extra_params: self.extra_params.clone(),
162 extra_headers: self.extra_headers.clone(),
163 };
164
165 let service: HttpService<GreptimeDBLogsHttpRequestBuilder, PartitionKey> =
166 HttpService::new(client.clone(), request_builder.clone());
167
168 let request_limits = self.request.into_settings();
169
170 let service = ServiceBuilder::new()
171 .settings(request_limits, GreptimeDBHttpRetryLogic::default())
172 .service(service);
173
174 let logs_sink_setting = LogsSinkSetting {
175 dbname: self.dbname.clone(),
176 table: self.table.clone(),
177 pipeline_name: self.pipeline_name.clone(),
178 pipeline_version: self.pipeline_version.clone(),
179 };
180
181 let sink = GreptimeDBLogsHttpSink::new(
182 self.batch.into_batcher_settings()?,
183 service,
184 request_builder,
185 logs_sink_setting,
186 );
187
188 let healthcheck = Box::pin(http_healthcheck(
189 client,
190 self.endpoint.clone(),
191 auth.clone(),
192 ));
193 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
194 }
195
196 fn input(&self) -> Input {
197 Input::log()
198 }
199
200 fn acknowledgements(&self) -> &AcknowledgementsConfig {
201 &self.acknowledgements
202 }
203}