1#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10 use metrics::{counter, gauge};
11 use serde_json::Error;
12 use vector_lib::NamedInternalEvent;
13 use vector_lib::internal_event::{
14 ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
15 };
16
17 use crate::{
18 event::metric::{MetricKind, MetricValue},
19 sinks::splunk_hec::common::acknowledgements::HecAckApiError,
20 };
21
22 #[derive(Debug, NamedInternalEvent)]
23 pub struct SplunkEventEncodeError {
24 pub error: vector_lib::Error,
25 }
26
27 impl InternalEvent for SplunkEventEncodeError {
28 fn emit(self) {
29 let reason = "Failed to encode Splunk HEC event as JSON.";
30 error!(
31 message = reason,
32 error = ?self.error,
33 error_code = "serializing_json",
34 error_type = error_type::ENCODER_FAILED,
35 stage = error_stage::PROCESSING,
36 );
37 counter!(
38 "component_errors_total",
39 "error_code" => "serializing_json",
40 "error_type" => error_type::ENCODER_FAILED,
41 "stage" => error_stage::PROCESSING,
42 )
43 .increment(1);
44 emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
45 }
46 }
47
48 #[derive(Debug, NamedInternalEvent)]
49 pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
50 pub value: &'a MetricValue,
51 pub kind: &'a MetricKind,
52 pub error: crate::Error,
53 }
54
55 impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
56 fn emit(self) {
57 error!(
58 message = "Invalid metric received.",
59 error = ?self.error,
60 error_type = error_type::INVALID_METRIC,
61 stage = error_stage::PROCESSING,
62 value = ?self.value,
63 kind = ?self.kind,
64 );
65 counter!(
66 "component_errors_total",
67 "error_type" => error_type::INVALID_METRIC,
68 "stage" => error_stage::PROCESSING,
69 )
70 .increment(1);
71 counter!(
72 "component_discarded_events_total",
73 "error_type" => error_type::INVALID_METRIC,
74 "stage" => error_stage::PROCESSING,
75 )
76 .increment(1);
77 }
78 }
79
80 #[derive(Debug, NamedInternalEvent)]
81 pub struct SplunkResponseParseError {
82 pub error: Error,
83 }
84
85 impl InternalEvent for SplunkResponseParseError {
86 fn emit(self) {
87 error!(
88 message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
89 error = ?self.error,
90 error_code = "invalid_response",
91 error_type = error_type::PARSER_FAILED,
92 stage = error_stage::SENDING,
93 );
94 counter!(
95 "component_errors_total",
96 "error_code" => "invalid_response",
97 "error_type" => error_type::PARSER_FAILED,
98 "stage" => error_stage::SENDING,
99 )
100 .increment(1);
101 }
102 }
103
104 #[derive(Debug, NamedInternalEvent)]
105 pub struct SplunkIndexerAcknowledgementAPIError {
106 pub message: &'static str,
107 pub error: HecAckApiError,
108 }
109
110 impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
111 fn emit(self) {
112 error!(
113 message = self.message,
114 error = ?self.error,
115 error_code = "indexer_ack_failed",
116 error_type = error_type::ACKNOWLEDGMENT_FAILED,
117 stage = error_stage::SENDING,
118 );
119 counter!(
120 "component_errors_total",
121 "error_code" => "indexer_ack_failed",
122 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
123 "stage" => error_stage::SENDING,
124 )
125 .increment(1);
126 }
127 }
128
129 #[derive(Debug, NamedInternalEvent)]
130 pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
131 pub error: E,
132 }
133
134 impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
135 fn emit(self) {
136 error!(
137 message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
138 error = %self.error,
139 error_code = "indexer_ack_unavailable",
140 error_type = error_type::ACKNOWLEDGMENT_FAILED,
141 stage = error_stage::SENDING,
142 );
143 counter!(
144 "component_errors_total",
145 "error_code" => "indexer_ack_unavailable",
146 "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
147 "stage" => error_stage::SENDING,
148 )
149 .increment(1);
150 }
151 }
152
153 #[derive(NamedInternalEvent)]
154 pub struct SplunkIndexerAcknowledgementAckAdded;
155
156 impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
157 fn emit(self) {
158 gauge!("splunk_pending_acks").increment(1.0);
159 }
160 }
161
162 #[derive(NamedInternalEvent)]
163 pub struct SplunkIndexerAcknowledgementAcksRemoved {
164 pub count: f64,
165 }
166
167 impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
168 fn emit(self) {
169 gauge!("splunk_pending_acks").decrement(self.count);
170 }
171 }
172
173 #[derive(NamedInternalEvent)]
174 pub struct SplunkEventTimestampInvalidType<'a> {
175 pub r#type: &'a str,
176 }
177
178 impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
179 fn emit(self) {
180 warn!(
181 message =
182 "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
183 invalid_type = self.r#type
184 );
185 }
186 }
187
188 #[derive(NamedInternalEvent)]
189 pub struct SplunkEventTimestampMissing;
190
191 impl InternalEvent for SplunkEventTimestampMissing {
192 fn emit(self) {
193 warn!("Timestamp was not found. Deferring to Splunk to set the timestamp.");
194 }
195 }
196}
197
198#[cfg(feature = "sources-splunk_hec")]
199mod source {
200 use metrics::counter;
201 use vector_lib::NamedInternalEvent;
202 use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
203
204 use crate::sources::splunk_hec::ApiError;
205
206 #[derive(Debug, NamedInternalEvent)]
207 pub struct SplunkHecRequestBodyInvalidError {
208 pub error: std::io::Error,
209 }
210
211 impl InternalEvent for SplunkHecRequestBodyInvalidError {
212 fn emit(self) {
213 error!(
214 message = "Invalid request body.",
215 error = ?self.error,
216 error_code = "invalid_request_body",
217 error_type = error_type::PARSER_FAILED,
218 stage = error_stage::PROCESSING
219 );
220 counter!(
221 "component_errors_total",
222 "error_code" => "invalid_request_body",
223 "error_type" => error_type::PARSER_FAILED,
224 "stage" => error_stage::PROCESSING,
225 )
226 .increment(1);
227 }
228 }
229
230 #[derive(Debug, NamedInternalEvent)]
231 pub struct SplunkHecRequestError {
232 pub(crate) error: ApiError,
233 }
234
235 impl InternalEvent for SplunkHecRequestError {
236 fn emit(self) {
237 error!(
238 message = "Error processing request.",
239 error = ?self.error,
240 error_type = error_type::REQUEST_FAILED,
241 stage = error_stage::RECEIVING
242 );
243 counter!(
244 "component_errors_total",
245 "error_type" => error_type::REQUEST_FAILED,
246 "stage" => error_stage::RECEIVING,
247 )
248 .increment(1);
249 }
250 }
251}