vector/sinks/util/
socket_bytes_sink.rs1use std::{
2 io::Error as IoError,
3 marker::Unpin,
4 pin::Pin,
5 task::{ready, Context, Poll},
6};
7
8use bytes::Bytes;
9use futures::Sink;
10use pin_project::{pin_project, pinned_drop};
11use tokio::io::AsyncWrite;
12use tokio_util::codec::{BytesCodec, FramedWrite};
13use vector_lib::{
14 finalization::{EventFinalizers, EventStatus},
15 json_size::JsonSize,
16};
17
18use super::EncodedEvent;
19use crate::internal_events::{SocketBytesSent, SocketEventsSent, SocketMode};
20
/// Number of buffered events at which `BytesSink::poll_ready` forces a flush
/// before it reports readiness for further items.
const MAX_PENDING_ITEMS: usize = 1_000;

/// Verdict produced by the `shutdown_check` callback handed to `BytesSink::new`.
///
/// The callback is evaluated at the start of every `poll_flush`, and the
/// variant it returns decides whether the flush proceeds.
#[derive(Debug)]
pub enum ShutdownCheck {
    /// The check itself failed; `poll_flush` returns this error to the caller.
    Error(IoError),
    /// The sink should be closed; `poll_flush` closes the sink and then fails
    /// with an I/O error built from this reason.
    Close(&'static str),
    /// Nothing to report; the flush goes ahead.
    Alive,
}
28
29#[pin_project(PinnedDrop)]
37pub struct BytesSink<T>
38where
39 T: AsyncWrite + Unpin,
40{
41 #[pin]
42 inner: FramedWrite<T, BytesCodec>,
43 shutdown_check: Box<dyn Fn(&mut T) -> ShutdownCheck + Send>,
44 state: State,
45}
46
47impl<T> BytesSink<T>
48where
49 T: AsyncWrite + Unpin,
50{
51 pub(crate) fn new(
52 inner: T,
53 shutdown_check: impl Fn(&mut T) -> ShutdownCheck + Send + 'static,
54 socket_mode: SocketMode,
55 ) -> Self {
56 Self {
57 inner: FramedWrite::new(inner, BytesCodec::new()),
58 shutdown_check: Box::new(shutdown_check),
59 state: State {
60 events_total: 0,
61 event_bytes: JsonSize::zero(),
62 bytes_total: 0,
63 socket_mode,
64 finalizers: Vec::new(),
65 },
66 }
67 }
68}
69
70struct State {
71 socket_mode: SocketMode,
72 events_total: usize,
73 event_bytes: JsonSize,
74 bytes_total: usize,
75 finalizers: Vec<EventFinalizers>,
76}
77
78impl State {
79 fn ack(&mut self, status: EventStatus) {
80 if self.events_total > 0 {
81 for finalizer in std::mem::take(&mut self.finalizers) {
82 finalizer.update_status(status);
83 }
84
85 if status == EventStatus::Delivered {
86 emit!(SocketEventsSent {
87 mode: self.socket_mode,
88 count: self.events_total as u64,
89 byte_size: self.event_bytes,
90 });
91 emit!(SocketBytesSent {
92 mode: self.socket_mode,
93 byte_size: self.bytes_total,
94 });
95 }
96
97 self.events_total = 0;
98 self.event_bytes = JsonSize::zero();
99 self.bytes_total = 0;
100 }
101 }
102}
103
104#[pinned_drop]
105impl<T> PinnedDrop for BytesSink<T>
106where
107 T: AsyncWrite + Unpin,
108{
109 fn drop(self: Pin<&mut Self>) {
110 self.get_mut().state.ack(EventStatus::Dropped)
111 }
112}
113
114impl<T> Sink<EncodedEvent<Bytes>> for BytesSink<T>
115where
116 T: AsyncWrite + Unpin,
117{
118 type Error = <FramedWrite<T, BytesCodec> as Sink<Bytes>>::Error;
119
120 fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
121 if self.as_mut().project().state.events_total >= MAX_PENDING_ITEMS {
122 if let Err(error) = ready!(self.as_mut().poll_flush(cx)) {
123 return Poll::Ready(Err(error));
124 }
125 }
126
127 let inner = self.project().inner;
128 <FramedWrite<T, BytesCodec> as Sink<Bytes>>::poll_ready(inner, cx)
129 }
130
131 fn start_send(self: Pin<&mut Self>, item: EncodedEvent<Bytes>) -> Result<(), Self::Error> {
132 let pinned = self.project();
133 pinned.state.finalizers.push(item.finalizers);
134 pinned.state.events_total += 1;
135 pinned.state.event_bytes += item.json_byte_size;
136 pinned.state.bytes_total += item.item.len();
137
138 let result = pinned.inner.start_send(item.item);
139 if result.is_err() {
140 pinned.state.ack(EventStatus::Errored);
141 }
142 result
143 }
144
145 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
146 let pinned = self.as_mut().project();
147 match (pinned.shutdown_check)(pinned.inner.get_mut().get_mut()) {
148 ShutdownCheck::Error(error) => return Poll::Ready(Err(error)),
149 ShutdownCheck::Close(reason) => {
150 if let Err(error) = ready!(self.as_mut().poll_close(cx)) {
151 return Poll::Ready(Err(error));
152 }
153
154 return Poll::Ready(Err(IoError::other(reason)));
155 }
156 ShutdownCheck::Alive => {}
157 }
158
159 let inner = self.as_mut().project().inner;
160 let result = ready!(<FramedWrite<T, BytesCodec> as Sink<Bytes>>::poll_flush(
161 inner, cx
162 ));
163 self.as_mut().get_mut().state.ack(match result {
164 Ok(_) => EventStatus::Delivered,
165 Err(_) => EventStatus::Errored,
166 });
167 Poll::Ready(result)
168 }
169
170 fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
171 let inner = self.as_mut().project().inner;
172 let result = ready!(<FramedWrite<T, BytesCodec> as Sink<Bytes>>::poll_close(
173 inner, cx
174 ));
175 self.as_mut().get_mut().state.ack(EventStatus::Dropped);
176 Poll::Ready(result)
177 }
178}