vector/sinks/datadog/traces/
sink.rs1use std::{fmt::Debug, sync::Arc};
2
3use async_trait::async_trait;
4use futures_util::{
5 stream::{self, BoxStream},
6 StreamExt,
7};
8use tokio::sync::oneshot::{channel, Sender};
9use tower::Service;
10use vector_lib::stream::{BatcherSettings, DriverResponse};
11use vector_lib::{config::log_schema, event::Event, partition::Partitioner, sink::StreamSink};
12use vrl::event_path;
13use vrl::path::PathPrefix;
14
15use crate::{
16 internal_events::DatadogTracesEncodingError,
17 sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
18};
19
20use super::service::TraceApiRequest;
21
22#[derive(Default)]
23struct EventPartitioner;
24
25#[derive(Hash, Eq, PartialEq, Clone, Debug)]
27pub(crate) struct PartitionKey {
28 pub(crate) api_key: Option<Arc<str>>,
29 pub(crate) env: Option<String>,
30 pub(crate) hostname: Option<String>,
31 pub(crate) agent_version: Option<String>,
32 pub(crate) target_tps: Option<i64>,
35 pub(crate) error_tps: Option<i64>,
36}
37
38impl Partitioner for EventPartitioner {
39 type Item = Event;
40 type Key = PartitionKey;
41
42 fn partition(&self, item: &Self::Item) -> Self::Key {
43 match item {
44 Event::Metric(_) => {
45 panic!("unexpected metric");
46 }
47 Event::Log(_) => {
48 panic!("unexpected log");
49 }
50 Event::Trace(t) => PartitionKey {
51 api_key: item.metadata().datadog_api_key(),
52 env: t
53 .get(event_path!("env"))
54 .map(|s| s.to_string_lossy().into_owned()),
55 hostname: log_schema().host_key().and_then(|key| {
56 t.get((PathPrefix::Event, key))
57 .map(|s| s.to_string_lossy().into_owned())
58 }),
59 agent_version: t
60 .get(event_path!("agent_version"))
61 .map(|s| s.to_string_lossy().into_owned()),
62 target_tps: t
63 .get(event_path!("target_tps"))
64 .and_then(|tps| tps.as_integer()),
65 error_tps: t
66 .get(event_path!("error_tps"))
67 .and_then(|tps| tps.as_integer()),
68 },
69 }
70 }
71}
72
73pub struct TracesSink<S> {
74 service: S,
75 request_builder: DatadogTracesRequestBuilder,
76 batch_settings: BatcherSettings,
77 shutdown: Sender<Sender<()>>,
78 protocol: String,
79}
80
81impl<S> TracesSink<S>
82where
83 S: Service<TraceApiRequest> + Send,
84 S::Error: Debug + Send + 'static,
85 S::Future: Send + 'static,
86 S::Response: DriverResponse,
87{
88 pub const fn new(
89 service: S,
90 request_builder: DatadogTracesRequestBuilder,
91 batch_settings: BatcherSettings,
92 shutdown: Sender<Sender<()>>,
93 protocol: String,
94 ) -> Self {
95 TracesSink {
96 service,
97 request_builder,
98 batch_settings,
99 shutdown,
100 protocol,
101 }
102 }
103
104 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
105 let batch_settings = self.batch_settings;
106
107 input
108 .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
109 .incremental_request_builder(self.request_builder)
110 .flat_map(stream::iter)
111 .filter_map(|request| async move {
112 match request {
113 Err(e) => {
114 let (error_message, error_reason, dropped_events) = e.into_parts();
115 emit!(DatadogTracesEncodingError {
116 error_message,
117 error_reason,
118 dropped_events: dropped_events as usize,
119 });
120 None
121 }
122 Ok(req) => Some(req),
123 }
124 })
125 .into_driver(self.service)
126 .protocol(self.protocol)
127 .run()
128 .await?;
129
130 let (sender, receiver) = channel();
134
135 _ = self.shutdown.send(sender);
137
138 receiver.await.map_err(|_| ())
142 }
143}
144
145#[async_trait]
146impl<S> StreamSink<Event> for TracesSink<S>
147where
148 S: Service<TraceApiRequest> + Send,
149 S::Error: Debug + Send + 'static,
150 S::Future: Send + 'static,
151 S::Response: DriverResponse,
152{
153 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
154 self.run_inner(input).await
155 }
156}