vector/sinks/postgres/
config.rs

1use futures::FutureExt;
2use sqlx::{Pool, Postgres, postgres::PgPoolOptions};
3use tower::ServiceBuilder;
4use vector_lib::{
5    config::AcknowledgementsConfig,
6    configurable::{component::GenerateConfig, configurable_component},
7    sink::VectorSink,
8};
9
10use super::{
11    service::{PostgresRetryLogic, PostgresService},
12    sink::PostgresSink,
13};
14use crate::{
15    config::{Input, SinkConfig, SinkContext},
16    sinks::{
17        Healthcheck,
18        util::{
19            BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
20            TowerRequestConfig, UriSerde,
21        },
22    },
23};
24
/// Pool size used when the `pool_size` option is omitted from the configuration.
const DEFAULT_POOL_SIZE: u32 = 5;

/// Returns the fallback connection pool size.
///
/// Referenced by name from the `#[serde(default = "...")]` attribute on
/// `PostgresConfig::pool_size`, which is why it is a function and not just a constant.
const fn default_pool_size() -> u32 {
    DEFAULT_POOL_SIZE
}
28
29/// Configuration for the `postgres` sink.
30#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
31#[derive(Clone, Default, Debug)]
32#[serde(deny_unknown_fields)]
33pub struct PostgresConfig {
34    /// The PostgreSQL server connection string. It can contain the username and password.
35    /// See [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) about connection strings for more information
36    /// about valid formats and options that can be used.
37    pub endpoint: String,
38
39    /// The table that data is inserted into. This table parameter is vulnerable
40    /// to SQL injection attacks as Vector does not validate or sanitize it, you must not use untrusted input.
41    /// This parameter will be directly interpolated in the SQL query statement,
42    /// as table names as parameters in prepared statements are not allowed in PostgreSQL.
43    pub table: String,
44
45    /// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more
46    /// information about why a connection pool should be used.
47    #[serde(default = "default_pool_size")]
48    pub pool_size: u32,
49
50    /// Event batching behavior.
51    ///
52    /// Note that as PostgreSQL's `jsonb_populate_recordset` function is used to insert events,
53    /// a single event in the batch can make the whole batch to fail. For example, if a single event within the batch triggers
54    /// a unique constraint violation in the destination table, the whole event batch will fail.
55    ///
56    /// As a workaround, [triggers](https://www.postgresql.org/docs/current/sql-createtrigger.html) on constraint violations
57    /// can be defined at a database level to change the behavior of the insert operation on specific tables.
58    /// Alternatively, setting `max_events` batch setting to `1` will make each event to be inserted independently,
59    /// so events that trigger a constraint violation will not affect the rest of the events.
60    #[configurable(derived)]
61    #[serde(default)]
62    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
63
64    #[configurable(derived)]
65    #[serde(default)]
66    pub request: TowerRequestConfig,
67
68    #[configurable(derived)]
69    #[serde(
70        default,
71        deserialize_with = "crate::serde::bool_or_struct",
72        skip_serializing_if = "crate::serde::is_default"
73    )]
74    pub acknowledgements: AcknowledgementsConfig,
75}
76
77impl GenerateConfig for PostgresConfig {
78    fn generate_config() -> toml::Value {
79        toml::from_str(
80            r#"endpoint = "postgres://user:password@localhost/default"
81            table = "table"
82        "#,
83        )
84        .unwrap()
85    }
86}
87
88#[async_trait::async_trait]
89#[typetag::serde(name = "postgres")]
90impl SinkConfig for PostgresConfig {
91    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
92        let connection_pool = PgPoolOptions::new()
93            .max_connections(self.pool_size)
94            .connect_lazy(&self.endpoint)?;
95
96        let healthcheck = healthcheck(connection_pool.clone()).boxed();
97
98        let batch_settings = self.batch.into_batcher_settings()?;
99        let request_settings = self.request.into_settings();
100
101        let endpoint_uri: UriSerde = self.endpoint.parse()?;
102        let service = PostgresService::new(
103            connection_pool,
104            self.table.clone(),
105            endpoint_uri.to_string(),
106        );
107        let service = ServiceBuilder::new()
108            .settings(request_settings, PostgresRetryLogic)
109            .service(service);
110
111        let sink = PostgresSink::new(service, batch_settings);
112
113        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
114    }
115
116    fn input(&self) -> Input {
117        Input::all()
118    }
119
120    fn acknowledgements(&self) -> &AcknowledgementsConfig {
121        &self.acknowledgements
122    }
123}
124
125async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
126    sqlx::query("SELECT 1").execute(&connection_pool).await?;
127    Ok(())
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared `test_generate_config` helper against this config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PostgresConfig>();
    }

    /// A TOML document with only `endpoint` and `table` deserializes, and both
    /// values come through unchanged.
    #[test]
    fn parse_config() {
        let raw = r#"
            endpoint = "postgres://user:password@localhost/default"
            table = "mytable"
        "#;
        let parsed: PostgresConfig = toml::from_str(raw).unwrap();

        assert_eq!(parsed.endpoint, "postgres://user:password@localhost/default");
        assert_eq!(parsed.table, "mytable");
    }
}