vector/sinks/greptimedb/logs/
http_request_builder.rs

1use crate::{
2    codecs::{Encoder, Transformer},
3    event::{Event, EventFinalizers, Finalizable},
4    http::{Auth, HttpClient, HttpError},
5    sinks::{
6        prelude::*,
7        util::http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
8        HTTPRequestBuilderSnafu, HealthcheckError,
9    },
10    Error,
11};
12use bytes::Bytes;
13use http::{
14    header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
15    Request, StatusCode,
16};
17use hyper::Body;
18use snafu::ResultExt;
19use std::collections::HashMap;
20use vector_lib::codecs::encoding::Framer;
21
/// Partition key for GreptimeDB logs sink.
///
/// Holds the per-event values produced by `KeyPartitioner::partition`. The
/// request builder later reads these fields to assemble the query string of
/// the log ingestion URL.
#[derive(Hash, Eq, PartialEq, Clone, Debug)]
pub(super) struct PartitionKey {
    /// Database name; sent as the `db` query parameter.
    pub dbname: String,
    /// Table name; sent as the `table` query parameter.
    pub table: String,
    /// Pipeline name; sent as the `pipeline_name` query parameter.
    pub pipeline_name: String,
    /// Optional pipeline version; the `pipeline_version` query parameter is
    /// only appended when this is `Some`.
    pub pipeline_version: Option<String>,
}
30
/// KeyPartitioner that partitions events by (dbname, table, pipeline_name, pipeline_version) pair.
///
/// Each field is a template that is rendered against the individual event to
/// obtain the corresponding `PartitionKey` field.
pub(super) struct KeyPartitioner {
    /// Template rendered into `PartitionKey::dbname`.
    dbname: Template,
    /// Template rendered into `PartitionKey::table`.
    table: Template,
    /// Template rendered into `PartitionKey::pipeline_name`.
    pipeline_name: Template,
    /// Optional template rendered into `PartitionKey::pipeline_version`.
    pipeline_version: Option<Template>,
}
38
39impl KeyPartitioner {
40    pub const fn new(
41        db: Template,
42        table: Template,
43        pipeline_name: Template,
44        pipeline_version: Option<Template>,
45    ) -> Self {
46        Self {
47            dbname: db,
48            table,
49            pipeline_name,
50            pipeline_version,
51        }
52    }
53
54    fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
55        template
56            .render_string(item)
57            .map_err(|error| {
58                emit!(TemplateRenderingError {
59                    error,
60                    field: Some(field),
61                    drop_event: true,
62                });
63            })
64            .ok()
65    }
66}
67
68impl Partitioner for KeyPartitioner {
69    type Item = Event;
70    type Key = Option<PartitionKey>;
71
72    fn partition(&self, item: &Self::Item) -> Self::Key {
73        let dbname = Self::render(&self.dbname, item, "dbname_key")?;
74        let table = Self::render(&self.table, item, "table_key")?;
75        let pipeline_name = Self::render(&self.pipeline_name, item, "pipeline_name")?;
76        let pipeline_version = self
77            .pipeline_version
78            .as_ref()
79            .and_then(|template| Self::render(template, item, "pipeline_version"));
80        Some(PartitionKey {
81            dbname,
82            table,
83            pipeline_name,
84            pipeline_version,
85        })
86    }
87}
88
/// GreptimeDB logs HTTP request builder.
///
/// Used both to encode batches of events into payloads and to turn those
/// payloads into HTTP `POST` requests against the log ingestion endpoint.
#[derive(Debug, Clone)]
pub(super) struct GreptimeDBLogsHttpRequestBuilder {
    /// Base endpoint; `/v1/events/logs` is appended to it to form the request URL.
    pub(super) endpoint: String,
    /// Optional authentication applied to every built request.
    pub(super) auth: Option<Auth>,
    /// Transformer and encoder pair returned as the request encoder.
    pub(super) encoder: (Transformer, Encoder<Framer>),
    /// Compression setting; also determines the `Content-Encoding` header, if any.
    pub(super) compression: Compression,
    /// Additional query parameters appended to the ingestion URL.
    pub(super) extra_params: Option<HashMap<String, String>>,
    /// Additional headers added to every built request.
    pub(super) extra_headers: Option<HashMap<String, String>>,
}
99
100fn prepare_log_ingester_url(
101    endpoint: &str,
102    db: &str,
103    table: &str,
104    metadata: &PartitionKey,
105    extra_params: &Option<HashMap<String, String>>,
106) -> String {
107    let path = format!("{endpoint}/v1/events/logs");
108    let mut url = url::Url::parse(&path).unwrap();
109    let mut url_builder = url.query_pairs_mut();
110    url_builder
111        .append_pair("db", db)
112        .append_pair("table", table)
113        .append_pair("pipeline_name", &metadata.pipeline_name);
114
115    if let Some(pipeline_version) = metadata.pipeline_version.as_ref() {
116        url_builder.append_pair("pipeline_version", pipeline_version);
117    }
118
119    if let Some(extra_params) = extra_params.as_ref() {
120        for (key, value) in extra_params.iter() {
121            url_builder.append_pair(key, value);
122        }
123    }
124
125    url_builder.finish().to_string()
126}
127
128impl HttpServiceRequestBuilder<PartitionKey> for GreptimeDBLogsHttpRequestBuilder {
129    fn build(&self, mut request: HttpRequest<PartitionKey>) -> Result<Request<Bytes>, Error> {
130        let metadata = request.get_additional_metadata();
131        let table = metadata.table.clone();
132        let db = metadata.dbname.clone();
133
134        // prepare url
135        let url = prepare_log_ingester_url(
136            self.endpoint.as_str(),
137            &db,
138            &table,
139            metadata,
140            &self.extra_params,
141        );
142
143        // prepare body
144        let payload = request.take_payload();
145
146        let mut builder = Request::post(&url)
147            .header(CONTENT_TYPE, "application/x-ndjson")
148            .header(CONTENT_LENGTH, payload.len());
149
150        if let Some(extra_headers) = self.extra_headers.as_ref() {
151            for (key, value) in extra_headers.iter() {
152                builder = builder.header(key, value);
153            }
154        }
155
156        if let Some(ce) = self.compression.content_encoding() {
157            builder = builder.header(CONTENT_ENCODING, ce);
158        }
159
160        if let Some(auth) = self.auth.clone() {
161            builder = auth.apply_builder(builder);
162        }
163
164        builder
165            .body(payload)
166            .context(HTTPRequestBuilderSnafu)
167            .map_err(Into::into)
168    }
169}
170
171impl RequestBuilder<(PartitionKey, Vec<Event>)> for GreptimeDBLogsHttpRequestBuilder {
172    type Metadata = (PartitionKey, EventFinalizers);
173    type Events = Vec<Event>;
174    type Encoder = (Transformer, Encoder<Framer>);
175    type Payload = Bytes;
176    type Request = HttpRequest<PartitionKey>;
177    type Error = std::io::Error;
178
179    fn compression(&self) -> Compression {
180        self.compression
181    }
182
183    fn encoder(&self) -> &Self::Encoder {
184        &self.encoder
185    }
186
187    fn split_input(
188        &self,
189        input: (PartitionKey, Vec<Event>),
190    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
191        let (key, mut events) = input;
192
193        let finalizers = events.take_finalizers();
194        let builder = RequestMetadataBuilder::from_events(&events);
195        ((key, finalizers), builder, events)
196    }
197
198    fn build_request(
199        &self,
200        metadata: Self::Metadata,
201        request_metadata: RequestMetadata,
202        payload: EncodeResult<Self::Payload>,
203    ) -> Self::Request {
204        let (key, finalizers) = metadata;
205        HttpRequest::new(
206            payload.into_payload(),
207            finalizers,
208            request_metadata,
209            PartitionKey {
210                dbname: key.dbname,
211                table: key.table,
212                pipeline_name: key.pipeline_name,
213                pipeline_version: key.pipeline_version,
214            },
215        )
216    }
217}
218
219pub(super) async fn http_healthcheck(
220    client: HttpClient,
221    endpoint: String,
222    auth: Option<Auth>,
223) -> crate::Result<()> {
224    let uri = format!("{endpoint}/health");
225    let mut request = Request::get(uri).body(Body::empty())?;
226
227    if let Some(auth) = auth {
228        auth.apply(&mut request);
229    }
230
231    let response = client.send(request).await?;
232
233    match response.status() {
234        StatusCode::OK => Ok(()),
235        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
236    }
237}
238
/// GreptimeDB HTTP retry logic.
///
/// Thin wrapper around the generic `HttpRetryLogic`, specialised for requests
/// carrying a `PartitionKey`.
#[derive(Clone, Default)]
pub(super) struct GreptimeDBHttpRetryLogic {
    /// Generic HTTP retry logic that response classification is delegated to.
    inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
}
244
245impl RetryLogic for GreptimeDBHttpRetryLogic {
246    type Error = HttpError;
247    type Request = HttpRequest<PartitionKey>;
248    type Response = HttpResponse;
249
250    fn is_retriable_error(&self, error: &Self::Error) -> bool {
251        error.is_retriable()
252    }
253
254    fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
255        self.inner.should_retry_response(&response.http_response)
256    }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// Verifies that the ingestion URL targets the logs path and carries
    /// exactly the expected query parameters.
    ///
    /// The pairs are compared as a sorted list rather than checked with
    /// `all`, which would also pass if parameters were missing or the query
    /// string were empty.
    #[test]
    fn test_prepare_url() {
        let endpoint = "http://localhost:8080";
        let db = "test_db";
        let table = "test_table";
        let metadata = PartitionKey {
            dbname: "test_db".to_string(),
            table: "test_table".to_string(),
            pipeline_name: "test_pipeline".to_string(),
            pipeline_version: Some("test_version".to_string()),
        };
        let params = vec![("param1", "value1"), ("param2", "value2")];
        let extra_params = Some(
            params
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        );

        let url = prepare_log_ingester_url(endpoint, db, table, &metadata, &extra_params);
        let url = url::Url::parse(&url).unwrap();
        assert_eq!(url.path(), "/v1/events/logs");

        // Extra parameters come from a hash map, so their order is not
        // fixed; sort both sides before comparing.
        let mut actual: Vec<(String, String)> = url
            .query_pairs()
            .map(|(k, v)| (k.into_owned(), v.into_owned()))
            .collect();
        actual.sort();

        let mut expected: Vec<(String, String)> = vec![
            ("db", "test_db"),
            ("table", "test_table"),
            ("pipeline_name", "test_pipeline"),
            ("pipeline_version", "test_version"),
            ("param1", "value1"),
            ("param2", "value2"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        expected.sort();

        assert_eq!(actual, expected);
    }
}