vector/sinks/datadog/traces/
sink.rs

1use std::{fmt::Debug, sync::Arc};
2
3use async_trait::async_trait;
4use futures_util::{
5    StreamExt,
6    stream::{self, BoxStream},
7};
8use tokio::sync::oneshot::{Sender, channel};
9use tower::Service;
10use vector_lib::{
11    config::log_schema,
12    event::Event,
13    partition::Partitioner,
14    sink::StreamSink,
15    stream::{BatcherSettings, DriverResponse},
16};
17use vrl::{event_path, path::PathPrefix};
18
19use super::service::TraceApiRequest;
20use crate::{
21    internal_events::DatadogTracesEncodingError,
22    sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
23};
24
/// Stateless partitioner that assigns each incoming trace event to a
/// [`PartitionKey`], so that events sharing a key are batched together.
#[derive(Default)]
struct EventPartitioner;
27
/// Key under which trace events are grouped into batches.
///
/// It is made of every field of the top-level protobuf construct that is
/// associated with the API key.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(crate) struct PartitionKey {
    /// Datadog API key attached to the event metadata, if any.
    pub(crate) api_key: Option<Arc<str>>,
    /// Value of the `env` field of the trace, if present.
    pub(crate) env: Option<String>,
    /// Value of the host field of the trace, if present.
    pub(crate) hostname: Option<String>,
    /// Value of the `agent_version` field of the trace, if present.
    pub(crate) agent_version: Option<String>,
    // The two fields below are configuration values rather than per-trace/span
    // information; they come directly from the Datadog trace-agent config:
    // https://github.com/DataDog/datadog-agent/blob/0f73a78/pkg/trace/config/config.go#L293-L294
    /// Value of the `target_tps` field of the trace, when it is an integer.
    pub(crate) target_tps: Option<i64>,
    /// Value of the `error_tps` field of the trace, when it is an integer.
    pub(crate) error_tps: Option<i64>,
}
40
41impl Partitioner for EventPartitioner {
42    type Item = Event;
43    type Key = PartitionKey;
44
45    fn partition(&self, item: &Self::Item) -> Self::Key {
46        match item {
47            Event::Metric(_) => {
48                panic!("unexpected metric");
49            }
50            Event::Log(_) => {
51                panic!("unexpected log");
52            }
53            Event::Trace(t) => PartitionKey {
54                api_key: item.metadata().datadog_api_key(),
55                env: t
56                    .get(event_path!("env"))
57                    .map(|s| s.to_string_lossy().into_owned()),
58                hostname: log_schema().host_key().and_then(|key| {
59                    t.get((PathPrefix::Event, key))
60                        .map(|s| s.to_string_lossy().into_owned())
61                }),
62                agent_version: t
63                    .get(event_path!("agent_version"))
64                    .map(|s| s.to_string_lossy().into_owned()),
65                target_tps: t
66                    .get(event_path!("target_tps"))
67                    .and_then(|tps| tps.as_integer()),
68                error_tps: t
69                    .get(event_path!("error_tps"))
70                    .and_then(|tps| tps.as_integer()),
71            },
72        }
73    }
74}
75
76pub struct TracesSink<S> {
77    service: S,
78    request_builder: DatadogTracesRequestBuilder,
79    batch_settings: BatcherSettings,
80    shutdown: Sender<Sender<()>>,
81    protocol: String,
82}
83
84impl<S> TracesSink<S>
85where
86    S: Service<TraceApiRequest> + Send,
87    S::Error: Debug + Send + 'static,
88    S::Future: Send + 'static,
89    S::Response: DriverResponse,
90{
91    pub const fn new(
92        service: S,
93        request_builder: DatadogTracesRequestBuilder,
94        batch_settings: BatcherSettings,
95        shutdown: Sender<Sender<()>>,
96        protocol: String,
97    ) -> Self {
98        TracesSink {
99            service,
100            request_builder,
101            batch_settings,
102            shutdown,
103            protocol,
104        }
105    }
106
107    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
108        let batch_settings = self.batch_settings;
109
110        input
111            .batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
112            .incremental_request_builder(self.request_builder)
113            .flat_map(stream::iter)
114            .filter_map(|request| async move {
115                match request {
116                    Err(e) => {
117                        let (error_message, error_reason, dropped_events) = e.into_parts();
118                        emit!(DatadogTracesEncodingError {
119                            error_message,
120                            error_reason,
121                            dropped_events: dropped_events as usize,
122                        });
123                        None
124                    }
125                    Ok(req) => Some(req),
126                }
127            })
128            .into_driver(self.service)
129            .protocol(self.protocol)
130            .run()
131            .await?;
132
133        // Create a channel for the stats flushing thread to communicate back that it has flushed
134        // remaining stats. This is necessary so that we do not terminate the process while the
135        // stats flushing thread is trying to complete the HTTP request.
136        let (sender, receiver) = channel();
137
138        // Signal the stats thread task to flush remaining payloads and shutdown.
139        _ = self.shutdown.send(sender);
140
141        // The stats flushing thread has until the component shutdown grace period to end
142        // gracefully. Otherwise the sink + stats flushing thread will be killed and an error
143        // reported upstream.
144        receiver.await.map_err(|_| ())
145    }
146}
147
148#[async_trait]
149impl<S> StreamSink<Event> for TracesSink<S>
150where
151    S: Service<TraceApiRequest> + Send,
152    S::Error: Debug + Send + 'static,
153    S::Future: Send + 'static,
154    S::Response: DriverResponse,
155{
156    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
157        self.run_inner(input).await
158    }
159}