vector/sinks/greptimedb/logs/
config.rs1use std::collections::HashMap;
2
3use vector_lib::{
4 codecs::{JsonSerializerConfig, NewlineDelimitedEncoderConfig, encoding::Framer},
5 configurable::configurable_component,
6 sensitive_string::SensitiveString,
7};
8
9use crate::{
10 http::{Auth, HttpClient},
11 sinks::{
12 greptimedb::{
13 GreptimeDBDefaultBatchSettings, default_dbname_template, default_pipeline_template,
14 logs::{
15 http_request_builder::{
16 GreptimeDBHttpRetryLogic, GreptimeDBLogsHttpRequestBuilder, PartitionKey,
17 http_healthcheck,
18 },
19 sink::{GreptimeDBLogsHttpSink, LogsSinkSetting},
20 },
21 },
22 prelude::*,
23 util::http::HttpService,
24 },
25};
26
27fn extra_params_examples() -> HashMap<String, String> {
28 HashMap::<_, _>::from_iter([("source".to_owned(), "vector".to_owned())])
29}
30
31#[configurable_component(sink("greptimedb_logs", "Ingest logs data into GreptimeDB."))]
33#[derive(Clone, Debug, Default, Derivative)]
34#[serde(deny_unknown_fields)]
35pub struct GreptimeDBLogsConfig {
36 #[serde(alias = "host")]
38 #[configurable(metadata(docs::examples = "http://localhost:4000"))]
39 pub endpoint: String,
40
41 #[configurable(metadata(docs::examples = "mytable"))]
43 pub table: Template,
44
45 #[configurable(metadata(docs::examples = "public"))]
55 #[derivative(Default(value = "default_dbname_template()"))]
56 #[serde(default = "default_dbname_template")]
57 pub dbname: Template,
58
59 #[configurable(metadata(docs::examples = "pipeline_name"))]
63 #[derivative(Default(value = "default_pipeline_template()"))]
64 #[serde(default = "default_pipeline_template")]
65 pub pipeline_name: Template,
66
67 #[configurable(metadata(docs::examples = "2024-06-07 06:46:23.858293"))]
69 pub pipeline_version: Option<Template>,
70
71 #[configurable(metadata(docs::examples = "username"))]
75 #[serde(default)]
76 pub username: Option<String>,
77 #[configurable(metadata(docs::examples = "password"))]
81 #[serde(default)]
82 pub password: Option<SensitiveString>,
83 #[configurable(derived)]
86 #[serde(default = "Compression::gzip_default")]
87 pub compression: Compression,
88
89 #[configurable(derived)]
90 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
91 pub encoding: Transformer,
92
93 #[serde(default)]
95 #[configurable(metadata(docs::advanced))]
96 #[configurable(metadata(docs::additional_props_description = "A query string parameter."))]
97 #[configurable(metadata(docs::examples = "extra_params_examples()"))]
98 pub extra_params: Option<HashMap<String, String>>,
99
100 #[serde(default)]
103 #[configurable(metadata(docs::advanced))]
104 #[configurable(metadata(
105 docs::additional_props_description = "Extra header key-value pairs."
106 ))]
107 pub extra_headers: Option<HashMap<String, String>>,
108
109 #[configurable(derived)]
110 #[serde(default)]
111 pub(crate) batch: BatchConfig<GreptimeDBDefaultBatchSettings>,
112
113 #[configurable(derived)]
114 #[serde(default)]
115 pub request: TowerRequestConfig,
116
117 #[configurable(derived)]
118 pub tls: Option<TlsConfig>,
119
120 #[configurable(derived)]
121 #[serde(
122 default,
123 deserialize_with = "crate::serde::bool_or_struct",
124 skip_serializing_if = "crate::serde::is_default"
125 )]
126 pub acknowledgements: AcknowledgementsConfig,
127}
128
129impl_generate_config_from_default!(GreptimeDBLogsConfig);
130
131#[async_trait::async_trait]
132#[typetag::serde(name = "greptimedb_logs")]
133impl SinkConfig for GreptimeDBLogsConfig {
134 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
135 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
136 let client = HttpClient::new(tls_settings, &cx.proxy)?;
137
138 let auth = match (self.username.clone(), self.password.clone()) {
139 (Some(username), Some(password)) => Some(Auth::Basic {
140 user: username,
141 password,
142 }),
143 _ => None,
144 };
145 let request_builder = GreptimeDBLogsHttpRequestBuilder {
146 endpoint: self.endpoint.clone(),
147 auth: auth.clone(),
148 encoder: (
149 self.encoding.clone(),
150 Encoder::<Framer>::new(
151 NewlineDelimitedEncoderConfig.build().into(),
152 JsonSerializerConfig::default().build().into(),
153 ),
154 ),
155 compression: self.compression,
156 extra_params: self.extra_params.clone(),
157 extra_headers: self.extra_headers.clone(),
158 };
159
160 let service: HttpService<GreptimeDBLogsHttpRequestBuilder, PartitionKey> =
161 HttpService::new(client.clone(), request_builder.clone());
162
163 let request_limits = self.request.into_settings();
164
165 let service = ServiceBuilder::new()
166 .settings(request_limits, GreptimeDBHttpRetryLogic::default())
167 .service(service);
168
169 let logs_sink_setting = LogsSinkSetting {
170 dbname: self.dbname.clone(),
171 table: self.table.clone(),
172 pipeline_name: self.pipeline_name.clone(),
173 pipeline_version: self.pipeline_version.clone(),
174 };
175
176 let sink = GreptimeDBLogsHttpSink::new(
177 self.batch.into_batcher_settings()?,
178 service,
179 request_builder,
180 logs_sink_setting,
181 );
182
183 let healthcheck = Box::pin(http_healthcheck(
184 client,
185 self.endpoint.clone(),
186 auth.clone(),
187 ));
188 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
189 }
190
191 fn input(&self) -> Input {
192 Input::log()
193 }
194
195 fn acknowledgements(&self) -> &AcknowledgementsConfig {
196 &self.acknowledgements
197 }
198}