vector/sinks/greptimedb/metrics/
sink.rs1use async_trait::async_trait;
2use futures::StreamExt;
3use futures_util::stream::BoxStream;
4use vector_lib::event::{Metric, MetricValue};
5
6use crate::sinks::{
7 greptimedb::metrics::{
8 batch::GreptimeDBBatchSizer,
9 request::{GreptimeDBGrpcRequest, GreptimeDBGrpcRetryLogic},
10 request_builder::RequestBuilderOptions,
11 service::GreptimeDBGrpcService,
12 },
13 prelude::*,
14 util::buffer::metrics::{MetricNormalize, MetricSet},
15};
16
/// Stateless marker type that carries the metric normalization rules used by
/// this sink; the rules themselves live in its `MetricNormalize` impl.
///
/// `Default` is derived so the type can be plugged into
/// `normalized_with_default::<GreptimeDBMetricNormalize>()`.
#[derive(Clone, Debug, Default)]
pub struct GreptimeDBMetricNormalize;
19
20impl MetricNormalize for GreptimeDBMetricNormalize {
21 fn normalize(&mut self, state: &mut MetricSet, metric: Metric) -> Option<Metric> {
22 match (metric.kind(), &metric.value()) {
23 (_, MetricValue::Counter { .. }) => state.make_absolute(metric),
24 (_, MetricValue::Gauge { .. }) => state.make_absolute(metric),
25 _ => Some(metric),
27 }
28 }
29}
30
31pub struct GreptimeDBGrpcSink {
34 pub(super) service: Svc<GreptimeDBGrpcService, GreptimeDBGrpcRetryLogic>,
35 pub(super) batch_settings: BatcherSettings,
36 pub(super) request_builder_options: RequestBuilderOptions,
37}
38
39impl GreptimeDBGrpcSink {
40 async fn run_inner(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
41 input
42 .map(|event| event.into_metric())
43 .normalized_with_default::<GreptimeDBMetricNormalize>()
44 .batched(
45 self.batch_settings
46 .as_item_size_config(GreptimeDBBatchSizer),
47 )
48 .map(|m| GreptimeDBGrpcRequest::from_metrics(m, &self.request_builder_options))
49 .into_driver(self.service)
50 .protocol("grpc")
51 .run()
52 .await
53 }
54}
55
56#[async_trait]
57impl StreamSink<Event> for GreptimeDBGrpcSink {
58 async fn run(self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
59 self.run_inner(input).await
60 }
61}