1use std::{error::Error, num::NonZeroUsize};
2
3use async_trait::async_trait;
4use snafu::{ResultExt, Snafu};
5use tracing::Span;
6
7use super::channel::{ReceiverAdapter, SenderAdapter};
8use crate::{
9 buffer_usage_data::{BufferUsage, BufferUsageHandle},
10 topology::channel::{BufferReceiver, BufferSender},
11 variants::MemoryBuffer,
12 Bufferable, WhenFull,
13};
14
15#[async_trait]
17pub trait IntoBuffer<T: Bufferable>: Send {
18 fn provides_instrumentation(&self) -> bool {
28 false
29 }
30
31 async fn into_buffer_parts(
33 self: Box<Self>,
34 usage_handle: BufferUsageHandle,
35 ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>>;
36}
37
38#[derive(Debug, Snafu)]
39pub enum TopologyError {
40 #[snafu(display("buffer topology cannot be empty"))]
41 EmptyTopology,
42 #[snafu(display(
43 "stage {} configured with block/drop newest behavior in front of subsequent stage",
44 stage_idx
45 ))]
46 NextStageNotUsed { stage_idx: usize },
47 #[snafu(display("last stage in buffer topology cannot be set to overflow mode"))]
48 OverflowWhenLast,
49 #[snafu(display("failed to build individual stage {}: {}", stage_idx, source))]
50 FailedToBuildStage {
51 stage_idx: usize,
52 source: Box<dyn Error + Send + Sync>,
53 },
54 #[snafu(display(
55 "multiple components with segmented acknowledgements cannot be used in the same buffer"
56 ))]
57 StackedAcks,
58}
59
60struct TopologyStage<T: Bufferable> {
61 untransformed: Box<dyn IntoBuffer<T>>,
62 when_full: WhenFull,
63}
64
65pub struct TopologyBuilder<T: Bufferable> {
67 stages: Vec<TopologyStage<T>>,
68}
69
70impl<T: Bufferable> TopologyBuilder<T> {
71 pub fn stage<S>(&mut self, stage: S, when_full: WhenFull) -> &mut Self
91 where
92 S: IntoBuffer<T> + 'static,
93 {
94 self.stages.push(TopologyStage {
95 untransformed: Box::new(stage),
96 when_full,
97 });
98 self
99 }
100
101 pub async fn build(
108 self,
109 buffer_id: String,
110 span: Span,
111 ) -> Result<(BufferSender<T>, BufferReceiver<T>), TopologyError> {
112 let mut buffer_usage = BufferUsage::from_span(span.clone());
114 let mut current_stage = None;
115
116 for (stage_idx, stage) in self.stages.into_iter().enumerate().rev() {
117 match stage.when_full {
119 WhenFull::Overflow => {
121 if current_stage.is_none() {
122 return Err(TopologyError::OverflowWhenLast);
123 }
124 }
125 WhenFull::Block | WhenFull::DropNewest => {
128 if current_stage.is_some() {
129 return Err(TopologyError::NextStageNotUsed { stage_idx });
130 }
131 }
132 }
133
134 let usage_handle = buffer_usage.add_stage(stage_idx);
139 let provides_instrumentation = stage.untransformed.provides_instrumentation();
140 let (sender, receiver) = stage
141 .untransformed
142 .into_buffer_parts(usage_handle.clone())
143 .await
144 .context(FailedToBuildStageSnafu { stage_idx })?;
145
146 let (mut sender, mut receiver) = match current_stage.take() {
147 None => (
148 BufferSender::new(sender, stage.when_full),
149 BufferReceiver::new(receiver),
150 ),
151 Some((current_sender, current_receiver)) => (
152 BufferSender::with_overflow(sender, current_sender),
153 BufferReceiver::with_overflow(receiver, current_receiver),
154 ),
155 };
156
157 sender.with_send_duration_instrumentation(stage_idx, &span);
158 if !provides_instrumentation {
159 sender.with_usage_instrumentation(usage_handle.clone());
160 receiver.with_usage_instrumentation(usage_handle);
161 }
162
163 current_stage = Some((sender, receiver));
164 }
165
166 let (sender, receiver) = current_stage.ok_or(TopologyError::EmptyTopology)?;
167
168 buffer_usage.install(buffer_id.as_str());
172
173 Ok((sender, receiver))
174 }
175}
176
177impl<T: Bufferable> TopologyBuilder<T> {
178 #[allow(clippy::print_stderr)]
189 pub async fn standalone_memory(
190 max_events: NonZeroUsize,
191 when_full: WhenFull,
192 receiver_span: &Span,
193 ) -> (BufferSender<T>, BufferReceiver<T>) {
194 let usage_handle = BufferUsageHandle::noop();
195
196 let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
197 let (sender, receiver) = memory_buffer
198 .into_buffer_parts(usage_handle.clone())
199 .await
200 .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
201
202 let mode = match when_full {
203 WhenFull::Overflow => WhenFull::Block,
204 m => m,
205 };
206 let mut sender = BufferSender::new(sender, mode);
207 sender.with_send_duration_instrumentation(0, receiver_span);
208 let receiver = BufferReceiver::new(receiver);
209
210 (sender, receiver)
211 }
212
213 #[cfg(test)]
227 pub async fn standalone_memory_test(
228 max_events: NonZeroUsize,
229 when_full: WhenFull,
230 usage_handle: BufferUsageHandle,
231 ) -> (BufferSender<T>, BufferReceiver<T>) {
232 let memory_buffer = Box::new(MemoryBuffer::with_max_events(max_events));
233 let (sender, receiver) = memory_buffer
234 .into_buffer_parts(usage_handle.clone())
235 .await
236 .unwrap_or_else(|_| unreachable!("should not fail to directly create a memory buffer"));
237
238 let mode = match when_full {
239 WhenFull::Overflow => WhenFull::Block,
240 m => m,
241 };
242 let mut sender = BufferSender::new(sender, mode);
243 let mut receiver = BufferReceiver::new(receiver);
244
245 sender.with_usage_instrumentation(usage_handle.clone());
246 receiver.with_usage_instrumentation(usage_handle);
247
248 (sender, receiver)
249 }
250}
251
252impl<T: Bufferable> Default for TopologyBuilder<T> {
253 fn default() -> Self {
254 Self { stages: Vec::new() }
255 }
256}
257
#[cfg(test)]
mod tests {
    use std::num::NonZeroUsize;

    use tracing::Span;

    use super::TopologyBuilder;
    use crate::{
        topology::{
            builder::TopologyError,
            test_util::{assert_current_send_capacity, Sample},
        },
        variants::MemoryBuffer,
        WhenFull,
    };

    /// Returns a builder holding one single-event memory stage per entry of `modes`,
    /// added in the given order.
    fn builder_with(modes: impl IntoIterator<Item = WhenFull>) -> TopologyBuilder<Sample> {
        let mut builder = TopologyBuilder::<Sample>::default();
        for when_full in modes {
            builder.stage(
                MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
                when_full,
            );
        }
        builder
    }

    // A lone blocking stage is a valid topology.
    #[tokio::test]
    async fn single_stage_topology_block() {
        let result = builder_with([WhenFull::Block])
            .build(String::from("test"), Span::none())
            .await;
        assert!(result.is_ok());

        let (mut sender, _) = result.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    // A lone drop-newest stage is a valid topology.
    #[tokio::test]
    async fn single_stage_topology_drop_newest() {
        let result = builder_with([WhenFull::DropNewest])
            .build(String::from("test"), Span::none())
            .await;
        assert!(result.is_ok());

        let (mut sender, _) = result.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), None);
    }

    // A lone overflowing stage has nothing to overflow into.
    #[tokio::test]
    async fn single_stage_topology_overflow() {
        let result = builder_with([WhenFull::Overflow])
            .build(String::from("test"), Span::none())
            .await;
        match result {
            Err(TopologyError::OverflowWhenLast) => {}
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    // A blocking first stage makes the second stage unusable.
    #[tokio::test]
    async fn two_stage_topology_block() {
        let result = builder_with([WhenFull::Block, WhenFull::Block])
            .build(String::from("test"), Span::none())
            .await;
        match result {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    // A drop-newest first stage makes the second stage unusable.
    #[tokio::test]
    async fn two_stage_topology_drop_newest() {
        let result = builder_with([WhenFull::DropNewest, WhenFull::Block])
            .build(String::from("test"), Span::none())
            .await;
        match result {
            Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
            r => panic!("unexpected build result: {r:?}"),
        }
    }

    // An overflowing first stage followed by a blocking stage is a valid topology.
    #[tokio::test]
    async fn two_stage_topology_overflow() {
        let result = builder_with([WhenFull::Overflow, WhenFull::Block])
            .build(String::from("test"), Span::none())
            .await;
        assert!(result.is_ok());

        let (mut sender, _) = result.unwrap();
        assert_current_send_capacity(&mut sender, Some(1), Some(1));
    }
}