vector/sinks/greptimedb/metrics/
sink.rs1use crate::sinks::{
2 greptimedb::metrics::{
3 batch::GreptimeDBBatchSizer, request::GreptimeDBGrpcRequest,
4 request::GreptimeDBGrpcRetryLogic, request_builder::RequestBuilderOptions,
5 service::GreptimeDBGrpcService,
6 },
7 prelude::*,
8 util::buffer::metrics::{MetricNormalize, MetricSet},
9};
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures_util::stream::BoxStream;
13use vector_lib::event::{Metric, MetricValue};
14
/// Normalizer type used by the GreptimeDB metrics sink.
///
/// The type carries no fields; all of its behaviour lives in its
/// `MetricNormalize` implementation.
#[derive(Debug, Default, Clone)]
pub struct GreptimeDBMetricNormalize;
17
18impl MetricNormalize for GreptimeDBMetricNormalize {
19 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
20 match (metric.kind(), &metric.value()) {
21 (_, MetricValue::Counter { .. }) => state.make_absolute(metric),
22 (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
23 _ => Some(metric),
25 }
26 }
27}
28
29pub struct GreptimeDBGrpcSink {
32 pub(super) service: Svc<GreptimeDBGrpcService, GreptimeDBGrpcRetryLogic>,
33 pub(super) batch_settings: BatcherSettings,
34 pub(super) request_builder_options: RequestBuilderOptions,
35}
36
37impl GreptimeDBGrpcSink {
38 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
39 input
40 .map(|event| event.into_metric())
41 .normalized_with_default::<GreptimeDBMetricNormalize>()
42 .batched(
43 self.batch_settings
44 .as_item_size_config(GreptimeDBBatchSizer),
45 )
46 .map(|m| GreptimeDBGrpcRequest::from_metrics(m, &self.request_builder_options))
47 .into_driver(self.service)
48 .protocol("grpc")
49 .run()
50 .await
51 }
52}
53
54#[async_trait]
55impl StreamSink<Event> for GreptimeDBGrpcSink {
56 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
57 self.run_inner(input).await
58 }
59}