use std::{collections::HashMap, io};

use bytes::{Bytes, BytesMut};
use futures_util::{future::BoxFuture, task::Poll};
use goauth::scopes::Scope;
use http::{
    Request, StatusCode, Uri,
    header::{self, HeaderName, HeaderValue},
};
use hyper::Body;
use indoc::indoc;
use serde::Serialize;
use serde_json::json;
use snafu::Snafu;
use tokio_util::codec::Encoder as _;
use tower::{Service, ServiceBuilder};
use vector_lib::{
    EstimatedJsonEncodedSizeOf,
    config::{AcknowledgementsConfig, Input, telemetry},
    configurable::configurable_component,
    event::{Event, EventFinalizers, Finalizable},
    request_metadata::{GroupedCountByteSize, MetaDescriptive, RequestMetadata},
    sink::VectorSink,
};
use vrl::value::Kind;

use crate::{
    codecs::{self, EncodingConfig},
    config::{GenerateConfig, SinkConfig, SinkContext},
    gcp::{GcpAuthConfig, GcpAuthenticator},
    http::HttpClient,
    schema,
    sinks::{
        Healthcheck,
        gcp_chronicle::{
            compression::ChronicleCompression,
            partitioner::{ChroniclePartitionKey, ChroniclePartitioner},
            sink::ChronicleSink,
        },
        gcs_common::{
            config::{GcsRetryLogic, healthcheck_response},
            service::GcsResponse,
        },
        util::{
            BatchConfig, Compression, RequestBuilder, SinkBatchSettings, TowerRequestConfig,
            encoding::{Encoder, as_tracked_write},
            metadata::RequestMetadataBuilder,
            request_builder::EncodeResult,
            service::TowerRequestConfigDefaults,
        },
    },
    template::{Template, TemplateParseError},
    tls::{TlsConfig, TlsSettings},
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum GcsHealthcheckError {
    #[snafu(display("log_type template parse error: {}", source))]
    LogTypeTemplate { source: TemplateParseError },

    #[snafu(display("Endpoint not found"))]
    NotFound,
}

/// GCP region.
#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "snake_case")]
pub enum Region {
    /// EU multi-region.
    Eu,

    /// US multi-region.
    Us,

    /// Asia (asia-southeast1).
    Asia,

    /// São Paulo (southamerica-east1).
    SãoPaulo,

    /// Canada (northamerica-northeast2).
    Canada,

    /// Dammam (me-central2).
    Dammam,

    /// Doha (me-central1).
    Doha,

    /// Frankfurt (europe-west3).
    Frankfurt,

    /// London (europe-west2).
    London,

    /// Mumbai (asia-south1).
    Mumbai,

    /// Paris (europe-west9).
    Paris,

    /// Singapore (asia-southeast1).
    Singapore,

    /// Sydney (australia-southeast1).
    Sydney,

    /// Tel Aviv (me-west1).
    TelAviv,

    /// Tokyo (asia-northeast1).
    Tokyo,

    /// Turin (europe-west12).
    Turin,

    /// Zurich (europe-west6).
    Zurich,
}

impl Region {
    const fn endpoint(self) -> &'static str {
        match self {
            Region::Eu => "https://europe-malachiteingestion-pa.googleapis.com",
            Region::Us => "https://malachiteingestion-pa.googleapis.com",
            Region::Asia => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
            Region::SãoPaulo => "https://southamerica-east1-malachiteingestion-pa.googleapis.com",
            Region::Canada => {
                "https://northamerica-northeast2-malachiteingestion-pa.googleapis.com"
            }
            Region::Dammam => "https://me-central2-malachiteingestion-pa.googleapis.com",
            Region::Doha => "https://me-central1-malachiteingestion-pa.googleapis.com",
            Region::Frankfurt => "https://europe-west3-malachiteingestion-pa.googleapis.com",
            Region::London => "https://europe-west2-malachiteingestion-pa.googleapis.com",
            Region::Mumbai => "https://asia-south1-malachiteingestion-pa.googleapis.com",
            Region::Paris => "https://europe-west9-malachiteingestion-pa.googleapis.com",
            Region::Singapore => "https://asia-southeast1-malachiteingestion-pa.googleapis.com",
            Region::Sydney => "https://australia-southeast1-malachiteingestion-pa.googleapis.com",
            Region::TelAviv => "https://me-west1-malachiteingestion-pa.googleapis.com",
            Region::Tokyo => "https://asia-northeast1-malachiteingestion-pa.googleapis.com",
            Region::Turin => "https://europe-west12-malachiteingestion-pa.googleapis.com",
            Region::Zurich => "https://europe-west6-malachiteingestion-pa.googleapis.com",
        }
    }
}
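
// Illustrative sketch (a test added here for clarity, not part of the original sink code):
// each `Region` resolves to a fixed regional Malachite ingestion base URL via `endpoint()`.
#[cfg(test)]
mod region_endpoint_sketch {
    use super::Region;

    #[test]
    fn region_maps_to_regional_ingestion_base_url() {
        assert_eq!(
            Region::Frankfurt.endpoint(),
            "https://europe-west3-malachiteingestion-pa.googleapis.com"
        );
        assert_eq!(
            Region::Us.endpoint(),
            "https://malachiteingestion-pa.googleapis.com"
        );
    }
}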

#[derive(Clone, Copy, Debug, Default)]
pub struct ChronicleUnstructuredDefaultBatchSettings;

impl SinkBatchSettings for ChronicleUnstructuredDefaultBatchSettings {
    // The Chronicle Ingestion API has a 1 MB request size limit for unstructured log
    // entries, hence the conservative byte cap and short timeout below.
    const MAX_EVENTS: Option<usize> = None;
    const MAX_BYTES: Option<usize> = Some(1_000_000);
    const TIMEOUT_SECS: f64 = 15.0;
}

#[derive(Clone, Copy, Debug)]
pub struct ChronicleUnstructuredTowerRequestConfigDefaults;

impl TowerRequestConfigDefaults for ChronicleUnstructuredTowerRequestConfigDefaults {
    const RATE_LIMIT_NUM: u64 = 1_000;
}

/// Configuration for the `gcp_chronicle_unstructured` sink.
#[configurable_component(sink(
    "gcp_chronicle_unstructured",
    "Store unstructured log events in Google Chronicle."
))]
#[derive(Clone, Debug)]
pub struct ChronicleUnstructuredConfig {
    /// The endpoint to send data to.
    #[configurable(metadata(
        docs::examples = "127.0.0.1:8080",
        docs::examples = "example.com:12345"
    ))]
    pub endpoint: Option<String>,

    /// The GCP region to use.
    #[configurable(derived)]
    pub region: Option<Region>,

    /// The unique identifier (UUID) corresponding to the Chronicle instance.
    #[configurable(validation(format = "uuid"))]
    #[configurable(metadata(docs::examples = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"))]
    pub customer_id: String,

    /// User-configured environment namespace to identify the data domain the logs originated from.
    #[configurable(metadata(docs::templateable))]
    #[configurable(metadata(
        docs::examples = "production",
        docs::examples = "production-{{ namespace }}",
    ))]
    #[configurable(metadata(docs::advanced))]
    pub namespace: Option<Template>,

    /// A set of labels that are attached to each batch of events.
    #[configurable(metadata(docs::examples = "chronicle_labels_examples()"))]
    #[configurable(metadata(docs::additional_props_description = "A Chronicle label."))]
    pub labels: Option<HashMap<String, String>>,

    #[serde(flatten)]
    pub auth: GcpAuthConfig,

    #[configurable(derived)]
    #[serde(default)]
    pub batch: BatchConfig<ChronicleUnstructuredDefaultBatchSettings>,

    #[configurable(derived)]
    pub encoding: EncodingConfig,

    #[serde(default)]
    #[configurable(derived)]
    pub compression: ChronicleCompression,

    #[configurable(derived)]
    #[serde(default)]
    pub request: TowerRequestConfig<ChronicleUnstructuredTowerRequestConfigDefaults>,

    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// The type of log entries in a request.
    ///
    /// This must be one of the log types supported by Chronicle, otherwise the entries are
    /// rejected with an error.
    #[configurable(metadata(docs::examples = "WINDOWS_DNS", docs::examples = "{{ log_type }}"))]
    pub log_type: Template,

    /// The log type to use if the `log_type` template cannot be resolved for an event.
    #[configurable(metadata(docs::examples = "VECTOR_DEV"))]
    pub fallback_log_type: Option<String>,

    #[configurable(derived)]
    #[serde(
        default,
        deserialize_with = "crate::serde::bool_or_struct",
        skip_serializing_if = "crate::serde::is_default"
    )]
    acknowledgements: AcknowledgementsConfig,
}

fn chronicle_labels_examples() -> HashMap<String, String> {
    let mut examples = HashMap::new();
    examples.insert("source".to_string(), "vector".to_string());
    examples.insert("tenant".to_string(), "marketing".to_string());
    examples
}

impl GenerateConfig for ChronicleUnstructuredConfig {
    fn generate_config() -> toml::Value {
        toml::from_str(indoc! {r#"
            credentials_path = "/path/to/credentials.json"
            customer_id = "customer_id"
            namespace = "namespace"
            compression = "gzip"
            log_type = "log_type"
            fallback_log_type = "VECTOR_DEV"
            encoding.codec = "text"
        "#})
        .unwrap()
    }
}
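
// Illustrative sketch of a fuller configuration that selects a region instead of a custom
// endpoint and attaches labels (field names come from the struct above; values are examples):
//
//     region = "eu"
//     customer_id = "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c"
//     credentials_path = "/path/to/credentials.json"
//     log_type = "WINDOWS_DNS"
//     fallback_log_type = "VECTOR_DEV"
//     compression = "gzip"
//     encoding.codec = "text"
//
//     [labels]
//     source = "vector"
//     tenant = "marketing"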

pub fn build_healthcheck(
    client: HttpClient,
    base_url: &str,
    auth: GcpAuthenticator,
) -> crate::Result<Healthcheck> {
    let uri = base_url.parse::<Uri>()?;

    let healthcheck = async move {
        let mut request = http::Request::get(&uri).body(Body::empty())?;
        auth.apply(&mut request);

        let response = client.send(request).await?;
        healthcheck_response(response, GcsHealthcheckError::NotFound.into())
    };

    Ok(Box::pin(healthcheck))
}

#[derive(Debug, Snafu)]
pub enum ChronicleError {
    #[snafu(display("Region or endpoint not defined"))]
    RegionOrEndpoint,
    #[snafu(display("You can only specify one of region or endpoint"))]
    BothRegionAndEndpoint,
}

#[async_trait::async_trait]
#[typetag::serde(name = "gcp_chronicle_unstructured")]
impl SinkConfig for ChronicleUnstructuredConfig {
    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
        let creds = self.auth.build(Scope::MalachiteIngestion).await?;

        let tls = TlsSettings::from_options(self.tls.as_ref())?;
        let client = HttpClient::new(tls, cx.proxy())?;

        let endpoint = self.create_endpoint("v2/unstructuredlogentries:batchCreate")?;

        // For the healthcheck, see whether the list of available log types can be fetched.
        let healthcheck_endpoint = self.create_endpoint("v2/logtypes")?;

        let healthcheck = build_healthcheck(client.clone(), &healthcheck_endpoint, creds.clone())?;
        creds.spawn_regenerate_token();
        let sink = self.build_sink(client, endpoint, creds)?;

        Ok((sink, healthcheck))
    }

    fn input(&self) -> Input {
        let requirement =
            schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());

        Input::log().with_schema_requirement(requirement)
    }

    fn acknowledgements(&self) -> &AcknowledgementsConfig {
        &self.acknowledgements
    }
}

impl ChronicleUnstructuredConfig {
    fn build_sink(
        &self,
        client: HttpClient,
        base_url: String,
        creds: GcpAuthenticator,
    ) -> crate::Result<VectorSink> {
        use crate::sinks::util::service::ServiceBuilderExt;

        let request = self.request.into_settings();

        let batch_settings = self.batch.into_batcher_settings()?;

        let partitioner = self.partitioner()?;

        let svc = ServiceBuilder::new()
            .settings(request, GcsRetryLogic::default())
            .service(ChronicleService::new(client, base_url, creds));

        let request_settings = ChronicleRequestBuilder::new(self)?;

        let sink = ChronicleSink::new(svc, request_settings, partitioner, batch_settings, "http");

        Ok(VectorSink::from_event_streamsink(sink))
    }

    fn partitioner(&self) -> crate::Result<ChroniclePartitioner> {
        Ok(ChroniclePartitioner::new(
            self.log_type.clone(),
            self.fallback_log_type.clone(),
            self.namespace.clone(),
        ))
    }

    fn create_endpoint(&self, path: &str) -> Result<String, ChronicleError> {
        Ok(format!(
            "{}/{}",
            match (&self.endpoint, self.region) {
                (Some(endpoint), None) => endpoint.trim_end_matches('/'),
                (None, Some(region)) => region.endpoint(),
                (Some(_), Some(_)) => return Err(ChronicleError::BothRegionAndEndpoint),
                (None, None) => return Err(ChronicleError::RegionOrEndpoint),
            },
            path
        ))
    }
}
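
// Illustrative sketch (a test added here for clarity, not part of the original sink code):
// `create_endpoint` requires exactly one of `endpoint` or `region` and appends the API path
// to the resolved base URL. It starts from the generated example config above.
#[cfg(test)]
mod endpoint_resolution_sketch {
    use super::*;

    #[test]
    fn region_base_url_is_joined_with_the_api_path() {
        let mut config: ChronicleUnstructuredConfig =
            ChronicleUnstructuredConfig::generate_config()
                .try_into()
                .expect("example config should deserialize");
        config.region = Some(Region::Us);

        assert_eq!(
            config.create_endpoint("v2/logtypes").unwrap(),
            "https://malachiteingestion-pa.googleapis.com/v2/logtypes"
        );
    }
}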

#[derive(Clone, Debug)]
pub struct ChronicleRequest {
    pub body: Bytes,
    pub finalizers: EventFinalizers,
    pub headers: HashMap<HeaderName, HeaderValue>,
    metadata: RequestMetadata,
}

impl Finalizable for ChronicleRequest {
    fn take_finalizers(&mut self) -> EventFinalizers {
        std::mem::take(&mut self.finalizers)
    }
}

impl MetaDescriptive for ChronicleRequest {
    fn get_metadata(&self) -> &RequestMetadata {
        &self.metadata
    }

    fn metadata_mut(&mut self) -> &mut RequestMetadata {
        &mut self.metadata
    }
}

#[derive(Clone, Debug, Serialize)]
struct ChronicleRequestBody {
    customer_id: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    namespace: Option<String>,
    #[serde(skip_serializing_if = "Option::is_none")]
    labels: Option<Vec<Label>>,
    log_type: String,
    entries: Vec<serde_json::Value>,
}

#[derive(Clone, Debug)]
struct ChronicleEncoder {
    customer_id: String,
    labels: Option<Vec<Label>>,
    encoder: codecs::Encoder<()>,
    transformer: codecs::Transformer,
}

impl Encoder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleEncoder {
    fn encode_input(
        &self,
        input: (ChroniclePartitionKey, Vec<Event>),
        writer: &mut dyn io::Write,
    ) -> io::Result<(usize, GroupedCountByteSize)> {
        let (key, events) = input;
        let mut encoder = self.encoder.clone();
        let mut byte_size = telemetry().create_request_count_byte_size();
        let events = events
            .into_iter()
            .filter_map(|mut event| {
                let timestamp = event
                    .as_log()
                    .get_timestamp()
                    .and_then(|ts| ts.as_timestamp())
                    .cloned();
                let mut bytes = BytesMut::new();
                self.transformer.transform(&mut event);

                byte_size.add_event(&event, event.estimated_json_encoded_size_of());

                encoder.encode(event, &mut bytes).ok()?;

                let mut value = json!({
                    "log_text": String::from_utf8_lossy(&bytes),
                });

                if let Some(ts) = timestamp {
                    value.as_object_mut().unwrap().insert(
                        "ts_rfc3339".to_string(),
                        ts.to_rfc3339_opts(chrono::SecondsFormat::AutoSi, true)
                            .into(),
                    );
                }

                Some(value)
            })
            .collect::<Vec<_>>();

        let json = json!(ChronicleRequestBody {
            customer_id: self.customer_id.clone(),
            namespace: key.namespace,
            labels: self.labels.clone(),
            log_type: key.log_type,
            entries: events,
        });

        let size = as_tracked_write::<_, _, io::Error>(writer, &json, |writer, json| {
            serde_json::to_writer(writer, json)?;
            Ok(())
        })?;

        Ok((size, byte_size))
    }
}
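
// Illustrative sketch of the request body the encoder above writes for a single event
// (shape only; values are examples). `namespace` and `labels` are omitted when unset:
//
//     {
//       "customer_id": "c8c65bfa-5f2c-42d4-9189-64bb7b939f2c",
//       "log_type": "WINDOWS_DNS",
//       "entries": [
//         { "log_text": "<encoded event>", "ts_rfc3339": "2024-01-01T00:00:00Z" }
//       ]
//     }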

// Settings required to produce a request that do not change between requests.
#[derive(Clone, Debug)]
struct ChronicleRequestBuilder {
    encoder: ChronicleEncoder,
    compression: Compression,
}

struct ChronicleRequestPayload {
    bytes: Bytes,
}

impl From<Bytes> for ChronicleRequestPayload {
    fn from(bytes: Bytes) -> Self {
        Self { bytes }
    }
}

impl AsRef<[u8]> for ChronicleRequestPayload {
    fn as_ref(&self) -> &[u8] {
        self.bytes.as_ref()
    }
}

impl RequestBuilder<(ChroniclePartitionKey, Vec<Event>)> for ChronicleRequestBuilder {
    type Metadata = EventFinalizers;
    type Events = (ChroniclePartitionKey, Vec<Event>);
    type Encoder = ChronicleEncoder;
    type Payload = ChronicleRequestPayload;
    type Request = ChronicleRequest;
    type Error = io::Error;

    fn compression(&self) -> Compression {
        self.compression
    }

    fn encoder(&self) -> &Self::Encoder {
        &self.encoder
    }

    fn split_input(
        &self,
        input: (ChroniclePartitionKey, Vec<Event>),
    ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) {
        let (partition_key, mut events) = input;
        let finalizers = events.take_finalizers();

        let builder = RequestMetadataBuilder::from_events(&events);
        (finalizers, builder, (partition_key, events))
    }

    fn build_request(
        &self,
        finalizers: Self::Metadata,
        metadata: RequestMetadata,
        payload: EncodeResult<Self::Payload>,
    ) -> Self::Request {
        let mut headers = HashMap::new();
        headers.insert(
            header::CONTENT_TYPE,
            HeaderValue::from_static("application/json"),
        );

        match payload.compressed_byte_size {
            Some(compressed_byte_size) => {
                headers.insert(
                    header::CONTENT_LENGTH,
                    HeaderValue::from_str(&compressed_byte_size.to_string()).unwrap(),
                );
                headers.insert(
                    header::CONTENT_ENCODING,
                    HeaderValue::from_str(self.compression.content_encoding().unwrap()).unwrap(),
                );
            }
            None => {
                headers.insert(
                    header::CONTENT_LENGTH,
                    HeaderValue::from_str(&payload.uncompressed_byte_size.to_string()).unwrap(),
                );
            }
        }

        ChronicleRequest {
            headers,
            body: payload.into_payload().bytes,
            finalizers,
            metadata,
        }
    }
}
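
// Illustrative sketch of the headers produced by `build_request` above for a compressed
// payload (with no compression, only `Content-Type` and the uncompressed `Content-Length`
// are set):
//
//     Content-Type: application/json
//     Content-Encoding: gzip
//     Content-Length: <compressed size in bytes>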

#[derive(Clone, Debug, Serialize)]
struct Label {
    key: String,
    value: String,
}

impl ChronicleRequestBuilder {
    fn new(config: &ChronicleUnstructuredConfig) -> crate::Result<Self> {
        let transformer = config.encoding.transformer();
        let serializer = config.encoding.config().build()?;
        let compression = Compression::from(config.compression);
        let encoder = crate::codecs::Encoder::<()>::new(serializer);
        let encoder = ChronicleEncoder {
            customer_id: config.customer_id.clone(),
            labels: config.labels.as_ref().map(|labs| {
                labs.iter()
                    .map(|(k, v)| Label {
                        key: k.to_string(),
                        value: v.to_string(),
                    })
                    .collect::<Vec<_>>()
            }),
            encoder,
            transformer,
        };
        Ok(Self {
            encoder,
            compression,
        })
    }
}

#[derive(Debug, Clone)]
pub struct ChronicleService {
    client: HttpClient,
    base_url: String,
    creds: GcpAuthenticator,
}

impl ChronicleService {
    pub const fn new(client: HttpClient, base_url: String, creds: GcpAuthenticator) -> Self {
        Self {
            client,
            base_url,
            creds,
        }
    }
}

#[derive(Debug, Snafu)]
pub enum ChronicleResponseError {
    #[snafu(display("Server responded with an error: {}", code))]
    ServerError { code: StatusCode },
    #[snafu(display("Failed to make HTTP(S) request: {}", error))]
    HttpError { error: crate::http::HttpError },
}

impl Service<ChronicleRequest> for ChronicleService {
    type Response = GcsResponse;
    type Error = ChronicleResponseError;
    type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;

    fn poll_ready(&mut self, _: &mut std::task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, request: ChronicleRequest) -> Self::Future {
        let mut builder = Request::post(&self.base_url);
        let metadata = request.get_metadata().clone();

        let headers = builder.headers_mut().unwrap();
        for (name, value) in request.headers {
            headers.insert(name, value);
        }

        let mut http_request = builder.body(Body::from(request.body)).unwrap();
        self.creds.apply(&mut http_request);

        let mut client = self.client.clone();
        Box::pin(async move {
            match client.call(http_request).await {
                Ok(response) => {
                    let status = response.status();
                    if status.is_success() {
                        Ok(GcsResponse {
                            inner: response,
                            metadata,
                        })
                    } else {
                        Err(ChronicleResponseError::ServerError { code: status })
                    }
                }
                Err(error) => Err(ChronicleResponseError::HttpError { error }),
            }
        })
    }
}

#[cfg(all(test, feature = "chronicle-integration-tests"))]
mod integration_tests {
    use reqwest::{Client, Method, Response};
    use serde::{Deserialize, Serialize};
    use vector_lib::event::{BatchNotifier, BatchStatus};

    use super::*;
    use crate::test_util::{
        components::{
            COMPONENT_ERROR_TAGS, SINK_TAGS, run_and_assert_sink_compliance,
            run_and_assert_sink_error,
        },
        random_events_with_stream, random_string, trace_init,
    };

    const ADDRESS_ENV_VAR: &str = "CHRONICLE_ADDRESS";

    fn config(log_type: &str, auth_path: &str) -> ChronicleUnstructuredConfig {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let config = format!(
            indoc! { r#"
                endpoint = "{}"
                customer_id = "customer id"
                namespace = "namespace"
                credentials_path = "{}"
                log_type = "{}"
                encoding.codec = "text"
            "# },
            address, auth_path, log_type
        );

        let config: ChronicleUnstructuredConfig = toml::from_str(&config).unwrap();
        config
    }

    async fn config_build(
        log_type: &str,
        auth_path: &str,
    ) -> crate::Result<(VectorSink, crate::sinks::Healthcheck)> {
        let cx = SinkContext::default();
        config(log_type, auth_path).build(cx).await
    }

    #[tokio::test]
    async fn publish_events() {
        trace_init();

        let log_type = random_string(10);
        let (sink, healthcheck) =
            config_build(&log_type, "/home/vector/scripts/integration/gcp/auth.json")
                .await
                .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_compliance(sink, events, &SINK_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Delivered));

        let response = pull_messages(&log_type).await;
        let messages = response
            .into_iter()
            .map(|message| message.log_text)
            .collect::<Vec<_>>();
        assert_eq!(input.len(), messages.len());
        for i in 0..input.len() {
            let data = serde_json::to_value(&messages[i]).unwrap();
            let expected =
                serde_json::to_value(input[i].as_log().get("message").unwrap()).unwrap();
            assert_eq!(data, expected);
        }
    }

    #[tokio::test]
    async fn invalid_credentials() {
        trace_init();

        let log_type = random_string(10);
        // Building the sink must fail when the credentials file cannot be loaded.
        let sink = config_build(
            &log_type,
            "/home/vector/scripts/integration/gcp/invalidauth.json",
        )
        .await;

        assert!(sink.is_err())
    }

    #[tokio::test]
    async fn publish_invalid_events() {
        trace_init();

        // A log type of "INVALID" is rejected by the test server, so the sink reports an error.
        let log_type = "INVALID";
        let (sink, healthcheck) =
            config_build(log_type, "/home/vector/scripts/integration/gcp/auth.json")
                .await
                .expect("Building sink failed");

        healthcheck.await.expect("Health check failed");

        let (batch, mut receiver) = BatchNotifier::new_with_receiver();
        let (_input, events) = random_events_with_stream(100, 100, Some(batch));
        run_and_assert_sink_error(sink, events, &COMPONENT_ERROR_TAGS).await;
        assert_eq!(receiver.try_recv(), Ok(BatchStatus::Rejected));
    }

    #[derive(Clone, Debug, Deserialize, Serialize)]
    pub struct Log {
        customer_id: String,
        namespace: String,
        log_type: String,
        log_text: String,
        ts_rfc3339: String,
    }

    async fn request(method: Method, path: &str, log_type: &str) -> Response {
        let address = std::env::var(ADDRESS_ENV_VAR).unwrap();
        let url = format!("{address}/{path}");
        Client::new()
            .request(method.clone(), &url)
            .query(&[("log_type", log_type)])
            .send()
            .await
            .unwrap_or_else(|_| panic!("Sending {method} request to {url} failed"))
    }

    async fn pull_messages(log_type: &str) -> Vec<Log> {
        request(Method::GET, "logs", log_type)
            .await
            .json::<Vec<Log>>()
            .await
            .expect("Extracting pull data failed")
    }
}