1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{FutureExt, SinkExt, future::BoxFuture, stream};
5use http::{StatusCode, Uri};
6use hyper::{Body, Request};
7use indoc::indoc;
8use tower::Service;
9use vector_lib::{
10 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
11 sensitive_string::SensitiveString,
12};
13
14use super::Region;
15use crate::{
16 Result,
17 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
18 event::{
19 Event, KeyString,
20 metric::{Metric, MetricValue},
21 },
22 http::HttpClient,
23 internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError},
24 sinks::{
25 Healthcheck, HealthcheckError, VectorSink,
26 influxdb::{Field, ProtocolVersion, encode_timestamp, encode_uri, influx_line_protocol},
27 util::{
28 BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
29 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
30 http::{HttpBatchService, HttpRetryLogic},
31 },
32 },
33 vector_version,
34};
35
/// Tower service that encodes a batch of metrics and forwards the body over HTTP.
///
/// `inner` is built in `SematextMetricsService::new` from an [`HttpBatchService`] whose
/// requests come from `create_build_request`.
#[derive(Clone)]
struct SematextMetricsService {
    /// Sink configuration; `call` reads the token and the default namespace from it.
    config: SematextMetricsConfig,
    /// Underlying HTTP batch service that sends each encoded request body.
    inner: HttpBatchService<BoxFuture<'static, Result<Request<Bytes>>>>,
}
41
/// Default batch limits for the `sematext_metrics` sink.
///
/// These are the fallbacks used by the config's `batch` field when the user does not set the
/// corresponding `batch.*` options (e.g. the smoke test overrides `batch.max_events`).
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct SematextMetricsDefaultBatchSettings;

impl SinkBatchSettings for SematextMetricsDefaultBatchSettings {
    // Default maximum number of events per batch.
    const MAX_EVENTS: Option<usize> = Some(20);
    // No default limit on the byte size of a batch.
    const MAX_BYTES: Option<usize> = None;
    // Default batch timeout in seconds (surfaces as `batch.timeout` when building the sink).
    const TIMEOUT_SECS: f64 = 1.0;
}
50
51#[configurable_component(sink("sematext_metrics", "Publish metric events to Sematext."))]
53#[derive(Clone, Debug)]
54pub struct SematextMetricsConfig {
55 #[configurable(metadata(docs::examples = "service"))]
60 pub default_namespace: String,
61
62 #[serde(default = "super::default_region")]
63 #[configurable(derived)]
64 pub region: Region,
65
66 #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
70 #[configurable(metadata(docs::examples = "https://example.com"))]
71 pub endpoint: Option<String>,
72
73 #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
75 #[configurable(metadata(docs::examples = "some-sematext-token"))]
76 pub token: SensitiveString,
77
78 #[configurable(derived)]
79 #[serde(default)]
80 pub(self) batch: BatchConfig<SematextMetricsDefaultBatchSettings>,
81
82 #[configurable(derived)]
83 #[serde(default)]
84 pub request: TowerRequestConfig,
85
86 #[configurable(derived)]
87 #[serde(
88 default,
89 deserialize_with = "crate::serde::bool_or_struct",
90 skip_serializing_if = "crate::serde::is_default"
91 )]
92 acknowledgements: AcknowledgementsConfig,
93}
94
95impl GenerateConfig for SematextMetricsConfig {
96 fn generate_config() -> toml::Value {
97 toml::from_str(indoc! {r#"
98 default_namespace = "vector"
99 token = "${SEMATEXT_TOKEN}"
100 "#})
101 .unwrap()
102 }
103}
104
105async fn healthcheck(endpoint: String, client: HttpClient) -> Result<()> {
106 let uri = format!("{endpoint}/health");
107
108 let request = Request::get(uri)
109 .body(Body::empty())
110 .map_err(|e| e.to_string())?;
111
112 let response = client.send(request).await?;
113
114 match response.status() {
115 StatusCode::OK => Ok(()),
116 StatusCode::NO_CONTENT => Ok(()),
117 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
118 }
119}
120
/// Receiver used for `region = us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://spm-receiver.sematext.com";
/// Receiver used for `region = eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://spm-receiver.eu.sematext.com";
124
125#[async_trait::async_trait]
126#[typetag::serde(name = "sematext_metrics")]
127impl SinkConfig for SematextMetricsConfig {
128 async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
129 let client = HttpClient::new(None, cx.proxy())?;
130
131 let endpoint = match (&self.endpoint, &self.region) {
132 (Some(endpoint), _) => endpoint.clone(),
133 (None, Region::Us) => US_ENDPOINT.to_owned(),
134 (None, Region::Eu) => EU_ENDPOINT.to_owned(),
135 };
136
137 let healthcheck = healthcheck(endpoint.clone(), client.clone()).boxed();
138 let sink = SematextMetricsService::new(self.clone(), write_uri(&endpoint)?, client)?;
139
140 Ok((sink, healthcheck))
141 }
142
143 fn input(&self) -> Input {
144 Input::metric()
145 }
146
147 fn acknowledgements(&self) -> &AcknowledgementsConfig {
148 &self.acknowledgements
149 }
150}
151
152fn write_uri(endpoint: &str) -> Result<Uri> {
153 encode_uri(
154 endpoint,
155 "write",
156 &[
157 ("db", Some("metrics".into())),
158 ("v", Some(format!("vector-{}", vector_version()))),
159 ("precision", Some("ns".into())),
160 ],
161 )
162}
163
impl SematextMetricsService {
    /// Assembles the batching HTTP sink for Sematext metrics.
    ///
    /// Each incoming event is converted to a metric and run through a [`MetricNormalizer`]
    /// using `SematextMetricNormalize`:
    /// - gauges are made absolute;
    /// - counters are made incremental;
    /// - other kinds yield nothing.
    ///
    /// Surviving metrics are buffered into batches and sent through this service to `endpoint`.
    ///
    /// # Errors
    ///
    /// Returns an error if the configured batch options cannot be turned into batch settings.
    pub fn new(
        config: SematextMetricsConfig,
        endpoint: http::Uri,
        client: HttpClient,
    ) -> Result<VectorSink> {
        // Resolve the user-facing batch/request options into concrete settings.
        let batch = config.batch.into_batch_settings()?;
        let request = config.request.into_settings();
        // Every encoded batch body is POSTed to `endpoint` by the request builder.
        let http_service = HttpBatchService::new(client, create_build_request(endpoint));
        let sematext_service = SematextMetricsService {
            config,
            inner: http_service,
        };
        // Moved into the flat-map closure below, so one normalizer (and its state) is reused
        // for every event that passes through this sink.
        let mut normalizer = MetricNormalizer::<SematextMetricNormalize>::default();

        let sink = request
            .batch_sink(
                HttpRetryLogic::default(),
                sematext_service,
                MetricsBuffer::new(batch.size),
                batch.timeout,
            )
            .with_flat_map(move |event: Event| {
                stream::iter({
                    // Sizes are measured on the incoming event, before normalization.
                    let byte_size = event.size_of();
                    let json_byte_size = event.estimated_json_encoded_size_of();
                    // Normalization may produce no metric (e.g. `SematextMetricNormalize`
                    // returns `None` for unsupported kinds); then nothing is batched.
                    normalizer
                        .normalize(event.into_metric())
                        .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size)))
                })
            })
            // Sink errors are logged rather than propagated.
            .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error));

        // NOTE(review): `from_event_sink` appears to be deprecated upstream; the allow silences that.
        #[allow(deprecated)]
        Ok(VectorSink::from_event_sink(sink))
    }
}
201
202impl Service<Vec<Metric>> for SematextMetricsService {
203 type Response = http::Response<bytes::Bytes>;
204 type Error = crate::Error;
205 type Future = BoxFuture<'static, std::result::Result<Self::Response, Self::Error>>;
206
207 fn poll_ready(
208 &mut self,
209 cx: &mut std::task::Context,
210 ) -> Poll<std::result::Result<(), Self::Error>> {
211 self.inner.poll_ready(cx)
212 }
213
214 fn call(&mut self, items: Vec<Metric>) -> Self::Future {
215 let input = encode_events(
216 self.config.token.inner(),
217 &self.config.default_namespace,
218 items,
219 );
220 let body = input.item;
221
222 self.inner.call(body)
223 }
224}
225
226#[derive(Default)]
227struct SematextMetricNormalize;
228
229impl MetricNormalize for SematextMetricNormalize {
230 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
231 match &metric.value() {
232 MetricValue::Gauge { .. } => state.make_absolute(metric),
233 MetricValue::Counter { .. } => state.make_incremental(metric),
234 _ => {
235 emit!(SematextMetricsInvalidMetricError { metric: &metric });
236 None
237 }
238 }
239 }
240}
241
242fn create_build_request(
243 uri: http::Uri,
244) -> impl Fn(Bytes) -> BoxFuture<'static, Result<Request<Bytes>>> + Sync + Send + 'static {
245 move |body| {
246 Box::pin(ready(
247 Request::post(uri.clone())
248 .header("Content-Type", "text/plain")
249 .body(body)
250 .map_err(Into::into),
251 ))
252 }
253}
254
255fn encode_events(
256 token: &str,
257 default_namespace: &str,
258 metrics: Vec<Metric>,
259) -> EncodedEvent<Bytes> {
260 let mut output = BytesMut::new();
261 let byte_size = metrics.size_of();
262 let json_byte_size = metrics.estimated_json_encoded_size_of();
263 for metric in metrics.into_iter() {
264 let (series, data, _metadata) = metric.into_parts();
265 let namespace = series
266 .name
267 .namespace
268 .unwrap_or_else(|| default_namespace.into());
269 let label = series.name.name;
270 let ts = encode_timestamp(data.time.timestamp);
271
272 let mut tags = series.tags.unwrap_or_default();
274 tags.replace("token".into(), token.to_string());
275 let (metric_type, fields) = match data.value {
276 MetricValue::Counter { value } => ("counter", to_fields(label, value)),
277 MetricValue::Gauge { value } => ("gauge", to_fields(label, value)),
278 _ => unreachable!(), };
280
281 tags.replace("metric_type".into(), metric_type.to_string());
282
283 if let Err(error) = influx_line_protocol(
284 ProtocolVersion::V1,
285 &namespace,
286 Some(tags),
287 Some(fields),
288 ts,
289 &mut output,
290 ) {
291 emit!(SematextMetricsEncodeEventError { error });
292 };
293 }
294
295 if !output.is_empty() {
296 output.truncate(output.len() - 1);
297 }
298 EncodedEvent::new(output.freeze(), byte_size, json_byte_size)
299}
300
301fn to_fields(label: String, value: f64) -> HashMap<KeyString, Field> {
302 let mut result = HashMap::new();
303 result.insert(label.into(), Field::Float(value));
304 result
305}
306
#[cfg(test)]
mod tests {
    use chrono::{Timelike, Utc, offset::TimeZone};
    use futures::StreamExt;
    use indoc::indoc;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{Event, metric::MetricKind},
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            next_addr, test_generate_config,
        },
    };

    // Validates the `GenerateConfig` example using the shared `test_generate_config` helper.
    #[test]
    fn generate_config() {
        test_generate_config::<SematextMetricsConfig>();
    }

    // A namespaced counter becomes a single line:
    // - the namespace is the measurement;
    // - `metric_type` and `token` are added as tags;
    // - the metric name is the field key;
    // - the timestamp is in nanoseconds.
    #[test]
    fn test_encode_counter_event() {
        let events = vec![
            Metric::new(
                "pool.used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    // A metric without a namespace is written under the `default_namespace` argument ("ns").
    #[test]
    fn test_encode_counter_event_no_namespace() {
        let events = vec![
            Metric::new(
                "used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "ns,metric_type=counter,token=aaa used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    // Several metrics in one batch are newline-separated, with no trailing newline after the
    // last line.
    #[test]
    fn test_encode_counter_multiple_events() {
        let events = vec![
            Metric::new(
                "pool.used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
            Metric::new(
                "pool.committed",
                MetricKind::Incremental,
                MetricValue::Counter { value: 18874368.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .and_then(|t| t.with_nanosecond(1))
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\
             jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001",
            encode_events("aaa", "ns", events).item
        );
    }

    // End-to-end check against a local test HTTP server.
    // - `batch.max_events = 1` makes every metric a separate request.
    // - Each metric's timestamp is offset by its index in nanoseconds, so the order can be
    //   asserted.
    // - The metric's own tags are kept alongside the added `metric_type` and `token` tags.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {
            let (mut config, cx) = load_sink::<SematextMetricsConfig>(indoc! {r#"
                default_namespace = "ns"
                token = "atoken"
                batch.max_events = 1
            "#})
            .unwrap();

            // Point the sink at the local test server instead of a regional endpoint.
            let addr = next_addr();
            let endpoint = format!("http://{addr}");
            config.endpoint = Some(endpoint.clone());

            let (sink, _) = config.build(cx).await.unwrap();

            let (rx, _trigger, server) = build_test_server(addr);
            tokio::spawn(server);

            // (namespace, metric name, counter value)
            let metrics = vec![
                ("os", "swap.size", 324292.0),
                ("os", "network.tx", 42000.0),
                ("os", "network.rx", 54293.0),
                ("process", "count", 12.0),
                ("process", "uptime", 32423.0),
                ("process", "rss", 2342333.0),
                ("jvm", "pool.used", 18874368.0),
                ("jvm", "pool.committed", 18868584.0),
                ("jvm", "pool.max", 18874368.0),
            ];

            let mut events = Vec::new();
            for (i, (namespace, metric, val)) in metrics.iter().enumerate() {
                let event = Event::from(
                    Metric::new(
                        *metric,
                        MetricKind::Incremental,
                        MetricValue::Counter { value: *val },
                    )
                    .with_namespace(Some(*namespace))
                    .with_tags(Some(metric_tags!("os.host" => "somehost")))
                    .with_timestamp(Some(
                        Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                            .single()
                            .and_then(|t| t.with_nanosecond(i as u32))
                            .expect("invalid timestamp"),
                    )),
                );
                events.push(event);
            }

            sink.run_events(events).await.unwrap();

            // NOTE(review): `.1` is presumably the received request body; confirm against
            // `build_test_server`.
            let output = rx.take(metrics.len()).collect::<Vec<_>>().await;
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken swap.size=324292 1597784400000000000", output[0].1);
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken network.tx=42000 1597784400000000001", output[1].1);
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken network.rx=54293 1597784400000000002", output[2].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken count=12 1597784400000000003", output[3].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken uptime=32423 1597784400000000004", output[4].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken rss=2342333 1597784400000000005", output[5].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.used=18874368 1597784400000000006", output[6].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.committed=18868584 1597784400000000007", output[7].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.max=18874368 1597784400000000008", output[8].1);
        })
        .await;
    }
}