vector/sinks/honeycomb/
config.rs1use bytes::Bytes;
4use futures::FutureExt;
5use http::{Request, StatusCode, Uri};
6use vector_lib::configurable::configurable_component;
7use vector_lib::sensitive_string::SensitiveString;
8use vrl::value::Kind;
9
10use crate::{
11 http::HttpClient,
12 sinks::{
13 prelude::*,
14 util::{
15 http::{http_response_retry_logic, HttpService},
16 BatchConfig, BoxedRawValue,
17 },
18 },
19};
20
21use super::{
22 encoder::HoneycombEncoder, request_builder::HoneycombRequestBuilder,
23 service::HoneycombSvcRequestBuilder, sink::HoneycombSink,
24};
25
26pub(super) const HTTP_HEADER_HONEYCOMB: &str = "X-Honeycomb-Team";
27
28#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))]
30#[derive(Clone, Debug)]
31pub struct HoneycombConfig {
32 #[serde(default = "default_endpoint")]
34 #[configurable(metadata(
35 docs::examples = "https://api.honeycomb.io",
36 docs::examples = "https://api.eu1.honeycomb.io",
37 ))]
38 #[configurable(validation(format = "uri"))]
39 pub(super) endpoint: String,
40
41 #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))]
43 #[configurable(metadata(docs::examples = "some-api-key"))]
44 api_key: SensitiveString,
45
46 #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))]
48 dataset: String,
51
52 #[configurable(derived)]
53 #[serde(default)]
54 batch: BatchConfig<HoneycombDefaultBatchSettings>,
55
56 #[configurable(derived)]
57 #[serde(default)]
58 request: TowerRequestConfig,
59
60 #[configurable(derived)]
61 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
62 encoding: Transformer,
63
64 #[configurable(derived)]
66 #[serde(default = "Compression::zstd_default")]
67 compression: Compression,
68
69 #[configurable(derived)]
70 #[serde(
71 default,
72 deserialize_with = "crate::serde::bool_or_struct",
73 skip_serializing_if = "crate::serde::is_default"
74 )]
75 acknowledgements: AcknowledgementsConfig,
76}
77
78fn default_endpoint() -> String {
79 "https://api.honeycomb.io".to_string()
80}
81
82#[derive(Clone, Copy, Debug, Default)]
83struct HoneycombDefaultBatchSettings;
84
85impl SinkBatchSettings for HoneycombDefaultBatchSettings {
86 const MAX_EVENTS: Option<usize> = None;
87 const MAX_BYTES: Option<usize> = Some(100_000);
88 const TIMEOUT_SECS: f64 = 1.0;
89}
90
91impl GenerateConfig for HoneycombConfig {
92 fn generate_config() -> toml::Value {
93 toml::from_str(
94 r#"api_key = "${HONEYCOMB_API_KEY}"
95 dataset = "my-honeycomb-dataset""#,
96 )
97 .unwrap()
98 }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "honeycomb")]
103impl SinkConfig for HoneycombConfig {
104 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
105 let batch_settings = self.batch.validate()?.into_batcher_settings()?;
106
107 let request_builder = HoneycombRequestBuilder {
108 encoder: HoneycombEncoder {
109 transformer: self.encoding.clone(),
110 },
111 compression: self.compression,
112 };
113
114 let uri = self.build_uri()?;
115
116 let honeycomb_service_request_builder = HoneycombSvcRequestBuilder {
117 uri: uri.clone(),
118 api_key: self.api_key.clone(),
119 compression: self.compression,
120 };
121
122 let client = HttpClient::new(None, cx.proxy())?;
123
124 let service = HttpService::new(client.clone(), honeycomb_service_request_builder);
125
126 let request_limits = self.request.into_settings();
127
128 let service = ServiceBuilder::new()
129 .settings(request_limits, http_response_retry_logic())
130 .service(service);
131
132 let sink = HoneycombSink::new(service, batch_settings, request_builder);
133
134 let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed();
135
136 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
137 }
138
139 fn input(&self) -> Input {
140 let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
141
142 Input::log().with_schema_requirement(requirement)
143 }
144
145 fn acknowledgements(&self) -> &AcknowledgementsConfig {
146 &self.acknowledgements
147 }
148}
149
150impl HoneycombConfig {
151 fn build_uri(&self) -> crate::Result<Uri> {
152 let uri = format!(
153 "{}/1/batch/{}",
154 self.endpoint.trim_end_matches('/'),
155 self.dataset
156 );
157 uri.parse::<Uri>().map_err(Into::into)
158 }
159}
160
161async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> {
162 let request = Request::post(uri).header(HTTP_HEADER_HONEYCOMB, api_key.inner());
163 let body = crate::serde::json::to_bytes(&Vec::<BoxedRawValue>::new())
164 .unwrap()
165 .freeze();
166 let req: Request<Bytes> = request.body(body)?;
167 let req = req.map(hyper::Body::from);
168
169 let res = client.send(req).await?;
170
171 let status = res.status();
172 let body = hyper::body::to_bytes(res.into_body()).await?;
173
174 if status == StatusCode::BAD_REQUEST {
175 Ok(())
176 } else if status == StatusCode::UNAUTHORIZED {
177 let json: serde_json::Value = serde_json::from_slice(&body[..])?;
178
179 let message = if let Some(s) = json
180 .as_object()
181 .and_then(|o| o.get("error"))
182 .and_then(|s| s.as_str())
183 {
184 s.to_string()
185 } else {
186 "Token is not valid, 401 returned.".to_string()
187 };
188
189 Err(message.into())
190 } else {
191 let body = String::from_utf8_lossy(&body[..]);
192
193 Err(format!("Server returned unexpected error status: {status} body: {body}").into())
194 }
195}