vector/sinks/greptimedb/metrics/
request.rs1use std::num::NonZeroUsize;
2
3use greptimedb_ingester::{Error as GreptimeError, api::v1::*};
4use vector_lib::event::Metric;
5
6use crate::sinks::{
7 greptimedb::metrics::{
8 batch::GreptimeDBBatchSizer,
9 request_builder::{RequestBuilderOptions, metric_to_insert_request},
10 },
11 prelude::*,
12};
13
14#[derive(Clone)]
18pub struct GreptimeDBGrpcRequest {
19 pub(super) items: RowInsertRequests,
20 pub(super) finalizers: EventFinalizers,
21 pub(super) metadata: RequestMetadata,
22}
23
24impl GreptimeDBGrpcRequest {
25 pub(super) fn from_metrics(metrics: Vec<Metric>, options: &RequestBuilderOptions) -> Self {
27 let mut items = Vec::with_capacity(metrics.len());
28 let mut finalizers = EventFinalizers::default();
29 let mut request_metadata_builder = RequestMetadataBuilder::default();
30
31 let sizer = GreptimeDBBatchSizer;
32 let mut estimated_request_size = 0;
33 for mut metric in metrics.into_iter() {
34 finalizers.merge(metric.take_finalizers());
35 estimated_request_size += sizer.estimated_size_of(&metric);
36
37 request_metadata_builder.track_event(metric.clone());
38
39 items.push(metric_to_insert_request(metric, options));
40 }
41
42 let request_size =
43 NonZeroUsize::new(estimated_request_size).expect("request should never be zero length");
44
45 GreptimeDBGrpcRequest {
46 items: RowInsertRequests { inserts: items },
47 finalizers,
48 metadata: request_metadata_builder.with_request_size(request_size),
49 }
50 }
51}
52
53impl Finalizable for GreptimeDBGrpcRequest {
54 fn take_finalizers(&mut self) -> EventFinalizers {
55 std::mem::take(&mut self.finalizers)
56 }
57}
58
59impl MetaDescriptive for GreptimeDBGrpcRequest {
60 fn get_metadata(&self) -> &RequestMetadata {
61 &self.metadata
62 }
63
64 fn metadata_mut(&mut self) -> &mut RequestMetadata {
65 &mut self.metadata
66 }
67}
68
69#[derive(Debug)]
71pub struct GreptimeDBGrpcBatchOutput {
72 pub _item_count: u32,
73 pub metadata: RequestMetadata,
74}
75
76impl DriverResponse for GreptimeDBGrpcBatchOutput {
77 fn event_status(&self) -> EventStatus {
78 EventStatus::Delivered
79 }
80
81 fn events_sent(&self) -> &GroupedCountByteSize {
82 self.metadata.events_estimated_json_encoded_byte_size()
83 }
84
85 fn bytes_sent(&self) -> Option<usize> {
86 Some(self.metadata.request_encoded_size())
87 }
88}
89
90#[derive(Clone, Default)]
92pub struct GreptimeDBGrpcRetryLogic;
93
94impl RetryLogic for GreptimeDBGrpcRetryLogic {
95 type Error = GreptimeError;
96 type Request = GreptimeDBGrpcRequest;
97 type Response = GreptimeDBGrpcBatchOutput;
98
99 fn is_retriable_error(&self, error: &Self::Error) -> bool {
100 error.is_retriable()
101 }
102}