vector/sinks/greptimedb/metrics/
sink.rs

1use crate::sinks::{
2    greptimedb::metrics::{
3        batch::GreptimeDBBatchSizer, request::GreptimeDBGrpcRequest,
4        request::GreptimeDBGrpcRetryLogic, request_builder::RequestBuilderOptions,
5        service::GreptimeDBGrpcService,
6    },
7    prelude::*,
8    util::buffer::metrics::{MetricNormalize, MetricSet},
9};
10use async_trait::async_trait;
11use futures::StreamExt;
12use futures_util::stream::BoxStream;
13use vector_lib::event::{Metric, MetricValue};
14
/// Stateless [`MetricNormalize`] implementation used by the GreptimeDB
/// metrics sink.
///
/// The type carries no data of its own; the per-series state it works on is
/// the `MetricSet` passed to `normalize`.
#[derive(Debug, Default, Clone)]
pub struct GreptimeDBMetricNormalize;
17
18impl MetricNormalize for GreptimeDBMetricNormalize {
19    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
20        match (metric.kind(), &metric.value()) {
21            (_, MetricValue::Counter { .. }) => state.make_absolute(metric),
22            (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
23            // All others are left as-is
24            _ => Some(metric),
25        }
26    }
27}
28
29/// GreptimeDBGrpcSink is a sink that sends metrics to GreptimeDB via gRPC.
30/// It uses the `GreptimeDBGrpcService` to send the metrics.
31pub struct GreptimeDBGrpcSink {
32    pub(super) service: Svc<GreptimeDBGrpcService, GreptimeDBGrpcRetryLogic>,
33    pub(super) batch_settings: BatcherSettings,
34    pub(super) request_builder_options: RequestBuilderOptions,
35}
36
37impl GreptimeDBGrpcSink {
38    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
39        input
40            .map(|event| event.into_metric())
41            .normalized_with_default::<GreptimeDBMetricNormalize>()
42            .batched(
43                self.batch_settings
44                    .as_item_size_config(GreptimeDBBatchSizer),
45            )
46            .map(|m| GreptimeDBGrpcRequest::from_metrics(m, &self.request_builder_options))
47            .into_driver(self.service)
48            .protocol("grpc")
49            .run()
50            .await
51    }
52}
53
54#[async_trait]
55impl StreamSink<Event> for GreptimeDBGrpcSink {
56    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
57        self.run_inner(input).await
58    }
59}