vector/sinks/datadog/traces/
sink.rs1use std::{fmt::Debug, sync::Arc};
2
3use async_trait::async_trait;
4use futures_util::{
5 StreamExt,
6 stream::{self, BoxStream},
7};
8use tokio::sync::oneshot::{Sender, channel};
9use tower::Service;
10use vector_lib::{
11 config::log_schema,
12 event::Event,
13 partition::Partitioner,
14 sink::StreamSink,
15 stream::{BatcherSettings, DriverResponse},
16};
17use vrl::{event_path, path::PathPrefix};
18
19use super::service::TraceApiRequest;
20use crate::{
21 internal_events::DatadogTracesEncodingError,
22 sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
23};
24
/// Stateless partitioner for the Datadog traces sink.
///
/// It carries no data of its own; its `Partitioner` implementation derives a
/// [`PartitionKey`] from each trace event it is handed.
#[derive(Default)]
struct EventPartitioner;
27
/// Key under which trace events are grouped into batches.
///
/// Two events land in the same partition only when every field below compares
/// equal, so the key is hashable and comparable.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) struct PartitionKey {
    /// Datadog API key taken from the event metadata, if one is attached.
    pub(crate) api_key: Option<Arc<str>>,
    /// Lossy string form of the event's `env` field, when present.
    pub(crate) env: Option<String>,
    /// Lossy string form of the field addressed by the log schema's host key,
    /// when both the key and the field exist.
    pub(crate) hostname: Option<String>,
    /// Lossy string form of the event's `agent_version` field, when present.
    pub(crate) agent_version: Option<String>,
    /// Integer value of the event's `target_tps` field; `None` when the field
    /// is missing or not an integer.
    pub(crate) target_tps: Option<i64>,
    /// Integer value of the event's `error_tps` field; `None` when the field
    /// is missing or not an integer.
    pub(crate) error_tps: Option<i64>,
}
40
41impl Partitioner for EventPartitioner {
42 type Item = Event;
43 type Key = PartitionKey;
44
45 fn partition(&self, item: &Self::Item) -> Self::Key {
46 match item {
47 Event::Metric(_) => {
48 panic!("unexpected metric");
49 }
50 Event::Log(_) => {
51 panic!("unexpected log");
52 }
53 Event::Trace(t) => PartitionKey {
54 api_key: item.metadata().datadog_api_key(),
55 env: t
56 .get(event_path!("env"))
57 .map(|s| s.to_string_lossy().into_owned()),
58 hostname: log_schema().host_key().and_then(|key| {
59 t.get((PathPrefix::Event, key))
60 .map(|s| s.to_string_lossy().into_owned())
61 }),
62 agent_version: t
63 .get(event_path!("agent_version"))
64 .map(|s| s.to_string_lossy().into_owned()),
65 target_tps: t
66 .get(event_path!("target_tps"))
67 .and_then(|tps| tps.as_integer()),
68 error_tps: t
69 .get(event_path!("error_tps"))
70 .and_then(|tps| tps.as_integer()),
71 },
72 }
73 }
74}
75
76pub struct TracesSink<S> {
77 service: S,
78 request_builder: DatadogTracesRequestBuilder,
79 batch_settings: BatcherSettings,
80 shutdown: Sender<Sender<()>>,
81 protocol: String,
82}
83
84impl<S> TracesSink<S>
85where
86 S: Service<TraceApiRequest> + Send,
87 S::Error: Debug + Send + 'static,
88 S::Future: Send + 'static,
89 S::Response: DriverResponse,
90{
91 pub const fn new(
92 service: S,
93 request_builder: DatadogTracesRequestBuilder,
94 batch_settings: BatcherSettings,
95 shutdown: Sender<Sender<()>>,
96 protocol: String,
97 ) -> Self {
98 TracesSink {
99 service,
100 request_builder,
101 batch_settings,
102 shutdown,
103 protocol,
104 }
105 }
106
107 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
108 let batch_settings = self.batch_settings;
109
110 input
111 .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
112 .incremental_request_builder(self.request_builder)
113 .flat_map(stream::iter)
114 .filter_map(|request| async move {
115 match request {
116 Err(e) => {
117 let (error_message, error_reason, dropped_events) = e.into_parts();
118 emit!(DatadogTracesEncodingError {
119 error_message,
120 error_reason,
121 dropped_events: dropped_events as usize,
122 });
123 None
124 }
125 Ok(req) => Some(req),
126 }
127 })
128 .into_driver(self.service)
129 .protocol(self.protocol)
130 .run()
131 .await?;
132
133 let (sender, receiver) = channel();
137
138 _ = self.shutdown.send(sender);
140
141 receiver.await.map_err(|_| ())
145 }
146}
147
148#[async_trait]
149impl<S> StreamSink<Event> for TracesSink<S>
150where
151 S: Service<TraceApiRequest> + Send,
152 S::Error: Debug + Send + 'static,
153 S::Future: Send + 'static,
154 S::Response: DriverResponse,
155{
156 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
157 self.run_inner(input).await
158 }
159}