vector/transforms/
trace_to_log.rs1use vector_lib::config::clone_input_definitions;
2use vector_lib::configurable::configurable_component;
3
4use crate::config::OutputId;
5use crate::{
6 config::{DataType, GenerateConfig, Input, TransformConfig, TransformContext, TransformOutput},
7 event::{Event, LogEvent},
8 schema::Definition,
9 transforms::{FunctionTransform, OutputBuffer, Transform},
10};
11
12#[configurable_component(transform("trace_to_log", "Convert trace events to log events."))]
18#[derive(Clone, Debug, Default)]
19#[serde(deny_unknown_fields)]
20pub struct TraceToLogConfig {
21 #[serde(default)]
23 #[configurable(metadata(docs::hidden))]
24 pub log_namespace: Option<bool>,
25}
26
27impl GenerateConfig for TraceToLogConfig {
28 fn generate_config() -> toml::Value {
29 toml::Value::try_from(Self {
30 log_namespace: None,
31 })
32 .unwrap()
33 }
34}
35
36#[async_trait::async_trait]
37#[typetag::serde(name = "trace_to_log")]
38impl TransformConfig for TraceToLogConfig {
39 async fn build(&self, _context: &TransformContext) -> crate::Result<Transform> {
40 Ok(Transform::function(TraceToLog))
41 }
42
43 fn enable_concurrency(&self) -> bool {
44 true
45 }
46
47 fn input(&self) -> Input {
48 Input::trace()
49 }
50
51 fn outputs(
52 &self,
53 _: &TransformContext,
54 input_definitions: &[(OutputId, Definition)],
55 ) -> Vec<TransformOutput> {
56 vec![TransformOutput::new(
57 DataType::Log,
58 clone_input_definitions(input_definitions),
59 )]
60 }
61}
62
63#[derive(Clone, Debug)]
64pub struct TraceToLog;
65
66impl FunctionTransform for TraceToLog {
67 fn transform(&mut self, output: &mut OutputBuffer, event: Event) {
68 if let Event::Trace(trace) = event {
69 output.push(Event::Log(LogEvent::from(trace)));
70 }
71 }
72}
73
74#[cfg(test)]
75mod tests {
76 use super::*;
77 use crate::test_util::components::assert_transform_compliance;
78 use crate::transforms::test::create_topology;
79 use tokio::sync::mpsc;
80 use tokio_stream::wrappers::ReceiverStream;
81 use vector_lib::event::TraceEvent;
82
83 #[test]
84 fn generate_config() {
85 crate::test_util::test_generate_config::<TraceToLogConfig>();
86 }
87
88 async fn do_transform(trace: TraceEvent) -> Option<LogEvent> {
89 assert_transform_compliance(async move {
90 let config = TraceToLogConfig {
91 log_namespace: Some(false),
92 };
93 let (tx, rx) = mpsc::channel(1);
94 let (topology, mut out) = create_topology(ReceiverStream::new(rx), config).await;
95
96 tx.send(trace.into()).await.unwrap();
97
98 let result = out.recv().await;
99
100 drop(tx);
101 topology.stop().await;
102 assert_eq!(out.recv().await, None);
103
104 result
105 })
106 .await
107 .map(|e| e.into_log())
108 }
109
110 #[tokio::test]
111 async fn transform_trace() {
112 use vrl::btreemap;
113
114 let trace = TraceEvent::from(btreemap! {
115 "span_id" => "abc123",
116 "trace_id" => "xyz789",
117 "span_name" => "test-span",
118 "service" => "my-service",
119 });
120
121 let (expected_map, _) = trace.clone().into_parts();
122
123 let log = do_transform(trace).await.unwrap();
124 let (actual_value, _) = log.into_parts();
125 let actual_map = actual_value
126 .into_object()
127 .expect("log value should be an object");
128
129 assert_eq!(
130 actual_map, expected_map,
131 "Trace data fields should be preserved"
132 );
133 }
134}