// Source path: vector/sinks/gcp/stackdriver/logs/config.rs

use std::collections::HashMap;

use http::{Request, Uri};
use hyper::Body;
use snafu::Snafu;
use vector_lib::lookup::lookup_v2::ConfigValuePath;
use vrl::value::Kind;

use super::{
    encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder,
    service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink,
};
use crate::{
    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
    http::HttpClient,
    schema,
    sinks::{
        gcs_common::config::healthcheck_response,
        prelude::*,
        util::{
            BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings,
            http::{HttpService, http_response_retry_logic},
            service::TowerRequestConfigDefaults,
        },
    },
};

30#[derive(Debug, Snafu)]
31enum HealthcheckError {
32 #[snafu(display("Resource not found"))]
33 NotFound,
34}
35
/// Zero-sized marker type that carries this sink's overrides of the Tower
/// request defaults (see its `TowerRequestConfigDefaults` implementation).
#[derive(Clone, Copy, Debug)]
pub struct StackdriverTowerRequestConfigDefaults;

39impl TowerRequestConfigDefaults for StackdriverTowerRequestConfigDefaults {
40 const RATE_LIMIT_NUM: u64 = 1_000;
41}
42
43#[configurable_component(sink(
45 "gcp_stackdriver_logs",
46 "Deliver logs to GCP's Cloud Operations suite."
47))]
48#[derive(Clone, Debug, Default)]
49#[serde(deny_unknown_fields)]
50pub(super) struct StackdriverConfig {
51 #[serde(skip, default = "default_endpoint")]
52 pub(super) endpoint: String,
53
54 #[serde(flatten)]
55 pub(super) log_name: StackdriverLogName,
56
57 pub(super) log_id: Template,
61
62 pub(super) resource: StackdriverResource,
64
65 #[serde(flatten)]
66 pub(super) label_config: StackdriverLabelConfig,
67
68 #[configurable(metadata(docs::examples = "severity"))]
82 pub(super) severity_key: Option<ConfigValuePath>,
83
84 #[serde(flatten)]
85 pub(super) auth: GcpAuthConfig,
86
87 #[configurable(derived)]
88 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
89 pub(super) encoding: Transformer,
90
91 #[configurable(derived)]
92 #[serde(default)]
93 pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
94
95 #[configurable(derived)]
96 #[serde(default)]
97 pub(super) request: TowerRequestConfig<StackdriverTowerRequestConfigDefaults>,
98
99 #[configurable(derived)]
100 pub(super) tls: Option<TlsConfig>,
101
102 #[configurable(derived)]
103 #[serde(
104 default,
105 deserialize_with = "crate::serde::bool_or_struct",
106 skip_serializing_if = "crate::serde::is_default"
107 )]
108 acknowledgements: AcknowledgementsConfig,
109}
110
111pub(super) fn default_endpoint() -> String {
112 "https://logging.googleapis.com/v2/entries:write".to_string()
113}
114
/// Upper bound, in bytes, that `build` applies to the batch settings via `limit_max_bytes`.
const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;

118#[configurable_component]
120#[derive(Clone, Debug, Derivative)]
121#[derivative(Default)]
122pub(super) enum StackdriverLogName {
123 #[serde(rename = "billing_account_id")]
127 #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))]
128 BillingAccount(String),
129
130 #[serde(rename = "folder_id")]
138 #[configurable(metadata(docs::examples = "My Folder"))]
139 Folder(String),
140
141 #[serde(rename = "organization_id")]
147 #[configurable(metadata(docs::examples = "622418129737"))]
148 Organization(String),
149
150 #[derivative(Default)]
158 #[serde(rename = "project_id")]
159 #[configurable(metadata(docs::examples = "vector-123456"))]
160 Project(String),
161}
162
163#[configurable_component]
165#[derive(Clone, Debug, Derivative)]
166#[derivative(Default)]
167pub(super) struct StackdriverLabelConfig {
168 #[configurable(metadata(docs::examples = "logging.googleapis.com/labels"))]
171 #[serde(default = "default_labels_key")]
172 pub(super) labels_key: Option<String>,
173
174 #[configurable(metadata(
176 docs::additional_props_description = "A key, value pair that describes a log entry."
177 ))]
178 #[configurable(metadata(docs::examples = "labels_examples()"))]
179 #[serde(default)]
180 pub(super) labels: HashMap<String, Template>,
181}
182
/// Builds the example map referenced by the `labels` field's documentation metadata:
/// one literal value and one templated value.
fn labels_examples() -> HashMap<String, String> {
    HashMap::from([
        ("label_1".to_string(), "value_1".to_string()),
        ("label_2".to_string(), "{{ template_value_2 }}".to_string()),
    ])
}

190pub(super) fn default_labels_key() -> Option<String> {
191 Some("logging.googleapis.com/labels".to_string())
192}
193
194#[configurable_component]
207#[derive(Clone, Debug, Default)]
208pub(super) struct StackdriverResource {
209 #[serde(rename = "type")]
217 pub(super) type_: String,
218
219 #[serde(flatten)]
221 #[configurable(metadata(docs::additional_props_description = "A type-specific label."))]
222 #[configurable(metadata(docs::examples = "label_examples()"))]
223 pub(super) labels: HashMap<String, Template>,
224}
225
/// Builds the example map referenced by the resource `labels` field's documentation
/// metadata: one literal value and one templated value.
fn label_examples() -> HashMap<String, String> {
    HashMap::from([
        ("instanceId".to_string(), "Twilight".to_string()),
        ("zone".to_string(), "{{ zone }}".to_string()),
    ])
}

233impl_generate_config_from_default!(StackdriverConfig);
234
235#[async_trait::async_trait]
236#[typetag::serde(name = "gcp_stackdriver_logs")]
237impl SinkConfig for StackdriverConfig {
238 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
239 let auth = self.auth.build(Scope::LoggingWrite).await?;
240
241 let request_builder = StackdriverLogsRequestBuilder {
242 encoder: StackdriverLogsEncoder::new(
243 self.encoding.clone(),
244 self.log_id.clone(),
245 self.log_name.clone(),
246 self.label_config.clone(),
247 self.resource.clone(),
248 self.severity_key.clone(),
249 ),
250 };
251
252 let batch_settings = self
253 .batch
254 .validate()?
255 .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
256 .into_batcher_settings()?;
257
258 let request_limits = self.request.into_settings();
259
260 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
261 let client = HttpClient::new(tls_settings, cx.proxy())?;
262
263 let uri: Uri = self.endpoint.parse()?;
264
265 let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder {
266 uri: uri.clone(),
267 auth: auth.clone(),
268 };
269
270 let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder);
271
272 let service = ServiceBuilder::new()
273 .settings(request_limits, http_response_retry_logic())
274 .service(service);
275
276 let sink = StackdriverLogsSink::new(service, batch_settings, request_builder);
277
278 let healthcheck = healthcheck(client, auth.clone(), uri).boxed();
279
280 auth.spawn_regenerate_token();
281
282 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
283 }
284
285 fn input(&self) -> Input {
286 let requirement =
287 schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
288
289 Input::log().with_schema_requirement(requirement)
290 }
291
292 fn acknowledgements(&self) -> &AcknowledgementsConfig {
293 &self.acknowledgements
294 }
295}
296
297async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> {
298 let entries: Vec<BoxedRawValue> = Vec::new();
299 let events = serde_json::json!({ "entries": entries });
300
301 let body = crate::serde::json::to_bytes(&events).unwrap().freeze();
302
303 let mut request = Request::post(uri)
304 .header("Content-Type", "application/json")
305 .body(body)
306 .unwrap();
307
308 auth.apply(&mut request);
309
310 let request = request.map(Body::from);
311
312 let response = client.send(request).await?;
313
314 healthcheck_response(response, HealthcheckError::NotFound.into())
315}