1use redis::{
2 ProtocolVersion, RedisConnectionInfo, TlsMode,
3 sentinel::{Sentinel, SentinelNodeConnectionInfo},
4};
5use snafu::prelude::*;
6
7use super::{
8 RedisCreateFailedSnafu,
9 sink::{RedisConnection, RedisSink},
10};
11use crate::{
12 serde::OneOrMany,
13 sinks::{prelude::*, util::service::TowerRequestConfigDefaults},
14};
15
16#[derive(Clone, Copy, Debug)]
17pub struct RedisTowerRequestConfigDefaults;
18
19impl TowerRequestConfigDefaults for RedisTowerRequestConfigDefaults {
20 const CONCURRENCY: Concurrency = Concurrency::None;
21}
22
23#[configurable_component]
25#[derive(Clone, Copy, Debug, Derivative)]
26#[derivative(Default)]
27#[serde(rename_all = "lowercase")]
28pub enum DataTypeConfig {
29 #[derivative(Default)]
35 List,
36
37 SortedSet,
42
43 Channel,
47}
48
49#[configurable_component]
51#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
52#[serde(rename_all = "lowercase")]
53pub struct ListOption {
54 pub method: ListMethod,
56}
57
58#[configurable_component]
60#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
61#[derivative(Default)]
62#[serde(rename_all = "lowercase")]
63pub enum ListMethod {
64 #[derivative(Default)]
70 RPush,
71
72 LPush,
76}
77
78#[configurable_component]
80#[derive(Clone, Debug, Derivative, Eq, PartialEq)]
81#[serde(rename_all = "lowercase")]
82pub struct SortedSetOption {
83 pub method: Option<SortedSetMethod>,
85
86 #[configurable(validation(length(min = 1)))]
94 pub score: Option<UnsignedIntTemplate>,
95}
96
97#[configurable_component]
99#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
100#[derivative(Default)]
101#[serde(rename_all = "lowercase")]
102pub enum SortedSetMethod {
103 #[derivative(Default)]
109 ZAdd,
110}
111
112#[derive(Clone, Copy, Debug, Default)]
113pub struct RedisDefaultBatchSettings;
114
115impl SinkBatchSettings for RedisDefaultBatchSettings {
116 const MAX_EVENTS: Option<usize> = Some(1);
117 const MAX_BYTES: Option<usize> = None;
118 const TIMEOUT_SECS: f64 = 1.0;
119}
120
121#[configurable_component(sink("redis", "Publish observability data to Redis."))]
123#[derive(Clone, Debug)]
124#[serde(deny_unknown_fields)]
125pub struct RedisSinkConfig {
126 #[configurable(derived)]
127 pub(super) encoding: EncodingConfig,
128
129 #[configurable(derived)]
130 #[serde(default)]
131 pub(super) data_type: DataTypeConfig,
132
133 #[configurable(derived)]
134 #[serde(alias = "list")]
135 pub(super) list_option: Option<ListOption>,
136
137 #[configurable(derived)]
138 #[serde(alias = "sorted_set")]
139 pub(super) sorted_set_option: Option<SortedSetOption>,
140
141 #[configurable(metadata(docs::examples = "redis://127.0.0.1:6379/0"))]
146 #[serde(alias = "url")]
147 pub(super) endpoint: OneOrMany<String>,
148
149 #[configurable]
154 pub(super) sentinel_service: Option<String>,
155
156 #[configurable(derived)]
157 #[serde(default)]
158 pub(super) sentinel_connect: Option<SentinelConnectionSettings>,
159
160 #[configurable(validation(length(min = 1)))]
162 #[configurable(metadata(docs::examples = "syslog:{{ app }}", docs::examples = "vector"))]
163 pub(super) key: Template,
164
165 #[configurable(derived)]
166 #[serde(default)]
167 pub(super) batch: BatchConfig<RedisDefaultBatchSettings>,
168
169 #[configurable(derived)]
170 #[serde(default)]
171 pub(super) request: TowerRequestConfig<RedisTowerRequestConfigDefaults>,
172
173 #[configurable(derived)]
174 #[serde(
175 default,
176 deserialize_with = "crate::serde::bool_or_struct",
177 skip_serializing_if = "crate::serde::is_default"
178 )]
179 pub(super) acknowledgements: AcknowledgementsConfig,
180}
181
182impl GenerateConfig for RedisSinkConfig {
183 fn generate_config() -> toml::Value {
184 toml::from_str(
185 r#"
186 url = "redis://127.0.0.1:6379/0"
187 key = "vector"
188 data_type = "list"
189 list.method = "lpush"
190 encoding.codec = "json"
191 batch.max_events = 1
192 "#,
193 )
194 .unwrap()
195 }
196}
197
198#[async_trait::async_trait]
199#[typetag::serde(name = "redis")]
200impl SinkConfig for RedisSinkConfig {
201 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
202 if self.key.is_empty() {
203 return Err("`key` cannot be empty.".into());
204 }
205 let conn = self.build_connection().await?;
206 let healthcheck = RedisSinkConfig::healthcheck(conn.clone()).boxed();
207 let sink = RedisSink::new(self, conn)?;
208 Ok((super::VectorSink::from_event_streamsink(sink), healthcheck))
209 }
210
211 fn input(&self) -> Input {
212 Input::new(self.encoding.config().input_type())
213 }
214
215 fn acknowledgements(&self) -> &AcknowledgementsConfig {
216 &self.acknowledgements
217 }
218}
219
220impl RedisSinkConfig {
221 pub(super) async fn build_connection(&self) -> crate::Result<RedisConnection> {
222 let endpoints = self.endpoint.clone().to_vec();
223
224 if endpoints.is_empty() {
225 return Err("`endpoint` cannot be empty.".into());
226 }
227
228 if let Some(sentinel_service) = &self.sentinel_service {
229 let sentinel = Sentinel::build(endpoints).context(RedisCreateFailedSnafu)?;
230
231 Ok(RedisConnection::new_sentinel(
232 sentinel,
233 sentinel_service.clone(),
234 self.sentinel_connect.clone().unwrap_or_default().into(),
235 )
236 .await
237 .context(RedisCreateFailedSnafu)?)
238 } else {
239 let client =
241 redis::Client::open(endpoints[0].as_str()).context(RedisCreateFailedSnafu)?;
242 let conn = client
243 .get_connection_manager()
244 .await
245 .context(RedisCreateFailedSnafu)?;
246
247 Ok(RedisConnection::new_direct(conn))
248 }
249 }
250
251 async fn healthcheck(mut conn: RedisConnection) -> crate::Result<()> {
252 redis::cmd("PING")
253 .query_async(&mut conn.get_connection_manager().await?.connection)
254 .await
255 .map_err(Into::into)
256 }
257}
258
259#[configurable_component]
261#[derive(Clone, Debug, Default)]
262#[serde(deny_unknown_fields)]
263pub struct SentinelConnectionSettings {
264 #[configurable(derived)]
265 #[serde(default)]
266 pub tls: MaybeTlsMode,
267
268 #[configurable(derived)]
269 #[serde(default)]
270 pub connections: Option<RedisConnectionSettings>,
271}
272
273impl From<SentinelConnectionSettings> for SentinelNodeConnectionInfo {
274 fn from(value: SentinelConnectionSettings) -> Self {
275 SentinelNodeConnectionInfo {
276 tls_mode: value.tls.into(),
277 redis_connection_info: value.connections.map(Into::into),
278 }
279 }
280}
281
282#[configurable_component]
284#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
285#[derivative(Default)]
286#[serde(rename_all = "lowercase")]
287pub enum MaybeTlsMode {
288 #[derivative(Default)]
292 None,
293
294 Secure,
296
297 Insecure,
299}
300
301impl From<MaybeTlsMode> for Option<TlsMode> {
302 fn from(value: MaybeTlsMode) -> Self {
303 match value {
304 MaybeTlsMode::None => None,
305 MaybeTlsMode::Secure => Some(TlsMode::Secure),
306 MaybeTlsMode::Insecure => Some(TlsMode::Insecure),
307 }
308 }
309}
310
311#[configurable_component]
314#[derive(Clone, Debug, Derivative)]
315#[derivative(Default)]
316pub struct RedisConnectionSettings {
317 pub db: i64,
319
320 pub username: Option<String>,
322
323 pub password: Option<String>,
325
326 pub protocol: RedisProtocolVersion,
328}
329
330impl From<RedisConnectionSettings> for RedisConnectionInfo {
331 fn from(value: RedisConnectionSettings) -> Self {
332 RedisConnectionInfo {
333 db: value.db,
334 username: value.username,
335 password: value.password,
336 protocol: value.protocol.into(),
337 }
338 }
339}
340
341#[configurable_component]
343#[derive(Clone, Copy, Debug, Derivative, Eq, PartialEq)]
344#[derivative(Default)]
345pub enum RedisProtocolVersion {
346 #[derivative(Default)]
350 RESP2,
351
352 RESP3,
354}
355
356impl From<RedisProtocolVersion> for ProtocolVersion {
357 fn from(value: RedisProtocolVersion) -> Self {
358 match value {
359 RedisProtocolVersion::RESP2 => ProtocolVersion::RESP2,
360 RedisProtocolVersion::RESP3 => ProtocolVersion::RESP3,
361 }
362 }
363}