vector/sinks/util/
mod.rs

1pub mod adaptive_concurrency;
2pub mod auth;
3// https://github.com/mcarton/rust-derivative/issues/112
4#[allow(clippy::non_canonical_clone_impl)]
5pub mod batch;
6pub mod buffer;
7pub mod builder;
8pub mod compressor;
9pub mod datagram;
10pub mod encoding;
11pub mod http;
12pub mod metadata;
13pub mod normalizer;
14pub mod partitioner;
15pub mod processed_event;
16pub mod request_builder;
17pub mod retries;
18pub mod service;
19pub mod sink;
20pub mod snappy;
21pub mod socket_bytes_sink;
22pub mod statistic;
23pub mod tcp;
24#[cfg(any(test, feature = "test-utils"))]
25pub mod test;
26pub mod udp;
27#[cfg(unix)]
28pub mod unix;
29pub mod uri;
30pub mod zstd;
31
32use std::borrow::Cow;
33
34pub use batch::{
35    Batch, BatchConfig, BatchSettings, BatchSize, BulkSizeBasedDefaultBatchSettings, Merged,
36    NoDefaultsBatchSettings, PushResult, RealtimeEventBasedDefaultBatchSettings,
37    RealtimeSizeBasedDefaultBatchSettings, SinkBatchSettings, Unmerged,
38};
39pub use buffer::{
40    json::{BoxedRawValue, JsonArrayBuffer},
41    partition::Partition,
42    vec::{EncodedLength, VecBuffer},
43    Buffer, Compression, PartitionBuffer, PartitionInnerBuffer,
44};
45pub use builder::SinkBuilderExt;
46pub use compressor::Compressor;
47pub use normalizer::Normalizer;
48pub use request_builder::{IncrementalRequestBuilder, RequestBuilder};
49pub use service::{
50    Concurrency, ServiceBuilderExt, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
51    TowerRequestLayer, TowerRequestSettings,
52};
53pub use sink::{BatchSink, PartitionBatchSink, StreamSink};
54use snafu::Snafu;
55pub use uri::UriSerde;
56use vector_lib::{json_size::JsonSize, TimeZone};
57
58use crate::event::EventFinalizers;
59use chrono::{FixedOffset, Offset, Utc};
60
61#[derive(Debug, Snafu)]
62enum SinkBuildError {
63    #[snafu(display("Missing host in address field"))]
64    MissingHost,
65    #[snafu(display("Missing port in address field"))]
66    MissingPort,
67}
68
69#[derive(Debug)]
70pub struct EncodedEvent<I> {
71    pub item: I,
72    pub finalizers: EventFinalizers,
73    pub byte_size: usize,
74    pub json_byte_size: JsonSize,
75}
76
77impl<I> EncodedEvent<I> {
78    /// Create a trivial input with no metadata. This method will be
79    /// removed when all sinks are converted.
80    pub fn new(item: I, byte_size: usize, json_byte_size: JsonSize) -> Self {
81        Self {
82            item,
83            finalizers: Default::default(),
84            byte_size,
85            json_byte_size,
86        }
87    }
88
89    // This should be:
90    // ```impl<F, I: From<F>> From<EncodedEvent<F>> for EncodedEvent<I>```
91    // however, the compiler rejects that due to conflicting
92    // implementations of `From` due to the generic
93    // ```impl<T> From<T> for T```
94    pub fn from<F>(that: EncodedEvent<F>) -> Self
95    where
96        I: From<F>,
97    {
98        Self {
99            item: I::from(that.item),
100            finalizers: that.finalizers,
101            byte_size: that.byte_size,
102            json_byte_size: that.json_byte_size,
103        }
104    }
105
106    /// Remap the item using an adapter
107    pub fn map<T>(self, doit: impl Fn(I) -> T) -> EncodedEvent<T> {
108        EncodedEvent {
109            item: doit(self.item),
110            finalizers: self.finalizers,
111            byte_size: self.byte_size,
112            json_byte_size: self.json_byte_size,
113        }
114    }
115}
116
/// Prefixes `name` with `namespace` and `delimiter` when a namespace is given.
///
/// With `Some(namespace)` the result is `namespace`, then `delimiter`, then
/// `name`. With `None` the name is returned on its own as an owned `String`.
pub fn encode_namespace<'a>(
    namespace: Option<&str>,
    delimiter: char,
    name: impl Into<Cow<'a, str>>,
) -> String {
    let name: Cow<'a, str> = name.into();
    match namespace {
        Some(prefix) => format!("{prefix}{delimiter}{name}"),
        // No namespace: hand back the name, reusing its allocation if it
        // was already owned.
        None => name.into_owned(),
    }
}
128
/// Implemented by types that can hold a batch of events and report how many
/// elements they currently contain.
pub trait ElementCount {
    /// Returns the number of elements held by `self`.
    fn element_count(&self) -> usize;
}
133
134impl<T> ElementCount for Vec<T> {
135    fn element_count(&self) -> usize {
136        self.len()
137    }
138}
139
140pub fn timezone_to_offset(tz: TimeZone) -> Option<FixedOffset> {
141    match tz {
142        TimeZone::Local => Some(*Utc::now().with_timezone(&chrono::Local).offset()),
143        TimeZone::Named(tz) => Some(Utc::now().with_timezone(&tz).offset().fix()),
144    }
145}