vector/sinks/greptimedb/metrics/
request.rs1use crate::sinks::{
2 greptimedb::metrics::{
3 batch::GreptimeDBBatchSizer,
4 request_builder::{metric_to_insert_request, RequestBuilderOptions},
5 },
6 prelude::*,
7};
8use greptimedb_ingester::{api::v1::*, Error as GreptimeError};
9use std::num::NonZeroUsize;
10use vector_lib::event::Metric;
11
12#[derive(Clone)]
16pub struct GreptimeDBGrpcRequest {
17 pub(super) items: RowInsertRequests,
18 pub(super) finalizers: EventFinalizers,
19 pub(super) metadata: RequestMetadata,
20}
21
22impl GreptimeDBGrpcRequest {
23 pub(super) fn from_metrics(metrics: Vec<Metric>, options: &RequestBuilderOptions) -> Self {
25 let mut items = Vec::with_capacity(metrics.len());
26 let mut finalizers = EventFinalizers::default();
27 let mut request_metadata_builder = RequestMetadataBuilder::default();
28
29 let sizer = GreptimeDBBatchSizer;
30 let mut estimated_request_size = 0;
31 for mut metric in metrics.into_iter() {
32 finalizers.merge(metric.take_finalizers());
33 estimated_request_size += sizer.estimated_size_of(&metric);
34
35 request_metadata_builder.track_event(metric.clone());
36
37 items.push(metric_to_insert_request(metric, options));
38 }
39
40 let request_size =
41 NonZeroUsize::new(estimated_request_size).expect("request should never be zero length");
42
43 GreptimeDBGrpcRequest {
44 items: RowInsertRequests { inserts: items },
45 finalizers,
46 metadata: request_metadata_builder.with_request_size(request_size),
47 }
48 }
49}
50
51impl Finalizable for GreptimeDBGrpcRequest {
52 fn take_finalizers(&mut self) -> EventFinalizers {
53 std::mem::take(&mut self.finalizers)
54 }
55}
56
57impl MetaDescriptive for GreptimeDBGrpcRequest {
58 fn get_metadata(&self) -> &RequestMetadata {
59 &self.metadata
60 }
61
62 fn metadata_mut(&mut self) -> &mut RequestMetadata {
63 &mut self.metadata
64 }
65}
66
67#[derive(Debug)]
69pub struct GreptimeDBGrpcBatchOutput {
70 pub _item_count: u32,
71 pub metadata: RequestMetadata,
72}
73
74impl DriverResponse for GreptimeDBGrpcBatchOutput {
75 fn event_status(&self) -> EventStatus {
76 EventStatus::Delivered
77 }
78
79 fn events_sent(&self) -> &GroupedCountByteSize {
80 self.metadata.events_estimated_json_encoded_byte_size()
81 }
82
83 fn bytes_sent(&self) -> Option<usize> {
84 Some(self.metadata.request_encoded_size())
85 }
86}
87
88#[derive(Clone, Default)]
90pub struct GreptimeDBGrpcRetryLogic;
91
92impl RetryLogic for GreptimeDBGrpcRetryLogic {
93 type Error = GreptimeError;
94 type Request = GreptimeDBGrpcRequest;
95 type Response = GreptimeDBGrpcBatchOutput;
96
97 fn is_retriable_error(&self, error: &Self::Error) -> bool {
98 error.is_retriable()
99 }
100}