vector/sinks/sematext/
metrics.rs

1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{FutureExt, SinkExt, future::BoxFuture, stream};
5use http::{StatusCode, Uri};
6use hyper::{Body, Request};
7use indoc::indoc;
8use tower::Service;
9use vector_lib::{
10    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
11    sensitive_string::SensitiveString,
12};
13
14use super::Region;
15use crate::{
16    Result,
17    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
18    event::{
19        Event, KeyString,
20        metric::{Metric, MetricValue},
21    },
22    http::HttpClient,
23    internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError},
24    sinks::{
25        Healthcheck, HealthcheckError, VectorSink,
26        influxdb::{Field, ProtocolVersion, encode_timestamp, encode_uri, influx_line_protocol},
27        util::{
28            BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
29            buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
30            http::{HttpBatchService, HttpRetryLogic},
31        },
32    },
33    vector_version,
34};
35
36#[derive(Clone)]
37struct SematextMetricsService {
38    config: SematextMetricsConfig,
39    inner: HttpBatchService<BoxFuture<'static, Result<Request<Bytes>>>>,
40}
41
42#[derive(Clone, Copy, Debug, Default)]
43pub(crate) struct SematextMetricsDefaultBatchSettings;
44
45impl SinkBatchSettings for SematextMetricsDefaultBatchSettings {
46    const MAX_EVENTS: Option<usize> = Some(20);
47    const MAX_BYTES: Option<usize> = None;
48    const TIMEOUT_SECS: f64 = 1.0;
49}
50
51/// Configuration for the `sematext_metrics` sink.
52#[configurable_component(sink("sematext_metrics", "Publish metric events to Sematext."))]
53#[derive(Clone, Debug)]
54pub struct SematextMetricsConfig {
55    /// Sets the default namespace for any metrics sent.
56    ///
57    /// This namespace is only used if a metric has no existing namespace. When a namespace is
58    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
59    #[configurable(metadata(docs::examples = "service"))]
60    pub default_namespace: String,
61
62    #[serde(default = "super::default_region")]
63    #[configurable(derived)]
64    pub region: Region,
65
66    /// The endpoint to send data to.
67    ///
68    /// Setting this option overrides the `region` option.
69    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
70    #[configurable(metadata(docs::examples = "https://example.com"))]
71    pub endpoint: Option<String>,
72
73    /// The token that is used to write to Sematext.
74    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
75    #[configurable(metadata(docs::examples = "some-sematext-token"))]
76    pub token: SensitiveString,
77
78    #[configurable(derived)]
79    #[serde(default)]
80    pub(self) batch: BatchConfig<SematextMetricsDefaultBatchSettings>,
81
82    #[configurable(derived)]
83    #[serde(default)]
84    pub request: TowerRequestConfig,
85
86    #[configurable(derived)]
87    #[serde(
88        default,
89        deserialize_with = "crate::serde::bool_or_struct",
90        skip_serializing_if = "crate::serde::is_default"
91    )]
92    acknowledgements: AcknowledgementsConfig,
93}
94
95impl GenerateConfig for SematextMetricsConfig {
96    fn generate_config() -> toml::Value {
97        toml::from_str(indoc! {r#"
98            default_namespace = "vector"
99            token = "${SEMATEXT_TOKEN}"
100        "#})
101        .unwrap()
102    }
103}
104
105async fn healthcheck(endpoint: String, client: HttpClient) -> Result<()> {
106    let uri = format!("{endpoint}/health");
107
108    let request = Request::get(uri)
109        .body(Body::empty())
110        .map_err(|e| e.to_string())?;
111
112    let response = client.send(request).await?;
113
114    match response.status() {
115        StatusCode::OK => Ok(()),
116        StatusCode::NO_CONTENT => Ok(()),
117        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
118    }
119}
120
/// Sematext receiver used for `Region::Us` when no explicit `endpoint` is configured.
///
/// See <https://sematext.com/docs/monitoring/custom-metrics/>.
const US_ENDPOINT: &str = "https://spm-receiver.sematext.com";

/// Sematext receiver used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://spm-receiver.eu.sematext.com";
124
125#[async_trait::async_trait]
126#[typetag::serde(name = "sematext_metrics")]
127impl SinkConfig for SematextMetricsConfig {
128    async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
129        let client = HttpClient::new(None, cx.proxy())?;
130
131        let endpoint = match (&self.endpoint, &self.region) {
132            (Some(endpoint), _) => endpoint.clone(),
133            (None, Region::Us) => US_ENDPOINT.to_owned(),
134            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
135        };
136
137        let healthcheck = healthcheck(endpoint.clone(), client.clone()).boxed();
138        let sink = SematextMetricsService::new(self.clone(), write_uri(&endpoint)?, client)?;
139
140        Ok((sink, healthcheck))
141    }
142
143    fn input(&self) -> Input {
144        Input::metric()
145    }
146
147    fn acknowledgements(&self) -> &AcknowledgementsConfig {
148        &self.acknowledgements
149    }
150}
151
152fn write_uri(endpoint: &str) -> Result<Uri> {
153    encode_uri(
154        endpoint,
155        "write",
156        &[
157            ("db", Some("metrics".into())),
158            ("v", Some(format!("vector-{}", vector_version()))),
159            ("precision", Some("ns".into())),
160        ],
161    )
162}
163
164impl SematextMetricsService {
165    pub fn new(
166        config: SematextMetricsConfig,
167        endpoint: http::Uri,
168        client: HttpClient,
169    ) -> Result<VectorSink> {
170        let batch = config.batch.into_batch_settings()?;
171        let request = config.request.into_settings();
172        let http_service = HttpBatchService::new(client, create_build_request(endpoint));
173        let sematext_service = SematextMetricsService {
174            config,
175            inner: http_service,
176        };
177        let mut normalizer = MetricNormalizer::<SematextMetricNormalize>::default();
178
179        let sink = request
180            .batch_sink(
181                HttpRetryLogic::default(),
182                sematext_service,
183                MetricsBuffer::new(batch.size),
184                batch.timeout,
185            )
186            .with_flat_map(move |event: Event| {
187                stream::iter({
188                    let byte_size = event.size_of();
189                    let json_byte_size = event.estimated_json_encoded_size_of();
190                    normalizer
191                        .normalize(event.into_metric())
192                        .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size)))
193                })
194            })
195            .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error));
196
197        #[allow(deprecated)]
198        Ok(VectorSink::from_event_sink(sink))
199    }
200}
201
202impl Service<Vec<Metric>> for SematextMetricsService {
203    type Response = http::Response<bytes::Bytes>;
204    type Error = crate::Error;
205    type Future = BoxFuture<'static, std::result::Result<Self::Response, Self::Error>>;
206
207    fn poll_ready(
208        &mut self,
209        cx: &mut std::task::Context,
210    ) -> Poll<std::result::Result<(), Self::Error>> {
211        self.inner.poll_ready(cx)
212    }
213
214    fn call(&mut self, items: Vec<Metric>) -> Self::Future {
215        let input = encode_events(
216            self.config.token.inner(),
217            &self.config.default_namespace,
218            items,
219        );
220        let body = input.item;
221
222        self.inner.call(body)
223    }
224}
225
226#[derive(Default)]
227struct SematextMetricNormalize;
228
229impl MetricNormalize for SematextMetricNormalize {
230    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
231        match &metric.value() {
232            MetricValue::Gauge { .. } => state.make_absolute(metric),
233            MetricValue::Counter { .. } => state.make_incremental(metric),
234            _ => {
235                emit!(SematextMetricsInvalidMetricError { metric: &metric });
236                None
237            }
238        }
239    }
240}
241
242fn create_build_request(
243    uri: http::Uri,
244) -> impl Fn(Bytes) -> BoxFuture<'static, Result<Request<Bytes>>> + Sync + Send + 'static {
245    move |body| {
246        Box::pin(ready(
247            Request::post(uri.clone())
248                .header("Content-Type", "text/plain")
249                .body(body)
250                .map_err(Into::into),
251        ))
252    }
253}
254
255fn encode_events(
256    token: &str,
257    default_namespace: &str,
258    metrics: Vec<Metric>,
259) -> EncodedEvent<Bytes> {
260    let mut output = BytesMut::new();
261    let byte_size = metrics.size_of();
262    let json_byte_size = metrics.estimated_json_encoded_size_of();
263    for metric in metrics.into_iter() {
264        let (series, data, _metadata) = metric.into_parts();
265        let namespace = series
266            .name
267            .namespace
268            .unwrap_or_else(|| default_namespace.into());
269        let label = series.name.name;
270        let ts = encode_timestamp(data.time.timestamp);
271
272        // Authentication in Sematext is by inserting the token as a tag.
273        let mut tags = series.tags.unwrap_or_default();
274        tags.replace("token".into(), token.to_string());
275        let (metric_type, fields) = match data.value {
276            MetricValue::Counter { value } => ("counter", to_fields(label, value)),
277            MetricValue::Gauge { value } => ("gauge", to_fields(label, value)),
278            _ => unreachable!(), // handled by SematextMetricNormalize
279        };
280
281        tags.replace("metric_type".into(), metric_type.to_string());
282
283        if let Err(error) = influx_line_protocol(
284            ProtocolVersion::V1,
285            &namespace,
286            Some(tags),
287            Some(fields),
288            ts,
289            &mut output,
290        ) {
291            emit!(SematextMetricsEncodeEventError { error });
292        };
293    }
294
295    if !output.is_empty() {
296        output.truncate(output.len() - 1);
297    }
298    EncodedEvent::new(output.freeze(), byte_size, json_byte_size)
299}
300
301fn to_fields(label: String, value: f64) -> HashMap<KeyString, Field> {
302    let mut result = HashMap::new();
303    result.insert(label.into(), Field::Float(value));
304    result
305}
306
#[cfg(test)]
mod tests {
    use chrono::{Timelike, Utc, offset::TimeZone};
    use futures::StreamExt;
    use indoc::indoc;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{Event, metric::MetricKind},
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            next_addr, test_generate_config,
        },
    };

    /// Returns `metric` stamped at 2020-08-18T21:00:00Z plus `nanos` nanoseconds.
    fn at_nanos(metric: Metric, nanos: u32) -> Metric {
        let timestamp = Utc
            .with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
            .single()
            .and_then(|t| t.with_nanosecond(nanos))
            .expect("invalid timestamp");
        metric.with_timestamp(Some(timestamp))
    }

    /// Builds an incremental counter named `name` with the given value.
    fn counter(name: &str, value: f64) -> Metric {
        Metric::new(name, MetricKind::Incremental, MetricValue::Counter { value })
    }

    #[test]
    fn generate_config() {
        test_generate_config::<SematextMetricsConfig>();
    }

    #[test]
    fn test_encode_counter_event() {
        let events = vec![at_nanos(
            counter("pool.used", 42.0).with_namespace(Some("jvm")),
            0,
        )];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    #[test]
    fn test_encode_counter_event_no_namespace() {
        let events = vec![at_nanos(counter("used", 42.0), 0)];

        assert_eq!(
            "ns,metric_type=counter,token=aaa used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    #[test]
    fn test_encode_counter_multiple_events() {
        let events = vec![
            at_nanos(counter("pool.used", 42.0).with_namespace(Some("jvm")), 0),
            at_nanos(
                counter("pool.committed", 18874368.0).with_namespace(Some("jvm")),
                1,
            ),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\
             jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001",
            encode_events("aaa", "ns", events).item
        );
    }

    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (mut config, cx) = load_sink::<SematextMetricsConfig>(indoc! {r#"
                default_namespace = "ns"
                token = "atoken"
                batch.max_events = 1
            "#})
            .unwrap();

            // Redirect the sink to a local test server instead of Sematext.
            let addr = next_addr();
            config.endpoint = Some(format!("http://{addr}"));

            let (sink, _) = config.build(cx).await.unwrap();

            let (rx, _trigger, server) = build_test_server(addr);
            tokio::spawn(server);

            // (namespace, metric name, counter value) triples.
            let metrics = [
                ("os", "swap.size", 324292.0),
                ("os", "network.tx", 42000.0),
                ("os", "network.rx", 54293.0),
                ("process", "count", 12.0),
                ("process", "uptime", 32423.0),
                ("process", "rss", 2342333.0),
                ("jvm", "pool.used", 18874368.0),
                ("jvm", "pool.committed", 18868584.0),
                ("jvm", "pool.max", 18874368.0),
            ];

            // Each event gets a distinct nanosecond offset so its line is identifiable.
            let events: Vec<Event> = metrics
                .iter()
                .enumerate()
                .map(|(i, &(namespace, name, value))| {
                    let metric = counter(name, value)
                        .with_namespace(Some(namespace))
                        .with_tags(Some(metric_tags!("os.host" => "somehost")));
                    Event::from(at_nanos(metric, i as u32))
                })
                .collect();

            sink.run_events(events).await.unwrap();

            let output = rx.take(metrics.len()).collect::<Vec<_>>().await;

            let expected = [
                "os,metric_type=counter,os.host=somehost,token=atoken swap.size=324292 1597784400000000000",
                "os,metric_type=counter,os.host=somehost,token=atoken network.tx=42000 1597784400000000001",
                "os,metric_type=counter,os.host=somehost,token=atoken network.rx=54293 1597784400000000002",
                "process,metric_type=counter,os.host=somehost,token=atoken count=12 1597784400000000003",
                "process,metric_type=counter,os.host=somehost,token=atoken uptime=32423 1597784400000000004",
                "process,metric_type=counter,os.host=somehost,token=atoken rss=2342333 1597784400000000005",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.used=18874368 1597784400000000006",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.committed=18868584 1597784400000000007",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.max=18874368 1597784400000000008",
            ];
            for (i, line) in expected.iter().enumerate() {
                assert_eq!(*line, output[i].1);
            }
        })
        .await;
    }
}
473}