vector/sinks/honeycomb/
config.rs

1//! Configuration for the `honeycomb` sink.
2
3use bytes::Bytes;
4use futures::FutureExt;
5use http::{Request, StatusCode, Uri};
6use vector_lib::{configurable::configurable_component, sensitive_string::SensitiveString};
7use vrl::value::Kind;
8
9use super::{
10    encoder::HoneycombEncoder, request_builder::HoneycombRequestBuilder,
11    service::HoneycombSvcRequestBuilder, sink::HoneycombSink,
12};
13use crate::{
14    http::HttpClient,
15    sinks::{
16        prelude::*,
17        util::{
18            BatchConfig, BoxedRawValue,
19            http::{HttpService, http_response_retry_logic},
20        },
21    },
22};
23
24pub(super) const HTTP_HEADER_HONEYCOMB: &str = "X-Honeycomb-Team";
25
26/// Configuration for the `honeycomb` sink.
27#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))]
28#[derive(Clone, Debug)]
29pub struct HoneycombConfig {
30    /// Honeycomb's endpoint to send logs to
31    #[serde(default = "default_endpoint")]
32    #[configurable(metadata(
33        docs::examples = "https://api.honeycomb.io",
34        docs::examples = "https://api.eu1.honeycomb.io",
35    ))]
36    #[configurable(validation(format = "uri"))]
37    pub(super) endpoint: String,
38
39    /// The API key that is used to authenticate against Honeycomb.
40    #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))]
41    #[configurable(metadata(docs::examples = "some-api-key"))]
42    api_key: SensitiveString,
43
44    /// The dataset to which logs are sent.
45    #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))]
46    // TODO: we probably want to make this a template
47    // but this limits us in how we can do our healthcheck.
48    dataset: String,
49
50    #[configurable(derived)]
51    #[serde(default)]
52    batch: BatchConfig<HoneycombDefaultBatchSettings>,
53
54    #[configurable(derived)]
55    #[serde(default)]
56    request: TowerRequestConfig,
57
58    #[configurable(derived)]
59    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
60    encoding: Transformer,
61
62    /// The compression algorithm to use.
63    #[configurable(derived)]
64    #[serde(default = "Compression::zstd_default")]
65    compression: Compression,
66
67    #[configurable(derived)]
68    #[serde(
69        default,
70        deserialize_with = "crate::serde::bool_or_struct",
71        skip_serializing_if = "crate::serde::is_default"
72    )]
73    acknowledgements: AcknowledgementsConfig,
74}
75
76fn default_endpoint() -> String {
77    "https://api.honeycomb.io".to_string()
78}
79
80#[derive(Clone, Copy, Debug, Default)]
81struct HoneycombDefaultBatchSettings;
82
83impl SinkBatchSettings for HoneycombDefaultBatchSettings {
84    const MAX_EVENTS: Option<usize> = None;
85    const MAX_BYTES: Option<usize> = Some(100_000);
86    const TIMEOUT_SECS: f64 = 1.0;
87}
88
89impl GenerateConfig for HoneycombConfig {
90    fn generate_config() -> toml::Value {
91        toml::from_str(
92            r#"api_key = "${HONEYCOMB_API_KEY}"
93            dataset = "my-honeycomb-dataset""#,
94        )
95        .unwrap()
96    }
97}
98
99#[async_trait::async_trait]
100#[typetag::serde(name = "honeycomb")]
101impl SinkConfig for HoneycombConfig {
102    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
103        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
104
105        let request_builder = HoneycombRequestBuilder {
106            encoder: HoneycombEncoder {
107                transformer: self.encoding.clone(),
108            },
109            compression: self.compression,
110        };
111
112        let uri = self.build_uri()?;
113
114        let honeycomb_service_request_builder = HoneycombSvcRequestBuilder {
115            uri: uri.clone(),
116            api_key: self.api_key.clone(),
117            compression: self.compression,
118        };
119
120        let client = HttpClient::new(None, cx.proxy())?;
121
122        let service = HttpService::new(client.clone(), honeycomb_service_request_builder);
123
124        let request_limits = self.request.into_settings();
125
126        let service = ServiceBuilder::new()
127            .settings(request_limits, http_response_retry_logic())
128            .service(service);
129
130        let sink = HoneycombSink::new(service, batch_settings, request_builder);
131
132        let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed();
133
134        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
135    }
136
137    fn input(&self) -> Input {
138        let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
139
140        Input::log().with_schema_requirement(requirement)
141    }
142
143    fn acknowledgements(&self) -> &AcknowledgementsConfig {
144        &self.acknowledgements
145    }
146}
147
148impl HoneycombConfig {
149    fn build_uri(&self) -> crate::Result<Uri> {
150        let uri = format!(
151            "{}/1/batch/{}",
152            self.endpoint.trim_end_matches('/'),
153            self.dataset
154        );
155        uri.parse::<Uri>().map_err(Into::into)
156    }
157}
158
159async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> {
160    let request = Request::post(uri).header(HTTP_HEADER_HONEYCOMB, api_key.inner());
161    let body = crate::serde::json::to_bytes(&Vec::<BoxedRawValue>::new())
162        .unwrap()
163        .freeze();
164    let req: Request<Bytes> = request.body(body)?;
165    let req = req.map(hyper::Body::from);
166
167    let res = client.send(req).await?;
168
169    let status = res.status();
170    let body = hyper::body::to_bytes(res.into_body()).await?;
171
172    if status == StatusCode::BAD_REQUEST {
173        Ok(())
174    } else if status == StatusCode::UNAUTHORIZED {
175        let json: serde_json::Value = serde_json::from_slice(&body[..])?;
176
177        let message = if let Some(s) = json
178            .as_object()
179            .and_then(|o| o.get("error"))
180            .and_then(|s| s.as_str())
181        {
182            s.to_string()
183        } else {
184            "Token is not valid, 401 returned.".to_string()
185        };
186
187        Err(message.into())
188    } else {
189        let body = String::from_utf8_lossy(&body[..]);
190
191        Err(format!("Server returned unexpected error status: {status} body: {body}").into())
192    }
193}