vector/sinks/datadog/traces/apm_stats/
flusher.rs1use std::{
2 io::Write,
3 sync::{Arc, Mutex},
4};
5
6use bytes::Bytes;
7use snafu::ResultExt;
8use tokio::sync::oneshot::{Receiver, Sender};
9use vector_lib::{finalization::EventFinalizers, request_metadata::RequestMetadata};
10
11use super::{
12 aggregation::Aggregator, build_request, DDTracesMetadata, DatadogTracesEndpoint,
13 DatadogTracesEndpointConfiguration, RequestBuilderError, StatsPayload,
14 BUCKET_DURATION_NANOSECONDS,
15};
16use crate::{
17 http::{BuildRequestSnafu, HttpClient},
18 internal_events::DatadogTracesAPMStatsError,
19 sinks::util::{Compression, Compressor},
20};
21
22pub async fn flush_apm_stats_thread(
34 mut tripwire: Receiver<Sender<()>>,
35 client: HttpClient,
36 compression: Compression,
37 endpoint_configuration: DatadogTracesEndpointConfiguration,
38 aggregator: Arc<Mutex<Aggregator>>,
39) {
40 let sender = ApmStatsSender {
41 client,
42 compression,
43 endpoint_configuration,
44 aggregator,
45 };
46
47 let mut interval =
49 tokio::time::interval(std::time::Duration::from_nanos(BUCKET_DURATION_NANOSECONDS));
50
51 debug!("Starting APM stats flushing thread.");
52
53 loop {
54 tokio::select! {
55
56 _ = interval.tick() => {
57 sender.flush_apm_stats(false).await;
59 },
60 signal = &mut tripwire => match signal {
61 Ok(sink_shutdown_ack_sender) => {
63
64 debug!("APM stats flushing thread received exit condition. Flushing remaining stats before exiting.");
65 sender.flush_apm_stats(true).await;
66
67 _ = sink_shutdown_ack_sender.send(());
69 break;
70 }
71 Err(_) => {
72 error!(
73 internal_log_rate_limit = true,
74 message = "Tokio Sender unexpectedly dropped."
75 );
76 break;
77 },
78 }
79 }
80 }
81}
82
83struct ApmStatsSender {
84 client: HttpClient,
85 compression: Compression,
86 endpoint_configuration: DatadogTracesEndpointConfiguration,
87 aggregator: Arc<Mutex<Aggregator>>,
88}
89
90impl ApmStatsSender {
91 async fn flush_apm_stats(&self, force: bool) {
92 if let Some((payload, api_key)) = {
94 let mut aggregator = self.aggregator.lock().unwrap();
95 let client_stats_payloads = aggregator.flush(force);
96
97 if client_stats_payloads.is_empty() {
98 None
100 } else {
101 let payload = StatsPayload {
102 agent_hostname: aggregator.get_agent_hostname(),
103 agent_env: aggregator.get_agent_env(),
104 stats: client_stats_payloads,
105 agent_version: aggregator.get_agent_version(),
106 client_computed: false,
107 };
108
109 Some((payload, aggregator.get_api_key()))
110 }
111 } {
112 if let Err(error) = self.compress_and_send(payload, api_key).await {
113 emit!(DatadogTracesAPMStatsError { error });
114 }
115 }
116 }
117
118 async fn compress_and_send(
119 &self,
120 payload: StatsPayload,
121 api_key: Arc<str>,
122 ) -> Result<(), Box<dyn std::error::Error>> {
123 let (metadata, compressed_payload) = self.build_apm_stats_request_data(api_key, payload)?;
124
125 let request_metadata = RequestMetadata::default();
126 let trace_api_request = build_request(
127 (metadata, request_metadata),
128 compressed_payload,
129 self.compression,
130 &self.endpoint_configuration,
131 );
132
133 let http_request = trace_api_request
134 .into_http_request()
135 .context(BuildRequestSnafu)?;
136
137 self.client.send(http_request).await?;
138
139 Ok(())
140 }
141
142 fn build_apm_stats_request_data(
143 &self,
144 api_key: Arc<str>,
145 payload: StatsPayload,
146 ) -> Result<(DDTracesMetadata, Bytes), RequestBuilderError> {
147 let encoded_payload =
148 rmp_serde::to_vec_named(&payload).map_err(|e| RequestBuilderError::FailedToBuild {
149 message: "Encoding failed.",
150 reason: e.to_string(),
151 dropped_events: 0,
152 })?;
153 let uncompressed_size = encoded_payload.len();
154 let metadata = DDTracesMetadata {
155 api_key,
156 endpoint: DatadogTracesEndpoint::APMStats,
157 finalizers: EventFinalizers::default(),
158 uncompressed_size,
159 content_type: "application/msgpack".to_string(),
160 };
161
162 let mut compressor = Compressor::from(self.compression);
163 match compressor.write_all(&encoded_payload) {
164 Ok(()) => {
165 let bytes = compressor.into_inner().freeze();
166
167 Ok((metadata, bytes))
168 }
169 Err(e) => Err(RequestBuilderError::FailedToBuild {
170 message: "Compression failed.",
171 reason: e.to_string(),
172 dropped_events: 0,
173 }),
174 }
175 }
176}