// vector/sinks/postgres/config.rs

1use futures::FutureExt;
2use tower::ServiceBuilder;
3use vector_lib::{
4    config::AcknowledgementsConfig,
5    configurable::{component::GenerateConfig, configurable_component},
6    sink::VectorSink,
7};
8
9use super::{
10    service::{PostgresRetryLogic, PostgresService},
11    sink::PostgresSink,
12};
13use sqlx::{postgres::PgPoolOptions, Pool, Postgres};
14
15use crate::{
16    config::{Input, SinkConfig, SinkContext},
17    sinks::{
18        util::{
19            BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
20            TowerRequestConfig, UriSerde,
21        },
22        Healthcheck,
23    },
24};
25
/// Returns the connection pool size applied when `pool_size` is omitted from the configuration.
const fn default_pool_size() -> u32 {
    const DEFAULT_POOL_SIZE: u32 = 5;
    DEFAULT_POOL_SIZE
}
29
30/// Configuration for the `postgres` sink.
31#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
32#[derive(Clone, Default, Debug)]
33#[serde(deny_unknown_fields)]
34pub struct PostgresConfig {
35    /// The PostgreSQL server connection string. It can contain the username and password.
36    /// See [PostgreSQL documentation](https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING) about connection strings for more information
37    /// about valid formats and options that can be used.
38    pub endpoint: String,
39
40    /// The table that data is inserted into. This table parameter is vulnerable
41    /// to SQL injection attacks as Vector does not validate or sanitize it, you must not use untrusted input.
42    /// This parameter will be directly interpolated in the SQL query statement,
43    /// as table names as parameters in prepared statements are not allowed in PostgreSQL.
44    pub table: String,
45
46    /// The postgres connection pool size. See [this](https://docs.rs/sqlx/latest/sqlx/struct.Pool.html#why-use-a-pool) for more
47    /// information about why a connection pool should be used.
48    #[serde(default = "default_pool_size")]
49    pub pool_size: u32,
50
51    /// Event batching behavior.
52    ///
53    /// Note that as PostgreSQL's `jsonb_populate_recordset` function is used to insert events,
54    /// a single event in the batch can make the whole batch to fail. For example, if a single event within the batch triggers
55    /// a unique constraint violation in the destination table, the whole event batch will fail.
56    ///
57    /// As a workaround, [triggers](https://www.postgresql.org/docs/current/sql-createtrigger.html) on constraint violations
58    /// can be defined at a database level to change the behavior of the insert operation on specific tables.
59    /// Alternatively, setting `max_events` batch setting to `1` will make each event to be inserted independently,
60    /// so events that trigger a constraint violation will not affect the rest of the events.
61    #[configurable(derived)]
62    #[serde(default)]
63    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
64
65    #[configurable(derived)]
66    #[serde(default)]
67    pub request: TowerRequestConfig,
68
69    #[configurable(derived)]
70    #[serde(
71        default,
72        deserialize_with = "crate::serde::bool_or_struct",
73        skip_serializing_if = "crate::serde::is_default"
74    )]
75    pub acknowledgements: AcknowledgementsConfig,
76}
77
78impl GenerateConfig for PostgresConfig {
79    fn generate_config() -> toml::Value {
80        toml::from_str(
81            r#"endpoint = "postgres://user:password@localhost/default"
82            table = "table"
83        "#,
84        )
85        .unwrap()
86    }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde(name = "postgres")]
91impl SinkConfig for PostgresConfig {
92    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
93        let connection_pool = PgPoolOptions::new()
94            .max_connections(self.pool_size)
95            .connect_lazy(&self.endpoint)?;
96
97        let healthcheck = healthcheck(connection_pool.clone()).boxed();
98
99        let batch_settings = self.batch.into_batcher_settings()?;
100        let request_settings = self.request.into_settings();
101
102        let endpoint_uri: UriSerde = self.endpoint.parse()?;
103        let service = PostgresService::new(
104            connection_pool,
105            self.table.clone(),
106            endpoint_uri.to_string(),
107        );
108        let service = ServiceBuilder::new()
109            .settings(request_settings, PostgresRetryLogic)
110            .service(service);
111
112        let sink = PostgresSink::new(service, batch_settings);
113
114        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
115    }
116
117    fn input(&self) -> Input {
118        Input::all()
119    }
120
121    fn acknowledgements(&self) -> &AcknowledgementsConfig {
122        &self.acknowledgements
123    }
124}
125
126async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
127    sqlx::query("SELECT 1").execute(&connection_pool).await?;
128    Ok(())
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    // Runs the shared `test_generate_config` helper against this sink's example config.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PostgresConfig>();
    }

    // A config with only the two required fields deserializes and keeps both values.
    #[test]
    fn parse_config() {
        let raw = r#"
            endpoint = "postgres://user:password@localhost/default"
            table = "mytable"
        "#;
        let parsed: PostgresConfig = toml::from_str(raw).unwrap();

        assert_eq!(
            parsed.endpoint,
            "postgres://user:password@localhost/default"
        );
        assert_eq!(parsed.table, "mytable");
    }
}