1#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10 use serde_json::Error;
11 use vector_lib::{
12 NamedInternalEvent, counter, gauge,
13 internal_event::{
14 ComponentEventsDropped, CounterName, GaugeName, InternalEvent, UNINTENTIONAL,
15 error_stage, error_type,
16 },
17 };
18
19 use crate::{
20 event::metric::{MetricKind, MetricValue},
21 sinks::splunk_hec::common::acknowledgements::HecAckApiError,
22 };
23
24 #[derive(Debug, NamedInternalEvent)]
25 pub struct SplunkEventEncodeError {
26 pub error: vector_lib::Error,
27 }
28
29 impl InternalEvent for SplunkEventEncodeError {
30 fn emit(self) {
31 let reason = "Failed to encode Splunk HEC event as JSON.";
32 error!(
33 message = reason,
34 error = ?self.error,
35 error_code = "serializing_json",
36 error_type = error_type::ENCODER_FAILED,
37 stage = error_stage::PROCESSING,
38 );
39 counter!(
40 CounterName::ComponentErrorsTotal,
41 "error_code" => "serializing_json",
42 "error_type" => error_type::ENCODER_FAILED,
43 "stage" => error_stage::PROCESSING,
44 )
45 .increment(1);
46 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
47 }
48 }
49
50 #[derive(Debug, NamedInternalEvent)]
51 pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
52 pub value: &'a MetricValue,
53 pub kind: &'a MetricKind,
54 pub error: crate::Error,
55 }
56
57 impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
58 fn emit(self) {
59 error!(
60 message = "Invalid metric received.",
61 error = ?self.error,
62 error_type = error_type::INVALID_METRIC,
63 stage = error_stage::PROCESSING,
64 value = ?self.value,
65 kind = ?self.kind,
66 );
67 counter!(
68 CounterName::ComponentErrorsTotal,
69 "error_type" => error_type::INVALID_METRIC,
70 "stage" => error_stage::PROCESSING,
71 )
72 .increment(1);
73 counter!(
74 CounterName::ComponentDiscardedEventsTotal,
75 "error_type" => error_type::INVALID_METRIC,
76 "stage" => error_stage::PROCESSING,
77 )
78 .increment(1);
79 }
80 }
81
82 #[derive(Debug, NamedInternalEvent)]
83 pub struct SplunkResponseParseError {
84 pub error: Error,
85 }
86
87 impl InternalEvent for SplunkResponseParseError {
88 fn emit(self) {
89 error!(
90 message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
91 error = ?self.error,
92 error_code = "invalid_response",
93 error_type = error_type::PARSER_FAILED,
94 stage = error_stage::SENDING,
95 );
96 counter!(
97 CounterName::ComponentErrorsTotal,
98 "error_code" => "invalid_response",
99 "error_type" => error_type::PARSER_FAILED,
100 "stage" => error_stage::SENDING,
101 )
102 .increment(1);
103 }
104 }
105
106 #[derive(Debug, NamedInternalEvent)]
107 pub struct SplunkIndexerAcknowledgementAPIError {
108 pub message: &'static str,
109 pub error: HecAckApiError,
110 }
111
112 impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
113 fn emit(self) {
114 error!(
115 message = self.message,
116 error = ?self.error,
117 error_code = "indexer_ack_failed",
118 error_type = error_type::ACKNOWLEDGMENT_FAILED,
119 stage = error_stage::SENDING,
120 );
121 counter!(
122 CounterName::ComponentErrorsTotal,
123 "error_code" => "indexer_ack_failed",
124 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
125 "stage" => error_stage::SENDING,
126 )
127 .increment(1);
128 }
129 }
130
131 #[derive(Debug, NamedInternalEvent)]
132 pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
133 pub error: E,
134 }
135
136 impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
137 fn emit(self) {
138 error!(
139 message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
140 error = %self.error,
141 error_code = "indexer_ack_unavailable",
142 error_type = error_type::ACKNOWLEDGMENT_FAILED,
143 stage = error_stage::SENDING,
144 );
145 counter!(
146 CounterName::ComponentErrorsTotal,
147 "error_code" => "indexer_ack_unavailable",
148 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
149 "stage" => error_stage::SENDING,
150 )
151 .increment(1);
152 }
153 }
154
155 #[derive(NamedInternalEvent)]
156 pub struct SplunkIndexerAcknowledgementAckAdded;
157
158 impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
159 fn emit(self) {
160 gauge!(GaugeName::SplunkPendingAcks).increment(1.0);
161 }
162 }
163
164 #[derive(NamedInternalEvent)]
165 pub struct SplunkIndexerAcknowledgementAcksRemoved {
166 pub count: f64,
167 }
168
169 impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
170 fn emit(self) {
171 gauge!(GaugeName::SplunkPendingAcks).decrement(self.count);
172 }
173 }
174
175 #[derive(NamedInternalEvent)]
176 pub struct SplunkEventTimestampInvalidType<'a> {
177 pub r#type: &'a str,
178 }
179
180 impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
181 fn emit(self) {
182 warn!(
183 message =
184 "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
185 invalid_type = self.r#type
186 );
187 }
188 }
189
190 #[derive(NamedInternalEvent)]
191 pub struct SplunkEventTimestampMissing;
192
193 impl InternalEvent for SplunkEventTimestampMissing {
194 fn emit(self) {
195 warn!("Timestamp was not found. Deferring to Splunk to set the timestamp.");
196 }
197 }
198}
199
200#[cfg(feature = "sources-splunk_hec")]
201mod source {
202 use vector_lib::{
203 NamedInternalEvent, counter,
204 internal_event::{CounterName, InternalEvent, error_stage, error_type},
205 };
206
207 use crate::sources::splunk_hec::ApiError;
208
209 #[derive(Debug, NamedInternalEvent)]
210 pub struct SplunkHecRequestBodyInvalidError {
211 pub error: std::io::Error,
212 }
213
214 impl InternalEvent for SplunkHecRequestBodyInvalidError {
215 fn emit(self) {
216 error!(
217 message = "Invalid request body.",
218 error = ?self.error,
219 error_code = "invalid_request_body",
220 error_type = error_type::PARSER_FAILED,
221 stage = error_stage::PROCESSING
222 );
223 counter!(
224 CounterName::ComponentErrorsTotal,
225 "error_code" => "invalid_request_body",
226 "error_type" => error_type::PARSER_FAILED,
227 "stage" => error_stage::PROCESSING,
228 )
229 .increment(1);
230 }
231 }
232
233 #[derive(Debug, NamedInternalEvent)]
234 pub struct SplunkHecRequestError {
235 pub(crate) error: ApiError,
236 }
237
238 impl InternalEvent for SplunkHecRequestError {
239 fn emit(self) {
240 match self.error {
241 ApiError::InvalidAuthorization | ApiError::MissingAuthorization => {
242 error!(
243 message = "Unauthenticated request.",
244 error = ?self.error,
245 error_type = error_type::AUTHENTICATION_FAILED,
246 stage = error_stage::RECEIVING
247 );
248 counter!(
249 CounterName::ComponentErrorsTotal,
250 "error_type" => error_type::AUTHENTICATION_FAILED,
251 "stage" => error_stage::RECEIVING,
252 )
253 .increment(1);
254 }
255 _ => {
256 error!(
257 message = "Error processing request.",
258 error = ?self.error,
259 error_type = error_type::REQUEST_FAILED,
260 stage = error_stage::RECEIVING
261 );
262 counter!(
263 CounterName::ComponentErrorsTotal,
264 "error_type" => error_type::REQUEST_FAILED,
265 "stage" => error_stage::RECEIVING,
266 )
267 .increment(1);
268 }
269 }
270 }
271 }
272}