vector/sinks/
papertrail.rs

1use bytes::{BufMut, BytesMut};
2use syslog::{Facility, Formatter3164, LogFormat, Severity};
3use vector_lib::configurable::configurable_component;
4use vrl::value::Kind;
5
6use crate::{
7    codecs::{Encoder, EncodingConfig, Transformer},
8    config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
9    event::Event,
10    internal_events::TemplateRenderingError,
11    schema,
12    sinks::util::{tcp::TcpSinkConfig, UriSerde},
13    tcp::TcpKeepaliveConfig,
14    template::Template,
15    tls::TlsEnableableConfig,
16};
17
18/// Configuration for the `papertrail` sink.
19#[configurable_component(sink("papertrail", "Deliver log events to Papertrail from SolarWinds."))]
20#[derive(Clone, Debug)]
21#[serde(deny_unknown_fields)]
22pub struct PapertrailConfig {
23    /// The TCP endpoint to send logs to.
24    #[configurable(metadata(docs::examples = "logs.papertrailapp.com:12345"))]
25    endpoint: UriSerde,
26
27    #[configurable(derived)]
28    encoding: EncodingConfig,
29
30    #[configurable(derived)]
31    keepalive: Option<TcpKeepaliveConfig>,
32
33    #[configurable(derived)]
34    tls: Option<TlsEnableableConfig>,
35
36    /// Configures the send buffer size using the `SO_SNDBUF` option on the socket.
37    send_buffer_bytes: Option<usize>,
38
39    /// The value to use as the `process` in Papertrail.
40    #[configurable(metadata(docs::examples = "{{ process }}", docs::examples = "my-process",))]
41    #[serde(default = "default_process")]
42    process: Template,
43
44    #[configurable(derived)]
45    #[serde(
46        default,
47        deserialize_with = "crate::serde::bool_or_struct",
48        skip_serializing_if = "crate::serde::is_default"
49    )]
50    acknowledgements: AcknowledgementsConfig,
51}
52
53fn default_process() -> Template {
54    Template::try_from("vector").unwrap()
55}
56
57impl GenerateConfig for PapertrailConfig {
58    fn generate_config() -> toml::Value {
59        toml::from_str(
60            r#"endpoint = "logs.papertrailapp.com:12345"
61            encoding.codec = "json""#,
62        )
63        .unwrap()
64    }
65}
66
67#[async_trait::async_trait]
68#[typetag::serde(name = "papertrail")]
69impl SinkConfig for PapertrailConfig {
70    async fn build(
71        &self,
72        _cx: SinkContext,
73    ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
74        let host = self
75            .endpoint
76            .uri
77            .host()
78            .map(str::to_string)
79            .ok_or_else(|| "A host is required for endpoint".to_string())?;
80        let port = self
81            .endpoint
82            .uri
83            .port_u16()
84            .ok_or_else(|| "A port is required for endpoint".to_string())?;
85
86        let address = format!("{host}:{port}");
87        let tls = Some(
88            self.tls
89                .clone()
90                .unwrap_or_else(TlsEnableableConfig::enabled),
91        );
92
93        let pid = std::process::id();
94        let process = self.process.clone();
95
96        let sink_config = TcpSinkConfig::new(address, self.keepalive, tls, self.send_buffer_bytes);
97
98        let transformer = self.encoding.transformer();
99        let serializer = self.encoding.build()?;
100        let encoder = Encoder::<()>::new(serializer);
101
102        sink_config.build(
103            Transformer::default(),
104            PapertrailEncoder {
105                pid,
106                process,
107                transformer,
108                encoder,
109            },
110        )
111    }
112
113    fn input(&self) -> Input {
114        let requirement = schema::Requirement::empty().optional_meaning("host", Kind::bytes());
115
116        Input::new(self.encoding.config().input_type() & DataType::Log)
117            .with_schema_requirement(requirement)
118    }
119
120    fn acknowledgements(&self) -> &AcknowledgementsConfig {
121        &self.acknowledgements
122    }
123}
124
125#[derive(Debug, Clone)]
126struct PapertrailEncoder {
127    pid: u32,
128    process: Template,
129    transformer: Transformer,
130    encoder: Encoder<()>,
131}
132
133impl tokio_util::codec::Encoder<Event> for PapertrailEncoder {
134    type Error = vector_lib::codecs::encoding::Error;
135
136    fn encode(
137        &mut self,
138        mut event: Event,
139        buffer: &mut bytes::BytesMut,
140    ) -> Result<(), Self::Error> {
141        let host = event
142            .as_mut_log()
143            .get_host()
144            .map(|host| host.to_string_lossy().into_owned());
145
146        let process = self
147            .process
148            .render_string(&event)
149            .map_err(|error| {
150                emit!(TemplateRenderingError {
151                    error,
152                    field: Some("process"),
153                    drop_event: false,
154                })
155            })
156            .ok()
157            .unwrap_or_else(|| String::from("vector"));
158
159        let formatter = Formatter3164 {
160            facility: Facility::LOG_USER,
161            hostname: host,
162            process,
163            pid: self.pid,
164        };
165
166        self.transformer.transform(&mut event);
167
168        let mut bytes = BytesMut::new();
169        self.encoder.encode(event, &mut bytes)?;
170
171        let message = String::from_utf8_lossy(&bytes);
172
173        formatter
174            .format(&mut buffer.writer(), Severity::LOG_INFO, message)
175            .map_err(|error| Self::Error::SerializingError(format!("{error}").into()))?;
176
177        buffer.put_u8(b'\n');
178
179        Ok(())
180    }
181}
182
#[cfg(test)]
mod tests {
    use serde::Deserialize;
    use std::convert::TryFrom;

    use bytes::BytesMut;
    use futures::{future::ready, stream};
    use tokio_util::codec::Encoder as _;
    use vector_lib::codecs::JsonSerializerConfig;
    use vector_lib::event::{Event, LogEvent};

    use crate::test_util::{
        components::{run_and_assert_sink_compliance, SINK_TAGS},
        http::{always_200_response, spawn_blackhole_http_server},
    };

    use super::*;

    /// The generated example configuration must round-trip through the config type.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<PapertrailConfig>();
    }

    /// Sends a single log event to a local blackhole server and checks sink compliance.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        // Start from the generated example, then point it at the mock server and replace the
        // sink's TLS setting with `TlsEnableableConfig::default()`.
        let generated = PapertrailConfig::generate_config().to_string();
        let deserializer =
            toml::de::ValueDeserializer::parse(&generated).expect("toml should deserialize");
        let mut config =
            PapertrailConfig::deserialize(deserializer).expect("config should be valid");
        config.endpoint = mock_endpoint.into();
        config.tls = Some(TlsEnableableConfig::default());

        let (sink, _healthcheck) = config.build(SinkContext::default()).await.unwrap();

        let events = stream::once(ready(Event::Log(LogEvent::from("simple message"))));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
    }

    /// The transformer's field exclusion must apply to the payload, while the `process`
    /// field survives in the serialized JSON.
    #[test]
    fn encode_event_apply_rules() {
        let mut log = LogEvent::from("vector");
        log.insert("magic", "key");
        log.insert("process", "foo");
        let event = Event::Log(log);

        let except_fields = Some(vec!["magic".into()]);
        let mut encoder = PapertrailEncoder {
            pid: 0,
            process: Template::try_from("{{ process }}").unwrap(),
            transformer: Transformer::new(None, except_fields, None).unwrap(),
            encoder: Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
        };

        let mut frame = BytesMut::new();
        encoder.encode(event, &mut frame).unwrap();
        let frame = frame.freeze();

        // The JSON payload starts right after the first ": " of the syslog header.
        let payload_start = String::from_utf8_lossy(&frame).find(": ").unwrap() + 2;
        let payload = frame.slice(payload_start..frame.len());

        let parsed: serde_json::Value = serde_json::from_slice(&payload).unwrap();
        let fields = parsed.as_object().unwrap();

        assert!(!fields.contains_key("magic"));
        assert_eq!(fields.get("process").unwrap().as_str(), Some("foo"));
    }
}