vector/sinks/greptimedb/logs/http_request_builder.rs

1use std::collections::HashMap;
2
3use bytes::Bytes;
4use http::{
5    Request, StatusCode,
6    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
7};
8use hyper::Body;
9use snafu::ResultExt;
10use vector_lib::codecs::encoding::Framer;
11
12use crate::{
13    Error,
14    codecs::{Encoder, Transformer},
15    event::{Event, EventFinalizers, Finalizable},
16    http::{Auth, HttpClient, HttpError},
17    sinks::{
18        HTTPRequestBuilderSnafu, HealthcheckError,
19        prelude::*,
20        util::http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
21    },
22};
23
24/// Partition key for GreptimeDB logs sink.
25#[derive(Hash, Eq, PartialEq, Clone, Debug)]
26pub(super) struct PartitionKey {
27    pub dbname: String,
28    pub table: String,
29    pub pipeline_name: String,
30    pub pipeline_version: Option<String>,
31}
32
33/// KeyPartitioner that partitions events by (dbname, table, pipeline_name, pipeline_version) pair.
34pub(super) struct KeyPartitioner {
35    dbname: Template,
36    table: Template,
37    pipeline_name: Template,
38    pipeline_version: Option<Template>,
39}
40
41impl KeyPartitioner {
42    pub const fn new(
43        db: Template,
44        table: Template,
45        pipeline_name: Template,
46        pipeline_version: Option<Template>,
47    ) -> Self {
48        Self {
49            dbname: db,
50            table,
51            pipeline_name,
52            pipeline_version,
53        }
54    }
55
56    fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
57        template
58            .render_string(item)
59            .map_err(|error| {
60                emit!(TemplateRenderingError {
61                    error,
62                    field: Some(field),
63                    drop_event: true,
64                });
65            })
66            .ok()
67    }
68}
69
70impl Partitioner for KeyPartitioner {
71    type Item = Event;
72    type Key = Option<PartitionKey>;
73
74    fn partition(&self, item: &Self::Item) -> Self::Key {
75        let dbname = Self::render(&self.dbname, item, "dbname_key")?;
76        let table = Self::render(&self.table, item, "table_key")?;
77        let pipeline_name = Self::render(&self.pipeline_name, item, "pipeline_name")?;
78        let pipeline_version = self
79            .pipeline_version
80            .as_ref()
81            .and_then(|template| Self::render(template, item, "pipeline_version"));
82        Some(PartitionKey {
83            dbname,
84            table,
85            pipeline_name,
86            pipeline_version,
87        })
88    }
89}
90
91/// GreptimeDB logs HTTP request builder.
92#[derive(Debug, Clone)]
93pub(super) struct GreptimeDBLogsHttpRequestBuilder {
94    pub(super) endpoint: String,
95    pub(super) auth: Option<Auth>,
96    pub(super) encoder: (Transformer, Encoder<Framer>),
97    pub(super) compression: Compression,
98    pub(super) extra_params: Option<HashMap<String, String>>,
99    pub(super) extra_headers: Option<HashMap<String, String>>,
100}
101
102fn prepare_log_ingester_url(
103    endpoint: &str,
104    db: &str,
105    table: &str,
106    metadata: &PartitionKey,
107    extra_params: &Option<HashMap<String, String>>,
108) -> String {
109    let path = format!("{endpoint}/v1/events/logs");
110    let mut url = url::Url::parse(&path).unwrap();
111    let mut url_builder = url.query_pairs_mut();
112    url_builder
113        .append_pair("db", db)
114        .append_pair("table", table)
115        .append_pair("pipeline_name", &metadata.pipeline_name);
116
117    if let Some(pipeline_version) = metadata.pipeline_version.as_ref() {
118        url_builder.append_pair("pipeline_version", pipeline_version);
119    }
120
121    if let Some(extra_params) = extra_params.as_ref() {
122        for (key, value) in extra_params.iter() {
123            url_builder.append_pair(key, value);
124        }
125    }
126
127    url_builder.finish().to_string()
128}
129
130impl HttpServiceRequestBuilder<PartitionKey> for GreptimeDBLogsHttpRequestBuilder {
131    fn build(&self, mut request: HttpRequest<PartitionKey>) -> Result<Request<Bytes>, Error> {
132        let metadata = request.get_additional_metadata();
133        let table = metadata.table.clone();
134        let db = metadata.dbname.clone();
135
136        // prepare url
137        let url = prepare_log_ingester_url(
138            self.endpoint.as_str(),
139            &db,
140            &table,
141            metadata,
142            &self.extra_params,
143        );
144
145        // prepare body
146        let payload = request.take_payload();
147
148        let mut builder = Request::post(&url)
149            .header(CONTENT_TYPE, "application/x-ndjson")
150            .header(CONTENT_LENGTH, payload.len());
151
152        if let Some(extra_headers) = self.extra_headers.as_ref() {
153            for (key, value) in extra_headers.iter() {
154                builder = builder.header(key, value);
155            }
156        }
157
158        if let Some(ce) = self.compression.content_encoding() {
159            builder = builder.header(CONTENT_ENCODING, ce);
160        }
161
162        if let Some(auth) = self.auth.clone() {
163            builder = auth.apply_builder(builder);
164        }
165
166        builder
167            .body(payload)
168            .context(HTTPRequestBuilderSnafu)
169            .map_err(Into::into)
170    }
171}
172
173impl RequestBuilder<(PartitionKey, Vec<Event>)> for GreptimeDBLogsHttpRequestBuilder {
174    type Metadata = (PartitionKey, EventFinalizers);
175    type Events = Vec<Event>;
176    type Encoder = (Transformer, Encoder<Framer>);
177    type Payload = Bytes;
178    type Request = HttpRequest<PartitionKey>;
179    type Error = std::io::Error;
180
181    fn compression(&self) -> Compression {
182        self.compression
183    }
184
185    fn encoder(&self) -> &Self::Encoder {
186        &self.encoder
187    }
188
189    fn split_input(
190        &self,
191        input: (PartitionKey, Vec<Event>),
192    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
193        let (key, mut events) = input;
194
195        let finalizers = events.take_finalizers();
196        let builder = RequestMetadataBuilder::from_events(&events);
197        ((key, finalizers), builder, events)
198    }
199
200    fn build_request(
201        &self,
202        metadata: Self::Metadata,
203        request_metadata: RequestMetadata,
204        payload: EncodeResult<Self::Payload>,
205    ) -> Self::Request {
206        let (key, finalizers) = metadata;
207        HttpRequest::new(
208            payload.into_payload(),
209            finalizers,
210            request_metadata,
211            PartitionKey {
212                dbname: key.dbname,
213                table: key.table,
214                pipeline_name: key.pipeline_name,
215                pipeline_version: key.pipeline_version,
216            },
217        )
218    }
219}
220
221pub(super) async fn http_healthcheck(
222    client: HttpClient,
223    endpoint: String,
224    auth: Option<Auth>,
225) -> crate::Result<()> {
226    let uri = format!("{endpoint}/health");
227    let mut request = Request::get(uri).body(Body::empty())?;
228
229    if let Some(auth) = auth {
230        auth.apply(&mut request);
231    }
232
233    let response = client.send(request).await?;
234
235    match response.status() {
236        StatusCode::OK => Ok(()),
237        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
238    }
239}
240
241/// GreptimeDB HTTP retry logic.
242#[derive(Clone, Default)]
243pub(super) struct GreptimeDBHttpRetryLogic {
244    inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
245}
246
247impl RetryLogic for GreptimeDBHttpRetryLogic {
248    type Error = HttpError;
249    type Request = HttpRequest<PartitionKey>;
250    type Response = HttpResponse;
251
252    fn is_retriable_error(&self, error: &Self::Error) -> bool {
253        error.is_retriable()
254    }
255
256    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
257        self.inner.should_retry_response(&response.http_response)
258    }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the partition key shared by the URL tests.
    fn partition_key(pipeline_version: Option<&str>) -> PartitionKey {
        PartitionKey {
            dbname: "test_db".to_string(),
            table: "test_table".to_string(),
            pipeline_name: "test_pipeline".to_string(),
            pipeline_version: pipeline_version.map(str::to_string),
        }
    }

    /// Parses `url` and returns its decoded query pairs, sorted.
    ///
    /// Comparing against a sorted expected list makes missing, unexpected and
    /// duplicated parameters all fail the assertion.
    fn sorted_query_pairs(url: &str) -> Vec<(String, String)> {
        let url = url::Url::parse(url).expect("generated URL must parse");
        let mut pairs: Vec<(String, String)> = url
            .query_pairs()
            .map(|(k, v)| (k.into_owned(), v.into_owned()))
            .collect();
        pairs.sort();
        pairs
    }

    /// Converts borrowed expected pairs into a sorted, owned list.
    fn sorted_pairs(expected: &[(&str, &str)]) -> Vec<(String, String)> {
        let mut pairs: Vec<(String, String)> = expected
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect();
        pairs.sort();
        pairs
    }

    #[test]
    fn test_prepare_url() {
        let extra_params = Some(
            vec![("param1", "value1"), ("param2", "value2")]
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        );

        let url = prepare_log_ingester_url(
            "http://localhost:8080",
            "test_db",
            "test_table",
            &partition_key(Some("test_version")),
            &extra_params,
        );

        assert_eq!(url::Url::parse(&url).unwrap().path(), "/v1/events/logs");
        assert_eq!(
            sorted_query_pairs(&url),
            sorted_pairs(&[
                ("db", "test_db"),
                ("table", "test_table"),
                ("pipeline_name", "test_pipeline"),
                ("pipeline_version", "test_version"),
                ("param1", "value1"),
                ("param2", "value2"),
            ])
        );
    }

    #[test]
    fn test_prepare_url_without_optional_params() {
        let url = prepare_log_ingester_url(
            "http://localhost:8080",
            "test_db",
            "test_table",
            &partition_key(None),
            &None,
        );

        assert_eq!(url::Url::parse(&url).unwrap().path(), "/v1/events/logs");
        assert_eq!(
            sorted_query_pairs(&url),
            sorted_pairs(&[
                ("db", "test_db"),
                ("table", "test_table"),
                ("pipeline_name", "test_pipeline"),
            ])
        );
    }
}