vector/sinks/
opendal_common.rs1use std::{fmt, task::Poll};
12
13use bytes::Bytes;
14use opendal::Operator;
15use snafu::Snafu;
16use tracing::Instrument;
17use vector_lib::codecs::encoding::Framer;
18
19use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
20
21pub struct OpenDalSink<Svc> {
28 service: Svc,
29 request_builder: OpenDalRequestBuilder,
30 partitioner: KeyPartitioner,
31 batcher_settings: BatcherSettings,
32}
33
34impl<Svc> OpenDalSink<Svc> {
35 pub const fn new(
37 service: Svc,
38 request_builder: OpenDalRequestBuilder,
39 partitioner: KeyPartitioner,
40 batcher_settings: BatcherSettings,
41 ) -> Self {
42 Self {
43 service,
44 request_builder,
45 partitioner,
46 batcher_settings,
47 }
48 }
49}
50
51#[async_trait::async_trait]
52impl<Svc> StreamSink<Event> for OpenDalSink<Svc>
53where
54 Svc: Service<OpenDalRequest> + Send + 'static,
55 Svc::Future: Send + 'static,
56 Svc::Response: DriverResponse + Send + 'static,
57 Svc::Error: fmt::Debug + Into<crate::Error> + Send,
58{
59 async fn run(
60 self: Box<Self>,
61 input: futures_util::stream::BoxStream<'_, Event>,
62 ) -> Result<(), ()> {
63 self.run_inner(input).await
64 }
65}
66
67impl<Svc> OpenDalSink<Svc>
68where
69 Svc: Service<OpenDalRequest> + Send + 'static,
70 Svc::Future: Send + 'static,
71 Svc::Response: DriverResponse + Send + 'static,
72 Svc::Error: fmt::Debug + Into<crate::Error> + Send,
73{
74 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
75 let partitioner = self.partitioner;
76 let settings = self.batcher_settings;
77
78 let request_builder = self.request_builder;
79
80 input
81 .batched_partitioned(partitioner, || settings.as_byte_size_config())
82 .filter_map(|(key, batch)| async move {
83 key.map(move |k| (k, batch))
87 })
88 .request_builder(default_request_builder_concurrency_limit(), request_builder)
89 .filter_map(|request| async move {
90 match request {
91 Err(error) => {
92 emit!(SinkRequestBuildError { error });
93 None
94 }
95 Ok(req) => Some(req),
96 }
97 })
98 .into_driver(self.service)
99 .protocol("file")
101 .run()
102 .await
103 }
104}
105
106#[derive(Debug, Clone)]
109pub struct OpenDalService {
110 op: Operator,
111}
112
113impl OpenDalService {
114 pub const fn new(op: Operator) -> OpenDalService {
115 OpenDalService { op }
116 }
117}
118
119#[derive(Clone)]
124pub struct OpenDalRequest {
125 pub payload: Bytes,
126 pub metadata: OpenDalMetadata,
127 pub request_metadata: RequestMetadata,
128}
129
130impl MetaDescriptive for OpenDalRequest {
131 fn get_metadata(&self) -> &RequestMetadata {
132 &self.request_metadata
133 }
134
135 fn metadata_mut(&mut self) -> &mut RequestMetadata {
136 &mut self.request_metadata
137 }
138}
139
140impl Finalizable for OpenDalRequest {
141 fn take_finalizers(&mut self) -> EventFinalizers {
142 std::mem::take(&mut self.metadata.finalizers)
143 }
144}
145
146#[derive(Clone)]
148pub struct OpenDalMetadata {
149 pub partition_key: String,
150 pub count: usize,
151 pub byte_size: JsonSize,
152 pub finalizers: EventFinalizers,
153}
154
155pub struct OpenDalRequestBuilder {
158 pub encoder: (Transformer, Encoder<Framer>),
159 pub compression: Compression,
160}
161
162impl RequestBuilder<(String, Vec<Event>)> for OpenDalRequestBuilder {
163 type Metadata = OpenDalMetadata;
164 type Events = Vec<Event>;
165 type Encoder = (Transformer, Encoder<Framer>);
166 type Payload = Bytes;
167 type Request = OpenDalRequest;
168 type Error = std::io::Error;
169
170 fn compression(&self) -> Compression {
171 self.compression
172 }
173
174 fn encoder(&self) -> &Self::Encoder {
175 &self.encoder
176 }
177
178 fn split_input(
179 &self,
180 input: (String, Vec<Event>),
181 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
182 let (partition_key, mut events) = input;
183 let finalizers = events.take_finalizers();
184 let opendal_metadata = OpenDalMetadata {
185 partition_key,
186 count: events.len(),
187 byte_size: events.estimated_json_encoded_size_of(),
188 finalizers,
189 };
190
191 let builder = RequestMetadataBuilder::from_events(&events);
192
193 (opendal_metadata, builder, events)
194 }
195
196 fn build_request(
197 &self,
198 mut metadata: Self::Metadata,
199 request_metadata: RequestMetadata,
200 payload: EncodeResult<Self::Payload>,
201 ) -> Self::Request {
202 let name = uuid::Uuid::new_v4().to_string();
204 let extension = self.compression.extension();
205
206 metadata.partition_key = format!("{}{}.{}", metadata.partition_key, name, extension);
207
208 OpenDalRequest {
209 metadata,
210 payload: payload.into_payload(),
211 request_metadata,
212 }
213 }
214}
215
216#[derive(Debug)]
218pub struct OpenDalResponse {
219 pub events_byte_size: GroupedCountByteSize,
220 pub byte_size: usize,
221}
222
223impl DriverResponse for OpenDalResponse {
224 fn event_status(&self) -> EventStatus {
225 EventStatus::Delivered
226 }
227
228 fn events_sent(&self) -> &GroupedCountByteSize {
229 &self.events_byte_size
230 }
231
232 fn bytes_sent(&self) -> Option<usize> {
233 Some(self.byte_size)
234 }
235}
236
237impl Service<OpenDalRequest> for OpenDalService {
238 type Response = OpenDalResponse;
239 type Error = opendal::Error;
240 type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
241
242 fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
244 Poll::Ready(Ok(()))
245 }
246
247 fn call(&mut self, request: OpenDalRequest) -> Self::Future {
249 let byte_size = request.payload.len();
250 let op = self.op.clone();
251
252 Box::pin(async move {
253 let result = op
254 .write(&request.metadata.partition_key, request.payload)
255 .in_current_span()
256 .await;
257 result.map(|_| OpenDalResponse {
258 events_byte_size: request
259 .request_metadata
260 .into_events_estimated_json_encoded_byte_size(),
261 byte_size,
262 })
263 })
264 }
265}
266
267#[derive(Debug, Snafu)]
273pub enum OpenDalError {
274 #[snafu(display("Failed to call OpenDal: {}", source))]
275 OpenDal { source: opendal::Error },
276}
277
278impl From<opendal::Error> for OpenDalError {
279 fn from(source: opendal::Error) -> Self {
280 Self::OpenDal { source }
281 }
282}