1use std::{error::Error, num::NonZeroUsize};
2
3use async_trait::async_trait;
4use snafu::{ResultExt, Snafu};
5use tracing::Span;
6
7use super::channel::{ChannelMetricMetadata, ReceiverAdapter, SenderAdapter};
8use crate::{
9 Bufferable, WhenFull,
10 buffer_usage_data::{BufferUsage, BufferUsageHandle},
11 config::MemoryBufferSize,
12 topology::channel::{BufferReceiver, BufferSender, limited},
13};
14
15#[async_trait]
17pub trait IntoBuffer<T: Bufferable>: Send {
18 fn provides_instrumentation(&self) -> bool {
28 false
29 }
30
31 async fn into_buffer_parts(
33 self: Box<Self>,
34 usage_handle: BufferUsageHandle,
35 ) -> Result<(SenderAdapter<T>, ReceiverAdapter<T>), Box<dyn Error + Send + Sync>>;
36}
37
38#[derive(Debug, Snafu)]
39pub enum TopologyError {
40 #[snafu(display("buffer topology cannot be empty"))]
41 EmptyTopology,
42 #[snafu(display(
43 "stage {} configured with block/drop newest behavior in front of subsequent stage",
44 stage_idx
45 ))]
46 NextStageNotUsed { stage_idx: usize },
47 #[snafu(display("last stage in buffer topology cannot be set to overflow mode"))]
48 OverflowWhenLast,
49 #[snafu(display("failed to build individual stage {}: {}", stage_idx, source))]
50 FailedToBuildStage {
51 stage_idx: usize,
52 source: Box<dyn Error + Send + Sync>,
53 },
54 #[snafu(display(
55 "multiple components with segmented acknowledgements cannot be used in the same buffer"
56 ))]
57 StackedAcks,
58}
59
60struct TopologyStage<T: Bufferable> {
61 untransformed: Box<dyn IntoBuffer<T>>,
62 when_full: WhenFull,
63}
64
65pub struct TopologyBuilder<T: Bufferable> {
67 stages: Vec<TopologyStage<T>>,
68}
69
70impl<T: Bufferable> TopologyBuilder<T> {
71 pub fn stage<S>(&mut self, stage: S, when_full: WhenFull) -> &mut Self
91 where
92 S: IntoBuffer<T> + 'static,
93 {
94 self.stages.push(TopologyStage {
95 untransformed: Box::new(stage),
96 when_full,
97 });
98 self
99 }
100
101 pub async fn build(
108 self,
109 buffer_id: String,
110 span: Span,
111 ) -> Result<(BufferSender<T>, BufferReceiver<T>), TopologyError> {
112 let mut buffer_usage = BufferUsage::from_span(span.clone());
114 let mut current_stage = None;
115
116 for (stage_idx, stage) in self.stages.into_iter().enumerate().rev() {
117 match stage.when_full {
119 WhenFull::Overflow => {
121 if current_stage.is_none() {
122 return Err(TopologyError::OverflowWhenLast);
123 }
124 }
125 WhenFull::Block | WhenFull::DropNewest => {
128 if current_stage.is_some() {
129 return Err(TopologyError::NextStageNotUsed { stage_idx });
130 }
131 }
132 }
133
134 let usage_handle = buffer_usage.add_stage(stage_idx);
139 let provides_instrumentation = stage.untransformed.provides_instrumentation();
140 let (sender, receiver) = stage
141 .untransformed
142 .into_buffer_parts(usage_handle.clone())
143 .await
144 .context(FailedToBuildStageSnafu { stage_idx })?;
145
146 let (mut sender, mut receiver) = match current_stage.take() {
147 None => (
148 BufferSender::new(sender, stage.when_full),
149 BufferReceiver::new(receiver),
150 ),
151 Some((current_sender, current_receiver)) => (
152 BufferSender::with_overflow(sender, current_sender),
153 BufferReceiver::with_overflow(receiver, current_receiver),
154 ),
155 };
156
157 sender.with_send_duration_instrumentation(stage_idx, &span);
158 if !provides_instrumentation {
159 sender.with_usage_instrumentation(usage_handle.clone());
160 receiver.with_usage_instrumentation(usage_handle);
161 }
162
163 current_stage = Some((sender, receiver));
164 }
165
166 let (sender, receiver) = current_stage.ok_or(TopologyError::EmptyTopology)?;
167
168 buffer_usage.install(buffer_id.as_str());
172
173 Ok((sender, receiver))
174 }
175}
176
177impl<T: Bufferable> TopologyBuilder<T> {
178 #[allow(clippy::print_stderr)]
189 pub fn standalone_memory(
190 max_events: NonZeroUsize,
191 when_full: WhenFull,
192 receiver_span: &Span,
193 metadata: Option<ChannelMetricMetadata>,
194 ) -> (BufferSender<T>, BufferReceiver<T>) {
195 let usage_handle = BufferUsageHandle::noop();
196 usage_handle.set_buffer_limits(None, Some(max_events.get()));
197
198 let limit = MemoryBufferSize::MaxEvents(max_events);
199 let (sender, receiver) = limited(limit, metadata);
200
201 let mode = match when_full {
202 WhenFull::Overflow => WhenFull::Block,
203 m => m,
204 };
205 let mut sender = BufferSender::new(sender.into(), mode);
206 sender.with_send_duration_instrumentation(0, receiver_span);
207 let receiver = BufferReceiver::new(receiver.into());
208
209 (sender, receiver)
210 }
211
212 #[cfg(test)]
226 pub fn standalone_memory_test(
227 max_events: NonZeroUsize,
228 when_full: WhenFull,
229 usage_handle: BufferUsageHandle,
230 metadata: Option<ChannelMetricMetadata>,
231 ) -> (BufferSender<T>, BufferReceiver<T>) {
232 usage_handle.set_buffer_limits(None, Some(max_events.get()));
233
234 let limit = MemoryBufferSize::MaxEvents(max_events);
235 let (sender, receiver) = limited(limit, metadata);
236
237 let mode = match when_full {
238 WhenFull::Overflow => WhenFull::Block,
239 m => m,
240 };
241 let mut sender = BufferSender::new(sender.into(), mode);
242 let mut receiver = BufferReceiver::new(receiver.into());
243
244 sender.with_usage_instrumentation(usage_handle.clone());
245 receiver.with_usage_instrumentation(usage_handle);
246
247 (sender, receiver)
248 }
249}
250
251impl<T: Bufferable> Default for TopologyBuilder<T> {
252 fn default() -> Self {
253 Self { stages: Vec::new() }
254 }
255}
256
257#[cfg(test)]
258mod tests {
259 use std::num::NonZeroUsize;
260
261 use tracing::Span;
262
263 use super::TopologyBuilder;
264 use crate::{
265 WhenFull,
266 topology::{
267 builder::TopologyError,
268 test_util::{Sample, assert_current_send_capacity},
269 },
270 variants::MemoryBuffer,
271 };
272
273 #[tokio::test]
274 async fn single_stage_topology_block() {
275 let mut builder = TopologyBuilder::<Sample>::default();
276 builder.stage(
277 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
278 WhenFull::Block,
279 );
280 let result = builder.build(String::from("test"), Span::none()).await;
281 assert!(result.is_ok());
282
283 let (mut sender, _) = result.unwrap();
284 assert_current_send_capacity(&mut sender, Some(1), None);
285 }
286
287 #[tokio::test]
288 async fn single_stage_topology_drop_newest() {
289 let mut builder = TopologyBuilder::<Sample>::default();
290 builder.stage(
291 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
292 WhenFull::DropNewest,
293 );
294 let result = builder.build(String::from("test"), Span::none()).await;
295 assert!(result.is_ok());
296
297 let (mut sender, _) = result.unwrap();
298 assert_current_send_capacity(&mut sender, Some(1), None);
299 }
300
301 #[tokio::test]
302 async fn single_stage_topology_overflow() {
303 let mut builder = TopologyBuilder::<Sample>::default();
304 builder.stage(
305 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
306 WhenFull::Overflow,
307 );
308 let result = builder.build(String::from("test"), Span::none()).await;
309 match result {
310 Err(TopologyError::OverflowWhenLast) => {}
311 r => panic!("unexpected build result: {r:?}"),
312 }
313 }
314
315 #[tokio::test]
316 async fn two_stage_topology_block() {
317 let mut builder = TopologyBuilder::<Sample>::default();
318 builder.stage(
319 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
320 WhenFull::Block,
321 );
322 builder.stage(
323 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
324 WhenFull::Block,
325 );
326 let result = builder.build(String::from("test"), Span::none()).await;
327 match result {
328 Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
329 r => panic!("unexpected build result: {r:?}"),
330 }
331 }
332
333 #[tokio::test]
334 async fn two_stage_topology_drop_newest() {
335 let mut builder = TopologyBuilder::<Sample>::default();
336 builder.stage(
337 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
338 WhenFull::DropNewest,
339 );
340 builder.stage(
341 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
342 WhenFull::Block,
343 );
344 let result = builder.build(String::from("test"), Span::none()).await;
345 match result {
346 Err(TopologyError::NextStageNotUsed { stage_idx }) => assert_eq!(stage_idx, 0),
347 r => panic!("unexpected build result: {r:?}"),
348 }
349 }
350
351 #[tokio::test]
352 async fn two_stage_topology_overflow() {
353 let mut builder = TopologyBuilder::<Sample>::default();
354 builder.stage(
355 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
356 WhenFull::Overflow,
357 );
358 builder.stage(
359 MemoryBuffer::with_max_events(NonZeroUsize::new(1).unwrap()),
360 WhenFull::Block,
361 );
362
363 let result = builder.build(String::from("test"), Span::none()).await;
364 assert!(result.is_ok());
365
366 let (mut sender, _) = result.unwrap();
367 assert_current_send_capacity(&mut sender, Some(1), Some(1));
368 }
369}