vector/internal_events/
aws_kinesis_firehose.rs1use metrics::counter;
2use vector_lib::NamedInternalEvent;
3use vector_lib::internal_event::{InternalEvent, error_stage, error_type};
4
5use super::prelude::{http_error_code, io_error_code};
6use crate::sources::aws_kinesis_firehose::Compression;
7
8#[derive(Debug, NamedInternalEvent)]
9pub struct AwsKinesisFirehoseRequestReceived<'a> {
10 pub request_id: Option<&'a str>,
11 pub source_arn: Option<&'a str>,
12}
13
14impl InternalEvent for AwsKinesisFirehoseRequestReceived<'_> {
15 fn emit(self) {
16 debug!(
17 message = "Handling AWS Kinesis Firehose request.",
18 request_id = %self.request_id.unwrap_or_default(),
19 source_arn = %self.source_arn.unwrap_or_default()
20 );
21 }
22}
23
24#[derive(Debug, NamedInternalEvent)]
25pub struct AwsKinesisFirehoseRequestError<'a> {
26 request_id: Option<&'a str>,
27 error_code: String,
28 error: &'a str,
29}
30
31impl<'a> AwsKinesisFirehoseRequestError<'a> {
32 pub fn new(code: hyper::StatusCode, error: &'a str, request_id: Option<&'a str>) -> Self {
33 Self {
34 error_code: http_error_code(code.as_u16()),
35 error,
36 request_id,
37 }
38 }
39}
40
41impl InternalEvent for AwsKinesisFirehoseRequestError<'_> {
42 fn emit(self) {
43 error!(
44 message = "Error occurred while handling request.",
45 error = ?self.error,
46 stage = error_stage::RECEIVING,
47 error_type = error_type::REQUEST_FAILED,
48 error_code = %self.error_code,
49 request_id = %self.request_id.unwrap_or(""),
50 );
51 counter!(
52 "component_errors_total",
53 "stage" => error_stage::RECEIVING,
54 "error_type" => error_type::REQUEST_FAILED,
55 "error_code" => self.error_code,
56 )
57 .increment(1);
58 }
59}
60
61#[derive(Debug, NamedInternalEvent)]
62pub struct AwsKinesisFirehoseAutomaticRecordDecodeError {
63 pub compression: Compression,
64 pub error: std::io::Error,
65}
66
67impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError {
68 fn emit(self) {
69 error!(
70 message = "Detected record failed to decode so passing along data as-is.",
71 error = ?self.error,
72 stage = error_stage::PROCESSING,
73 error_type = error_type::PARSER_FAILED,
74 error_code = %io_error_code(&self.error),
75 compression = %self.compression,
76 );
77 counter!(
78 "component_errors_total",
79 "stage" => error_stage::PROCESSING,
80 "error_type" => error_type::PARSER_FAILED,
81 "error_code" => io_error_code(&self.error),
82 )
83 .increment(1);
84 }
85}