vector/internal_events/
aws_kinesis_firehose.rs1use metrics::counter;
2use vector_lib::internal_event::InternalEvent;
3use vector_lib::internal_event::{error_stage, error_type};
4
5use super::prelude::{http_error_code, io_error_code};
6use crate::sources::aws_kinesis_firehose::Compression;
7
/// Internal event describing an incoming AWS Kinesis Firehose request.
///
/// Both fields are borrowed for the lifetime `'a`, so building the event does
/// not copy the underlying strings.
#[derive(Debug)]
pub struct AwsKinesisFirehoseRequestReceived<'a> {
    /// Identifier of the request, when one is available.
    pub request_id: Option<&'a str>,
    /// ARN of the request's source, when one is available.
    pub source_arn: Option<&'a str>,
}
13
14impl InternalEvent for AwsKinesisFirehoseRequestReceived<'_> {
15 fn emit(self) {
16 debug!(
17 message = "Handling AWS Kinesis Firehose request.",
18 request_id = %self.request_id.unwrap_or_default(),
19 source_arn = %self.source_arn.unwrap_or_default(),
20 internal_log_rate_limit = true
21 );
22 }
23}
24
/// Internal event describing a failure while handling a Firehose request.
///
/// The fields are private; the `error_code` string is stored owned, while the
/// error message and request identifier are borrowed for `'a`.
#[derive(Debug)]
pub struct AwsKinesisFirehoseRequestError<'a> {
    /// Identifier of the failed request, when one is available.
    request_id: Option<&'a str>,
    /// Error code string reported in the log line and the error metric.
    error_code: String,
    /// Description of what went wrong.
    error: &'a str,
}
31
32impl<'a> AwsKinesisFirehoseRequestError<'a> {
33 pub fn new(code: hyper::StatusCode, error: &'a str, request_id: Option<&'a str>) -> Self {
34 Self {
35 error_code: http_error_code(code.as_u16()),
36 error,
37 request_id,
38 }
39 }
40}
41
42impl InternalEvent for AwsKinesisFirehoseRequestError<'_> {
43 fn emit(self) {
44 error!(
45 message = "Error occurred while handling request.",
46 error = ?self.error,
47 stage = error_stage::RECEIVING,
48 error_type = error_type::REQUEST_FAILED,
49 error_code = %self.error_code,
50 request_id = %self.request_id.unwrap_or(""),
51 internal_log_rate_limit = true,
52 );
53 counter!(
54 "component_errors_total",
55 "stage" => error_stage::RECEIVING,
56 "error_type" => error_type::REQUEST_FAILED,
57 "error_code" => self.error_code,
58 )
59 .increment(1);
60 }
61}
62
63#[derive(Debug)]
64pub struct AwsKinesisFirehoseAutomaticRecordDecodeError {
65 pub compression: Compression,
66 pub error: std::io::Error,
67}
68
69impl InternalEvent for AwsKinesisFirehoseAutomaticRecordDecodeError {
70 fn emit(self) {
71 error!(
72 message = "Detected record failed to decode so passing along data as-is.",
73 error = ?self.error,
74 stage = error_stage::PROCESSING,
75 error_type = error_type::PARSER_FAILED,
76 error_code = %io_error_code(&self.error),
77 compression = %self.compression,
78 internal_log_rate_limit = true,
79 );
80 counter!(
81 "component_errors_total",
82 "stage" => error_stage::PROCESSING,
83 "error_type" => error_type::PARSER_FAILED,
84 "error_code" => io_error_code(&self.error),
85 )
86 .increment(1);
87 }
88}