vector/sinks/redis/
service.rs

1use std::task::{Context, Poll};
2
3use super::{
4    RedisRequest, RedisSinkError,
5    config::{ListMethod, SortedSetMethod},
6    sink::{ConnectionState, RedisConnection},
7};
8use crate::sinks::prelude::*;
9
/// Service that turns a `RedisRequest` into a pipelined Redis query.
///
/// The struct is `Clone`; `call` clones `conn` so the returned future owns its own handle.
10#[derive(Clone)]
11pub struct RedisService {
    /// Connection handle from which `call` obtains the connection manager used to send.
12    pub(super) conn: RedisConnection,
    /// Selects the Redis command queued for each entry: list push, sorted-set add, or publish.
13    pub(super) data_type: super::DataType,
14}
15
16impl Service<RedisRequest> for RedisService {
17    type Response = RedisResponse;
18    type Error = RedisSinkError;
19    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
20
21    // Emission of an internal event in case of errors is handled upstream by the caller.
22    fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
23        Poll::Ready(Ok(()))
24    }
25
26    // Emission of internal events for errors and dropped events is handled upstream by the caller.
27    fn call(&mut self, kvs: RedisRequest) -> Self::Future {
28        let count = kvs.request.len();
29
30        let mut redis_conn = self.conn.clone();
31        let mut pipe = redis::pipe();
32
33        for kv in kvs.request {
34            match self.data_type {
35                super::DataType::List(method) => match method {
36                    ListMethod::LPush => {
37                        if count > 1 {
38                            pipe.atomic().lpush(kv.key, kv.value.as_ref());
39                        } else {
40                            pipe.lpush(kv.key, kv.value.as_ref());
41                        }
42                    }
43                    ListMethod::RPush => {
44                        if count > 1 {
45                            pipe.atomic().rpush(kv.key, kv.value.as_ref());
46                        } else {
47                            pipe.rpush(kv.key, kv.value.as_ref());
48                        }
49                    }
50                },
51                super::DataType::SortedSet(method) => match method {
52                    SortedSetMethod::ZAdd => {
53                        if count > 1 {
54                            pipe.atomic().zadd(
55                                kv.key,
56                                kv.value.as_ref(),
57                                kv.score.unwrap_or(0) as f64,
58                            );
59                        } else {
60                            pipe.zadd(kv.key, kv.value.as_ref(), kv.score.unwrap_or(0) as f64);
61                        }
62                    }
63                },
64                super::DataType::Channel => {
65                    if count > 1 {
66                        pipe.atomic().publish(kv.key, kv.value.as_ref());
67                    } else {
68                        pipe.publish(kv.key, kv.value.as_ref());
69                    }
70                }
71            }
72        }
73
74        let byte_size = kvs.metadata.events_byte_size();
75
76        Box::pin(async move {
77            let ConnectionState {
78                connection: mut conn,
79                generation,
80            } = redis_conn.get_connection_manager().await?;
81
82            match pipe.query_async(&mut conn).await {
83                Ok(event_status) => Ok(RedisResponse {
84                    event_status,
85                    events_byte_size: kvs.metadata.into_events_estimated_json_encoded_byte_size(),
86                    byte_size,
87                }),
88                Err(error) => Err(RedisSinkError::SendError {
89                    source: error,
90                    generation,
91                }),
92            }
93        })
94    }
95}
96
/// Result of one pipelined Redis query issued by `RedisService::call`.
97pub struct RedisResponse {
    /// Values returned by the pipeline query, decoded as booleans; `is_successful` requires
    /// every one of them to be `true`.
98    pub event_status: Vec<bool>,
    /// Grouped count/byte-size taken from the request metadata; returned by `events_sent`.
99    pub events_byte_size: GroupedCountByteSize,
    /// Byte size taken from the request metadata; returned by `bytes_sent`.
100    pub byte_size: usize,
101}
102
103impl RedisResponse {
104    pub(super) fn is_successful(&self) -> bool {
105        self.event_status.iter().all(|x| *x)
106    }
107}
108
109impl DriverResponse for RedisResponse {
110    fn event_status(&self) -> EventStatus {
111        if self.is_successful() {
112            EventStatus::Delivered
113        } else {
114            EventStatus::Errored
115        }
116    }
117
118    fn events_sent(&self) -> &GroupedCountByteSize {
119        &self.events_byte_size
120    }
121
122    fn bytes_sent(&self) -> Option<usize> {
123        Some(self.byte_size)
124    }
125}