1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::{
6 Uri,
7 header::{HeaderName, HeaderValue},
8};
9use indoc::indoc;
10use snafu::{ResultExt, Snafu};
11use tower::ServiceBuilder;
12use uuid::Uuid;
13use vector_lib::{
14 TimeZone,
15 codecs::encoding::Framer,
16 configurable::configurable_component,
17 event::{EventFinalizers, Finalizable},
18 request_metadata::RequestMetadata,
19};
20
21use crate::{
22 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
23 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
24 event::Event,
25 gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
26 http::{HttpClient, get_http_scheme_from_uri},
27 serde::json::to_string,
28 sinks::{
29 Healthcheck, VectorSink,
30 gcs_common::{
31 config::{
32 GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, build_healthcheck,
33 default_endpoint,
34 },
35 service::{GcsRequest, GcsRequestSettings, GcsService},
36 sink::GcsSink,
37 },
38 util::{
39 BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt,
40 TowerRequestConfig, batch::BatchConfig, metadata::RequestMetadataBuilder,
41 partitioner::KeyPartitioner, request_builder::EncodeResult,
42 service::TowerRequestConfigDefaults, timezone_to_offset,
43 },
44 },
45 template::{Template, TemplateParseError},
46 tls::{TlsConfig, TlsSettings},
47};
48
49#[derive(Debug, Snafu)]
50#[snafu(visibility(pub))]
51pub enum GcsHealthcheckError {
52 #[snafu(display("key_prefix template parse error: {}", source))]
53 KeyPrefixTemplate { source: TemplateParseError },
54}
55
56#[derive(Clone, Copy, Debug)]
57pub struct GcsTowerRequestConfigDefaults;
58
59impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
60 const RATE_LIMIT_NUM: u64 = 1_000;
61}
62
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// The GCS bucket name.
    ///
    /// Appended to `endpoint` to form the base URL used for uploads and the healthcheck.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// Predefined ACL to apply to created objects.
    ///
    /// When unset, no ACL value is sent with upload requests.
    acl: Option<GcsPredefinedAcl>,

    /// Storage class for created objects.
    ///
    /// When unset, the default storage class is used.
    storage_class: Option<GcsStorageClass>,

    /// Custom metadata to attach to created objects.
    ///
    /// Each key/value pair is converted into an HTTP header name and value; an entry
    /// that is not a valid header name or value makes sink construction fail.
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// Prefix prepended to every object key.
    ///
    /// Rendered per event and used as the batching partition key, so events whose
    /// prefixes render differently are uploaded separately. No separator is inserted
    /// between the prefix and the generated file name, so end it with `/` to get
    /// directory-like keys. Defaults to `date=%F/`.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// strftime-style format for the timestamp part of object names.
    ///
    /// The timestamp is taken when each request is built. It is rendered in
    /// `timezone` (or the global timezone), or in UTC when neither is set.
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether to append a random UUID to object names.
    ///
    /// When enabled, the name becomes `<timestamp>-<uuid>`.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// Extension appended (after a `.`) to object names.
    ///
    /// Defaults to the extension associated with the configured compression.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    /// `Content-Type` for created objects.
    ///
    /// Defaults to the content type reported by the configured encoder. It must be a
    /// valid HTTP header value.
    #[configurable(metadata(
        docs::examples = "text/plain; charset=utf-8",
        docs::examples = "application/gzip"
    ))]
    content_type: Option<String>,

    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// Base URL of the Cloud Storage API.
    ///
    /// The bucket name is appended to it to form the base URL for requests.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    #[serde(flatten)]
    auth: GcpAuthConfig,

    #[configurable(derived)]
    tls: Option<TlsConfig>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
207
/// Serde default for `filename_time_format`: the `%s` strftime specifier.
fn default_time_format() -> String {
    String::from("%s")
}
211
/// Test-only baseline config.
///
/// Every optional setting is unset, UUID suffixes are on, compression is gzip, and
/// the supplied encoding is used. Tests override individual fields with struct
/// update syntax.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        bucket: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        encoding,
        compression: Compression::gzip_default(),
        content_type: None,
        batch: Default::default(),
        endpoint: String::new(),
        request: Default::default(),
        auth: Default::default(),
        tls: None,
        acknowledgements: Default::default(),
        timezone: None,
    }
}
235
236impl GenerateConfig for GcsSinkConfig {
237 fn generate_config() -> toml::Value {
238 toml::from_str(indoc! {r#"
239 bucket = "my-bucket"
240 credentials_path = "/path/to/credentials.json"
241 framing.method = "newline_delimited"
242 encoding.codec = "json"
243 "#})
244 .unwrap()
245 }
246}
247
248#[async_trait::async_trait]
249#[typetag::serde(name = "gcp_cloud_storage")]
250impl SinkConfig for GcsSinkConfig {
251 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
252 let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
253 let base_url = format!("{}/{}/", self.endpoint, self.bucket);
254 let tls = TlsSettings::from_options(self.tls.as_ref())?;
255 let client = HttpClient::new(tls, cx.proxy())?;
256 let healthcheck = build_healthcheck(
257 self.bucket.clone(),
258 client.clone(),
259 base_url.clone(),
260 auth.clone(),
261 )?;
262 auth.spawn_regenerate_token();
263 let sink = self.build_sink(client, base_url, auth, cx)?;
264
265 Ok((sink, healthcheck))
266 }
267
268 fn input(&self) -> Input {
269 Input::new(self.encoding.config().1.input_type() & DataType::Log)
270 }
271
272 fn acknowledgements(&self) -> &AcknowledgementsConfig {
273 &self.acknowledgements
274 }
275}
276
277impl GcsSinkConfig {
278 fn build_sink(
279 &self,
280 client: HttpClient,
281 base_url: String,
282 auth: GcpAuthenticator,
283 cx: SinkContext,
284 ) -> crate::Result<VectorSink> {
285 let request = self.request.into_settings();
286
287 let batch_settings = self.batch.into_batcher_settings()?;
288
289 let partitioner = self.key_partitioner()?;
290
291 let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
292
293 let svc = ServiceBuilder::new()
294 .settings(request, GcsRetryLogic::default())
295 .service(GcsService::new(client, base_url, auth));
296
297 let request_settings = RequestSettings::new(self, cx)?;
298
299 let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
300
301 Ok(VectorSink::from_event_streamsink(sink))
302 }
303
304 fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
305 Ok(KeyPartitioner::new(
306 Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
307 .context(KeyPrefixTemplateSnafu)?,
308 None,
309 ))
310 }
311}
312
313#[derive(Clone, Debug)]
317struct RequestSettings {
318 acl: Option<HeaderValue>,
319 content_type: HeaderValue,
320 content_encoding: Option<HeaderValue>,
321 storage_class: HeaderValue,
322 headers: Vec<(HeaderName, HeaderValue)>,
323 extension: String,
324 time_format: String,
325 append_uuid: bool,
326 encoder: (Transformer, Encoder<Framer>),
327 compression: Compression,
328 tz_offset: Option<FixedOffset>,
329}
330
331impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
332 type Metadata = (String, EventFinalizers);
333 type Events = Vec<Event>;
334 type Encoder = (Transformer, Encoder<Framer>);
335 type Payload = Bytes;
336 type Request = GcsRequest;
337 type Error = io::Error;
338
339 fn compression(&self) -> Compression {
340 self.compression
341 }
342
343 fn encoder(&self) -> &Self::Encoder {
344 &self.encoder
345 }
346
347 fn split_input(
348 &self,
349 input: (String, Vec<Event>),
350 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
351 let (partition_key, mut events) = input;
352 let finalizers = events.take_finalizers();
353 let builder = RequestMetadataBuilder::from_events(&events);
354
355 ((partition_key, finalizers), builder, events)
356 }
357
358 fn build_request(
359 &self,
360 gcp_metadata: Self::Metadata,
361 metadata: RequestMetadata,
362 payload: EncodeResult<Self::Payload>,
363 ) -> Self::Request {
364 let (key, finalizers) = gcp_metadata;
365 let filename = {
367 let seconds = match self.tz_offset {
368 Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
369 None => Utc::now()
370 .with_timezone(&chrono::Utc)
371 .format(&self.time_format),
372 };
373
374 if self.append_uuid {
375 let uuid = Uuid::new_v4();
376 format!("{}-{}", seconds, uuid.hyphenated())
377 } else {
378 seconds.to_string()
379 }
380 };
381
382 let key = format!("{}{}.{}", key, filename, self.extension);
383 let body = payload.into_payload();
384
385 GcsRequest {
386 key,
387 body,
388 finalizers,
389 settings: GcsRequestSettings {
390 acl: self.acl.clone(),
391 content_type: self.content_type.clone(),
392 content_encoding: self.content_encoding.clone(),
393 storage_class: self.storage_class.clone(),
394 headers: self.headers.clone(),
395 },
396 metadata,
397 }
398 }
399}
400
401impl RequestSettings {
402 fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
403 let transformer = config.encoding.transformer();
404 let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
405 let encoder = Encoder::<Framer>::new(framer, serializer);
406 let acl = config
407 .acl
408 .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
409 let content_type_str = config
410 .content_type
411 .as_deref()
412 .unwrap_or_else(|| encoder.content_type());
413 let content_type = HeaderValue::from_str(content_type_str)?;
414 let content_encoding = config
415 .compression
416 .content_encoding()
417 .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
418 let storage_class = config.storage_class.unwrap_or_default();
419 let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
420 let metadata = config
421 .metadata
422 .as_ref()
423 .map(|metadata| {
424 metadata
425 .iter()
426 .map(make_header)
427 .collect::<Result<Vec<_>, _>>()
428 })
429 .unwrap_or_else(|| Ok(vec![]))?;
430 let extension = config
431 .filename_extension
432 .clone()
433 .unwrap_or_else(|| config.compression.extension().into());
434 let time_format = config.filename_time_format.clone();
435 let append_uuid = config.filename_append_uuid;
436 let offset = config
437 .timezone
438 .or(cx.globals.timezone)
439 .and_then(timezone_to_offset);
440
441 Ok(Self {
442 acl,
443 content_type,
444 content_encoding,
445 storage_class,
446 headers: metadata,
447 extension,
448 time_format,
449 append_uuid,
450 compression: config.compression,
451 encoder: (transformer, encoder),
452 tz_offset: offset,
453 })
454 }
455}
456
457fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
459 Ok((
460 HeaderName::from_bytes(name.as_bytes())?,
461 HeaderValue::from_str(value)?,
462 ))
463}
464
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::{
        EstimatedJsonEncodedSizeOf,
        codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
            encoding::FramingConfig,
        },
        partition::Partitioner,
        request_metadata::GroupedCountByteSize,
    };

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            http::{always_200_response, spawn_blackhole_http_server},
        },
    };

    /// Baseline config using the text serializer with no explicit framing.
    fn text_config() -> GcsSinkConfig {
        default_config((None::<FramingConfig>, TextSerializerConfig::default()).into())
    }

    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    #[tokio::test]
    async fn component_spec_compliance() {
        let endpoint = spawn_blackhole_http_server(always_200_response).await;
        let cx = SinkContext::default();

        let client = HttpClient::new(TlsSettings::default(), cx.proxy())
            .expect("should not fail to create HTTP client");

        let config =
            default_config((None::<FramingConfig>, JsonSerializerConfig::default()).into());
        let sink = config
            .build_sink(client, endpoint.to_string(), GcpAuthenticator::None, cx)
            .expect("failed to build sink");

        let events = stream::once(ready(Event::Log(LogEvent::from("simple message"))));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
    }

    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let mut log = LogEvent::from("hello world".to_string());
        log.insert("key", "value");

        let config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..text_config()
        };
        let partition = config
            .key_partitioner()
            .unwrap()
            .partition(&Event::Log(log))
            .expect("key wasn't provided");

        assert_eq!(partition, "key: value");
    }

    fn request_settings(config: &GcsSinkConfig, cx: SinkContext) -> RequestSettings {
        RequestSettings::new(config, cx).expect("Could not create request settings")
    }

    /// Builds a request for one empty log event under the `key/` prefix, using the
    /// literal time format `date` so the object key is predictable.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(
                (
                    Some(NewlineDelimitedEncoderConfig::new()),
                    JsonSerializerConfig::default(),
                )
                    .into(),
            )
        };

        let event: Event = LogEvent::default().into();
        let partition = config
            .key_partitioner()
            .unwrap()
            .partition(&event)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&event, event.estimated_json_encoded_size_of());

        let settings = request_settings(&config, SinkContext::default());
        let (gcp_metadata, metadata_builder, _events) =
            settings.split_input((partition, vec![event]));
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_builder.build(&payload);

        settings.build_request(gcp_metadata, request_metadata, payload)
    }

    #[test]
    fn gcs_build_request() {
        let cases = [
            (Some("ext"), Compression::None, "key/date.ext"),
            (None, Compression::None, "key/date.log"),
            (None, Compression::gzip_default(), "key/date.log.gz"),
        ];
        for (extension, compression, expected) in cases {
            let req = build_request(extension, false, compression);
            assert_eq!(req.key, expected);
        }

        // With a UUID suffix the key can no longer equal the bare timestamp form.
        let req = build_request(None, true, Compression::gzip_default());
        assert_ne!(req.key, "key/date.log.gz");
    }

    #[test]
    fn gcs_content_type_default() {
        let config = GcsSinkConfig {
            content_type: None,
            ..text_config()
        };

        let settings = request_settings(&config, SinkContext::default());
        assert_eq!(settings.content_type.to_str().unwrap(), "text/plain");
    }

    #[test]
    fn gcs_content_type_custom() {
        let config = GcsSinkConfig {
            content_type: Some("text/plain; charset=utf-8".to_string()),
            ..text_config()
        };

        let settings = request_settings(&config, SinkContext::default());
        assert_eq!(
            settings.content_type.to_str().unwrap(),
            "text/plain; charset=utf-8"
        );
    }

    #[test]
    fn gcs_content_type_invalid() {
        // A newline is not allowed in a header value, so construction must fail.
        let config = GcsSinkConfig {
            content_type: Some("text/plain\nInvalid".to_string()),
            ..text_config()
        };

        assert!(RequestSettings::new(&config, SinkContext::default()).is_err());
    }
}
637}