1use std::{collections::HashMap, error, sync::Arc, time::Instant};
2
3use vector_common::{
4 EventDataEq,
5 byte_size_of::ByteSizeOf,
6 internal_event::{
7 self, CountByteSize, DEFAULT_OUTPUT, EventsSent, InternalEventHandle as _, Registered,
8 register,
9 },
10 json_size::JsonSize,
11};
12
13use crate::{
14 config,
15 config::{ComponentKey, OutputId},
16 event::{EstimatedJsonEncodedSizeOf, Event, EventArray, EventContainer, EventMutRef, EventRef},
17 fanout::{self, Fanout},
18 schema,
19};
20
/// State kept for a single output (primary or named) of a transform.
struct TransformOutput {
    /// Fanout that this output's buffered events are sent into.
    fanout: Fanout,
    /// Registered `EventsSent` handle, emitted after a buffer has been sent on this output.
    events_sent: Registered<EventsSent>,
    /// Schema definitions keyed by `OutputId`; passed to
    /// `update_runtime_schema_definition` for every event before it is sent.
    log_schema_definitions: HashMap<OutputId, Arc<schema::Definition>>,
    /// Identifier of this output: the owning component key plus the optional port name.
    output_id: Arc<OutputId>,
}
27
/// The outputs of a transform: at most one primary (unnamed) output plus any named outputs.
pub struct TransformOutputs {
    /// The output specification this set was built from; cloned to create matching buffers.
    outputs_spec: Vec<config::TransformOutput>,
    /// Output built from the specification entry whose port is `None`, if there is one.
    primary_output: Option<TransformOutput>,
    /// Outputs built from specification entries that carry a port name, keyed by that name.
    named_outputs: HashMap<String, TransformOutput>,
}
33
34impl TransformOutputs {
35 pub fn new(
36 outputs_in: Vec<config::TransformOutput>,
37 component_key: &ComponentKey,
38 ) -> (Self, HashMap<Option<String>, fanout::ControlChannel>) {
39 let outputs_spec = outputs_in.clone();
40 let mut primary_output = None;
41 let mut named_outputs = HashMap::new();
42 let mut controls = HashMap::new();
43
44 for output in outputs_in {
45 let (fanout, control) = Fanout::new();
46
47 let log_schema_definitions = output
48 .log_schema_definitions
49 .into_iter()
50 .map(|(id, definition)| (id, Arc::new(definition)))
51 .collect();
52
53 match output.port {
54 None => {
55 primary_output = Some(TransformOutput {
56 fanout,
57 events_sent: register(EventsSent::from(internal_event::Output(Some(
58 DEFAULT_OUTPUT.into(),
59 )))),
60 log_schema_definitions,
61 output_id: Arc::new(OutputId {
62 component: component_key.clone(),
63 port: None,
64 }),
65 });
66 controls.insert(None, control);
67 }
68 Some(name) => {
69 named_outputs.insert(
70 name.clone(),
71 TransformOutput {
72 fanout,
73 events_sent: register(EventsSent::from(internal_event::Output(Some(
74 name.clone().into(),
75 )))),
76 log_schema_definitions,
77 output_id: Arc::new(OutputId {
78 component: component_key.clone(),
79 port: Some(name.clone()),
80 }),
81 },
82 );
83 controls.insert(Some(name.clone()), control);
84 }
85 }
86 }
87
88 let me = Self {
89 outputs_spec,
90 primary_output,
91 named_outputs,
92 };
93
94 (me, controls)
95 }
96
97 pub fn new_buf_with_capacity(&self, capacity: usize) -> TransformOutputsBuf {
98 TransformOutputsBuf::new_with_capacity(self.outputs_spec.clone(), capacity)
99 }
100
101 pub async fn send(
108 &mut self,
109 buf: &mut TransformOutputsBuf,
110 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
111 if let Some(primary) = self.primary_output.as_mut() {
112 let Some(buf) = buf.primary_buffer.as_mut() else {
113 unreachable!("mismatched outputs");
114 };
115 Self::send_single_buffer(buf, primary).await?;
116 }
117 for (key, buf) in &mut buf.named_buffers {
118 let Some(output) = self.named_outputs.get_mut(key) else {
119 unreachable!("unknown output");
120 };
121 Self::send_single_buffer(buf, output).await?;
122 }
123 Ok(())
124 }
125
126 async fn send_single_buffer(
127 buf: &mut OutputBuffer,
128 output: &mut TransformOutput,
129 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
130 for event in buf.events_mut() {
131 super::update_runtime_schema_definition(
132 event,
133 &output.output_id,
134 &output.log_schema_definitions,
135 );
136 }
137 let count = buf.len();
138 let byte_size = buf.estimated_json_encoded_size_of();
139 buf.send(&mut output.fanout).await?;
140 output.events_sent.emit(CountByteSize(count, byte_size));
141 Ok(())
142 }
143}
144
/// Buffers for the events a transform emits, one per output.
#[derive(Debug, Clone)]
pub struct TransformOutputsBuf {
    /// Buffer for the primary (unnamed) output; `None` when there is no such output.
    pub(super) primary_buffer: Option<OutputBuffer>,
    /// Buffers for the named outputs, keyed by output name.
    pub(super) named_buffers: HashMap<String, OutputBuffer>,
}
150
151impl TransformOutputsBuf {
152 pub fn new_with_capacity(outputs_in: Vec<config::TransformOutput>, capacity: usize) -> Self {
153 let mut primary_buffer = None;
154 let mut named_buffers = HashMap::new();
155
156 for output in outputs_in {
157 match output.port {
158 None => {
159 primary_buffer = Some(OutputBuffer::with_capacity(capacity));
160 }
161 Some(name) => {
162 named_buffers.insert(name.clone(), OutputBuffer::default());
163 }
164 }
165 }
166
167 Self {
168 primary_buffer,
169 named_buffers,
170 }
171 }
172
173 pub fn push(&mut self, name: Option<&str>, event: Event) {
179 match name {
180 Some(name) => self.named_buffers.get_mut(name),
181 None => self.primary_buffer.as_mut(),
182 }
183 .expect("unknown output")
184 .push(event);
185 }
186
187 pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
193 self.primary_buffer
194 .as_mut()
195 .expect("no default output")
196 .drain()
197 }
198
199 pub fn drain_named(&mut self, name: &str) -> impl Iterator<Item = Event> + '_ {
205 self.named_buffers
206 .get_mut(name)
207 .expect("unknown output")
208 .drain()
209 }
210
211 pub fn take_primary(&mut self) -> OutputBuffer {
217 std::mem::take(self.primary_buffer.as_mut().expect("no default output"))
218 }
219
220 pub fn take_all_named(&mut self) -> HashMap<String, OutputBuffer> {
221 std::mem::take(&mut self.named_buffers)
222 }
223
224 pub fn for_each_array_mut(&mut self, mut f: impl FnMut(&mut EventArray)) {
229 if let Some(primary) = self.primary_buffer.as_mut() {
230 primary.for_each_array_mut(&mut f);
231 }
232 for buf in self.named_buffers.values_mut() {
233 buf.for_each_array_mut(&mut f);
234 }
235 }
236}
237
238impl ByteSizeOf for TransformOutputsBuf {
239 fn allocated_bytes(&self) -> usize {
240 self.primary_buffer.size_of()
241 + self
242 .named_buffers
243 .values()
244 .map(ByteSizeOf::size_of)
245 .sum::<usize>()
246 }
247}
248
/// A buffer of events for one output, stored as a sequence of `EventArray`s.
#[derive(Debug, Default, Clone)]
pub struct OutputBuffer(pub(super) Vec<EventArray>);
251
252impl OutputBuffer {
253 pub fn with_capacity(capacity: usize) -> Self {
254 Self(Vec::with_capacity(capacity))
255 }
256
257 pub fn push(&mut self, event: Event) {
258 match (event, self.0.last_mut()) {
260 (Event::Log(log), Some(EventArray::Logs(logs))) => {
261 logs.push(log);
262 }
263 (Event::Metric(metric), Some(EventArray::Metrics(metrics))) => {
264 metrics.push(metric);
265 }
266 (Event::Trace(trace), Some(EventArray::Traces(traces))) => {
267 traces.push(trace);
268 }
269 (event, _) => {
270 self.0.push(event.into());
271 }
272 }
273 }
274
275 pub fn append(&mut self, events: &mut Vec<Event>) {
276 for event in events.drain(..) {
277 self.push(event);
278 }
279 }
280
281 pub fn extend(&mut self, events: impl Iterator<Item = Event>) {
282 for event in events {
283 self.push(event);
284 }
285 }
286
287 pub fn is_empty(&self) -> bool {
288 self.0.is_empty()
289 }
290
291 pub fn len(&self) -> usize {
292 self.0.iter().map(EventArray::len).sum()
293 }
294
295 pub fn capacity(&self) -> usize {
296 self.0.capacity()
297 }
298
299 pub fn first(&self) -> Option<EventRef<'_>> {
300 self.0.first().and_then(|first| match first {
301 EventArray::Logs(l) => l.first().map(Into::into),
302 EventArray::Metrics(m) => m.first().map(Into::into),
303 EventArray::Traces(t) => t.first().map(Into::into),
304 })
305 }
306
307 pub fn drain(&mut self) -> impl Iterator<Item = Event> + '_ {
308 self.0.drain(..).flat_map(EventArray::into_events)
309 }
310
311 pub fn for_each_array_mut(&mut self, mut f: impl FnMut(&mut EventArray)) {
313 for array in &mut self.0 {
314 f(array);
315 }
316 }
317
318 async fn send(
319 &mut self,
320 output: &mut Fanout,
321 ) -> Result<(), Box<dyn error::Error + Send + Sync>> {
322 let send_start = Some(Instant::now());
323 for array in std::mem::take(&mut self.0) {
324 output.send(array, send_start).await?;
325 }
326
327 Ok(())
328 }
329
330 fn iter_events(&self) -> impl Iterator<Item = EventRef<'_>> {
331 self.0.iter().flat_map(EventArray::iter_events)
332 }
333
334 fn events_mut(&mut self) -> impl Iterator<Item = EventMutRef<'_>> {
335 self.0.iter_mut().flat_map(EventArray::iter_events_mut)
336 }
337
338 pub fn into_events(self) -> impl Iterator<Item = Event> {
339 self.0.into_iter().flat_map(EventArray::into_events)
340 }
341}
342
343impl ByteSizeOf for OutputBuffer {
344 fn allocated_bytes(&self) -> usize {
345 self.0.iter().map(ByteSizeOf::size_of).sum()
346 }
347}
348
349impl EventDataEq<Vec<Event>> for OutputBuffer {
350 fn event_data_eq(&self, other: &Vec<Event>) -> bool {
351 struct Comparator<'a>(EventRef<'a>);
352
353 impl PartialEq<&Event> for Comparator<'_> {
354 fn eq(&self, that: &&Event) -> bool {
355 self.0.event_data_eq(that)
356 }
357 }
358
359 self.iter_events().map(Comparator).eq(other.iter())
360 }
361}
362
363impl EstimatedJsonEncodedSizeOf for OutputBuffer {
364 fn estimated_json_encoded_size_of(&self) -> JsonSize {
365 self.0
366 .iter()
367 .map(EstimatedJsonEncodedSizeOf::estimated_json_encoded_size_of)
368 .sum()
369 }
370}
371
372impl From<Vec<Event>> for OutputBuffer {
373 fn from(events: Vec<Event>) -> Self {
374 let mut result = Self::default();
375 result.extend(events.into_iter());
376 result
377 }
378}