vector/sinks/doris/
config.rs

1//! Configuration for the `Doris` sink.
2
3use super::sink::DorisSink;
4
5use crate::{
6    codecs::EncodingConfigWithFraming,
7    http::{Auth, HttpClient},
8    sinks::{
9        doris::{
10            client::DorisSinkClient, common::DorisCommon, health::DorisHealthLogic,
11            retry::DorisRetryLogic, service::DorisService,
12        },
13        prelude::*,
14        util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde, service::HealthConfig},
15    },
16};
17use futures;
18use futures_util::TryFutureExt;
19use std::collections::HashMap;
20use std::sync::Arc;
21
/// Configuration for the `doris` sink.
#[configurable_component(sink("doris", "Deliver log data to an Apache Doris database."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct DorisConfig {
    /// A list of Doris endpoints to send logs to.
    ///
    /// Each endpoint must contain an HTTP scheme, and may specify a
    /// hostname or IP address and port. At least one endpoint is required.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://127.0.0.1:8030"))]
    pub endpoints: Vec<UriSerde>,

    /// The database containing the table that data is inserted into.
    #[configurable(metadata(docs::examples = "mydatabase"))]
    pub database: Template,

    /// The table that data is inserted into.
    #[configurable(metadata(docs::examples = "mytable"))]
    pub table: Template,

    /// The prefix for the Stream Load label.
    ///
    /// The final label has the format `{label_prefix}_{database}_{table}_{timestamp}_{uuid}`.
    #[configurable(metadata(docs::examples = "vector"))]
    #[serde(default = "default_label_prefix")]
    pub label_prefix: String,

    /// Enable request logging.
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    pub log_request: bool,

    /// Custom HTTP headers to add to the request.
    ///
    /// These headers can be used to set Doris-specific Stream Load parameters:
    /// - `format`: Data format (for example, `json` or `csv`)
    /// - `read_json_by_line`: Whether to read JSON line by line
    /// - `strip_outer_array`: Whether to strip outer array brackets
    /// - Column mappings and transformations
    ///
    /// See the [Doris Stream Load documentation](https://doris.apache.org/docs/data-operate/import/import-way/stream-load-manual)
    /// for all available parameters.
    #[serde(default)]
    #[configurable(metadata(docs::additional_props_description = "An HTTP header value."))]
    pub headers: HashMap<String, String>,

    // Flattened: the encoding and framing options are read from the top level
    // of the sink configuration rather than from a nested `encoding` table.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    /// Compression algorithm to use for HTTP requests.
    #[serde(default)]
    pub compression: Compression,

    /// Number of retries attempted before failing.
    ///
    /// Defaults to `-1`.
    // NOTE(review): `-1` is presumably a sentinel for "retry without limit";
    // confirm against the retry logic that consumes this value.
    #[serde(default = "default_max_retries")]
    pub max_retries: isize,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    pub auth: Option<Auth>,

    #[serde(default)]
    #[configurable(derived)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Options for determining the health of Doris endpoints.
    ///
    /// This option is (de)serialized under the key `distribution`.
    #[serde(default)]
    #[configurable(derived)]
    #[serde(rename = "distribution")]
    pub endpoint_health: Option<HealthConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
106
/// Returns the default Stream Load label prefix, `"vector"`.
///
/// Used as the serde default for `DorisConfig::label_prefix`.
fn default_label_prefix() -> String {
    String::from("vector")
}
110
/// Returns the default value for `DorisConfig::max_retries`, which is `-1`.
///
/// Used as the serde default for that field.
const fn default_max_retries() -> isize {
    const DEFAULT_MAX_RETRIES: isize = -1;
    DEFAULT_MAX_RETRIES
}
114
115impl Default for DorisConfig {
116    fn default() -> Self {
117        Self {
118            endpoints: Vec::new(),
119            database: Template::try_from("").unwrap(),
120            table: Template::try_from("").unwrap(),
121            label_prefix: default_label_prefix(),
122            log_request: false,
123            headers: HashMap::new(),
124            encoding: (
125                Some(vector_lib::codecs::encoding::FramingConfig::NewlineDelimited),
126                vector_lib::codecs::JsonSerializerConfig::default(),
127            )
128                .into(),
129            compression: Compression::default(),
130            max_retries: default_max_retries(),
131            batch: BatchConfig::default(),
132            auth: None,
133            request: TowerRequestConfig::default(),
134            tls: None,
135            endpoint_health: None,
136            acknowledgements: AcknowledgementsConfig::default(),
137        }
138    }
139}
140
// NOTE(review): judging by its name, this macro presumably implements config
// generation for `DorisConfig` from its `Default` impl; confirm against the
// macro definition. The `generate_config` test in this file exercises it.
impl_generate_config_from_default!(DorisConfig);
142
143#[async_trait::async_trait]
144#[typetag::serde(name = "doris")]
145impl SinkConfig for DorisConfig {
146    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
147        let endpoints = self.endpoints.clone();
148
149        if endpoints.is_empty() {
150            return Err("No endpoints configured.'.".into());
151        }
152        let commons = DorisCommon::parse_many(self).await?;
153        let common = commons[0].clone();
154
155        let client = HttpClient::new(common.tls_settings.clone(), &cx.proxy)?;
156
157        // Setup retry logic using the configured request settings
158        let request_settings = self.request.into_settings();
159
160        let health_config = self.endpoint_health.clone().unwrap_or_default();
161
162        let services_futures = commons
163            .iter()
164            .map(|common| {
165                let client_clone = client.clone();
166                let compression = self.compression;
167                let label_prefix = self.label_prefix.clone();
168                let headers = self.headers.clone();
169                let log_request = self.log_request;
170                let base_url = common.base_url.clone();
171                let auth = common.auth.clone();
172
173                async move {
174                    let endpoint = base_url.to_string();
175
176                    let doris_client = DorisSinkClient::new(
177                        client_clone,
178                        base_url,
179                        auth,
180                        compression,
181                        label_prefix,
182                        headers,
183                    )
184                    .await;
185
186                    let doris_client_safe = doris_client.into_thread_safe();
187
188                    let service = DorisService::new(doris_client_safe, log_request);
189
190                    Ok::<_, crate::Error>((endpoint, service))
191                }
192            })
193            .collect::<Vec<_>>();
194
195        // Wait for all futures to complete
196        let services_results = futures::future::join_all(services_futures).await;
197
198        // Filter out successful results
199        let services = services_results
200            .into_iter()
201            .filter_map(Result::ok)
202            .collect::<Vec<_>>();
203
204        let service = request_settings.distributed_service(
205            DorisRetryLogic {},
206            services,
207            health_config,
208            DorisHealthLogic,
209            1, // Buffer bound is hardcoded to 1 for sinks
210        );
211
212        // Create DorisSink with the configured service
213        let sink = DorisSink::new(service, self, &common)?;
214
215        let sink = VectorSink::from_event_streamsink(sink);
216
217        // Create a shared client instance to avoid repeated creation
218        let healthcheck_doris_client = {
219            let doris_client = DorisSinkClient::new(
220                client.clone(),
221                common.base_url.clone(),
222                common.auth.clone(),
223                self.compression,
224                self.label_prefix.clone(),
225                self.headers.clone(),
226            )
227            .await;
228            doris_client.into_thread_safe()
229        };
230
231        // Use the previously saved client for health check, no need to create a new instance
232        let healthcheck = futures::future::select_ok(commons.into_iter().map(move |common| {
233            let client = Arc::clone(&healthcheck_doris_client);
234            async move { common.healthcheck(client).await }.boxed()
235        }))
236        .map_ok(|((), _)| ())
237        .boxed();
238
239        Ok((sink, healthcheck))
240    }
241
242    fn input(&self) -> Input {
243        Input::log()
244    }
245
246    fn acknowledgements(&self) -> &AcknowledgementsConfig {
247        &self.acknowledgements
248    }
249}
250
#[cfg(test)]
mod tests {
    use super::{DorisConfig, default_label_prefix, default_max_retries};

    /// The generated example configuration must round-trip for `DorisConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<DorisConfig>();
    }

    /// Pins the values returned by the serde default helpers.
    #[test]
    fn test_default_values() {
        let label_prefix = default_label_prefix();
        let max_retries = default_max_retries();

        assert_eq!(label_prefix, "vector");
        assert_eq!(max_retries, -1);
    }
}
265}