use std::{fmt::Debug, sync::Arc};
use async_trait::async_trait;
use futures_util::{
stream::{self, BoxStream},
StreamExt,
};
use tokio::sync::oneshot::{channel, Sender};
use tower::Service;
use vector_lib::stream::{BatcherSettings, DriverResponse};
use vector_lib::{config::log_schema, event::Event, partition::Partitioner, sink::StreamSink};
use vrl::event_path;
use vrl::path::PathPrefix;
use crate::{
internal_events::DatadogTracesEncodingError,
sinks::{datadog::traces::request_builder::DatadogTracesRequestBuilder, util::SinkBuilderExt},
};
use super::service::TraceApiRequest;
#[derive(Default)]
struct EventPartitioner;
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(crate) struct PartitionKey {
pub(crate) api_key: Option<Arc<str>>,
pub(crate) env: Option<String>,
pub(crate) hostname: Option<String>,
pub(crate) agent_version: Option<String>,
pub(crate) target_tps: Option<i64>,
pub(crate) error_tps: Option<i64>,
}
impl Partitioner for EventPartitioner {
type Item = Event;
type Key = PartitionKey;
fn partition(&self, item: &Self::Item) -> Self::Key {
match item {
Event::Metric(_) => {
panic!("unexpected metric");
}
Event::Log(_) => {
panic!("unexpected log");
}
Event::Trace(t) => PartitionKey {
api_key: item.metadata().datadog_api_key(),
env: t
.get(event_path!("env"))
.map(|s| s.to_string_lossy().into_owned()),
hostname: log_schema().host_key().and_then(|key| {
t.get((PathPrefix::Event, key))
.map(|s| s.to_string_lossy().into_owned())
}),
agent_version: t
.get(event_path!("agent_version"))
.map(|s| s.to_string_lossy().into_owned()),
target_tps: t
.get(event_path!("target_tps"))
.and_then(|tps| tps.as_integer().map(Into::into)),
error_tps: t
.get(event_path!("error_tps"))
.and_then(|tps| tps.as_integer().map(Into::into)),
},
}
}
}
pub struct TracesSink<S> {
service: S,
request_builder: DatadogTracesRequestBuilder,
batch_settings: BatcherSettings,
shutdown: Sender<Sender<()>>,
protocol: String,
}
impl<S> TracesSink<S>
where
S: Service<TraceApiRequest> + Send,
S::Error: Debug + Send + 'static,
S::Future: Send + 'static,
S::Response: DriverResponse,
{
pub const fn new(
service: S,
request_builder: DatadogTracesRequestBuilder,
batch_settings: BatcherSettings,
shutdown: Sender<Sender<()>>,
protocol: String,
) -> Self {
TracesSink {
service,
request_builder,
batch_settings,
shutdown,
protocol,
}
}
async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
let batch_settings = self.batch_settings;
input
.batched_partitioned(EventPartitioner, || batch_settings.as_byte_size_config())
.incremental_request_builder(self.request_builder)
.flat_map(stream::iter)
.filter_map(|request| async move {
match request {
Err(e) => {
let (error_message, error_reason, dropped_events) = e.into_parts();
emit!(DatadogTracesEncodingError {
error_message,
error_reason,
dropped_events: dropped_events as usize,
});
None
}
Ok(req) => Some(req),
}
})
.into_driver(self.service)
.protocol(self.protocol)
.run()
.await?;
let (sender, receiver) = channel();
_ = self.shutdown.send(sender);
receiver.await.map_err(|_| ())
}
}
#[async_trait]
impl<S> StreamSink<Event> for TracesSink<S>
where
S: Service<TraceApiRequest> + Send,
S::Error: Debug + Send + 'static,
S::Future: Send + 'static,
S::Response: DriverResponse,
{
async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
self.run_inner(input).await
}
}