1#![deny(warnings)]
2
3#[macro_use]
4extern crate tracing;
5
6pub mod controller;
7pub mod notification;
8pub mod topology;
9
10use std::{borrow::Cow, collections::BTreeMap};
11
12use colored::{ColoredString, Colorize};
13use tokio::sync::mpsc as tokio_mpsc;
14use tokio::time::timeout;
15use tokio::time::{Duration, Instant};
16use tokio_stream::StreamExt;
17use url::Url;
18
19use vector_api_client::{
20 connect_subscription_client,
21 gql::{
22 output_events_by_component_id_patterns_subscription::OutputEventsByComponentIdPatternsSubscriptionOutputEventsByComponentIdPatterns as GraphQLTapOutputEvent,
23 TapEncodingFormat, TapSubscriptionExt,
24 },
25};
26
27#[derive(Clone, Debug)]
28pub struct EventFormatter {
29 meta: bool,
30 format: TapEncodingFormat,
31 component_id_label: ColoredString,
32 component_kind_label: ColoredString,
33 component_type_label: ColoredString,
34}
35
36impl EventFormatter {
37 pub fn new(meta: bool, format: TapEncodingFormat) -> Self {
38 Self {
39 meta,
40 format,
41 component_id_label: "component_id".green(),
42 component_kind_label: "component_kind".green(),
43 component_type_label: "component_type".green(),
44 }
45 }
46
47 pub fn format<'a>(
48 &self,
49 component_id: &str,
50 component_kind: &str,
51 component_type: &str,
52 event: &'a str,
53 ) -> Cow<'a, str> {
54 if self.meta {
55 match self.format {
56 TapEncodingFormat::Json => format!(
57 r#"{{"{}":"{}","{}":"{}","{}":"{}","event":{}}}"#,
58 self.component_id_label,
59 component_id.green(),
60 self.component_kind_label,
61 component_kind.green(),
62 self.component_type_label,
63 component_type.green(),
64 event
65 )
66 .into(),
67 TapEncodingFormat::Yaml => {
68 let mut value: BTreeMap<String, serde_yaml::Value> = BTreeMap::new();
69 value.insert("event".to_string(), serde_yaml::from_str(event).unwrap());
70 format!(
74 "{}{}: {}\n{}: {}\n{}: {}\n",
75 serde_yaml::to_string(&value).unwrap(),
76 self.component_id_label,
77 component_id.green(),
78 self.component_kind_label,
79 component_kind.green(),
80 self.component_type_label,
81 component_type.green()
82 )
83 .into()
84 }
85 TapEncodingFormat::Logfmt => format!(
86 "{}={} {}={} {}={} {}",
87 self.component_id_label,
88 component_id.green(),
89 self.component_kind_label,
90 component_kind.green(),
91 self.component_type_label,
92 component_type.green(),
93 event
94 )
95 .into(),
96 }
97 } else {
98 event.into()
99 }
100 }
101}
102
103#[derive(Clone, Debug)]
104pub enum OutputChannel {
105 Stdout(EventFormatter),
106 AsyncChannel(tokio_mpsc::Sender<Vec<GraphQLTapOutputEvent>>),
107}
108
109#[derive(Debug)]
111pub enum TapExecutorError {
112 ConnectionFailure(tokio_tungstenite::tungstenite::Error),
113 GraphQLError,
114}
115
116#[derive(Debug)]
117pub struct TapRunner<'a> {
118 url: &'a Url,
119 input_patterns: Vec<String>,
120 output_patterns: Vec<String>,
121 output_channel: &'a OutputChannel,
122 format: TapEncodingFormat,
123}
124
125impl<'a> TapRunner<'a> {
126 pub fn new(
127 url: &'a Url,
128 input_patterns: Vec<String>,
129 output_patterns: Vec<String>,
130 output_channel: &'a OutputChannel,
131 format: TapEncodingFormat,
132 ) -> Self {
133 TapRunner {
134 url,
135 input_patterns,
136 output_patterns,
137 output_channel,
138 format,
139 }
140 }
141
142 pub async fn run_tap(
143 &self,
144 interval: i64,
145 limit: i64,
146 duration_ms: Option<u64>,
147 quiet: bool,
148 ) -> Result<(), TapExecutorError> {
149 let subscription_client = connect_subscription_client((*self.url).clone())
150 .await
151 .map_err(TapExecutorError::ConnectionFailure)?;
152
153 tokio::pin! {
154 let stream = subscription_client.output_events_by_component_id_patterns_subscription(
155 self.output_patterns.clone(),
156 self.input_patterns.clone(),
157 self.format,
158 limit,
159 interval,
160 );
161 }
162
163 let start_time = Instant::now();
164 let stream_duration = duration_ms
165 .map(Duration::from_millis)
166 .unwrap_or(Duration::MAX);
167
168 loop {
170 let time_elapsed = start_time.elapsed();
171 if time_elapsed >= stream_duration {
172 return Ok(());
173 }
174
175 let message = timeout(stream_duration - time_elapsed, stream.next()).await;
176 match message {
177 Ok(Some(Some(res))) => {
178 if let Some(d) = res.data {
179 let output_events: Vec<GraphQLTapOutputEvent> = d
180 .output_events_by_component_id_patterns
181 .into_iter()
182 .filter(|event| {
183 !matches!(
184 (quiet, event),
185 (true, GraphQLTapOutputEvent::EventNotification(_))
186 )
187 })
188 .collect();
189
190 match &self.output_channel {
191 OutputChannel::Stdout(formatter) => {
192 self.output_event_stdout(&output_events, formatter);
193 }
194 OutputChannel::AsyncChannel(sender_tx) => {
195 if let Err(error) = sender_tx.send(output_events).await {
196 error!("Could not send tap events: {error}");
197 }
198 }
199 }
200 }
201 }
202 Err(_) =>
203 {
206 return Ok(())
207 }
208 Ok(_) => return Err(TapExecutorError::GraphQLError),
209 }
210 }
211 }
212
213 #[allow(clippy::print_stdout)]
214 fn output_event_stdout(
215 &self,
216 output_events: &[GraphQLTapOutputEvent],
217 formatter: &EventFormatter,
218 ) {
219 for tap_event in output_events.iter() {
220 match tap_event {
221 GraphQLTapOutputEvent::Log(ev) => {
222 println!(
223 "{}",
224 formatter.format(
225 ev.component_id.as_ref(),
226 ev.component_kind.as_ref(),
227 ev.component_type.as_ref(),
228 ev.string.as_ref()
229 )
230 );
231 }
232 GraphQLTapOutputEvent::Metric(ev) => {
233 println!(
234 "{}",
235 formatter.format(
236 ev.component_id.as_ref(),
237 ev.component_kind.as_ref(),
238 ev.component_type.as_ref(),
239 ev.string.as_ref()
240 )
241 );
242 }
243 GraphQLTapOutputEvent::Trace(ev) => {
244 println!(
245 "{}",
246 formatter.format(
247 ev.component_id.as_ref(),
248 ev.component_kind.as_ref(),
249 ev.component_type.as_ref(),
250 ev.string.as_ref()
251 )
252 );
253 }
254 #[allow(clippy::print_stderr)]
255 GraphQLTapOutputEvent::EventNotification(ev) => {
256 eprintln!("{}", ev.message);
257 }
258 }
259 }
260 }
261}