1use std::{borrow::Cow, collections::BTreeMap};
2
3use bytes::Bytes;
4use colored::{ColoredString, Colorize};
5use prost::Message;
6use tokio::{
7 sync::mpsc as tokio_mpsc,
8 time::{Duration, Instant, timeout},
9};
10use tokio_stream::StreamExt;
11use tokio_util::codec::Encoder;
12use url::Url;
13use vector_api_client::{
14 Client,
15 proto::{StreamOutputEventsRequest, StreamOutputEventsResponse},
16};
17use vector_core::event::Event;
18
19#[derive(Clone, Copy, Debug, clap::ValueEnum)]
20pub enum TapEncodingFormat {
21 Json,
22 Yaml,
23 Logfmt,
24}
25
26#[derive(Clone, Debug)]
30pub struct EventFormatter {
31 meta: bool,
32 format: TapEncodingFormat,
33 component_id_label: ColoredString,
34 component_kind_label: ColoredString,
35 component_type_label: ColoredString,
36}
37
38impl EventFormatter {
39 pub fn new(meta: bool, format: TapEncodingFormat) -> Self {
40 Self {
41 meta,
42 format,
43 component_id_label: "component_id".green(),
44 component_kind_label: "component_kind".green(),
45 component_type_label: "component_type".green(),
46 }
47 }
48
49 pub fn format<'a>(
50 &self,
51 component_id: &str,
52 component_kind: &str,
53 component_type: &str,
54 event: &'a str,
55 ) -> Cow<'a, str> {
56 if self.meta {
57 match self.format {
58 TapEncodingFormat::Json => format!(
59 r#"{{"{}":"{}","{}":"{}","{}":"{}","event":{}}}"#,
60 self.component_id_label,
61 component_id.green(),
62 self.component_kind_label,
63 component_kind.green(),
64 self.component_type_label,
65 component_type.green(),
66 event
67 )
68 .into(),
69 TapEncodingFormat::Yaml => {
70 let mut value: BTreeMap<String, serde_yaml::Value> = BTreeMap::new();
71 value.insert("event".to_string(), serde_yaml::from_str(event).unwrap());
72 format!(
76 "{}{}: {}\n{}: {}\n{}: {}\n",
77 serde_yaml::to_string(&value).unwrap(),
78 self.component_id_label,
79 component_id.green(),
80 self.component_kind_label,
81 component_kind.green(),
82 self.component_type_label,
83 component_type.green()
84 )
85 .into()
86 }
87 TapEncodingFormat::Logfmt => format!(
88 "{}={} {}={} {}={} {}",
89 self.component_id_label,
90 component_id.green(),
91 self.component_kind_label,
92 component_kind.green(),
93 self.component_type_label,
94 component_type.green(),
95 event
96 )
97 .into(),
98 }
99 } else {
100 event.into()
101 }
102 }
103}
104
105#[derive(Clone, Debug)]
106pub enum OutputChannel {
107 Stdout(EventFormatter),
108 AsyncChannel(tokio_mpsc::Sender<Vec<StreamOutputEventsResponse>>),
109}
110
111#[derive(Debug)]
113pub enum TapExecutorError {
114 ConnectionFailure(String),
115 GrpcError(String),
116 Fatal(String),
118}
119
120impl TapExecutorError {
121 pub fn is_fatal(&self) -> bool {
122 matches!(self, TapExecutorError::Fatal(_))
123 }
124}
125
126impl From<vector_api_client::Error> for TapExecutorError {
127 fn from(err: vector_api_client::Error) -> Self {
128 if err.is_fatal() {
129 TapExecutorError::Fatal(format!("{}", err))
130 } else {
131 TapExecutorError::GrpcError(format!("{}", err))
132 }
133 }
134}
135
136#[derive(Debug)]
137pub struct TapRunner<'a> {
138 url: &'a Url,
139 input_patterns: Vec<String>,
140 output_patterns: Vec<String>,
141 output_channel: &'a OutputChannel,
142}
143
144impl<'a> TapRunner<'a> {
145 pub fn new(
146 url: &'a Url,
147 input_patterns: Vec<String>,
148 output_patterns: Vec<String>,
149 output_channel: &'a OutputChannel,
150 ) -> Self {
151 TapRunner {
152 url,
153 input_patterns,
154 output_patterns,
155 output_channel,
156 }
157 }
158
159 pub async fn run_tap(
160 &self,
161 interval: i64,
162 limit: i64,
163 duration_ms: Option<u64>,
164 quiet: bool,
165 ) -> Result<(), TapExecutorError> {
166 let uri = self
167 .url
168 .as_str()
169 .parse()
170 .map_err(|e| TapExecutorError::Fatal(format!("Invalid URL: {e}")))?;
171 let mut client = Client::new(uri);
172 client.connect().await?;
173 self.run_tap_with_client(client, interval, limit, duration_ms, quiet)
174 .await
175 }
176
177 pub async fn run_tap_with_client(
180 &self,
181 mut client: Client,
182 interval: i64,
183 limit: i64,
184 duration_ms: Option<u64>,
185 quiet: bool,
186 ) -> Result<(), TapExecutorError> {
187 let request = StreamOutputEventsRequest {
188 outputs_patterns: self.output_patterns.clone(),
189 inputs_patterns: self.input_patterns.clone(),
190 limit: limit as i32,
191 interval_ms: interval as i32,
192 };
193
194 let mut stream = client.stream_output_events(request).await?;
195
196 let start_time = Instant::now();
197 let stream_duration = duration_ms
198 .map(Duration::from_millis)
199 .unwrap_or(Duration::MAX);
200
201 loop {
203 let time_elapsed = start_time.elapsed();
204 if time_elapsed >= stream_duration {
205 return Ok(());
206 }
207
208 let message = timeout(stream_duration - time_elapsed, stream.next()).await;
209 match message {
210 Ok(Some(Ok(output_event))) => {
211 if quiet
213 && matches!(
214 output_event.event,
215 Some(
216 vector_api_client::proto::stream_output_events_response::Event::Notification(
217 _
218 )
219 )
220 )
221 {
222 continue;
223 }
224
225 match &self.output_channel {
226 OutputChannel::Stdout(formatter) => {
227 self.output_event_stdout(&output_event, formatter);
228 }
229 OutputChannel::AsyncChannel(sender_tx) => {
230 if let Err(error) = sender_tx.send(vec![output_event]).await {
231 error!("Could not send tap events: {error}");
232 }
233 }
234 }
235 }
236 Err(_) =>
237 {
240 return Ok(());
241 }
242 Ok(None) => {
243 return Err(TapExecutorError::GrpcError(
244 "Stream ended unexpectedly".to_string(),
245 ));
246 }
247 Ok(Some(Err(err))) => return Err(TapExecutorError::from(err)),
248 }
249 }
250 }
251
252 fn serialize_event(
254 event_wrapper: &vector_api_client::proto::event::EventWrapper,
255 format: TapEncodingFormat,
256 ) -> Result<String, String> {
257 let bytes = event_wrapper.encode_to_vec();
263
264 let core_event_wrapper =
265 vector_core::event::proto::EventWrapper::decode(Bytes::from(bytes))
266 .map_err(|e| format!("Failed to decode event: {}", e))?;
267
268 let event: Event = core_event_wrapper.into();
270
271 match format {
273 TapEncodingFormat::Json => serde_json::to_string(&event)
274 .map_err(|e| format!("JSON serialization failed: {}", e)),
275 TapEncodingFormat::Yaml => serde_yaml::to_string(&event)
276 .map_err(|e| format!("YAML serialization failed: {}", e)),
277 TapEncodingFormat::Logfmt => {
278 match event {
280 Event::Log(log_event) => {
281 let mut serializer =
282 codecs::encoding::format::LogfmtSerializerConfig.build();
283 let mut bytes = bytes::BytesMut::new();
284 serializer
286 .encode(Event::Log(log_event), &mut bytes)
287 .map_err(|e| format!("Logfmt serialization failed: {}", e))?;
288 String::from_utf8(bytes.to_vec())
289 .map_err(|e| format!("UTF-8 conversion failed: {}", e))
290 }
291 Event::Metric(_) => {
292 Err("logfmt format is only supported for log events".to_string())
293 }
294 Event::Trace(_) => {
295 Err("logfmt format is only supported for log events".to_string())
296 }
297 }
298 }
299 }
300 }
301
302 #[allow(clippy::print_stdout)]
303 fn output_event_stdout(
304 &self,
305 output_event: &StreamOutputEventsResponse,
306 formatter: &EventFormatter,
307 ) {
308 use vector_api_client::proto::stream_output_events_response::Event as OutputEventType;
309
310 match &output_event.event {
311 Some(OutputEventType::TappedEvent(ev)) => {
312 let encoded_string = if let Some(ref event_wrapper) = ev.event {
314 match Self::serialize_event(event_wrapper, formatter.format) {
315 Ok(s) => s,
316 Err(e) => {
317 error!(message = "Failed to serialize event.", error = %e);
318 format!("{:?}", event_wrapper)
319 }
320 }
321 } else {
322 "No event data".to_string()
323 };
324
325 println!(
326 "{}",
327 formatter.format(
328 &ev.component_id,
329 &ev.component_kind,
330 &ev.component_type,
331 &encoded_string
332 )
333 );
334 }
335 #[allow(clippy::print_stderr)]
336 Some(OutputEventType::Notification(ev)) => {
337 eprintln!("{}", ev.message);
338 }
339 None => {
340 error!("Received StreamOutputEventsResponse with no event data");
341 }
342 }
343 }
344}