use std::{collections::HashMap, net::SocketAddr};
use bytes::{Bytes, BytesMut};
use chrono::Utc;
use http::StatusCode;
use http_serde;
use tokio_util::codec::Decoder as _;
use vrl::value::{kind::Collection, Kind};
use warp::http::HeaderMap;
use vector_lib::codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, BytesDeserializerConfig, JsonDeserializerConfig,
NewlineDelimitedDecoderConfig,
};
use vector_lib::configurable::configurable_component;
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path};
use vector_lib::{
config::{DataType, LegacyKey, LogNamespace},
schema::Definition,
};
use crate::{
codecs::{Decoder, DecodingConfig},
config::{
GenerateConfig, Resource, SourceAcknowledgementsConfig, SourceConfig, SourceContext,
SourceOutput,
},
event::Event,
http::KeepaliveConfig,
serde::{bool_or_struct, default_decoding},
sources::util::{
http::{add_headers, add_query_parameters, HttpMethod},
Encoding, ErrorMessage, HttpSource, HttpSourceAuthConfig,
},
tls::TlsEnableableConfig,
};
#[configurable_component(source("http", "Host an HTTP endpoint to receive logs."))]
#[configurable(metadata(deprecated))]
#[derive(Clone, Debug)]
pub struct HttpConfig(SimpleHttpConfig);
impl GenerateConfig for HttpConfig {
fn generate_config() -> toml::Value {
<SimpleHttpConfig as GenerateConfig>::generate_config()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "http")]
impl SourceConfig for HttpConfig {
async fn build(&self, cx: SourceContext) -> vector_lib::Result<super::Source> {
self.0.build(cx).await
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
self.0.outputs(global_log_namespace)
}
fn resources(&self) -> Vec<Resource> {
self.0.resources()
}
fn can_acknowledge(&self) -> bool {
self.0.can_acknowledge()
}
}
#[configurable_component(source("http_server", "Host an HTTP endpoint to receive logs."))]
#[derive(Clone, Debug)]
pub struct SimpleHttpConfig {
#[configurable(metadata(docs::examples = "0.0.0.0:80"))]
#[configurable(metadata(docs::examples = "localhost:80"))]
address: SocketAddr,
#[serde(default)]
encoding: Option<Encoding>,
#[serde(default)]
#[configurable(metadata(docs::examples = "User-Agent"))]
#[configurable(metadata(docs::examples = "X-My-Custom-Header"))]
#[configurable(metadata(docs::examples = "X-*"))]
#[configurable(metadata(docs::examples = "*"))]
headers: Vec<String>,
#[serde(default)]
#[configurable(metadata(docs::examples = "application"))]
#[configurable(metadata(docs::examples = "source"))]
#[configurable(metadata(docs::examples = "param*"))]
#[configurable(metadata(docs::examples = "*"))]
query_parameters: Vec<String>,
#[configurable(derived)]
auth: Option<HttpSourceAuthConfig>,
#[serde(default = "crate::serde::default_true")]
strict_path: bool,
#[serde(default = "default_path")]
#[configurable(metadata(docs::examples = "/event/path"))]
#[configurable(metadata(docs::examples = "/logs"))]
path: String,
#[serde(default = "default_path_key")]
#[configurable(metadata(docs::examples = "vector_http_path"))]
path_key: OptionalValuePath,
#[serde(default = "default_host_key")]
#[configurable(metadata(docs::examples = "hostname"))]
host_key: OptionalValuePath,
#[serde(default = "default_http_method")]
method: HttpMethod,
#[configurable(metadata(docs::examples = 202))]
#[configurable(metadata(docs::numeric_type = "uint"))]
#[serde(with = "http_serde::status_code")]
#[serde(default = "default_http_response_code")]
response_code: StatusCode,
#[configurable(derived)]
tls: Option<TlsEnableableConfig>,
#[configurable(derived)]
framing: Option<FramingConfig>,
#[configurable(derived)]
decoding: Option<DeserializerConfig>,
#[configurable(derived)]
#[serde(default, deserialize_with = "bool_or_struct")]
acknowledgements: SourceAcknowledgementsConfig,
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,
#[configurable(derived)]
#[serde(default)]
keepalive: KeepaliveConfig,
}
impl SimpleHttpConfig {
fn schema_definition(&self, log_namespace: LogNamespace) -> Definition {
let mut schema_definition = self
.decoding
.as_ref()
.unwrap_or(&default_decoding())
.schema_definition(log_namespace)
.with_source_metadata(
SimpleHttpConfig::NAME,
self.path_key.path.clone().map(LegacyKey::InsertIfEmpty),
&owned_value_path!("path"),
Kind::bytes(),
None,
)
.with_source_metadata(
SimpleHttpConfig::NAME,
None,
&owned_value_path!("headers"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
)
.with_source_metadata(
SimpleHttpConfig::NAME,
None,
&owned_value_path!("query_parameters"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
)
.with_source_metadata(
SimpleHttpConfig::NAME,
self.host_key.path.clone().map(LegacyKey::Overwrite),
&owned_value_path!("host"),
Kind::bytes().or_undefined(),
None,
)
.with_standard_vector_source_metadata();
if log_namespace == LogNamespace::Legacy {
schema_definition = schema_definition.unknown_fields(Kind::bytes());
}
schema_definition
}
fn get_decoding_config(&self) -> crate::Result<DecodingConfig> {
if self.encoding.is_some() && (self.framing.is_some() || self.decoding.is_some()) {
return Err("Using `encoding` is deprecated and does not have any effect when `decoding` or `framing` is provided. Configure `framing` and `decoding` instead.".into());
}
let (framing, decoding) = if let Some(encoding) = self.encoding {
match encoding {
Encoding::Text => (
NewlineDelimitedDecoderConfig::new().into(),
BytesDeserializerConfig::new().into(),
),
Encoding::Json => (
BytesDecoderConfig::new().into(),
JsonDeserializerConfig::default().into(),
),
Encoding::Ndjson => (
NewlineDelimitedDecoderConfig::new().into(),
JsonDeserializerConfig::default().into(),
),
Encoding::Binary => (
BytesDecoderConfig::new().into(),
BytesDeserializerConfig::new().into(),
),
}
} else {
let decoding = self.decoding.clone().unwrap_or_else(default_decoding);
let framing = self
.framing
.clone()
.unwrap_or_else(|| decoding.default_stream_framing());
(framing, decoding)
};
Ok(DecodingConfig::new(
framing,
decoding,
self.log_namespace.unwrap_or(false).into(),
))
}
}
impl Default for SimpleHttpConfig {
fn default() -> Self {
Self {
address: "0.0.0.0:8080".parse().unwrap(),
encoding: None,
headers: Vec::new(),
query_parameters: Vec::new(),
tls: None,
auth: None,
path: default_path(),
path_key: default_path_key(),
host_key: default_host_key(),
method: default_http_method(),
response_code: default_http_response_code(),
strict_path: true,
framing: None,
decoding: Some(default_decoding()),
acknowledgements: SourceAcknowledgementsConfig::default(),
log_namespace: None,
keepalive: KeepaliveConfig::default(),
}
}
}
impl_generate_config_from_default!(SimpleHttpConfig);
const fn default_http_method() -> HttpMethod {
HttpMethod::Post
}
fn default_path() -> String {
"/".to_string()
}
fn default_path_key() -> OptionalValuePath {
OptionalValuePath::from(owned_value_path!("path"))
}
fn default_host_key() -> OptionalValuePath {
OptionalValuePath::none()
}
const fn default_http_response_code() -> StatusCode {
StatusCode::OK
}
pub fn remove_duplicates(mut list: Vec<String>, list_name: &str) -> Vec<String> {
list.sort();
let mut dedup = false;
for (idx, name) in list.iter().enumerate() {
if idx < list.len() - 1 && list[idx] == list[idx + 1] {
warn!(
"`{}` configuration contains duplicate entry for `{}`. Removing duplicate.",
list_name, name
);
dedup = true;
}
}
if dedup {
list.dedup();
}
list
}
fn socket_addr_to_ip_string(addr: &SocketAddr) -> String {
addr.ip().to_string()
}
#[derive(Clone)]
pub enum HttpConfigParamKind {
Glob(glob::Pattern),
Exact(String),
}
pub fn build_param_matcher(list: &[String]) -> crate::Result<Vec<HttpConfigParamKind>> {
list.iter()
.map(|s| match s.contains('*') {
true => Ok(HttpConfigParamKind::Glob(glob::Pattern::new(s)?)),
false => Ok(HttpConfigParamKind::Exact(s.to_string())),
})
.collect::<crate::Result<Vec<HttpConfigParamKind>>>()
}
#[async_trait::async_trait]
#[typetag::serde(name = "http_server")]
impl SourceConfig for SimpleHttpConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
let decoder = self
.get_decoding_config()?
.build()?
.with_log_namespace(log_namespace);
let source = SimpleHttpSource {
headers: build_param_matcher(&remove_duplicates(self.headers.clone(), "headers"))?,
query_parameters: build_param_matcher(&remove_duplicates(
self.query_parameters.clone(),
"query_parameters",
))?,
path_key: self.path_key.clone(),
host_key: self.host_key.clone(),
decoder,
log_namespace,
};
source.run(
self.address,
self.path.as_str(),
self.method,
self.response_code,
self.strict_path,
&self.tls,
&self.auth,
cx,
self.acknowledgements,
self.keepalive.clone(),
)
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let schema_definition = self.schema_definition(log_namespace);
vec![SourceOutput::new_maybe_logs(
self.decoding
.as_ref()
.map(|d| d.output_type())
.unwrap_or(DataType::Log),
schema_definition,
)]
}
fn resources(&self) -> Vec<Resource> {
vec![Resource::tcp(self.address)]
}
fn can_acknowledge(&self) -> bool {
true
}
}
#[derive(Clone)]
struct SimpleHttpSource {
headers: Vec<HttpConfigParamKind>,
query_parameters: Vec<HttpConfigParamKind>,
path_key: OptionalValuePath,
host_key: OptionalValuePath,
decoder: Decoder,
log_namespace: LogNamespace,
}
impl HttpSource for SimpleHttpSource {
fn enrich_events(
&self,
events: &mut [Event],
request_path: &str,
headers: &HeaderMap,
query_parameters: &HashMap<String, String>,
source_ip: Option<&SocketAddr>,
) {
let now = Utc::now();
for event in events.iter_mut() {
match event {
Event::Log(log) => {
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
self.path_key.path.as_ref().map(LegacyKey::InsertIfEmpty),
path!("path"),
request_path.to_owned(),
);
self.log_namespace.insert_standard_vector_source_metadata(
log,
SimpleHttpConfig::NAME,
now,
);
if let Some(addr) = source_ip {
self.log_namespace.insert_source_metadata(
SimpleHttpConfig::NAME,
log,
self.host_key.path.as_ref().map(LegacyKey::Overwrite),
path!("host"),
socket_addr_to_ip_string(addr),
);
}
}
_ => {
continue;
}
}
}
add_headers(
events,
&self.headers,
headers,
self.log_namespace,
SimpleHttpConfig::NAME,
);
add_query_parameters(
events,
&self.query_parameters,
query_parameters,
self.log_namespace,
SimpleHttpConfig::NAME,
);
}
fn build_events(
&self,
body: Bytes,
_header_map: &HeaderMap,
_query_parameters: &HashMap<String, String>,
_request_path: &str,
) -> Result<Vec<Event>, ErrorMessage> {
let mut decoder = self.decoder.clone();
let mut events = Vec::new();
let mut bytes = BytesMut::new();
bytes.extend_from_slice(&body);
loop {
match decoder.decode_eof(&mut bytes) {
Ok(Some((next, _))) => {
events.extend(next);
}
Ok(None) => break,
Err(error) => {
return Err(ErrorMessage::new(
StatusCode::BAD_REQUEST,
format!("Failed decoding body: {}", error),
));
}
}
}
Ok(events)
}
fn enable_source_ip(&self) -> bool {
self.host_key.path.is_some()
}
}
#[cfg(test)]
mod tests {
use std::str::FromStr;
use std::{io::Write, net::SocketAddr};
use flate2::{
write::{GzEncoder, ZlibEncoder},
Compression,
};
use futures::Stream;
use http::{HeaderMap, Method, StatusCode, Uri};
use similar_asserts::assert_eq;
use vector_lib::codecs::{
decoding::{DeserializerConfig, FramingConfig},
BytesDecoderConfig, JsonDeserializerConfig,
};
use vector_lib::config::LogNamespace;
use vector_lib::event::LogEvent;
use vector_lib::lookup::lookup_v2::OptionalValuePath;
use vector_lib::lookup::{event_path, owned_value_path, OwnedTargetPath, PathPrefix};
use vector_lib::schema::Definition;
use vrl::value::{kind::Collection, Kind, ObjectMap};
use crate::sources::http_server::HttpMethod;
use crate::{
components::validation::prelude::*,
config::{log_schema, SourceConfig, SourceContext},
event::{Event, EventStatus, Value},
test_util::{
components::{self, assert_source_compliance, HTTP_PUSH_SOURCE_TAGS},
next_addr, spawn_collect_n, wait_for_tcp,
},
SourceSender,
};
use super::{remove_duplicates, SimpleHttpConfig};
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<SimpleHttpConfig>();
}
#[allow(clippy::too_many_arguments)]
async fn source<'a>(
headers: Vec<String>,
query_parameters: Vec<String>,
path_key: &'a str,
host_key: &'a str,
path: &'a str,
method: &'a str,
response_code: StatusCode,
strict_path: bool,
status: EventStatus,
acknowledgements: bool,
framing: Option<FramingConfig>,
decoding: Option<DeserializerConfig>,
) -> (impl Stream<Item = Event> + 'a, SocketAddr) {
let (sender, recv) = SourceSender::new_test_finalize(status);
let address = next_addr();
let path = path.to_owned();
let host_key = OptionalValuePath::from(owned_value_path!(host_key));
let path_key = OptionalValuePath::from(owned_value_path!(path_key));
let context = SourceContext::new_test(sender, None);
let method = match Method::from_str(method).unwrap() {
Method::GET => HttpMethod::Get,
Method::POST => HttpMethod::Post,
_ => HttpMethod::Post,
};
tokio::spawn(async move {
SimpleHttpConfig {
address,
headers,
encoding: None,
query_parameters,
response_code,
tls: None,
auth: None,
strict_path,
path_key,
host_key,
path,
method,
framing,
decoding,
acknowledgements: acknowledgements.into(),
log_namespace: None,
keepalive: Default::default(),
}
.build(context)
.await
.unwrap()
.await
.unwrap();
});
wait_for_tcp(address).await;
(recv, address)
}
async fn send(address: SocketAddr, body: &str) -> u16 {
reqwest::Client::new()
.post(format!("http://{}/", address))
.body(body.to_owned())
.send()
.await
.unwrap()
.status()
.as_u16()
}
async fn send_with_headers(address: SocketAddr, body: &str, headers: HeaderMap) -> u16 {
reqwest::Client::new()
.post(format!("http://{}/", address))
.headers(headers)
.body(body.to_owned())
.send()
.await
.unwrap()
.status()
.as_u16()
}
async fn send_with_query(address: SocketAddr, body: &str, query: &str) -> u16 {
reqwest::Client::new()
.post(format!("http://{}?{}", address, query))
.body(body.to_owned())
.send()
.await
.unwrap()
.status()
.as_u16()
}
async fn send_with_path(address: SocketAddr, body: &str, path: &str) -> u16 {
reqwest::Client::new()
.post(format!("http://{}{}", address, path))
.body(body.to_owned())
.send()
.await
.unwrap()
.status()
.as_u16()
}
async fn send_request(address: SocketAddr, method: &str, body: &str, path: &str) -> u16 {
let method = Method::from_bytes(method.to_owned().as_bytes()).unwrap();
reqwest::Client::new()
.request(method, format!("http://{address}{path}"))
.body(body.to_owned())
.send()
.await
.unwrap()
.status()
.as_u16()
}
async fn send_bytes(address: SocketAddr, body: Vec<u8>, headers: HeaderMap) -> u16 {
reqwest::Client::new()
.post(format!("http://{address}/"))
.headers(headers)
.body(body)
.send()
.await
.unwrap()
.status()
.as_u16()
}
async fn spawn_ok_collect_n(
send: impl std::future::Future<Output = u16> + Send + 'static,
rx: impl Stream<Item = Event> + Unpin,
n: usize,
) -> Vec<Event> {
spawn_collect_n(async move { assert_eq!(200, send.await) }, rx, n).await
}
#[tokio::test]
async fn http_multiline_text() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let body = "test body\ntest body 2";
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
None,
)
.await;
spawn_ok_collect_n(send(addr, body), rx, 2).await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(*log.get_message().unwrap(), "test body".into());
assert!(log.get_timestamp().is_some());
assert_eq!(
*log.get_source_type().unwrap(),
SimpleHttpConfig::NAME.into()
);
assert_eq!(log["http_path"], "/".into());
assert_event_metadata(log).await;
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(*log.get_message().unwrap(), "test body 2".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_multiline_text2() {
let body = "test body\ntest body 2\n";
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
None,
)
.await;
spawn_ok_collect_n(send(addr, body), rx, 2).await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(*log.get_message().unwrap(), "test body".into());
assert_event_metadata(log).await;
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(*log.get_message().unwrap(), "test body 2".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_bytes_codec_preserves_newlines() {
let body = "foo\nbar";
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
Some(BytesDecoderConfig::new().into()),
None,
)
.await;
spawn_ok_collect_n(send(addr, body), rx, 1).await
})
.await;
assert_eq!(events.len(), 1);
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(*log.get_message().unwrap(), "foo\nbar".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_json_parsing() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_collect_n(
async move {
assert_eq!(400, send(addr, "{").await); assert_eq!(400, send(addr, r#"{"key"}"#).await); assert_eq!(200, send(addr, "{}").await); assert_eq!(200, send(addr, "[{},{},{}]").await);
},
rx,
2,
)
.await
})
.await;
assert!(events.remove(1).as_log().get_timestamp().is_some());
assert!(events.remove(0).as_log().get_timestamp().is_some());
}
#[tokio::test]
async fn http_json_values() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_collect_n(
async move {
assert_eq!(200, send(addr, r#"[{"key":"value"}]"#).await);
assert_eq!(200, send(addr, r#"{"key2":"value2"}"#).await);
},
rx,
2,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key"], "value".into());
assert_event_metadata(log).await;
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key2"], "value2".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_json_dotted_keys() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_collect_n(
async move {
assert_eq!(200, send(addr, r#"[{"dotted.key":"value"}]"#).await);
assert_eq!(
200,
send(addr, r#"{"nested":{"dotted.key2":"value2"}}"#).await
);
},
rx,
2,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(
log.get(event_path!("dotted.key")).unwrap(),
&Value::from("value")
);
}
{
let event = events.remove(0);
let log = event.as_log();
let mut map = ObjectMap::new();
map.insert("dotted.key2".into(), Value::from("value2"));
assert_eq!(log["nested"], map.into());
}
}
#[tokio::test]
async fn http_ndjson() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_collect_n(
async move {
assert_eq!(
200,
send(addr, r#"[{"key1":"value1"},{"key2":"value2"}]"#).await
);
assert_eq!(
200,
send(addr, "{\"key1\":\"value1\"}\n\n{\"key2\":\"value2\"}").await
);
},
rx,
4,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_event_metadata(log).await;
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key2"], "value2".into());
assert_event_metadata(log).await;
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_event_metadata(log).await;
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key2"], "value2".into());
assert_event_metadata(log).await;
}
}
async fn assert_event_metadata(log: &LogEvent) {
assert!(log.get_timestamp().is_some());
let source_type_key_value = log
.get((PathPrefix::Event, log_schema().source_type_key().unwrap()))
.unwrap()
.as_str()
.unwrap();
assert_eq!(source_type_key_value, SimpleHttpConfig::NAME);
assert_eq!(log["http_path"], "/".into());
}
#[tokio::test]
async fn http_headers() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let mut headers = HeaderMap::new();
headers.insert("User-Agent", "test_client".parse().unwrap());
headers.insert("Upgrade-Insecure-Requests", "false".parse().unwrap());
headers.insert("X-Test-Header", "true".parse().unwrap());
let (rx, addr) = source(
vec![
"User-Agent".to_string(),
"Upgrade-Insecure-Requests".to_string(),
"X-*".to_string(),
"AbsentHeader".to_string(),
],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_ok_collect_n(
send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
rx,
1,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["\"User-Agent\""], "test_client".into());
assert_eq!(log["\"Upgrade-Insecure-Requests\""], "false".into());
assert_eq!(log["\"x-test-header\""], "true".into());
assert_eq!(log["AbsentHeader"], Value::Null);
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_headers_wildcard() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let mut headers = HeaderMap::new();
headers.insert("User-Agent", "test_client".parse().unwrap());
headers.insert("X-Case-Sensitive-Value", "CaseSensitive".parse().unwrap());
headers.insert("key1", "value_from_header".parse().unwrap());
let (rx, addr) = source(
vec!["*".to_string()],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_ok_collect_n(
send_with_headers(addr, "{\"key1\":\"value1\"}", headers),
rx,
1,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["\"user-agent\""], "test_client".into());
assert_eq!(log["\"x-case-sensitive-value\""], "CaseSensitive".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_query() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![
"source".to_string(),
"region".to_string(),
"absent".to_string(),
],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_ok_collect_n(
send_with_query(addr, "{\"key1\":\"value1\"}", "source=staging®ion=gb"),
rx,
1,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["source"], "staging".into());
assert_eq!(log["region"], "gb".into());
assert_eq!(log["absent"], Value::Null);
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_query_wildcard() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec!["*".to_string()],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_ok_collect_n(
send_with_query(
addr,
"{\"key1\":\"value1\",\"key2\":\"value2\"}",
"source=staging®ion=gb&key1=value_from_query",
),
rx,
1,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value_from_query".into());
assert_eq!(log["key2"], "value2".into());
assert_eq!(log["source"], "staging".into());
assert_eq!(log["region"], "gb".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_gzip_deflate() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let body = "test body";
let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
encoder.write_all(body.as_bytes()).unwrap();
let body = encoder.finish().unwrap();
let mut encoder = ZlibEncoder::new(Vec::new(), Compression::default());
encoder.write_all(body.as_slice()).unwrap();
let body = encoder.finish().unwrap();
let mut headers = HeaderMap::new();
headers.insert("Content-Encoding", "gzip, deflate".parse().unwrap());
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
None,
)
.await;
spawn_ok_collect_n(send_bytes(addr, body, headers), rx, 1).await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(*log.get_message().unwrap(), "test body".into());
assert_event_metadata(log).await;
}
}
#[tokio::test]
async fn http_path() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"vector_http_path",
"vector_remote_ip",
"/event/path",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_ok_collect_n(
send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path"),
rx,
1,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["vector_http_path"], "/event/path".into());
assert!(log.get_timestamp().is_some());
assert_eq!(
*log.get_source_type().unwrap(),
SimpleHttpConfig::NAME.into()
);
}
}
#[tokio::test]
async fn http_path_no_restriction() {
let mut events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"vector_http_path",
"vector_remote_ip",
"/event",
"POST",
StatusCode::OK,
false,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
spawn_collect_n(
async move {
assert_eq!(
200,
send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path1").await
);
assert_eq!(
200,
send_with_path(addr, "{\"key2\":\"value2\"}", "/event/path2").await
);
},
rx,
2,
)
.await
})
.await;
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key1"], "value1".into());
assert_eq!(log["vector_http_path"], "/event/path1".into());
assert!(log.get_timestamp().is_some());
assert_eq!(
*log.get_source_type().unwrap(),
SimpleHttpConfig::NAME.into()
);
}
{
let event = events.remove(0);
let log = event.as_log();
assert_eq!(log["key2"], "value2".into());
assert_eq!(log["vector_http_path"], "/event/path2".into());
assert!(log.get_timestamp().is_some());
assert_eq!(
*log.get_source_type().unwrap(),
SimpleHttpConfig::NAME.into()
);
}
}
#[tokio::test]
async fn http_wrong_path() {
components::init_test();
let (_rx, addr) = source(
vec![],
vec![],
"vector_http_path",
"vector_remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
Some(JsonDeserializerConfig::default().into()),
)
.await;
assert_eq!(
404,
send_with_path(addr, "{\"key1\":\"value1\"}", "/event/path").await
);
}
#[tokio::test]
async fn http_status_code() {
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async move {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::ACCEPTED,
true,
EventStatus::Delivered,
true,
None,
None,
)
.await;
spawn_collect_n(
async move {
assert_eq!(
StatusCode::ACCEPTED,
send(addr, "{\"key1\":\"value1\"}").await
);
},
rx,
1,
)
.await;
})
.await;
}
#[tokio::test]
async fn http_delivery_failure() {
assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Rejected,
true,
None,
None,
)
.await;
spawn_collect_n(
async move {
assert_eq!(400, send(addr, "test body\n").await);
},
rx,
1,
)
.await;
})
.await;
}
#[tokio::test]
async fn ignores_disabled_acknowledgements() {
let events = assert_source_compliance(&HTTP_PUSH_SOURCE_TAGS, async {
let (rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"POST",
StatusCode::OK,
true,
EventStatus::Rejected,
false,
None,
None,
)
.await;
spawn_collect_n(
async move {
assert_eq!(200, send(addr, "test body\n").await);
},
rx,
1,
)
.await
})
.await;
assert_eq!(events.len(), 1);
}
#[tokio::test]
async fn http_get_method() {
components::init_test();
let (_rx, addr) = source(
vec![],
vec![],
"http_path",
"remote_ip",
"/",
"GET",
StatusCode::OK,
true,
EventStatus::Delivered,
true,
None,
None,
)
.await;
assert_eq!(200, send_request(addr, "GET", "", "/").await);
}
#[test]
fn output_schema_definition_vector_namespace() {
let config = SimpleHttpConfig {
log_namespace: Some(true),
..Default::default()
};
let definitions = config
.outputs(LogNamespace::Vector)
.remove(0)
.schema_definition(true);
let expected_definition =
Definition::new_with_default_metadata(Kind::bytes(), [LogNamespace::Vector])
.with_meaning(OwnedTargetPath::event_root(), "message")
.with_metadata_field(
&owned_value_path!("vector", "source_type"),
Kind::bytes(),
None,
)
.with_metadata_field(
&owned_value_path!(SimpleHttpConfig::NAME, "path"),
Kind::bytes(),
None,
)
.with_metadata_field(
&owned_value_path!(SimpleHttpConfig::NAME, "headers"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!(SimpleHttpConfig::NAME, "query_parameters"),
Kind::object(Collection::empty().with_unknown(Kind::bytes())).or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!(SimpleHttpConfig::NAME, "host"),
Kind::bytes().or_undefined(),
None,
)
.with_metadata_field(
&owned_value_path!("vector", "ingest_timestamp"),
Kind::timestamp(),
None,
);
assert_eq!(definitions, Some(expected_definition))
}
#[test]
fn output_schema_definition_legacy_namespace() {
let config = SimpleHttpConfig::default();
let definitions = config
.outputs(LogNamespace::Legacy)
.remove(0)
.schema_definition(true);
let expected_definition = Definition::new_with_default_metadata(
Kind::object(Collection::empty()),
[LogNamespace::Legacy],
)
.with_event_field(
&owned_value_path!("message"),
Kind::bytes(),
Some("message"),
)
.with_event_field(&owned_value_path!("source_type"), Kind::bytes(), None)
.with_event_field(&owned_value_path!("timestamp"), Kind::timestamp(), None)
.with_event_field(&owned_value_path!("path"), Kind::bytes(), None)
.with_event_field(
&owned_value_path!("host"),
Kind::bytes().or_undefined(),
None,
)
.unknown_fields(Kind::bytes());
assert_eq!(definitions, Some(expected_definition))
}
#[test]
fn validate_remove_duplicates() {
let mut list = vec![
"a".to_owned(),
"b".to_owned(),
"c".to_owned(),
"d".to_owned(),
];
{
let list_dedup = remove_duplicates(list.clone(), "foo");
assert_eq!(list, list_dedup);
}
list.push("b".to_owned());
{
let list_dedup = remove_duplicates(list.clone(), "foo");
assert_eq!(
vec![
"a".to_owned(),
"b".to_owned(),
"c".to_owned(),
"d".to_owned()
],
list_dedup
);
}
}
impl ValidatableComponent for SimpleHttpConfig {
fn validation_configuration() -> ValidationConfiguration {
let config = Self {
decoding: Some(DeserializerConfig::Json(Default::default())),
..Default::default()
};
let log_namespace: LogNamespace = config.log_namespace.unwrap_or(false).into();
let listen_addr_http = format!("http://{}/", config.address);
let uri = Uri::try_from(&listen_addr_http).expect("should not fail to parse URI");
let external_resource = ExternalResource::new(
ResourceDirection::Push,
HttpResourceConfig::from_parts(uri, Some(config.method.into())),
config
.get_decoding_config()
.expect("should not fail to get decoding config"),
);
ValidationConfiguration::from_source(
Self::NAME,
log_namespace,
vec![ComponentTestCaseConfig::from_source(
config,
None,
Some(external_resource),
)],
)
}
}
register_validatable_component!(SimpleHttpConfig);
}