vector/sinks/gcp/stackdriver/logs/config.rs

1//! Configuration for the `gcp_stackdriver_logs` sink.
2
3use std::collections::HashMap;
4
5use http::{Request, Uri};
6use hyper::Body;
7use snafu::Snafu;
8use vector_lib::lookup::lookup_v2::ConfigValuePath;
9use vrl::value::Kind;
10
11use super::{
12    encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder,
13    service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink,
14};
15use crate::{
16    gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
17    http::HttpClient,
18    schema,
19    sinks::{
20        gcs_common::config::healthcheck_response,
21        prelude::*,
22        util::{
23            BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings,
24            http::{HttpService, http_response_retry_logic},
25            service::TowerRequestConfigDefaults,
26        },
27    },
28};
29
30#[derive(Debug, Snafu)]
31enum HealthcheckError {
32    #[snafu(display("Resource not found"))]
33    NotFound,
34}
35
36#[derive(Clone, Copy, Debug)]
37pub struct StackdriverTowerRequestConfigDefaults;
38
39impl TowerRequestConfigDefaults for StackdriverTowerRequestConfigDefaults {
40    const RATE_LIMIT_NUM: u64 = 1_000;
41}
42
43/// Configuration for the `gcp_stackdriver_logs` sink.
44#[configurable_component(sink(
45    "gcp_stackdriver_logs",
46    "Deliver logs to GCP's Cloud Operations suite."
47))]
48#[derive(Clone, Debug, Default)]
49#[serde(deny_unknown_fields)]
50pub(super) struct StackdriverConfig {
51    #[serde(skip, default = "default_endpoint")]
52    pub(super) endpoint: String,
53
54    #[serde(flatten)]
55    pub(super) log_name: StackdriverLogName,
56
57    /// The log ID to which to publish logs.
58    ///
59    /// This is a name you create to identify this log stream.
60    pub(super) log_id: Template,
61
62    /// The monitored resource to associate the logs with.
63    pub(super) resource: StackdriverResource,
64
65    #[serde(flatten)]
66    pub(super) label_config: StackdriverLabelConfig,
67
68    /// The field of the log event from which to take the outgoing log’s `severity` field.
69    ///
70    /// The named field is removed from the log event if present, and must be either an integer
71    /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case
72    /// is ignored) or a common prefix such as `err`.
73    ///
74    /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`).
75    ///
76    /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on
77    /// the value of the `severity` field.
78    ///
79    /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
80    /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
81    #[configurable(metadata(docs::examples = "severity"))]
82    pub(super) severity_key: Option<ConfigValuePath>,
83
84    #[serde(flatten)]
85    pub(super) auth: GcpAuthConfig,
86
87    #[configurable(derived)]
88    #[serde(default, skip_serializing_if = "crate::serde::is_default")]
89    pub(super) encoding: Transformer,
90
91    #[configurable(derived)]
92    #[serde(default)]
93    pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
94
95    #[configurable(derived)]
96    #[serde(default)]
97    pub(super) request: TowerRequestConfig<StackdriverTowerRequestConfigDefaults>,
98
99    #[configurable(derived)]
100    pub(super) tls: Option<TlsConfig>,
101
102    #[configurable(derived)]
103    #[serde(
104        default,
105        deserialize_with = "crate::serde::bool_or_struct",
106        skip_serializing_if = "crate::serde::is_default"
107    )]
108    acknowledgements: AcknowledgementsConfig,
109}
110
111pub(super) fn default_endpoint() -> String {
112    "https://logging.googleapis.com/v2/entries:write".to_string()
113}
114
/// Upper bound, in bytes, applied to each batch via `limit_max_bytes`.
///
/// Mirrors the 10MB request limit for `entries.write`:
/// https://cloud.google.com/logging/quotas#api-limits
const MAX_BATCH_PAYLOAD_SIZE: usize = 10 * 1_000_000;
117
118/// Logging locations.
119#[configurable_component]
120#[derive(Clone, Debug, Derivative)]
121#[derivative(Default)]
122pub(super) enum StackdriverLogName {
123    /// The billing account ID to which to publish logs.
124    ///
125    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
126    #[serde(rename = "billing_account_id")]
127    #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))]
128    BillingAccount(String),
129
130    /// The folder ID to which to publish logs.
131    ///
132    /// See the [Google Cloud Platform folder documentation][folder_docs] for more details.
133    ///
134    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
135    ///
136    /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders
137    #[serde(rename = "folder_id")]
138    #[configurable(metadata(docs::examples = "My Folder"))]
139    Folder(String),
140
141    /// The organization ID to which to publish logs.
142    ///
143    /// This would be the identifier assigned to your organization on Google Cloud Platform.
144    ///
145    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
146    #[serde(rename = "organization_id")]
147    #[configurable(metadata(docs::examples = "622418129737"))]
148    Organization(String),
149
150    /// The project ID to which to publish logs.
151    ///
152    /// See the [Google Cloud Platform project management documentation][project_docs] for more details.
153    ///
154    ///	Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
155    ///
156    /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
157    #[derivative(Default)]
158    #[serde(rename = "project_id")]
159    #[configurable(metadata(docs::examples = "vector-123456"))]
160    Project(String),
161}
162
163/// Label Configuration.
164#[configurable_component]
165#[derive(Clone, Debug, Default)]
166pub(super) struct StackdriverLabelConfig {
167    /// The value of this field is used to retrieve the associated labels from the `jsonPayload`
168    /// and extract their values to set as LogEntry labels.
169    #[configurable(metadata(docs::examples = "logging.googleapis.com/labels"))]
170    #[serde(default = "default_labels_key")]
171    pub(super) labels_key: Option<String>,
172
173    /// A map of key, value pairs that provides additional information about the log entry.
174    #[configurable(metadata(
175        docs::additional_props_description = "A key, value pair that describes a log entry."
176    ))]
177    #[configurable(metadata(docs::examples = "labels_examples()"))]
178    #[serde(default)]
179    pub(super) labels: HashMap<String, Template>,
180}
181
/// Example `labels` map referenced by `docs::examples` on `StackdriverLabelConfig::labels`:
/// one literal value and one templated value.
fn labels_examples() -> HashMap<String, String> {
    HashMap::from([
        ("label_1".to_owned(), "value_1".to_owned()),
        ("label_2".to_owned(), "{{ template_value_2 }}".to_owned()),
    ])
}
188
189pub(super) fn default_labels_key() -> Option<String> {
190    Some("logging.googleapis.com/labels".to_string())
191}
192
/// A monitored resource.
///
/// Monitored resources in GCP allow associating logs and metrics specifically with native resources
/// within Google Cloud Platform. This takes the form of a "type" field which identifies the
/// resource, and a set of type-specific labels to uniquely identify a resource of that type.
///
/// See [Monitored resource types][mon_docs] for more information.
///
/// [mon_docs]: https://cloud.google.com/monitoring/api/resources
// TODO: this type is specific to the stackdrivers log sink because it allows for template-able
// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both
// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication
#[configurable_component]
#[derive(Clone, Debug, Default)]
pub(super) struct StackdriverResource {
    /// The monitored resource type.
    ///
    /// For example, the type of a Compute Engine VM instance is `gce_instance`.
    /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for
    /// more details.
    ///
    /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources
    // `type` is a Rust keyword, so the field is `type_` in code and `type` in configuration.
    #[serde(rename = "type")]
    pub(super) type_: String,

    /// Type-specific labels.
    // Flattened: every key of the `resource` table other than `type` lands in this map. Values
    // are `Template`s, so they may be literal (`Twilight`) or templated (`{{ zone }}`), as in
    // `label_examples`.
    #[serde(flatten)]
    #[configurable(metadata(docs::additional_props_description = "A type-specific label."))]
    #[configurable(metadata(docs::examples = "label_examples()"))]
    pub(super) labels: HashMap<String, Template>,
}
224
/// Example resource labels referenced by `docs::examples` on `StackdriverResource::labels`:
/// a literal `instanceId` and a templated `zone`.
fn label_examples() -> HashMap<String, String> {
    HashMap::from([
        ("instanceId".to_owned(), "Twilight".to_owned()),
        ("zone".to_owned(), "{{ zone }}".to_owned()),
    ])
}
231
// NOTE(review): presumably expands to a `GenerateConfig` impl built from
// `StackdriverConfig::default()` — confirm against the macro definition.
impl_generate_config_from_default!(StackdriverConfig);
233
234#[async_trait::async_trait]
235#[typetag::serde(name = "gcp_stackdriver_logs")]
236impl SinkConfig for StackdriverConfig {
237    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
238        let auth = self.auth.build(Scope::LoggingWrite).await?;
239
240        let request_builder = StackdriverLogsRequestBuilder {
241            encoder: StackdriverLogsEncoder::new(
242                self.encoding.clone(),
243                self.log_id.clone(),
244                self.log_name.clone(),
245                self.label_config.clone(),
246                self.resource.clone(),
247                self.severity_key.clone(),
248            ),
249        };
250
251        let batch_settings = self
252            .batch
253            .validate()?
254            .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
255            .into_batcher_settings()?;
256
257        let request_limits = self.request.into_settings();
258
259        let tls_settings = TlsSettings::from_options(self.tls.as_ref())?;
260        let client = HttpClient::new(tls_settings, cx.proxy())?;
261
262        let uri: Uri = self.endpoint.parse()?;
263
264        let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder {
265            uri: uri.clone(),
266            auth: auth.clone(),
267        };
268
269        let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder);
270
271        let service = ServiceBuilder::new()
272            .settings(request_limits, http_response_retry_logic())
273            .service(service);
274
275        let sink = StackdriverLogsSink::new(service, batch_settings, request_builder);
276
277        let healthcheck = healthcheck(client, auth.clone(), uri).boxed();
278
279        auth.spawn_regenerate_token();
280
281        Ok((VectorSink::from_event_streamsink(sink), healthcheck))
282    }
283
284    fn input(&self) -> Input {
285        let requirement =
286            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
287
288        Input::log().with_schema_requirement(requirement)
289    }
290
291    fn acknowledgements(&self) -> &AcknowledgementsConfig {
292        &self.acknowledgements
293    }
294}
295
296async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> {
297    let entries: Vec<BoxedRawValue> = Vec::new();
298    let events = serde_json::json!({ "entries": entries });
299
300    let body = crate::serde::json::to_bytes(&events).unwrap().freeze();
301
302    let mut request = Request::post(uri)
303        .header("Content-Type", "application/json")
304        .body(body)
305        .unwrap();
306
307    auth.apply(&mut request);
308
309    let request = request.map(Body::from);
310
311    let response = client.send(request).await?;
312
313    healthcheck_response(response, HealthcheckError::NotFound.into())
314}