use bytes::Bytes;
use chrono::Utc;
use futures::StreamExt;
use snafu::{ResultExt, Snafu};
use tokio_util::codec::FramedRead;
use vector_lib::codecs::{
decoding::{DeserializerConfig, FramingConfig},
StreamDecodingError,
};
use vector_lib::configurable::configurable_component;
use vector_lib::internal_event::{
ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol, Registered,
};
use vector_lib::lookup::{lookup_v2::OptionalValuePath, owned_value_path, path, OwnedValuePath};
use vector_lib::{
config::{LegacyKey, LogNamespace},
EstimatedJsonEncodedSizeOf,
};
use vrl::value::Kind;
use crate::{
codecs::{Decoder, DecodingConfig},
config::{log_schema, GenerateConfig, SourceConfig, SourceContext, SourceOutput},
event::Event,
internal_events::{EventsReceived, StreamClosedError},
serde::{default_decoding, default_framing_message_based},
};
mod channel;
mod list;
#[derive(Debug, Snafu)]
enum BuildError {
#[snafu(display("Failed to build redis client: {}", source))]
Client { source: redis::RedisError },
}
#[configurable_component]
#[derive(Copy, Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum DataTypeConfig {
#[derivative(Default)]
List,
Channel,
}
#[configurable_component]
#[derive(Copy, Clone, Debug, Default, Derivative, Eq, PartialEq)]
#[serde(deny_unknown_fields, rename_all = "lowercase")]
pub struct ListOption {
#[configurable(derived)]
method: Method,
}
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum Method {
#[derivative(Default)]
Lpop,
Rpop,
}
pub struct ConnectionInfo {
protocol: &'static str,
endpoint: String,
}
impl From<&redis::ConnectionInfo> for ConnectionInfo {
fn from(redis_conn_info: &redis::ConnectionInfo) -> Self {
let (protocol, endpoint) = match &redis_conn_info.addr {
redis::ConnectionAddr::Tcp(host, port)
| redis::ConnectionAddr::TcpTls { host, port, .. } => {
("tcp", format!("{}:{}", host, port))
}
redis::ConnectionAddr::Unix(path) => ("uds", path.to_string_lossy().to_string()),
};
Self { protocol, endpoint }
}
}
#[configurable_component(source("redis", "Collect observability data from Redis."))]
#[derive(Clone, Debug, Derivative)]
#[serde(deny_unknown_fields)]
pub struct RedisSourceConfig {
#[serde(default)]
data_type: DataTypeConfig,
#[configurable(derived)]
list: Option<ListOption>,
#[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
url: String,
#[configurable(metadata(docs::examples = "vector"))]
key: String,
#[configurable(metadata(docs::examples = "redis_key"))]
redis_key: Option<OptionalValuePath>,
#[configurable(derived)]
#[serde(default = "default_framing_message_based")]
#[derivative(Default(value = "default_framing_message_based()"))]
framing: FramingConfig,
#[configurable(derived)]
#[serde(default = "default_decoding")]
#[derivative(Default(value = "default_decoding()"))]
decoding: DeserializerConfig,
#[configurable(metadata(docs::hidden))]
#[serde(default)]
log_namespace: Option<bool>,
}
impl GenerateConfig for RedisSourceConfig {
fn generate_config() -> toml::Value {
toml::from_str(
r#"
url = "redis://127.0.0.1:6379/0"
key = "vector"
data_type = "list"
list.method = "lpop"
redis_key = "redis_key"
"#,
)
.unwrap()
}
}
#[async_trait::async_trait]
#[typetag::serde(name = "redis")]
impl SourceConfig for RedisSourceConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<super::Source> {
let log_namespace = cx.log_namespace(self.log_namespace);
if self.key.is_empty() {
return Err("`key` cannot be empty.".into());
}
let redis_key = self.redis_key.clone().and_then(|k| k.path);
let client = redis::Client::open(self.url.as_str()).context(ClientSnafu {})?;
let connection_info = ConnectionInfo::from(client.get_connection_info());
let decoder =
DecodingConfig::new(self.framing.clone(), self.decoding.clone(), log_namespace)
.build()?;
let bytes_received = register!(BytesReceived::from(Protocol::from(
connection_info.protocol
)));
let events_received = register!(EventsReceived);
let handler = InputHandler {
client,
bytes_received: bytes_received.clone(),
events_received: events_received.clone(),
key: self.key.clone(),
redis_key,
decoder,
cx,
log_namespace,
};
match self.data_type {
DataTypeConfig::List => {
let method = self.list.unwrap_or_default().method;
handler.watch(method).await
}
DataTypeConfig::Channel => handler.subscribe(connection_info).await,
}
}
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let redis_key_path = self
.redis_key
.clone()
.and_then(|k| k.path)
.map(LegacyKey::InsertIfEmpty);
let schema_definition = self
.decoding
.schema_definition(log_namespace)
.with_source_metadata(
Self::NAME,
redis_key_path,
&owned_value_path!("key"),
Kind::bytes(),
None,
)
.with_standard_vector_source_metadata();
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
}
fn can_acknowledge(&self) -> bool {
false
}
}
struct InputHandler {
pub client: redis::Client,
pub bytes_received: Registered<BytesReceived>,
pub events_received: Registered<EventsReceived>,
pub key: String,
pub redis_key: Option<OwnedValuePath>,
pub decoder: Decoder,
pub log_namespace: LogNamespace,
pub cx: SourceContext,
}
impl InputHandler {
async fn handle_line(&mut self, line: String) -> Result<(), ()> {
let now = Utc::now();
self.bytes_received.emit(ByteSize(line.len()));
let mut stream = FramedRead::new(line.as_ref(), self.decoder.clone());
while let Some(next) = stream.next().await {
match next {
Ok((events, _byte_size)) => {
let count = events.len();
let byte_size = events.estimated_json_encoded_size_of();
self.events_received.emit(CountByteSize(count, byte_size));
let events = events.into_iter().map(|mut event| {
if let Event::Log(ref mut log) = event {
self.log_namespace.insert_vector_metadata(
log,
log_schema().source_type_key(),
path!("source_type"),
Bytes::from(RedisSourceConfig::NAME),
);
self.log_namespace.insert_vector_metadata(
log,
log_schema().timestamp_key(),
path!("ingest_timestamp"),
now,
);
self.log_namespace.insert_source_metadata(
RedisSourceConfig::NAME,
log,
self.redis_key.as_ref().map(LegacyKey::InsertIfEmpty),
path!("key"),
self.key.as_str(),
);
};
event
});
if (self.cx.out.send_batch(events).await).is_err() {
emit!(StreamClosedError { count });
return Err(());
}
}
Err(error) => {
if !error.can_continue() {
break;
}
}
}
}
Ok(())
}
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn generate_config() {
crate::test_util::test_generate_config::<RedisSourceConfig>();
}
}
#[cfg(all(test, feature = "redis-integration-tests"))]
mod integration_test {
use redis::AsyncCommands;
use super::*;
use crate::{
config::log_schema,
test_util::{
collect_n,
components::{run_and_assert_source_compliance_n, SOURCE_TAGS},
random_string,
},
SourceSender,
};
use vrl::value;
const REDIS_SERVER: &str = "redis://redis:6379/0";
#[tokio::test]
async fn redis_source_list_rpop() {
let client = redis::Client::open(REDIS_SERVER).unwrap();
let mut conn = client.get_connection_manager().await.unwrap();
let key = format!("test-key-{}", random_string(10));
debug!("Test key name: {}.", key);
let _: i32 = conn.rpush(&key, "1").await.unwrap();
let _: i32 = conn.rpush(&key, "2").await.unwrap();
let _: i32 = conn.rpush(&key, "3").await.unwrap();
let config = RedisSourceConfig {
data_type: DataTypeConfig::List,
list: Some(ListOption {
method: Method::Rpop,
}),
url: REDIS_SERVER.to_owned(),
key: key.clone(),
redis_key: None,
framing: default_framing_message_based(),
decoding: default_decoding(),
log_namespace: Some(false),
};
let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;
assert_eq!(
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
"3".into()
);
assert_eq!(
events[1].as_log()[log_schema().message_key().unwrap().to_string()],
"2".into()
);
assert_eq!(
events[2].as_log()[log_schema().message_key().unwrap().to_string()],
"1".into()
);
}
#[tokio::test]
async fn redis_source_list_rpop_with_log_namespace() {
let client = redis::Client::open(REDIS_SERVER).unwrap();
let mut conn = client.get_connection_manager().await.unwrap();
let key = format!("test-key-{}", random_string(10));
debug!("Test key name: {}.", key);
let _: i32 = conn.rpush(&key, "1").await.unwrap();
let config = RedisSourceConfig {
data_type: DataTypeConfig::List,
list: Some(ListOption {
method: Method::Rpop,
}),
url: REDIS_SERVER.to_owned(),
key: key.clone(),
redis_key: Some(OptionalValuePath::from(owned_value_path!("remapped_key"))),
framing: default_framing_message_based(),
decoding: default_decoding(),
log_namespace: Some(true),
};
let events = run_and_assert_source_compliance_n(config, 1, &SOURCE_TAGS).await;
let log_event = events[0].as_log();
let meta = log_event.metadata();
assert_eq!(log_event.value(), &"1".into());
assert_eq!(
meta.value()
.get(path!(RedisSourceConfig::NAME, "key"))
.unwrap(),
&value!(key)
);
}
#[tokio::test]
async fn redis_source_list_lpop() {
let client = redis::Client::open(REDIS_SERVER).unwrap();
let mut conn = client.get_connection_manager().await.unwrap();
let key = format!("test-key-{}", random_string(10));
debug!("Test key name: {}.", key);
let _: i32 = conn.rpush(&key, "1").await.unwrap();
let _: i32 = conn.rpush(&key, "2").await.unwrap();
let _: i32 = conn.rpush(&key, "3").await.unwrap();
let config = RedisSourceConfig {
data_type: DataTypeConfig::List,
list: Some(ListOption {
method: Method::Lpop,
}),
url: REDIS_SERVER.to_owned(),
key: key.clone(),
redis_key: None,
framing: default_framing_message_based(),
decoding: default_decoding(),
log_namespace: Some(false),
};
let events = run_and_assert_source_compliance_n(config, 3, &SOURCE_TAGS).await;
assert_eq!(
events[0].as_log()[log_schema().message_key().unwrap().to_string()],
"1".into()
);
assert_eq!(
events[1].as_log()[log_schema().message_key().unwrap().to_string()],
"2".into()
);
assert_eq!(
events[2].as_log()[log_schema().message_key().unwrap().to_string()],
"3".into()
);
}
#[tokio::test]
async fn redis_source_channel_consume_event() {
let key = format!("test-channel-{}", random_string(10));
let text = "test message for channel";
let config = RedisSourceConfig {
data_type: DataTypeConfig::Channel,
list: None,
url: REDIS_SERVER.to_owned(),
key: key.clone(),
redis_key: None,
framing: default_framing_message_based(),
decoding: default_decoding(),
log_namespace: Some(false),
};
let (tx, rx) = SourceSender::new_test();
let context = SourceContext::new_test(tx, None);
let source = config
.build(context)
.await
.expect("source should not fail to build");
tokio::spawn(source);
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
let client = redis::Client::open(REDIS_SERVER).unwrap();
let mut async_conn = client
.get_async_connection()
.await
.expect("Failed to get redis async connection.");
for _i in 0..10000 {
let _: i32 = async_conn.publish(key.clone(), text).await.unwrap();
}
let events = collect_n(rx, 10000).await;
assert_eq!(events.len(), 10000);
for event in events {
assert_eq!(
event.as_log()[log_schema().message_key().unwrap().to_string()],
text.into()
);
assert_eq!(
event.as_log()[log_schema().source_type_key().unwrap().to_string()],
RedisSourceConfig::NAME.into()
);
}
}
}