1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::{
6 Uri,
7 header::{HeaderName, HeaderValue},
8};
9use indoc::indoc;
10use snafu::{ResultExt, Snafu};
11use tower::ServiceBuilder;
12use uuid::Uuid;
13use vector_lib::{
14 TimeZone,
15 codecs::encoding::Framer,
16 configurable::configurable_component,
17 event::{EventFinalizers, Finalizable},
18 request_metadata::RequestMetadata,
19};
20
21use crate::{
22 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
23 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
24 event::Event,
25 gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
26 http::{HttpClient, get_http_scheme_from_uri},
27 serde::json::to_string,
28 sinks::{
29 Healthcheck, VectorSink,
30 gcs_common::{
31 config::{
32 GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, build_healthcheck,
33 default_endpoint,
34 },
35 service::{GcsRequest, GcsRequestSettings, GcsService},
36 sink::GcsSink,
37 },
38 util::{
39 BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt,
40 TowerRequestConfig, batch::BatchConfig, metadata::RequestMetadataBuilder,
41 partitioner::KeyPartitioner, request_builder::EncodeResult,
42 service::TowerRequestConfigDefaults, timezone_to_offset,
43 },
44 },
45 template::{Template, TemplateParseError},
46 tls::{TlsConfig, TlsSettings},
47};
48
/// Errors raised while building the GCS sink from its configuration.
///
/// Despite the name, the only variant here is produced by `GcsSinkConfig::key_partitioner`.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The configured `key_prefix` could not be parsed as a template.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
55
56#[derive(Clone, Copy, Debug)]
57pub struct GcsTowerRequestConfigDefaults;
58
59impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
60 const RATE_LIMIT_NUM: u64 = 1_000;
61}
62
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The name of the GCS bucket that objects are written to.
    ///
    /// It is appended to `endpoint` to form the base URL of every request.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// The predefined ACL sent with each object upload.
    ///
    /// If unset, no ACL is sent.
    acl: Option<GcsPredefinedAcl>,

    /// The storage class sent with each object upload.
    ///
    /// If unset, the default storage class is used.
    storage_class: Option<GcsStorageClass>,

    /// Custom metadata for created objects, as key/value pairs.
    ///
    /// Each pair is converted into an HTTP header for the upload request, so every key must be a
    /// valid header name and every value a valid header value.
    // NOTE(review): keys are used verbatim as header names; confirm whether users must supply
    // any provider-specific metadata prefix themselves.
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// A prefix to apply to all object keys.
    ///
    /// The prefix is a template and may reference event fields and strftime specifiers. It is
    /// concatenated directly with the filename, so include a trailing `/` to group objects
    /// under a "directory". Defaults to `date=%F/`.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// The strftime format used to render the timestamp part of each object's filename.
    ///
    /// The timestamp is taken when the request is built. It is rendered in `timezone` (or the
    /// global timezone) when one is set, otherwise in UTC. Defaults to `%s`.
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether to append a random UUID (v4) to the filename, separated by `-`.
    ///
    /// This makes object names unique even when two requests render the same timestamp.
    /// Defaults to `true`.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// The filename extension (without the leading `.`) appended to every object key.
    ///
    /// If unset, the extension implied by `compression` is used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    // Encoding and framing options for event payloads, flattened into the sink's top level.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    // Compression applied to payloads. It also provides the default `filename_extension` and
    // `content_encoding`.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    /// Overrides the content type of created objects.
    ///
    /// If unset, the content type reported by the configured encoder is used.
    #[configurable(metadata(
        docs::examples = "text/plain; charset=utf-8",
        docs::examples = "application/gzip"
    ))]
    content_type: Option<String>,

    /// Overrides the content encoding of created objects.
    ///
    /// If unset, it is derived from `compression`, and omitted when the compression has no
    /// content encoding. An explicitly configured value (even an empty one) is sent as-is.
    #[configurable(metadata(docs::examples = "gzip", docs::examples = "zstd"))]
    content_encoding: Option<String>,

    /// Sets the cache-control value of created objects.
    ///
    /// If unset, no cache-control value is sent.
    #[configurable(metadata(docs::examples = "no-transform"))]
    cache_control: Option<String>,

    // Batching options; defaults come from `BulkSizeBasedDefaultBatchSettings`.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// The API endpoint for GCS.
    ///
    /// The bucket name is appended to it to form the base URL of every request.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    // Tower request options; defaults come from `GcsTowerRequestConfigDefaults`.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    // GCP authentication options, flattened into the sink's top level. The authenticator is
    // built with the `DevStorageReadWrite` scope.
    #[serde(flatten)]
    auth: GcpAuthConfig,

    // TLS options for the HTTP client.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    // Acknowledgement settings for this sink.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    // Timezone used to render `filename_time_format`. Falls back to the global `timezone`
    // option, and to UTC when neither is set.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
221
/// Default for `filename_time_format`: the strftime `%s` specifier.
fn default_time_format() -> String {
    String::from("%s")
}
225
/// Test helper: builds a `GcsSinkConfig` with the given encoding and gzip compression.
///
/// Every other option is at its default, including `%s` filenames with a UUID suffix.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        bucket: String::new(),
        endpoint: String::new(),
        encoding,
        compression: Compression::gzip_default(),
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        key_prefix: None,
        acl: None,
        storage_class: None,
        metadata: None,
        content_type: None,
        content_encoding: None,
        cache_control: None,
        tls: None,
        timezone: None,
        batch: Default::default(),
        request: Default::default(),
        auth: Default::default(),
        acknowledgements: Default::default(),
    }
}
251
252impl GenerateConfig for GcsSinkConfig {
253 fn generate_config() -> toml::Value {
254 toml::from_str(indoc! {r#"
255 bucket = "my-bucket"
256 credentials_path = "/path/to/credentials.json"
257 framing.method = "newline_delimited"
258 encoding.codec = "json"
259 "#})
260 .unwrap()
261 }
262}
263
264#[async_trait::async_trait]
265#[typetag::serde(name = "gcp_cloud_storage")]
266impl SinkConfig for GcsSinkConfig {
267 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
268 let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
269 let base_url = format!("{}/{}/", self.endpoint, self.bucket);
270 let tls = TlsSettings::from_options(self.tls.as_ref())?;
271 let client = HttpClient::new(tls, cx.proxy())?;
272 let healthcheck = build_healthcheck(
273 self.bucket.clone(),
274 client.clone(),
275 base_url.clone(),
276 auth.clone(),
277 )?;
278 auth.spawn_regenerate_token();
279 let sink = self.build_sink(client, base_url, auth, cx)?;
280
281 Ok((sink, healthcheck))
282 }
283
284 fn input(&self) -> Input {
285 Input::new(self.encoding.config().1.input_type() & DataType::Log)
286 }
287
288 fn acknowledgements(&self) -> &AcknowledgementsConfig {
289 &self.acknowledgements
290 }
291}
292
293impl GcsSinkConfig {
294 fn build_sink(
295 &self,
296 client: HttpClient,
297 base_url: String,
298 auth: GcpAuthenticator,
299 cx: SinkContext,
300 ) -> crate::Result<VectorSink> {
301 let request = self.request.into_settings();
302
303 let batch_settings = self.batch.into_batcher_settings()?;
304
305 let partitioner = self.key_partitioner()?;
306
307 let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
308
309 let svc = ServiceBuilder::new()
310 .settings(request, GcsRetryLogic::default())
311 .service(GcsService::new(client, base_url, auth));
312
313 let request_settings = RequestSettings::new(self, cx)?;
314
315 let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
316
317 Ok(VectorSink::from_event_streamsink(sink))
318 }
319
320 fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
321 Ok(KeyPartitioner::new(
322 Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
323 .context(KeyPrefixTemplateSnafu)?,
324 None,
325 ))
326 }
327}
328
/// Per-sink settings used to turn a partitioned batch of events into a `GcsRequest`.
///
/// Header values are resolved once in `RequestSettings::new` and cloned into every request.
#[derive(Clone, Debug)]
struct RequestSettings {
    /// ACL header value; `None` when no ACL is configured.
    acl: Option<HeaderValue>,
    /// Content type sent with every object, either configured or taken from the encoder.
    content_type: HeaderValue,
    /// Content encoding, either configured or derived from the compression.
    content_encoding: Option<HeaderValue>,
    /// Storage class sent with every object.
    storage_class: HeaderValue,
    /// Optional cache-control value.
    cache_control: Option<HeaderValue>,
    /// Extra headers built from the user's `metadata` map.
    headers: Vec<(HeaderName, HeaderValue)>,
    /// Extension appended, after a `.`, to every object key.
    extension: String,
    /// strftime format for the timestamp part of the filename.
    time_format: String,
    /// Whether `-<uuid>` is appended to the rendered timestamp.
    append_uuid: bool,
    /// Transformer and framed encoder applied to the events.
    encoder: (Transformer, Encoder<Framer>),
    /// Compression applied to the encoded payload.
    compression: Compression,
    /// Offset used to render the timestamp; UTC when `None`.
    tz_offset: Option<FixedOffset>,
}
347
348impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
349 type Metadata = (String, EventFinalizers);
350 type Events = Vec<Event>;
351 type Encoder = (Transformer, Encoder<Framer>);
352 type Payload = Bytes;
353 type Request = GcsRequest;
354 type Error = io::Error;
355
356 fn compression(&self) -> Compression {
357 self.compression
358 }
359
360 fn encoder(&self) -> &Self::Encoder {
361 &self.encoder
362 }
363
364 fn split_input(
365 &self,
366 input: (String, Vec<Event>),
367 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
368 let (partition_key, mut events) = input;
369 let finalizers = events.take_finalizers();
370 let builder = RequestMetadataBuilder::from_events(&events);
371
372 ((partition_key, finalizers), builder, events)
373 }
374
375 fn build_request(
376 &self,
377 gcp_metadata: Self::Metadata,
378 metadata: RequestMetadata,
379 payload: EncodeResult<Self::Payload>,
380 ) -> Self::Request {
381 let (key, finalizers) = gcp_metadata;
382 let filename = {
384 let seconds = match self.tz_offset {
385 Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
386 None => Utc::now()
387 .with_timezone(&chrono::Utc)
388 .format(&self.time_format),
389 };
390
391 if self.append_uuid {
392 let uuid = Uuid::new_v4();
393 format!("{}-{}", seconds, uuid.hyphenated())
394 } else {
395 seconds.to_string()
396 }
397 };
398
399 let key = format!("{}{}.{}", key, filename, self.extension);
400 let body = payload.into_payload();
401
402 GcsRequest {
403 key,
404 body,
405 finalizers,
406 settings: GcsRequestSettings {
407 acl: self.acl.clone(),
408 content_type: self.content_type.clone(),
409 content_encoding: self.content_encoding.clone(),
410 storage_class: self.storage_class.clone(),
411 cache_control: self.cache_control.clone(),
412 headers: self.headers.clone(),
413 },
414 metadata,
415 }
416 }
417}
418
419impl RequestSettings {
420 fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
421 let transformer = config.encoding.transformer();
422 let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
423 let encoder = Encoder::<Framer>::new(framer, serializer);
424 let acl = config
425 .acl
426 .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
427 let content_type_str = config
428 .content_type
429 .as_deref()
430 .unwrap_or_else(|| encoder.content_type());
431 let content_type = HeaderValue::from_str(content_type_str)?;
432 let content_encoding = match &config.content_encoding {
433 Some(ce) => Some(HeaderValue::from_str(ce)?),
434 None => config
435 .compression
436 .content_encoding()
437 .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap()),
438 };
439 let storage_class = config.storage_class.unwrap_or_default();
440 let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
441 let cache_control = config
442 .cache_control
443 .as_ref()
444 .map(|cc| HeaderValue::from_str(cc))
445 .transpose()?;
446 let metadata = config
447 .metadata
448 .as_ref()
449 .map(|metadata| {
450 metadata
451 .iter()
452 .map(make_header)
453 .collect::<Result<Vec<_>, _>>()
454 })
455 .unwrap_or_else(|| Ok(vec![]))?;
456 let extension = config
457 .filename_extension
458 .clone()
459 .unwrap_or_else(|| config.compression.extension().into());
460 let time_format = config.filename_time_format.clone();
461 let append_uuid = config.filename_append_uuid;
462 let offset = config
463 .timezone
464 .or(cx.globals.timezone)
465 .and_then(timezone_to_offset);
466
467 Ok(Self {
468 acl,
469 content_type,
470 content_encoding,
471 storage_class,
472 cache_control,
473 headers: metadata,
474 extension,
475 time_format,
476 append_uuid,
477 compression: config.compression,
478 encoder: (transformer, encoder),
479 tz_offset: offset,
480 })
481 }
482}
483
484fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
486 Ok((
487 HeaderName::from_bytes(name.as_bytes())?,
488 HeaderValue::from_str(value)?,
489 ))
490}
491
#[cfg(test)]
mod tests {
    //! Unit tests for the GCS sink configuration, key partitioning and request building.

    use futures_util::{future::ready, stream};
    use vector_lib::{
        EstimatedJsonEncodedSizeOf,
        codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
            encoding::FramingConfig,
        },
        partition::Partitioner,
        request_metadata::GroupedCountByteSize,
    };

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            http::{always_200_response, spawn_blackhole_http_server},
        },
    };

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    // Builds the sink against a local blackhole HTTP server (responding via
    // `always_200_response`). Checks that sending one event satisfies the sink compliance
    // assertions.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let context = SinkContext::default();

        let tls = TlsSettings::default();
        let client =
            HttpClient::new(tls, context.proxy()).expect("should not fail to create HTTP client");

        let config =
            default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into());
        let sink = config
            .build_sink(
                client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                context,
            )
            .expect("failed to build sink");

        let event = Event::Log(LogEvent::from("simple message"));
        run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await;
    }

    // The key prefix template is rendered from event fields.
    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let message = "hello world".to_string();
        let mut event = LogEvent::from(message);
        event.insert("key", "value");

        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&Event::Log(event))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    // Shorthand for `RequestSettings::new` that panics on failure.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }

    // Builds a request for one empty log event, with key prefix `key/` and the literal time
    // format `date`. The resulting key is therefore deterministic unless a UUID is appended.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(
                (
                    Some(NewlineDelimitedEncoderConfig::new()),
                    JsonSerializerConfig::default(),
                )
                    .into(),
            )
        };
        let log = LogEvent::default().into();
        let key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&log)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&log, log.estimated_json_encoded_size_of());

        let request_settings = request_settings(&sink_config, context);
        let (metadata, metadata_request_builder, _events) =
            request_settings.split_input((key, vec![log]));
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_request_builder.build(&payload);

        request_settings.build_request(metadata, request_metadata, payload)
    }

    // Object keys are `<prefix><time>[-uuid].<extension>`. The extension falls back to the
    // compression's extension.
    #[test]
    fn gcs_build_request() {
        let req = build_request(Some("ext"), false, Compression::None);
        assert_eq!(req.key, "key/date.ext".to_string());

        let req = build_request(None, false, Compression::None);
        assert_eq!(req.key, "key/date.log".to_string());

        let req = build_request(None, false, Compression::gzip_default());
        assert_eq!(req.key, "key/date.log.gz".to_string());

        // With a UUID appended, the key must differ from the plain one.
        let req = build_request(None, true, Compression::gzip_default());
        assert_ne!(req.key, "key/date.log.gz".to_string());
    }

    #[test]
    fn gcs_content_type_default() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            content_type: None,
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        // Falls back to the text encoder's content type.
        assert_eq!(
            request_settings.content_type.to_str().unwrap(),
            "text/plain"
        );
    }

    #[test]
    fn gcs_content_type_custom() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            content_type: Some("text/plain; charset=utf-8".to_string()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        // The configured value overrides the encoder's default.
        assert_eq!(
            request_settings.content_type.to_str().unwrap(),
            "text/plain; charset=utf-8"
        );
    }

    #[test]
    fn gcs_content_type_invalid() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            // A newline is not allowed in a header value.
            content_type: Some("text/plain\nInvalid".to_string()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let result = RequestSettings::new(&sink_config, context);
        assert!(result.is_err());
    }

    #[test]
    fn gcs_content_encoding_default() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            content_encoding: None,
            compression: Compression::gzip_default(),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        // Derived from the gzip compression.
        assert_eq!(
            request_settings.content_encoding.unwrap().to_str().unwrap(),
            "gzip"
        );
    }

    #[test]
    fn gcs_content_encoding_none_when_no_compression() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            content_encoding: None,
            compression: Compression::None,
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        assert!(request_settings.content_encoding.is_none());
    }

    #[test]
    fn gcs_content_encoding_custom() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            content_encoding: Some("gzip".to_string()),
            compression: Compression::None,
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        // An explicit value is used even when compression is disabled.
        assert_eq!(
            request_settings.content_encoding.unwrap().to_str().unwrap(),
            "gzip"
        );
    }

    #[test]
    fn gcs_content_encoding_invalid() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            // A newline is not allowed in a header value.
            content_encoding: Some("gzip\nInvalid".to_string()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let result = RequestSettings::new(&sink_config, context);
        assert!(result.is_err());
    }

    #[test]
    fn gcs_content_encoding_empty() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            // An explicit empty value takes precedence over the compression-derived default.
            content_encoding: Some("".to_string()),
            compression: Compression::gzip_default(),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        assert_eq!(
            request_settings.content_encoding.unwrap().to_str().unwrap(),
            ""
        );
    }

    #[test]
    fn gcs_cache_control_default() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            cache_control: None,
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        assert!(request_settings.cache_control.is_none());
    }

    #[test]
    fn gcs_cache_control_custom() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            cache_control: Some("no-transform".to_string()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let request_settings = request_settings(&sink_config, context);
        assert_eq!(
            request_settings.cache_control.unwrap().to_str().unwrap(),
            "no-transform"
        );
    }

    #[test]
    fn gcs_cache_control_invalid() {
        let context = SinkContext::default();
        let sink_config = GcsSinkConfig {
            // A newline is not allowed in a header value.
            cache_control: Some("no-cache\nInvalid".to_string()),
            ..default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
        };

        let result = RequestSettings::new(&sink_config, context);
        assert!(result.is_err());
    }
}