1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::header::{HeaderName, HeaderValue};
6use http::Uri;
7use indoc::indoc;
8use snafu::ResultExt;
9use snafu::Snafu;
10use tower::ServiceBuilder;
11use uuid::Uuid;
12use vector_lib::codecs::encoding::Framer;
13use vector_lib::configurable::configurable_component;
14use vector_lib::event::{EventFinalizers, Finalizable};
15use vector_lib::{request_metadata::RequestMetadata, TimeZone};
16
17use crate::sinks::util::metadata::RequestMetadataBuilder;
18use crate::sinks::util::service::TowerRequestConfigDefaults;
19use crate::{
20 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
21 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
22 event::Event,
23 gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
24 http::{get_http_scheme_from_uri, HttpClient},
25 serde::json::to_string,
26 sinks::{
27 gcs_common::{
28 config::{
29 build_healthcheck, default_endpoint, GcsPredefinedAcl, GcsRetryLogic,
30 GcsStorageClass,
31 },
32 service::{GcsRequest, GcsRequestSettings, GcsService},
33 sink::GcsSink,
34 },
35 util::{
36 batch::BatchConfig, partitioner::KeyPartitioner, request_builder::EncodeResult,
37 timezone_to_offset, BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder,
38 ServiceBuilderExt, TowerRequestConfig,
39 },
40 Healthcheck, VectorSink,
41 },
42 template::{Template, TemplateParseError},
43 tls::{TlsConfig, TlsSettings},
44};
45
/// Errors raised while preparing the GCS sink.
///
/// Despite the name, the only variant defined here is produced when the
/// `key_prefix` option is turned into a [`Template`] for partitioning.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The configured `key_prefix` is not a valid template.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
52
/// Marker type that supplies this sink's defaults for [`TowerRequestConfig`].
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
    // Only the rate-limit count is overridden; every other default comes from the trait.
    // NOTE(review): presumably "requests per rate-limit window" — confirm against the trait.
    const RATE_LIMIT_NUM: u64 = 1_000;
}
59
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The GCS bucket name.
    ///
    /// Joined with `endpoint` to form the base URL used for requests.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// The predefined ACL attached to each request, if any.
    acl: Option<GcsPredefinedAcl>,

    /// The storage class attached to each request.
    ///
    /// The storage class type's default is used when this is not set.
    storage_class: Option<GcsStorageClass>,

    /// Additional key/value pairs attached to each request.
    ///
    /// Every pair is converted into a header name and header value when the sink is built.
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// A template used to partition events; the rendered value prefixes the object key.
    ///
    /// Defaults to `date=%F/` when not set.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// The `strftime`-style format of the timestamp portion of the object name.
    ///
    /// Defaults to `%s`.
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether to append a random (v4) UUID after the timestamp portion of the object name.
    ///
    /// Enabled by default.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// The extension used for the object name.
    ///
    /// When not set, the extension reported by the configured compression is used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    /// Framing and serialization options (flattened into the sink's own options).
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    /// Compression setting handed to the request builder.
    ///
    /// It also determines the default file extension and the content-encoding header value.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    /// Batching options, converted into batcher settings when the sink is built.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// The API endpoint; the bucket name is appended to it to form the base URL.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    /// Request options applied to the upload service together with the GCS retry logic.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    /// GCP authentication options (flattened into the sink's own options).
    #[serde(flatten)]
    auth: GcpAuthConfig,

    /// TLS options used when creating the HTTP client.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    /// Acknowledgement options; accepts either a boolean or a full structure.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    /// Timezone used when formatting the timestamp portion of the object name.
    ///
    /// Falls back to the global timezone, and to UTC when neither is set.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
193
/// Serde default for `filename_time_format`: the `%s` format specifier.
fn default_time_format() -> String {
    String::from("%s")
}
197
/// Test-only baseline configuration: gzip compression, the default filename
/// timestamp format, UUID suffixes enabled, and every other option empty or default.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        // Options with an explicit, non-trivial value.
        encoding,
        compression: Compression::gzip_default(),
        filename_time_format: default_time_format(),
        filename_append_uuid: true,

        // Empty strings and unset options.
        bucket: String::new(),
        endpoint: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        filename_extension: None,
        tls: None,
        timezone: None,

        // Nested configuration structures at their defaults.
        batch: Default::default(),
        request: Default::default(),
        auth: Default::default(),
        acknowledgements: Default::default(),
    }
}
220
221impl GenerateConfig for GcsSinkConfig {
222 fn generate_config() -> toml::Value {
223 toml::from_str(indoc! {r#"
224 bucket = "my-bucket"
225 credentials_path = "/path/to/credentials.json"
226 framing.method = "newline_delimited"
227 encoding.codec = "json"
228 "#})
229 .unwrap()
230 }
231}
232
233#[async_trait::async_trait]
234#[typetag::serde(name = "gcp_cloud_storage")]
235impl SinkConfig for GcsSinkConfig {
236 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
237 let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
238 let base_url = format!("{}/{}/", self.endpoint, self.bucket);
239 let tls = TlsSettings::from_options(self.tls.as_ref())?;
240 let client = HttpClient::new(tls, cx.proxy())?;
241 let healthcheck = build_healthcheck(
242 self.bucket.clone(),
243 client.clone(),
244 base_url.clone(),
245 auth.clone(),
246 )?;
247 auth.spawn_regenerate_token();
248 let sink = self.build_sink(client, base_url, auth, cx)?;
249
250 Ok((sink, healthcheck))
251 }
252
253 fn input(&self) -> Input {
254 Input::new(self.encoding.config().1.input_type() & DataType::Log)
255 }
256
257 fn acknowledgements(&self) -> &AcknowledgementsConfig {
258 &self.acknowledgements
259 }
260}
261
262impl GcsSinkConfig {
263 fn build_sink(
264 &self,
265 client: HttpClient,
266 base_url: String,
267 auth: GcpAuthenticator,
268 cx: SinkContext,
269 ) -> crate::Result<VectorSink> {
270 let request = self.request.into_settings();
271
272 let batch_settings = self.batch.into_batcher_settings()?;
273
274 let partitioner = self.key_partitioner()?;
275
276 let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
277
278 let svc = ServiceBuilder::new()
279 .settings(request, GcsRetryLogic::default())
280 .service(GcsService::new(client, base_url, auth));
281
282 let request_settings = RequestSettings::new(self, cx)?;
283
284 let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
285
286 Ok(VectorSink::from_event_streamsink(sink))
287 }
288
289 fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
290 Ok(KeyPartitioner::new(
291 Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
292 .context(KeyPrefixTemplateSnafu)?,
293 None,
294 ))
295 }
296}
297
/// Per-sink settings, derived once from the sink configuration, that are used
/// to turn each batch of events into a `GcsRequest`.
#[derive(Clone, Debug)]
struct RequestSettings {
    // Header value for the predefined ACL, when one is configured.
    acl: Option<HeaderValue>,
    // Content type reported by the encoder.
    content_type: HeaderValue,
    // Content encoding reported by the compression setting, if it has one.
    content_encoding: Option<HeaderValue>,
    // Header value for the configured (or default) storage class.
    storage_class: HeaderValue,
    // Headers built from the user-supplied `metadata` map.
    headers: Vec<(HeaderName, HeaderValue)>,
    // Extension appended to every object key.
    extension: String,
    // Format string for the timestamp portion of the object name.
    time_format: String,
    // Whether a random UUID is appended after the timestamp.
    append_uuid: bool,
    // Transformer and encoder exposed to the request builder.
    encoder: (Transformer, Encoder<Framer>),
    // Compression setting exposed to the request builder.
    compression: Compression,
    // Offset applied to the timestamp; `None` means UTC.
    tz_offset: Option<FixedOffset>,
}
315
316impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
317 type Metadata = (String, EventFinalizers);
318 type Events = Vec<Event>;
319 type Encoder = (Transformer, Encoder<Framer>);
320 type Payload = Bytes;
321 type Request = GcsRequest;
322 type Error = io::Error;
323
324 fn compression(&self) -> Compression {
325 self.compression
326 }
327
328 fn encoder(&self) -> &Self::Encoder {
329 &self.encoder
330 }
331
332 fn split_input(
333 &self,
334 input: (String, Vec<Event>),
335 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
336 let (partition_key, mut events) = input;
337 let finalizers = events.take_finalizers();
338 let builder = RequestMetadataBuilder::from_events(&events);
339
340 ((partition_key, finalizers), builder, events)
341 }
342
343 fn build_request(
344 &self,
345 gcp_metadata: Self::Metadata,
346 metadata: RequestMetadata,
347 payload: EncodeResult<Self::Payload>,
348 ) -> Self::Request {
349 let (key, finalizers) = gcp_metadata;
350 let filename = {
352 let seconds = match self.tz_offset {
353 Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
354 None => Utc::now()
355 .with_timezone(&chrono::Utc)
356 .format(&self.time_format),
357 };
358
359 if self.append_uuid {
360 let uuid = Uuid::new_v4();
361 format!("{}-{}", seconds, uuid.hyphenated())
362 } else {
363 seconds.to_string()
364 }
365 };
366
367 let key = format!("{}{}.{}", key, filename, self.extension);
368 let body = payload.into_payload();
369
370 GcsRequest {
371 key,
372 body,
373 finalizers,
374 settings: GcsRequestSettings {
375 acl: self.acl.clone(),
376 content_type: self.content_type.clone(),
377 content_encoding: self.content_encoding.clone(),
378 storage_class: self.storage_class.clone(),
379 headers: self.headers.clone(),
380 },
381 metadata,
382 }
383 }
384}
385
386impl RequestSettings {
387 fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
388 let transformer = config.encoding.transformer();
389 let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
390 let encoder = Encoder::<Framer>::new(framer, serializer);
391 let acl = config
392 .acl
393 .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
394 let content_type = HeaderValue::from_str(encoder.content_type()).unwrap();
395 let content_encoding = config
396 .compression
397 .content_encoding()
398 .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
399 let storage_class = config.storage_class.unwrap_or_default();
400 let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
401 let metadata = config
402 .metadata
403 .as_ref()
404 .map(|metadata| {
405 metadata
406 .iter()
407 .map(make_header)
408 .collect::<Result<Vec<_>, _>>()
409 })
410 .unwrap_or_else(|| Ok(vec![]))?;
411 let extension = config
412 .filename_extension
413 .clone()
414 .unwrap_or_else(|| config.compression.extension().into());
415 let time_format = config.filename_time_format.clone();
416 let append_uuid = config.filename_append_uuid;
417 let offset = config
418 .timezone
419 .or(cx.globals.timezone)
420 .and_then(timezone_to_offset);
421
422 Ok(Self {
423 acl,
424 content_type,
425 content_encoding,
426 storage_class,
427 headers: metadata,
428 extension,
429 time_format,
430 append_uuid,
431 compression: config.compression,
432 encoder: (transformer, encoder),
433 tz_offset: offset,
434 })
435 }
436}
437
438fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
440 Ok((
441 HeaderName::from_bytes(name.as_bytes())?,
442 HeaderValue::from_str(value)?,
443 ))
444}
445
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::codecs::encoding::FramingConfig;
    use vector_lib::codecs::{
        JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
    };
    use vector_lib::partition::Partitioner;
    use vector_lib::request_metadata::GroupedCountByteSize;
    use vector_lib::EstimatedJsonEncodedSizeOf;

    use crate::event::LogEvent;
    use crate::test_util::{
        components::{run_and_assert_sink_compliance, SINK_TAGS},
        http::{always_200_response, spawn_blackhole_http_server},
    };

    use super::*;

    /// The generated example configuration must deserialize into `GcsSinkConfig`.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    /// Runs one event through a sink pointed at a blackhole HTTP server and
    /// checks component-spec compliance.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let cx = SinkContext::default();
        let http_client = HttpClient::new(TlsSettings::default(), cx.proxy())
            .expect("should not fail to create HTTP client");

        let sink = default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into())
            .build_sink(
                http_client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                cx,
            )
            .expect("failed to build sink");

        let events = stream::once(ready(Event::Log(LogEvent::from("simple message"))));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
    }

    /// The `key_prefix` template is rendered against the event's fields.
    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let mut log = LogEvent::from("hello world".to_string());
        log.insert("key", "value");

        let encoding = (None::<FramingConfig>, TextSerializerConfig::default()).into();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..default_config(encoding)
        };

        let partitioner = sink_config.key_partitioner().unwrap();
        let key = partitioner
            .partition(&Event::Log(log))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    /// Builds `RequestSettings`, panicking when the configuration is rejected.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }

    /// Builds a request for a single empty log event using a fixed `key/` prefix
    /// and the literal filename timestamp `date`.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let encoding = (
            Some(NewlineDelimitedEncoderConfig::new()),
            JsonSerializerConfig::default(),
        )
            .into();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(encoding)
        };

        let log: Event = LogEvent::default().into();
        let partition_key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&log)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&log, log.estimated_json_encoded_size_of());

        let settings = request_settings(&sink_config, SinkContext::default());
        let (gcp_metadata, metadata_builder, _events) =
            settings.split_input((partition_key, vec![log]));
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_builder.build(&payload);

        settings.build_request(gcp_metadata, request_metadata, payload)
    }

    /// Checks how the extension, compression and UUID options shape the object key.
    #[test]
    fn gcs_build_request() {
        let cases = [
            (Some("ext"), Compression::None, "key/date.ext"),
            (None, Compression::None, "key/date.log"),
            (None, Compression::gzip_default(), "key/date.log.gz"),
        ];
        for (extension, compression, expected_key) in cases {
            let req = build_request(extension, false, compression);
            assert_eq!(req.key, expected_key);
        }

        // With a UUID appended the key can no longer match the bare name.
        let req = build_request(None, true, Compression::gzip_default());
        assert_ne!(req.key, "key/date.log.gz");
    }
}
569}