1#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10 use metrics::{counter, gauge};
11 use serde_json::Error;
12 use vector_lib::internal_event::InternalEvent;
13 use vector_lib::internal_event::{
14 error_stage, error_type, ComponentEventsDropped, UNINTENTIONAL,
15 };
16
17 use crate::{
18 event::metric::{MetricKind, MetricValue},
19 sinks::splunk_hec::common::acknowledgements::HecAckApiError,
20 };
21
22 #[derive(Debug)]
23 pub struct SplunkEventEncodeError {
24 pub error: vector_lib::Error,
25 }
26
27 impl InternalEvent for SplunkEventEncodeError {
28 fn emit(self) {
29 let reason = "Failed to encode Splunk HEC event as JSON.";
30 error!(
31 message = reason,
32 error = ?self.error,
33 error_code = "serializing_json",
34 error_type = error_type::ENCODER_FAILED,
35 stage = error_stage::PROCESSING,
36 internal_log_rate_limit = true,
37 );
38 counter!(
39 "component_errors_total",
40 "error_code" => "serializing_json",
41 "error_type" => error_type::ENCODER_FAILED,
42 "stage" => error_stage::PROCESSING,
43 )
44 .increment(1);
45 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
46 }
47 }
48
49 #[derive(Debug)]
50 pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
51 pub value: &'a MetricValue,
52 pub kind: &'a MetricKind,
53 pub error: crate::Error,
54 }
55
56 impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
57 fn emit(self) {
58 error!(
59 message = "Invalid metric received.",
60 error = ?self.error,
61 error_type = error_type::INVALID_METRIC,
62 stage = error_stage::PROCESSING,
63 value = ?self.value,
64 kind = ?self.kind,
65 internal_log_rate_limit = true,
66 );
67 counter!(
68 "component_errors_total",
69 "error_type" => error_type::INVALID_METRIC,
70 "stage" => error_stage::PROCESSING,
71 )
72 .increment(1);
73 counter!(
74 "component_discarded_events_total",
75 "error_type" => error_type::INVALID_METRIC,
76 "stage" => error_stage::PROCESSING,
77 )
78 .increment(1);
79 }
80 }
81
82 #[derive(Debug)]
83 pub struct SplunkResponseParseError {
84 pub error: Error,
85 }
86
87 impl InternalEvent for SplunkResponseParseError {
88 fn emit(self) {
89 error!(
90 message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
91 error = ?self.error,
92 error_code = "invalid_response",
93 error_type = error_type::PARSER_FAILED,
94 stage = error_stage::SENDING,
95 internal_log_rate_limit = true,
96 );
97 counter!(
98 "component_errors_total",
99 "error_code" => "invalid_response",
100 "error_type" => error_type::PARSER_FAILED,
101 "stage" => error_stage::SENDING,
102 )
103 .increment(1);
104 }
105 }
106
107 #[derive(Debug)]
108 pub struct SplunkIndexerAcknowledgementAPIError {
109 pub message: &'static str,
110 pub error: HecAckApiError,
111 }
112
113 impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
114 fn emit(self) {
115 error!(
116 message = self.message,
117 error = ?self.error,
118 error_code = "indexer_ack_failed",
119 error_type = error_type::ACKNOWLEDGMENT_FAILED,
120 stage = error_stage::SENDING,
121 internal_log_rate_limit = true,
122 );
123 counter!(
124 "component_errors_total",
125 "error_code" => "indexer_ack_failed",
126 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
127 "stage" => error_stage::SENDING,
128 )
129 .increment(1);
130 }
131 }
132
133 #[derive(Debug)]
134 pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
135 pub error: E,
136 }
137
138 impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
139 fn emit(self) {
140 error!(
141 message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
142 error = %self.error,
143 error_code = "indexer_ack_unavailable",
144 error_type = error_type::ACKNOWLEDGMENT_FAILED,
145 stage = error_stage::SENDING,
146 internal_log_rate_limit = true,
147 );
148 counter!(
149 "component_errors_total",
150 "error_code" => "indexer_ack_unavailable",
151 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
152 "stage" => error_stage::SENDING,
153 )
154 .increment(1);
155 }
156 }
157
158 pub struct SplunkIndexerAcknowledgementAckAdded;
159
160 impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
161 fn emit(self) {
162 gauge!("splunk_pending_acks").increment(1.0);
163 }
164 }
165
166 pub struct SplunkIndexerAcknowledgementAcksRemoved {
167 pub count: f64,
168 }
169
170 impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
171 fn emit(self) {
172 gauge!("splunk_pending_acks").decrement(self.count);
173 }
174 }
175
176 pub struct SplunkEventTimestampInvalidType<'a> {
177 pub r#type: &'a str,
178 }
179
180 impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
181 fn emit(self) {
182 warn!(
183 message =
184 "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
185 invalid_type = self.r#type,
186 internal_log_rate_limit = true
187 );
188 }
189 }
190
191 pub struct SplunkEventTimestampMissing;
192
193 impl InternalEvent for SplunkEventTimestampMissing {
194 fn emit(self) {
195 warn!(
196 message = "Timestamp was not found. Deferring to Splunk to set the timestamp.",
197 internal_log_rate_limit = true
198 );
199 }
200 }
201}
202
203#[cfg(feature = "sources-splunk_hec")]
204mod source {
205 use metrics::counter;
206 use vector_lib::internal_event::InternalEvent;
207
208 use crate::sources::splunk_hec::ApiError;
209 use vector_lib::internal_event::{error_stage, error_type};
210
211 #[derive(Debug)]
212 pub struct SplunkHecRequestBodyInvalidError {
213 pub error: std::io::Error,
214 }
215
216 impl InternalEvent for SplunkHecRequestBodyInvalidError {
217 fn emit(self) {
218 error!(
219 message = "Invalid request body.",
220 error = ?self.error,
221 error_code = "invalid_request_body",
222 error_type = error_type::PARSER_FAILED,
223 stage = error_stage::PROCESSING,
224 internal_log_rate_limit = true
225 );
226 counter!(
227 "component_errors_total",
228 "error_code" => "invalid_request_body",
229 "error_type" => error_type::PARSER_FAILED,
230 "stage" => error_stage::PROCESSING,
231 )
232 .increment(1);
233 }
234 }
235
236 #[derive(Debug)]
237 pub struct SplunkHecRequestError {
238 pub(crate) error: ApiError,
239 }
240
241 impl InternalEvent for SplunkHecRequestError {
242 fn emit(self) {
243 error!(
244 message = "Error processing request.",
245 error = ?self.error,
246 error_type = error_type::REQUEST_FAILED,
247 stage = error_stage::RECEIVING,
248 internal_log_rate_limit = true
249 );
250 counter!(
251 "component_errors_total",
252 "error_type" => error_type::REQUEST_FAILED,
253 "stage" => error_stage::RECEIVING,
254 )
255 .increment(1);
256 }
257 }
258}