// vector/sinks/postgres/config.rs
use futures::FutureExt;
2use sqlx::{Pool, Postgres, postgres::PgPoolOptions};
3use tower::ServiceBuilder;
4use vector_lib::{
5 config::AcknowledgementsConfig,
6 configurable::{component::GenerateConfig, configurable_component},
7 sink::VectorSink,
8};
9
10use super::{
11 service::{PostgresRetryLogic, PostgresService},
12 sink::PostgresSink,
13};
14use crate::{
15 config::{Input, SinkConfig, SinkContext},
16 sinks::{
17 Healthcheck,
18 util::{
19 BatchConfig, RealtimeSizeBasedDefaultBatchSettings, ServiceBuilderExt,
20 TowerRequestConfig, UriSerde,
21 },
22 },
23};
24
/// Pool size applied by serde when `pool_size` is absent from the configuration.
const fn default_pool_size() -> u32 {
    const DEFAULT_POOL_SIZE: u32 = 5;
    DEFAULT_POOL_SIZE
}
28
/// Configuration for the `postgres` sink.
// NOTE(review): the derived `Default` yields `pool_size == 0`, whereas deserialization
// falls back to `default_pool_size()` (5) — confirm that the difference is intended.
#[configurable_component(sink("postgres", "Deliver log data to a PostgreSQL database."))]
#[derive(Clone, Default, Debug)]
#[serde(deny_unknown_fields)]
pub struct PostgresConfig {
    /// The PostgreSQL connection string.
    ///
    /// It is handed unchanged to the connection pool when the sink is built, and is also
    /// parsed as a URI. The generated example configuration shows that it may embed a
    /// username and password.
    pub endpoint: String,

    /// The name of the table handed to the sink's service.
    // NOTE(review): presumably the table that events are inserted into — confirm in `service.rs`.
    pub table: String,

    /// The maximum number of connections in the connection pool.
    ///
    /// Defaults to 5 when omitted from the configuration.
    #[serde(default = "default_pool_size")]
    pub pool_size: u32,

    /// Event batching settings, converted into batcher settings when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,

    /// Request settings applied to the service layer when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig,

    /// Acknowledgement settings, returned by `SinkConfig::acknowledgements`.
    // NOTE(review): going by the helper names, this accepts either a bare boolean or a
    // table, and is omitted from serialized output when it equals the default — confirm
    // against `crate::serde`.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
76
77impl GenerateConfig for PostgresConfig {
78 fn generate_config() -> toml::Value {
79 toml::from_str(
80 r#"endpoint = "postgres://user:password@localhost/default"
81 table = "table"
82 "#,
83 )
84 .unwrap()
85 }
86}
87
88#[async_trait::async_trait]
89#[typetag::serde(name = "postgres")]
90impl SinkConfig for PostgresConfig {
91 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
92 let connection_pool = PgPoolOptions::new()
93 .max_connections(self.pool_size)
94 .connect_lazy(&self.endpoint)?;
95
96 let healthcheck = healthcheck(connection_pool.clone()).boxed();
97
98 let batch_settings = self.batch.into_batcher_settings()?;
99 let request_settings = self.request.into_settings();
100
101 let endpoint_uri: UriSerde = self.endpoint.parse()?;
102 let service = PostgresService::new(
103 connection_pool,
104 self.table.clone(),
105 endpoint_uri.to_string(),
106 );
107 let service = ServiceBuilder::new()
108 .settings(request_settings, PostgresRetryLogic)
109 .service(service);
110
111 let sink = PostgresSink::new(service, batch_settings);
112
113 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
114 }
115
116 fn input(&self) -> Input {
117 Input::all()
118 }
119
120 fn acknowledgements(&self) -> &AcknowledgementsConfig {
121 &self.acknowledgements
122 }
123}
124
125async fn healthcheck(connection_pool: Pool<Postgres>) -> crate::Result<()> {
126 sqlx::query("SELECT 1").execute(&connection_pool).await?;
127 Ok(())
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared generate-config check against `PostgresConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PostgresConfig>();
    }

    /// A document containing only `endpoint` and `table` deserializes, and both values
    /// come through unchanged.
    #[test]
    fn parse_config() {
        let PostgresConfig {
            endpoint, table, ..
        } = toml::from_str::<PostgresConfig>(
            r#"
 endpoint = "postgres://user:password@localhost/default"
 table = "mytable"
 "#,
        )
        .unwrap();

        assert_eq!(endpoint, "postgres://user:password@localhost/default");
        assert_eq!(table, "mytable");
    }
}