vector/sinks/redis/
config.rs

1use redis::{
2    ProtocolVersion, RedisConnectionInfo, TlsMode,
3    sentinel::{Sentinel, SentinelNodeConnectionInfo},
4};
5use snafu::prelude::*;
6
7use super::{
8    RedisCreateFailedSnafu,
9    sink::{RedisConnection, RedisSink},
10};
11use crate::{
12    serde::OneOrMany,
13    sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
14};
15
16#[derive(Clone, Copy, Debug)]
17pub struct RedisTowerRequestConfigDefaults;
18
19impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
20    const CONCURRENCY: Concurrency = Concurrency::None;
21}
22
23/// Redis data type to store messages in.
24#[configurable_component]
25#[derive(Clone, Copy, Debug, Default)]
26#[serde(rename_all = "lowercase")]
27pub enum DataTypeConfig {
28    /// The Redis `list` type.
29    ///
30    /// This resembles a deque, where messages can be popped and pushed from either end.
31    ///
32    /// This is the default.
33    #[default]
34    List,
35
36    /// The Redis `sorted set` type.
37    ///
38    /// This resembles a priority queue, where messages can be pushed and popped with an
39    /// associated score.
40    SortedSet,
41
42    /// The Redis `channel` type.
43    ///
44    /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving.
45    Channel,
46}
47
48/// List-specific options.
49#[configurable_component]
50#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
51#[serde(rename_all = "lowercase")]
52pub struct ListOption {
53    /// The method to use for pushing messages into a `list`.
54    pub method: ListMethod,
55}
56
57/// Method for pushing messages into a `list`.
58#[configurable_component]
59#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
60#[serde(rename_all = "lowercase")]
61pub enum ListMethod {
62    /// Use the `rpush` method.
63    ///
64    /// This pushes messages onto the tail of the list.
65    ///
66    /// This is the default.
67    #[default]
68    RPush,
69
70    /// Use the `lpush` method.
71    ///
72    /// This pushes messages onto the head of the list.
73    LPush,
74}
75
76/// Sorted Set-specific options
77#[configurable_component]
78#[derive(Clone, Debug, Default, Eq, PartialEq)]
79#[serde(rename_all = "lowercase")]
80pub struct SortedSetOption {
81    /// The method to use for pushing messages into a `sorted set`.
82    pub method: Option<SortedSetMethod>,
83
84    /// The score to publish a message with to a `sorted set`.
85    ///
86    /// Examples:
87    /// - `%s`
88    /// - `%Y%m%d%H%M%S`
89    // Examples added in Rustdoc as `vector-config`'s metadata doesn't handle
90    // UnsignedIntTemplate's properly yet. TODO: Improve this.
91    #[configurable(validation(length(min = 1)))]
92    pub score: Option<UnsignedIntTemplate>,
93}
94
95/// Method for pushing messages into a `sorted set`.
96#[configurable_component]
97#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
98#[serde(rename_all = "lowercase")]
99pub enum SortedSetMethod {
100    /// Use the `zadd` method.
101    ///
102    /// This adds messages onto a queue with a score.
103    ///
104    /// This is the default.
105    #[default]
106    ZAdd,
107}
108
109#[derive(Clone, Copy, Debug, Default)]
110pub struct RedisDefaultBatchSettings;
111
112impl SinkBatchSettings for RedisDefaultBatchSettings {
113    const MAX_EVENTS: Option<usize> = Some(1);
114    const MAX_BYTES: Option<usize> = None;
115    const TIMEOUT_SECS: f64 = 1.0;
116}
117
118/// Configuration for the `redis` sink.
119#[configurable_component(sink("redis", "Publish observability data to Redis."))]
120#[derive(Clone, Debug)]
121#[serde(deny_unknown_fields)]
122pub struct RedisSinkConfig {
123    #[configurable(derived)]
124    pub(super) encoding: EncodingConfig,
125
126    #[configurable(derived)]
127    #[serde(default)]
128    pub(super) data_type: DataTypeConfig,
129
130    #[configurable(derived)]
131    #[serde(alias = "list")]
132    pub(super) list_option: Option<ListOption>,
133
134    #[configurable(derived)]
135    #[serde(alias = "sorted_set")]
136    pub(super) sorted_set_option: Option<SortedSetOption>,
137
138    /// The URL of the Redis endpoint to connect to.
139    ///
140    /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be
141    /// `redis` or `rediss` for connections secured via TLS.
142    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
143    #[serde(alias = "url")]
144    pub(super) endpoint: OneOrMany<String>,
145
146    /// The service name to use for sentinel.
147    ///
148    /// If this is specified, `endpoint` will be used to reach sentinel instances instead of a
149    /// redis instance.
150    #[configurable]
151    pub(super) sentinel_service: Option<String>,
152
153    #[configurable(derived)]
154    #[serde(default)]
155    pub(super) sentinel_connect: Option<SentinelConnectionSettings>,
156
157    /// The Redis key to publish messages to.
158    #[configurable(validation(length(min = 1)))]
159    #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
160    pub(super) key: Template,
161
162    #[configurable(derived)]
163    #[serde(default)]
164    pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,
165
166    #[configurable(derived)]
167    #[serde(default)]
168    pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,
169
170    #[configurable(derived)]
171    #[serde(
172        default,
173        deserialize_with = "crate::serde::bool_or_struct",
174        skip_serializing_if = "crate::serde::is_default"
175    )]
176    pub(super) acknowledgements: AcknowledgementsConfig,
177}
178
179impl GenerateConfig for RedisSinkConfig {
180    fn generate_config() -> toml::Value {
181        toml::from_str(
182            r#"
183            url = "redis://127.0.0.1:6379/0"
184            key = "vector"
185            data_type = "list"
186            list.method = "lpush"
187            encoding.codec = "json"
188            batch.max_events = 1
189            "#,
190        )
191        .unwrap()
192    }
193}
194
195#[async_trait::async_trait]
196#[typetag::serde(name = "redis")]
197impl SinkConfig for RedisSinkConfig {
198    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
199        if self.key.is_empty() {
200            return Err("`key` cannot be empty.".into());
201        }
202        let conn = self.build_connection().await?;
203        let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
204        let sink = RedisSink::new(self, conn)?;
205        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
206    }
207
208    fn input(&self) -> Input {
209        Input::new(self.encoding.config().input_type())
210    }
211
212    fn acknowledgements(&self) -> &AcknowledgementsConfig {
213        &self.acknowledgements
214    }
215}
216
217impl RedisSinkConfig {
218    pub(super) async fn build_connection(&self) -> crate::Result<RedisConnection> {
219        let endpoints = self.endpoint.clone().to_vec();
220
221        if endpoints.is_empty() {
222            return Err("`endpoint` cannot be empty.".into());
223        }
224
225        if let Some(sentinel_service) = &self.sentinel_service {
226            let sentinel = Sentinel::build(endpoints).context(RedisCreateFailedSnafu)?;
227
228            Ok(RedisConnection::new_sentinel(
229                sentinel,
230                sentinel_service.clone(),
231                self.sentinel_connect.clone().unwrap_or_default().into(),
232            )
233            .await
234            .context(RedisCreateFailedSnafu)?)
235        } else {
236            // SAFETY: endpoints cannot be empty (checked above)
237            let client =
238                redis::Client::open(endpoints[0].as_str()).context(RedisCreateFailedSnafu)?;
239            let conn = client
240                .get_connection_manager()
241                .await
242                .context(RedisCreateFailedSnafu)?;
243
244            Ok(RedisConnection::new_direct(conn))
245        }
246    }
247
248    async fn healthcheck(mut conn: RedisConnection) -> crate::Result<()> {
249        redis::cmd("PING")
250            .query_async(&mut conn.get_connection_manager().await?.connection)
251            .await
252            .map_err(Into::into)
253    }
254}
255
256/// Controls how Redis Sentinel will connect to the servers belonging to it.
257#[configurable_component]
258#[derive(Clone, Debug, Default)]
259#[serde(deny_unknown_fields)]
260pub struct SentinelConnectionSettings {
261    #[configurable(derived)]
262    #[serde(default)]
263    pub tls: MaybeTlsMode,
264
265    #[configurable(derived)]
266    #[serde(default)]
267    pub connections: Option<RedisConnectionSettings>,
268}
269
270impl From<SentinelConnectionSettings> for SentinelNodeConnectionInfo {
271    fn from(value: SentinelConnectionSettings) -> Self {
272        SentinelNodeConnectionInfo {
273            tls_mode: value.tls.into(),
274            redis_connection_info: value.connections.map(Into::into),
275        }
276    }
277}
278
279/// How/if TLS should be established.
280#[configurable_component]
281#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
282#[serde(rename_all = "lowercase")]
283pub enum MaybeTlsMode {
284    /// Don't use TLS.
285    ///
286    /// This is the default.
287    #[default]
288    None,
289
290    /// Enable TLS with certificate verification.
291    Secure,
292
293    /// Enable TLS without certificate verification.
294    Insecure,
295}
296
297impl From<MaybeTlsMode> for Option<TlsMode> {
298    fn from(value: MaybeTlsMode) -> Self {
299        match value {
300            MaybeTlsMode::None => None,
301            MaybeTlsMode::Secure => Some(TlsMode::Secure),
302            MaybeTlsMode::Insecure => Some(TlsMode::Insecure),
303        }
304    }
305}
306
307/// Connection independent information used to establish a connection
308/// to a redis instance sentinel owns.
309#[configurable_component]
310#[derive(Clone, Debug, Default)]
311pub struct RedisConnectionSettings {
312    /// The database number to use. Usually `0`.
313    pub db: i64,
314
315    /// Optionally, the username to connection with.
316    pub username: Option<String>,
317
318    /// Optionally, the password to connection with.
319    pub password: Option<String>,
320
321    /// The version of RESP to use.
322    pub protocol: RedisProtocolVersion,
323}
324
325impl From<RedisConnectionSettings> for RedisConnectionInfo {
326    fn from(value: RedisConnectionSettings) -> Self {
327        RedisConnectionInfo {
328            db: value.db,
329            username: value.username,
330            password: value.password,
331            protocol: value.protocol.into(),
332        }
333    }
334}
335
336/// The communication protocol to use with the redis server.
337#[configurable_component]
338#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
339pub enum RedisProtocolVersion {
340    /// Use RESP2.
341    ///
342    /// This is the default.
343    #[default]
344    RESP2,
345
346    /// Use RESP3.
347    RESP3,
348}
349
350impl From<RedisProtocolVersion> for ProtocolVersion {
351    fn from(value: RedisProtocolVersion) -> Self {
352        match value {
353            RedisProtocolVersion::RESP2 => ProtocolVersion::RESP2,
354            RedisProtocolVersion::RESP3 => ProtocolVersion::RESP3,
355        }
356    }
357}