1use bytes::{BufMut, BytesMut};
2use syslog::{Facility, Formatter3164, LogFormat, Severity};
3use vector_lib::configurable::configurable_component;
4use vrl::value::Kind;
5
6use crate::{
7 codecs::{Encoder, EncodingConfig, Transformer},
8 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
9 event::Event,
10 internal_events::TemplateRenderingError,
11 schema,
12 sinks::util::{tcp::TcpSinkConfig, UriSerde},
13 tcp::TcpKeepaliveConfig,
14 template::Template,
15 tls::TlsEnableableConfig,
16};
17
18#[configurable_component(sink("papertrail", "Deliver log events to Papertrail from SolarWinds."))]
20#[derive(Clone, Debug)]
21#[serde(deny_unknown_fields)]
22pub struct PapertrailConfig {
23 #[configurable(metadata(docs::examples = "logs.papertrailapp.com:12345"))]
25 endpoint: UriSerde,
26
27 #[configurable(derived)]
28 encoding: EncodingConfig,
29
30 #[configurable(derived)]
31 keepalive: Option<TcpKeepaliveConfig>,
32
33 #[configurable(derived)]
34 tls: Option<TlsEnableableConfig>,
35
36 send_buffer_bytes: Option<usize>,
38
39 #[configurable(metadata(docs::examples = "{{ process }}", docs::examples = "my-process",))]
41 #[serde(default = "default_process")]
42 process: Template,
43
44 #[configurable(derived)]
45 #[serde(
46 default,
47 deserialize_with = "crate::serde::bool_or_struct",
48 skip_serializing_if = "crate::serde::is_default"
49 )]
50 acknowledgements: AcknowledgementsConfig,
51}
52
53fn default_process() -> Template {
54 Template::try_from("vector").unwrap()
55}
56
57impl GenerateConfig for PapertrailConfig {
58 fn generate_config() -> toml::Value {
59 toml::from_str(
60 r#"endpoint = "logs.papertrailapp.com:12345"
61 encoding.codec = "json""#,
62 )
63 .unwrap()
64 }
65}
66
67#[async_trait::async_trait]
68#[typetag::serde(name = "papertrail")]
69impl SinkConfig for PapertrailConfig {
70 async fn build(
71 &self,
72 _cx: SinkContext,
73 ) -> crate::Result<(super::VectorSink, super::Healthcheck)> {
74 let host = self
75 .endpoint
76 .uri
77 .host()
78 .map(str::to_string)
79 .ok_or_else(|| "A host is required for endpoint".to_string())?;
80 let port = self
81 .endpoint
82 .uri
83 .port_u16()
84 .ok_or_else(|| "A port is required for endpoint".to_string())?;
85
86 let address = format!("{host}:{port}");
87 let tls = Some(
88 self.tls
89 .clone()
90 .unwrap_or_else(TlsEnableableConfig::enabled),
91 );
92
93 let pid = std::process::id();
94 let process = self.process.clone();
95
96 let sink_config = TcpSinkConfig::new(address, self.keepalive, tls, self.send_buffer_bytes);
97
98 let transformer = self.encoding.transformer();
99 let serializer = self.encoding.build()?;
100 let encoder = Encoder::<()>::new(serializer);
101
102 sink_config.build(
103 Transformer::default(),
104 PapertrailEncoder {
105 pid,
106 process,
107 transformer,
108 encoder,
109 },
110 )
111 }
112
113 fn input(&self) -> Input {
114 let requirement = schema::Requirement::empty().optional_meaning("host", Kind::bytes());
115
116 Input::new(self.encoding.config().input_type() & DataType::Log)
117 .with_schema_requirement(requirement)
118 }
119
120 fn acknowledgements(&self) -> &AcknowledgementsConfig {
121 &self.acknowledgements
122 }
123}
124
125#[derive(Debug, Clone)]
126struct PapertrailEncoder {
127 pid: u32,
128 process: Template,
129 transformer: Transformer,
130 encoder: Encoder<()>,
131}
132
133impl tokio_util::codec::Encoder<Event> for PapertrailEncoder {
134 type Error = vector_lib::codecs::encoding::Error;
135
136 fn encode(
137 &mut self,
138 mut event: Event,
139 buffer: &mut bytes::BytesMut,
140 ) -> Result<(), Self::Error> {
141 let host = event
142 .as_mut_log()
143 .get_host()
144 .map(|host| host.to_string_lossy().into_owned());
145
146 let process = self
147 .process
148 .render_string(&event)
149 .map_err(|error| {
150 emit!(TemplateRenderingError {
151 error,
152 field: Some("process"),
153 drop_event: false,
154 })
155 })
156 .ok()
157 .unwrap_or_else(|| String::from("vector"));
158
159 let formatter = Formatter3164 {
160 facility: Facility::LOG_USER,
161 hostname: host,
162 process,
163 pid: self.pid,
164 };
165
166 self.transformer.transform(&mut event);
167
168 let mut bytes = BytesMut::new();
169 self.encoder.encode(event, &mut bytes)?;
170
171 let message = String::from_utf8_lossy(&bytes);
172
173 formatter
174 .format(&mut buffer.writer(), Severity::LOG_INFO, message)
175 .map_err(|error| Self::Error::SerializingError(format!("{error}").into()))?;
176
177 buffer.put_u8(b'\n');
178
179 Ok(())
180 }
181}
182
183#[cfg(test)]
184mod tests {
185 use serde::Deserialize;
186 use std::convert::TryFrom;
187
188 use bytes::BytesMut;
189 use futures::{future::ready, stream};
190 use tokio_util::codec::Encoder as _;
191 use vector_lib::codecs::JsonSerializerConfig;
192 use vector_lib::event::{Event, LogEvent};
193
194 use crate::test_util::{
195 components::{run_and_assert_sink_compliance, SINK_TAGS},
196 http::{always_200_response, spawn_blackhole_http_server},
197 };
198
199 use super::*;
200
201 #[test]
202 fn generate_config() {
203 crate::test_util::test_generate_config::<PapertrailConfig>();
204 }
205
206 #[tokio::test]
207 async fn component_spec_compliance() {
208 let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;
209
210 let config = PapertrailConfig::generate_config().to_string();
211 let mut config = PapertrailConfig::deserialize(
212 toml::de::ValueDeserializer::parse(&config).expect("toml should deserialize"),
213 )
214 .expect("config should be valid");
215 config.endpoint = mock_endpoint.into();
216 config.tls = Some(TlsEnableableConfig::default());
217
218 let context = SinkContext::default();
219 let (sink, _healthcheck) = config.build(context).await.unwrap();
220
221 let event = Event::Log(LogEvent::from("simple message"));
222 run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
223 }
224
225 #[test]
226 fn encode_event_apply_rules() {
227 let mut evt = Event::Log(LogEvent::from("vector"));
228 evt.as_mut_log().insert("magic", "key");
229 evt.as_mut_log().insert("process", "foo");
230
231 let mut encoder = PapertrailEncoder {
232 pid: 0,
233 process: Template::try_from("{{ process }}").unwrap(),
234 transformer: Transformer::new(None, Some(vec!["magic".into()]), None).unwrap(),
235 encoder: Encoder::<()>::new(JsonSerializerConfig::default().build().into()),
236 };
237
238 let mut bytes = BytesMut::new();
239 encoder.encode(evt, &mut bytes).unwrap();
240 let bytes = bytes.freeze();
241
242 let msg = bytes.slice(String::from_utf8_lossy(&bytes).find(": ").unwrap() + 2..bytes.len());
243 let value: serde_json::Value = serde_json::from_slice(&msg).unwrap();
244 let value = value.as_object().unwrap();
245
246 assert!(!value.contains_key("magic"));
247 assert_eq!(value.get("process").unwrap().as_str(), Some("foo"));
248 }
249}