vector_buffers/topology/channel/
sender.rs1use std::{sync::Arc, time::Instant};
2
3use async_recursion::async_recursion;
4use derivative::Derivative;
5use tokio::sync::Mutex;
6use tracing::Span;
7use vector_common::internal_event::{InternalEventHandle, Registered, register};
8
9use super::limited_queue::LimitedSender;
10use crate::{
11 BufferInstrumentation, Bufferable, WhenFull,
12 buffer_usage_data::BufferUsageHandle,
13 internal_events::BufferSendDuration,
14 variants::disk_v2::{self, ProductionFilesystem},
15};
16
17#[derive(Clone, Debug)]
19pub enum SenderAdapter<T: Bufferable> {
20 InMemory(LimitedSender<T>),
22
23 DiskV2(Arc<Mutex<disk_v2::BufferWriter<T, ProductionFilesystem>>>),
25}
26
27impl<T: Bufferable> From<LimitedSender<T>> for SenderAdapter<T> {
28 fn from(v: LimitedSender<T>) -> Self {
29 Self::InMemory(v)
30 }
31}
32
33impl<T: Bufferable> From<disk_v2::BufferWriter<T, ProductionFilesystem>> for SenderAdapter<T> {
34 fn from(v: disk_v2::BufferWriter<T, ProductionFilesystem>) -> Self {
35 Self::DiskV2(Arc::new(Mutex::new(v)))
36 }
37}
38
39impl<T> SenderAdapter<T>
40where
41 T: Bufferable,
42{
43 pub(crate) async fn send(&mut self, item: T) -> crate::Result<()> {
44 match self {
45 Self::InMemory(tx) => tx.send(item).await.map_err(Into::into),
46 Self::DiskV2(writer) => {
47 let mut writer = writer.lock().await;
48
49 writer.write_record(item).await.map(|_| ()).map_err(|e| {
50 error!("Disk buffer writer has encountered an unrecoverable error.");
56
57 e.into()
58 })
59 }
60 }
61 }
62
63 pub(crate) async fn try_send(&mut self, item: T) -> crate::Result<Option<T>> {
64 match self {
65 Self::InMemory(tx) => tx
66 .try_send(item)
67 .map(|()| None)
68 .or_else(|e| Ok(Some(e.into_inner()))),
69 Self::DiskV2(writer) => {
70 let mut writer = writer.lock().await;
71
72 writer.try_write_record(item).await.map_err(|e| {
73 error!("Disk buffer writer has encountered an unrecoverable error.");
79
80 e.into()
81 })
82 }
83 }
84 }
85
86 pub(crate) async fn flush(&mut self) -> crate::Result<()> {
87 match self {
88 Self::InMemory(_) => Ok(()),
89 Self::DiskV2(writer) => {
90 let mut writer = writer.lock().await;
91 writer.flush().await.map_err(|e| {
92 error!("Disk buffer writer has encountered an unrecoverable error.");
94
95 e.into()
96 })
97 }
98 }
99 }
100
101 pub fn capacity(&self) -> Option<usize> {
102 match self {
103 Self::InMemory(tx) => Some(tx.available_capacity()),
104 Self::DiskV2(_) => None,
105 }
106 }
107}
108
109#[derive(Clone, Derivative)]
132#[derivative(Debug)]
133pub struct BufferSender<T: Bufferable> {
134 base: SenderAdapter<T>,
135 overflow: Option<Box<BufferSender<T>>>,
136 when_full: WhenFull,
137 usage_instrumentation: Option<BufferUsageHandle>,
138 #[derivative(Debug = "ignore")]
139 send_duration: Option<Registered<BufferSendDuration>>,
140 #[derivative(Debug = "ignore")]
141 custom_instrumentation: Option<Arc<dyn BufferInstrumentation<T>>>,
142}
143
144impl<T: Bufferable> BufferSender<T> {
145 pub fn new(base: SenderAdapter<T>, when_full: WhenFull) -> Self {
147 Self {
148 base,
149 overflow: None,
150 when_full,
151 usage_instrumentation: None,
152 send_duration: None,
153 custom_instrumentation: None,
154 }
155 }
156
157 pub fn with_overflow(base: SenderAdapter<T>, overflow: BufferSender<T>) -> Self {
159 Self {
160 base,
161 overflow: Some(Box::new(overflow)),
162 when_full: WhenFull::Overflow,
163 usage_instrumentation: None,
164 send_duration: None,
165 custom_instrumentation: None,
166 }
167 }
168
169 #[cfg(test)]
174 pub fn switch_to_overflow(&mut self, overflow: BufferSender<T>) {
175 self.overflow = Some(Box::new(overflow));
176 self.when_full = WhenFull::Overflow;
177 }
178
179 pub fn with_usage_instrumentation(&mut self, handle: BufferUsageHandle) {
181 self.usage_instrumentation = Some(handle);
182 }
183
184 pub fn with_send_duration_instrumentation(&mut self, stage: usize, span: &Span) {
186 let _enter = span.enter();
187 self.send_duration = Some(register(BufferSendDuration { stage }));
188 }
189
190 pub fn with_custom_instrumentation(&mut self, instrumentation: impl BufferInstrumentation<T>) {
192 self.custom_instrumentation = Some(Arc::new(instrumentation));
193 }
194}
195
196impl<T: Bufferable> BufferSender<T> {
197 #[cfg(test)]
198 pub(crate) fn get_base_ref(&self) -> &SenderAdapter<T> {
199 &self.base
200 }
201
202 #[cfg(test)]
203 pub(crate) fn get_overflow_ref(&self) -> Option<&BufferSender<T>> {
204 self.overflow.as_ref().map(AsRef::as_ref)
205 }
206
207 #[async_recursion]
208 pub async fn send(
209 &mut self,
210 mut item: T,
211 send_reference: Option<Instant>,
212 ) -> crate::Result<()> {
213 if let Some(instrumentation) = self.custom_instrumentation.as_ref() {
214 instrumentation.on_send(&mut item);
215 }
216 let item_sizing = self
217 .usage_instrumentation
218 .as_ref()
219 .map(|_| (item.event_count(), item.size_of()));
220
221 let mut was_dropped = false;
222
223 if let Some(instrumentation) = self.usage_instrumentation.as_ref()
224 && let Some((item_count, item_size)) = item_sizing
225 {
226 instrumentation
227 .increment_received_event_count_and_byte_size(item_count as u64, item_size as u64);
228 }
229 match self.when_full {
230 WhenFull::Block => self.base.send(item).await?,
231 WhenFull::DropNewest => {
232 if self.base.try_send(item).await?.is_some() {
233 was_dropped = true;
234 }
235 }
236 WhenFull::Overflow => {
237 if let Some(item) = self.base.try_send(item).await? {
238 was_dropped = true;
239 self.overflow
240 .as_mut()
241 .unwrap_or_else(|| unreachable!("overflow must exist"))
242 .send(item, send_reference)
243 .await?;
244 }
245 }
246 }
247
248 if let Some(instrumentation) = self.usage_instrumentation.as_ref()
249 && let Some((item_count, item_size)) = item_sizing
250 && was_dropped
251 {
252 instrumentation.increment_dropped_event_count_and_byte_size(
253 item_count as u64,
254 item_size as u64,
255 true,
256 );
257 }
258 if let Some(send_duration) = self.send_duration.as_ref()
259 && let Some(send_reference) = send_reference
260 {
261 send_duration.emit(send_reference.elapsed());
262 }
263
264 Ok(())
265 }
266
267 #[async_recursion]
268 pub async fn flush(&mut self) -> crate::Result<()> {
269 self.base.flush().await?;
270 if let Some(overflow) = self.overflow.as_mut() {
271 overflow.flush().await?;
272 }
273
274 Ok(())
275 }
276}