vector/topology/
task.rs

1use std::{
2    fmt,
3    future::Future,
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures::{future::BoxFuture, FutureExt};
9use pin_project::pin_project;
10use snafu::Snafu;
11use tokio::task::JoinError;
12use vector_lib::buffers::topology::channel::BufferReceiverStream;
13use vector_lib::event::EventArray;
14
15use crate::{config::ComponentKey, utilization::Utilization};
16
17#[allow(clippy::large_enum_variant)]
18pub(crate) enum TaskOutput {
19    Source,
20    Transform,
21    /// Buffer of sink
22    Sink(Utilization<BufferReceiverStream<EventArray>>),
23    Healthcheck,
24}
25
26#[derive(Debug, Snafu)]
27pub(crate) enum TaskError {
28    #[snafu(display("the task was cancelled before it completed"))]
29    Cancelled,
30    #[snafu(display("the task panicked and was aborted"))]
31    Panicked,
32    #[snafu(display("the task completed with an error"))]
33    Opaque,
34    #[snafu(display("{}", source))]
35    Wrapped { source: crate::Error },
36}
37
38impl TaskError {
39    pub fn wrapped(e: crate::Error) -> Self {
40        Self::Wrapped { source: e }
41    }
42}
43
44impl From<JoinError> for TaskError {
45    fn from(e: JoinError) -> Self {
46        if e.is_cancelled() {
47            Self::Cancelled
48        } else {
49            Self::Panicked
50        }
51    }
52}
53
54pub(crate) type TaskResult = Result<TaskOutput, TaskError>;
55
56/// High level topology task.
57#[pin_project]
58pub(crate) struct Task {
59    #[pin]
60    inner: BoxFuture<'static, TaskResult>,
61    key: ComponentKey,
62    typetag: String,
63}
64
65impl Task {
66    pub fn new<S, Fut>(key: ComponentKey, typetag: S, inner: Fut) -> Self
67    where
68        S: Into<String>,
69        Fut: Future<Output = TaskResult> + Send + 'static,
70    {
71        Self {
72            inner: inner.boxed(),
73            key,
74            typetag: typetag.into(),
75        }
76    }
77
78    pub fn id(&self) -> &str {
79        self.key.id()
80    }
81
82    #[allow(clippy::missing_const_for_fn)] // Adding `const` results in https://doc.rust-lang.org/error_codes/E0015.html
83    pub fn typetag(&self) -> &str {
84        &self.typetag
85    }
86}
87
88impl Future for Task {
89    type Output = TaskResult;
90
91    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92        let this: &mut Task = self.get_mut();
93        this.inner.as_mut().poll(cx)
94    }
95}
96
97impl fmt::Debug for Task {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        f.debug_struct("Task")
100            .field("id", &self.key.id().to_string())
101            .field("typetag", &self.typetag)
102            .finish()
103    }
104}