use futures::FutureExt;
use http::{header::AUTHORIZATION, Request, Uri};
use hyper::Body;
use tower::ServiceBuilder;
use vector_lib::configurable::configurable_component;
use vector_lib::sensitive_string::SensitiveString;
use vector_lib::{
config::{proxy::ProxyConfig, AcknowledgementsConfig, DataType, Input},
tls::{MaybeTlsSettings, TlsEnableableConfig},
};
use crate::{
codecs::Transformer,
http::HttpClient,
sinks::{
prelude::{SinkConfig, SinkContext},
util::{
http::HttpStatusRetryLogic, BatchConfig, Compression, ServiceBuilderExt,
SinkBatchSettings, TowerRequestConfig,
},
BuildError, Healthcheck, HealthcheckError, VectorSink,
},
};
use super::{
service::{AppsignalResponse, AppsignalService},
sink::AppsignalSink,
};
#[configurable_component(sink("appsignal", "Deliver log and metric event data to AppSignal."))]
#[derive(Clone, Debug, Default)]
pub(super) struct AppsignalConfig {
#[configurable(validation(format = "uri"))]
#[configurable(metadata(docs::examples = "https://appsignal-endpoint.net"))]
#[serde(default = "default_endpoint")]
pub(super) endpoint: String,
#[configurable(metadata(docs::examples = "00000000-0000-0000-0000-000000000000"))]
#[configurable(metadata(docs::examples = "${APPSIGNAL_PUSH_API_KEY}"))]
push_api_key: SensitiveString,
#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
compression: Compression,
#[configurable(derived)]
#[serde(default)]
batch: BatchConfig<AppsignalDefaultBatchSettings>,
#[configurable(derived)]
#[serde(default)]
request: TowerRequestConfig,
#[configurable(derived)]
tls: Option<TlsEnableableConfig>,
#[configurable(derived)]
#[serde(default, skip_serializing_if = "crate::serde::is_default")]
encoding: Transformer,
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::is_default"
)]
acknowledgements: AcknowledgementsConfig,
}
pub(super) fn default_endpoint() -> String {
"https://appsignal-endpoint.net".to_string()
}
#[derive(Clone, Copy, Debug, Default)]
pub(super) struct AppsignalDefaultBatchSettings;
impl SinkBatchSettings for AppsignalDefaultBatchSettings {
const MAX_EVENTS: Option<usize> = Some(100);
const MAX_BYTES: Option<usize> = Some(450_000);
const TIMEOUT_SECS: f64 = 1.0;
}
impl AppsignalConfig {
pub(super) fn build_client(&self, proxy: &ProxyConfig) -> crate::Result<HttpClient> {
let tls = MaybeTlsSettings::from_config(&self.tls, false)?;
let client = HttpClient::new(tls, proxy)?;
Ok(client)
}
pub(super) fn build_sink(&self, http_client: HttpClient) -> crate::Result<VectorSink> {
let batch_settings = self.batch.into_batcher_settings()?;
let endpoint = endpoint_uri(&self.endpoint, "vector/events")?;
let push_api_key = self.push_api_key.clone();
let compression = self.compression;
let service = AppsignalService::new(http_client, endpoint, push_api_key, compression);
let request_opts = self.request;
let request_settings = request_opts.into_settings();
let retry_logic = HttpStatusRetryLogic::new(|req: &AppsignalResponse| req.http_status);
let service = ServiceBuilder::new()
.settings(request_settings, retry_logic)
.service(service);
let transformer = self.encoding.clone();
let sink = AppsignalSink {
service,
compression,
transformer,
batch_settings,
};
Ok(VectorSink::from_event_streamsink(sink))
}
}
impl_generate_config_from_default!(AppsignalConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "appsignal")]
impl SinkConfig for AppsignalConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let client = self.build_client(cx.proxy())?;
let healthcheck = healthcheck(
endpoint_uri(&self.endpoint, "vector/healthcheck")?,
self.push_api_key.inner().to_string(),
client.clone(),
)
.boxed();
let sink = self.build_sink(client)?;
Ok((sink, healthcheck))
}
fn input(&self) -> Input {
Input::new(DataType::Metric | DataType::Log)
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
async fn healthcheck(uri: Uri, push_api_key: String, client: HttpClient) -> crate::Result<()> {
let request = Request::get(uri).header(AUTHORIZATION, format!("Bearer {}", push_api_key));
let response = client.send(request.body(Body::empty()).unwrap()).await?;
match response.status() {
status if status.is_success() => Ok(()),
other => Err(HealthcheckError::UnexpectedStatus { status: other }.into()),
}
}
pub fn endpoint_uri(endpoint: &str, path: &str) -> crate::Result<Uri> {
let uri = if endpoint.ends_with('/') {
format!("{endpoint}{path}")
} else {
format!("{endpoint}/{path}")
};
match uri.parse::<Uri>() {
Ok(u) => Ok(u),
Err(e) => Err(Box::new(BuildError::UriParseError { source: e })),
}
}
#[cfg(test)]
mod test {
use super::{endpoint_uri, AppsignalConfig};
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<AppsignalConfig>();
}
#[test]
fn endpoint_uri_with_path() {
let uri = endpoint_uri("https://appsignal-endpoint.net", "vector/events");
assert_eq!(
uri.expect("Not a valid URI").to_string(),
"https://appsignal-endpoint.net/vector/events"
);
}
#[test]
fn endpoint_uri_with_trailing_slash() {
let uri = endpoint_uri("https://appsignal-endpoint.net/", "vector/events");
assert_eq!(
uri.expect("Not a valid URI").to_string(),
"https://appsignal-endpoint.net/vector/events"
);
}
}