1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{future::BoxFuture, stream, FutureExt, SinkExt};
5use http::{StatusCode, Uri};
6use hyper::{Body, Request};
7use indoc::indoc;
8use tower::Service;
9use vector_lib::configurable::configurable_component;
10use vector_lib::sensitive_string::SensitiveString;
11use vector_lib::{ByteSizeOf, EstimatedJsonEncodedSizeOf};
12
13use super::Region;
14use crate::{
15 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
16 event::{
17 metric::{Metric, MetricValue},
18 Event, KeyString,
19 },
20 http::HttpClient,
21 internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError},
22 sinks::{
23 influxdb::{encode_timestamp, encode_uri, influx_line_protocol, Field, ProtocolVersion},
24 util::{
25 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
26 http::{HttpBatchService, HttpRetryLogic},
27 BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
28 },
29 Healthcheck, HealthcheckError, VectorSink,
30 },
31 vector_version, Result,
32};
33
/// Tower service that encodes batches of metrics as line protocol and sends
/// them through the wrapped HTTP batch service.
#[derive(Clone)]
struct SematextMetricsService {
    /// Sink configuration; supplies the token and default namespace used when encoding.
    config: SematextMetricsConfig,
    /// HTTP service that turns an encoded payload into a POST request and sends it.
    inner: HttpBatchService<BoxFuture<'static, Result<Request<Bytes>>>>,
}
39
/// Default batching limits for the `sematext_metrics` sink.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct SematextMetricsDefaultBatchSettings;

impl SinkBatchSettings for SematextMetricsDefaultBatchSettings {
    // Default maximum number of events per batch.
    const MAX_EVENTS: Option<usize> = Some(20);
    // No default byte-size limit for a batch.
    const MAX_BYTES: Option<usize> = None;
    // Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
48
#[configurable_component(sink("sematext_metrics", "Publish metric events to Sematext."))]
/// Configuration for the `sematext_metrics` sink.
#[derive(Clone, Debug)]
pub struct SematextMetricsConfig {
    /// Namespace used for metrics that do not have a namespace of their own.
    ///
    /// The metric's namespace (or this default) is written as the line-protocol measurement.
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: String,

    // Selects the default Sematext endpoint when `endpoint` is not set.
    #[serde(default = "super::default_region")]
    #[configurable(derived)]
    pub region: Region,

    /// Sematext endpoint to send metrics to, overriding the endpoint derived from `region`.
    ///
    /// The healthcheck requests `<endpoint>/health`, and metrics are written to its `write` path.
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    pub endpoint: Option<String>,

    /// Sematext token.
    ///
    /// It is attached to every metric as the `token` tag.
    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
    #[configurable(metadata(docs::examples = "some-sematext-token"))]
    pub token: SensitiveString,

    // Batching limits; defaults come from `SematextMetricsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub(self) batch: BatchConfig<SematextMetricsDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
92
93impl GenerateConfig for SematextMetricsConfig {
94 fn generate_config() -> toml::Value {
95 toml::from_str(indoc! {r#"
96 default_namespace = "vector"
97 token = "${SEMATEXT_TOKEN}"
98 "#})
99 .unwrap()
100 }
101}
102
103async fn healthcheck(endpoint: String, client: HttpClient) -> Result<()> {
104 let uri = format!("{endpoint}/health");
105
106 let request = Request::get(uri)
107 .body(Body::empty())
108 .map_err(|e| e.to_string())?;
109
110 let response = client.send(request).await?;
111
112 match response.status() {
113 StatusCode::OK => Ok(()),
114 StatusCode::NO_CONTENT => Ok(()),
115 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
116 }
117}
118
/// Receiver endpoint used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://spm-receiver.sematext.com";
/// Receiver endpoint used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://spm-receiver.eu.sematext.com";
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "sematext_metrics")]
125impl SinkConfig for SematextMetricsConfig {
126 async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
127 let client = HttpClient::new(None, cx.proxy())?;
128
129 let endpoint = match (&self.endpoint, &self.region) {
130 (Some(endpoint), _) => endpoint.clone(),
131 (None, Region::Us) => US_ENDPOINT.to_owned(),
132 (None, Region::Eu) => EU_ENDPOINT.to_owned(),
133 };
134
135 let healthcheck = healthcheck(endpoint.clone(), client.clone()).boxed();
136 let sink = SematextMetricsService::new(self.clone(), write_uri(&endpoint)?, client)?;
137
138 Ok((sink, healthcheck))
139 }
140
141 fn input(&self) -> Input {
142 Input::metric()
143 }
144
145 fn acknowledgements(&self) -> &AcknowledgementsConfig {
146 &self.acknowledgements
147 }
148}
149
150fn write_uri(endpoint: &str) -> Result<Uri> {
151 encode_uri(
152 endpoint,
153 "write",
154 &[
155 ("db", Some("metrics".into())),
156 ("v", Some(format!("vector-{}", vector_version()))),
157 ("precision", Some("ns".into())),
158 ],
159 )
160}
161
impl SematextMetricsService {
    /// Builds the complete `sematext_metrics` sink around `client`.
    ///
    /// Incoming events are converted to metrics and normalized with
    /// `SematextMetricNormalize`. They are then buffered into batches by
    /// `MetricsBuffer`. Each batch is handed to this service, which encodes it
    /// and POSTs it to `endpoint`. Retry decisions use `HttpRetryLogic`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned while converting the configured batch settings.
    pub fn new(
        config: SematextMetricsConfig,
        endpoint: http::Uri,
        client: HttpClient,
    ) -> Result<VectorSink> {
        let batch = config.batch.into_batch_settings()?;
        let request = config.request.into_settings();
        let http_service = HttpBatchService::new(client, create_build_request(endpoint));
        let sematext_service = SematextMetricsService {
            config,
            inner: http_service,
        };
        // Moved into the flat-map closure below, so its state is kept across events.
        let mut normalizer = MetricNormalizer::<SematextMetricNormalize>::default();

        let sink = request
            .batch_sink(
                HttpRetryLogic::default(),
                sematext_service,
                MetricsBuffer::new(batch.size),
                batch.timeout,
            )
            .with_flat_map(move |event: Event| {
                stream::iter({
                    // Sizes are measured before `into_metric` consumes the event.
                    let byte_size = event.size_of();
                    let json_byte_size = event.estimated_json_encoded_size_of();
                    // `normalize` returns an `Option`, so a rejected metric yields an
                    // empty stream and nothing is forwarded for it.
                    normalizer
                        .normalize(event.into_metric())
                        .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size)))
                })
            })
            .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error));

        // `from_event_sink` is marked deprecated; this sink still uses it.
        #[allow(deprecated)]
        Ok(VectorSink::from_event_sink(sink))
    }
}
199
200impl Service<Vec<Metric>> for SematextMetricsService {
201 type Response = http::Response<bytes::Bytes>;
202 type Error = crate::Error;
203 type Future = BoxFuture<'static, std::result::Result<Self::Response, Self::Error>>;
204
205 fn poll_ready(
206 &mut self,
207 cx: &mut std::task::Context,
208 ) -> Poll<std::result::Result<(), Self::Error>> {
209 self.inner.poll_ready(cx)
210 }
211
212 fn call(&mut self, items: Vec<Metric>) -> Self::Future {
213 let input = encode_events(
214 self.config.token.inner(),
215 &self.config.default_namespace,
216 items,
217 );
218 let body = input.item;
219
220 self.inner.call(body)
221 }
222}
223
224#[derive(Default)]
225struct SematextMetricNormalize;
226
227impl MetricNormalize for SematextMetricNormalize {
228 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
229 match &metric.value() {
230 MetricValue::Gauge { .. } => state.make_absolute(metric),
231 MetricValue::Counter { .. } => state.make_incremental(metric),
232 _ => {
233 emit!(SematextMetricsInvalidMetricError { metric: &metric });
234 None
235 }
236 }
237 }
238}
239
240fn create_build_request(
241 uri: http::Uri,
242) -> impl Fn(Bytes) -> BoxFuture<'static, Result<Request<Bytes>>> + Sync + Send + 'static {
243 move |body| {
244 Box::pin(ready(
245 Request::post(uri.clone())
246 .header("Content-Type", "text/plain")
247 .body(body)
248 .map_err(Into::into),
249 ))
250 }
251}
252
253fn encode_events(
254 token: &str,
255 default_namespace: &str,
256 metrics: Vec<Metric>,
257) -> EncodedEvent<Bytes> {
258 let mut output = BytesMut::new();
259 let byte_size = metrics.size_of();
260 let json_byte_size = metrics.estimated_json_encoded_size_of();
261 for metric in metrics.into_iter() {
262 let (series, data, _metadata) = metric.into_parts();
263 let namespace = series
264 .name
265 .namespace
266 .unwrap_or_else(|| default_namespace.into());
267 let label = series.name.name;
268 let ts = encode_timestamp(data.time.timestamp);
269
270 let mut tags = series.tags.unwrap_or_default();
272 tags.replace("token".into(), token.to_string());
273 let (metric_type, fields) = match data.value {
274 MetricValue::Counter { value } => ("counter", to_fields(label, value)),
275 MetricValue::Gauge { value } => ("gauge", to_fields(label, value)),
276 _ => unreachable!(), };
278
279 tags.replace("metric_type".into(), metric_type.to_string());
280
281 if let Err(error) = influx_line_protocol(
282 ProtocolVersion::V1,
283 &namespace,
284 Some(tags),
285 Some(fields),
286 ts,
287 &mut output,
288 ) {
289 emit!(SematextMetricsEncodeEventError { error });
290 };
291 }
292
293 if !output.is_empty() {
294 output.truncate(output.len() - 1);
295 }
296 EncodedEvent::new(output.freeze(), byte_size, json_byte_size)
297}
298
299fn to_fields(label: String, value: f64) -> HashMap<KeyString, Field> {
300 let mut result = HashMap::new();
301 result.insert(label.into(), Field::Float(value));
302 result
303}
304
#[cfg(test)]
mod tests {
    use chrono::{offset::TimeZone, Timelike, Utc};
    use futures::StreamExt;
    use indoc::indoc;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{metric::MetricKind, Event},
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{assert_sink_compliance, HTTP_SINK_TAGS},
            next_addr, test_generate_config,
        },
    };

    // Checks the example config produced by `generate_config` via `test_generate_config`.
    #[test]
    fn generate_config() {
        test_generate_config::<SematextMetricsConfig>();
    }

    // A namespaced counter is encoded with its namespace ("jvm") as the measurement,
    // `metric_type` and `token` tags, its name as the field, and a nanosecond timestamp.
    #[test]
    fn test_encode_counter_event() {
        let events = vec![Metric::new(
            "pool.used",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        )
        .with_namespace(Some("jvm"))
        .with_timestamp(Some(
            Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                .single()
                .expect("invalid timestamp"),
        ))];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    // Without a namespace, the default namespace ("ns") becomes the measurement.
    #[test]
    fn test_encode_counter_event_no_namespace() {
        let events = vec![Metric::new(
            "used",
            MetricKind::Incremental,
            MetricValue::Counter { value: 42.0 },
        )
        .with_timestamp(Some(
            Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                .single()
                .expect("invalid timestamp"),
        ))];

        assert_eq!(
            "ns,metric_type=counter,token=aaa used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    // Several metrics are encoded as newline-separated lines with no trailing newline.
    #[test]
    fn test_encode_counter_multiple_events() {
        let events = vec![
            Metric::new(
                "pool.used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
            Metric::new(
                "pool.committed",
                MetricKind::Incremental,
                MetricValue::Counter { value: 18874368.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .and_then(|t| t.with_nanosecond(1))
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\
             jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001",
            encode_events("aaa", "ns", events).item
        );
    }

    // End-to-end check against a local test server. With `batch.max_events = 1`,
    // each metric is sent in its own request, and the bodies are asserted in send
    // order. Each metric's timestamp is offset by its index in nanoseconds so the
    // lines are distinguishable.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (mut config, cx) = load_sink::<SematextMetricsConfig>(indoc! {r#"
                default_namespace = "ns"
                token = "atoken"
                batch.max_events = 1
            "#})
            .unwrap();

            let addr = next_addr();
            let endpoint = format!("http://{addr}");
            // Point the sink at the local test server instead of a regional endpoint.
            config.endpoint = Some(endpoint.clone());

            let (sink, _) = config.build(cx).await.unwrap();

            let (rx, _trigger, server) = build_test_server(addr);
            tokio::spawn(server);

            // (namespace, metric name, counter value)
            let metrics = vec![
                ("os", "swap.size", 324292.0),
                ("os", "network.tx", 42000.0),
                ("os", "network.rx", 54293.0),
                ("process", "count", 12.0),
                ("process", "uptime", 32423.0),
                ("process", "rss", 2342333.0),
                ("jvm", "pool.used", 18874368.0),
                ("jvm", "pool.committed", 18868584.0),
                ("jvm", "pool.max", 18874368.0),
            ];

            let mut events = Vec::new();
            for (i, (namespace, metric, val)) in metrics.iter().enumerate() {
                let event = Event::from(
                    Metric::new(
                        *metric,
                        MetricKind::Incremental,
                        MetricValue::Counter { value: *val },
                    )
                    .with_namespace(Some(*namespace))
                    .with_tags(Some(metric_tags!("os.host" => "somehost")))
                    .with_timestamp(Some(Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0).single()
                        .and_then(|t| t.with_nanosecond(i as u32))
                        .expect("invalid timestamp"))),
                );
                events.push(event);
            }

            sink.run_events(events).await.unwrap();

            // One request body per metric, in the order the events were sent.
            let output = rx.take(metrics.len()).collect::<Vec<_>>().await;
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken swap.size=324292 1597784400000000000", output[0].1);
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken network.tx=42000 1597784400000000001", output[1].1);
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken network.rx=54293 1597784400000000002", output[2].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken count=12 1597784400000000003", output[3].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken uptime=32423 1597784400000000004", output[4].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken rss=2342333 1597784400000000005", output[5].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.used=18874368 1597784400000000006", output[6].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.committed=18868584 1597784400000000007", output[7].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.max=18874368 1597784400000000008", output[8].1);
        }).await;
    }
}