vector/sinks/appsignal/
config.rs

1use futures::FutureExt;
2use http::{Request, Uri, header::AUTHORIZATION};
3use hyper::Body;
4use tower::ServiceBuilder;
5use vector_lib::{
6    config::{AcknowledgementsConfig, DataType, Input, proxy::ProxyConfig},
7    configurable::configurable_component,
8    sensitive_string::SensitiveString,
9    tls::{MaybeTlsSettings, TlsEnableableConfig},
10};
11
12use super::{
13    service::{AppsignalResponse, AppsignalService},
14    sink::AppsignalSink,
15};
16use crate::{
17    codecs::Transformer,
18    http::HttpClient,
19    sinks::{
20        BuildError, Healthcheck, HealthcheckError, VectorSink,
21        prelude::{SinkConfig, SinkContext},
22        util::{
23            BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig,
24            http::{HttpStatusRetryLogic, RetryStrategy},
25        },
26    },
27};
28
29/// Configuration for the `appsignal` sink.
30#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
31#[derive(Clone, Debug, Default)]
32pub(super) struct AppsignalConfig {
33    /// The URI for the AppSignal API to send data to.
34    #[configurable(validation(format = "uri"))]
35    #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
36    #[serde(default = "default_endpoint")]
37    pub(super) endpoint: String,
38
39    /// A valid app-level AppSignal Push API key.
40    #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
41    #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
42    push_api_key: SensitiveString,
43
44    #[configurable(derived)]
45    #[serde(default = "Compression::gzip_default")]
46    compression: Compression,
47
48    #[configurable(derived)]
49    #[serde(default)]
50    batch: BatchConfig<AppsignalDefaultBatchSettings>,
51
52    #[configurable(derived)]
53    #[serde(default)]
54    request: TowerRequestConfig,
55
56    #[configurable(derived)]
57    tls: Option<TlsEnableableConfig>,
58
59    #[configurable(derived)]
60    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
61    encoding: Transformer,
62
63    #[configurable(derived)]
64    #[serde(
65        default,
66        deserialize_with = "crate::serde::bool_or_struct",
67        skip_serializing_if = "crate::serde::is_default"
68    )]
69    acknowledgements: AcknowledgementsConfig,
70
71    #[configurable(derived)]
72    #[serde(default)]
73    retry_strategy: RetryStrategy,
74}
75
76pub(super) fn default_endpoint() -> String {
77    "https://appsignal-endpoint.net".to_string()
78}
79
80#[derive(Clone, Copy, Debug, Default)]
81pub(super) struct AppsignalDefaultBatchSettings;
82
83impl SinkBatchSettings for AppsignalDefaultBatchSettings {
84    const MAX_EVENTS: Option<usize> = Some(100);
85    const MAX_BYTES: Option<usize> = Some(450_000);
86    const TIMEOUT_SECS: f64 = 1.0;
87}
88
89impl AppsignalConfig {
90    pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
91        let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
92        let client = HttpClient::new(tls, proxy)?;
93        Ok(client)
94    }
95
96    pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
97        let batch_settings = self.batch.into_batcher_settings()?;
98
99        let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
100        let push_api_key = self.push_api_key.clone();
101        let compression = self.compression;
102        let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
103
104        let request_opts = self.request;
105        let request_settings = request_opts.into_settings();
106        let retry_logic = HttpStatusRetryLogic::new(
107            |req: &AppsignalResponse| req.http_status,
108            self.retry_strategy.clone(),
109        );
110
111        let service = ServiceBuilder::new()
112            .settings(request_settings, retry_logic)
113            .service(service);
114
115        let transformer = self.encoding.clone();
116        let sink = AppsignalSink {
117            service,
118            compression,
119            transformer,
120            batch_settings,
121        };
122
123        Ok(VectorSink::from_event_streamsink(sink))
124    }
125}
126
127impl_generate_config_from_default!(AppsignalConfig);
128
129#[async_trait::async_trait]
130#[typetag::serde(name = "appsignal")]
131impl SinkConfig for AppsignalConfig {
132    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
133        let client = self.build_client(cx.proxy())?;
134        let healthcheck = healthcheck(
135            endpoint_uri(&self.endpoint, "vector/healthcheck")?,
136            self.push_api_key.inner().to_string(),
137            client.clone(),
138        )
139        .boxed();
140        let sink = self.build_sink(client)?;
141
142        Ok((sink, healthcheck))
143    }
144
145    fn input(&self) -> Input {
146        Input::new(DataType::Metric | DataType::Log)
147    }
148
149    fn acknowledgements(&self) -> &AcknowledgementsConfig {
150        &self.acknowledgements
151    }
152}
153
154async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
155    let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {push_api_key}"));
156    let response = client.send(request.body(Body::empty()).unwrap()).await?;
157
158    match response.status() {
159        status if status.is_success() => Ok(()),
160        other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
161    }
162}
163
164pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
165    let uri = if endpoint.ends_with('/') {
166        format!("{endpoint}{path}")
167    } else {
168        format!("{endpoint}/{path}")
169    };
170    match uri.parse::<Uri>() {
171        Ok(u) => Ok(u),
172        Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
173    }
174}
175
176#[cfg(test)]
177mod test {
178    use super::{AppsignalConfig, endpoint_uri};
179
180    #[test]
181    fn generate_config() {
182        crate::test_util::test_generate_config::<AppsignalConfig>();
183    }
184
185    #[test]
186    fn endpoint_uri_with_path() {
187        let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events");
188        assert_eq!(
189            uri.expect("Not a valid URI").to_string(),
190            "https://appsignal-endpoint.net/vector/events"
191        );
192    }
193
194    #[test]
195    fn endpoint_uri_with_trailing_slash() {
196        let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events");
197        assert_eq!(
198            uri.expect("Not a valid URI").to_string(),
199            "https://appsignal-endpoint.net/vector/events"
200        );
201    }
202}