1#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10 use metrics::{counter, gauge};
11 use serde_json::Error;
12 use vector_lib::internal_event::{
13 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
14 };
15
16 use crate::{
17 event::metric::{MetricKind, MetricValue},
18 sinks::splunk_hec::common::acknowledgements::HecAckApiError,
19 };
20
21 #[derive(Debug)]
22 pub struct SplunkEventEncodeError {
23 pub error: vector_lib::Error,
24 }
25
26 impl InternalEvent for SplunkEventEncodeError {
27 fn emit(self) {
28 let reason = "Failed to encode Splunk HEC event as JSON.";
29 error!(
30 message = reason,
31 error = ?self.error,
32 error_code = "serializing_json",
33 error_type = error_type::ENCODER_FAILED,
34 stage = error_stage::PROCESSING,
35 );
36 counter!(
37 "component_errors_total",
38 "error_code" => "serializing_json",
39 "error_type" => error_type::ENCODER_FAILED,
40 "stage" => error_stage::PROCESSING,
41 )
42 .increment(1);
43 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
44 }
45 }
46
47 #[derive(Debug)]
48 pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
49 pub value: &'a MetricValue,
50 pub kind: &'a MetricKind,
51 pub error: crate::Error,
52 }
53
54 impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
55 fn emit(self) {
56 error!(
57 message = "Invalid metric received.",
58 error = ?self.error,
59 error_type = error_type::INVALID_METRIC,
60 stage = error_stage::PROCESSING,
61 value = ?self.value,
62 kind = ?self.kind,
63 );
64 counter!(
65 "component_errors_total",
66 "error_type" => error_type::INVALID_METRIC,
67 "stage" => error_stage::PROCESSING,
68 )
69 .increment(1);
70 counter!(
71 "component_discarded_events_total",
72 "error_type" => error_type::INVALID_METRIC,
73 "stage" => error_stage::PROCESSING,
74 )
75 .increment(1);
76 }
77 }
78
79 #[derive(Debug)]
80 pub struct SplunkResponseParseError {
81 pub error: Error,
82 }
83
84 impl InternalEvent for SplunkResponseParseError {
85 fn emit(self) {
86 error!(
87 message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
88 error = ?self.error,
89 error_code = "invalid_response",
90 error_type = error_type::PARSER_FAILED,
91 stage = error_stage::SENDING,
92 );
93 counter!(
94 "component_errors_total",
95 "error_code" => "invalid_response",
96 "error_type" => error_type::PARSER_FAILED,
97 "stage" => error_stage::SENDING,
98 )
99 .increment(1);
100 }
101 }
102
103 #[derive(Debug)]
104 pub struct SplunkIndexerAcknowledgementAPIError {
105 pub message: &'static str,
106 pub error: HecAckApiError,
107 }
108
109 impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
110 fn emit(self) {
111 error!(
112 message = self.message,
113 error = ?self.error,
114 error_code = "indexer_ack_failed",
115 error_type = error_type::ACKNOWLEDGMENT_FAILED,
116 stage = error_stage::SENDING,
117 );
118 counter!(
119 "component_errors_total",
120 "error_code" => "indexer_ack_failed",
121 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
122 "stage" => error_stage::SENDING,
123 )
124 .increment(1);
125 }
126 }
127
128 #[derive(Debug)]
129 pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
130 pub error: E,
131 }
132
133 impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
134 fn emit(self) {
135 error!(
136 message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
137 error = %self.error,
138 error_code = "indexer_ack_unavailable",
139 error_type = error_type::ACKNOWLEDGMENT_FAILED,
140 stage = error_stage::SENDING,
141 );
142 counter!(
143 "component_errors_total",
144 "error_code" => "indexer_ack_unavailable",
145 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
146 "stage" => error_stage::SENDING,
147 )
148 .increment(1);
149 }
150 }
151
152 pub struct SplunkIndexerAcknowledgementAckAdded;
153
154 impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
155 fn emit(self) {
156 gauge!("splunk_pending_acks").increment(1.0);
157 }
158 }
159
160 pub struct SplunkIndexerAcknowledgementAcksRemoved {
161 pub count: f64,
162 }
163
164 impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
165 fn emit(self) {
166 gauge!("splunk_pending_acks").decrement(self.count);
167 }
168 }
169
170 pub struct SplunkEventTimestampInvalidType<'a> {
171 pub r#type: &'a str,
172 }
173
174 impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
175 fn emit(self) {
176 warn!(
177 message =
178 "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
179 invalid_type = self.r#type
180 );
181 }
182 }
183
184 pub struct SplunkEventTimestampMissing;
185
186 impl InternalEvent for SplunkEventTimestampMissing {
187 fn emit(self) {
188 warn!("Timestamp was not found. Deferring to Splunk to set the timestamp.");
189 }
190 }
191}
192
193#[cfg(feature = "sources-splunk_hec")]
194mod source {
195 use metrics::counter;
196 use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
197
198 use crate::sources::splunk_hec::ApiError;
199
200 #[derive(Debug)]
201 pub struct SplunkHecRequestBodyInvalidError {
202 pub error: std::io::Error,
203 }
204
205 impl InternalEvent for SplunkHecRequestBodyInvalidError {
206 fn emit(self) {
207 error!(
208 message = "Invalid request body.",
209 error = ?self.error,
210 error_code = "invalid_request_body",
211 error_type = error_type::PARSER_FAILED,
212 stage = error_stage::PROCESSING
213 );
214 counter!(
215 "component_errors_total",
216 "error_code" => "invalid_request_body",
217 "error_type" => error_type::PARSER_FAILED,
218 "stage" => error_stage::PROCESSING,
219 )
220 .increment(1);
221 }
222 }
223
224 #[derive(Debug)]
225 pub struct SplunkHecRequestError {
226 pub(crate) error: ApiError,
227 }
228
229 impl InternalEvent for SplunkHecRequestError {
230 fn emit(self) {
231 error!(
232 message = "Error processing request.",
233 error = ?self.error,
234 error_type = error_type::REQUEST_FAILED,
235 stage = error_stage::RECEIVING
236 );
237 counter!(
238 "component_errors_total",
239 "error_type" => error_type::REQUEST_FAILED,
240 "stage" => error_stage::RECEIVING,
241 )
242 .increment(1);
243 }
244 }
245}