vector/sinks/gcp/stackdriver/logs/
config.rs1use crate::{
4 gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
5 http::HttpClient,
6 schema,
7 sinks::{
8 gcs_common::config::healthcheck_response,
9 prelude::*,
10 util::{
11 http::{http_response_retry_logic, HttpService},
12 service::TowerRequestConfigDefaults,
13 BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings,
14 },
15 },
16};
17use http::{Request, Uri};
18use hyper::Body;
19use snafu::Snafu;
20use std::collections::HashMap;
21use vector_lib::lookup::lookup_v2::ConfigValuePath;
22use vrl::value::Kind;
23
24use super::{
25 encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder,
26 service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink,
27};
28
29#[derive(Debug, Snafu)]
30enum HealthcheckError {
31 #[snafu(display("Resource not found"))]
32 NotFound,
33}
34
35#[derive(Clone, Copy, Debug)]
36pub struct StackdriverTowerRequestConfigDefaults;
37
38impl TowerRequestConfigDefaults for StackdriverTowerRequestConfigDefaults {
39 const RATE_LIMIT_NUM: u64 = 1_000;
40}
41
42#[configurable_component(sink(
44 "gcp_stackdriver_logs",
45 "Deliver logs to GCP's Cloud Operations suite."
46))]
47#[derive(Clone, Debug, Default)]
48#[serde(deny_unknown_fields)]
49pub(super) struct StackdriverConfig {
50 #[serde(skip, default = "default_endpoint")]
51 pub(super) endpoint: String,
52
53 #[serde(flatten)]
54 pub(super) log_name: StackdriverLogName,
55
56 pub(super) log_id: Template,
60
61 pub(super) resource: StackdriverResource,
63
64 #[serde(flatten)]
65 pub(super) label_config: StackdriverLabelConfig,
66
67 #[configurable(metadata(docs::examples = "severity"))]
81 pub(super) severity_key: Option<ConfigValuePath>,
82
83 #[serde(flatten)]
84 pub(super) auth: GcpAuthConfig,
85
86 #[configurable(derived)]
87 #[serde(default, skip_serializing_if = "crate::serde::is_default")]
88 pub(super) encoding: Transformer,
89
90 #[configurable(derived)]
91 #[serde(default)]
92 pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
93
94 #[configurable(derived)]
95 #[serde(default)]
96 pub(super) request: TowerRequestConfig<StackdriverTowerRequestConfigDefaults>,
97
98 #[configurable(derived)]
99 pub(super) tls: Option<TlsConfig>,
100
101 #[configurable(derived)]
102 #[serde(
103 default,
104 deserialize_with = "crate::serde::bool_or_struct",
105 skip_serializing_if = "crate::serde::is_default"
106 )]
107 acknowledgements: AcknowledgementsConfig,
108}
109
110pub(super) fn default_endpoint() -> String {
111 "https://logging.googleapis.com/v2/entries:write".to_string()
112}
113
114const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
116
117#[configurable_component]
119#[derive(Clone, Debug, Derivative)]
120#[derivative(Default)]
121pub(super) enum StackdriverLogName {
122 #[serde(rename = "billing_account_id")]
126 #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))]
127 BillingAccount(String),
128
129 #[serde(rename = "folder_id")]
137 #[configurable(metadata(docs::examples = "My Folder"))]
138 Folder(String),
139
140 #[serde(rename = "organization_id")]
146 #[configurable(metadata(docs::examples = "622418129737"))]
147 Organization(String),
148
149 #[derivative(Default)]
157 #[serde(rename = "project_id")]
158 #[configurable(metadata(docs::examples = "vector-123456"))]
159 Project(String),
160}
161
162#[configurable_component]
164#[derive(Clone, Debug, Derivative)]
165#[derivative(Default)]
166pub(super) struct StackdriverLabelConfig {
167 #[configurable(metadata(docs::examples = "logging.googleapis.com/labels"))]
170 #[serde(default = "default_labels_key")]
171 pub(super) labels_key: Option<String>,
172
173 #[configurable(metadata(
175 docs::additional_props_description = "A key, value pair that describes a log entry."
176 ))]
177 #[configurable(metadata(docs::examples = "labels_examples()"))]
178 #[serde(default)]
179 pub(super) labels: HashMap<String, Template>,
180}
181
182fn labels_examples() -> HashMap<String, String> {
183 let mut example = HashMap::new();
184 example.insert("label_1".to_string(), "value_1".to_string());
185 example.insert("label_2".to_string(), "{{ template_value_2 }}".to_string());
186 example
187}
188
189pub(super) fn default_labels_key() -> Option<String> {
190 Some("logging.googleapis.com/labels".to_string())
191}
192
193#[configurable_component]
206#[derive(Clone, Debug, Default)]
207pub(super) struct StackdriverResource {
208 #[serde(rename = "type")]
216 pub(super) type_: String,
217
218 #[serde(flatten)]
220 #[configurable(metadata(docs::additional_props_description = "A type-specific label."))]
221 #[configurable(metadata(docs::examples = "label_examples()"))]
222 pub(super) labels: HashMap<String, Template>,
223}
224
225fn label_examples() -> HashMap<String, String> {
226 let mut example = HashMap::new();
227 example.insert("instanceId".to_string(), "Twilight".to_string());
228 example.insert("zone".to_string(), "{{ zone }}".to_string());
229 example
230}
231
232impl_generate_config_from_default!(StackdriverConfig);
233
234#[async_trait::async_trait]
235#[typetag::serde(name = "gcp_stackdriver_logs")]
236impl SinkConfig for StackdriverConfig {
237 async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
238 let auth = self.auth.build(Scope::LoggingWrite).await?;
239
240 let request_builder = StackdriverLogsRequestBuilder {
241 encoder: StackdriverLogsEncoder::new(
242 self.encoding.clone(),
243 self.log_id.clone(),
244 self.log_name.clone(),
245 self.label_config.clone(),
246 self.resource.clone(),
247 self.severity_key.clone(),
248 ),
249 };
250
251 let batch_settings = self
252 .batch
253 .validate()?
254 .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
255 .into_batcher_settings()?;
256
257 let request_limits = self.request.into_settings();
258
259 let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
260 let client = HttpClient::new(tls_settings, cx.proxy())?;
261
262 let uri: Uri = self.endpoint.parse()?;
263
264 let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder {
265 uri: uri.clone(),
266 auth: auth.clone(),
267 };
268
269 let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder);
270
271 let service = ServiceBuilder::new()
272 .settings(request_limits, http_response_retry_logic())
273 .service(service);
274
275 let sink = StackdriverLogsSink::new(service, batch_settings, request_builder);
276
277 let healthcheck = healthcheck(client, auth.clone(), uri).boxed();
278
279 auth.spawn_regenerate_token();
280
281 Ok((VectorSink::from_event_streamsink(sink), healthcheck))
282 }
283
284 fn input(&self) -> Input {
285 let requirement =
286 schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
287
288 Input::log().with_schema_requirement(requirement)
289 }
290
291 fn acknowledgements(&self) -> &AcknowledgementsConfig {
292 &self.acknowledgements
293 }
294}
295
296async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> {
297 let entries: Vec<BoxedRawValue> = Vec::new();
298 let events = serde_json::json!({ "entries": entries });
299
300 let body = crate::serde::json::to_bytes(&events).unwrap().freeze();
301
302 let mut request = Request::post(uri)
303 .header("Content-Type", "application/json")
304 .body(body)
305 .unwrap();
306
307 auth.apply(&mut request);
308
309 let request = request.map(Body::from);
310
311 let response = client.send(request).await?;
312
313 healthcheck_response(response, HealthcheckError::NotFound.into())
314}