vector/sinks/sematext/metrics.rs

1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{future::BoxFuture, stream, FutureExt, SinkExt};
5use http::{StatusCode, Uri};
6use hyper::{Body, Request};
7use indoc::indoc;
8use tower::Service;
9use vector_lib::configurable::configurable_component;
10use vector_lib::sensitive_string::SensitiveString;
11use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
12
13use super::Region;
14use crate::{
15    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
16    event::{
17        metric::{Metric, MetricValue},
18        Event, KeyString,
19    },
20    http::HttpClient,
21    internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError},
22    sinks::{
23        influxdb::{encode_timestamp, encode_uri, influx_line_protocol, Field, ProtocolVersion},
24        util::{
25            buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
26            http::{HttpBatchService, HttpRetryLogic},
27            BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
28        },
29        Healthcheck, HealthcheckError, VectorSink,
30    },
31    vector_version, Result,
32};
33
34#[derive(Clone)]
35struct SematextMetricsService {
36    config: SematextMetricsConfig,
37    inner: HttpBatchService<BoxFuture<'static, Result<Request<Bytes>>>>,
38}
39
40#[derive(Clone, Copy, Debug, Default)]
41pub(crate) struct SematextMetricsDefaultBatchSettings;
42
43impl SinkBatchSettings for SematextMetricsDefaultBatchSettings {
44    const MAX_EVENTS: Option<usize> = Some(20);
45    const MAX_BYTES: Option<usize> = None;
46    const TIMEOUT_SECS: f64 = 1.0;
47}
48
49/// Configuration for the `sematext_metrics` sink.
50#[configurable_component(sink("sematext_metrics", "Publish metric events to Sematext."))]
51#[derive(Clone, Debug)]
52pub struct SematextMetricsConfig {
53    /// Sets the default namespace for any metrics sent.
54    ///
55    /// This namespace is only used if a metric has no existing namespace. When a namespace is
56    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
57    #[configurable(metadata(docs::examples = "service"))]
58    pub default_namespace: String,
59
60    #[serde(default = "super::default_region")]
61    #[configurable(derived)]
62    pub region: Region,
63
64    /// The endpoint to send data to.
65    ///
66    /// Setting this option overrides the `region` option.
67    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
68    #[configurable(metadata(docs::examples = "https://example.com"))]
69    pub endpoint: Option<String>,
70
71    /// The token that is used to write to Sematext.
72    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
73    #[configurable(metadata(docs::examples = "some-sematext-token"))]
74    pub token: SensitiveString,
75
76    #[configurable(derived)]
77    #[serde(default)]
78    pub(self) batch: BatchConfig<SematextMetricsDefaultBatchSettings>,
79
80    #[configurable(derived)]
81    #[serde(default)]
82    pub request: TowerRequestConfig,
83
84    #[configurable(derived)]
85    #[serde(
86        default,
87        deserialize_with = "crate::serde::bool_or_struct",
88        skip_serializing_if = "crate::serde::is_default"
89    )]
90    acknowledgements: AcknowledgementsConfig,
91}
92
93impl GenerateConfig for SematextMetricsConfig {
94    fn generate_config() -> toml::Value {
95        toml::from_str(indoc! {r#"
96            default_namespace = "vector"
97            token = "${SEMATEXT_TOKEN}"
98        "#})
99        .unwrap()
100    }
101}
102
103async fn healthcheck(endpoint: String, client: HttpClient) -> Result<()> {
104    let uri = format!("{endpoint}/health");
105
106    let request = Request::get(uri)
107        .body(Body::empty())
108        .map_err(|e| e.to_string())?;
109
110    let response = client.send(request).await?;
111
112    match response.status() {
113        StatusCode::OK => Ok(()),
114        StatusCode::NO_CONTENT => Ok(()),
115        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
116    }
117}
118
// Region receiver endpoints, per https://sematext.com/docs/monitoring/custom-metrics/

/// Receiver used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://spm-receiver.sematext.com";
/// Receiver used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://spm-receiver.eu.sematext.com";
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "sematext_metrics")]
125impl SinkConfig for SematextMetricsConfig {
126    async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
127        let client = HttpClient::new(None, cx.proxy())?;
128
129        let endpoint = match (&self.endpoint, &self.region) {
130            (Some(endpoint), _) => endpoint.clone(),
131            (None, Region::Us) => US_ENDPOINT.to_owned(),
132            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
133        };
134
135        let healthcheck = healthcheck(endpoint.clone(), client.clone()).boxed();
136        let sink = SematextMetricsService::new(self.clone(), write_uri(&endpoint)?, client)?;
137
138        Ok((sink, healthcheck))
139    }
140
141    fn input(&self) -> Input {
142        Input::metric()
143    }
144
145    fn acknowledgements(&self) -> &AcknowledgementsConfig {
146        &self.acknowledgements
147    }
148}
149
150fn write_uri(endpoint: &str) -> Result<Uri> {
151    encode_uri(
152        endpoint,
153        "write",
154        &[
155            ("db", Some("metrics".into())),
156            ("v", Some(format!("vector-{}", vector_version()))),
157            ("precision", Some("ns".into())),
158        ],
159    )
160}
161
162impl SematextMetricsService {
163    pub fn new(
164        config: SematextMetricsConfig,
165        endpoint: http::Uri,
166        client: HttpClient,
167    ) -> Result<VectorSink> {
168        let batch = config.batch.into_batch_settings()?;
169        let request = config.request.into_settings();
170        let http_service = HttpBatchService::new(client, create_build_request(endpoint));
171        let sematext_service = SematextMetricsService {
172            config,
173            inner: http_service,
174        };
175        let mut normalizer = MetricNormalizer::<SematextMetricNormalize>::default();
176
177        let sink = request
178            .batch_sink(
179                HttpRetryLogic::default(),
180                sematext_service,
181                MetricsBuffer::new(batch.size),
182                batch.timeout,
183            )
184            .with_flat_map(move |event: Event| {
185                stream::iter({
186                    let byte_size = event.size_of();
187                    let json_byte_size = event.estimated_json_encoded_size_of();
188                    normalizer
189                        .normalize(event.into_metric())
190                        .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size)))
191                })
192            })
193            .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error));
194
195        #[allow(deprecated)]
196        Ok(VectorSink::from_event_sink(sink))
197    }
198}
199
200impl Service<Vec<Metric>> for SematextMetricsService {
201    type Response = http::Response<bytes::Bytes>;
202    type Error = crate::Error;
203    type Future = BoxFuture<'static, std::result::Result<Self::Response, Self::Error>>;
204
205    fn poll_ready(
206        &mut self,
207        cx: &mut std::task::Context,
208    ) -> Poll<std::result::Result<(), Self::Error>> {
209        self.inner.poll_ready(cx)
210    }
211
212    fn call(&mut self, items: Vec<Metric>) -> Self::Future {
213        let input = encode_events(
214            self.config.token.inner(),
215            &self.config.default_namespace,
216            items,
217        );
218        let body = input.item;
219
220        self.inner.call(body)
221    }
222}
223
224#[derive(Default)]
225struct SematextMetricNormalize;
226
227impl MetricNormalize for SematextMetricNormalize {
228    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
229        match &metric.value() {
230            MetricValue::Gauge { .. } => state.make_absolute(metric),
231            MetricValue::Counter { .. } => state.make_incremental(metric),
232            _ => {
233                emit!(SematextMetricsInvalidMetricError { metric: &metric });
234                None
235            }
236        }
237    }
238}
239
240fn create_build_request(
241    uri: http::Uri,
242) -> impl Fn(Bytes) -> BoxFuture<'static, Result<Request<Bytes>>> + Sync + Send + 'static {
243    move |body| {
244        Box::pin(ready(
245            Request::post(uri.clone())
246                .header("Content-Type", "text/plain")
247                .body(body)
248                .map_err(Into::into),
249        ))
250    }
251}
252
253fn encode_events(
254    token: &str,
255    default_namespace: &str,
256    metrics: Vec<Metric>,
257) -> EncodedEvent<Bytes> {
258    let mut output = BytesMut::new();
259    let byte_size = metrics.size_of();
260    let json_byte_size = metrics.estimated_json_encoded_size_of();
261    for metric in metrics.into_iter() {
262        let (series, data, _metadata) = metric.into_parts();
263        let namespace = series
264            .name
265            .namespace
266            .unwrap_or_else(|| default_namespace.into());
267        let label = series.name.name;
268        let ts = encode_timestamp(data.time.timestamp);
269
270        // Authentication in Sematext is by inserting the token as a tag.
271        let mut tags = series.tags.unwrap_or_default();
272        tags.replace("token".into(), token.to_string());
273        let (metric_type, fields) = match data.value {
274            MetricValue::Counter { value } => ("counter", to_fields(label, value)),
275            MetricValue::Gauge { value } => ("gauge", to_fields(label, value)),
276            _ => unreachable!(), // handled by SematextMetricNormalize
277        };
278
279        tags.replace("metric_type".into(), metric_type.to_string());
280
281        if let Err(error) = influx_line_protocol(
282            ProtocolVersion::V1,
283            &namespace,
284            Some(tags),
285            Some(fields),
286            ts,
287            &mut output,
288        ) {
289            emit!(SematextMetricsEncodeEventError { error });
290        };
291    }
292
293    if !output.is_empty() {
294        output.truncate(output.len() - 1);
295    }
296    EncodedEvent::new(output.freeze(), byte_size, json_byte_size)
297}
298
299fn to_fields(label: String, value: f64) -> HashMap<KeyString, Field> {
300    let mut result = HashMap::new();
301    result.insert(label.into(), Field::Float(value));
302    result
303}
304
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Timelike, Utc};
    use futures::StreamExt;
    use indoc::indoc;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{metric::MetricKind, Event},
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{assert_sink_compliance, HTTP_SINK_TAGS},
            next_addr, test_generate_config,
        },
    };

    /// The fixed test instant 2020-08-18T21:00:00Z, advanced by `nanos` nanoseconds.
    fn timestamp_plus_nanos(nanos: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
            .single()
            .and_then(|t| t.with_nanosecond(nanos))
            .expect("invalid timestamp")
    }

    /// An incremental counter stamped `nanos` nanoseconds after the fixed test instant.
    fn counter(namespace: Option<&str>, name: &str, value: f64, nanos: u32) -> Metric {
        Metric::new(name, MetricKind::Incremental, MetricValue::Counter { value })
            .with_namespace(namespace)
            .with_timestamp(Some(timestamp_plus_nanos(nanos)))
    }

    #[test]
    fn generate_config() {
        test_generate_config::<SematextMetricsConfig>();
    }

    #[test]
    fn test_encode_counter_event() {
        let events = vec![counter(Some("jvm"), "pool.used", 42.0, 0)];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    #[test]
    fn test_encode_counter_event_no_namespace() {
        let events = vec![counter(None, "used", 42.0, 0)];

        assert_eq!(
            "ns,metric_type=counter,token=aaa used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    #[test]
    fn test_encode_counter_multiple_events() {
        let events = vec![
            counter(Some("jvm"), "pool.used", 42.0, 0),
            counter(Some("jvm"), "pool.committed", 18874368.0, 1),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\
             jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001",
            encode_events("aaa", "ns", events).item
        );
    }

    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (mut config, cx) = load_sink::<SematextMetricsConfig>(indoc! {r#"
                default_namespace = "ns"
                token = "atoken"
                batch.max_events = 1
            "#})
            .unwrap();

            // Point the sink at a local test server so requests can be captured.
            let addr = next_addr();
            config.endpoint = Some(format!("http://{addr}"));

            let (sink, _) = config.build(cx).await.unwrap();

            let (rx, _trigger, server) = build_test_server(addr);
            tokio::spawn(server);

            let metrics = [
                ("os", "swap.size", 324292.0),
                ("os", "network.tx", 42000.0),
                ("os", "network.rx", 54293.0),
                ("process", "count", 12.0),
                ("process", "uptime", 32423.0),
                ("process", "rss", 2342333.0),
                ("jvm", "pool.used", 18874368.0),
                ("jvm", "pool.committed", 18868584.0),
                ("jvm", "pool.max", 18874368.0),
            ];

            // Each metric is offset by its index in nanoseconds so every line is distinct.
            let events: Vec<Event> = metrics
                .iter()
                .enumerate()
                .map(|(i, &(namespace, name, value))| {
                    Event::from(
                        counter(Some(namespace), name, value, i as u32)
                            .with_tags(Some(metric_tags!("os.host" => "somehost"))),
                    )
                })
                .collect();

            sink.run_events(events).await.unwrap();

            let expected = [
                "os,metric_type=counter,os.host=somehost,token=atoken swap.size=324292 1597784400000000000",
                "os,metric_type=counter,os.host=somehost,token=atoken network.tx=42000 1597784400000000001",
                "os,metric_type=counter,os.host=somehost,token=atoken network.rx=54293 1597784400000000002",
                "process,metric_type=counter,os.host=somehost,token=atoken count=12 1597784400000000003",
                "process,metric_type=counter,os.host=somehost,token=atoken uptime=32423 1597784400000000004",
                "process,metric_type=counter,os.host=somehost,token=atoken rss=2342333 1597784400000000005",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.used=18874368 1597784400000000006",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.committed=18868584 1597784400000000007",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.max=18874368 1597784400000000008",
            ];

            let output = rx.take(metrics.len()).collect::<Vec<_>>().await;
            for (i, line) in expected.iter().enumerate() {
                assert_eq!(*line, output[i].1);
            }
        })
        .await;
    }
}