vector/sinks/redis/
config.rs

1use redis::{
2    sentinel::{Sentinel, SentinelNodeConnectionInfo},
3    ProtocolVersion, RedisConnectionInfo, TlsMode,
4};
5use snafu::prelude::*;
6
7use crate::{
8    serde::OneOrMany,
9    sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
10};
11
12use super::{
13    sink::{RedisConnection, RedisSink},
14    RedisCreateFailedSnafu,
15};
16
17#[derive(Clone, Copy, Debug)]
18pub struct RedisTowerRequestConfigDefaults;
19
20impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
21    const CONCURRENCY: Concurrency = Concurrency::None;
22}
23
24/// Redis data type to store messages in.
25#[configurable_component]
26#[derive(Clone, Copy, Debug, Derivative)]
27#[derivative(Default)]
28#[serde(rename_all = "lowercase")]
29pub enum DataTypeConfig {
30    /// The Redis `list` type.
31    ///
32    /// This resembles a deque, where messages can be popped and pushed from either end.
33    ///
34    /// This is the default.
35    #[derivative(Default)]
36    List,
37
38    /// The Redis `sorted set` type.
39    ///
40    /// This resembles a priority queue, where messages can be pushed and popped with an
41    /// associated score.
42    SortedSet,
43
44    /// The Redis `channel` type.
45    ///
46    /// Redis channels function in a pub/sub fashion, allowing many-to-many broadcasting and receiving.
47    Channel,
48}
49
50/// List-specific options.
51#[configurable_component]
52#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
53#[serde(rename_all = "lowercase")]
54pub struct ListOption {
55    /// The method to use for pushing messages into a `list`.
56    pub method: ListMethod,
57}
58
59/// Method for pushing messages into a `list`.
60#[configurable_component]
61#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
62#[derivative(Default)]
63#[serde(rename_all = "lowercase")]
64pub enum ListMethod {
65    /// Use the `rpush` method.
66    ///
67    /// This pushes messages onto the tail of the list.
68    ///
69    /// This is the default.
70    #[derivative(Default)]
71    RPush,
72
73    /// Use the `lpush` method.
74    ///
75    /// This pushes messages onto the head of the list.
76    LPush,
77}
78
79/// Sorted Set-specific options
80#[configurable_component]
81#[derive(Clone, Debug, Derivative, Eq, PartialEq)]
82#[serde(rename_all = "lowercase")]
83pub struct SortedSetOption {
84    /// The method to use for pushing messages into a `sorted set`.
85    pub method: Option<SortedSetMethod>,
86
87    /// The score to publish a message with to a `sorted set`.
88    ///
89    /// Examples:
90    /// - `%s`
91    /// - `%Y%m%d%H%M%S`
92    // Examples added in Rustdoc as `vector-config`'s metadata doesn't handle
93    // UnsignedIntTemplate's properly yet. TODO: Improve this.
94    #[configurable(validation(length(min = 1)))]
95    pub score: Option<UnsignedIntTemplate>,
96}
97
98/// Method for pushing messages into a `sorted set`.
99#[configurable_component]
100#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
101#[derivative(Default)]
102#[serde(rename_all = "lowercase")]
103pub enum SortedSetMethod {
104    /// Use the `zadd` method.
105    ///
106    /// This adds messages onto a queue with a score.
107    ///
108    /// This is the default.
109    #[derivative(Default)]
110    ZAdd,
111}
112
113#[derive(Clone, Copy, Debug, Default)]
114pub struct RedisDefaultBatchSettings;
115
116impl SinkBatchSettings for RedisDefaultBatchSettings {
117    const MAX_EVENTS: Option<usize> = Some(1);
118    const MAX_BYTES: Option<usize> = None;
119    const TIMEOUT_SECS: f64 = 1.0;
120}
121
122/// Configuration for the `redis` sink.
123#[configurable_component(sink("redis", "Publish observability data to Redis."))]
124#[derive(Clone, Debug)]
125#[serde(deny_unknown_fields)]
126pub struct RedisSinkConfig {
127    #[configurable(derived)]
128    pub(super) encoding: EncodingConfig,
129
130    #[configurable(derived)]
131    #[serde(default)]
132    pub(super) data_type: DataTypeConfig,
133
134    #[configurable(derived)]
135    #[serde(alias = "list")]
136    pub(super) list_option: Option<ListOption>,
137
138    #[configurable(derived)]
139    #[serde(alias = "sorted_set")]
140    pub(super) sorted_set_option: Option<SortedSetOption>,
141
142    /// The URL of the Redis endpoint to connect to.
143    ///
144    /// The URL _must_ take the form of `protocol://server:port/db` where the protocol can either be
145    /// `redis` or `rediss` for connections secured via TLS.
146    #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
147    #[serde(alias = "url")]
148    pub(super) endpoint: OneOrMany<String>,
149
150    /// The service name to use for sentinel.
151    ///
152    /// If this is specified, `endpoint` will be used to reach sentinel instances instead of a
153    /// redis instance.
154    #[configurable]
155    pub(super) sentinel_service: Option<String>,
156
157    #[configurable(derived)]
158    #[serde(default)]
159    pub(super) sentinel_connect: Option<SentinelConnectionSettings>,
160
161    /// The Redis key to publish messages to.
162    #[configurable(validation(length(min = 1)))]
163    #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
164    pub(super) key: Template,
165
166    #[configurable(derived)]
167    #[serde(default)]
168    pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,
169
170    #[configurable(derived)]
171    #[serde(default)]
172    pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,
173
174    #[configurable(derived)]
175    #[serde(
176        default,
177        deserialize_with = "crate::serde::bool_or_struct",
178        skip_serializing_if = "crate::serde::is_default"
179    )]
180    pub(super) acknowledgements: AcknowledgementsConfig,
181}
182
183impl GenerateConfig for RedisSinkConfig {
184    fn generate_config() -> toml::Value {
185        toml::from_str(
186            r#"
187            url = "redis://127.0.0.1:6379/0"
188            key = "vector"
189            data_type = "list"
190            list.method = "lpush"
191            encoding.codec = "json"
192            batch.max_events = 1
193            "#,
194        )
195        .unwrap()
196    }
197}
198
199#[async_trait::async_trait]
200#[typetag::serde(name = "redis")]
201impl SinkConfig for RedisSinkConfig {
202    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
203        if self.key.is_empty() {
204            return Err("`key` cannot be empty.".into());
205        }
206        let conn = self.build_connection().await?;
207        let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
208        let sink = RedisSink::new(self, conn)?;
209        Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
210    }
211
212    fn input(&self) -> Input {
213        Input::new(self.encoding.config().input_type())
214    }
215
216    fn acknowledgements(&self) -> &AcknowledgementsConfig {
217        &self.acknowledgements
218    }
219}
220
221impl RedisSinkConfig {
222    pub(super) async fn build_connection(&self) -> crate::Result<RedisConnection> {
223        let endpoints = self.endpoint.clone().to_vec();
224
225        if endpoints.is_empty() {
226            return Err("`endpoint` cannot be empty.".into());
227        }
228
229        if let Some(sentinel_service) = &self.sentinel_service {
230            let sentinel = Sentinel::build(endpoints).context(RedisCreateFailedSnafu)?;
231
232            Ok(RedisConnection::new_sentinel(
233                sentinel,
234                sentinel_service.clone(),
235                self.sentinel_connect.clone().unwrap_or_default().into(),
236            )
237            .await
238            .context(RedisCreateFailedSnafu)?)
239        } else {
240            // SAFETY: endpoints cannot be empty (checked above)
241            let client =
242                redis::Client::open(endpoints[0].as_str()).context(RedisCreateFailedSnafu)?;
243            let conn = client
244                .get_connection_manager()
245                .await
246                .context(RedisCreateFailedSnafu)?;
247
248            Ok(RedisConnection::new_direct(conn))
249        }
250    }
251
252    async fn healthcheck(mut conn: RedisConnection) -> crate::Result<()> {
253        redis::cmd("PING")
254            .query_async(&mut conn.get_connection_manager().await?.connection)
255            .await
256            .map_err(Into::into)
257    }
258}
259
260/// Controls how Redis Sentinel will connect to the servers belonging to it.
261#[configurable_component]
262#[derive(Clone, Debug, Default)]
263#[serde(deny_unknown_fields)]
264pub struct SentinelConnectionSettings {
265    #[configurable(derived)]
266    #[serde(default)]
267    pub tls: MaybeTlsMode,
268
269    #[configurable(derived)]
270    #[serde(default)]
271    pub connections: Option<RedisConnectionSettings>,
272}
273
274impl From<SentinelConnectionSettings> for SentinelNodeConnectionInfo {
275    fn from(value: SentinelConnectionSettings) -> Self {
276        SentinelNodeConnectionInfo {
277            tls_mode: value.tls.into(),
278            redis_connection_info: value.connections.map(Into::into),
279        }
280    }
281}
282
283/// How/if TLS should be established.
284#[configurable_component]
285#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
286#[derivative(Default)]
287#[serde(rename_all = "lowercase")]
288pub enum MaybeTlsMode {
289    /// Don't use TLS.
290    ///
291    /// This is the default.
292    #[derivative(Default)]
293    None,
294
295    /// Enable TLS with certificate verification.
296    Secure,
297
298    /// Enable TLS without certificate verification.
299    Insecure,
300}
301
302impl From<MaybeTlsMode> for Option<TlsMode> {
303    fn from(value: MaybeTlsMode) -> Self {
304        match value {
305            MaybeTlsMode::None => None,
306            MaybeTlsMode::Secure => Some(TlsMode::Secure),
307            MaybeTlsMode::Insecure => Some(TlsMode::Insecure),
308        }
309    }
310}
311
312/// Connection independent information used to establish a connection
313/// to a redis instance sentinel owns.
314#[configurable_component]
315#[derive(Clone, Debug, Derivative)]
316#[derivative(Default)]
317pub struct RedisConnectionSettings {
318    /// The database number to use. Usually `0`.
319    pub db: i64,
320
321    /// Optionally, the username to connection with.
322    pub username: Option<String>,
323
324    /// Optionally, the password to connection with.
325    pub password: Option<String>,
326
327    /// The version of RESP to use.
328    pub protocol: RedisProtocolVersion,
329}
330
331impl From<RedisConnectionSettings> for RedisConnectionInfo {
332    fn from(value: RedisConnectionSettings) -> Self {
333        RedisConnectionInfo {
334            db: value.db,
335            username: value.username,
336            password: value.password,
337            protocol: value.protocol.into(),
338        }
339    }
340}
341
342/// The communication protocol to use with the redis server.
343#[configurable_component]
344#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
345#[derivative(Default)]
346pub enum RedisProtocolVersion {
347    /// Use RESP2.
348    ///
349    /// This is the default.
350    #[derivative(Default)]
351    RESP2,
352
353    /// Use RESP3.
354    RESP3,
355}
356
357impl From<RedisProtocolVersion> for ProtocolVersion {
358    fn from(value: RedisProtocolVersion) -> Self {
359        match value {
360            RedisProtocolVersion::RESP2 => ProtocolVersion::RESP2,
361            RedisProtocolVersion::RESP3 => ProtocolVersion::RESP3,
362        }
363    }
364}