vector/sinks/databend/
service.rs1use std::collections::BTreeMap;
2use std::io::Cursor;
3use std::sync::Arc;
4use std::task::{Context, Poll};
5
6use bytes::Bytes;
7use chrono::Utc;
8use databend_client::APIClient as DatabendAPIClient;
9use databend_client::Error as DatabendError;
10use futures::future::BoxFuture;
11use rand::{rng, Rng};
12use rand_distr::Alphanumeric;
13use snafu::Snafu;
14use tower::Service;
15use vector_lib::finalization::{EventFinalizers, EventStatus, Finalizable};
16use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
17use vector_lib::stream::DriverResponse;
18
19use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic};
20
/// Stateless marker type that carries the retry policy for Databend requests.
///
/// It holds no data; all behaviour lives in its `RetryLogic` implementation.
#[derive(Clone)]
pub struct DatabendRetryLogic;
23
24impl RetryLogic for DatabendRetryLogic {
25 type Error = DatabendError;
26 type Request = DatabendRequest;
27 type Response = DatabendResponse;
28
29 fn is_retriable_error(&self, error: &Self::Error) -> bool {
30 match error {
31 DatabendError::Response { status, .. } => {
32 match status.as_u16() {
33 429 => true,
34 500 => true,
36 3902 => false,
38 1046 => false,
40 _ => false,
41 }
42 }
43 DatabendError::WithContext(boxed_error, ..) => self.is_retriable_error(boxed_error),
44 DatabendError::IO(_) => true,
45 _ => false,
46 }
47 }
48}
49
50#[derive(Clone)]
51pub struct DatabendService {
52 client: Arc<DatabendAPIClient>,
53 table: String,
54 file_format_options: BTreeMap<&'static str, &'static str>,
55 copy_options: BTreeMap<&'static str, &'static str>,
56}
57
58#[derive(Clone)]
59pub struct DatabendRequest {
60 pub data: Bytes,
61 pub finalizers: EventFinalizers,
62 pub metadata: RequestMetadata,
63}
64
65impl Finalizable for DatabendRequest {
66 fn take_finalizers(&mut self) -> EventFinalizers {
67 self.finalizers.take_finalizers()
68 }
69}
70
71impl MetaDescriptive for DatabendRequest {
72 fn get_metadata(&self) -> &RequestMetadata {
73 &self.metadata
74 }
75
76 fn metadata_mut(&mut self) -> &mut RequestMetadata {
77 &mut self.metadata
78 }
79}
80
81#[derive(Debug, Snafu)]
82pub struct DatabendResponse {
83 metadata: RequestMetadata,
84}
85
86impl DriverResponse for DatabendResponse {
87 fn event_status(&self) -> EventStatus {
88 EventStatus::Delivered
89 }
90
91 fn events_sent(&self) -> &GroupedCountByteSize {
92 self.metadata.events_estimated_json_encoded_byte_size()
93 }
94
95 fn bytes_sent(&self) -> Option<usize> {
96 Some(self.metadata.request_encoded_size())
97 }
98}
99
100impl DatabendService {
101 pub(super) fn new(
102 client: Arc<DatabendAPIClient>,
103 table: String,
104 file_format_options: BTreeMap<&'static str, &'static str>,
105 copy_options: BTreeMap<&'static str, &'static str>,
106 ) -> Result<Self, DatabendError> {
107 if table.is_empty() {
108 return Err(DatabendError::BadArgument("table is required".to_string()));
109 }
110 Ok(Self {
111 client,
112 table,
113 file_format_options,
114 copy_options,
115 })
116 }
117
118 async fn new_stage_location(&self) -> String {
119 let now = Utc::now().timestamp();
120 let database = self
121 .client
122 .current_database()
123 .unwrap_or("default".to_string());
124 let suffix = rng()
125 .sample_iter(&Alphanumeric)
126 .take(8)
127 .map(char::from)
128 .collect::<String>();
129 format!("@~/vector/{}/{}/{}-{}", database, self.table, now, suffix,)
130 }
131
132 pub(crate) async fn insert_with_stage(&self, data: Bytes) -> Result<(), DatabendError> {
133 let stage = self.new_stage_location().await;
134 let size = data.len() as u64;
135 let reader = Box::new(Cursor::new(data));
136 self.client.upload_to_stage(&stage, reader, size).await?;
137 let sql = format!("INSERT INTO `{}` VALUES", self.table);
138 let _ = self
139 .client
140 .insert_with_stage(
141 &sql,
142 &stage,
143 self.file_format_options.clone(),
144 self.copy_options.clone(),
145 )
146 .await?;
147 Ok(())
148 }
149}
150
151impl Service<DatabendRequest> for DatabendService {
152 type Response = DatabendResponse;
153 type Error = DatabendError;
154 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
155
156 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
157 Poll::Ready(Ok(()))
158 }
159
160 fn call(&mut self, request: DatabendRequest) -> Self::Future {
161 let service = self.clone();
162
163 let future = async move {
164 let metadata = request.get_metadata().clone();
165 let protocol = service.client.scheme();
166 let host_port = format!("{}:{}", service.client.host(), service.client.port());
167 let endpoint = host_port.as_str();
168 let byte_size = request.data.len();
169 service.insert_with_stage(request.data).await?;
170 emit!(EndpointBytesSent {
171 byte_size,
172 protocol,
173 endpoint,
174 });
175 Ok(DatabendResponse { metadata })
176 };
177 Box::pin(future)
178 }
179}