vector/sinks/postgres/
service.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
use std::num::NonZeroUsize;
use std::task::{Context, Poll};

use crate::internal_events::EndpointBytesSent;
use crate::sinks::prelude::{RequestMetadataBuilder, RetryLogic};
use futures::future::BoxFuture;
use snafu::{ResultExt, Snafu};
use sqlx::types::Json;
use sqlx::{Pool, Postgres};
use tower::Service;
use vector_lib::codecs::JsonSerializerConfig;
use vector_lib::event::{Event, EventFinalizers, EventStatus, Finalizable};
use vector_lib::request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata};
use vector_lib::stream::DriverResponse;
use vector_lib::EstimatedJsonEncodedSizeOf;

const POSTGRES_PROTOCOL: &str = "postgres";

#[derive(Clone)]
pub struct PostgresRetryLogic;

impl RetryLogic for PostgresRetryLogic {
    type Error = PostgresServiceError;
    type Response = PostgresResponse;

    fn is_retriable_error(&self, error: &Self::Error) -> bool {
        let PostgresServiceError::Postgres {
            source: postgres_error,
        } = error
        else {
            return false;
        };

        matches!(
            postgres_error,
            sqlx::Error::Io(_) | sqlx::Error::PoolTimedOut
        )
    }
}

#[derive(Clone)]
pub struct PostgresService {
    connection_pool: Pool<Postgres>,
    table: String,
    endpoint: String,
}

impl PostgresService {
    pub const fn new(connection_pool: Pool<Postgres>, table: String, endpoint: String) -> Self {
        Self {
            connection_pool,
            table,
            endpoint,
        }
    }
}

#[derive(Clone)]
pub struct PostgresRequest {
    pub events: Vec<Event>,
    pub finalizers: EventFinalizers,
    pub metadata: RequestMetadata,
}

impl TryFrom<Vec<Event>> for PostgresRequest {
    type Error = String;

    fn try_from(mut events: Vec<Event>) -> Result<Self, Self::Error> {
        let finalizers = events.take_finalizers();
        let metadata_builder = RequestMetadataBuilder::from_events(&events);
        let events_size = NonZeroUsize::new(events.estimated_json_encoded_size_of().get())
            .ok_or("payload should never be zero length")?;
        let metadata = metadata_builder.with_request_size(events_size);
        Ok(PostgresRequest {
            events,
            finalizers,
            metadata,
        })
    }
}

impl Finalizable for PostgresRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        self.finalizers.take_finalizers()
    }
}

impl MetaDescriptive for PostgresRequest {
    fn get_metadata(&self) -> &RequestMetadata {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.metadata
    }
}

pub struct PostgresResponse {
    metadata: RequestMetadata,
}

impl DriverResponse for PostgresResponse {
    fn event_status(&self) -> EventStatus {
        EventStatus::Delivered
    }

    fn events_sent(&self) -> &GroupedCountByteSize {
        self.metadata.events_estimated_json_encoded_byte_size()
    }

    fn bytes_sent(&self) -> Option<usize> {
        Some(self.metadata.request_encoded_size())
    }
}

#[derive(Debug, Snafu)]
pub enum PostgresServiceError {
    #[snafu(display("Database error: {source}"))]
    Postgres { source: sqlx::Error },

    #[snafu(display("Serialization error: {source}"))]
    VectorCommon { source: vector_common::Error },
}

impl Service<PostgresRequest> for PostgresService {
    type Response = PostgresResponse;
    type Error = PostgresServiceError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _cx: &mut Context) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: PostgresRequest) -> Self::Future {
        let service = self.clone();
        let future = async move {
            let table = service.table;
            let metadata = request.metadata;
            let json_serializer = JsonSerializerConfig::default().build();
            let serialized_values = request
                .events
                .into_iter()
                .map(|event| json_serializer.to_json_value(event))
                .collect::<Result<Vec<_>, _>>()
                .context(VectorCommonSnafu)?;

            sqlx::query(&format!(
                "INSERT INTO {table} SELECT * FROM jsonb_populate_recordset(NULL::{table}, $1)"
            ))
            .bind(Json(serialized_values))
            .execute(&service.connection_pool)
            .await
            .context(PostgresSnafu)?;

            emit!(EndpointBytesSent {
                byte_size: metadata.request_encoded_size(),
                protocol: POSTGRES_PROTOCOL,
                endpoint: &service.endpoint,
            });

            Ok(PostgresResponse { metadata })
        };

        Box::pin(future)
    }
}