vector/sinks/databend/service.rs

1use std::{
2    collections::BTreeMap,
3    io::Cursor,
4    sync::Arc,
5    task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use chrono::Utc;
10use databend_client::{APIClient as DatabendAPIClient, Error as DatabendError};
11use futures::future::BoxFuture;
12use rand::{Rng, rng};
13use rand_distr::Alphanumeric;
14use snafu::Snafu;
15use tower::Service;
16use vector_lib::{
17    finalization::{EventFinalizers, EventStatus, Finalizable},
18    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
19    stream::DriverResponse,
20};
21
22use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic};
23
24#[derive(Clone)]
25pub struct DatabendRetryLogic;
26
27impl RetryLogic for DatabendRetryLogic {
28    type Error = DatabendError;
29    type Request = DatabendRequest;
30    type Response = DatabendResponse;
31
32    fn is_retriable_error(&self, error: &Self::Error) -> bool {
33        match error {
34            DatabendError::Response { status, .. } => {
35                match status.as_u16() {
36                    429 => true,
37                    // general server error
38                    500 => true,
39                    // storage doesn't support presign operation
40                    3902 => false,
41                    // fail to parse stage attachment
42                    1046 => false,
43                    _ => false,
44                }
45            }
46            DatabendError::WithContext(boxed_error, ..) => self.is_retriable_error(boxed_error),
47            DatabendError::IO(_) => true,
48            _ => false,
49        }
50    }
51}
52
53#[derive(Clone)]
54pub struct DatabendService {
55    client: Arc<DatabendAPIClient>,
56    table: String,
57    file_format_options: BTreeMap<&'static str, &'static str>,
58    copy_options: BTreeMap<&'static str, &'static str>,
59}
60
61#[derive(Clone)]
62pub struct DatabendRequest {
63    pub data: Bytes,
64    pub finalizers: EventFinalizers,
65    pub metadata: RequestMetadata,
66}
67
68impl Finalizable for DatabendRequest {
69    fn take_finalizers(&mut self) -> EventFinalizers {
70        self.finalizers.take_finalizers()
71    }
72}
73
74impl MetaDescriptive for DatabendRequest {
75    fn get_metadata(&self) -> &RequestMetadata {
76        &self.metadata
77    }
78
79    fn metadata_mut(&mut self) -> &mut RequestMetadata {
80        &mut self.metadata
81    }
82}
83
84#[derive(Debug, Snafu)]
85pub struct DatabendResponse {
86    metadata: RequestMetadata,
87}
88
89impl DriverResponse for DatabendResponse {
90    fn event_status(&self) -> EventStatus {
91        EventStatus::Delivered
92    }
93
94    fn events_sent(&self) -> &GroupedCountByteSize {
95        self.metadata.events_estimated_json_encoded_byte_size()
96    }
97
98    fn bytes_sent(&self) -> Option<usize> {
99        Some(self.metadata.request_encoded_size())
100    }
101}
102
103impl DatabendService {
104    pub(super) fn new(
105        client: Arc<DatabendAPIClient>,
106        table: String,
107        file_format_options: BTreeMap<&'static str, &'static str>,
108        copy_options: BTreeMap<&'static str, &'static str>,
109    ) -> Result<Self, DatabendError> {
110        if table.is_empty() {
111            return Err(DatabendError::BadArgument("table is required".to_string()));
112        }
113        Ok(Self {
114            client,
115            table,
116            file_format_options,
117            copy_options,
118        })
119    }
120
121    async fn new_stage_location(&self) -> String {
122        let now = Utc::now().timestamp();
123        let database = self
124            .client
125            .current_database()
126            .unwrap_or("default".to_string());
127        let suffix = rng()
128            .sample_iter(&Alphanumeric)
129            .take(8)
130            .map(char::from)
131            .collect::<String>();
132        format!("@~/vector/{}/{}/{}-{}", database, self.table, now, suffix,)
133    }
134
135    pub(crate) async fn insert_with_stage(&self, data: Bytes) -> Result<(), DatabendError> {
136        let stage = self.new_stage_location().await;
137        let size = data.len() as u64;
138        let reader = Box::new(Cursor::new(data));
139        self.client.upload_to_stage(&stage, reader, size).await?;
140        let sql = format!("INSERT INTO `{}` VALUES", self.table);
141        let _ = self
142            .client
143            .insert_with_stage(
144                &sql,
145                &stage,
146                self.file_format_options.clone(),
147                self.copy_options.clone(),
148            )
149            .await?;
150        Ok(())
151    }
152}
153
154impl Service<DatabendRequest> for DatabendService {
155    type Response = DatabendResponse;
156    type Error = DatabendError;
157    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
158
159    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
160        Poll::Ready(Ok(()))
161    }
162
163    fn call(&mut self, request: DatabendRequest) -> Self::Future {
164        let service = self.clone();
165
166        let future = async move {
167            let metadata = request.get_metadata().clone();
168            let protocol = service.client.scheme();
169            let host_port = format!("{}:{}", service.client.host(), service.client.port());
170            let endpoint = host_port.as_str();
171            let byte_size = request.data.len();
172            service.insert_with_stage(request.data).await?;
173            emit!(EndpointBytesSent {
174                byte_size,
175                protocol,
176                endpoint,
177            });
178            Ok(DatabendResponse { metadata })
179        };
180        Box::pin(future)
181    }
182}