1use std::{collections::HashMap, convert::TryFrom, io};
2
3use bytes::Bytes;
4use chrono::{FixedOffset, Utc};
5use http::{
6 Uri,
7 header::{HeaderName, HeaderValue},
8};
9use indoc::indoc;
10use snafu::{ResultExt, Snafu};
11use tower::ServiceBuilder;
12use uuid::Uuid;
13use vector_lib::{
14 TimeZone,
15 codecs::encoding::Framer,
16 configurable::configurable_component,
17 event::{EventFinalizers, Finalizable},
18 request_metadata::RequestMetadata,
19};
20
21use crate::{
22 codecs::{Encoder, EncodingConfigWithFraming, SinkType, Transformer},
23 config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
24 event::Event,
25 gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
26 http::{HttpClient, get_http_scheme_from_uri},
27 serde::json::to_string,
28 sinks::{
29 Healthcheck, VectorSink,
30 gcs_common::{
31 config::{
32 GcsPredefinedAcl, GcsRetryLogic, GcsStorageClass, build_healthcheck,
33 default_endpoint,
34 },
35 service::{GcsRequest, GcsRequestSettings, GcsService},
36 sink::GcsSink,
37 },
38 util::{
39 BulkSizeBasedDefaultBatchSettings, Compression, RequestBuilder, ServiceBuilderExt,
40 TowerRequestConfig, batch::BatchConfig, metadata::RequestMetadataBuilder,
41 partitioner::KeyPartitioner, request_builder::EncodeResult,
42 service::TowerRequestConfigDefaults, timezone_to_offset,
43 },
44 },
45 template::{Template, TemplateParseError},
46 tls::{TlsConfig, TlsSettings},
47};
48
/// Errors produced while preparing the GCS sink from its configuration.
#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    /// The configured `key_prefix` could not be parsed as a template.
    #[snafu(display("key_prefix template parse error: {}", source))]
    KeyPrefixTemplate { source: TemplateParseError },
}
55
/// Marker type carrying the request-setting defaults used by this sink's
/// `TowerRequestConfig`.
#[derive(Clone, Copy, Debug)]
pub struct GcsTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for GcsTowerRequestConfigDefaults {
    // Only the rate-limit count is overridden here; every other default comes from the trait.
    const RATE_LIMIT_NUM: u64 = 1_000;
}
62
/// Configuration for the `gcp_cloud_storage` sink.
#[configurable_component(sink(
    "gcp_cloud_storage",
    "Store observability events in GCP Cloud Storage."
))]
#[derive(Clone, Debug)]
#[serde(deny_unknown_fields)]
pub struct GcsSinkConfig {
    /// Name of the bucket that objects are written to.
    ///
    /// It is appended to `endpoint` to form the base URL used by the sink and its healthcheck.
    #[configurable(metadata(docs::examples = "my-bucket"))]
    bucket: String,

    /// Predefined ACL to apply to created objects.
    ///
    /// When set, it is serialized into a header value that is attached to every request.
    acl: Option<GcsPredefinedAcl>,

    /// Storage class for created objects.
    ///
    /// When unset, the default `GcsStorageClass` value is used.
    storage_class: Option<GcsStorageClass>,

    /// Extra key/value pairs attached to every request.
    ///
    /// Each pair is converted into an HTTP header name and value when the sink is built, so a
    /// pair that is not a valid header makes the build fail.
    #[configurable(metadata(docs::additional_props_description = "A key/value pair."))]
    #[configurable(metadata(docs::advanced))]
    metadata: Option<HashMap<String, String>>,

    /// Template used to compute the prefix of each object key.
    ///
    /// Events are partitioned by the rendered prefix. Defaults to `date=%F/` when unset.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "date=%F/",
        docs::examples = "date=%F/hour=%H/",
        docs::examples = "year=%Y/month=%m/day=%d/",
        docs::examples = "application_id={{ application_id }}/date=%F/"
    ))]
    #[configurable(metadata(docs::advanced))]
    key_prefix: Option<String>,

    /// Format string applied to the current time to produce the timestamp part of the object
    /// name.
    ///
    /// Defaults to `%s`.
    #[serde(default = "default_time_format")]
    #[configurable(metadata(docs::advanced))]
    filename_time_format: String,

    /// Whether to append a hyphen and a freshly generated UUID (v4) after the timestamp in the
    /// object name.
    ///
    /// Defaults to `true`.
    #[serde(default = "crate::serde::default_true")]
    #[configurable(metadata(docs::advanced))]
    filename_append_uuid: bool,

    /// Extension placed after the final `.` of the object name.
    ///
    /// When unset, the extension reported by the configured compression is used.
    #[configurable(metadata(docs::advanced))]
    filename_extension: Option<String>,

    // Framing and serialization options; flattened into the top level of the sink configuration.
    #[serde(flatten)]
    encoding: EncodingConfigWithFraming,

    /// Compression configuration.
    ///
    /// It also determines the content encoding value sent with each request and the default
    /// filename extension.
    #[configurable(derived)]
    #[serde(default)]
    compression: Compression,

    /// Batching settings used to build the sink's batcher.
    #[configurable(derived)]
    #[serde(default)]
    batch: BatchConfig<BulkSizeBasedDefaultBatchSettings>,

    /// Base endpoint that requests are sent to.
    ///
    /// The bucket name is appended to this value. Defaults to the value returned by
    /// `default_endpoint`.
    #[configurable(metadata(docs::examples = "http://localhost:9000"))]
    #[configurable(validation(format = "uri"))]
    #[serde(default = "default_endpoint")]
    endpoint: String,

    /// Request handling settings, using `GcsTowerRequestConfigDefaults` for their defaults.
    #[configurable(derived)]
    #[serde(default)]
    request: TowerRequestConfig<GcsTowerRequestConfigDefaults>,

    // GCP authentication options; flattened into the top level of the sink configuration.
    #[serde(flatten)]
    auth: GcpAuthConfig,

    /// Optional TLS settings used when creating the HTTP client.
    #[configurable(derived)]
    tls: Option<TlsConfig>,

    /// Acknowledgement settings.
    ///
    /// May be given either as a boolean or as a struct.
    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,

    /// Timezone used when formatting the timestamp part of the object name.
    ///
    /// Falls back to the global timezone when unset; if neither yields an offset, UTC is used.
    #[configurable(derived)]
    #[serde(default)]
    pub timezone: Option<TimeZone>,
}
196
/// Serde default for `filename_time_format`: the `%s` format specifier.
fn default_time_format() -> String {
    String::from("%s")
}
200
/// Test helper: builds a `GcsSinkConfig` with the given encoding, gzip compression, and every
/// other option left empty or at its default.
#[cfg(test)]
fn default_config(encoding: EncodingConfigWithFraming) -> GcsSinkConfig {
    GcsSinkConfig {
        bucket: String::new(),
        acl: None,
        storage_class: None,
        metadata: None,
        key_prefix: None,
        filename_time_format: default_time_format(),
        filename_append_uuid: true,
        filename_extension: None,
        encoding,
        compression: Compression::gzip_default(),
        batch: Default::default(),
        endpoint: String::new(),
        request: Default::default(),
        auth: Default::default(),
        tls: None,
        acknowledgements: Default::default(),
        timezone: None,
    }
}
223
224impl GenerateConfig for GcsSinkConfig {
225 fn generate_config() -> toml::Value {
226 toml::from_str(indoc! {r#"
227 bucket = "my-bucket"
228 credentials_path = "/path/to/credentials.json"
229 framing.method = "newline_delimited"
230 encoding.codec = "json"
231 "#})
232 .unwrap()
233 }
234}
235
236#[async_trait::async_trait]
237#[typetag::serde(name = "gcp_cloud_storage")]
238impl SinkConfig for GcsSinkConfig {
239 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
240 let auth = self.auth.build(Scope::DevStorageReadWrite).await?;
241 let base_url = format!("{}/{}/", self.endpoint, self.bucket);
242 let tls = TlsSettings::from_options(self.tls.as_ref())?;
243 let client = HttpClient::new(tls, cx.proxy())?;
244 let healthcheck = build_healthcheck(
245 self.bucket.clone(),
246 client.clone(),
247 base_url.clone(),
248 auth.clone(),
249 )?;
250 auth.spawn_regenerate_token();
251 let sink = self.build_sink(client, base_url, auth, cx)?;
252
253 Ok((sink, healthcheck))
254 }
255
256 fn input(&self) -> Input {
257 Input::new(self.encoding.config().1.input_type() & DataType::Log)
258 }
259
260 fn acknowledgements(&self) -> &AcknowledgementsConfig {
261 &self.acknowledgements
262 }
263}
264
265impl GcsSinkConfig {
266 fn build_sink(
267 &self,
268 client: HttpClient,
269 base_url: String,
270 auth: GcpAuthenticator,
271 cx: SinkContext,
272 ) -> crate::Result<VectorSink> {
273 let request = self.request.into_settings();
274
275 let batch_settings = self.batch.into_batcher_settings()?;
276
277 let partitioner = self.key_partitioner()?;
278
279 let protocol = get_http_scheme_from_uri(&base_url.parse::<Uri>().unwrap());
280
281 let svc = ServiceBuilder::new()
282 .settings(request, GcsRetryLogic::default())
283 .service(GcsService::new(client, base_url, auth));
284
285 let request_settings = RequestSettings::new(self, cx)?;
286
287 let sink = GcsSink::new(svc, request_settings, partitioner, batch_settings, protocol);
288
289 Ok(VectorSink::from_event_streamsink(sink))
290 }
291
292 fn key_partitioner(&self) -> crate::Result<KeyPartitioner> {
293 Ok(KeyPartitioner::new(
294 Template::try_from(self.key_prefix.as_deref().unwrap_or("date=%F/"))
295 .context(KeyPrefixTemplateSnafu)?,
296 None,
297 ))
298 }
299}
300
/// Per-sink settings, computed once from `GcsSinkConfig`, that are needed to turn a batch of
/// events into a `GcsRequest`.
#[derive(Clone, Debug)]
struct RequestSettings {
    /// Header value for the configured predefined ACL, if any.
    acl: Option<HeaderValue>,
    /// Content type reported by the encoder.
    content_type: HeaderValue,
    /// Content encoding reported by the configured compression, if any.
    content_encoding: Option<HeaderValue>,
    /// Header value for the configured (or default) storage class.
    storage_class: HeaderValue,
    /// Headers built from the configured `metadata` key/value pairs.
    headers: Vec<(HeaderName, HeaderValue)>,
    /// Extension placed after the final `.` of every object key.
    extension: String,
    /// Format string applied to the current time when naming an object.
    time_format: String,
    /// Whether a UUID is appended to the formatted time in the object name.
    append_uuid: bool,
    /// Transformer and encoder used to serialize events into the payload.
    encoder: (Transformer, Encoder<Framer>),
    /// Compression handed to the request builder.
    compression: Compression,
    /// Offset applied to the current time before formatting; UTC is used when `None`.
    tz_offset: Option<FixedOffset>,
}
318
319impl RequestBuilder<(String, Vec<Event>)> for RequestSettings {
320 type Metadata = (String, EventFinalizers);
321 type Events = Vec<Event>;
322 type Encoder = (Transformer, Encoder<Framer>);
323 type Payload = Bytes;
324 type Request = GcsRequest;
325 type Error = io::Error;
326
327 fn compression(&self) -> Compression {
328 self.compression
329 }
330
331 fn encoder(&self) -> &Self::Encoder {
332 &self.encoder
333 }
334
335 fn split_input(
336 &self,
337 input: (String, Vec<Event>),
338 ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
339 let (partition_key, mut events) = input;
340 let finalizers = events.take_finalizers();
341 let builder = RequestMetadataBuilder::from_events(&events);
342
343 ((partition_key, finalizers), builder, events)
344 }
345
346 fn build_request(
347 &self,
348 gcp_metadata: Self::Metadata,
349 metadata: RequestMetadata,
350 payload: EncodeResult<Self::Payload>,
351 ) -> Self::Request {
352 let (key, finalizers) = gcp_metadata;
353 let filename = {
355 let seconds = match self.tz_offset {
356 Some(offset) => Utc::now().with_timezone(&offset).format(&self.time_format),
357 None => Utc::now()
358 .with_timezone(&chrono::Utc)
359 .format(&self.time_format),
360 };
361
362 if self.append_uuid {
363 let uuid = Uuid::new_v4();
364 format!("{}-{}", seconds, uuid.hyphenated())
365 } else {
366 seconds.to_string()
367 }
368 };
369
370 let key = format!("{}{}.{}", key, filename, self.extension);
371 let body = payload.into_payload();
372
373 GcsRequest {
374 key,
375 body,
376 finalizers,
377 settings: GcsRequestSettings {
378 acl: self.acl.clone(),
379 content_type: self.content_type.clone(),
380 content_encoding: self.content_encoding.clone(),
381 storage_class: self.storage_class.clone(),
382 headers: self.headers.clone(),
383 },
384 metadata,
385 }
386 }
387}
388
389impl RequestSettings {
390 fn new(config: &GcsSinkConfig, cx: SinkContext) -> crate::Result<Self> {
391 let transformer = config.encoding.transformer();
392 let (framer, serializer) = config.encoding.build(SinkType::MessageBased)?;
393 let encoder = Encoder::<Framer>::new(framer, serializer);
394 let acl = config
395 .acl
396 .map(|acl| HeaderValue::from_str(&to_string(acl)).unwrap());
397 let content_type = HeaderValue::from_str(encoder.content_type()).unwrap();
398 let content_encoding = config
399 .compression
400 .content_encoding()
401 .map(|ce| HeaderValue::from_str(&to_string(ce)).unwrap());
402 let storage_class = config.storage_class.unwrap_or_default();
403 let storage_class = HeaderValue::from_str(&to_string(storage_class)).unwrap();
404 let metadata = config
405 .metadata
406 .as_ref()
407 .map(|metadata| {
408 metadata
409 .iter()
410 .map(make_header)
411 .collect::<Result<Vec<_>, _>>()
412 })
413 .unwrap_or_else(|| Ok(vec![]))?;
414 let extension = config
415 .filename_extension
416 .clone()
417 .unwrap_or_else(|| config.compression.extension().into());
418 let time_format = config.filename_time_format.clone();
419 let append_uuid = config.filename_append_uuid;
420 let offset = config
421 .timezone
422 .or(cx.globals.timezone)
423 .and_then(timezone_to_offset);
424
425 Ok(Self {
426 acl,
427 content_type,
428 content_encoding,
429 storage_class,
430 headers: metadata,
431 extension,
432 time_format,
433 append_uuid,
434 compression: config.compression,
435 encoder: (transformer, encoder),
436 tz_offset: offset,
437 })
438 }
439}
440
441fn make_header((name, value): (&String, &String)) -> crate::Result<(HeaderName, HeaderValue)> {
443 Ok((
444 HeaderName::from_bytes(name.as_bytes())?,
445 HeaderValue::from_str(value)?,
446 ))
447}
448
#[cfg(test)]
mod tests {
    use futures_util::{future::ready, stream};
    use vector_lib::{
        EstimatedJsonEncodedSizeOf,
        codecs::{
            JsonSerializerConfig, NewlineDelimitedEncoderConfig, TextSerializerConfig,
            encoding::FramingConfig,
        },
        partition::Partitioner,
        request_metadata::GroupedCountByteSize,
    };

    use super::*;
    use crate::{
        event::LogEvent,
        test_util::{
            components::{SINK_TAGS, run_and_assert_sink_compliance},
            http::{always_200_response, spawn_blackhole_http_server},
        },
    };

    /// The generated example configuration must be accepted by the sink.
    #[test]
    fn generate_config() {
        crate::test_util::test_generate_config::<GcsSinkConfig>();
    }

    /// Runs the sink against a blackhole HTTP server and checks component compliance.
    #[tokio::test]
    async fn component_spec_compliance() {
        let mock_endpoint = spawn_blackhole_http_server(always_200_response).await;

        let cx = SinkContext::default();
        let client = HttpClient::new(TlsSettings::default(), cx.proxy())
            .expect("should not fail to create HTTP client");

        let encoding: EncodingConfigWithFraming =
            (None::<FramingConfig>, JsonSerializerConfig::default()).into();
        let sink = default_config(encoding)
            .build_sink(
                client,
                mock_endpoint.to_string(),
                GcpAuthenticator::None,
                cx,
            )
            .expect("failed to build sink");

        let events = stream::once(ready(Event::Log(LogEvent::from("simple message"))));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
    }

    /// A templated key prefix is rendered from the event's fields.
    #[test]
    fn gcs_encode_event_apply_rules() {
        crate::test_util::trace_init();

        let mut log = LogEvent::from("hello world".to_string());
        log.insert("key", "value");

        let encoding: EncodingConfigWithFraming =
            (None::<FramingConfig>, TextSerializerConfig::default()).into();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key: {{ key }}".into()),
            ..default_config(encoding)
        };

        let partitioner = sink_config.key_partitioner().unwrap();
        let key = partitioner
            .partition(&Event::Log(log))
            .expect("key wasn't provided");

        assert_eq!(key, "key: value");
    }

    /// Builds `RequestSettings` for a test, panicking if the configuration is rejected.
    fn request_settings(sink_config: &GcsSinkConfig, context: SinkContext) -> RequestSettings {
        RequestSettings::new(sink_config, context).expect("Could not create request settings")
    }

    /// Builds a request for a single default log event, using a fixed `key/` prefix and the
    /// literal time format `date` so that the resulting key is predictable.
    fn build_request(extension: Option<&str>, uuid: bool, compression: Compression) -> GcsRequest {
        let encoding: EncodingConfigWithFraming = (
            Some(NewlineDelimitedEncoderConfig::new()),
            JsonSerializerConfig::default(),
        )
            .into();
        let sink_config = GcsSinkConfig {
            key_prefix: Some("key/".into()),
            filename_time_format: "date".into(),
            filename_extension: extension.map(Into::into),
            filename_append_uuid: uuid,
            compression,
            ..default_config(encoding)
        };

        let event: Event = LogEvent::default().into();
        let partition_key = sink_config
            .key_partitioner()
            .unwrap()
            .partition(&event)
            .expect("key wasn't provided");

        let mut byte_size = GroupedCountByteSize::new_untagged();
        byte_size.add_event(&event, event.estimated_json_encoded_size_of());

        let settings = request_settings(&sink_config, SinkContext::default());
        let (gcp_metadata, metadata_builder, _events) =
            settings.split_input((partition_key, vec![event]));
        let payload = EncodeResult::uncompressed(Bytes::new(), byte_size);
        let request_metadata = metadata_builder.build(&payload);

        settings.build_request(gcp_metadata, request_metadata, payload)
    }

    /// Object keys reflect the extension, compression and UUID options.
    #[test]
    fn gcs_build_request() {
        // An explicit extension is used verbatim.
        let explicit = build_request(Some("ext"), false, Compression::None);
        assert_eq!(explicit.key, "key/date.ext".to_string());

        // Without one, the extension comes from the compression.
        let uncompressed = build_request(None, false, Compression::None);
        assert_eq!(uncompressed.key, "key/date.log".to_string());

        let gzipped = build_request(None, false, Compression::gzip_default());
        assert_eq!(gzipped.key, "key/date.log.gz".to_string());

        // Appending a UUID must change the key.
        let with_uuid = build_request(None, true, Compression::gzip_default());
        assert_ne!(with_uuid.key, "key/date.log.gz".to_string());
    }
}