vector/sinks/greptimedb/metrics/
sink.rs

1use async_trait::async_trait;
2use futures::StreamExt;
3use futures_util::stream::BoxStream;
4use vector_lib::event::{Metric, MetricValue};
5
6use crate::sinks::{
7    greptimedb::metrics::{
8        batch::GreptimeDBBatchSizer,
9        request::{GreptimeDBGrpcRequest, GreptimeDBGrpcRetryLogic},
10        request_builder::RequestBuilderOptions,
11        service::GreptimeDBGrpcService,
12    },
13    prelude::*,
14    util::buffer::metrics::{MetricNormalize, MetricSet},
15};
16
17#[derive(Clone, Debug, Default)]
18pub struct GreptimeDBMetricNormalize;
19
20impl MetricNormalize for GreptimeDBMetricNormalize {
21    fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
22        match (metric.kind(), &metric.value()) {
23            (_, MetricValue::Counter { .. }) => state.make_absolute(metric),
24            (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
25            // All others are left as-is
26            _ => Some(metric),
27        }
28    }
29}
30
31/// GreptimeDBGrpcSink is a sink that sends metrics to GreptimeDB via gRPC.
32/// It uses the `GreptimeDBGrpcService` to send the metrics.
33pub struct GreptimeDBGrpcSink {
34    pub(super) service: Svc<GreptimeDBGrpcService, GreptimeDBGrpcRetryLogic>,
35    pub(super) batch_settings: BatcherSettings,
36    pub(super) request_builder_options: RequestBuilderOptions,
37}
38
39impl GreptimeDBGrpcSink {
40    async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41        input
42            .map(|event| event.into_metric())
43            .normalized_with_default::<GreptimeDBMetricNormalize>()
44            .batched(
45                self.batch_settings
46                    .as_item_size_config(GreptimeDBBatchSizer),
47            )
48            .map(|m| GreptimeDBGrpcRequest::from_metrics(m, &self.request_builder_options))
49            .into_driver(self.service)
50            .protocol("grpc")
51            .run()
52            .await
53    }
54}
55
56#[async_trait]
57impl StreamSink<Event> for GreptimeDBGrpcSink {
58    async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
59        self.run_inner(input).await
60    }
61}