1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
use std::{
    fmt,
    future::Future,
    pin::Pin,
    task::{Context, Poll},
};

use futures::{future::BoxFuture, FutureExt};
use pin_project::pin_project;
use snafu::Snafu;
use tokio::task::JoinError;
use vector_lib::buffers::topology::channel::BufferReceiverStream;
use vector_lib::event::EventArray;

use crate::{config::ComponentKey, utilization::Utilization};

#[allow(clippy::large_enum_variant)]
pub(crate) enum TaskOutput {
    Source,
    Transform,
    /// Buffer of sink
    Sink(Utilization<BufferReceiverStream<EventArray>>),
    Healthcheck,
}

#[derive(Debug, Snafu)]
pub(crate) enum TaskError {
    #[snafu(display("the task was cancelled before it completed"))]
    Cancelled,
    #[snafu(display("the task panicked and was aborted"))]
    Panicked,
    #[snafu(display("the task completed with an error"))]
    Opaque,
    #[snafu(display("{}", source))]
    Wrapped { source: crate::Error },
}

impl TaskError {
    pub fn wrapped(e: crate::Error) -> Self {
        Self::Wrapped { source: e }
    }
}

impl From<JoinError> for TaskError {
    fn from(e: JoinError) -> Self {
        if e.is_cancelled() {
            Self::Cancelled
        } else {
            Self::Panicked
        }
    }
}

pub(crate) type TaskResult = Result<TaskOutput, TaskError>;

/// High level topology task.
#[pin_project]
pub(crate) struct Task {
    #[pin]
    inner: BoxFuture<'static, TaskResult>,
    key: ComponentKey,
    typetag: String,
}

impl Task {
    pub fn new<S, Fut>(key: ComponentKey, typetag: S, inner: Fut) -> Self
    where
        S: Into<String>,
        Fut: Future<Output = TaskResult> + Send + 'static,
    {
        Self {
            inner: inner.boxed(),
            key,
            typetag: typetag.into(),
        }
    }

    pub fn id(&self) -> &str {
        self.key.id()
    }

    pub fn typetag(&self) -> &str {
        &self.typetag
    }
}

impl Future for Task {
    type Output = TaskResult;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this: &mut Task = self.get_mut();
        this.inner.as_mut().poll(cx)
    }
}

impl fmt::Debug for Task {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Task")
            .field("id", &self.key.id().to_string())
            .field("typetag", &self.typetag)
            .finish()
    }
}