vector/sinks/honeycomb/
config.rs

1//! Configuration for the `honeycomb` sink.
2
3use bytes::Bytes;
4use futures::FutureExt;
5use http::{Request, StatusCode, Uri};
6use vector_lib::configurable::configurable_component;
7use vector_lib::sensitive_string::SensitiveString;
8use vrl::value::Kind;
9
10use crate::{
11    http::HttpClient,
12    sinks::{
13        prelude::*,
14        util::{
15            http::{http_response_retry_logic, HttpService},
16            BatchConfig, BoxedRawValue,
17        },
18    },
19};
20
21use super::{
22    encoder::HoneycombEncoder, request_builder::HoneycombRequestBuilder,
23    service::HoneycombSvcRequestBuilder, sink::HoneycombSink,
24};
25
26pub(super) const HTTP_HEADER_HONEYCOMB: &str = "X-Honeycomb-Team";
27
28/// Configuration for the `honeycomb` sink.
29#[configurable_component(sink("honeycomb", "Deliver log events to Honeycomb."))]
30#[derive(Clone, Debug)]
31pub struct HoneycombConfig {
32    /// Honeycomb's endpoint to send logs to
33    #[serde(default = "default_endpoint")]
34    #[configurable(metadata(
35        docs::examples = "https://api.honeycomb.io",
36        docs::examples = "https://api.eu1.honeycomb.io",
37    ))]
38    #[configurable(validation(format = "uri"))]
39    pub(super) endpoint: String,
40
41    /// The API key that is used to authenticate against Honeycomb.
42    #[configurable(metadata(docs::examples = "${HONEYCOMB_API_KEY}"))]
43    #[configurable(metadata(docs::examples = "some-api-key"))]
44    api_key: SensitiveString,
45
46    /// The dataset to which logs are sent.
47    #[configurable(metadata(docs::examples = "my-honeycomb-dataset"))]
48    // TODO: we probably want to make this a template
49    // but this limits us in how we can do our healthcheck.
50    dataset: String,
51
52    #[configurable(derived)]
53    #[serde(default)]
54    batch: BatchConfig<HoneycombDefaultBatchSettings>,
55
56    #[configurable(derived)]
57    #[serde(default)]
58    request: TowerRequestConfig,
59
60    #[configurable(derived)]
61    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
62    encoding: Transformer,
63
64    /// The compression algorithm to use.
65    #[configurable(derived)]
66    #[serde(default = "Compression::zstd_default")]
67    compression: Compression,
68
69    #[configurable(derived)]
70    #[serde(
71        default,
72        deserialize_with = "crate::serde::bool_or_struct",
73        skip_serializing_if = "crate::serde::is_default"
74    )]
75    acknowledgements: AcknowledgementsConfig,
76}
77
78fn default_endpoint() -> String {
79    "https://api.honeycomb.io".to_string()
80}
81
82#[derive(Clone, Copy, Debug, Default)]
83struct HoneycombDefaultBatchSettings;
84
85impl SinkBatchSettings for HoneycombDefaultBatchSettings {
86    const MAX_EVENTS: Option<usize> = None;
87    const MAX_BYTES: Option<usize> = Some(100_000);
88    const TIMEOUT_SECS: f64 = 1.0;
89}
90
91impl GenerateConfig for HoneycombConfig {
92    fn generate_config() -> toml::Value {
93        toml::from_str(
94            r#"api_key = "${HONEYCOMB_API_KEY}"
95            dataset = "my-honeycomb-dataset""#,
96        )
97        .unwrap()
98    }
99}
100
101#[async_trait::async_trait]
102#[typetag::serde(name = "honeycomb")]
103impl SinkConfig for HoneycombConfig {
104    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
105        let batch_settings = self.batch.validate()?.into_batcher_settings()?;
106
107        let request_builder = HoneycombRequestBuilder {
108            encoder: HoneycombEncoder {
109                transformer: self.encoding.clone(),
110            },
111            compression: self.compression,
112        };
113
114        let uri = self.build_uri()?;
115
116        let honeycomb_service_request_builder = HoneycombSvcRequestBuilder {
117            uri: uri.clone(),
118            api_key: self.api_key.clone(),
119            compression: self.compression,
120        };
121
122        let client = HttpClient::new(None, cx.proxy())?;
123
124        let service = HttpService::new(client.clone(), honeycomb_service_request_builder);
125
126        let request_limits = self.request.into_settings();
127
128        let service = ServiceBuilder::new()
129            .settings(request_limits, http_response_retry_logic())
130            .service(service);
131
132        let sink = HoneycombSink::new(service, batch_settings, request_builder);
133
134        let healthcheck = healthcheck(uri, self.api_key.clone(), client).boxed();
135
136        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
137    }
138
139    fn input(&self) -> Input {
140        let requirement = Requirement::empty().optional_meaning("timestamp", Kind::timestamp());
141
142        Input::log().with_schema_requirement(requirement)
143    }
144
145    fn acknowledgements(&self) -> &AcknowledgementsConfig {
146        &self.acknowledgements
147    }
148}
149
150impl HoneycombConfig {
151    fn build_uri(&self) -> crate::Result<Uri> {
152        let uri = format!(
153            "{}/1/batch/{}",
154            self.endpoint.trim_end_matches('/'),
155            self.dataset
156        );
157        uri.parse::<Uri>().map_err(Into::into)
158    }
159}
160
161async fn healthcheck(uri: Uri, api_key: SensitiveString, client: HttpClient) -> crate::Result<()> {
162    let request = Request::post(uri).header(HTTP_HEADER_HONEYCOMB, api_key.inner());
163    let body = crate::serde::json::to_bytes(&Vec::<BoxedRawValue>::new())
164        .unwrap()
165        .freeze();
166    let req: Request<Bytes> = request.body(body)?;
167    let req = req.map(hyper::Body::from);
168
169    let res = client.send(req).await?;
170
171    let status = res.status();
172    let body = hyper::body::to_bytes(res.into_body()).await?;
173
174    if status == StatusCode::BAD_REQUEST {
175        Ok(())
176    } else if status == StatusCode::UNAUTHORIZED {
177        let json: serde_json::Value = serde_json::from_slice(&body[..])?;
178
179        let message = if let Some(s) = json
180            .as_object()
181            .and_then(|o| o.get("error"))
182            .and_then(|s| s.as_str())
183        {
184            s.to_string()
185        } else {
186            "Token is not valid, 401 returned.".to_string()
187        };
188
189        Err(message.into())
190    } else {
191        let body = String::from_utf8_lossy(&body[..]);
192
193        Err(format!("Server returned unexpected error status: {status} body: {body}").into())
194    }
195}