1use std::{
2 fmt,
3 future::Future,
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::{future::BoxFuture, FutureExt};
9use pin_project::pin_project;
10use snafu::Snafu;
11use tokio::task::JoinError;
12use vector_lib::buffers::topology::channel::BufferReceiverStream;
13use vector_lib::event::EventArray;
14
15use crate::{config::ComponentKey, utilization::Utilization};
16
/// Value a task resolves to when it finishes successfully (the `Ok` side of
/// `TaskResult`).
///
/// The variant names suggest one variant per kind of task; only `Sink`
/// carries a payload.
// `Sink` is the only variant holding data, which is the size disparity the
// silenced lint reports.
#[allow(clippy::large_enum_variant)]
pub(crate) enum TaskOutput {
    /// Successful completion with no payload; presumably emitted by source tasks.
    Source,
    /// Successful completion with no payload; presumably emitted by transform tasks.
    Transform,
    /// Successful completion carrying a utilization-wrapped buffer receiver
    /// stream of event arrays.
    // NOTE(review): presumably this is the sink's input stream handed back to
    // the caller when the sink finishes — confirm against the topology code.
    Sink(Utilization<BufferReceiverStream<EventArray>>),
    /// Successful completion with no payload; presumably emitted by healthcheck tasks.
    Healthcheck,
}
25
/// Reasons a task can fail (the `Err` side of `TaskResult`).
///
/// `Debug` and the `Display`/error plumbing are derived; the user-facing
/// messages come from the `snafu(display(..))` attributes on each variant.
#[derive(Debug, Snafu)]
pub(crate) enum TaskError {
    /// Produced by the `From<JoinError>` conversion when the join error
    /// reports that the task was cancelled.
    #[snafu(display("the task was cancelled before it completed"))]
    Cancelled,
    /// Produced by the `From<JoinError>` conversion for every join error that
    /// is not a cancellation.
    #[snafu(display("the task panicked and was aborted"))]
    Panicked,
    /// The task finished with an error for which no further detail is carried.
    #[snafu(display("the task completed with an error"))]
    Opaque,
    /// Carries an underlying `crate::Error`; that error's `Display` output is
    /// used verbatim as this variant's message.
    #[snafu(display("{}", source))]
    Wrapped { source: crate::Error },
}
37
38impl TaskError {
39 pub fn wrapped(e: crate::Error) -> Self {
40 Self::Wrapped { source: e }
41 }
42}
43
44impl From<JoinError> for TaskError {
45 fn from(e: JoinError) -> Self {
46 if e.is_cancelled() {
47 Self::Cancelled
48 } else {
49 Self::Panicked
50 }
51 }
52}
53
/// Outcome of running a task: a `TaskOutput` on success, a `TaskError` on failure.
pub(crate) type TaskResult = Result<TaskOutput, TaskError>;
55
/// A boxed future resolving to a `TaskResult`, bundled with the component key
/// and type tag it was created with.
#[pin_project]
pub(crate) struct Task {
    /// The boxed future that is driven when the task is polled.
    #[pin]
    inner: BoxFuture<'static, TaskResult>,
    /// Component key supplied at construction; its id is exposed via `Task::id`.
    key: ComponentKey,
    /// Type tag supplied at construction; exposed via `Task::typetag`.
    typetag: String,
}
64
65impl Task {
66 pub fn new<S, Fut>(key: ComponentKey, typetag: S, inner: Fut) -> Self
67 where
68 S: Into<String>,
69 Fut: Future<Output = TaskResult> + Send + 'static,
70 {
71 Self {
72 inner: inner.boxed(),
73 key,
74 typetag: typetag.into(),
75 }
76 }
77
78 pub fn id(&self) -> &str {
79 self.key.id()
80 }
81
82 #[allow(clippy::missing_const_for_fn)] pub fn typetag(&self) -> &str {
84 &self.typetag
85 }
86}
87
88impl Future for Task {
89 type Output = TaskResult;
90
91 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
92 let this: &mut Task = self.get_mut();
93 this.inner.as_mut().poll(cx)
94 }
95}
96
97impl fmt::Debug for Task {
98 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99 f.debug_struct("Task")
100 .field("id", &self.key.id().to_string())
101 .field("typetag", &self.typetag)
102 .finish()
103 }
104}