vector/sinks/appsignal/
config.rs

1use futures::FutureExt;
2use http::{Request, Uri, header::AUTHORIZATION};
3use hyper::Body;
4use tower::ServiceBuilder;
5use vector_lib::{
6    config::{AcknowledgementsConfig, DataType, Input, proxy::ProxyConfig},
7    configurable::configurable_component,
8    sensitive_string::SensitiveString,
9    tls::{MaybeTlsSettings, TlsEnableableConfig},
10};
11
12use super::{
13    service::{AppsignalResponse, AppsignalService},
14    sink::AppsignalSink,
15};
16use crate::{
17    codecs::Transformer,
18    http::HttpClient,
19    sinks::{
20        BuildError, Healthcheck, HealthcheckError, VectorSink,
21        prelude::{SinkConfig, SinkContext},
22        util::{
23            BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig,
24            http::HttpStatusRetryLogic,
25        },
26    },
27};
28
// Maintainer notes use plain `//` comments so the `///` text consumed by the
// `configurable_component` attribute stays exactly as written.
29/// Configuration for the `appsignal` sink.
30#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
31#[derive(Clone, Debug, Default)]
32pub(super) struct AppsignalConfig {
    // Filled in by `default_endpoint()` when the key is absent (see the serde attribute).
    // `build_sink` appends "vector/events" and `build` appends "vector/healthcheck" to it.
33    /// The URI for the AppSignal API to send data to.
34    #[configurable(validation(format = "uri"))]
35    #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
36    #[serde(default = "default_endpoint")]
37    pub(super) endpoint: String,
38
    // Sent as the `Authorization: Bearer …` header by `healthcheck` and handed to
    // `AppsignalService::new` in `build_sink`.
    // NOTE(review): no serde default is declared here, so the key is presumably
    // mandatory in user config — confirm.
39    /// A valid app-level AppSignal Push API key.
40    #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
41    #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
42    push_api_key: SensitiveString,
43
    // Defaults to whatever `Compression::gzip_default` returns; the same value is passed
    // to both the service and the sink in `build_sink`.
44    #[configurable(derived)]
45    #[serde(default = "Compression::gzip_default")]
46    compression: Compression,
47
    // Batch options, parameterized by `AppsignalDefaultBatchSettings`; converted with
    // `into_batcher_settings()` in `build_sink`.
48    #[configurable(derived)]
49    #[serde(default)]
50    batch: BatchConfig<AppsignalDefaultBatchSettings>,
51
    // Request options; `build_sink` turns them into settings via `into_settings()`.
52    #[configurable(derived)]
53    #[serde(default)]
54    request: TowerRequestConfig,
55
    // Optional TLS options, passed to `MaybeTlsSettings::from_config` in `build_client`.
56    #[configurable(derived)]
57    tls: Option<TlsEnableableConfig>,
58
    // Cloned into the sink as its `transformer` field in `build_sink`.
59    #[configurable(derived)]
60    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
61    encoding: Transformer,
62
    // Returned by reference from `SinkConfig::acknowledgements`; deserialized through
    // `crate::serde::bool_or_struct`.
63    #[configurable(derived)]
64    #[serde(
65        default,
66        deserialize_with = "crate::serde::bool_or_struct",
67        skip_serializing_if = "crate::serde::is_default"
68    )]
69    acknowledgements: AcknowledgementsConfig,
70}
71
72pub(super) fn default_endpoint() -> String {
73    "https://appsignal-endpoint.net".to_string()
74}
75
76#[derive(Clone, Copy, Debug, Default)]
77pub(super) struct AppsignalDefaultBatchSettings;
78
79impl SinkBatchSettings for AppsignalDefaultBatchSettings {
80    const MAX_EVENTS: Option<usize> = Some(100);
81    const MAX_BYTES: Option<usize> = Some(450_000);
82    const TIMEOUT_SECS: f64 = 1.0;
83}
84
85impl AppsignalConfig {
86    pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
87        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
88        let client = HttpClient::new(tls, proxy)?;
89        Ok(client)
90    }
91
92    pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
93        let batch_settings = self.batch.into_batcher_settings()?;
94
95        let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
96        let push_api_key = self.push_api_key.clone();
97        let compression = self.compression;
98        let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
99
100        let request_opts = self.request;
101        let request_settings = request_opts.into_settings();
102        let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);
103
104        let service = ServiceBuilder::new()
105            .settings(request_settings, retry_logic)
106            .service(service);
107
108        let transformer = self.encoding.clone();
109        let sink = AppsignalSink {
110            service,
111            compression,
112            transformer,
113            batch_settings,
114        };
115
116        Ok(VectorSink::from_event_streamsink(sink))
117    }
118}
119
// NOTE(review): this macro is defined outside this file. Judging by its name it
// builds the sink's generated example configuration from `AppsignalConfig::default()`
// — confirm. The `generate_config` test in this file exercises the result.
120impl_generate_config_from_default!(AppsignalConfig);
121
122#[async_trait::async_trait]
123#[typetag::serde(name = "appsignal")]
124impl SinkConfig for AppsignalConfig {
125    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
126        let client = self.build_client(cx.proxy())?;
127        let healthcheck = healthcheck(
128            endpoint_uri(&self.endpoint, "vector/healthcheck")?,
129            self.push_api_key.inner().to_string(),
130            client.clone(),
131        )
132        .boxed();
133        let sink = self.build_sink(client)?;
134
135        Ok((sink, healthcheck))
136    }
137
138    fn input(&self) -> Input {
139        Input::new(DataType::Metric | DataType::Log)
140    }
141
142    fn acknowledgements(&self) -> &AcknowledgementsConfig {
143        &self.acknowledgements
144    }
145}
146
147async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
148    let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {push_api_key}"));
149    let response = client.send(request.body(Body::empty()).unwrap()).await?;
150
151    match response.status() {
152        status if status.is_success() => Ok(()),
153        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
154    }
155}
156
157pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
158    let uri = if endpoint.ends_with('/') {
159        format!("{endpoint}{path}")
160    } else {
161        format!("{endpoint}/{path}")
162    };
163    match uri.parse::<Uri>() {
164        Ok(u) => Ok(u),
165        Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
166    }
167}
168
#[cfg(test)]
mod test {
    use super::{AppsignalConfig, endpoint_uri};

    /// Runs the crate's shared generated-config check against `AppsignalConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AppsignalConfig>();
    }

    /// An endpoint without a trailing slash gets one inserted before the path.
    #[test]
    fn endpoint_uri_with_path() {
        let joined = endpoint_uri("https://appsignal-endpoint.net", "vector/events")
            .expect("Not a valid URI");
        assert_eq!(joined.to_string(), "https://appsignal-endpoint.net/vector/events");
    }

    /// An endpoint that already ends in a slash does not get a second one.
    #[test]
    fn endpoint_uri_with_trailing_slash() {
        let joined = endpoint_uri("https://appsignal-endpoint.net/", "vector/events")
            .expect("Not a valid URI");
        assert_eq!(joined.to_string(), "https://appsignal-endpoint.net/vector/events");
    }
}