vector_buffers/topology/channel/
sender.rs

1use std::{sync::Arc, time::Instant};
2
3use async_recursion::async_recursion;
4use derivative::Derivative;
5use tokio::sync::Mutex;
6use tracing::Span;
7use vector_common::internal_event::{InternalEventHandle, Registered, register};
8
9use super::limited_queue::LimitedSender;
10use crate::{
11    BufferInstrumentation, Bufferable, WhenFull,
12    buffer_usage_data::BufferUsageHandle,
13    internal_events::BufferSendDuration,
14    variants::disk_v2::{self, ProductionFilesystem},
15};
16
17/// Adapter for papering over various sender backends.
18#[derive(Clone, Debug)]
19pub enum SenderAdapter<T: Bufferable> {
20    /// The in-memory channel buffer.
21    InMemory(LimitedSender<T>),
22
23    /// The disk v2 buffer.
24    DiskV2(Arc<Mutex<disk_v2::BufferWriter<T, ProductionFilesystem>>>),
25}
26
27impl<T: Bufferable> From<LimitedSender<T>> for SenderAdapter<T> {
28    fn from(v: LimitedSender<T>) -> Self {
29        Self::InMemory(v)
30    }
31}
32
33impl<T: Bufferable> From<disk_v2::BufferWriter<T, ProductionFilesystem>> for SenderAdapter<T> {
34    fn from(v: disk_v2::BufferWriter<T, ProductionFilesystem>) -> Self {
35        Self::DiskV2(Arc::new(Mutex::new(v)))
36    }
37}
38
39impl<T> SenderAdapter<T>
40where
41    T: Bufferable,
42{
43    pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> {
44        match self {
45            Self::InMemory(tx) => tx.send(item).await.map_err(Into::into),
46            Self::DiskV2(writer) => {
47                let mut writer = writer.lock().await;
48
49                writer.write_record(item).await.map(|_| ()).map_err(|e| {
50                    // TODO: Could some errors be handled and not be unrecoverable? Right now,
51                    // encoding should theoretically be recoverable -- encoded value was too big, or
52                    // error during encoding -- but the traits don't allow for recovering the
53                    // original event value because we have to consume it to do the encoding... but
54                    // that might not always be the case.
55                    error!("Disk buffer writer has encountered an unrecoverable error.");
56
57                    e.into()
58                })
59            }
60        }
61    }
62
63    pub(crate) async fn try_send(&mut self, item: T) -> crate::Result<Option<T>> {
64        match self {
65            Self::InMemory(tx) => tx
66                .try_send(item)
67                .map(|()| None)
68                .or_else(|e| Ok(Some(e.into_inner()))),
69            Self::DiskV2(writer) => {
70                let mut writer = writer.lock().await;
71
72                writer.try_write_record(item).await.map_err(|e| {
73                    // TODO: Could some errors be handled and not be unrecoverable? Right now,
74                    // encoding should theoretically be recoverable -- encoded value was too big, or
75                    // error during encoding -- but the traits don't allow for recovering the
76                    // original event value because we have to consume it to do the encoding... but
77                    // that might not always be the case.
78                    error!("Disk buffer writer has encountered an unrecoverable error.");
79
80                    e.into()
81                })
82            }
83        }
84    }
85
86    pub(crate) async fn flush(&mut self) -> crate::Result<()> {
87        match self {
88            Self::InMemory(_) => Ok(()),
89            Self::DiskV2(writer) => {
90                let mut writer = writer.lock().await;
91                writer.flush().await.map_err(|e| {
92                    // Errors on the I/O path, which is all that flushing touches, are never recoverable.
93                    error!("Disk buffer writer has encountered an unrecoverable error.");
94
95                    e.into()
96                })
97            }
98        }
99    }
100
101    pub fn capacity(&self) -> Option<usize> {
102        match self {
103            Self::InMemory(tx) => Some(tx.available_capacity()),
104            Self::DiskV2(_) => None,
105        }
106    }
107}
108
109/// A buffer sender.
110///
111/// The sender handles sending events into the buffer, as well as the behavior around handling
112/// events when the internal channel is full.
113///
114/// When creating a buffer sender/receiver pair, callers can specify the "when full" behavior of the
115/// sender.  This controls how events are handled when the internal channel is full.  Three modes
116/// are possible:
117/// - block
118/// - drop newest
119/// - overflow
120///
121/// In "block" mode, callers are simply forced to wait until the channel has enough capacity to
122/// accept the event.  In "drop newest" mode, any event being sent when the channel is full will be
123/// dropped and proceed no further. In "overflow" mode, events will be sent to another buffer
124/// sender.  Callers can specify the overflow sender to use when constructing their buffers initially.
125///
126/// TODO: We should eventually rework `BufferSender`/`BufferReceiver` so that they contain a vector
127/// of the fields we already have here, but instead of cascading via calling into `overflow`, we'd
128/// linearize the nesting instead, so that `BufferSender` would only ever be calling the underlying
129/// `SenderAdapter` instances instead... which would let us get rid of the boxing and
130/// `#[async_recursion]` stuff.
131#[derive(Clone, Derivative)]
132#[derivative(Debug)]
133pub struct BufferSender<T: Bufferable> {
134    base: SenderAdapter<T>,
135    overflow: Option<Box<BufferSender<T>>>,
136    when_full: WhenFull,
137    usage_instrumentation: Option<BufferUsageHandle>,
138    #[derivative(Debug = "ignore")]
139    send_duration: Option<Registered<BufferSendDuration>>,
140    #[derivative(Debug = "ignore")]
141    custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
142}
143
144impl<T: Bufferable> BufferSender<T> {
145    /// Creates a new [`BufferSender`] wrapping the given channel sender.
146    pub fn new(base: SenderAdapter<T>, when_full: WhenFull) -> Self {
147        Self {
148            base,
149            overflow: None,
150            when_full,
151            usage_instrumentation: None,
152            send_duration: None,
153            custom_instrumentation: None,
154        }
155    }
156
157    /// Creates a new [`BufferSender`] wrapping the given channel sender and overflow sender.
158    pub fn with_overflow(base: SenderAdapter<T>, overflow: BufferSender<T>) -> Self {
159        Self {
160            base,
161            overflow: Some(Box::new(overflow)),
162            when_full: WhenFull::Overflow,
163            usage_instrumentation: None,
164            send_duration: None,
165            custom_instrumentation: None,
166        }
167    }
168
169    /// Converts this sender into an overflowing sender using the given `BufferSender<T>`.
170    ///
171    /// Note: this resets the internal state of this sender, and so this should not be called except
172    /// when initially constructing `BufferSender<T>`.
173    #[cfg(test)]
174    pub fn switch_to_overflow(&mut self, overflow: BufferSender<T>) {
175        self.overflow = Some(Box::new(overflow));
176        self.when_full = WhenFull::Overflow;
177    }
178
179    /// Configures this sender to instrument the items passing through it.
180    pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
181        self.usage_instrumentation = Some(handle);
182    }
183
184    /// Configures this sender to instrument the send duration.
185    pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
186        let _enter = span.enter();
187        self.send_duration = Some(register(BufferSendDuration { stage }));
188    }
189
190    /// Configures this sender to invoke a custom instrumentation hook.
191    pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
192        self.custom_instrumentation = Some(Arc::new(instrumentation));
193    }
194}
195
196impl<T: Bufferable> BufferSender<T> {
197    #[cfg(test)]
198    pub(crate) fn get_base_ref(&self) -> &SenderAdapter<T> {
199        &self.base
200    }
201
202    #[cfg(test)]
203    pub(crate) fn get_overflow_ref(&self) -> Option<&BufferSender<T>> {
204        self.overflow.as_ref().map(AsRef::as_ref)
205    }
206
207    #[async_recursion]
208    pub async fn send(
209        &mut self,
210        mut item: T,
211        send_reference: Option<Instant>,
212    ) -> crate::Result<()> {
213        if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
214            instrumentation.on_send(&mut item);
215        }
216        let item_sizing = self
217            .usage_instrumentation
218            .as_ref()
219            .map(|_| (item.event_count(), item.size_of()));
220
221        let mut was_dropped = false;
222
223        if let Some(instrumentation) = self.usage_instrumentation.as_ref()
224            && let Some((item_count, item_size)) = item_sizing
225        {
226            instrumentation
227                .increment_received_event_count_and_byte_size(item_count as u64, item_size as u64);
228        }
229        match self.when_full {
230            WhenFull::Block => self.base.send(item).await?,
231            WhenFull::DropNewest => {
232                if self.base.try_send(item).await?.is_some() {
233                    was_dropped = true;
234                }
235            }
236            WhenFull::Overflow => {
237                if let Some(item) = self.base.try_send(item).await? {
238                    was_dropped = true;
239                    self.overflow
240                        .as_mut()
241                        .unwrap_or_else(|| unreachable!("overflow must exist"))
242                        .send(item, send_reference)
243                        .await?;
244                }
245            }
246        }
247
248        if let Some(instrumentation) = self.usage_instrumentation.as_ref()
249            && let Some((item_count, item_size)) = item_sizing
250            && was_dropped
251        {
252            instrumentation.increment_dropped_event_count_and_byte_size(
253                item_count as u64,
254                item_size as u64,
255                true,
256            );
257        }
258        if let Some(send_duration) = self.send_duration.as_ref()
259            && let Some(send_reference) = send_reference
260        {
261            send_duration.emit(send_reference.elapsed());
262        }
263
264        Ok(())
265    }
266
267    #[async_recursion]
268    pub async fn flush(&mut self) -> crate::Result<()> {
269        self.base.flush().await?;
270        if let Some(overflow) = self.overflow.as_mut() {
271            overflow.flush().await?;
272        }
273
274        Ok(())
275    }
276}