vector/sinks/appsignal/
config.rs

1use futures::FutureExt;
2use http::{header::AUTHORIZATION, Request, Uri};
3use hyper::Body;
4use tower::ServiceBuilder;
5use vector_lib::configurable::configurable_component;
6use vector_lib::sensitive_string::SensitiveString;
7use vector_lib::{
8    config::{proxy::ProxyConfig, AcknowledgementsConfig, DataType, Input},
9    tls::{MaybeTlsSettings, TlsEnableableConfig},
10};
11
12use crate::{
13    codecs::Transformer,
14    http::HttpClient,
15    sinks::{
16        prelude::{SinkConfig, SinkContext},
17        util::{
18            http::HttpStatusRetryLogic, BatchConfig, Compression, ServiceBuilderExt,
19            SinkBatchSettings, TowerRequestConfig,
20        },
21        BuildError, Healthcheck, HealthcheckError, VectorSink,
22    },
23};
24
25use super::{
26    service::{AppsignalResponse, AppsignalService},
27    sink::AppsignalSink,
28};
29
30/// Configuration for the `appsignal` sink.
31#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
32#[derive(Clone, Debug, Default)]
33pub(super) struct AppsignalConfig {
34    /// The URI for the AppSignal API to send data to.
35    #[configurable(validation(format = "uri"))]
36    #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
37    #[serde(default = "default_endpoint")]
38    pub(super) endpoint: String,
39
40    /// A valid app-level AppSignal Push API key.
41    #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
42    #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
43    push_api_key: SensitiveString,
44
45    #[configurable(derived)]
46    #[serde(default = "Compression::gzip_default")]
47    compression: Compression,
48
49    #[configurable(derived)]
50    #[serde(default)]
51    batch: BatchConfig<AppsignalDefaultBatchSettings>,
52
53    #[configurable(derived)]
54    #[serde(default)]
55    request: TowerRequestConfig,
56
57    #[configurable(derived)]
58    tls: Option<TlsEnableableConfig>,
59
60    #[configurable(derived)]
61    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
62    encoding: Transformer,
63
64    #[configurable(derived)]
65    #[serde(
66        default,
67        deserialize_with = "crate::serde::bool_or_struct",
68        skip_serializing_if = "crate::serde::is_default"
69    )]
70    acknowledgements: AcknowledgementsConfig,
71}
72
73pub(super) fn default_endpoint() -> String {
74    "https://appsignal-endpoint.net".to_string()
75}
76
77#[derive(Clone, Copy, Debug, Default)]
78pub(super) struct AppsignalDefaultBatchSettings;
79
80impl SinkBatchSettings for AppsignalDefaultBatchSettings {
81    const MAX_EVENTS: Option<usize> = Some(100);
82    const MAX_BYTES: Option<usize> = Some(450_000);
83    const TIMEOUT_SECS: f64 = 1.0;
84}
85
86impl AppsignalConfig {
87    pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
88        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
89        let client = HttpClient::new(tls, proxy)?;
90        Ok(client)
91    }
92
93    pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
94        let batch_settings = self.batch.into_batcher_settings()?;
95
96        let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
97        let push_api_key = self.push_api_key.clone();
98        let compression = self.compression;
99        let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
100
101        let request_opts = self.request;
102        let request_settings = request_opts.into_settings();
103        let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);
104
105        let service = ServiceBuilder::new()
106            .settings(request_settings, retry_logic)
107            .service(service);
108
109        let transformer = self.encoding.clone();
110        let sink = AppsignalSink {
111            service,
112            compression,
113            transformer,
114            batch_settings,
115        };
116
117        Ok(VectorSink::from_event_streamsink(sink))
118    }
119}
120
121impl_generate_config_from_default!(AppsignalConfig);
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "appsignal")]
125impl SinkConfig for AppsignalConfig {
126    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
127        let client = self.build_client(cx.proxy())?;
128        let healthcheck = healthcheck(
129            endpoint_uri(&self.endpoint, "vector/healthcheck")?,
130            self.push_api_key.inner().to_string(),
131            client.clone(),
132        )
133        .boxed();
134        let sink = self.build_sink(client)?;
135
136        Ok((sink, healthcheck))
137    }
138
139    fn input(&self) -> Input {
140        Input::new(DataType::Metric | DataType::Log)
141    }
142
143    fn acknowledgements(&self) -> &AcknowledgementsConfig {
144        &self.acknowledgements
145    }
146}
147
148async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
149    let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {push_api_key}"));
150    let response = client.send(request.body(Body::empty()).unwrap()).await?;
151
152    match response.status() {
153        status if status.is_success() => Ok(()),
154        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
155    }
156}
157
158pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
159    let uri = if endpoint.ends_with('/') {
160        format!("{endpoint}{path}")
161    } else {
162        format!("{endpoint}/{path}")
163    };
164    match uri.parse::<Uri>() {
165        Ok(u) => Ok(u),
166        Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
167    }
168}
169
#[cfg(test)]
mod test {
    use super::{endpoint_uri, AppsignalConfig};

    /// Runs the shared generated-config check for this sink's configuration type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AppsignalConfig>();
    }

    /// An endpoint without a trailing slash gets a single separator added.
    #[test]
    fn endpoint_uri_with_path() {
        let built = endpoint_uri("https://appsignal-endpoint.net", "vector/events")
            .expect("Not a valid URI");

        assert_eq!(
            built.to_string(),
            "https://appsignal-endpoint.net/vector/events"
        );
    }

    /// An endpoint that already ends with a slash does not get a second one.
    #[test]
    fn endpoint_uri_with_trailing_slash() {
        let built = endpoint_uri("https://appsignal-endpoint.net/", "vector/events")
            .expect("Not a valid URI");

        assert_eq!(
            built.to_string(),
            "https://appsignal-endpoint.net/vector/events"
        );
    }
}
196}