vector/sinks/greptimedb/metrics/
request.rs

1use crate::sinks::{
2    greptimedb::metrics::{
3        batch::GreptimeDBBatchSizer,
4        request_builder::{metric_to_insert_request, RequestBuilderOptions},
5    },
6    prelude::*,
7};
8use greptimedb_ingester::{api::v1::*, Error as GreptimeError};
9use std::num::NonZeroUsize;
10use vector_lib::event::Metric;
11
12/// GreptimeDBGrpcRequest is a wrapper around the RowInsertRequests
13/// that is used to send metrics to GreptimeDB.
14/// It also contains the finalizers and metadata that are used to
15#[derive(Clone)]
16pub struct GreptimeDBGrpcRequest {
17    pub(super) items: RowInsertRequests,
18    pub(super) finalizers: EventFinalizers,
19    pub(super) metadata: RequestMetadata,
20}
21
22impl GreptimeDBGrpcRequest {
23    // convert metrics event to GreptimeDBGrpcRequest
24    pub(super) fn from_metrics(metrics: Vec<Metric>, options: &RequestBuilderOptions) -> Self {
25        let mut items = Vec::with_capacity(metrics.len());
26        let mut finalizers = EventFinalizers::default();
27        let mut request_metadata_builder = RequestMetadataBuilder::default();
28
29        let sizer = GreptimeDBBatchSizer;
30        let mut estimated_request_size = 0;
31        for mut metric in metrics.into_iter() {
32            finalizers.merge(metric.take_finalizers());
33            estimated_request_size += sizer.estimated_size_of(&metric);
34
35            request_metadata_builder.track_event(metric.clone());
36
37            items.push(metric_to_insert_request(metric, options));
38        }
39
40        let request_size =
41            NonZeroUsize::new(estimated_request_size).expect("request should never be zero length");
42
43        GreptimeDBGrpcRequest {
44            items: RowInsertRequests { inserts: items },
45            finalizers,
46            metadata: request_metadata_builder.with_request_size(request_size),
47        }
48    }
49}
50
51impl Finalizable for GreptimeDBGrpcRequest {
52    fn take_finalizers(&mut self) -> EventFinalizers {
53        std::mem::take(&mut self.finalizers)
54    }
55}
56
57impl MetaDescriptive for GreptimeDBGrpcRequest {
58    fn get_metadata(&self) -> &RequestMetadata {
59        &self.metadata
60    }
61
62    fn metadata_mut(&mut self) -> &mut RequestMetadata {
63        &mut self.metadata
64    }
65}
66
67/// GreptimeDBGrpcBatchOutput is the output of the [`GreptimeDBGrpcService`]
68#[derive(Debug)]
69pub struct GreptimeDBGrpcBatchOutput {
70    pub _item_count: u32,
71    pub metadata: RequestMetadata,
72}
73
74impl DriverResponse for GreptimeDBGrpcBatchOutput {
75    fn event_status(&self) -> EventStatus {
76        EventStatus::Delivered
77    }
78
79    fn events_sent(&self) -> &GroupedCountByteSize {
80        self.metadata.events_estimated_json_encoded_byte_size()
81    }
82
83    fn bytes_sent(&self) -> Option<usize> {
84        Some(self.metadata.request_encoded_size())
85    }
86}
87
/// Retry logic used by the [`GreptimeDBGrpcSink`].
#[derive(Default, Clone)]
pub struct GreptimeDBGrpcRetryLogic;
91
92impl RetryLogic for GreptimeDBGrpcRetryLogic {
93    type Error = GreptimeError;
94    type Request = GreptimeDBGrpcRequest;
95    type Response = GreptimeDBGrpcBatchOutput;
96
97    fn is_retriable_error(&self, error: &Self::Error) -> bool {
98        error.is_retriable()
99    }
100}