vector/sinks/datadog/traces/sink.rs

1use std::{fmt::Debug, sync::Arc};
2
3use async_trait::async_trait;
4use futures_util::{
5    stream::{self, BoxStream},
6    StreamExt,
7};
8use tokio::sync::oneshot::{channel, Sender};
9use tower::Service;
10use vector_lib::stream::{BatcherSettings, DriverResponse};
11use vector_lib::{config::log_schema, event::Event, partition::Partitioner, sink::StreamSink};
12use vrl::event_path;
13use vrl::path::PathPrefix;
14
15use crate::{
16    internal_events::DatadogTracesEncodingError,
17    sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
18};
19
20use super::service::TraceApiRequest;
21
/// Zero-sized [`Partitioner`] that groups trace events by their [`PartitionKey`].
#[derive(Default)]
struct EventPartitioner;
24
/// Key that decides which batch a trace event is placed in.
///
/// It carries every field of the top-level protobuf payload that is associated with the API key,
/// so all events that end up in one batch agree on these values.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) struct PartitionKey {
    pub(crate) api_key: Option<Arc<str>>,
    pub(crate) env: Option<String>,
    pub(crate) hostname: Option<String>,
    pub(crate) agent_version: Option<String>,
    // The last two fields are configuration values rather than per-trace/span information; they
    // come directly from the Datadog trace-agent configuration:
    // https://github.com/DataDog/datadog-agent/blob/0f73a78/pkg/trace/config/config.go#L293-L294
    pub(crate) target_tps: Option<i64>,
    pub(crate) error_tps: Option<i64>,
}
37
38impl Partitioner for EventPartitioner {
39    type Item = Event;
40    type Key = PartitionKey;
41
42    fn partition(&self, item: &Self::Item) -> Self::Key {
43        match item {
44            Event::Metric(_) => {
45                panic!("unexpected metric");
46            }
47            Event::Log(_) => {
48                panic!("unexpected log");
49            }
50            Event::Trace(t) => PartitionKey {
51                api_key: item.metadata().datadog_api_key(),
52                env: t
53                    .get(event_path!("env"))
54                    .map(|s| s.to_string_lossy().into_owned()),
55                hostname: log_schema().host_key().and_then(|key| {
56                    t.get((PathPrefix::Event, key))
57                        .map(|s| s.to_string_lossy().into_owned())
58                }),
59                agent_version: t
60                    .get(event_path!("agent_version"))
61                    .map(|s| s.to_string_lossy().into_owned()),
62                target_tps: t
63                    .get(event_path!("target_tps"))
64                    .and_then(|tps| tps.as_integer()),
65                error_tps: t
66                    .get(event_path!("error_tps"))
67                    .and_then(|tps| tps.as_integer()),
68            },
69        }
70    }
71}
72
73pub struct TracesSink<S> {
74    service: S,
75    request_builder: DatadogTracesRequestBuilder,
76    batch_settings: BatcherSettings,
77    shutdown: Sender<Sender<()>>,
78    protocol: String,
79}
80
81impl<S> TracesSink<S>
82where
83    S: Service<TraceApiRequest> + Send,
84    S::Error: Debug + Send + 'static,
85    S::Future: Send + 'static,
86    S::Response: DriverResponse,
87{
88    pub const fn new(
89        service: S,
90        request_builder: DatadogTracesRequestBuilder,
91        batch_settings: BatcherSettings,
92        shutdown: Sender<Sender<()>>,
93        protocol: String,
94    ) -> Self {
95        TracesSink {
96            service,
97            request_builder,
98            batch_settings,
99            shutdown,
100            protocol,
101        }
102    }
103
104    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
105        let batch_settings = self.batch_settings;
106
107        input
108            .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
109            .incremental_request_builder(self.request_builder)
110            .flat_map(stream::iter)
111            .filter_map(|request| async move {
112                match request {
113                    Err(e) => {
114                        let (error_message, error_reason, dropped_events) = e.into_parts();
115                        emit!(DatadogTracesEncodingError {
116                            error_message,
117                            error_reason,
118                            dropped_events: dropped_events as usize,
119                        });
120                        None
121                    }
122                    Ok(req) => Some(req),
123                }
124            })
125            .into_driver(self.service)
126            .protocol(self.protocol)
127            .run()
128            .await?;
129
130        // Create a channel for the stats flushing thread to communicate back that it has flushed
131        // remaining stats. This is necessary so that we do not terminate the process while the
132        // stats flushing thread is trying to complete the HTTP request.
133        let (sender, receiver) = channel();
134
135        // Signal the stats thread task to flush remaining payloads and shutdown.
136        _ = self.shutdown.send(sender);
137
138        // The stats flushing thread has until the component shutdown grace period to end
139        // gracefully. Otherwise the sink + stats flushing thread will be killed and an error
140        // reported upstream.
141        receiver.await.map_err(|_| ())
142    }
143}
144
145#[async_trait]
146impl<S> StreamSink<Event> for TracesSink<S>
147where
148    S: Service<TraceApiRequest> + Send,
149    S::Error: Debug + Send + 'static,
150    S::Future: Send + 'static,
151    S::Response: DriverResponse,
152{
153    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
154        self.run_inner(input).await
155    }
156}