vector/sinks/appsignal/
config.rs1use futures::FutureExt;
2use http::{Request, Uri, header::AUTHORIZATION};
3use hyper::Body;
4use tower::ServiceBuilder;
5use vector_lib::{
6 config::{AcknowledgementsConfig, DataType, Input, proxy::ProxyConfig},
7 configurable::configurable_component,
8 sensitive_string::SensitiveString,
9 tls::{MaybeTlsSettings, TlsEnableableConfig},
10};
11
12use super::{
13 service::{AppsignalResponse, AppsignalService},
14 sink::AppsignalSink,
15};
16use crate::{
17 codecs::Transformer,
18 http::HttpClient,
19 sinks::{
20 BuildError, Healthcheck, HealthcheckError, VectorSink,
21 prelude::{SinkConfig, SinkContext},
22 util::{
23 BatchConfig, Compression, ServiceBuilderExt, SinkBatchSettings, TowerRequestConfig,
24 http::HttpStatusRetryLogic,
25 },
26 },
27};
28
/// Configuration for the `appsignal` sink.
#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
#[derive(Clone, Debug, Default)]
pub(super) struct AppsignalConfig {
    /// The URI of the AppSignal endpoint to send data to.
    // When the option is omitted during deserialization, `default_endpoint()` supplies the value.
    // NOTE(review): the derived `Default` impl does not go through serde, so
    // `AppsignalConfig::default()` yields an empty endpoint string instead.
    #[configurable(validation(format = "uri"))]
    #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
    #[serde(default = "default_endpoint")]
    pub(super) endpoint: String,

    /// The AppSignal Push API key used to authenticate requests.
    // Sent as a bearer token by the healthcheck and handed to `AppsignalService` for delivery.
    #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
    #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
    push_api_key: SensitiveString,

    // Compression setting handed to both the service and the sink; deserialization falls back to
    // `Compression::gzip_default` when the option is omitted.
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    compression: Compression,

    // Batching options, parameterized with `AppsignalDefaultBatchSettings` (presumably the source
    // of the values used for unset options; confirm against `BatchConfig`).
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<AppsignalDefaultBatchSettings>,

    // Request options; converted with `into_settings()` and applied to the service stack in
    // `build_sink`.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig,

    // Optional TLS options used when constructing the HTTP client in `build_client`.
    #[configurable(derived)]
    tls: Option<TlsEnableableConfig>,

    // Transformer cloned into the sink by `build_sink`; not serialized when
    // `crate::serde::is_default` reports it as default.
    #[configurable(derived)]
    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
    encoding: Transformer,

    // Acknowledgement options returned by `SinkConfig::acknowledgements`. Deserialized through
    // `crate::serde::bool_or_struct`, so presumably either a bare boolean or a full struct is
    // accepted (confirm against that helper).
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}
71
72pub(super) fn default_endpoint() -> String {
73 "https://appsignal-endpoint.net".to_string()
74}
75
/// Marker type carrying the default batch settings for the `appsignal` sink.
///
/// Used as the type parameter of the `batch` option's `BatchConfig`.
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct AppsignalDefaultBatchSettings;

impl SinkBatchSettings for AppsignalDefaultBatchSettings {
    // Default event-count limit: 100 (presumably events per batch; confirm against
    // `SinkBatchSettings`).
    const MAX_EVENTS: Option<usize> = Some(100);
    // Default size limit: 450,000 (presumably bytes per batch).
    const MAX_BYTES: Option<usize> = Some(450_000);
    // Default batch timeout: one second.
    const TIMEOUT_SECS: f64 = 1.0;
}
84
85impl AppsignalConfig {
86 pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
87 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
88 let client = HttpClient::new(tls, proxy)?;
89 Ok(client)
90 }
91
92 pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
93 let batch_settings = self.batch.into_batcher_settings()?;
94
95 let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
96 let push_api_key = self.push_api_key.clone();
97 let compression = self.compression;
98 let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
99
100 let request_opts = self.request;
101 let request_settings = request_opts.into_settings();
102 let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);
103
104 let service = ServiceBuilder::new()
105 .settings(request_settings, retry_logic)
106 .service(service);
107
108 let transformer = self.encoding.clone();
109 let sink = AppsignalSink {
110 service,
111 compression,
112 transformer,
113 batch_settings,
114 };
115
116 Ok(VectorSink::from_event_streamsink(sink))
117 }
118}
119
// Presumably generates the `GenerateConfig` implementation for this sink from its `Default`
// value (the macro is defined elsewhere; confirm there).
impl_generate_config_from_default!(AppsignalConfig);
121
122#[async_trait::async_trait]
123#[typetag::serde(name = "appsignal")]
124impl SinkConfig for AppsignalConfig {
125 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
126 let client = self.build_client(cx.proxy())?;
127 let healthcheck = healthcheck(
128 endpoint_uri(&self.endpoint, "vector/healthcheck")?,
129 self.push_api_key.inner().to_string(),
130 client.clone(),
131 )
132 .boxed();
133 let sink = self.build_sink(client)?;
134
135 Ok((sink, healthcheck))
136 }
137
138 fn input(&self) -> Input {
139 Input::new(DataType::Metric | DataType::Log)
140 }
141
142 fn acknowledgements(&self) -> &AcknowledgementsConfig {
143 &self.acknowledgements
144 }
145}
146
147async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
148 let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {push_api_key}"));
149 let response = client.send(request.body(Body::empty()).unwrap()).await?;
150
151 match response.status() {
152 status if status.is_success() => Ok(()),
153 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
154 }
155}
156
157pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
158 let uri = if endpoint.ends_with('/') {
159 format!("{endpoint}{path}")
160 } else {
161 format!("{endpoint}/{path}")
162 };
163 match uri.parse::<Uri>() {
164 Ok(u) => Ok(u),
165 Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
166 }
167}
168
#[cfg(test)]
mod test {
    use super::{AppsignalConfig, endpoint_uri};

    /// The sink's generated example configuration must pass the shared config check.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<AppsignalConfig>();
    }

    /// An endpoint without a trailing slash gets exactly one separator before the path.
    #[test]
    fn endpoint_uri_with_path() {
        let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events")
            .expect("Not a valid URI");
        assert_eq!(
            uri.to_string(),
            "https://appsignal-endpoint.net/vector/events"
        );
    }

    /// An endpoint that already ends in a slash must not produce a doubled separator.
    #[test]
    fn endpoint_uri_with_trailing_slash() {
        let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events")
            .expect("Not a valid URI");
        assert_eq!(
            uri.to_string(),
            "https://appsignal-endpoint.net/vector/events"
        );
    }
}