vector/internal_events/
splunk_hec.rs

1// ## skip check-dropped-events ##
2
3#[cfg(feature = "sinks-splunk_hec")]
4pub use self::sink::*;
5#[cfg(feature = "sources-splunk_hec")]
6pub use self::source::*;
7
8#[cfg(feature = "sinks-splunk_hec")]
9mod sink {
10    use metrics::{counter, gauge};
11    use serde_json::Error;
12    use vector_lib::NamedInternalEvent;
13    use vector_lib::internal_event::{
14        ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
15    };
16
17    use crate::{
18        event::metric::{MetricKind, MetricValue},
19        sinks::splunk_hec::common::acknowledgements::HecAckApiError,
20    };
21
22    #[derive(Debug, NamedInternalEvent)]
23    pub struct SplunkEventEncodeError {
24        pub error: vector_lib::Error,
25    }
26
27    impl InternalEvent for SplunkEventEncodeError {
28        fn emit(self) {
29            let reason = "Failed to encode Splunk HEC event as JSON.";
30            error!(
31                message = reason,
32                error = ?self.error,
33                error_code = "serializing_json",
34                error_type = error_type::ENCODER_FAILED,
35                stage = error_stage::PROCESSING,
36            );
37            counter!(
38                "component_errors_total",
39                "error_code" => "serializing_json",
40                "error_type" => error_type::ENCODER_FAILED,
41                "stage" => error_stage::PROCESSING,
42            )
43            .increment(1);
44            emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
45        }
46    }
47
48    #[derive(Debug, NamedInternalEvent)]
49    pub(crate) struct SplunkInvalidMetricReceivedError<'a> {
50        pub value: &'a MetricValue,
51        pub kind: &'a MetricKind,
52        pub error: crate::Error,
53    }
54
55    impl InternalEvent for SplunkInvalidMetricReceivedError<'_> {
56        fn emit(self) {
57            error!(
58                message = "Invalid metric received.",
59                error = ?self.error,
60                error_type = error_type::INVALID_METRIC,
61                stage = error_stage::PROCESSING,
62                value = ?self.value,
63                kind = ?self.kind,
64            );
65            counter!(
66                "component_errors_total",
67                "error_type" => error_type::INVALID_METRIC,
68                "stage" => error_stage::PROCESSING,
69            )
70            .increment(1);
71            counter!(
72                "component_discarded_events_total",
73                "error_type" => error_type::INVALID_METRIC,
74                "stage" => error_stage::PROCESSING,
75            )
76            .increment(1);
77        }
78    }
79
80    #[derive(Debug, NamedInternalEvent)]
81    pub struct SplunkResponseParseError {
82        pub error: Error,
83    }
84
85    impl InternalEvent for SplunkResponseParseError {
86        fn emit(self) {
87            error!(
88                message = "Unable to parse Splunk HEC response. Acknowledging based on initial 200 OK.",
89                error = ?self.error,
90                error_code = "invalid_response",
91                error_type = error_type::PARSER_FAILED,
92                stage = error_stage::SENDING,
93            );
94            counter!(
95                "component_errors_total",
96                "error_code" => "invalid_response",
97                "error_type" => error_type::PARSER_FAILED,
98                "stage" => error_stage::SENDING,
99            )
100            .increment(1);
101        }
102    }
103
104    #[derive(Debug, NamedInternalEvent)]
105    pub struct SplunkIndexerAcknowledgementAPIError {
106        pub message: &'static str,
107        pub error: HecAckApiError,
108    }
109
110    impl InternalEvent for SplunkIndexerAcknowledgementAPIError {
111        fn emit(self) {
112            error!(
113                message = self.message,
114                error = ?self.error,
115                error_code = "indexer_ack_failed",
116                error_type = error_type::ACKNOWLEDGMENT_FAILED,
117                stage = error_stage::SENDING,
118            );
119            counter!(
120                "component_errors_total",
121                "error_code" => "indexer_ack_failed",
122                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
123                "stage" => error_stage::SENDING,
124            )
125            .increment(1);
126        }
127    }
128
129    #[derive(Debug, NamedInternalEvent)]
130    pub struct SplunkIndexerAcknowledgementUnavailableError<E> {
131        pub error: E,
132    }
133
134    impl<E: std::fmt::Display> InternalEvent for SplunkIndexerAcknowledgementUnavailableError<E> {
135        fn emit(self) {
136            error!(
137                message = "Internal indexer acknowledgement client unavailable. Acknowledging based on initial 200 OK.",
138                error = %self.error,
139                error_code = "indexer_ack_unavailable",
140                error_type = error_type::ACKNOWLEDGMENT_FAILED,
141                stage = error_stage::SENDING,
142            );
143            counter!(
144                "component_errors_total",
145                "error_code" => "indexer_ack_unavailable",
146                "error_type" => error_type::ACKNOWLEDGMENT_FAILED,
147                "stage" => error_stage::SENDING,
148            )
149            .increment(1);
150        }
151    }
152
153    #[derive(NamedInternalEvent)]
154    pub struct SplunkIndexerAcknowledgementAckAdded;
155
156    impl InternalEvent for SplunkIndexerAcknowledgementAckAdded {
157        fn emit(self) {
158            gauge!("splunk_pending_acks").increment(1.0);
159        }
160    }
161
162    #[derive(NamedInternalEvent)]
163    pub struct SplunkIndexerAcknowledgementAcksRemoved {
164        pub count: f64,
165    }
166
167    impl InternalEvent for SplunkIndexerAcknowledgementAcksRemoved {
168        fn emit(self) {
169            gauge!("splunk_pending_acks").decrement(self.count);
170        }
171    }
172
173    #[derive(NamedInternalEvent)]
174    pub struct SplunkEventTimestampInvalidType<'a> {
175        pub r#type: &'a str,
176    }
177
178    impl InternalEvent for SplunkEventTimestampInvalidType<'_> {
179        fn emit(self) {
180            warn!(
181                message =
182                    "Timestamp was an unexpected type. Deferring to Splunk to set the timestamp.",
183                invalid_type = self.r#type
184            );
185        }
186    }
187
188    #[derive(NamedInternalEvent)]
189    pub struct SplunkEventTimestampMissing;
190
191    impl InternalEvent for SplunkEventTimestampMissing {
192        fn emit(self) {
193            warn!("Timestamp was not found. Deferring to Splunk to set the timestamp.");
194        }
195    }
196}
197
198#[cfg(feature = "sources-splunk_hec")]
199mod source {
200    use metrics::counter;
201    use vector_lib::NamedInternalEvent;
202    use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
203
204    use crate::sources::splunk_hec::ApiError;
205
206    #[derive(Debug, NamedInternalEvent)]
207    pub struct SplunkHecRequestBodyInvalidError {
208        pub error: std::io::Error,
209    }
210
211    impl InternalEvent for SplunkHecRequestBodyInvalidError {
212        fn emit(self) {
213            error!(
214                message = "Invalid request body.",
215                error = ?self.error,
216                error_code = "invalid_request_body",
217                error_type = error_type::PARSER_FAILED,
218                stage = error_stage::PROCESSING
219            );
220            counter!(
221                "component_errors_total",
222                "error_code" => "invalid_request_body",
223                "error_type" => error_type::PARSER_FAILED,
224                "stage" => error_stage::PROCESSING,
225            )
226            .increment(1);
227        }
228    }
229
230    #[derive(Debug, NamedInternalEvent)]
231    pub struct SplunkHecRequestError {
232        pub(crate) error: ApiError,
233    }
234
235    impl InternalEvent for SplunkHecRequestError {
236        fn emit(self) {
237            error!(
238                message = "Error processing request.",
239                error = ?self.error,
240                error_type = error_type::REQUEST_FAILED,
241                stage = error_stage::RECEIVING
242            );
243            counter!(
244                "component_errors_total",
245                "error_type" => error_type::REQUEST_FAILED,
246                "stage" => error_stage::RECEIVING,
247            )
248            .increment(1);
249        }
250    }
251}