vector/sinks/doris/
config.rs1use super::sink::DorisSink;
4
5use crate::{
6 codecs::EncodingConfigWithFraming,
7 http::{Auth, HttpClient},
8 sinks::{
9 doris::{
10 client::DorisSinkClient, common::DorisCommon, health::DorisHealthLogic,
11 retry::DorisRetryLogic, service::DorisService,
12 },
13 prelude::*,
14 util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, service::HealthConfig},
15 },
16};
17use futures;
18use futures_util::TryFutureExt;
19use std::collections::HashMap;
20use std::sync::Arc;
21
22#[configurable_component(sink("doris", "Deliver log data to an Apache Doris database."))]
24#[derive(Clone, Debug)]
25#[serde(deny_unknown_fields)]
26pub struct DorisConfig {
27 #[serde(default)]
32 #[configurable(metadata(docs::examples = "http://127.0.0.1:8030"))]
33 pub endpoints: Vec<UriSerde>,
34
35 #[configurable(metadata(docs::examples = "mydatabase"))]
37 pub database: Template,
38
39 #[configurable(metadata(docs::examples = "mytable"))]
41 pub table: Template,
42
43 #[configurable(metadata(docs::examples = "vector"))]
46 #[serde(default = "default_label_prefix")]
47 pub label_prefix: String,
48
49 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
51 pub log_request: bool,
52
53 #[serde(default)]
64 #[configurable(metadata(docs::additional_props_description = "An HTTP header value."))]
65 pub headers: HashMap<String, String>,
66
67 #[serde(flatten)]
68 pub encoding: EncodingConfigWithFraming,
69
70 #[serde(default)]
72 pub compression: Compression,
73
74 #[serde(default = "default_max_retries")]
76 pub max_retries: isize,
77
78 #[configurable(derived)]
79 #[serde(default)]
80 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
81
82 #[configurable(derived)]
83 pub auth: Option<Auth>,
84
85 #[serde(default)]
86 #[configurable(derived)]
87 pub request: TowerRequestConfig,
88
89 #[configurable(derived)]
90 pub tls: Option<TlsConfig>,
91
92 #[serde(default)]
94 #[configurable(derived)]
95 #[serde(rename = "distribution")]
96 pub endpoint_health: Option<HealthConfig>,
97
98 #[configurable(derived)]
99 #[serde(
100 default,
101 deserialize_with = "crate::serde::bool_or_struct",
102 skip_serializing_if = "crate::serde::is_default"
103 )]
104 pub acknowledgements: AcknowledgementsConfig,
105}
106
/// Serde default for `DorisConfig::label_prefix`: the literal `vector`.
fn default_label_prefix() -> String {
    String::from("vector")
}
110
/// Serde default for `DorisConfig::max_retries`.
const fn default_max_retries() -> isize {
    const DEFAULT_MAX_RETRIES: isize = -1;
    DEFAULT_MAX_RETRIES
}
114
115impl Default for DorisConfig {
116 fn default() -> Self {
117 Self {
118 endpoints: Vec::new(),
119 database: Template::try_from("").unwrap(),
120 table: Template::try_from("").unwrap(),
121 label_prefix: default_label_prefix(),
122 log_request: false,
123 headers: HashMap::new(),
124 encoding: (
125 Some(vector_lib::codecs::encoding::FramingConfig::NewlineDelimited),
126 vector_lib::codecs::JsonSerializerConfig::default(),
127 )
128 .into(),
129 compression: Compression::default(),
130 max_retries: default_max_retries(),
131 batch: BatchConfig::default(),
132 auth: None,
133 request: TowerRequestConfig::default(),
134 tls: None,
135 endpoint_health: None,
136 acknowledgements: AcknowledgementsConfig::default(),
137 }
138 }
139}
140
141impl_generate_config_from_default!(DorisConfig);
142
143#[async_trait::async_trait]
144#[typetag::serde(name = "doris")]
145impl SinkConfig for DorisConfig {
146 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
147 let endpoints = self.endpoints.clone();
148
149 if endpoints.is_empty() {
150 return Err("No endpoints configured.'.".into());
151 }
152 let commons = DorisCommon::parse_many(self).await?;
153 let common = commons[0].clone();
154
155 let client = HttpClient::new(common.tls_settings.clone(), &cx.proxy)?;
156
157 let request_settings = self.request.into_settings();
159
160 let health_config = self.endpoint_health.clone().unwrap_or_default();
161
162 let services_futures = commons
163 .iter()
164 .map(|common| {
165 let client_clone = client.clone();
166 let compression = self.compression;
167 let label_prefix = self.label_prefix.clone();
168 let headers = self.headers.clone();
169 let log_request = self.log_request;
170 let base_url = common.base_url.clone();
171 let auth = common.auth.clone();
172
173 async move {
174 let endpoint = base_url.to_string();
175
176 let doris_client = DorisSinkClient::new(
177 client_clone,
178 base_url,
179 auth,
180 compression,
181 label_prefix,
182 headers,
183 )
184 .await;
185
186 let doris_client_safe = doris_client.into_thread_safe();
187
188 let service = DorisService::new(doris_client_safe, log_request);
189
190 Ok::<_, crate::Error>((endpoint, service))
191 }
192 })
193 .collect::<Vec<_>>();
194
195 let services_results = futures::future::join_all(services_futures).await;
197
198 let services = services_results
200 .into_iter()
201 .filter_map(Result::ok)
202 .collect::<Vec<_>>();
203
204 let service = request_settings.distributed_service(
205 DorisRetryLogic {},
206 services,
207 health_config,
208 DorisHealthLogic,
209 1, );
211
212 let sink = DorisSink::new(service, self, &common)?;
214
215 let sink = VectorSink::from_event_streamsink(sink);
216
217 let healthcheck_doris_client = {
219 let doris_client = DorisSinkClient::new(
220 client.clone(),
221 common.base_url.clone(),
222 common.auth.clone(),
223 self.compression,
224 self.label_prefix.clone(),
225 self.headers.clone(),
226 )
227 .await;
228 doris_client.into_thread_safe()
229 };
230
231 let healthcheck = futures::future::select_ok(commons.into_iter().map(move |common| {
233 let client = Arc::clone(&healthcheck_doris_client);
234 async move { common.healthcheck(client).await }.boxed()
235 }))
236 .map_ok(|((), _)| ())
237 .boxed();
238
239 Ok((sink, healthcheck))
240 }
241
242 fn input(&self) -> Input {
243 Input::log()
244 }
245
246 fn acknowledgements(&self) -> &AcknowledgementsConfig {
247 &self.acknowledgements
248 }
249}
250
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared config-generation check against `DorisConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DorisConfig>();
    }

    /// Pins the values returned by the serde default helpers.
    #[test]
    fn test_default_values() {
        let label_prefix = default_label_prefix();
        let max_retries = default_max_retries();

        assert_eq!(label_prefix, "vector");
        assert_eq!(max_retries, -1);
    }
}