// vector/sinks/util/socket_bytes_sink.rs

1use std::{
2    io::Error as IoError,
3    marker::Unpin,
4    pin::Pin,
5    task::{ready, Context, Poll},
6};
7
8use bytes::Bytes;
9use futures::Sink;
10use pin_project::{pin_project, pinned_drop};
11use tokio::io::AsyncWrite;
12use tokio_util::codec::{BytesCodec, FramedWrite};
13use vector_lib::{
14    finalization::{EventFinalizers, EventStatus},
15    json_size::JsonSize,
16};
17
18use super::EncodedEvent;
19use crate::internal_events::{SocketBytesSent, SocketEventsSent, SocketMode};
20
/// Number of pending (sent but not yet flushed) events at which `poll_ready` forces a flush
/// before accepting more input.
const MAX_PENDING_ITEMS: usize = 1000;
22
/// Result of the `shutdown_check` callback that `BytesSink` runs at the start of every
/// `poll_flush`.
#[derive(Debug)]
pub enum ShutdownCheck {
    /// The check itself failed; `poll_flush` returns this error unchanged.
    Error(IoError),
    /// The sink should be shut down; `poll_flush` closes the sink and then reports the given
    /// reason as an I/O error.
    Close(&'static str),
    /// Nothing to report; `poll_flush` goes on to flush the inner writer.
    Alive,
}
28
/// A [`Sink`] of `EncodedEvent<Bytes>` layered over a [`FramedWrite`] that uses [`BytesCodec`].
///
/// Every `Sink` call is forwarded to the inner `FramedWrite`. On top of that, the wrapper:
/// - Calls `shutdown_check` at the start of each `poll_flush`, so the sink stops sending (and is
///   closed) when the check asks for it, e.g. because the other side disconnected.
/// - Forces a flush from `poll_ready` once `MAX_PENDING_ITEMS` or more events are pending.
/// - Tallies the event count, JSON size and encoded byte length on each `start_send`.
/// - Acks all pending events: `Delivered` after a successful `poll_flush`, `Errored` after a
///   failed `poll_flush` or `start_send`, and `Dropped` once `poll_close` completes or when the
///   sink is dropped.
#[pin_project(PinnedDrop)]
pub struct BytesSink<T>
where
    T: AsyncWrite + Unpin,
{
    /// The framed writer that actually buffers and writes the bytes.
    #[pin]
    inner: FramedWrite<T, BytesCodec>,
    /// Callback run against the raw writer before each flush; see `ShutdownCheck`.
    shutdown_check: Box<dyn Fn(&mut T) -> ShutdownCheck + Send>,
    /// Totals and finalizers for the events sent since the last ack.
    state: State,
}
46
47impl<T> BytesSink<T>
48where
49    T: AsyncWrite + Unpin,
50{
51    pub(crate) fn new(
52        inner: T,
53        shutdown_check: impl Fn(&mut T) -> ShutdownCheck + Send + 'static,
54        socket_mode: SocketMode,
55    ) -> Self {
56        Self {
57            inner: FramedWrite::new(inner, BytesCodec::new()),
58            shutdown_check: Box::new(shutdown_check),
59            state: State {
60                events_total: 0,
61                event_bytes: JsonSize::zero(),
62                bytes_total: 0,
63                socket_mode,
64                finalizers: Vec::new(),
65            },
66        }
67    }
68}
69
70struct State {
71    socket_mode: SocketMode,
72    events_total: usize,
73    event_bytes: JsonSize,
74    bytes_total: usize,
75    finalizers: Vec<EventFinalizers>,
76}
77
78impl State {
79    fn ack(&mut self, status: EventStatus) {
80        if self.events_total > 0 {
81            for finalizer in std::mem::take(&mut self.finalizers) {
82                finalizer.update_status(status);
83            }
84
85            if status == EventStatus::Delivered {
86                emit!(SocketEventsSent {
87                    mode: self.socket_mode,
88                    count: self.events_total as u64,
89                    byte_size: self.event_bytes,
90                });
91                emit!(SocketBytesSent {
92                    mode: self.socket_mode,
93                    byte_size: self.bytes_total,
94                });
95            }
96
97            self.events_total = 0;
98            self.event_bytes = JsonSize::zero();
99            self.bytes_total = 0;
100        }
101    }
102}
103
104#[pinned_drop]
105impl<T> PinnedDrop for BytesSink<T>
106where
107    T: AsyncWrite + Unpin,
108{
109    fn drop(self: Pin<&mut Self>) {
110        self.get_mut().state.ack(EventStatus::Dropped)
111    }
112}
113
114impl<T> Sink<EncodedEvent<Bytes>> for BytesSink<T>
115where
116    T: AsyncWrite + Unpin,
117{
118    type Error = <FramedWrite<T, BytesCodec> as Sink<Bytes>>::Error;
119
120    fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121        if self.as_mut().project().state.events_total >= MAX_PENDING_ITEMS {
122            if let Err(error) = ready!(self.as_mut().poll_flush(cx)) {
123                return Poll::Ready(Err(error));
124            }
125        }
126
127        let inner = self.project().inner;
128        <FramedWrite<T, BytesCodec> as Sink<Bytes>>::poll_ready(inner, cx)
129    }
130
131    fn start_send(self: Pin<&mut Self>, item: EncodedEvent<Bytes>) -> Result<(), Self::Error> {
132        let pinned = self.project();
133        pinned.state.finalizers.push(item.finalizers);
134        pinned.state.events_total += 1;
135        pinned.state.event_bytes += item.json_byte_size;
136        pinned.state.bytes_total += item.item.len();
137
138        let result = pinned.inner.start_send(item.item);
139        if result.is_err() {
140            pinned.state.ack(EventStatus::Errored);
141        }
142        result
143    }
144
145    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146        let pinned = self.as_mut().project();
147        match (pinned.shutdown_check)(pinned.inner.get_mut().get_mut()) {
148            ShutdownCheck::Error(error) => return Poll::Ready(Err(error)),
149            ShutdownCheck::Close(reason) => {
150                if let Err(error) = ready!(self.as_mut().poll_close(cx)) {
151                    return Poll::Ready(Err(error));
152                }
153
154                return Poll::Ready(Err(IoError::other(reason)));
155            }
156            ShutdownCheck::Alive => {}
157        }
158
159        let inner = self.as_mut().project().inner;
160        let result = ready!(<FramedWrite<T, BytesCodec> as Sink<Bytes>>::poll_flush(
161            inner, cx
162        ));
163        self.as_mut().get_mut().state.ack(match result {
164            Ok(_) => EventStatus::Delivered,
165            Err(_) => EventStatus::Errored,
166        });
167        Poll::Ready(result)
168    }
169
170    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171        let inner = self.as_mut().project().inner;
172        let result = ready!(<FramedWrite<T, BytesCodec> as Sink<Bytes>>::poll_close(
173            inner, cx
174        ));
175        self.as_mut().get_mut().state.ack(EventStatus::Dropped);
176        Poll::Ready(result)
177    }
178}