vector_tap/
lib.rs

1#![deny(warnings)]
2
3#[macro_use]
4extern crate tracing;
5
6pub mod controller;
7pub mod notification;
8pub mod topology;
9
10use std::{borrow::Cow, collections::BTreeMap};
11
12use colored::{ColoredString, Colorize};
13use tokio::sync::mpsc as tokio_mpsc;
14use tokio::time::timeout;
15use tokio::time::{Duration, Instant};
16use tokio_stream::StreamExt;
17use url::Url;
18
19use vector_api_client::{
20    connect_subscription_client,
21    gql::{
22        output_events_by_component_id_patterns_subscription::OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns as GraphQLTapOutputEvent,
23        TapEncodingFormat, TapSubscriptionExt,
24    },
25};
26
27#[derive(Clone, Debug)]
28pub struct EventFormatter {
29    meta: bool,
30    format: TapEncodingFormat,
31    component_id_label: ColoredString,
32    component_kind_label: ColoredString,
33    component_type_label: ColoredString,
34}
35
36impl EventFormatter {
37    pub fn new(meta: bool, format: TapEncodingFormat) -> Self {
38        Self {
39            meta,
40            format,
41            component_id_label: "component_id".green(),
42            component_kind_label: "component_kind".green(),
43            component_type_label: "component_type".green(),
44        }
45    }
46
47    pub fn format<'a>(
48        &self,
49        component_id: &str,
50        component_kind: &str,
51        component_type: &str,
52        event: &'a str,
53    ) -> Cow<'a, str> {
54        if self.meta {
55            match self.format {
56                TapEncodingFormat::Json => format!(
57                    r#"{{"{}":"{}","{}":"{}","{}":"{}","event":{}}}"#,
58                    self.component_id_label,
59                    component_id.green(),
60                    self.component_kind_label,
61                    component_kind.green(),
62                    self.component_type_label,
63                    component_type.green(),
64                    event
65                )
66                .into(),
67                TapEncodingFormat::Yaml => {
68                    let mut value: BTreeMap<String, serde_yaml::Value> = BTreeMap::new();
69                    value.insert("event".to_string(), serde_yaml::from_str(event).unwrap());
70                    // We interpolate to include component_id rather than
71                    // include it in the map to correctly preserve color
72                    // formatting
73                    format!(
74                        "{}{}: {}\n{}: {}\n{}: {}\n",
75                        serde_yaml::to_string(&value).unwrap(),
76                        self.component_id_label,
77                        component_id.green(),
78                        self.component_kind_label,
79                        component_kind.green(),
80                        self.component_type_label,
81                        component_type.green()
82                    )
83                    .into()
84                }
85                TapEncodingFormat::Logfmt => format!(
86                    "{}={} {}={} {}={} {}",
87                    self.component_id_label,
88                    component_id.green(),
89                    self.component_kind_label,
90                    component_kind.green(),
91                    self.component_type_label,
92                    component_type.green(),
93                    event
94                )
95                .into(),
96            }
97        } else {
98            event.into()
99        }
100    }
101}
102
103#[derive(Clone, Debug)]
104pub enum OutputChannel {
105    Stdout(EventFormatter),
106    AsyncChannel(tokio_mpsc::Sender<Vec<GraphQLTapOutputEvent>>),
107}
108
109/// Error type for DNS message parsing
110#[derive(Debug)]
111pub enum TapExecutorError {
112    ConnectionFailure(tokio_tungstenite::tungstenite::Error),
113    GraphQLError,
114}
115
116#[derive(Debug)]
117pub struct TapRunner<'a> {
118    url: &'a Url,
119    input_patterns: Vec<String>,
120    output_patterns: Vec<String>,
121    output_channel: &'a OutputChannel,
122    format: TapEncodingFormat,
123}
124
125impl<'a> TapRunner<'a> {
126    pub fn new(
127        url: &'a Url,
128        input_patterns: Vec<String>,
129        output_patterns: Vec<String>,
130        output_channel: &'a OutputChannel,
131        format: TapEncodingFormat,
132    ) -> Self {
133        TapRunner {
134            url,
135            input_patterns,
136            output_patterns,
137            output_channel,
138            format,
139        }
140    }
141
142    pub async fn run_tap(
143        &self,
144        interval: i64,
145        limit: i64,
146        duration_ms: Option<u64>,
147        quiet: bool,
148    ) -> Result<(), TapExecutorError> {
149        let subscription_client = connect_subscription_client((*self.url).clone())
150            .await
151            .map_err(TapExecutorError::ConnectionFailure)?;
152
153        tokio::pin! {
154            let stream = subscription_client.output_events_by_component_id_patterns_subscription(
155                self.output_patterns.clone(),
156                self.input_patterns.clone(),
157                self.format,
158                limit,
159                interval,
160            );
161        }
162
163        let start_time = Instant::now();
164        let stream_duration = duration_ms
165            .map(Duration::from_millis)
166            .unwrap_or(Duration::MAX);
167
168        // Loop over the returned results, printing out tap events.
169        loop {
170            let time_elapsed = start_time.elapsed();
171            if time_elapsed >= stream_duration {
172                return Ok(());
173            }
174
175            let message = timeout(stream_duration - time_elapsed, stream.next()).await;
176            match message {
177                Ok(Some(Some(res))) => {
178                    if let Some(d) = res.data {
179                        let output_events: Vec<GraphQLTapOutputEvent> = d
180                            .output_events_by_component_id_patterns
181                            .into_iter()
182                            .filter(|event| {
183                                !matches!(
184                                    (quiet, event),
185                                    (true, GraphQLTapOutputEvent::EventNotification(_))
186                                )
187                            })
188                            .collect();
189
190                        match &self.output_channel {
191                            OutputChannel::Stdout(formatter) => {
192                                self.output_event_stdout(&output_events, formatter);
193                            }
194                            OutputChannel::AsyncChannel(sender_tx) => {
195                                if let Err(error) = sender_tx.send(output_events).await {
196                                    error!("Could not send tap events: {error}");
197                                }
198                            }
199                        }
200                    }
201                }
202                Err(_) =>
203                // If the stream times out, that indicates the duration specified by the user
204                // has elapsed. We should exit gracefully.
205                {
206                    return Ok(())
207                }
208                Ok(_) => return Err(TapExecutorError::GraphQLError),
209            }
210        }
211    }
212
213    #[allow(clippy::print_stdout)]
214    fn output_event_stdout(
215        &self,
216        output_events: &[GraphQLTapOutputEvent],
217        formatter: &EventFormatter,
218    ) {
219        for tap_event in output_events.iter() {
220            match tap_event {
221                GraphQLTapOutputEvent::Log(ev) => {
222                    println!(
223                        "{}",
224                        formatter.format(
225                            ev.component_id.as_ref(),
226                            ev.component_kind.as_ref(),
227                            ev.component_type.as_ref(),
228                            ev.string.as_ref()
229                        )
230                    );
231                }
232                GraphQLTapOutputEvent::Metric(ev) => {
233                    println!(
234                        "{}",
235                        formatter.format(
236                            ev.component_id.as_ref(),
237                            ev.component_kind.as_ref(),
238                            ev.component_type.as_ref(),
239                            ev.string.as_ref()
240                        )
241                    );
242                }
243                GraphQLTapOutputEvent::Trace(ev) => {
244                    println!(
245                        "{}",
246                        formatter.format(
247                            ev.component_id.as_ref(),
248                            ev.component_kind.as_ref(),
249                            ev.component_type.as_ref(),
250                            ev.string.as_ref()
251                        )
252                    );
253                }
254                #[allow(clippy::print_stderr)]
255                GraphQLTapOutputEvent::EventNotification(ev) => {
256                    eprintln!("{}", ev.message);
257                }
258            }
259        }
260    }
261}