vector/sinks/databend/
service.rs1use std::{
2 collections::BTreeMap,
3 io::Cursor,
4 sync::Arc,
5 task::{Context, Poll},
6};
7
8use bytes::Bytes;
9use chrono::Utc;
10use databend_client::{APIClient as DatabendAPIClient, Error as DatabendError};
11use futures::future::BoxFuture;
12use rand::{Rng, rng};
13use rand_distr::Alphanumeric;
14use snafu::Snafu;
15use tower::Service;
16use vector_lib::{
17 finalization::{EventFinalizers, EventStatus, Finalizable},
18 request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
19 stream::DriverResponse,
20};
21
22use crate::{internal_events::EndpointBytesSent, sinks::util::retries::RetryLogic};
23
24#[derive(Clone)]
25pub struct DatabendRetryLogic;
26
27impl RetryLogic for DatabendRetryLogic {
28 type Error = DatabendError;
29 type Request = DatabendRequest;
30 type Response = DatabendResponse;
31
32 fn is_retriable_error(&self, error: &Self::Error) -> bool {
33 match error {
34 DatabendError::Response { status, .. } => {
35 match status.as_u16() {
36 429 => true,
37 500 => true,
39 3902 => false,
41 1046 => false,
43 _ => false,
44 }
45 }
46 DatabendError::WithContext(boxed_error, ..) => self.is_retriable_error(boxed_error),
47 DatabendError::IO(_) => true,
48 _ => false,
49 }
50 }
51}
52
53#[derive(Clone)]
54pub struct DatabendService {
55 client: Arc<DatabendAPIClient>,
56 table: String,
57 file_format_options: BTreeMap<&'static str, &'static str>,
58 copy_options: BTreeMap<&'static str, &'static str>,
59}
60
61#[derive(Clone)]
62pub struct DatabendRequest {
63 pub data: Bytes,
64 pub finalizers: EventFinalizers,
65 pub metadata: RequestMetadata,
66}
67
68impl Finalizable for DatabendRequest {
69 fn take_finalizers(&mut self) -> EventFinalizers {
70 self.finalizers.take_finalizers()
71 }
72}
73
74impl MetaDescriptive for DatabendRequest {
75 fn get_metadata(&self) -> &RequestMetadata {
76 &self.metadata
77 }
78
79 fn metadata_mut(&mut self) -> &mut RequestMetadata {
80 &mut self.metadata
81 }
82}
83
84#[derive(Debug, Snafu)]
85pub struct DatabendResponse {
86 metadata: RequestMetadata,
87}
88
89impl DriverResponse for DatabendResponse {
90 fn event_status(&self) -> EventStatus {
91 EventStatus::Delivered
92 }
93
94 fn events_sent(&self) -> &GroupedCountByteSize {
95 self.metadata.events_estimated_json_encoded_byte_size()
96 }
97
98 fn bytes_sent(&self) -> Option<usize> {
99 Some(self.metadata.request_encoded_size())
100 }
101}
102
103impl DatabendService {
104 pub(super) fn new(
105 client: Arc<DatabendAPIClient>,
106 table: String,
107 file_format_options: BTreeMap<&'static str, &'static str>,
108 copy_options: BTreeMap<&'static str, &'static str>,
109 ) -> Result<Self, DatabendError> {
110 if table.is_empty() {
111 return Err(DatabendError::BadArgument("table is required".to_string()));
112 }
113 Ok(Self {
114 client,
115 table,
116 file_format_options,
117 copy_options,
118 })
119 }
120
121 async fn new_stage_location(&self) -> String {
122 let now = Utc::now().timestamp();
123 let database = self
124 .client
125 .current_database()
126 .unwrap_or("default".to_string());
127 let suffix = rng()
128 .sample_iter(&Alphanumeric)
129 .take(8)
130 .map(char::from)
131 .collect::<String>();
132 format!("@~/vector/{}/{}/{}-{}", database, self.table, now, suffix,)
133 }
134
135 pub(crate) async fn insert_with_stage(&self, data: Bytes) -> Result<(), DatabendError> {
136 let stage = self.new_stage_location().await;
137 let size = data.len() as u64;
138 let reader = Box::new(Cursor::new(data));
139 self.client.upload_to_stage(&stage, reader, size).await?;
140 let sql = format!("INSERT INTO `{}` VALUES", self.table);
141 let _ = self
142 .client
143 .insert_with_stage(
144 &sql,
145 &stage,
146 self.file_format_options.clone(),
147 self.copy_options.clone(),
148 )
149 .await?;
150 Ok(())
151 }
152}
153
154impl Service<DatabendRequest> for DatabendService {
155 type Response = DatabendResponse;
156 type Error = DatabendError;
157 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
158
159 fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
160 Poll::Ready(Ok(()))
161 }
162
163 fn call(&mut self, request: DatabendRequest) -> Self::Future {
164 let service = self.clone();
165
166 let future = async move {
167 let metadata = request.get_metadata().clone();
168 let protocol = service.client.scheme();
169 let host_port = format!("{}:{}", service.client.host(), service.client.port());
170 let endpoint = host_port.as_str();
171 let byte_size = request.data.len();
172 service.insert_with_stage(request.data).await?;
173 emit!(EndpointBytesSent {
174 byte_size,
175 protocol,
176 endpoint,
177 });
178 Ok(DatabendResponse { metadata })
179 };
180 Box::pin(future)
181 }
182}