1#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10 use metrics::{counter, gauge};
11 use serde_json::Error;
12 use vector_lib::internal_event::{
13 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
14 };
15
16 use crate::{
17 event::metric::{MetricKind, MetricValue},
18 sinks::splunk_hec::common::acknowledgements::HecAckApiError,
19 };
20
21 #[derive(Debug)]
22 pub struct SplunkEventEncodeError {
23 pub error: vector_lib::Error,
24 }
25
26 impl InternalEvent for SplunkEventEncodeError {
27 fn emit(self) {
28 let reason = "Failed to encode Splunk HEC event as JSON.";
29 error!(
30 message = reason,
31 error = ?self.error,
32 error_code = "serializing_json",
33 error_type = error_type::ENCODER_FAILED,
34 stage = error_stage::PROCESSING,
35 internal_log_rate_limit = true,
36 );
37 counter!(
38 "component_errors_total",
39 "error_code" => "serializing_json",
40 "error_type" => error_type::ENCODER_FAILED,
41 "stage" => error_stage::PROCESSING,
42 )
43 .increment(1);
44 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
45 }
46 }
47
48 #[derive(Debug)]
49 pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
50 pub value: &'a MetricValue,
51 pub kind: &'a MetricKind,
52 pub error: crate::Error,
53 }
54
55 impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
56 fn emit(self) {
57 error!(
58 message = "Invalid metric received.",
59 error = ?self.error,
60 error_type = error_type::INVALID_METRIC,
61 stage = error_stage::PROCESSING,
62 value = ?self.value,
63 kind = ?self.kind,
64 internal_log_rate_limit = true,
65 );
66 counter!(
67 "component_errors_total",
68 "error_type" => error_type::INVALID_METRIC,
69 "stage" => error_stage::PROCESSING,
70 )
71 .increment(1);
72 counter!(
73 "component_discarded_events_total",
74 "error_type" => error_type::INVALID_METRIC,
75 "stage" => error_stage::PROCESSING,
76 )
77 .increment(1);
78 }
79 }
80
81 #[derive(Debug)]
82 pub struct SplunkResponseParseError {
83 pub error: Error,
84 }
85
86 impl InternalEvent for SplunkResponseParseError {
87 fn emit(self) {
88 error!(
89 message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
90 error = ?self.error,
91 error_code = "invalid_response",
92 error_type = error_type::PARSER_FAILED,
93 stage = error_stage::SENDING,
94 internal_log_rate_limit = true,
95 );
96 counter!(
97 "component_errors_total",
98 "error_code" => "invalid_response",
99 "error_type" => error_type::PARSER_FAILED,
100 "stage" => error_stage::SENDING,
101 )
102 .increment(1);
103 }
104 }
105
106 #[derive(Debug)]
107 pub struct SplunkIndexerAcknowledgementAPIError {
108 pub message: &'static str,
109 pub error: HecAckApiError,
110 }
111
112 impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
113 fn emit(self) {
114 error!(
115 message = self.message,
116 error = ?self.error,
117 error_code = "indexer_ack_failed",
118 error_type = error_type::ACKNOWLEDGMENT_FAILED,
119 stage = error_stage::SENDING,
120 internal_log_rate_limit = true,
121 );
122 counter!(
123 "component_errors_total",
124 "error_code" => "indexer_ack_failed",
125 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
126 "stage" => error_stage::SENDING,
127 )
128 .increment(1);
129 }
130 }
131
132 #[derive(Debug)]
133 pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
134 pub error: E,
135 }
136
137 impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
138 fn emit(self) {
139 error!(
140 message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
141 error = %self.error,
142 error_code = "indexer_ack_unavailable",
143 error_type = error_type::ACKNOWLEDGMENT_FAILED,
144 stage = error_stage::SENDING,
145 internal_log_rate_limit = true,
146 );
147 counter!(
148 "component_errors_total",
149 "error_code" => "indexer_ack_unavailable",
150 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
151 "stage" => error_stage::SENDING,
152 )
153 .increment(1);
154 }
155 }
156
157 pub struct SplunkIndexerAcknowledgementAckAdded;
158
159 impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
160 fn emit(self) {
161 gauge!("splunk_pending_acks").increment(1.0);
162 }
163 }
164
165 pub struct SplunkIndexerAcknowledgementAcksRemoved {
166 pub count: f64,
167 }
168
169 impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
170 fn emit(self) {
171 gauge!("splunk_pending_acks").decrement(self.count);
172 }
173 }
174
175 pub struct SplunkEventTimestampInvalidType<'a> {
176 pub r#type: &'a str,
177 }
178
179 impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
180 fn emit(self) {
181 warn!(
182 message =
183 "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
184 invalid_type = self.r#type,
185 internal_log_rate_limit = true
186 );
187 }
188 }
189
190 pub struct SplunkEventTimestampMissing;
191
192 impl InternalEvent for SplunkEventTimestampMissing {
193 fn emit(self) {
194 warn!(
195 message = "Timestamp was not found. Deferring to Splunk to set the timestamp.",
196 internal_log_rate_limit = true
197 );
198 }
199 }
200}
201
202#[cfg(feature = "sources-splunk_hec")]
203mod source {
204 use metrics::counter;
205 use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
206
207 use crate::sources::splunk_hec::ApiError;
208
209 #[derive(Debug)]
210 pub struct SplunkHecRequestBodyInvalidError {
211 pub error: std::io::Error,
212 }
213
214 impl InternalEvent for SplunkHecRequestBodyInvalidError {
215 fn emit(self) {
216 error!(
217 message = "Invalid request body.",
218 error = ?self.error,
219 error_code = "invalid_request_body",
220 error_type = error_type::PARSER_FAILED,
221 stage = error_stage::PROCESSING,
222 internal_log_rate_limit = true
223 );
224 counter!(
225 "component_errors_total",
226 "error_code" => "invalid_request_body",
227 "error_type" => error_type::PARSER_FAILED,
228 "stage" => error_stage::PROCESSING,
229 )
230 .increment(1);
231 }
232 }
233
234 #[derive(Debug)]
235 pub struct SplunkHecRequestError {
236 pub(crate) error: ApiError,
237 }
238
239 impl InternalEvent for SplunkHecRequestError {
240 fn emit(self) {
241 error!(
242 message = "Error processing request.",
243 error = ?self.error,
244 error_type = error_type::REQUEST_FAILED,
245 stage = error_stage::RECEIVING,
246 internal_log_rate_limit = true
247 );
248 counter!(
249 "component_errors_total",
250 "error_type" => error_type::REQUEST_FAILED,
251 "stage" => error_stage::RECEIVING,
252 )
253 .increment(1);
254 }
255 }
256}