vector/sinks/webhdfs/config.rs

use opendal::{layers::LoggingLayer, services::Webhdfs, Operator};
use tower::ServiceBuilder;
use vector_lib::codecs::{encoding::Framer, JsonSerializerConfig, NewlineDelimitedEncoderConfig};
use vector_lib::configurable::configurable_component;
use vector_lib::{
    config::{AcknowledgementsConfig, DataType, Input},
    sink::VectorSink,
};

use crate::{
    codecs::{Encoder, EncodingConfigWithFraming, SinkType},
    config::{GenerateConfig, SinkConfig, SinkContext},
    sinks::{
        opendal_common::*,
        util::{
            partitioner::KeyPartitioner, BatchConfig, BulkSizeBasedDefaultBatchSettings,
            Compression,
        },
        Healthcheck,
    },
};

/// Configuration for the `webhdfs` sink.
#[configurable_component(sink("webhdfs", "Deliver log data to an HDFS cluster through the WebHDFS API."))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct WebHdfsConfig {
    /// The root path for WebHDFS.
    ///
    /// Must be a valid directory.
    ///
    /// The final file path is in the format of `{root}/{prefix}{suffix}`.
    #[serde(default)]
    pub root: String,

    /// A prefix to apply to all keys.
    ///
    /// Prefixes are useful for partitioning objects, such as by creating a blob key that
    /// stores blobs under a particular directory. If using a prefix for this purpose, it must end
    /// in `/` to act as a directory path. A trailing `/` is **not** automatically added.
    ///
    /// The final file path is in the format of `{root}/{prefix}{suffix}`.
    #[serde(default)]
    #[configurable(metadata(docs::templateable))]
    pub prefix: String,

    /// The WebHDFS REST API endpoint of the cluster's NameNode.
    ///
    /// An HDFS cluster consists of a single NameNode, a master server that manages the
    /// file system namespace and regulates access to files by clients; the NameNode
    /// exposes the WebHDFS REST HTTP API that this sink talks to.
    ///
    /// For more information, see the [HDFS Architecture][hdfs_arch] documentation.
    ///
    /// [hdfs_arch]: https://hadoop.apache.org/docs/r3.3.4/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html#NameNode_and_DataNodes
    #[serde(default)]
    #[configurable(metadata(docs::examples = "http://127.0.0.1:9870"))]
    pub endpoint: String,

    #[serde(flatten)]
    pub encoding: EncodingConfigWithFraming,

    #[configurable(derived)]
    #[serde(default = "Compression::gzip_default")]
    pub compression: Compression,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    pub acknowledgements: AcknowledgementsConfig,
}
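// A sketch of how these fields combine in practice. The values below are
// illustrative, not defaults, and `my_source` is a placeholder input name;
// with this config the sink writes gzip-compressed, newline-delimited JSON
// under paths like `/vector/logs/2024-01-01/{suffix}` (the templateable
// `prefix` accepts strftime specifiers such as `%F`):
//
//     [sinks.webhdfs_out]
//     type = "webhdfs"
//     inputs = ["my_source"]
//     root = "/vector/logs/"
//     prefix = "%F/"
//     endpoint = "http://127.0.0.1:9870"
//     compression = "gzip"
//     framing.method = "newline_delimited"
//     encoding.codec = "json"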

impl GenerateConfig for WebHdfsConfig {
    fn generate_config() -> toml::Value {
        toml::Value::try_from(Self {
            root: "/".to_string(),
            prefix: "%F/".to_string(),
            endpoint: "http://127.0.0.1:9870".to_string(),

            encoding: (
                Some(NewlineDelimitedEncoderConfig::new()),
                JsonSerializerConfig::default(),
            )
                .into(),
            compression: Compression::gzip_default(),
            batch: BatchConfig::default(),

            acknowledgements: Default::default(),
        })
        .unwrap()
    }
}

#[async_trait::async_trait]
#[typetag::serde(name = "webhdfs")]
impl SinkConfig for WebHdfsConfig {
    async fn build(&self, _cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let op = self.build_operator()?;

        // The healthcheck verifies that the operator can reach the endpoint.
        let check_op = op.clone();
        let healthcheck = Box::pin(async move { Ok(check_op.check().await?) });

        let sink = self.build_processor(op)?;
        Ok((sink, healthcheck))
    }

    fn input(&self) -> Input {
        // This sink only accepts log events.
        Input::new(self.encoding.config().1.input_type() & DataType::Log)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}
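// A rough sketch (the variable names here are illustrative, not part of this
// module) of how the topology drives this impl: `build` is called once at
// startup, and the returned healthcheck future is awaited before events flow,
// so an unreachable NameNode is surfaced early.
//
//     let (sink, healthcheck) = config.build(cx).await?;
//     healthcheck.await?; // runs `Operator::check` against the endpoint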

impl WebHdfsConfig {
    pub fn build_operator(&self) -> crate::Result<Operator> {
        // Build the OpenDAL operator for the WebHDFS backend.
        let mut builder = Webhdfs::default();
        // Prefix logic is handled by the key partitioner, so only the root
        // and endpoint are configured here.
        builder = builder.root(&self.root);
        builder = builder.endpoint(&self.endpoint);

        let op = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .finish();
        Ok(op)
    }
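    // For intuition (a sketch; the path and payload names are illustrative),
    // the operator built above is used by the request machinery with calls
    // along the lines of `op.write(path, bytes).await`, which WebHDFS maps
    // onto `{root}/{path}` on the cluster.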

    pub fn build_processor(&self, op: Operator) -> crate::Result<VectorSink> {
        // Configure our partitioning/batching.
        let batcher_settings = self.batch.into_batcher_settings()?;

        let transformer = self.encoding.transformer();
        let (framer, serializer) = self.encoding.build(SinkType::MessageBased)?;
        let encoder = Encoder::<Framer>::new(framer, serializer);

        let request_builder = OpenDalRequestBuilder {
            encoder: (transformer, encoder),
            compression: self.compression,
        };

        // TODO: we can add tower middleware here.
        let svc = ServiceBuilder::new().service(OpenDalService::new(op));

        let sink = OpenDalSink::new(
            svc,
            request_builder,
            self.key_partitioner()?,
            batcher_settings,
        );

        Ok(VectorSink::from_event_streamsink(sink))
    }

    pub fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
        // The prefix is a template (e.g. `%F/`); parse it once here so it can
        // be rendered per event when partitioning keys.
        let prefix = self.prefix.clone().try_into()?;
        Ok(KeyPartitioner::new(prefix, None))
    }
}
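
// A minimal round-trip test sketch, assuming the crate's
// `test_util::test_generate_config` helper (the pattern other sink config
// modules use to assert that the example config deserializes cleanly).
#[cfg(test)]
mod tests {
    use super::WebHdfsConfig;

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<WebHdfsConfig>();
    }
}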