vector/sinks/appsignal/
config.rs1use futures::FutureExt;
2use http::{header::AUTHORIZATION, Request, Uri};
3use hyper::Body;
4use tower::ServiceBuilder;
5use vector_lib::configurable::configurable_component;
6use vector_lib::sensitive_string::SensitiveString;
7use vector_lib::{
8 config::{proxy::ProxyConfig, AcknowledgementsConfig, DataType, Input},
9 tls::{MaybeTlsSettings, TlsEnableableConfig},
10};
11
12use crate::{
13 codecs::Transformer,
14 http::HttpClient,
15 sinks::{
16 prelude::{SinkConfig, SinkContext},
17 util::{
18 http::HttpStatusRetryLogic, BatchConfig, Compression, ServiceBuilderExt,
19 SinkBatchSettings, TowerRequestConfig,
20 },
21 BuildError, Healthcheck, HealthcheckError, VectorSink,
22 },
23};
24
25use super::{
26 service::{AppsignalResponse, AppsignalService},
27 sink::AppsignalSink,
28};
29
30#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
32#[derive(Clone, Debug, Default)]
33pub(super) struct AppsignalConfig {
34 #[configurable(validation(format = "uri"))]
36 #[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
37 #[serde(default = "default_endpoint")]
38 pub(super) endpoint: String,
39
40 #[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
42 #[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
43 push_api_key: SensitiveString,
44
45 #[configurable(derived)]
46 #[serde(default = "Compression::gzip_default")]
47 compression: Compression,
48
49 #[configurable(derived)]
50 #[serde(default)]
51 batch: BatchConfig<AppsignalDefaultBatchSettings>,
52
53 #[configurable(derived)]
54 #[serde(default)]
55 request: TowerRequestConfig,
56
57 #[configurable(derived)]
58 tls: Option<TlsEnableableConfig>,
59
60 #[configurable(derived)]
61 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
62 encoding: Transformer,
63
64 #[configurable(derived)]
65 #[serde(
66 default,
67 deserialize_with = "crate::serde::bool_or_struct",
68 skip_serializing_if = "crate::serde::is_default"
69 )]
70 acknowledgements: AcknowledgementsConfig,
71}
72
73pub(super) fn default_endpoint() -> String {
74 "https://appsignal-endpoint.net".to_string()
75}
76
77#[derive(Clone, Copy, Debug, Default)]
78pub(super) struct AppsignalDefaultBatchSettings;
79
80impl SinkBatchSettings for AppsignalDefaultBatchSettings {
81 const MAX_EVENTS: Option<usize> = Some(100);
82 const MAX_BYTES: Option<usize> = Some(450_000);
83 const TIMEOUT_SECS: f64 = 1.0;
84}
85
86impl AppsignalConfig {
87 pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
88 let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), false)?;
89 let client = HttpClient::new(tls, proxy)?;
90 Ok(client)
91 }
92
93 pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
94 let batch_settings = self.batch.into_batcher_settings()?;
95
96 let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
97 let push_api_key = self.push_api_key.clone();
98 let compression = self.compression;
99 let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
100
101 let request_opts = self.request;
102 let request_settings = request_opts.into_settings();
103 let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);
104
105 let service = ServiceBuilder::new()
106 .settings(request_settings, retry_logic)
107 .service(service);
108
109 let transformer = self.encoding.clone();
110 let sink = AppsignalSink {
111 service,
112 compression,
113 transformer,
114 batch_settings,
115 };
116
117 Ok(VectorSink::from_event_streamsink(sink))
118 }
119}
120
121impl_generate_config_from_default!(AppsignalConfig);
122
123#[async_trait::async_trait]
124#[typetag::serde(name = "appsignal")]
125impl SinkConfig for AppsignalConfig {
126 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
127 let client = self.build_client(cx.proxy())?;
128 let healthcheck = healthcheck(
129 endpoint_uri(&self.endpoint, "vector/healthcheck")?,
130 self.push_api_key.inner().to_string(),
131 client.clone(),
132 )
133 .boxed();
134 let sink = self.build_sink(client)?;
135
136 Ok((sink, healthcheck))
137 }
138
139 fn input(&self) -> Input {
140 Input::new(DataType::Metric | DataType::Log)
141 }
142
143 fn acknowledgements(&self) -> &AcknowledgementsConfig {
144 &self.acknowledgements
145 }
146}
147
148async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
149 let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {push_api_key}"));
150 let response = client.send(request.body(Body::empty()).unwrap()).await?;
151
152 match response.status() {
153 status if status.is_success() => Ok(()),
154 other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
155 }
156}
157
158pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
159 let uri = if endpoint.ends_with('/') {
160 format!("{endpoint}{path}")
161 } else {
162 format!("{endpoint}/{path}")
163 };
164 match uri.parse::<Uri>() {
165 Ok(u) => Ok(u),
166 Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
167 }
168}
169
170#[cfg(test)]
171mod test {
172 use super::{endpoint_uri, AppsignalConfig};
173
174 #[test]
175 fn generate_config() {
176 crate::test_util::test_generate_config::<AppsignalConfig>();
177 }
178
179 #[test]
180 fn endpoint_uri_with_path() {
181 let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events");
182 assert_eq!(
183 uri.expect("Not a valid URI").to_string(),
184 "https://appsignal-endpoint.net/vector/events"
185 );
186 }
187
188 #[test]
189 fn endpoint_uri_with_trailing_slash() {
190 let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events");
191 assert_eq!(
192 uri.expect("Not a valid URI").to_string(),
193 "https://appsignal-endpoint.net/vector/events"
194 );
195 }
196}