vector/sinks/honeycomb/
config.rs1use bytes::Bytes;
4use futures::FutureExt;
5use http::{Request, StatusCode, Uri};
6use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
7use vrl::value::Kind;
8
9use super::{
10 encoder::HoneycombEncoder, request_builder::HoneycombRequestBuilder,
11 service::HoneycombSvcRequestBuilder, sink::HoneycombSink,
12};
13use crate::{
14 http::HttpClient,
15 sinks::{
16 prelude::*,
17 util::{
18 BatchConfig, BoxedRawValue,
19 http::{HttpService, http_response_retry_logic},
20 },
21 },
22};
23
24pub(super) const HTTP_HEADER_HONEYCOMB: &str = "X-Honeycomb-Team";
25
26#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))]
28#[derive(Clone, Debug)]
29pub struct HoneycombConfig {
30 #[serde(default = "default_endpoint")]
32 #[configurable(metadata(
33 docs::examples = "https://api.honeycomb.io",
34 docs::examples = "https://api.eu1.honeycomb.io",
35 ))]
36 #[configurable(validation(format = "uri"))]
37 pub(super) endpoint: String,
38
39 #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))]
41 #[configurable(metadata(docs::examples = "some-api-key"))]
42 api_key: SensitiveString,
43
44 #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))]
46 dataset: String,
49
50 #[configurable(derived)]
51 #[serde(default)]
52 batch: BatchConfig<HoneycombDefaultBatchSettings>,
53
54 #[configurable(derived)]
55 #[serde(default)]
56 request: TowerRequestConfig,
57
58 #[configurable(derived)]
59 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
60 encoding: Transformer,
61
62 #[configurable(derived)]
64 #[serde(default = "Compression::zstd_default")]
65 compression: Compression,
66
67 #[configurable(derived)]
68 #[serde(
69 default,
70 deserialize_with = "crate::serde::bool_or_struct",
71 skip_serializing_if = "crate::serde::is_default"
72 )]
73 acknowledgements: AcknowledgementsConfig,
74}
75
/// Returns the endpoint used when the configuration does not set one.
fn default_endpoint() -> String {
    String::from("https://api.honeycomb.io")
}
79
80#[derive(Clone, Copy, Debug, Default)]
81struct HoneycombDefaultBatchSettings;
82
83impl SinkBatchSettings for HoneycombDefaultBatchSettings {
84 const MAX_EVENTS: Option<usize> = None;
85 const MAX_BYTES: Option<usize> = Some(100_000);
86 const TIMEOUT_SECS: f64 = 1.0;
87}
88
89impl GenerateConfig for HoneycombConfig {
90 fn generate_config() -> toml::Value {
91 toml::from_str(
92 r#"api_key = "${HONEYCOMB_API_KEY}"
93 dataset = "my-honeycomb-dataset""#,
94 )
95 .unwrap()
96 }
97}
98
99#[async_trait::async_trait]
100#[typetag::serde(name = "honeycomb")]
101impl SinkConfig for HoneycombConfig {
102 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
103 let batch_settings = self.batch.validate()?.into_batcher_settings()?;
104
105 let request_builder = HoneycombRequestBuilder {
106 encoder: HoneycombEncoder {
107 transformer: self.encoding.clone(),
108 },
109 compression: self.compression,
110 };
111
112 let uri = self.build_uri()?;
113
114 let honeycomb_service_request_builder = HoneycombSvcRequestBuilder {
115 uri: uri.clone(),
116 api_key: self.api_key.clone(),
117 compression: self.compression,
118 };
119
120 let client = HttpClient::new(None, cx.proxy())?;
121
122 let service = HttpService::new(client.clone(), honeycomb_service_request_builder);
123
124 let request_limits = self.request.into_settings();
125
126 let service = ServiceBuilder::new()
127 .settings(request_limits, http_response_retry_logic())
128 .service(service);
129
130 let sink = HoneycombSink::new(service, batch_settings, request_builder);
131
132 let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed();
133
134 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
135 }
136
137 fn input(&self) -> Input {
138 let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
139
140 Input::log().with_schema_requirement(requirement)
141 }
142
143 fn acknowledgements(&self) -> &AcknowledgementsConfig {
144 &self.acknowledgements
145 }
146}
147
148impl HoneycombConfig {
149 fn build_uri(&self) -> crate::Result<Uri> {
150 let uri = format!(
151 "{}/1/batch/{}",
152 self.endpoint.trim_end_matches('/'),
153 self.dataset
154 );
155 uri.parse::<Uri>().map_err(Into::into)
156 }
157}
158
159async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> {
160 let request = Request::post(uri).header(HTTP_HEADER_HONEYCOMB, api_key.inner());
161 let body = crate::serde::json::to_bytes(&Vec::<BoxedRawValue>::new())
162 .unwrap()
163 .freeze();
164 let req: Request<Bytes> = request.body(body)?;
165 let req = req.map(hyper::Body::from);
166
167 let res = client.send(req).await?;
168
169 let status = res.status();
170 let body = hyper::body::to_bytes(res.into_body()).await?;
171
172 if status == StatusCode::BAD_REQUEST {
173 Ok(())
174 } else if status == StatusCode::UNAUTHORIZED {
175 let json: serde_json::Value = serde_json::from_slice(&body[..])?;
176
177 let message = if let Some(s) = json
178 .as_object()
179 .and_then(|o| o.get("error"))
180 .and_then(|s| s.as_str())
181 {
182 s.to_string()
183 } else {
184 "Token is not valid, 401 returned.".to_string()
185 };
186
187 Err(message.into())
188 } else {
189 let body = String::from_utf8_lossy(&body[..]);
190
191 Err(format!("Server returned unexpected error status: {status} body: {body}").into())
192 }
193}