vector/sinks/sematext/
metrics.rs

1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{FutureExt, SinkExt, future::BoxFuture, stream};
5use http::{StatusCode, Uri};
6use hyper::{Body, Request};
7use indoc::indoc;
8use tower::Service;
9use vector_lib::{
10    ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
11    sensitive_string::SensitiveString,
12};
13
14use super::Region;
15use crate::{
16    Result,
17    config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
18    event::{
19        Event, KeyString,
20        metric::{Metric, MetricValue},
21    },
22    http::HttpClient,
23    internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError},
24    sinks::{
25        Healthcheck, HealthcheckError, VectorSink,
26        influxdb::{Field, ProtocolVersion, encode_timestamp, encode_uri, influx_line_protocol},
27        util::{
28            BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
29            buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
30            http::{HttpBatchService, HttpRetryLogic},
31        },
32    },
33    vector_version,
34};
35
36#[derive(Clone)]
37struct SematextMetricsService {
38    config: SematextMetricsConfig,
39    inner: HttpBatchService<BoxFuture<'static, Result<Request<Bytes>>>>,
40}
41
42#[derive(Clone, Copy, Debug, Default)]
43pub(crate) struct SematextMetricsDefaultBatchSettings;
44
45impl SinkBatchSettings for SematextMetricsDefaultBatchSettings {
46    const MAX_EVENTS: Option<usize> = Some(20);
47    const MAX_BYTES: Option<usize> = None;
48    const TIMEOUT_SECS: f64 = 1.0;
49}
50
51/// Configuration for the `sematext_metrics` sink.
52#[configurable_component(sink("sematext_metrics", "Publish metric events to Sematext."))]
53#[derive(Clone, Debug)]
54pub struct SematextMetricsConfig {
55    /// Sets the default namespace for any metrics sent.
56    ///
57    /// This namespace is only used if a metric has no existing namespace. When a namespace is
58    /// present, it is used as a prefix to the metric name, and separated with a period (`.`).
59    #[configurable(metadata(docs::examples = "service"))]
60    pub default_namespace: String,
61
62    #[serde(default = "super::default_region")]
63    #[configurable(derived)]
64    pub region: Region,
65
66    /// The endpoint to send data to.
67    ///
68    /// Setting this option overrides the `region` option.
69    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
70    #[configurable(metadata(docs::examples = "https://example.com"))]
71    pub endpoint: Option<String>,
72
73    /// The token that is used to write to Sematext.
74    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
75    #[configurable(metadata(docs::examples = "some-sematext-token"))]
76    pub token: SensitiveString,
77
78    #[configurable(derived)]
79    #[serde(default)]
80    pub(self) batch: BatchConfig<SematextMetricsDefaultBatchSettings>,
81
82    #[configurable(derived)]
83    #[serde(default)]
84    pub request: TowerRequestConfig,
85
86    #[configurable(derived)]
87    #[serde(
88        default,
89        deserialize_with = "crate::serde::bool_or_struct",
90        skip_serializing_if = "crate::serde::is_default"
91    )]
92    acknowledgements: AcknowledgementsConfig,
93}
94
95impl GenerateConfig for SematextMetricsConfig {
96    fn generate_config() -> toml::Value {
97        toml::from_str(indoc! {r#"
98            default_namespace = "vector"
99            token = "${SEMATEXT_TOKEN}"
100        "#})
101        .unwrap()
102    }
103}
104
105async fn healthcheck(endpoint: String, client: HttpClient) -> Result<()> {
106    let uri = format!("{endpoint}/health");
107
108    let request = Request::get(uri)
109        .body(Body::empty())
110        .map_err(|e| e.to_string())?;
111
112    let response = client.send(request).await?;
113
114    match response.status() {
115        StatusCode::OK => Ok(()),
116        StatusCode::NO_CONTENT => Ok(()),
117        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
118    }
119}
120
// Receiver endpoints per region.
// https://sematext.com/docs/monitoring/custom-metrics/
/// Receiver used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://spm-receiver.sematext.com";
/// Receiver used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://spm-receiver.eu.sematext.com";
124
125#[async_trait::async_trait]
126#[typetag::serde(name = "sematext_metrics")]
127impl SinkConfig for SematextMetricsConfig {
128    async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
129        let client = HttpClient::new(None, cx.proxy())?;
130
131        let endpoint = match (&self.endpoint, &self.region) {
132            (Some(endpoint), _) => endpoint.clone(),
133            (None, Region::Us) => US_ENDPOINT.to_owned(),
134            (None, Region::Eu) => EU_ENDPOINT.to_owned(),
135        };
136
137        let healthcheck = healthcheck(endpoint.clone(), client.clone()).boxed();
138        let sink = SematextMetricsService::new(self.clone(), write_uri(&endpoint)?, client)?;
139
140        Ok((sink, healthcheck))
141    }
142
143    fn input(&self) -> Input {
144        Input::metric()
145    }
146
147    fn acknowledgements(&self) -> &AcknowledgementsConfig {
148        &self.acknowledgements
149    }
150}
151
152fn write_uri(endpoint: &str) -> Result<Uri> {
153    encode_uri(
154        endpoint,
155        "write",
156        &[
157            ("db", Some("metrics".into())),
158            ("v", Some(format!("vector-{}", vector_version()))),
159            ("precision", Some("ns".into())),
160        ],
161    )
162}
163
164impl SematextMetricsService {
165    pub fn new(
166        config: SematextMetricsConfig,
167        endpoint: http::Uri,
168        client: HttpClient,
169    ) -> Result<VectorSink> {
170        let batch = config.batch.into_batch_settings()?;
171        let request = config.request.into_settings();
172        let http_service = HttpBatchService::new(client, create_build_request(endpoint));
173        let sematext_service = SematextMetricsService {
174            config,
175            inner: http_service,
176        };
177        let mut normalizer = MetricNormalizer::<SematextMetricNormalize>::default();
178
179        let sink = request
180            .batch_sink(
181                HttpRetryLogic::default(),
182                sematext_service,
183                MetricsBuffer::new(batch.size),
184                batch.timeout,
185            )
186            .with_flat_map(move |event: Event| {
187                stream::iter({
188                    let byte_size = event.size_of();
189                    let json_byte_size = event.estimated_json_encoded_size_of();
190                    normalizer
191                        .normalize(event.into_metric())
192                        .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size)))
193                })
194            })
195            .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error, internal_log_rate_limit = false));
196
197        #[allow(deprecated)]
198        Ok(VectorSink::from_event_sink(sink))
199    }
200}
201
202impl Service<Vec<Metric>> for SematextMetricsService {
203    type Response = http::Response<bytes::Bytes>;
204    type Error = crate::Error;
205    type Future = BoxFuture<'static, std::result::Result<Self::Response, Self::Error>>;
206
207    fn poll_ready(
208        &mut self,
209        cx: &mut std::task::Context,
210    ) -> Poll<std::result::Result<(), Self::Error>> {
211        self.inner.poll_ready(cx)
212    }
213
214    fn call(&mut self, items: Vec<Metric>) -> Self::Future {
215        let input = encode_events(
216            self.config.token.inner(),
217            &self.config.default_namespace,
218            items,
219        );
220        let body = input.item;
221
222        self.inner.call(body)
223    }
224}
225
226#[derive(Default)]
227struct SematextMetricNormalize;
228
229impl MetricNormalize for SematextMetricNormalize {
230    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
231        match &metric.value() {
232            MetricValue::Gauge { .. } => state.make_absolute(metric),
233            MetricValue::Counter { .. } => state.make_incremental(metric),
234            _ => {
235                emit!(SematextMetricsInvalidMetricError { metric: &metric });
236                None
237            }
238        }
239    }
240}
241
242fn create_build_request(
243    uri: http::Uri,
244) -> impl Fn(Bytes) -> BoxFuture<'static, Result<Request<Bytes>>> + Sync + Send + 'static {
245    move |body| {
246        Box::pin(ready(
247            Request::post(uri.clone())
248                .header("Content-Type", "text/plain")
249                .body(body)
250                .map_err(Into::into),
251        ))
252    }
253}
254
255fn encode_events(
256    token: &str,
257    default_namespace: &str,
258    metrics: Vec<Metric>,
259) -> EncodedEvent<Bytes> {
260    let mut output = BytesMut::new();
261    let byte_size = metrics.size_of();
262    let json_byte_size = metrics.estimated_json_encoded_size_of();
263    for metric in metrics.into_iter() {
264        let (series, data, _metadata) = metric.into_parts();
265        let namespace = series
266            .name
267            .namespace
268            .unwrap_or_else(|| default_namespace.into());
269        let label = series.name.name;
270        let ts = encode_timestamp(data.time.timestamp);
271
272        // Authentication in Sematext is by inserting the token as a tag.
273        let mut tags = series.tags.unwrap_or_default();
274        tags.replace("token".into(), token.to_string());
275        let (metric_type, fields) = match data.value {
276            MetricValue::Counter { value } => ("counter", to_fields(label, value)),
277            MetricValue::Gauge { value } => ("gauge", to_fields(label, value)),
278            _ => unreachable!(), // handled by SematextMetricNormalize
279        };
280
281        tags.replace("metric_type".into(), metric_type.to_string());
282
283        if let Err(error) = influx_line_protocol(
284            ProtocolVersion::V1,
285            &namespace,
286            Some(tags),
287            Some(fields),
288            ts,
289            &mut output,
290        ) {
291            emit!(SematextMetricsEncodeEventError { error });
292        };
293    }
294
295    if !output.is_empty() {
296        output.truncate(output.len() - 1);
297    }
298    EncodedEvent::new(output.freeze(), byte_size, json_byte_size)
299}
300
301fn to_fields(label: String, value: f64) -> HashMap<KeyString, Field> {
302    let mut result = HashMap::new();
303    result.insert(label.into(), Field::Float(value));
304    result
305}
306
#[cfg(test)]
mod tests {
    use chrono::{Timelike, Utc, offset::TimeZone};
    use futures::StreamExt;
    use indoc::indoc;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{Event, metric::MetricKind},
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            addr::next_addr,
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            test_generate_config,
        },
    };

    /// 2020-08-18T21:00:00Z plus `nanos` nanoseconds: the fixed instant shared by all tests.
    fn timestamp(nanos: u32) -> chrono::DateTime<Utc> {
        Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
            .single()
            .and_then(|t| t.with_nanosecond(nanos))
            .expect("invalid timestamp")
    }

    /// Incremental counter stamped at the fixed test instant plus `nanos` nanoseconds.
    fn counter(namespace: Option<&str>, name: &str, value: f64, nanos: u32) -> Metric {
        Metric::new(
            name,
            MetricKind::Incremental,
            MetricValue::Counter { value },
        )
        .with_namespace(namespace)
        .with_timestamp(Some(timestamp(nanos)))
    }

    #[test]
    fn generate_config() {
        test_generate_config::<SematextMetricsConfig>();
    }

    #[test]
    fn test_encode_counter_event() {
        let events = vec![counter(Some("jvm"), "pool.used", 42.0, 0)];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    #[test]
    fn test_encode_counter_event_no_namespace() {
        // Without a namespace on the metric, the default namespace ("ns") is used.
        let events = vec![counter(None, "used", 42.0, 0)];

        assert_eq!(
            "ns,metric_type=counter,token=aaa used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    #[test]
    fn test_encode_counter_multiple_events() {
        let events = vec![
            counter(Some("jvm"), "pool.used", 42.0, 0),
            counter(Some("jvm"), "pool.committed", 18874368.0, 1),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\
             jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001",
            encode_events("aaa", "ns", events).item
        );
    }

    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (mut config, cx) = load_sink::<SematextMetricsConfig>(indoc! {r#"
                default_namespace = "ns"
                token = "atoken"
                batch.max_events = 1
            "#})
            .unwrap();

            // Swap out the endpoint so we can force send it
            // to our local server
            let (_guard, addr) = next_addr();
            config.endpoint = Some(format!("http://{addr}"));

            let (sink, _) = config.build(cx).await.unwrap();

            let (rx, _trigger, server) = build_test_server(addr);
            tokio::spawn(server);

            // Make our test metrics.
            let metrics = [
                ("os", "swap.size", 324292.0),
                ("os", "network.tx", 42000.0),
                ("os", "network.rx", 54293.0),
                ("process", "count", 12.0),
                ("process", "uptime", 32423.0),
                ("process", "rss", 2342333.0),
                ("jvm", "pool.used", 18874368.0),
                ("jvm", "pool.committed", 18868584.0),
                ("jvm", "pool.max", 18874368.0),
            ];

            // Each metric gets a distinct timestamp: the base instant plus its index in ns.
            let events: Vec<Event> = metrics
                .iter()
                .enumerate()
                .map(|(i, &(namespace, name, value))| {
                    Event::from(
                        counter(Some(namespace), name, value, i as u32)
                            .with_tags(Some(metric_tags!("os.host" => "somehost"))),
                    )
                })
                .collect();

            sink.run_events(events).await.unwrap();

            let expected = [
                "os,metric_type=counter,os.host=somehost,token=atoken swap.size=324292 1597784400000000000",
                "os,metric_type=counter,os.host=somehost,token=atoken network.tx=42000 1597784400000000001",
                "os,metric_type=counter,os.host=somehost,token=atoken network.rx=54293 1597784400000000002",
                "process,metric_type=counter,os.host=somehost,token=atoken count=12 1597784400000000003",
                "process,metric_type=counter,os.host=somehost,token=atoken uptime=32423 1597784400000000004",
                "process,metric_type=counter,os.host=somehost,token=atoken rss=2342333 1597784400000000005",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.used=18874368 1597784400000000006",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.committed=18868584 1597784400000000007",
                "jvm,metric_type=counter,os.host=somehost,token=atoken pool.max=18874368 1597784400000000008",
            ];

            let output = rx.take(metrics.len()).collect::<Vec<_>>().await;
            // With `batch.max_events = 1`, every metric must arrive as its own request.
            assert_eq!(expected.len(), output.len());
            for (want, received) in expected.iter().zip(&output) {
                assert_eq!(*want, received.1);
            }
        })
        .await;
    }
}