vector/sinks/databend/service.rs

1use std::collections::BTreeMap;
2use std::io::Cursor;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use chrono::Utc;
8use databend_client::APIClient as DatabendAPIClient;
9use databend_client::Error as DatabendError;
10use futures::future::BoxFuture;
11use rand::{rng, Rng};
12use rand_distr::Alphanumeric;
13use snafu::Snafu;
14use tower::Service;
15use vector_lib::finalization::{EventFinalizers, EventStatus, Finalizable};
16use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
17use vector_lib::stream::DriverResponse;
18
19use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic};
20
21#[derive(Clone)]
22pub struct DatabendRetryLogic;
23
24impl RetryLogic for DatabendRetryLogic {
25    type Error = DatabendError;
26    type Request = DatabendRequest;
27    type Response = DatabendResponse;
28
29    fn is_retriable_error(&self, error: &Self::Error) -> bool {
30        match error {
31            DatabendError::Response { status, .. } => {
32                match status.as_u16() {
33                    429 => true,
34                    // general server error
35                    500 => true,
36                    // storage doesn't support presign operation
37                    3902 => false,
38                    // fail to parse stage attachment
39                    1046 => false,
40                    _ => false,
41                }
42            }
43            DatabendError::WithContext(boxed_error, ..) => self.is_retriable_error(boxed_error),
44            DatabendError::IO(_) => true,
45            _ => false,
46        }
47    }
48}
49
50#[derive(Clone)]
51pub struct DatabendService {
52    client: Arc<DatabendAPIClient>,
53    table: String,
54    file_format_options: BTreeMap<&'static str, &'static str>,
55    copy_options: BTreeMap<&'static str, &'static str>,
56}
57
58#[derive(Clone)]
59pub struct DatabendRequest {
60    pub data: Bytes,
61    pub finalizers: EventFinalizers,
62    pub metadata: RequestMetadata,
63}
64
65impl Finalizable for DatabendRequest {
66    fn take_finalizers(&mut self) -> EventFinalizers {
67        self.finalizers.take_finalizers()
68    }
69}
70
71impl MetaDescriptive for DatabendRequest {
72    fn get_metadata(&self) -> &RequestMetadata {
73        &self.metadata
74    }
75
76    fn metadata_mut(&mut self) -> &mut RequestMetadata {
77        &mut self.metadata
78    }
79}
80
81#[derive(Debug, Snafu)]
82pub struct DatabendResponse {
83    metadata: RequestMetadata,
84}
85
86impl DriverResponse for DatabendResponse {
87    fn event_status(&self) -> EventStatus {
88        EventStatus::Delivered
89    }
90
91    fn events_sent(&self) -> &GroupedCountByteSize {
92        self.metadata.events_estimated_json_encoded_byte_size()
93    }
94
95    fn bytes_sent(&self) -> Option<usize> {
96        Some(self.metadata.request_encoded_size())
97    }
98}
99
100impl DatabendService {
101    pub(super) fn new(
102        client: Arc<DatabendAPIClient>,
103        table: String,
104        file_format_options: BTreeMap<&'static str, &'static str>,
105        copy_options: BTreeMap<&'static str, &'static str>,
106    ) -> Result<Self, DatabendError> {
107        if table.is_empty() {
108            return Err(DatabendError::BadArgument("table is required".to_string()));
109        }
110        Ok(Self {
111            client,
112            table,
113            file_format_options,
114            copy_options,
115        })
116    }
117
118    async fn new_stage_location(&self) -> String {
119        let now = Utc::now().timestamp();
120        let database = self
121            .client
122            .current_database()
123            .unwrap_or("default".to_string());
124        let suffix = rng()
125            .sample_iter(&Alphanumeric)
126            .take(8)
127            .map(char::from)
128            .collect::<String>();
129        format!("@~/vector/{}/{}/{}-{}", database, self.table, now, suffix,)
130    }
131
132    pub(crate) async fn insert_with_stage(&self, data: Bytes) -> Result<(), DatabendError> {
133        let stage = self.new_stage_location().await;
134        let size = data.len() as u64;
135        let reader = Box::new(Cursor::new(data));
136        self.client.upload_to_stage(&stage, reader, size).await?;
137        let sql = format!("INSERT INTO `{}` VALUES", self.table);
138        let _ = self
139            .client
140            .insert_with_stage(
141                &sql,
142                &stage,
143                self.file_format_options.clone(),
144                self.copy_options.clone(),
145            )
146            .await?;
147        Ok(())
148    }
149}
150
151impl Service<DatabendRequest> for DatabendService {
152    type Response = DatabendResponse;
153    type Error = DatabendError;
154    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
155
156    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
157        Poll::Ready(Ok(()))
158    }
159
160    fn call(&mut self, request: DatabendRequest) -> Self::Future {
161        let service = self.clone();
162
163        let future = async move {
164            let metadata = request.get_metadata().clone();
165            let protocol = service.client.scheme();
166            let host_port = format!("{}:{}", service.client.host(), service.client.port());
167            let endpoint = host_port.as_str();
168            let byte_size = request.data.len();
169            service.insert_with_stage(request.data).await?;
170            emit!(EndpointBytesSent {
171                byte_size,
172                protocol,
173                endpoint,
174            });
175            Ok(DatabendResponse { metadata })
176        };
177        Box::pin(future)
178    }
179}