vector/sinks/
opendal_common.rs

//! `opendal_common` provides the actual sink support shared by all OpenDAL-based services.
2//!
3//! # TODO
4//!
//! The OpenDAL service currently supports only very basic sink features. To make it
//! useful, we need to add the following features:
7//!
8//! - Error handling
9//! - Limitation
10
11use std::{fmt, task::Poll};
12
13use bytes::Bytes;
14use opendal::Operator;
15use snafu::Snafu;
16use tracing::Instrument;
17use vector_lib::codecs::encoding::Framer;
18
19use crate::sinks::{prelude::*, util::partitioner::KeyPartitioner};
20
21/// OpenDalSink provides generic a service upon OpenDAL.
22///
23/// # Notes
24///
25/// OpenDAL based service only need to provide a `<Service>Config`, and
26/// implement `build_processor` like `WebHdfs` does.
27pub struct OpenDalSink<Svc> {
28    service: Svc,
29    request_builder: OpenDalRequestBuilder,
30    partitioner: KeyPartitioner,
31    batcher_settings: BatcherSettings,
32}
33
34impl<Svc> OpenDalSink<Svc> {
35    /// Build a new OpenDalSink via given input
36    pub const fn new(
37        service: Svc,
38        request_builder: OpenDalRequestBuilder,
39        partitioner: KeyPartitioner,
40        batcher_settings: BatcherSettings,
41    ) -> Self {
42        Self {
43            service,
44            request_builder,
45            partitioner,
46            batcher_settings,
47        }
48    }
49}
50
51#[async_trait::async_trait]
52impl<Svc> StreamSink<Event> for OpenDalSink<Svc>
53where
54    Svc: Service<OpenDalRequest> + Send + 'static,
55    Svc::Future: Send + 'static,
56    Svc::Response: DriverResponse + Send + 'static,
57    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
58{
59    async fn run(
60        self: Box<Self>,
61        input: futures_util::stream::BoxStream<'_, Event>,
62    ) -> Result<(), ()> {
63        self.run_inner(input).await
64    }
65}
66
67impl<Svc> OpenDalSink<Svc>
68where
69    Svc: Service<OpenDalRequest> + Send + 'static,
70    Svc::Future: Send + 'static,
71    Svc::Response: DriverResponse + Send + 'static,
72    Svc::Error: fmt::Debug + Into<crate::Error> + Send,
73{
74    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
75        let partitioner = self.partitioner;
76        let settings = self.batcher_settings;
77
78        let request_builder = self.request_builder;
79
80        input
81            .batched_partitioned(partitioner, || settings.as_byte_size_config())
82            .filter_map(|(key, batch)| async move {
83                // We don't need to emit an error here if the event is dropped since this will occur if the template
84                // couldn't be rendered during the partitioning. A `TemplateRenderingError` is already emitted when
85                // that occurs.
86                key.map(move |k| (k, batch))
87            })
88            .request_builder(default_request_builder_concurrency_limit(), request_builder)
89            .filter_map(|request| async move {
90                match request {
91                    Err(error) => {
92                        emit!(SinkRequestBuildError { error });
93                        None
94                    }
95                    Ok(req) => Some(req),
96                }
97            })
98            .into_driver(self.service)
99            // TODO: set protocol with services scheme instead hardcoded file
100            .protocol("file")
101            .run()
102            .await
103    }
104}
105
106/// OpenDalService is just a simple wrapper of `opendal::Operator` to
107/// implement traits we needed.
108#[derive(Debug, Clone)]
109pub struct OpenDalService {
110    op: Operator,
111}
112
113impl OpenDalService {
114    pub const fn new(op: Operator) -> OpenDalService {
115        OpenDalService { op }
116    }
117}
118
119/// OpenDalRequest is request will be handled by opendal services.
120///
121/// It will carry all information that opendal needed, like payload and
122/// metadata.
123#[derive(Clone)]
124pub struct OpenDalRequest {
125    pub payload: Bytes,
126    pub metadata: OpenDalMetadata,
127    pub request_metadata: RequestMetadata,
128}
129
130impl MetaDescriptive for OpenDalRequest {
131    fn get_metadata(&self) -> &RequestMetadata {
132        &self.request_metadata
133    }
134
135    fn metadata_mut(&mut self) -> &mut RequestMetadata {
136        &mut self.request_metadata
137    }
138}
139
140impl Finalizable for OpenDalRequest {
141    fn take_finalizers(&mut self) -> EventFinalizers {
142        std::mem::take(&mut self.metadata.finalizers)
143    }
144}
145
146/// OpenDalMetadata carries metadata that opendal service needed to write.
147#[derive(Clone)]
148pub struct OpenDalMetadata {
149    pub partition_key: String,
150    pub count: usize,
151    pub byte_size: JsonSize,
152    pub finalizers: EventFinalizers,
153}
154
155/// OpenDalRequestBuilder will collect and encode input events to build a
156/// valid [`OpenDalRequest`].
157pub struct OpenDalRequestBuilder {
158    pub encoder: (Transformer, Encoder<Framer>),
159    pub compression: Compression,
160}
161
162impl RequestBuilder<(String, Vec<Event>)> for OpenDalRequestBuilder {
163    type Metadata = OpenDalMetadata;
164    type Events = Vec<Event>;
165    type Encoder = (Transformer, Encoder<Framer>);
166    type Payload = Bytes;
167    type Request = OpenDalRequest;
168    type Error = std::io::Error;
169
170    fn compression(&self) -> Compression {
171        self.compression
172    }
173
174    fn encoder(&self) -> &Self::Encoder {
175        &self.encoder
176    }
177
178    fn split_input(
179        &self,
180        input: (String, Vec<Event>),
181    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
182        let (partition_key, mut events) = input;
183        let finalizers = events.take_finalizers();
184        let opendal_metadata = OpenDalMetadata {
185            partition_key,
186            count: events.len(),
187            byte_size: events.estimated_json_encoded_size_of(),
188            finalizers,
189        };
190
191        let builder = RequestMetadataBuilder::from_events(&events);
192
193        (opendal_metadata, builder, events)
194    }
195
196    fn build_request(
197        &self,
198        mut metadata: Self::Metadata,
199        request_metadata: RequestMetadata,
200        payload: EncodeResult<Self::Payload>,
201    ) -> Self::Request {
202        // TODO: we can support time format later.
203        let name = uuid::Uuid::new_v4().to_string();
204        let extension = self.compression.extension();
205
206        metadata.partition_key = format!("{}{}.{}", metadata.partition_key, name, extension);
207
208        OpenDalRequest {
209            metadata,
210            payload: payload.into_payload(),
211            request_metadata,
212        }
213    }
214}
215
216/// OpenDalResponse is the response returned by OpenDAL services.
217#[derive(Debug)]
218pub struct OpenDalResponse {
219    pub events_byte_size: GroupedCountByteSize,
220    pub byte_size: usize,
221}
222
223impl DriverResponse for OpenDalResponse {
224    fn event_status(&self) -> EventStatus {
225        EventStatus::Delivered
226    }
227
228    fn events_sent(&self) -> &GroupedCountByteSize {
229        &self.events_byte_size
230    }
231
232    fn bytes_sent(&self) -> Option<usize> {
233        Some(self.byte_size)
234    }
235}
236
237impl Service<OpenDalRequest> for OpenDalService {
238    type Response = OpenDalResponse;
239    type Error = opendal::Error;
240    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
241
242    // Emission of an internal event in case of errors is handled upstream by the caller.
243    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
244        Poll::Ready(Ok(()))
245    }
246
247    // Emission of internal events for errors and dropped events is handled upstream by the caller.
248    fn call(&mut self, request: OpenDalRequest) -> Self::Future {
249        let byte_size = request.payload.len();
250        let op = self.op.clone();
251
252        Box::pin(async move {
253            let result = op
254                .write(&request.metadata.partition_key, request.payload)
255                .in_current_span()
256                .await;
257            result.map(|_| OpenDalResponse {
258                events_byte_size: request
259                    .request_metadata
260                    .into_events_estimated_json_encoded_byte_size(),
261                byte_size,
262            })
263        })
264    }
265}
266
267/// OpenDalError is the error returned by opendal services.
268///
269/// # TODO
270///
271/// We need to provide more context about opendal errors.
272#[derive(Debug, Snafu)]
273pub enum OpenDalError {
274    #[snafu(display("Failed to call OpenDal: {}", source))]
275    OpenDal { source: opendal::Error },
276}
277
278impl From<opendal::Error> for OpenDalError {
279    fn from(source: opendal::Error) -> Self {
280        Self::OpenDal { source }
281    }
282}