vector/internal_events/
codecs.rs

1use metrics::counter;
2use vector_lib::NamedInternalEvent;
3use vector_lib::internal_event::{
4    ComponentEventsDropped, InternalEvent, UNINTENTIONAL, error_stage, error_type,
5};
6
7#[derive(Debug, NamedInternalEvent)]
8pub struct DecoderFramingError<E> {
9    pub error: E,
10}
11
12impl<E: std::fmt::Display> InternalEvent for DecoderFramingError<E> {
13    fn emit(self) {
14        error!(
15            message = "Failed framing bytes.",
16            error = %self.error,
17            error_code = "decoder_frame",
18            error_type = error_type::PARSER_FAILED,
19            stage = error_stage::PROCESSING,
20        );
21        counter!(
22            "component_errors_total",
23            "error_code" => "decoder_frame",
24            "error_type" => error_type::PARSER_FAILED,
25            "stage" => error_stage::PROCESSING,
26        )
27        .increment(1);
28    }
29}
30
31#[derive(Debug, NamedInternalEvent)]
32pub struct DecoderDeserializeError<'a> {
33    pub error: &'a crate::Error,
34}
35
36impl InternalEvent for DecoderDeserializeError<'_> {
37    fn emit(self) {
38        error!(
39            message = "Failed deserializing frame.",
40            error = %self.error,
41            error_code = "decoder_deserialize",
42            error_type = error_type::PARSER_FAILED,
43            stage = error_stage::PROCESSING,
44        );
45        counter!(
46            "component_errors_total",
47            "error_code" => "decoder_deserialize",
48            "error_type" => error_type::PARSER_FAILED,
49            "stage" => error_stage::PROCESSING,
50        )
51        .increment(1);
52    }
53}
54
55#[derive(Debug, NamedInternalEvent)]
56pub struct EncoderFramingError<'a> {
57    pub error: &'a vector_lib::codecs::encoding::BoxedFramingError,
58}
59
60impl InternalEvent for EncoderFramingError<'_> {
61    fn emit(self) {
62        let reason = "Failed framing bytes.";
63        error!(
64            message = reason,
65            error = %self.error,
66            error_code = "encoder_frame",
67            error_type = error_type::ENCODER_FAILED,
68            stage = error_stage::SENDING,
69        );
70        counter!(
71            "component_errors_total",
72            "error_code" => "encoder_frame",
73            "error_type" => error_type::ENCODER_FAILED,
74            "stage" => error_stage::SENDING,
75        )
76        .increment(1);
77        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
78    }
79}
80
81#[derive(Debug, NamedInternalEvent)]
82pub struct EncoderSerializeError<'a> {
83    pub error: &'a crate::Error,
84}
85
86impl InternalEvent for EncoderSerializeError<'_> {
87    fn emit(self) {
88        const SERIALIZE_REASON: &str = "Failed serializing frame.";
89        error!(
90            message = SERIALIZE_REASON,
91            error = %self.error,
92            error_code = "encoder_serialize",
93            error_type = error_type::ENCODER_FAILED,
94            stage = error_stage::SENDING,
95        );
96        counter!(
97            "component_errors_total",
98            "error_code" => "encoder_serialize",
99            "error_type" => error_type::ENCODER_FAILED,
100            "stage" => error_stage::SENDING,
101        )
102        .increment(1);
103        emit!(ComponentEventsDropped::<UNINTENTIONAL> {
104            count: 1,
105            reason: SERIALIZE_REASON
106        });
107    }
108}
109
110#[derive(Debug, NamedInternalEvent)]
111pub struct EncoderWriteError<'a, E> {
112    pub error: &'a E,
113    pub count: usize,
114}
115
116impl<E: std::fmt::Display> InternalEvent for EncoderWriteError<'_, E> {
117    fn emit(self) {
118        let reason = "Failed writing bytes.";
119        error!(
120            message = reason,
121            error = %self.error,
122            error_type = error_type::IO_FAILED,
123            stage = error_stage::SENDING,
124        );
125        counter!(
126            "component_errors_total",
127            "error_type" => error_type::ENCODER_FAILED,
128            "stage" => error_stage::SENDING,
129        )
130        .increment(1);
131        if self.count > 0 {
132            emit!(ComponentEventsDropped::<UNINTENTIONAL> {
133                count: self.count,
134                reason,
135            });
136        }
137    }
138}
139
/// Internal event reporting a schema constraint violation during encoding.
///
/// Only compiled when the `codecs-arrow` feature is enabled. Emitting it writes
/// an error-level log line, bumps the `component_errors_total` counter, and then
/// emits an unintentional `ComponentEventsDropped` event with a count of one.
#[cfg(feature = "codecs-arrow")]
#[derive(Debug, NamedInternalEvent)]
pub struct EncoderNullConstraintError<'a> {
    /// Borrowed error describing the violation; rendered through `Display` in the log.
    pub error: &'a crate::Error,
}

#[cfg(feature = "codecs-arrow")]
impl InternalEvent for EncoderNullConstraintError<'_> {
    fn emit(self) {
        // The same text is used as the log message and as the drop reason.
        let reason = "Schema constraint violation.";
        // Shared between the log field and the metric label.
        let error_code = "encoding_null_constraint";
        let cause = self.error;

        error!(
            message = reason,
            error = %cause,
            error_code = error_code,
            error_type = error_type::ENCODER_FAILED,
            stage = error_stage::SENDING,
        );

        let errors_total = counter!(
            "component_errors_total",
            "error_code" => error_code,
            "error_type" => error_type::ENCODER_FAILED,
            "stage" => error_stage::SENDING,
        );
        errors_total.increment(1);

        emit!(ComponentEventsDropped::<UNINTENTIONAL> { count: 1, reason });
    }
}