vector/sinks/greptimedb/logs/
http_request_builder.rs1use crate::{
2 codecs::{Encoder, Transformer},
3 event::{Event, EventFinalizers, Finalizable},
4 http::{Auth, HttpClient, HttpError},
5 sinks::{
6 prelude::*,
7 util::http::{HttpRequest, HttpResponse, HttpRetryLogic, HttpServiceRequestBuilder},
8 HTTPRequestBuilderSnafu, HealthcheckError,
9 },
10 Error,
11};
12use bytes::Bytes;
13use http::{
14 header::{CONTENT_ENCODING, CONTENT_LENGTH, CONTENT_TYPE},
15 Request, StatusCode,
16};
17use hyper::Body;
18use snafu::ResultExt;
19use std::collections::HashMap;
20use vector_lib::codecs::encoding::Framer;
21
22#[derive(Hash, Eq, PartialEq, Clone, Debug)]
24pub(super) struct PartitionKey {
25 pub dbname: String,
26 pub table: String,
27 pub pipeline_name: String,
28 pub pipeline_version: Option<String>,
29}
30
31pub(super) struct KeyPartitioner {
33 dbname: Template,
34 table: Template,
35 pipeline_name: Template,
36 pipeline_version: Option<Template>,
37}
38
39impl KeyPartitioner {
40 pub const fn new(
41 db: Template,
42 table: Template,
43 pipeline_name: Template,
44 pipeline_version: Option<Template>,
45 ) -> Self {
46 Self {
47 dbname: db,
48 table,
49 pipeline_name,
50 pipeline_version,
51 }
52 }
53
54 fn render(template: &Template, item: &Event, field: &'static str) -> Option<String> {
55 template
56 .render_string(item)
57 .map_err(|error| {
58 emit!(TemplateRenderingError {
59 error,
60 field: Some(field),
61 drop_event: true,
62 });
63 })
64 .ok()
65 }
66}
67
68impl Partitioner for KeyPartitioner {
69 type Item = Event;
70 type Key = Option<PartitionKey>;
71
72 fn partition(&self, item: &Self::Item) -> Self::Key {
73 let dbname = Self::render(&self.dbname, item, "dbname_key")?;
74 let table = Self::render(&self.table, item, "table_key")?;
75 let pipeline_name = Self::render(&self.pipeline_name, item, "pipeline_name")?;
76 let pipeline_version = self
77 .pipeline_version
78 .as_ref()
79 .and_then(|template| Self::render(template, item, "pipeline_version"));
80 Some(PartitionKey {
81 dbname,
82 table,
83 pipeline_name,
84 pipeline_version,
85 })
86 }
87}
88
89#[derive(Debug, Clone)]
91pub(super) struct GreptimeDBLogsHttpRequestBuilder {
92 pub(super) endpoint: String,
93 pub(super) auth: Option<Auth>,
94 pub(super) encoder: (Transformer, Encoder<Framer>),
95 pub(super) compression: Compression,
96 pub(super) extra_params: Option<HashMap<String, String>>,
97 pub(super) extra_headers: Option<HashMap<String, String>>,
98}
99
100fn prepare_log_ingester_url(
101 endpoint: &str,
102 db: &str,
103 table: &str,
104 metadata: &PartitionKey,
105 extra_params: &Option<HashMap<String, String>>,
106) -> String {
107 let path = format!("{endpoint}/v1/events/logs");
108 let mut url = url::Url::parse(&path).unwrap();
109 let mut url_builder = url.query_pairs_mut();
110 url_builder
111 .append_pair("db", db)
112 .append_pair("table", table)
113 .append_pair("pipeline_name", &metadata.pipeline_name);
114
115 if let Some(pipeline_version) = metadata.pipeline_version.as_ref() {
116 url_builder.append_pair("pipeline_version", pipeline_version);
117 }
118
119 if let Some(extra_params) = extra_params.as_ref() {
120 for (key, value) in extra_params.iter() {
121 url_builder.append_pair(key, value);
122 }
123 }
124
125 url_builder.finish().to_string()
126}
127
128impl HttpServiceRequestBuilder<PartitionKey> for GreptimeDBLogsHttpRequestBuilder {
129 fn build(&self, mut request: HttpRequest<PartitionKey>) -> Result<Request<Bytes>, Error> {
130 let metadata = request.get_additional_metadata();
131 let table = metadata.table.clone();
132 let db = metadata.dbname.clone();
133
134 let url = prepare_log_ingester_url(
136 self.endpoint.as_str(),
137 &db,
138 &table,
139 metadata,
140 &self.extra_params,
141 );
142
143 let payload = request.take_payload();
145
146 let mut builder = Request::post(&url)
147 .header(CONTENT_TYPE, "application/x-ndjson")
148 .header(CONTENT_LENGTH, payload.len());
149
150 if let Some(extra_headers) = self.extra_headers.as_ref() {
151 for (key, value) in extra_headers.iter() {
152 builder = builder.header(key, value);
153 }
154 }
155
156 if let Some(ce) = self.compression.content_encoding() {
157 builder = builder.header(CONTENT_ENCODING, ce);
158 }
159
160 if let Some(auth) = self.auth.clone() {
161 builder = auth.apply_builder(builder);
162 }
163
164 builder
165 .body(payload)
166 .context(HTTPRequestBuilderSnafu)
167 .map_err(Into::into)
168 }
169}
170
171impl RequestBuilder<(PartitionKey, Vec<Event>)> for GreptimeDBLogsHttpRequestBuilder {
172 type Metadata = (PartitionKey, EventFinalizers);
173 type Events = Vec<Event>;
174 type Encoder = (Transformer, Encoder<Framer>);
175 type Payload = Bytes;
176 type Request = HttpRequest<PartitionKey>;
177 type Error = std::io::Error;
178
179 fn compression(&self) -> Compression {
180 self.compression
181 }
182
183 fn encoder(&self) -> &Self::Encoder {
184 &self.encoder
185 }
186
187 fn split_input(
188 &self,
189 input: (PartitionKey, Vec<Event>),
190 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
191 let (key, mut events) = input;
192
193 let finalizers = events.take_finalizers();
194 let builder = RequestMetadataBuilder::from_events(&events);
195 ((key, finalizers), builder, events)
196 }
197
198 fn build_request(
199 &self,
200 metadata: Self::Metadata,
201 request_metadata: RequestMetadata,
202 payload: EncodeResult<Self::Payload>,
203 ) -> Self::Request {
204 let (key, finalizers) = metadata;
205 HttpRequest::new(
206 payload.into_payload(),
207 finalizers,
208 request_metadata,
209 PartitionKey {
210 dbname: key.dbname,
211 table: key.table,
212 pipeline_name: key.pipeline_name,
213 pipeline_version: key.pipeline_version,
214 },
215 )
216 }
217}
218
219pub(super) async fn http_healthcheck(
220 client: HttpClient,
221 endpoint: String,
222 auth: Option<Auth>,
223) -> crate::Result<()> {
224 let uri = format!("{endpoint}/health");
225 let mut request = Request::get(uri).body(Body::empty())?;
226
227 if let Some(auth) = auth {
228 auth.apply(&mut request);
229 }
230
231 let response = client.send(request).await?;
232
233 match response.status() {
234 StatusCode::OK => Ok(()),
235 status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
236 }
237}
238
239#[derive(Clone, Default)]
241pub(super) struct GreptimeDBHttpRetryLogic {
242 inner: HttpRetryLogic<HttpRequest<PartitionKey>>,
243}
244
245impl RetryLogic for GreptimeDBHttpRetryLogic {
246 type Error = HttpError;
247 type Request = HttpRequest<PartitionKey>;
248 type Response = HttpResponse;
249
250 fn is_retriable_error(&self, error: &Self::Error) -> bool {
251 error.is_retriable()
252 }
253
254 fn should_retry_response(&self, response: &Self::Response) -> RetryAction<Self::Request> {
255 self.inner.should_retry_response(&response.http_response)
256 }
257}
258
#[cfg(test)]
mod tests {
    use super::*;

    /// The ingestion URL must target `/v1/events/logs` and carry exactly the
    /// expected query parameters — none missing, none duplicated, none extra.
    #[test]
    fn test_prepare_url() {
        let endpoint = "http://localhost:8080";
        let db = "test_db";
        let table = "test_table";
        let metadata = PartitionKey {
            dbname: "test_db".to_string(),
            table: "test_table".to_string(),
            pipeline_name: "test_pipeline".to_string(),
            pipeline_version: Some("test_version".to_string()),
        };
        let params = vec![("param1", "value1"), ("param2", "value2")];
        let extra_params: Option<HashMap<String, String>> = Some(
            params
                .into_iter()
                .map(|(k, v)| (k.to_string(), v.to_string()))
                .collect(),
        );

        let url = prepare_log_ingester_url(endpoint, db, table, &metadata, &extra_params);
        let url = url::Url::parse(&url).unwrap();

        assert_eq!(url.path(), "/v1/events/logs");

        // Extra parameters come out in `HashMap` order, so compare sorted lists.
        // A plain `all(..)` over the pairs would pass even if pairs were missing.
        let mut actual: Vec<(String, String)> = url
            .query_pairs()
            .map(|(k, v)| (k.into_owned(), v.into_owned()))
            .collect();
        actual.sort();

        let expected: Vec<(String, String)> = vec![
            ("db", "test_db"),
            ("param1", "value1"),
            ("param2", "value2"),
            ("pipeline_name", "test_pipeline"),
            ("pipeline_version", "test_version"),
            ("table", "test_table"),
        ]
        .into_iter()
        .map(|(k, v)| (k.to_string(), v.to_string()))
        .collect();

        assert_eq!(actual, expected);
    }
}
295}