vector_tap/
runner.rs

1use std::{borrow::Cow, collections::BTreeMap};
2
3use bytes::Bytes;
4use colored::{ColoredString, Colorize};
5use prost::Message;
6use tokio::{
7    sync::mpsc as tokio_mpsc,
8    time::{Duration, Instant, timeout},
9};
10use tokio_stream::StreamExt;
11use tokio_util::codec::Encoder;
12use url::Url;
13use vector_api_client::{
14    Client,
15    proto::{StreamOutputEventsRequest, StreamOutputEventsResponse},
16};
17use vector_core::event::Event;
18
// Output encoding for tapped events, selectable on the command line through clap's
// `ValueEnum` derive. Plain `//` comments are used deliberately: clap turns `///` doc
// comments on `ValueEnum` variants into per-value help text, so doc comments here would
// change the CLI's `--help` output.
#[derive(Clone, Copy, Debug, clap::ValueEnum)]
pub enum TapEncodingFormat {
    // Serialized with `serde_json`.
    Json,
    // Serialized with `serde_yaml`.
    Yaml,
    // Serialized with the logfmt serializer from `codecs`; only log events are supported.
    Logfmt,
}
25
// Note: TapEncodingFormat is not part of the gRPC API. The server streams protobuf events,
// and this client serializes them locally into the selected format.
28
29#[derive(Clone, Debug)]
30pub struct EventFormatter {
31    meta: bool,
32    format: TapEncodingFormat,
33    component_id_label: ColoredString,
34    component_kind_label: ColoredString,
35    component_type_label: ColoredString,
36}
37
38impl EventFormatter {
39    pub fn new(meta: bool, format: TapEncodingFormat) -> Self {
40        Self {
41            meta,
42            format,
43            component_id_label: "component_id".green(),
44            component_kind_label: "component_kind".green(),
45            component_type_label: "component_type".green(),
46        }
47    }
48
49    pub fn format<'a>(
50        &self,
51        component_id: &str,
52        component_kind: &str,
53        component_type: &str,
54        event: &'a str,
55    ) -> Cow<'a, str> {
56        if self.meta {
57            match self.format {
58                TapEncodingFormat::Json => format!(
59                    r#"{{"{}":"{}","{}":"{}","{}":"{}","event":{}}}"#,
60                    self.component_id_label,
61                    component_id.green(),
62                    self.component_kind_label,
63                    component_kind.green(),
64                    self.component_type_label,
65                    component_type.green(),
66                    event
67                )
68                .into(),
69                TapEncodingFormat::Yaml => {
70                    let mut value: BTreeMap<String, serde_yaml::Value> = BTreeMap::new();
71                    value.insert("event".to_string(), serde_yaml::from_str(event).unwrap());
72                    // We interpolate to include component_id rather than
73                    // include it in the map to correctly preserve color
74                    // formatting
75                    format!(
76                        "{}{}: {}\n{}: {}\n{}: {}\n",
77                        serde_yaml::to_string(&value).unwrap(),
78                        self.component_id_label,
79                        component_id.green(),
80                        self.component_kind_label,
81                        component_kind.green(),
82                        self.component_type_label,
83                        component_type.green()
84                    )
85                    .into()
86                }
87                TapEncodingFormat::Logfmt => format!(
88                    "{}={} {}={} {}={} {}",
89                    self.component_id_label,
90                    component_id.green(),
91                    self.component_kind_label,
92                    component_kind.green(),
93                    self.component_type_label,
94                    component_type.green(),
95                    event
96                )
97                .into(),
98            }
99        } else {
100            event.into()
101        }
102    }
103}
104
105#[derive(Clone, Debug)]
106pub enum OutputChannel {
107    Stdout(EventFormatter),
108    AsyncChannel(tokio_mpsc::Sender<Vec<StreamOutputEventsResponse>>),
109}
110
/// Ways a tap session can fail.
#[derive(Debug)]
pub enum TapExecutorError {
    /// Connection-level failure. Not constructed in this module; presumably raised by
    /// callers when connecting to the API fails — confirm.
    ConnectionFailure(String),
    /// Error from the gRPC client or stream that is not classified as fatal.
    GrpcError(String),
    /// Permanent error that should not trigger a reconnect (e.g. invalid arguments).
    Fatal(String),
}

impl TapExecutorError {
    /// Reports whether this error is permanent, i.e. the [`TapExecutorError::Fatal`] variant,
    /// for which reconnecting is pointless.
    pub fn is_fatal(&self) -> bool {
        matches!(self, Self::Fatal(..))
    }
}
125
126impl From<vector_api_client::Error> for TapExecutorError {
127    fn from(err: vector_api_client::Error) -> Self {
128        if err.is_fatal() {
129            TapExecutorError::Fatal(format!("{}", err))
130        } else {
131            TapExecutorError::GrpcError(format!("{}", err))
132        }
133    }
134}
135
136#[derive(Debug)]
137pub struct TapRunner<'a> {
138    url: &'a Url,
139    input_patterns: Vec<String>,
140    output_patterns: Vec<String>,
141    output_channel: &'a OutputChannel,
142}
143
144impl<'a> TapRunner<'a> {
145    pub fn new(
146        url: &'a Url,
147        input_patterns: Vec<String>,
148        output_patterns: Vec<String>,
149        output_channel: &'a OutputChannel,
150    ) -> Self {
151        TapRunner {
152            url,
153            input_patterns,
154            output_patterns,
155            output_channel,
156        }
157    }
158
159    pub async fn run_tap(
160        &self,
161        interval: i64,
162        limit: i64,
163        duration_ms: Option<u64>,
164        quiet: bool,
165    ) -> Result<(), TapExecutorError> {
166        let uri = self
167            .url
168            .as_str()
169            .parse()
170            .map_err(|e| TapExecutorError::Fatal(format!("Invalid URL: {e}")))?;
171        let mut client = Client::new(uri);
172        client.connect().await?;
173        self.run_tap_with_client(client, interval, limit, duration_ms, quiet)
174            .await
175    }
176
177    /// Run tap using a pre-connected client (avoids an extra connection round-trip when the
178    /// caller has already connected and health-checked the client).
179    pub async fn run_tap_with_client(
180        &self,
181        mut client: Client,
182        interval: i64,
183        limit: i64,
184        duration_ms: Option<u64>,
185        quiet: bool,
186    ) -> Result<(), TapExecutorError> {
187        let request = StreamOutputEventsRequest {
188            outputs_patterns: self.output_patterns.clone(),
189            inputs_patterns: self.input_patterns.clone(),
190            limit: limit as i32,
191            interval_ms: interval as i32,
192        };
193
194        let mut stream = client.stream_output_events(request).await?;
195
196        let start_time = Instant::now();
197        let stream_duration = duration_ms
198            .map(Duration::from_millis)
199            .unwrap_or(Duration::MAX);
200
201        // Loop over the returned results, processing tap events
202        loop {
203            let time_elapsed = start_time.elapsed();
204            if time_elapsed >= stream_duration {
205                return Ok(());
206            }
207
208            let message = timeout(stream_duration - time_elapsed, stream.next()).await;
209            match message {
210                Ok(Some(Ok(output_event))) => {
211                    // Filter out notifications if quiet mode is enabled
212                    if quiet
213                        && matches!(
214                            output_event.event,
215                            Some(
216                                vector_api_client::proto::stream_output_events_response::Event::Notification(
217                                    _
218                                )
219                            )
220                        )
221                    {
222                        continue;
223                    }
224
225                    match &self.output_channel {
226                        OutputChannel::Stdout(formatter) => {
227                            self.output_event_stdout(&output_event, formatter);
228                        }
229                        OutputChannel::AsyncChannel(sender_tx) => {
230                            if let Err(error) = sender_tx.send(vec![output_event]).await {
231                                error!("Could not send tap events: {error}");
232                            }
233                        }
234                    }
235                }
236                Err(_) =>
237                // If the stream times out, that indicates the duration specified by the user
238                // has elapsed. We should exit gracefully.
239                {
240                    return Ok(());
241                }
242                Ok(None) => {
243                    return Err(TapExecutorError::GrpcError(
244                        "Stream ended unexpectedly".to_string(),
245                    ));
246                }
247                Ok(Some(Err(err))) => return Err(TapExecutorError::from(err)),
248            }
249        }
250    }
251
252    /// Convert and serialize a protobuf EventWrapper to the requested format
253    fn serialize_event(
254        event_wrapper: &vector_api_client::proto::event::EventWrapper,
255        format: TapEncodingFormat,
256    ) -> Result<String, String> {
257        // INTENTIONAL round-trip through protobuf bytes: `vector_api_client` compiles
258        // `event.proto` independently to avoid taking a dependency on `vector_core` (a large
259        // crate with many features). Both types share the same proto schema, so encoding one
260        // and decoding the other is always safe. If the schemas ever diverge this will surface
261        // as a runtime decode error.
262        let bytes = event_wrapper.encode_to_vec();
263
264        let core_event_wrapper =
265            vector_core::event::proto::EventWrapper::decode(Bytes::from(bytes))
266                .map_err(|e| format!("Failed to decode event: {}", e))?;
267
268        // Convert to vector-core Event (which has Serialize)
269        let event: Event = core_event_wrapper.into();
270
271        // Serialize based on format
272        match format {
273            TapEncodingFormat::Json => serde_json::to_string(&event)
274                .map_err(|e| format!("JSON serialization failed: {}", e)),
275            TapEncodingFormat::Yaml => serde_yaml::to_string(&event)
276                .map_err(|e| format!("YAML serialization failed: {}", e)),
277            TapEncodingFormat::Logfmt => {
278                // For logfmt, we need to extract the log event and serialize it
279                match event {
280                    Event::Log(log_event) => {
281                        let mut serializer =
282                            codecs::encoding::format::LogfmtSerializerConfig.build();
283                        let mut bytes = bytes::BytesMut::new();
284                        // Wrap the LogEvent back into Event for the serializer
285                        serializer
286                            .encode(Event::Log(log_event), &mut bytes)
287                            .map_err(|e| format!("Logfmt serialization failed: {}", e))?;
288                        String::from_utf8(bytes.to_vec())
289                            .map_err(|e| format!("UTF-8 conversion failed: {}", e))
290                    }
291                    Event::Metric(_) => {
292                        Err("logfmt format is only supported for log events".to_string())
293                    }
294                    Event::Trace(_) => {
295                        Err("logfmt format is only supported for log events".to_string())
296                    }
297                }
298            }
299        }
300    }
301
302    #[allow(clippy::print_stdout)]
303    fn output_event_stdout(
304        &self,
305        output_event: &StreamOutputEventsResponse,
306        formatter: &EventFormatter,
307    ) {
308        use vector_api_client::proto::stream_output_events_response::Event as OutputEventType;
309
310        match &output_event.event {
311            Some(OutputEventType::TappedEvent(ev)) => {
312                // Format the proto event for display
313                let encoded_string = if let Some(ref event_wrapper) = ev.event {
314                    match Self::serialize_event(event_wrapper, formatter.format) {
315                        Ok(s) => s,
316                        Err(e) => {
317                            error!(message = "Failed to serialize event.", error = %e);
318                            format!("{:?}", event_wrapper)
319                        }
320                    }
321                } else {
322                    "No event data".to_string()
323                };
324
325                println!(
326                    "{}",
327                    formatter.format(
328                        &ev.component_id,
329                        &ev.component_kind,
330                        &ev.component_type,
331                        &encoded_string
332                    )
333                );
334            }
335            #[allow(clippy::print_stderr)]
336            Some(OutputEventType::Notification(ev)) => {
337                eprintln!("{}", ev.message);
338            }
339            None => {
340                error!("Received StreamOutputEventsResponse with no event data");
341            }
342        }
343    }
344}