1use std::{
2 fmt,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::{FutureExt, future::BoxFuture};
9use pin_project::pin_project;
10use snafu::Snafu;
11use tokio::task::JoinError;
12use vector_lib::{buffers::topology::channel::BufferReceiverStream, event::EventArray};
13
14use crate::{config::ComponentKey, utilization::Utilization};
15
16#[allow(clippy::large_enum_variant)]
17pub(crate) enum TaskOutput {
18 Source,
19 Transform,
20 Sink(Utilization<BufferReceiverStream<EventArray>>),
22 Healthcheck,
23}
24
25#[derive(Debug, Snafu)]
26pub(crate) enum TaskError {
27 #[snafu(display("the task was cancelled before it completed"))]
28 Cancelled,
29 #[snafu(display("the task panicked and was aborted"))]
30 Panicked,
31 #[snafu(display("the task completed with an error"))]
32 Opaque,
33 #[snafu(display("{}", source))]
34 Wrapped { source: crate::Error },
35}
36
37impl TaskError {
38 pub fn wrapped(e: crate::Error) -> Self {
39 Self::Wrapped { source: e }
40 }
41}
42
43impl From<JoinError> for TaskError {
44 fn from(e: JoinError) -> Self {
45 if e.is_cancelled() {
46 Self::Cancelled
47 } else {
48 Self::Panicked
49 }
50 }
51}
52
53pub(crate) type TaskResult = Result<TaskOutput, TaskError>;
54
55#[pin_project]
57pub(crate) struct Task {
58 #[pin]
59 inner: BoxFuture<'static, TaskResult>,
60 key: ComponentKey,
61 typetag: String,
62}
63
64impl Task {
65 pub fn new<S, Fut>(key: ComponentKey, typetag: S, inner: Fut) -> Self
66 where
67 S: Into<String>,
68 Fut: Future<Output = TaskResult> + Send + 'static,
69 {
70 Self {
71 inner: inner.boxed(),
72 key,
73 typetag: typetag.into(),
74 }
75 }
76
77 pub fn id(&self) -> &str {
78 self.key.id()
79 }
80
81 #[allow(clippy::missing_const_for_fn)] pub fn typetag(&self) -> &str {
83 &self.typetag
84 }
85}
86
87impl Future for Task {
88 type Output = TaskResult;
89
90 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
91 let this: &mut Task = self.get_mut();
92 this.inner.as_mut().poll(cx)
93 }
94}
95
96impl fmt::Debug for Task {
97 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
98 f.debug_struct("Task")
99 .field("id", &self.key.id().to_string())
100 .field("typetag", &self.typetag)
101 .finish()
102 }
103}