vector/sinks/greptimedb/metrics/
request.rs

1use std::num::NonZeroUsize;
2
3use greptimedb_ingester::{Error as GreptimeError, api::v1::*};
4use vector_lib::event::Metric;
5
6use crate::sinks::{
7    greptimedb::metrics::{
8        batch::GreptimeDBBatchSizer,
9        request_builder::{RequestBuilderOptions, metric_to_insert_request},
10    },
11    prelude::*,
12};
13
14/// GreptimeDBGrpcRequest is a wrapper around the RowInsertRequests
15/// that is used to send metrics to GreptimeDB.
16/// It also contains the finalizers and metadata that are used to
17#[derive(Clone)]
18pub struct GreptimeDBGrpcRequest {
19    pub(super) items: RowInsertRequests,
20    pub(super) finalizers: EventFinalizers,
21    pub(super) metadata: RequestMetadata,
22}
23
24impl GreptimeDBGrpcRequest {
25    // convert metrics event to GreptimeDBGrpcRequest
26    pub(super) fn from_metrics(metrics: Vec<Metric>, options: &RequestBuilderOptions) -> Self {
27        let mut items = Vec::with_capacity(metrics.len());
28        let mut finalizers = EventFinalizers::default();
29        let mut request_metadata_builder = RequestMetadataBuilder::default();
30
31        let sizer = GreptimeDBBatchSizer;
32        let mut estimated_request_size = 0;
33        for mut metric in metrics.into_iter() {
34            finalizers.merge(metric.take_finalizers());
35            estimated_request_size += sizer.estimated_size_of(&metric);
36
37            request_metadata_builder.track_event(metric.clone());
38
39            items.push(metric_to_insert_request(metric, options));
40        }
41
42        let request_size =
43            NonZeroUsize::new(estimated_request_size).expect("request should never be zero length");
44
45        GreptimeDBGrpcRequest {
46            items: RowInsertRequests { inserts: items },
47            finalizers,
48            metadata: request_metadata_builder.with_request_size(request_size),
49        }
50    }
51}
52
53impl Finalizable for GreptimeDBGrpcRequest {
54    fn take_finalizers(&mut self) -> EventFinalizers {
55        std::mem::take(&mut self.finalizers)
56    }
57}
58
59impl MetaDescriptive for GreptimeDBGrpcRequest {
60    fn get_metadata(&self) -> &RequestMetadata {
61        &self.metadata
62    }
63
64    fn metadata_mut(&mut self) -> &mut RequestMetadata {
65        &mut self.metadata
66    }
67}
68
69/// GreptimeDBGrpcBatchOutput is the output of the [`GreptimeDBGrpcService`]
70#[derive(Debug)]
71pub struct GreptimeDBGrpcBatchOutput {
72    pub _item_count: u32,
73    pub metadata: RequestMetadata,
74}
75
76impl DriverResponse for GreptimeDBGrpcBatchOutput {
77    fn event_status(&self) -> EventStatus {
78        EventStatus::Delivered
79    }
80
81    fn events_sent(&self) -> &GroupedCountByteSize {
82        self.metadata.events_estimated_json_encoded_byte_size()
83    }
84
85    fn bytes_sent(&self) -> Option<usize> {
86        Some(self.metadata.request_encoded_size())
87    }
88}
89
90/// GreptimeDBGrpcRetryLogic is the retry logic for the [`GreptimeDBGrpcSink`]
91#[derive(Clone, Default)]
92pub struct GreptimeDBGrpcRetryLogic;
93
94impl RetryLogic for GreptimeDBGrpcRetryLogic {
95    type Error = GreptimeError;
96    type Request = GreptimeDBGrpcRequest;
97    type Response = GreptimeDBGrpcBatchOutput;
98
99    fn is_retriable_error(&self, error: &Self::Error) -> bool {
100        error.is_retriable()
101    }
102}