// vector/sinks/webhdfs/config.rs
use opendal::{layers::LoggingLayer, services::Webhdfs, Operator};
2use tower::ServiceBuilder;
3use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
4use vector_lib::configurable::configurable_component;
5use vector_lib::{
6 config::{AcknowledgementsConfig, DataType, Input},
7 sink::VectorSink,
8};
9
10use crate::{
11 codecs::{Encoder, EncodingConfigWithFraming, SinkType},
12 config::{GenerateConfig, SinkConfig, SinkContext},
13 sinks::{
14 opendal_common::*,
15 util::{
16 partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
17 Compression,
18 },
19 Healthcheck,
20 },
21};
22
#[configurable_component(sink("webhdfs", "WebHDFS."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct WebHdfsConfig {
    /// The root path under which all objects are written.
    ///
    /// Relative paths (including `prefix`) are resolved against this root.
    /// NOTE(review): presumably must be a valid path on the HDFS cluster — confirm.
    #[serde(default)]
    pub root: String,

    /// A prefix applied to the key of every object written by this sink.
    ///
    /// Supports template syntax (see the `templateable` metadata below), so the
    /// prefix can vary per event (e.g. `%F/` for date-based partitioning, as in
    /// the generated example config).
    #[serde(default)]
    #[configurable(metadata(docs::templateable))]
    pub prefix: String,

    /// The endpoint of the WebHDFS service.
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://127.0.0.1:9870"))]
    pub endpoint: String,

    /// Framing and serialization configuration; flattened so its fields appear
    /// at the top level of the sink config.
    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    // Defaults to gzip (see `Compression::gzip_default`).
    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    // Accepts either a bare bool or the full struct form (`bool_or_struct`),
    // and is omitted from serialized output when left at its default.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
77
78impl GenerateConfig for WebHdfsConfig {
79 fn generate_config() -> toml::Value {
80 toml::Value::try_from(Self {
81 root: "/".to_string(),
82 prefix: "%F/".to_string(),
83 endpoint: "http://127.0.0.1:9870".to_string(),
84
85 encoding: (
86 Some(NewlineDelimitedEncoderConfig::new()),
87 JsonSerializerConfig::default(),
88 )
89 .into(),
90 compression: Compression::gzip_default(),
91 batch: BatchConfig::default(),
92
93 acknowledgements: Default::default(),
94 })
95 .unwrap()
96 }
97}
98
99#[async_trait::async_trait]
100#[typetag::serde(name = "webhdfs")]
101impl SinkConfig for WebHdfsConfig {
102 async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
103 let op = self.build_operator()?;
104
105 let check_op = op.clone();
106 let healthcheck = Box::pin(async move { Ok(check_op.check().await?) });
107
108 let sink = self.build_processor(op)?;
109 Ok((sink, healthcheck))
110 }
111
112 fn input(&self) -> Input {
113 Input::new(self.encoding.config().1.input_type() & DataType::Log)
114 }
115
116 fn acknowledgements(&self) -> &AcknowledgementsConfig {
117 &self.acknowledgements
118 }
119}
120
121impl WebHdfsConfig {
122 pub fn build_operator(&self) -> crate::Result<Operator> {
123 let mut builder = Webhdfs::default();
125 builder = builder.root(&self.root);
127 builder = builder.endpoint(&self.endpoint);
128
129 let op = Operator::new(builder)?
130 .layer(LoggingLayer::default())
131 .finish();
132 Ok(op)
133 }
134
135 pub fn build_processor(&self, op: Operator) -> crate::Result<VectorSink> {
136 let batcher_settings = self.batch.into_batcher_settings()?;
138
139 let transformer = self.encoding.transformer();
140 let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
141 let encoder = Encoder::<Framer>::new(framer, serializer);
142
143 let request_builder = OpenDalRequestBuilder {
144 encoder: (transformer, encoder),
145 compression: self.compression,
146 };
147
148 let svc = ServiceBuilder::new().service(OpenDalService::new(op));
150
151 let sink = OpenDalSink::new(
152 svc,
153 request_builder,
154 self.key_partitioner()?,
155 batcher_settings,
156 );
157
158 Ok(VectorSink::from_event_streamsink(sink))
159 }
160
161 pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
162 let prefix = self.prefix.clone().try_into()?;
163 Ok(KeyPartitioner::new(prefix, None))
164 }
165}