vector/sinks/redis/
config.rs

1use redis::{
2    ProtocolVersion, RedisConnectionInfo, TlsMode,
3    sentinel::{Sentinel, SentinelNodeConnectionInfo},
4};
5use snafu::prelude::*;
6
7use super::{
8    RedisCreateFailedSnafu,
9    sink::{RedisConnection, RedisSink},
10};
11use crate::{
12    serde::OneOrMany,
13    sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
14};
15
/// Marker type that carries the Redis sink's defaults for its Tower request
/// settings. It holds no data; the values live in its
/// `TowerRequestConfigDefaults` implementation.
#[derive(Clone, Copy, Debug)]
pub struct RedisTowerRequestConfigDefaults;
18
impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
    // Only the concurrency default is overridden; every other associated
    // constant keeps the trait's own default.
    // NOTE(review): `Concurrency::None` presumably means requests are not
    // issued concurrently — confirm against `Concurrency`'s documentation.
    const CONCURRENCY: Concurrency = Concurrency::None;
}
22
/// Redis data type to store messages in.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative)]
#[derivative(Default)]
// With `rename_all = "lowercase"` the variants are spelled `list`,
// `sortedset`, and `channel` in configuration (no separator is inserted
// between the words of `SortedSet`).
#[serde(rename_all = "lowercase")]
pub enum DataTypeConfig {
    /// The Redis `list` type.
    ///
    /// This resembles a deque, where messages can be popped and pushed from either end.
    ///
    /// This is the default.
    #[derivative(Default)]
    List,

    /// The Redis `sorted set` type.
    ///
    /// This resembles a priority queue, where messages can be pushed and popped with an
    /// associated score.
    SortedSet,

    /// The Redis `channel` type.
    ///
    /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving.
    Channel,
}
48
/// List-specific options.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
// `rename_all = "lowercase"` leaves the only field, `method`, spelled as-is.
#[serde(rename_all = "lowercase")]
pub struct ListOption {
    /// The method to use for pushing messages into a `list`.
    // Required whenever the enclosing option table is present: the field has
    // no `#[serde(default)]`, even though `ListMethod` derives a default.
    pub method: ListMethod,
}
57
/// Method for pushing messages into a `list`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
// Configuration spellings are `rpush` and `lpush` (lowercased variant names).
#[serde(rename_all = "lowercase")]
pub enum ListMethod {
    /// Use the `rpush` method.
    ///
    /// This pushes messages onto the tail of the list.
    ///
    /// This is the default.
    #[derivative(Default)]
    RPush,

    /// Use the `lpush` method.
    ///
    /// This pushes messages onto the head of the list.
    LPush,
}
77
/// Sorted Set-specific options.
#[configurable_component]
#[derive(Clone, Debug, Derivative, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
pub struct SortedSetOption {
    /// The method to use for pushing messages into a `sorted set`.
    // Optional: `None` leaves the choice of method to the code consuming
    // this option.
    pub method: Option<SortedSetMethod>,

    /// The score to publish a message with to a `sorted set`.
    ///
    /// Examples:
    /// - `%s`
    /// - `%Y%m%d%H%M%S`
    // Examples added in Rustdoc as `vector-config`'s metadata doesn't handle
    // UnsignedIntTemplate's properly yet. TODO: Improve this.
    #[configurable(validation(length(min = 1)))]
    pub score: Option<UnsignedIntTemplate>,
}
96
/// Method for pushing messages into a `sorted set`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
// The single variant is spelled `zadd` in configuration.
#[serde(rename_all = "lowercase")]
pub enum SortedSetMethod {
    /// Use the `zadd` method.
    ///
    /// This adds messages onto a queue with a score.
    ///
    /// This is the default.
    #[derivative(Default)]
    ZAdd,
}
111
/// Marker type that carries the Redis sink's default batch limits. It holds
/// no data; the limits are the constants of its `SinkBatchSettings`
/// implementation.
#[derive(Clone, Copy, Debug, Default)]
pub struct RedisDefaultBatchSettings;
114
impl SinkBatchSettings for RedisDefaultBatchSettings {
    // By default a batch holds a single event...
    const MAX_EVENTS: Option<usize> = Some(1);
    // ...has no byte-size limit...
    const MAX_BYTES: Option<usize> = None;
    // ...and uses a timeout of one second.
    const TIMEOUT_SECS: f64 = 1.0;
}
120
/// Configuration for the `redis` sink.
#[configurable_component(sink("redis", "Publish observability data to Redis."))]
#[derive(Clone, Debug)]
// Unknown keys are rejected at deserialization time.
#[serde(deny_unknown_fields)]
pub struct RedisSinkConfig {
    #[configurable(derived)]
    pub(super) encoding: EncodingConfig,

    // Falls back to `DataTypeConfig`'s default (`List`) when omitted.
    #[configurable(derived)]
    #[serde(default)]
    pub(super) data_type: DataTypeConfig,

    // Also accepted under the shorter key `list`, e.g. `list.method = "lpush"`.
    #[configurable(derived)]
    #[serde(alias = "list")]
    pub(super) list_option: Option<ListOption>,

    // Also accepted under the shorter key `sorted_set`.
    #[configurable(derived)]
    #[serde(alias = "sorted_set")]
    pub(super) sorted_set_option: Option<SortedSetOption>,

    /// The URL of the Redis endpoint to connect to.
    ///
    /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be
    /// `redis` or `rediss` for connections secured via TLS.
    // Also accepted under the key `url`. One or several values may be given:
    // with `sentinel_service` set, `build_connection` passes all of them to
    // Sentinel; otherwise it connects to the first one only.
    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
    #[serde(alias = "url")]
    pub(super) endpoint: OneOrMany<String>,

    /// The service name to use for sentinel.
    ///
    /// If this is specified, `endpoint` will be used to reach sentinel instances instead of a
    /// redis instance.
    #[configurable]
    pub(super) sentinel_service: Option<String>,

    // Read only on the Sentinel path of `build_connection`; when absent,
    // `SentinelConnectionSettings::default()` is used.
    #[configurable(derived)]
    #[serde(default)]
    pub(super) sentinel_connect: Option<SentinelConnectionSettings>,

    /// The Redis key to publish messages to.
    // `build` additionally rejects an empty key before connecting.
    #[configurable(validation(length(min = 1)))]
    #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
    pub(super) key: Template,

    #[configurable(derived)]
    #[serde(default)]
    pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(default)]
    pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,

    // Deserialized through `crate::serde::bool_or_struct` and omitted from
    // serialized output when it equals the default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub(super) acknowledgements: AcknowledgementsConfig,
}
181
182impl GenerateConfig for RedisSinkConfig {
183    fn generate_config() -> toml::Value {
184        toml::from_str(
185            r#"
186            url = "redis://127.0.0.1:6379/0"
187            key = "vector"
188            data_type = "list"
189            list.method = "lpush"
190            encoding.codec = "json"
191            batch.max_events = 1
192            "#,
193        )
194        .unwrap()
195    }
196}
197
198#[async_trait::async_trait]
199#[typetag::serde(name = "redis")]
200impl SinkConfig for RedisSinkConfig {
201    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
202        if self.key.is_empty() {
203            return Err("`key` cannot be empty.".into());
204        }
205        let conn = self.build_connection().await?;
206        let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
207        let sink = RedisSink::new(self, conn)?;
208        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
209    }
210
211    fn input(&self) -> Input {
212        Input::new(self.encoding.config().input_type())
213    }
214
215    fn acknowledgements(&self) -> &AcknowledgementsConfig {
216        &self.acknowledgements
217    }
218}
219
220impl RedisSinkConfig {
221    pub(super) async fn build_connection(&self) -> crate::Result<RedisConnection> {
222        let endpoints = self.endpoint.clone().to_vec();
223
224        if endpoints.is_empty() {
225            return Err("`endpoint` cannot be empty.".into());
226        }
227
228        if let Some(sentinel_service) = &self.sentinel_service {
229            let sentinel = Sentinel::build(endpoints).context(RedisCreateFailedSnafu)?;
230
231            Ok(RedisConnection::new_sentinel(
232                sentinel,
233                sentinel_service.clone(),
234                self.sentinel_connect.clone().unwrap_or_default().into(),
235            )
236            .await
237            .context(RedisCreateFailedSnafu)?)
238        } else {
239            // SAFETY: endpoints cannot be empty (checked above)
240            let client =
241                redis::Client::open(endpoints[0].as_str()).context(RedisCreateFailedSnafu)?;
242            let conn = client
243                .get_connection_manager()
244                .await
245                .context(RedisCreateFailedSnafu)?;
246
247            Ok(RedisConnection::new_direct(conn))
248        }
249    }
250
251    async fn healthcheck(mut conn: RedisConnection) -> crate::Result<()> {
252        redis::cmd("PING")
253            .query_async(&mut conn.get_connection_manager().await?.connection)
254            .await
255            .map_err(Into::into)
256    }
257}
258
/// Controls how Redis Sentinel will connect to the servers belonging to it.
#[configurable_component]
#[derive(Clone, Debug, Default)]
#[serde(deny_unknown_fields)]
pub struct SentinelConnectionSettings {
    // Defaults to `MaybeTlsMode::None`; converted into the `tls_mode` of
    // `SentinelNodeConnectionInfo`.
    #[configurable(derived)]
    #[serde(default)]
    pub tls: MaybeTlsMode,

    // When omitted, `redis_connection_info` of the resulting
    // `SentinelNodeConnectionInfo` stays `None`.
    #[configurable(derived)]
    #[serde(default)]
    pub connections: Option<RedisConnectionSettings>,
}
272
273impl From<SentinelConnectionSettings> for SentinelNodeConnectionInfo {
274    fn from(value: SentinelConnectionSettings) -> Self {
275        SentinelNodeConnectionInfo {
276            tls_mode: value.tls.into(),
277            redis_connection_info: value.connections.map(Into::into),
278        }
279    }
280}
281
/// How/if TLS should be established.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
// Configuration spellings are `none`, `secure`, and `insecure`.
#[serde(rename_all = "lowercase")]
pub enum MaybeTlsMode {
    /// Don't use TLS.
    ///
    /// This is the default.
    #[derivative(Default)]
    None,

    /// Enable TLS with certificate verification.
    Secure,

    /// Enable TLS without certificate verification.
    Insecure,
}
300
301impl From<MaybeTlsMode> for Option<TlsMode> {
302    fn from(value: MaybeTlsMode) -> Self {
303        match value {
304            MaybeTlsMode::None => None,
305            MaybeTlsMode::Secure => Some(TlsMode::Secure),
306            MaybeTlsMode::Insecure => Some(TlsMode::Insecure),
307        }
308    }
309}
310
311/// Connection independent information used to establish a connection
312/// to a redis instance sentinel owns.
313#[configurable_component]
314#[derive(Clone, Debug, Derivative)]
315#[derivative(Default)]
316pub struct RedisConnectionSettings {
317    /// The database number to use. Usually `0`.
318    pub db: i64,
319
320    /// Optionally, the username to connection with.
321    pub username: Option<String>,
322
323    /// Optionally, the password to connection with.
324    pub password: Option<String>,
325
326    /// The version of RESP to use.
327    pub protocol: RedisProtocolVersion,
328}
329
330impl From<RedisConnectionSettings> for RedisConnectionInfo {
331    fn from(value: RedisConnectionSettings) -> Self {
332        RedisConnectionInfo {
333            db: value.db,
334            username: value.username,
335            password: value.password,
336            protocol: value.protocol.into(),
337        }
338    }
339}
340
/// The communication protocol to use with the redis server.
// Unlike the other enums in this file there is no `rename_all` here, so the
// configuration values are spelled exactly like the variants: `RESP2`, `RESP3`.
#[configurable_component]
#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
#[derivative(Default)]
pub enum RedisProtocolVersion {
    /// Use RESP2.
    ///
    /// This is the default.
    #[derivative(Default)]
    RESP2,

    /// Use RESP3.
    RESP3,
}
355
356impl From<RedisProtocolVersion> for ProtocolVersion {
357    fn from(value: RedisProtocolVersion) -> Self {
358        match value {
359            RedisProtocolVersion::RESP2 => ProtocolVersion::RESP2,
360            RedisProtocolVersion::RESP3 => ProtocolVersion::RESP3,
361        }
362    }
363}