vector/sinks/greptimedb/logs/
http_request_builder.rs1use std::collections::HashMap;
2
3use bytes::Bytes;
4use http::{
5 Request, StatusCode,
6 header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
7};
8use hyper::Body;
9use snafu::ResultExt;
10use vector_lib::codecs::encoding::Framer;
11
12use crate::{
13 Error,
14 codecs::{Encoder, Transformer},
15 event::{Event, EventFinalizers, Finalizable},
16 http::{Auth, HttpClient, HttpError},
17 sinks::{
18 HTTPRequestBuilderSnafu, HealthcheckError,
19 prelude::*,
20 util::http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
21 },
22};
23
24#[derive(Hash, Eq, PartialEq, Clone, Debug)]
26pub(super) struct PartitionKey {
27 pub dbname: String,
28 pub table: String,
29 pub pipeline_name: String,
30 pub pipeline_version: Option<String>,
31}
32
33pub(super) struct KeyPartitioner {
35 dbname: Template,
36 table: Template,
37 pipeline_name: Template,
38 pipeline_version: Option<Template>,
39}
40
41impl KeyPartitioner {
42 pub const fn new(
43 db: Template,
44 table: Template,
45 pipeline_name: Template,
46 pipeline_version: Option<Template>,
47 ) -> Self {
48 Self {
49 dbname: db,
50 table,
51 pipeline_name,
52 pipeline_version,
53 }
54 }
55
56 fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
57 template
58 .render_string(item)
59 .map_err(|error| {
60 emit!(TemplateRenderingError {
61 error,
62 field: Some(field),
63 drop_event: true,
64 });
65 })
66 .ok()
67 }
68}
69
70impl Partitioner for KeyPartitioner {
71 type Item = Event;
72 type Key = Option<PartitionKey>;
73
74 fn partition(&self, item: &Self::Item) -> Self::Key {
75 let dbname = Self::render(&self.dbname, item, "dbname_key")?;
76 let table = Self::render(&self.table, item, "table_key")?;
77 let pipeline_name = Self::render(&self.pipeline_name, item, "pipeline_name")?;
78 let pipeline_version = self
79 .pipeline_version
80 .as_ref()
81 .and_then(|template| Self::render(template, item, "pipeline_version"));
82 Some(PartitionKey {
83 dbname,
84 table,
85 pipeline_name,
86 pipeline_version,
87 })
88 }
89}
90
91#[derive(Debug, Clone)]
93pub(super) struct GreptimeDBLogsHttpRequestBuilder {
94 pub(super) endpoint: String,
95 pub(super) auth: Option<Auth>,
96 pub(super) encoder: (Transformer, Encoder<Framer>),
97 pub(super) compression: Compression,
98 pub(super) extra_params: Option<HashMap<String, String>>,
99 pub(super) extra_headers: Option<HashMap<String, String>>,
100}
101
102fn prepare_log_ingester_url(
103 endpoint: &str,
104 db: &str,
105 table: &str,
106 metadata: &PartitionKey,
107 extra_params: &Option<HashMap<String, String>>,
108) -> String {
109 let path = format!("{endpoint}/v1/events/logs");
110 let mut url = url::Url::parse(&path).unwrap();
111 let mut url_builder = url.query_pairs_mut();
112 url_builder
113 .append_pair("db", db)
114 .append_pair("table", table)
115 .append_pair("pipeline_name", &metadata.pipeline_name);
116
117 if let Some(pipeline_version) = metadata.pipeline_version.as_ref() {
118 url_builder.append_pair("pipeline_version", pipeline_version);
119 }
120
121 if let Some(extra_params) = extra_params.as_ref() {
122 for (key, value) in extra_params.iter() {
123 url_builder.append_pair(key, value);
124 }
125 }
126
127 url_builder.finish().to_string()
128}
129
130impl HttpServiceRequestBuilder<PartitionKey> for GreptimeDBLogsHttpRequestBuilder {
131 fn build(&self, mut request: HttpRequest<PartitionKey>) -> Result<Request<Bytes>, Error> {
132 let metadata = request.get_additional_metadata();
133 let table = metadata.table.clone();
134 let db = metadata.dbname.clone();
135
136 let url = prepare_log_ingester_url(
138 self.endpoint.as_str(),
139 &db,
140 &table,
141 metadata,
142 &self.extra_params,
143 );
144
145 let payload = request.take_payload();
147
148 let mut builder = Request::post(&url)
149 .header(CONTENT_TYPE, "application/x-ndjson")
150 .header(CONTENT_LENGTH, payload.len());
151
152 if let Some(extra_headers) = self.extra_headers.as_ref() {
153 for (key, value) in extra_headers.iter() {
154 builder = builder.header(key, value);
155 }
156 }
157
158 if let Some(ce) = self.compression.content_encoding() {
159 builder = builder.header(CONTENT_ENCODING, ce);
160 }
161
162 if let Some(auth) = self.auth.clone() {
163 builder = auth.apply_builder(builder);
164 }
165
166 builder
167 .body(payload)
168 .context(HTTPRequestBuilderSnafu)
169 .map_err(Into::into)
170 }
171}
172
173impl RequestBuilder<(PartitionKey, Vec<Event>)> for GreptimeDBLogsHttpRequestBuilder {
174 type Metadata = (PartitionKey, EventFinalizers);
175 type Events = Vec<Event>;
176 type Encoder = (Transformer, Encoder<Framer>);
177 type Payload = Bytes;
178 type Request = HttpRequest<PartitionKey>;
179 type Error = std::io::Error;
180
181 fn compression(&self) -> Compression {
182 self.compression
183 }
184
185 fn encoder(&self) -> &Self::Encoder {
186 &self.encoder
187 }
188
189 fn split_input(
190 &self,
191 input: (PartitionKey, Vec<Event>),
192 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
193 let (key, mut events) = input;
194
195 let finalizers = events.take_finalizers();
196 let builder = RequestMetadataBuilder::from_events(&events);
197 ((key, finalizers), builder, events)
198 }
199
200 fn build_request(
201 &self,
202 metadata: Self::Metadata,
203 request_metadata: RequestMetadata,
204 payload: EncodeResult<Self::Payload>,
205 ) -> Self::Request {
206 let (key, finalizers) = metadata;
207 HttpRequest::new(
208 payload.into_payload(),
209 finalizers,
210 request_metadata,
211 PartitionKey {
212 dbname: key.dbname,
213 table: key.table,
214 pipeline_name: key.pipeline_name,
215 pipeline_version: key.pipeline_version,
216 },
217 )
218 }
219}
220
221pub(super) async fn http_healthcheck(
222 client: HttpClient,
223 endpoint: String,
224 auth: Option<Auth>,
225) -> crate::Result<()> {
226 let uri = format!("{endpoint}/health");
227 let mut request = Request::get(uri).body(Body::empty())?;
228
229 if let Some(auth) = auth {
230 auth.apply(&mut request);
231 }
232
233 let response = client.send(request).await?;
234
235 match response.status() {
236 StatusCode::OK => Ok(()),
237 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
238 }
239}
240
241#[derive(Clone, Default)]
243pub(super) struct GreptimeDBHttpRetryLogic {
244 inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
245}
246
247impl RetryLogic for GreptimeDBHttpRetryLogic {
248 type Error = HttpError;
249 type Request = HttpRequest<PartitionKey>;
250 type Response = HttpResponse;
251
252 fn is_retriable_error(&self, error: &Self::Error) -> bool {
253 error.is_retriable()
254 }
255
256 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
257 self.inner.should_retry_response(&response.http_response)
258 }
259}
260
#[cfg(test)]
mod tests {
    use super::*;

    /// The ingestion URL must have the expected path and exactly the expected
    /// query parameters.
    ///
    /// The whole parameter map is compared, so a missing or unexpected
    /// parameter fails the test. A per-pair `all(..)` check would pass
    /// vacuously on an empty query string.
    #[test]
    fn test_prepare_url() {
        let endpoint = "http://localhost:8080";
        let db = "test_db";
        let table = "test_table";
        let metadata = PartitionKey {
            dbname: "test_db".to_string(),
            table: "test_table".to_string(),
            pipeline_name: "test_pipeline".to_string(),
            pipeline_version: Some("test_version".to_string()),
        };
        let params = vec![("param1", "value1"), ("param2", "value2")];
        let extra_params = Some(
            params
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        );

        let url = prepare_log_ingester_url(endpoint, db, table, &metadata, &extra_params);
        let url = url::Url::parse(&url).unwrap();

        assert_eq!(url.path(), "/v1/events/logs");

        let actual: HashMap<String, String> = url.query_pairs().into_owned().collect();
        let expected: HashMap<String, String> = [
            ("db", "test_db"),
            ("table", "test_table"),
            ("pipeline_name", "test_pipeline"),
            ("pipeline_version", "test_version"),
            ("param1", "value1"),
            ("param2", "value2"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();
        assert_eq!(actual, expected);
    }
}