1pub mod adaptive_concurrency;
2pub mod auth;
3#[allow(clippy::non_canonical_clone_impl)]
5pub mod batch;
6pub mod buffer;
7pub mod builder;
8pub mod compressor;
9pub mod datagram;
10pub mod encoding;
11pub mod http;
12pub mod metadata;
13pub mod normalizer;
14pub mod partitioner;
15pub mod processed_event;
16pub mod request_builder;
17pub mod retries;
18pub mod service;
19pub mod sink;
20pub mod snappy;
21pub mod socket_bytes_sink;
22pub mod statistic;
23pub mod tcp;
24#[cfg(any(test, feature = "test-utils"))]
25pub mod test;
26pub mod udp;
27#[cfg(unix)]
28pub mod unix;
29pub mod uri;
30pub mod zstd;
31
32use std::borrow::Cow;
33
34pub use batch::{
35 Batch, BatchConfig, BatchSettings, BatchSize, BulkSizeBasedDefaultBatchSettings, Merged,
36 NoDefaultsBatchSettings, PushResult, RealtimeEventBasedDefaultBatchSettings,
37 RealtimeSizeBasedDefaultBatchSettings, SinkBatchSettings, Unmerged,
38};
39pub use buffer::{
40 json::{BoxedRawValue, JsonArrayBuffer},
41 partition::Partition,
42 vec::{EncodedLength, VecBuffer},
43 Buffer, Compression, PartitionBuffer, PartitionInnerBuffer,
44};
45pub use builder::SinkBuilderExt;
46pub use compressor::Compressor;
47pub use normalizer::Normalizer;
48pub use request_builder::{IncrementalRequestBuilder, RequestBuilder};
49pub use service::{
50 Concurrency, ServiceBuilderExt, TowerBatchedSink, TowerPartitionSink, TowerRequestConfig,
51 TowerRequestLayer, TowerRequestSettings,
52};
53pub use sink::{BatchSink, PartitionBatchSink, StreamSink};
54use snafu::Snafu;
55pub use uri::UriSerde;
56use vector_lib::{json_size::JsonSize, TimeZone};
57
58use crate::event::EventFinalizers;
59use chrono::{FixedOffset, Offset, Utc};
60
/// Errors describing an address field that is missing a required component.
///
/// The `Display` text of each variant comes from its `#[snafu(display(...))]`
/// attribute.
// NOTE(review): the name suggests these are raised while building a sink from
// a configured address; the call sites are outside this chunk, so confirm.
#[derive(Debug, Snafu)]
enum SinkBuildError {
    /// The address field has no host part.
    #[snafu(display("Missing host in address field"))]
    MissingHost,
    /// The address field has no port part.
    #[snafu(display("Missing port in address field"))]
    MissingPort,
}
68
/// An item of type `I` bundled with its event finalizers and two size
/// measurements.
#[derive(Debug)]
pub struct EncodedEvent<I> {
    /// The wrapped item.
    pub item: I,
    /// Finalizers carried alongside `item`.
    pub finalizers: EventFinalizers,
    /// A size in bytes associated with `item`.
    // NOTE(review): presumably the in-memory or encoded size of the source
    // event; what exactly is measured is not visible here, so confirm.
    pub byte_size: usize,
    /// A `JsonSize` measurement associated with `item`.
    // NOTE(review): presumably the estimated JSON-encoded size of the source
    // event; confirm against the producers of this value.
    pub json_byte_size: JsonSize,
}
76
77impl<I> EncodedEvent<I> {
78 pub fn new(item: I, byte_size: usize, json_byte_size: JsonSize) -> Self {
81 Self {
82 item,
83 finalizers: Default::default(),
84 byte_size,
85 json_byte_size,
86 }
87 }
88
89 pub fn from<F>(that: EncodedEvent<F>) -> Self
95 where
96 I: From<F>,
97 {
98 Self {
99 item: I::from(that.item),
100 finalizers: that.finalizers,
101 byte_size: that.byte_size,
102 json_byte_size: that.json_byte_size,
103 }
104 }
105
106 pub fn map<T>(self, doit: impl Fn(I) -> T) -> EncodedEvent<T> {
108 EncodedEvent {
109 item: doit(self.item),
110 finalizers: self.finalizers,
111 byte_size: self.byte_size,
112 json_byte_size: self.json_byte_size,
113 }
114 }
115}
116
/// Prefixes `name` with `namespace` and `delimiter` when a namespace is given.
///
/// With `Some(namespace)` the result is `namespace`, then `delimiter`, then
/// `name`. An empty namespace still contributes the delimiter. With `None`
/// the result is `name` converted to an owned `String`.
pub fn encode_namespace<'a>(
    namespace: Option<&str>,
    delimiter: char,
    name: impl Into<Cow<'a, str>>,
) -> String {
    let name: Cow<'a, str> = name.into();
    match namespace {
        Some(prefix) => format!("{prefix}{delimiter}{name}"),
        // No namespace: hand back the name itself, allocating only if it was borrowed.
        None => name.into_owned(),
    }
}
128
/// Implemented by values that can report how many elements they hold.
pub trait ElementCount {
    /// Returns the number of elements in `self`.
    fn element_count(&self) -> usize;
}

/// A vector's element count is its length.
impl<T> ElementCount for Vec<T> {
    fn element_count(&self) -> usize {
        Vec::len(self)
    }
}
139
140pub fn timezone_to_offset(tz: TimeZone) -> Option<FixedOffset> {
141 match tz {
142 TimeZone::Local => Some(*Utc::now().with_timezone(&chrono::Local).offset()),
143 TimeZone::Named(tz) => Some(Utc::now().with_timezone(&tz).offset().fix()),
144 }
145}