vector/sinks/datadog/traces/apm_stats/flusher.rs

1use std::{
2    io::Write,
3    sync::{Arc, Mutex},
4};
5
6use bytes::Bytes;
7use snafu::ResultExt;
8use tokio::sync::oneshot::{Receiver, Sender};
9use vector_lib::{finalization::EventFinalizers, request_metadata::RequestMetadata};
10
11use super::{
12    aggregation::Aggregator, build_request, DDTracesMetadata, DatadogTracesEndpoint,
13    DatadogTracesEndpointConfiguration, RequestBuilderError, StatsPayload,
14    BUCKET_DURATION_NANOSECONDS,
15};
16use crate::{
17    http::{BuildRequestSnafu, HttpClient},
18    internal_events::DatadogTracesAPMStatsError,
19    sinks::util::{Compression, Compressor},
20};
21
22/// Flushes cached APM stats buckets to Datadog on a 10 second interval.
23/// When the sink signals this thread that it is shutting down, all remaining
24/// buckets are flush before the thread exits.
25///
26/// # arguments
27///
28/// * `tripwire`                 - Receiver that the sink signals when shutting down.
29/// * `client`                   - HttpClient to use in sending the stats payloads.
30/// * `compression`              - Compression to use when creating the HTTP requests.
31/// * `endpoint_configuration`   - Endpoint configuration to use when creating the HTTP requests.
32/// * `aggregator`               - The Aggregator object containing cached stats buckets.
33pub async fn flush_apm_stats_thread(
34    mut tripwire: Receiver<Sender<()>>,
35    client: HttpClient,
36    compression: Compression,
37    endpoint_configuration: DatadogTracesEndpointConfiguration,
38    aggregator: Arc<Mutex<Aggregator>>,
39) {
40    let sender = ApmStatsSender {
41        client,
42        compression,
43        endpoint_configuration,
44        aggregator,
45    };
46
47    // flush on the same interval as the stats buckets
48    let mut interval =
49        tokio::time::interval(std::time::Duration::from_nanos(BUCKET_DURATION_NANOSECONDS));
50
51    debug!("Starting APM stats flushing thread.");
52
53    loop {
54        tokio::select! {
55
56        _ = interval.tick() => {
57            // flush the oldest bucket from the cache to Datadog
58            sender.flush_apm_stats(false).await;
59        },
60        signal = &mut tripwire =>  match signal {
61            // sink has signaled us that the process is shutting down
62            Ok(sink_shutdown_ack_sender) => {
63
64                debug!("APM stats flushing thread received exit condition. Flushing remaining stats before exiting.");
65                sender.flush_apm_stats(true).await;
66
67                // signal the sink (who tripped the tripwire), that we are done flushing
68                _ = sink_shutdown_ack_sender.send(());
69                break;
70            }
71            Err(_) => {
72                error!(
73                    internal_log_rate_limit = true,
74                    message = "Tokio Sender unexpectedly dropped."
75                );
76                break;
77            },
78        }
79        }
80    }
81}
82
/// Builds and sends aggregated APM stats payloads to Datadog over HTTP.
struct ApmStatsSender {
    /// HTTP client used to deliver the serialized stats requests.
    client: HttpClient,
    /// Compression applied to the request payload before sending.
    compression: Compression,
    /// Endpoint configuration used when building each HTTP request.
    endpoint_configuration: DatadogTracesEndpointConfiguration,
    /// Shared, mutex-guarded cache of stats buckets to flush from.
    aggregator: Arc<Mutex<Aggregator>>,
}
89
90impl ApmStatsSender {
91    async fn flush_apm_stats(&self, force: bool) {
92        // explicit scope to minimize duration that the Aggregator is locked.
93        if let Some((payload, api_key)) = {
94            let mut aggregator = self.aggregator.lock().unwrap();
95            let client_stats_payloads = aggregator.flush(force);
96
97            if client_stats_payloads.is_empty() {
98                // no sense proceeding if no payloads to flush
99                None
100            } else {
101                let payload = StatsPayload {
102                    agent_hostname: aggregator.get_agent_hostname(),
103                    agent_env: aggregator.get_agent_env(),
104                    stats: client_stats_payloads,
105                    agent_version: aggregator.get_agent_version(),
106                    client_computed: false,
107                };
108
109                Some((payload, aggregator.get_api_key()))
110            }
111        } {
112            if let Err(error) = self.compress_and_send(payload, api_key).await {
113                emit!(DatadogTracesAPMStatsError { error });
114            }
115        }
116    }
117
118    async fn compress_and_send(
119        &self,
120        payload: StatsPayload,
121        api_key: Arc<str>,
122    ) -> Result<(), Box<dyn std::error::Error>> {
123        let (metadata, compressed_payload) = self.build_apm_stats_request_data(api_key, payload)?;
124
125        let request_metadata = RequestMetadata::default();
126        let trace_api_request = build_request(
127            (metadata, request_metadata),
128            compressed_payload,
129            self.compression,
130            &self.endpoint_configuration,
131        );
132
133        let http_request = trace_api_request
134            .into_http_request()
135            .context(BuildRequestSnafu)?;
136
137        self.client.send(http_request).await?;
138
139        Ok(())
140    }
141
142    fn build_apm_stats_request_data(
143        &self,
144        api_key: Arc<str>,
145        payload: StatsPayload,
146    ) -> Result<(DDTracesMetadata, Bytes), RequestBuilderError> {
147        let encoded_payload =
148            rmp_serde::to_vec_named(&payload).map_err(|e| RequestBuilderError::FailedToBuild {
149                message: "Encoding failed.",
150                reason: e.to_string(),
151                dropped_events: 0,
152            })?;
153        let uncompressed_size = encoded_payload.len();
154        let metadata = DDTracesMetadata {
155            api_key,
156            endpoint: DatadogTracesEndpoint::APMStats,
157            finalizers: EventFinalizers::default(),
158            uncompressed_size,
159            content_type: "application/msgpack".to_string(),
160        };
161
162        let mut compressor = Compressor::from(self.compression);
163        match compressor.write_all(&encoded_payload) {
164            Ok(()) => {
165                let bytes = compressor.into_inner().freeze();
166
167                Ok((metadata, bytes))
168            }
169            Err(e) => Err(RequestBuilderError::FailedToBuild {
170                message: "Compression failed.",
171                reason: e.to_string(),
172                dropped_events: 0,
173            }),
174        }
175    }
176}