vector/topology/
task.rs

1use std::{
2    fmt,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures::{FutureExt, future::BoxFuture};
9use pin_project::pin_project;
10use snafu::Snafu;
11use tokio::task::JoinError;
12use vector_lib::{buffers::topology::channel::BufferReceiverStream, event::EventArray};
13
14use crate::{config::ComponentKey, utilization::Utilization};
15
16#[allow(clippy::large_enum_variant)]
17pub(crate) enum TaskOutput {
18    Source,
19    Transform,
20    /// Buffer of sink
21    Sink(Utilization<BufferReceiverStream<EventArray>>),
22    Healthcheck,
23}
24
25#[derive(Debug, Snafu)]
26pub(crate) enum TaskError {
27    #[snafu(display("the task was cancelled before it completed"))]
28    Cancelled,
29    #[snafu(display("the task panicked and was aborted"))]
30    Panicked,
31    #[snafu(display("the task completed with an error"))]
32    Opaque,
33    #[snafu(display("{}", source))]
34    Wrapped { source: crate::Error },
35}
36
37impl TaskError {
38    pub fn wrapped(e: crate::Error) -> Self {
39        Self::Wrapped { source: e }
40    }
41}
42
43impl From<JoinError> for TaskError {
44    fn from(e: JoinError) -> Self {
45        if e.is_cancelled() {
46            Self::Cancelled
47        } else {
48            Self::Panicked
49        }
50    }
51}
52
53pub(crate) type TaskResult = Result<TaskOutput, TaskError>;
54
55/// High level topology task.
56#[pin_project]
57pub(crate) struct Task {
58    #[pin]
59    inner: BoxFuture<'static, TaskResult>,
60    key: ComponentKey,
61    typetag: String,
62}
63
64impl Task {
65    pub fn new<S, Fut>(key: ComponentKey, typetag: S, inner: Fut) -> Self
66    where
67        S: Into<String>,
68        Fut: Future<Output = TaskResult> + Send + 'static,
69    {
70        Self {
71            inner: inner.boxed(),
72            key,
73            typetag: typetag.into(),
74        }
75    }
76
77    pub fn id(&self) -> &str {
78        self.key.id()
79    }
80
81    #[allow(clippy::missing_const_for_fn)] // Adding `const` results in https://doc.rust-lang.org/error_codes/E0015.html
82    pub fn typetag(&self) -> &str {
83        &self.typetag
84    }
85}
86
87impl Future for Task {
88    type Output = TaskResult;
89
90    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91        let this: &mut Task = self.get_mut();
92        this.inner.as_mut().poll(cx)
93    }
94}
95
96impl fmt::Debug for Task {
97    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98        f.debug_struct("Task")
99            .field("id", &self.key.id().to_string())
100            .field("typetag", &self.typetag)
101            .finish()
102    }
103}