vector/sinks/appsignal/
config.rs1use futures::FutureExt;
2use http::{Request, Uri, header::AUTHORIZATION};
3use hyper::Body;
4use tower::ServiceBuilder;
5use vector_lib::{
6 config::{AcknowledgementsConfig, DataType, Input, proxy::ProxyConfig},
7 configurable::configurable_component,
8 sensitive_string::SensitiveString,
9 tls::{MaybeTlsSettings, TlsEnableableConfig},
10};
11
12use super::{
13 service::{AppsignalResponse, AppsignalService},
14 sink::AppsignalSink,
15};
16use crate::{
17 codecs::Transformer,
18 http::HttpClient,
19 sinks::{
20 BuildError, Healthcheck, HealthcheckError, VectorSink,
21 prelude::{SinkConfig, SinkContext},
22 util::{
23 BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig,
24 http::{HttpStatusRetryLogic, RetryStrategy},
25 },
26 },
27};
28
29#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
31#[derive(Clone, Debug, Default)]
32pub(super) struct AppsignalConfig {
33 #[configurable(validation(format = "uri"))]
35 #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
36 #[serde(default = "default_endpoint")]
37 pub(super) endpoint: String,
38
39 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
41 #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
42 push_api_key: SensitiveString,
43
44 #[configurable(derived)]
45 #[serde(default = "Compression::gzip_default")]
46 compression: Compression,
47
48 #[configurable(derived)]
49 #[serde(default)]
50 batch: BatchConfig<AppsignalDefaultBatchSettings>,
51
52 #[configurable(derived)]
53 #[serde(default)]
54 request: TowerRequestConfig,
55
56 #[configurable(derived)]
57 tls: Option<TlsEnableableConfig>,
58
59 #[configurable(derived)]
60 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
61 encoding: Transformer,
62
63 #[configurable(derived)]
64 #[serde(
65 default,
66 deserialize_with = "crate::serde::bool_or_struct",
67 skip_serializing_if = "crate::serde::is_default"
68 )]
69 acknowledgements: AcknowledgementsConfig,
70
71 #[configurable(derived)]
72 #[serde(default)]
73 retry_strategy: RetryStrategy,
74}
75
76pub(super) fn default_endpoint() -> String {
77 "https://appsignal-endpoint.net".to_string()
78}
79
80#[derive(Clone, Copy, Debug, Default)]
81pub(super) struct AppsignalDefaultBatchSettings;
82
83impl SinkBatchSettings for AppsignalDefaultBatchSettings {
84 const MAX_EVENTS: Option<usize> = Some(100);
85 const MAX_BYTES: Option<usize> = Some(450_000);
86 const TIMEOUT_SECS: f64 = 1.0;
87}
88
89impl AppsignalConfig {
90 pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
91 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
92 let client = HttpClient::new(tls, proxy)?;
93 Ok(client)
94 }
95
96 pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
97 let batch_settings = self.batch.into_batcher_settings()?;
98
99 let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
100 let push_api_key = self.push_api_key.clone();
101 let compression = self.compression;
102 let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
103
104 let request_opts = self.request;
105 let request_settings = request_opts.into_settings();
106 let retry_logic = HttpStatusRetryLogic::new(
107 |req: &AppsignalResponse| req.http_status,
108 self.retry_strategy.clone(),
109 );
110
111 let service = ServiceBuilder::new()
112 .settings(request_settings, retry_logic)
113 .service(service);
114
115 let transformer = self.encoding.clone();
116 let sink = AppsignalSink {
117 service,
118 compression,
119 transformer,
120 batch_settings,
121 };
122
123 Ok(VectorSink::from_event_streamsink(sink))
124 }
125}
126
127impl_generate_config_from_default!(AppsignalConfig);
128
129#[async_trait::async_trait]
130#[typetag::serde(name = "appsignal")]
131impl SinkConfig for AppsignalConfig {
132 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
133 let client = self.build_client(cx.proxy())?;
134 let healthcheck = healthcheck(
135 endpoint_uri(&self.endpoint, "vector/healthcheck")?,
136 self.push_api_key.inner().to_string(),
137 client.clone(),
138 )
139 .boxed();
140 let sink = self.build_sink(client)?;
141
142 Ok((sink, healthcheck))
143 }
144
145 fn input(&self) -> Input {
146 Input::new(DataType::Metric | DataType::Log)
147 }
148
149 fn acknowledgements(&self) -> &AcknowledgementsConfig {
150 &self.acknowledgements
151 }
152}
153
154async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
155 let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {push_api_key}"));
156 let response = client.send(request.body(Body::empty()).unwrap()).await?;
157
158 match response.status() {
159 status if status.is_success() => Ok(()),
160 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
161 }
162}
163
164pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
165 let uri = if endpoint.ends_with('/') {
166 format!("{endpoint}{path}")
167 } else {
168 format!("{endpoint}/{path}")
169 };
170 match uri.parse::<Uri>() {
171 Ok(u) => Ok(u),
172 Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
173 }
174}
175
176#[cfg(test)]
177mod test {
178 use super::{AppsignalConfig, endpoint_uri};
179
180 #[test]
181 fn generate_config() {
182 crate::test_util::test_generate_config::<AppsignalConfig>();
183 }
184
185 #[test]
186 fn endpoint_uri_with_path() {
187 let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events");
188 assert_eq!(
189 uri.expect("Not a valid URI").to_string(),
190 "https://appsignal-endpoint.net/vector/events"
191 );
192 }
193
194 #[test]
195 fn endpoint_uri_with_trailing_slash() {
196 let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events");
197 assert_eq!(
198 uri.expect("Not a valid URI").to_string(),
199 "https://appsignal-endpoint.net/vector/events"
200 );
201 }
202}