1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
use redis::{aio::ConnectionManager, RedisResult};
use snafu::prelude::*;

use crate::sinks::{prelude::*, util::service::TowerRequestConfigDefaults};

use super::{sink::RedisSink, RedisCreateFailedSnafu};

#[derive(Clone, Copy, Debug)]
pub struct RedisTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
    const CONCURRENCY: Concurrency = Concurrency::None;
}

/// Redis data type to store messages in.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum DataTypeConfig {
    /// The Redis `list` type.
    ///
    /// This resembles a deque, where messages can be popped and pushed from either end.
    ///
    /// This is the default.
    #[derivative(Default)]
    List,

    /// The Redis `channel` type.
    ///
    /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving.
    Channel,
}

/// List-specific options.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub struct ListOption {
    /// The method to use for pushing messages into a `list`.
    pub(super) method: Method,
}

/// Method for pushing messages into a `list`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
#[serde(rename_all = "lowercase")]
pub enum Method {
    /// Use the `rpush` method.
    ///
    /// This pushes messages onto the tail of the list.
    ///
    /// This is the default.
    #[derivative(Default)]
    RPush,

    /// Use the `lpush` method.
    ///
    /// This pushes messages onto the head of the list.
    LPush,
}

#[derive(Clone, Copy, Debug, Default)]
pub struct RedisDefaultBatchSettings;

impl SinkBatchSettings for RedisDefaultBatchSettings {
    const MAX_EVENTS: Option<usize> = Some(1);
    const MAX_BYTES: Option<usize> = None;
    const TIMEOUT_SECS: f64 = 1.0;
}

/// Configuration for the `redis` sink.
#[configurable_component(sink("redis", "Publish observability data to Redis."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct RedisSinkConfig {
    #[configurable(derived)]
    pub(super) encoding: EncodingConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub(super) data_type: DataTypeConfig,

    #[configurable(derived)]
    #[serde(alias = "list")]
    pub(super) list_option: Option<ListOption>,

    /// The URL of the Redis endpoint to connect to.
    ///
    /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be
    /// `redis` or `rediss` for connections secured via TLS.
    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
    #[serde(alias = "url")]
    pub(super) endpoint: String,

    /// The Redis key to publish messages to.
    #[configurable(validation(length(min = 1)))]
    #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
    pub(super) key: Template,

    #[configurable(derived)]
    #[serde(default)]
    pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(super) acknowledgements: AcknowledgementsConfig,
}

impl GenerateConfig for RedisSinkConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(
            r#"
            url = "redis://127.0.0.1:6379/0"
            key = "vector"
            data_type = "list"
            list.method = "lpush"
            encoding.codec = "json"
            batch.max_events = 1
            "#,
        )
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "redis")]
impl SinkConfig for RedisSinkConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        if self.key.is_empty() {
            return Err("`key` cannot be empty.".into());
        }
        let conn = self.build_client().await.context(RedisCreateFailedSnafu)?;
        let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
        let sink = RedisSink::new(self, conn)?;
        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
    }

    fn input(&self) -> Input {
        Input::new(self.encoding.config().input_type() & DataType::Log)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

impl RedisSinkConfig {
    pub(super) async fn build_client(&self) -> RedisResult<ConnectionManager> {
        let client = redis::Client::open(self.endpoint.as_str())?;
        client.get_connection_manager().await
    }

    async fn healthcheck(mut conn: ConnectionManager) -> crate::Result<()> {
        redis::cmd("PING")
            .query_async(&mut conn)
            .await
            .map_err(Into::into)
    }
}