vector/sinks/redis/
service.rs

1use std::task::{Context, Poll};
2
3use crate::sinks::prelude::*;
4
5use super::{
6    config::{ListMethod, SortedSetMethod},
7    sink::{ConnectionState, RedisConnection},
8    RedisRequest, RedisSinkError,
9};
10
11#[derive(Clone)]
12pub struct RedisService {
13    pub(super) conn: RedisConnection,
14    pub(super) data_type: super::DataType,
15}
16
17impl Service<RedisRequest> for RedisService {
18    type Response = RedisResponse;
19    type Error = RedisSinkError;
20    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
21
22    // Emission of an internal event in case of errors is handled upstream by the caller.
23    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
24        Poll::Ready(Ok(()))
25    }
26
27    // Emission of internal events for errors and dropped events is handled upstream by the caller.
28    fn call(&mut self, kvs: RedisRequest) -> Self::Future {
29        let count = kvs.request.len();
30
31        let mut redis_conn = self.conn.clone();
32        let mut pipe = redis::pipe();
33
34        for kv in kvs.request {
35            match self.data_type {
36                super::DataType::List(method) => match method {
37                    ListMethod::LPush => {
38                        if count > 1 {
39                            pipe.atomic().lpush(kv.key, kv.value.as_ref());
40                        } else {
41                            pipe.lpush(kv.key, kv.value.as_ref());
42                        }
43                    }
44                    ListMethod::RPush => {
45                        if count > 1 {
46                            pipe.atomic().rpush(kv.key, kv.value.as_ref());
47                        } else {
48                            pipe.rpush(kv.key, kv.value.as_ref());
49                        }
50                    }
51                },
52                super::DataType::SortedSet(method) => match method {
53                    SortedSetMethod::ZAdd => {
54                        if count > 1 {
55                            pipe.atomic().zadd(
56                                kv.key,
57                                kv.value.as_ref(),
58                                kv.score.unwrap_or(0) as f64,
59                            );
60                        } else {
61                            pipe.zadd(kv.key, kv.value.as_ref(), kv.score.unwrap_or(0) as f64);
62                        }
63                    }
64                },
65                super::DataType::Channel => {
66                    if count > 1 {
67                        pipe.atomic().publish(kv.key, kv.value.as_ref());
68                    } else {
69                        pipe.publish(kv.key, kv.value.as_ref());
70                    }
71                }
72            }
73        }
74
75        let byte_size = kvs.metadata.events_byte_size();
76
77        Box::pin(async move {
78            let ConnectionState {
79                connection: mut conn,
80                generation,
81            } = redis_conn.get_connection_manager().await?;
82
83            match pipe.query_async(&mut conn).await {
84                Ok(event_status) => Ok(RedisResponse {
85                    event_status,
86                    events_byte_size: kvs.metadata.into_events_estimated_json_encoded_byte_size(),
87                    byte_size,
88                }),
89                Err(error) => Err(RedisSinkError::SendError {
90                    source: error,
91                    generation,
92                }),
93            }
94        })
95    }
96}
97
98pub struct RedisResponse {
99    pub event_status: Vec<bool>,
100    pub events_byte_size: GroupedCountByteSize,
101    pub byte_size: usize,
102}
103
104impl RedisResponse {
105    pub(super) fn is_successful(&self) -> bool {
106        self.event_status.iter().all(|x| *x)
107    }
108}
109
110impl DriverResponse for RedisResponse {
111    fn event_status(&self) -> EventStatus {
112        if self.is_successful() {
113            EventStatus::Delivered
114        } else {
115            EventStatus::Errored
116        }
117    }
118
119    fn events_sent(&self) -> &GroupedCountByteSize {
120        &self.events_byte_size
121    }
122
123    fn bytes_sent(&self) -> Option<usize> {
124        Some(self.byte_size)
125    }
126}