// vector/sinks/postgres/config.rs
use futures::FutureExt;
use tower::ServiceBuilder;
use vector_lib::{
    config::AcknowledgementsConfig,
    configurable::{component::GenerateConfig, configurable_component},
    sink::VectorSink,
};

use super::{
    service::{PostgresRetryLogic, PostgresService},
    sink::PostgresSink,
};
use sqlx::{postgres::PgPoolOptions, Pool, Postgres};

use crate::{
    config::{Input, SinkConfig, SinkContext},
    sinks::{
        util::{
            BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
            TowerRequestConfig, UriSerde,
        },
        Healthcheck,
    },
};

/// Returns the connection pool size used when `pool_size` is omitted from the configuration.
///
/// Referenced by name from the `#[serde(default = "default_pool_size")]` attribute on
/// `PostgresConfig::pool_size`.
const fn default_pool_size() -> u32 {
    5
}

30#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
32#[derive(Clone, Default, Debug)]
33#[serde(deny_unknown_fields)]
34pub struct PostgresConfig {
35 pub endpoint: String,
39
40 pub table: String,
45
46 #[serde(default = "default_pool_size")]
49 pub pool_size: u32,
50
51 #[configurable(derived)]
62 #[serde(default)]
63 pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
64
65 #[configurable(derived)]
66 #[serde(default)]
67 pub request: TowerRequestConfig,
68
69 #[configurable(derived)]
70 #[serde(
71 default,
72 deserialize_with = "crate::serde::bool_or_struct",
73 skip_serializing_if = "crate::serde::is_default"
74 )]
75 pub acknowledgements: AcknowledgementsConfig,
76}
77
78impl GenerateConfig for PostgresConfig {
79 fn generate_config() -> toml::Value {
80 toml::from_str(
81 r#"endpoint = "postgres://user:password@localhost/default"
82 table = "table"
83 "#,
84 )
85 .unwrap()
86 }
87}
88
89#[async_trait::async_trait]
90#[typetag::serde(name = "postgres")]
91impl SinkConfig for PostgresConfig {
92 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
93 let connection_pool = PgPoolOptions::new()
94 .max_connections(self.pool_size)
95 .connect_lazy(&self.endpoint)?;
96
97 let healthcheck = healthcheck(connection_pool.clone()).boxed();
98
99 let batch_settings = self.batch.into_batcher_settings()?;
100 let request_settings = self.request.into_settings();
101
102 let endpoint_uri: UriSerde = self.endpoint.parse()?;
103 let service = PostgresService::new(
104 connection_pool,
105 self.table.clone(),
106 endpoint_uri.to_string(),
107 );
108 let service = ServiceBuilder::new()
109 .settings(request_settings, PostgresRetryLogic)
110 .service(service);
111
112 let sink = PostgresSink::new(service, batch_settings);
113
114 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
115 }
116
117 fn input(&self) -> Input {
118 Input::all()
119 }
120
121 fn acknowledgements(&self) -> &AcknowledgementsConfig {
122 &self.acknowledgements
123 }
124}
125
126async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
127 sqlx::query("SELECT 1").execute(&connection_pool).await?;
128 Ok(())
129}
130
#[cfg(test)]
mod tests {
    use super::*;

    /// The generated example config must be accepted by the sink's own config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PostgresConfig>();
    }

    /// A config with only the fields that lack a serde default deserializes, and the omitted
    /// `pool_size` falls back to its serde default.
    #[test]
    fn parse_config() {
        let cfg = toml::from_str::<PostgresConfig>(
            r#"
            endpoint = "postgres://user:password@localhost/default"
            table = "mytable"
        "#,
        )
        .unwrap();
        assert_eq!(cfg.endpoint, "postgres://user:password@localhost/default");
        assert_eq!(cfg.table, "mytable");
        assert_eq!(cfg.pool_size, default_pool_size());
    }
}