1use std::{collections::HashMap, future::ready, task::Poll};
2
3use bytes::{Bytes, BytesMut};
4use futures::{FutureExt, SinkExt, future::BoxFuture, stream};
5use http::{StatusCode, Uri};
6use hyper::{Body, Request};
7use indoc::indoc;
8use tower::Service;
9use vector_lib::{
10 ByteSizeOf, EstimatedJsonEncodedSizeOf, configurable::configurable_component,
11 sensitive_string::SensitiveString,
12};
13
14use super::Region;
15use crate::{
16 Result,
17 config::{AcknowledgementsConfig, GenerateConfig, Input, SinkConfig, SinkContext},
18 event::{
19 Event, KeyString,
20 metric::{Metric, MetricValue},
21 },
22 http::HttpClient,
23 internal_events::{SematextMetricsEncodeEventError, SematextMetricsInvalidMetricError},
24 sinks::{
25 Healthcheck, HealthcheckError, VectorSink,
26 influxdb::{Field, ProtocolVersion, encode_timestamp, encode_uri, influx_line_protocol},
27 util::{
28 BatchConfig, EncodedEvent, SinkBatchSettings, TowerRequestConfig,
29 buffer::metrics::{MetricNormalize, MetricNormalizer, MetricSet, MetricsBuffer},
30 http::{HttpBatchService, HttpRetryLogic},
31 },
32 },
33 vector_version,
34};
35
/// Tower service that encodes batches of metrics and forwards the encoded body to an inner
/// HTTP batch service.
#[derive(Clone)]
struct SematextMetricsService {
    /// Sink configuration; its token and default namespace are read when encoding a batch.
    config: SematextMetricsConfig,
    /// Inner service that receives the encoded request body.
    inner: HttpBatchService<BoxFuture<'static, Result<Request<Bytes>>>>,
}
41
/// Marker type that carries the default batch settings for the Sematext metrics sink.
#[derive(Clone, Copy, Debug, Default)]
pub(crate) struct SematextMetricsDefaultBatchSettings;
44
impl SinkBatchSettings for SematextMetricsDefaultBatchSettings {
    // Default limit of 20 events per batch.
    const MAX_EVENTS: Option<usize> = Some(20);
    // No default limit on the batch size in bytes.
    const MAX_BYTES: Option<usize> = None;
    // Default batch timeout, in seconds.
    const TIMEOUT_SECS: f64 = 1.0;
}
50
/// Configuration for the `sematext_metrics` sink.
#[configurable_component(sink("sematext_metrics", "Publish metric events to Sematext."))]
#[derive(Clone, Debug)]
pub struct SematextMetricsConfig {
    /// Sets the default namespace for any metrics sent.
    ///
    /// This namespace is only used if a metric has no existing namespace.
    #[configurable(metadata(docs::examples = "service"))]
    pub default_namespace: String,

    // Sematext region; it selects the built-in receiver endpoint when `endpoint` is not set.
    #[serde(default = "super::default_region")]
    #[configurable(derived)]
    pub region: Region,

    /// The endpoint to send data to.
    ///
    /// Setting this option overrides the `region` option.
    #[configurable(metadata(docs::examples = "http://127.0.0.1"))]
    #[configurable(metadata(docs::examples = "https://example.com"))]
    pub endpoint: Option<String>,

    /// The token that is used to write to Sematext.
    ///
    /// It is attached to every encoded metric as the `token` tag.
    #[configurable(metadata(docs::examples = "${SEMATEXT_TOKEN}"))]
    #[configurable(metadata(docs::examples = "some-sematext-token"))]
    pub token: SensitiveString,

    // Batching settings; defaults come from `SematextMetricsDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    pub(self) batch: BatchConfig<SematextMetricsDefaultBatchSettings>,

    // Request settings used to build the batching sink around the HTTP service.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    // Acknowledgement settings; presumably accepts either a boolean or a full struct, as the
    // `bool_or_struct` deserializer name suggests — confirm against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
94
95impl GenerateConfig for SematextMetricsConfig {
96 fn generate_config() -> toml::Value {
97 toml::from_str(indoc! {r#"
98 default_namespace = "vector"
99 token = "${SEMATEXT_TOKEN}"
100 "#})
101 .unwrap()
102 }
103}
104
105async fn healthcheck(endpoint: String, client: HttpClient) -> Result<()> {
106 let uri = format!("{endpoint}/health");
107
108 let request = Request::get(uri)
109 .body(Body::empty())
110 .map_err(|e| e.to_string())?;
111
112 let response = client.send(request).await?;
113
114 match response.status() {
115 StatusCode::OK => Ok(()),
116 StatusCode::NO_CONTENT => Ok(()),
117 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
118 }
119}
120
// Receiver endpoint used for `Region::Us` when no explicit `endpoint` is configured.
const US_ENDPOINT: &str = "https://spm-receiver.sematext.com";
// Receiver endpoint used for `Region::Eu` when no explicit `endpoint` is configured.
const EU_ENDPOINT: &str = "https://spm-receiver.eu.sematext.com";
124
125#[async_trait::async_trait]
126#[typetag::serde(name = "sematext_metrics")]
127impl SinkConfig for SematextMetricsConfig {
128 async fn build(&self, cx: SinkContext) -> Result<(VectorSink, Healthcheck)> {
129 let client = HttpClient::new(None, cx.proxy())?;
130
131 let endpoint = match (&self.endpoint, &self.region) {
132 (Some(endpoint), _) => endpoint.clone(),
133 (None, Region::Us) => US_ENDPOINT.to_owned(),
134 (None, Region::Eu) => EU_ENDPOINT.to_owned(),
135 };
136
137 let healthcheck = healthcheck(endpoint.clone(), client.clone()).boxed();
138 let sink = SematextMetricsService::new(self.clone(), write_uri(&endpoint)?, client)?;
139
140 Ok((sink, healthcheck))
141 }
142
143 fn input(&self) -> Input {
144 Input::metric()
145 }
146
147 fn acknowledgements(&self) -> &AcknowledgementsConfig {
148 &self.acknowledgements
149 }
150}
151
152fn write_uri(endpoint: &str) -> Result<Uri> {
153 encode_uri(
154 endpoint,
155 "write",
156 &[
157 ("db", Some("metrics".into())),
158 ("v", Some(format!("vector-{}", vector_version()))),
159 ("precision", Some("ns".into())),
160 ],
161 )
162}
163
164impl SematextMetricsService {
165 pub fn new(
166 config: SematextMetricsConfig,
167 endpoint: http::Uri,
168 client: HttpClient,
169 ) -> Result<VectorSink> {
170 let batch = config.batch.into_batch_settings()?;
171 let request = config.request.into_settings();
172 let http_service = HttpBatchService::new(client, create_build_request(endpoint));
173 let sematext_service = SematextMetricsService {
174 config,
175 inner: http_service,
176 };
177 let mut normalizer = MetricNormalizer::<SematextMetricNormalize>::default();
178
179 let sink = request
180 .batch_sink(
181 HttpRetryLogic::default(),
182 sematext_service,
183 MetricsBuffer::new(batch.size),
184 batch.timeout,
185 )
186 .with_flat_map(move |event: Event| {
187 stream::iter({
188 let byte_size = event.size_of();
189 let json_byte_size = event.estimated_json_encoded_size_of();
190 normalizer
191 .normalize(event.into_metric())
192 .map(|item| Ok(EncodedEvent::new(item, byte_size, json_byte_size)))
193 })
194 })
195 .sink_map_err(|error| error!(message = "Fatal sematext metrics sink error.", %error, internal_log_rate_limit = false));
196
197 #[allow(deprecated)]
198 Ok(VectorSink::from_event_sink(sink))
199 }
200}
201
202impl Service<Vec<Metric>> for SematextMetricsService {
203 type Response = http::Response<bytes::Bytes>;
204 type Error = crate::Error;
205 type Future = BoxFuture<'static, std::result::Result<Self::Response, Self::Error>>;
206
207 fn poll_ready(
208 &mut self,
209 cx: &mut std::task::Context,
210 ) -> Poll<std::result::Result<(), Self::Error>> {
211 self.inner.poll_ready(cx)
212 }
213
214 fn call(&mut self, items: Vec<Metric>) -> Self::Future {
215 let input = encode_events(
216 self.config.token.inner(),
217 &self.config.default_namespace,
218 items,
219 );
220 let body = input.item;
221
222 self.inner.call(body)
223 }
224}
225
/// Normalization policy for this sink: gauges are made absolute, counters are made
/// incremental, and every other metric value is rejected.
#[derive(Default)]
struct SematextMetricNormalize;
228
229impl MetricNormalize for SematextMetricNormalize {
230 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
231 match &metric.value() {
232 MetricValue::Gauge { .. } => state.make_absolute(metric),
233 MetricValue::Counter { .. } => state.make_incremental(metric),
234 _ => {
235 emit!(SematextMetricsInvalidMetricError { metric: &metric });
236 None
237 }
238 }
239 }
240}
241
242fn create_build_request(
243 uri: http::Uri,
244) -> impl Fn(Bytes) -> BoxFuture<'static, Result<Request<Bytes>>> + Sync + Send + 'static {
245 move |body| {
246 Box::pin(ready(
247 Request::post(uri.clone())
248 .header("Content-Type", "text/plain")
249 .body(body)
250 .map_err(Into::into),
251 ))
252 }
253}
254
255fn encode_events(
256 token: &str,
257 default_namespace: &str,
258 metrics: Vec<Metric>,
259) -> EncodedEvent<Bytes> {
260 let mut output = BytesMut::new();
261 let byte_size = metrics.size_of();
262 let json_byte_size = metrics.estimated_json_encoded_size_of();
263 for metric in metrics.into_iter() {
264 let (series, data, _metadata) = metric.into_parts();
265 let namespace = series
266 .name
267 .namespace
268 .unwrap_or_else(|| default_namespace.into());
269 let label = series.name.name;
270 let ts = encode_timestamp(data.time.timestamp);
271
272 let mut tags = series.tags.unwrap_or_default();
274 tags.replace("token".into(), token.to_string());
275 let (metric_type, fields) = match data.value {
276 MetricValue::Counter { value } => ("counter", to_fields(label, value)),
277 MetricValue::Gauge { value } => ("gauge", to_fields(label, value)),
278 _ => unreachable!(), };
280
281 tags.replace("metric_type".into(), metric_type.to_string());
282
283 if let Err(error) = influx_line_protocol(
284 ProtocolVersion::V1,
285 &namespace,
286 Some(tags),
287 Some(fields),
288 ts,
289 &mut output,
290 ) {
291 emit!(SematextMetricsEncodeEventError { error });
292 };
293 }
294
295 if !output.is_empty() {
296 output.truncate(output.len() - 1);
297 }
298 EncodedEvent::new(output.freeze(), byte_size, json_byte_size)
299}
300
301fn to_fields(label: String, value: f64) -> HashMap<KeyString, Field> {
302 let mut result = HashMap::new();
303 result.insert(label.into(), Field::Float(value));
304 result
305}
306
// Unit tests for the encoder plus an end-to-end smoke test against a local HTTP test server.
#[cfg(test)]
mod tests {
    use chrono::{Timelike, Utc, offset::TimeZone};
    use futures::StreamExt;
    use indoc::indoc;
    use vector_lib::metric_tags;

    use super::*;
    use crate::{
        event::{Event, metric::MetricKind},
        sinks::util::test::{build_test_server, load_sink},
        test_util::{
            addr::next_addr,
            components::{HTTP_SINK_TAGS, assert_sink_compliance},
            test_generate_config,
        },
    };

    // Runs the shared generated-config check against this sink's config type.
    #[test]
    fn generate_config() {
        test_generate_config::<SematextMetricsConfig>();
    }

    // A counter with its own namespace is encoded under that namespace, not the default one.
    #[test]
    fn test_encode_counter_event() {
        let events = vec![
            Metric::new(
                "pool.used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    // A counter without a namespace falls back to the default namespace passed to the encoder.
    #[test]
    fn test_encode_counter_event_no_namespace() {
        let events = vec![
            Metric::new(
                "used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "ns,metric_type=counter,token=aaa used=42 1597784400000000000",
            encode_events("aaa", "ns", events).item
        );
    }

    // Multiple metrics are joined with a newline and the body has no trailing newline.
    #[test]
    fn test_encode_counter_multiple_events() {
        let events = vec![
            Metric::new(
                "pool.used",
                MetricKind::Incremental,
                MetricValue::Counter { value: 42.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .expect("invalid timestamp"),
            )),
            Metric::new(
                "pool.committed",
                MetricKind::Incremental,
                MetricValue::Counter { value: 18874368.0 },
            )
            .with_namespace(Some("jvm"))
            .with_timestamp(Some(
                Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0)
                    .single()
                    .and_then(|t| t.with_nanosecond(1))
                    .expect("invalid timestamp"),
            )),
        ];

        assert_eq!(
            "jvm,metric_type=counter,token=aaa pool.used=42 1597784400000000000\n\
             jvm,metric_type=counter,token=aaa pool.committed=18874368 1597784400000000001",
            encode_events("aaa", "ns", events).item
        );
    }

    // End-to-end check: builds the sink from config, points it at a local test server, sends
    // nine counters, and compares each received request body with the expected encoded line.
    #[tokio::test]
    async fn smoke() {
        assert_sink_compliance(&HTTP_SINK_TAGS, async {

            // `batch.max_events = 1`, so each metric is expected to arrive as its own request.
            let (mut config, cx) = load_sink::<SematextMetricsConfig>(indoc! {r#"
                default_namespace = "ns"
                token = "atoken"
                batch.max_events = 1
            "#})
            .unwrap();

            // Override the endpoint so requests go to the local test server address.
            let (_guard, addr) = next_addr();
            let endpoint = format!("http://{addr}");
            config.endpoint = Some(endpoint.clone());

            let (sink, _) = config.build(cx).await.unwrap();

            let (rx, _trigger, server) = build_test_server(addr);
            tokio::spawn(server);

            // (namespace, metric name, value) triples used to build the input events.
            let metrics = vec![
                ("os", "swap.size", 324292.0),
                ("os", "network.tx", 42000.0),
                ("os", "network.rx", 54293.0),
                ("process", "count", 12.0),
                ("process", "uptime", 32423.0),
                ("process", "rss", 2342333.0),
                ("jvm", "pool.used", 18874368.0),
                ("jvm", "pool.committed", 18868584.0),
                ("jvm", "pool.max", 18874368.0),
            ];

            let mut events = Vec::new();
            for (i, (namespace, metric, val)) in metrics.iter().enumerate() {
                // The index is used as the nanosecond component, giving each event a distinct
                // timestamp.
                let event = Event::from(
                    Metric::new(
                        *metric,
                        MetricKind::Incremental,
                        MetricValue::Counter { value: *val },
                    )
                    .with_namespace(Some(*namespace))
                    .with_tags(Some(metric_tags!("os.host" => "somehost")))
                    .with_timestamp(Some(Utc.with_ymd_and_hms(2020, 8, 18, 21, 0, 0).single()
                        .and_then(|t| t.with_nanosecond(i as u32))
                        .expect("invalid timestamp"))),
                );
                events.push(event);
            }

            sink.run_events(events).await.unwrap();

            // Collect one received request per input metric and compare the bodies in order.
            let output = rx.take(metrics.len()).collect::<Vec<_>>().await;
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken swap.size=324292 1597784400000000000", output[0].1);
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken network.tx=42000 1597784400000000001", output[1].1);
            assert_eq!("os,metric_type=counter,os.host=somehost,token=atoken network.rx=54293 1597784400000000002", output[2].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken count=12 1597784400000000003", output[3].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken uptime=32423 1597784400000000004", output[4].1);
            assert_eq!("process,metric_type=counter,os.host=somehost,token=atoken rss=2342333 1597784400000000005", output[5].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.used=18874368 1597784400000000006", output[6].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.committed=18868584 1597784400000000007", output[7].1);
            assert_eq!("jvm,metric_type=counter,os.host=somehost,token=atoken pool.max=18874368 1597784400000000008", output[8].1);
        }).await;
    }
}