1use std::{sync::Arc, time::Duration};
2
3use bytes::{Bytes, BytesMut};
4use chrono::Utc;
5use futures::StreamExt as _;
6use futures_util::{FutureExt, Stream, stream};
7use http::Uri;
8use hyper::{Body, Request};
9use percent_encoding::utf8_percent_encode;
10use serde_with::serde_as;
11use tokio::sync::Mutex;
12use tokio_stream::wrappers::IntervalStream;
13use tokio_util::codec::Decoder as _;
14use vector_lib::{
15 EstimatedJsonEncodedSizeOf,
16 codecs::{
17 JsonDeserializerConfig, StreamDecodingError,
18 decoding::{DeserializerConfig, FramingConfig},
19 },
20 config::{LogNamespace, SourceOutput, proxy::ProxyConfig},
21 configurable::configurable_component,
22 event::Event,
23 json_size::JsonSize,
24 shutdown::ShutdownSignal,
25 tls::TlsConfig,
26};
27
28use crate::{
29 SourceSender,
30 codecs::{Decoder, DecodingConfig},
31 config::{SourceConfig, SourceContext},
32 http::{HttpClient, HttpError},
33 internal_events::{
34 DecoderDeserializeError, EndpointBytesReceived, HttpClientEventsReceived,
35 HttpClientHttpError, HttpClientHttpResponseError, StreamClosedError,
36 },
37 sources,
38 sources::util::http_client::{default_interval, default_timeout, warn_if_interval_too_low},
39 tls::TlsSettings,
40};
41
/// Configuration for the `okta` source.
#[serde_as]
#[configurable_component(source("okta", "Pull Okta system logs via the Okta API",))]
#[derive(Clone, Debug)]
pub struct OktaConfig {
    /// The Okta domain to pull system logs from.
    ///
    /// HTTPS is used when the value does not carry its own scheme.
    #[configurable(metadata(docs::examples = "foo.okta.com"))]
    pub domain: String,

    /// The Okta API token, sent as `Authorization: SSWS <token>` on every request.
    #[configurable(metadata(docs::examples = "00xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"))]
    pub token: String,

    /// The interval between scrapes, in whole seconds.
    #[serde(default = "default_interval")]
    #[serde_as(as = "serde_with::DurationSeconds<u64>")]
    #[serde(rename = "scrape_interval_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Interval"))]
    pub interval: Duration,

    /// The timeout applied to each scrape request, in (possibly fractional) seconds.
    #[serde(default = "default_timeout")]
    #[serde_as(as = "serde_with::DurationSecondsWithFrac<f64>")]
    #[serde(rename = "scrape_timeout_secs")]
    #[configurable(metadata(docs::human_name = "Scrape Timeout"))]
    pub timeout: Duration,

    /// How far back, in seconds before the source is built, the first request should start
    /// fetching logs from.
    ///
    /// When unset, fetching starts from the moment the source is built.
    #[configurable(metadata(docs::human_name = "Since (seconds before now)"))]
    pub since: Option<u64>,

    /// TLS configuration used by the HTTP client.
    #[configurable(derived)]
    pub tls: Option<TlsConfig>,

    /// Per-source log namespace setting; it is merged with the global log namespace setting.
    #[configurable(metadata(docs::hidden))]
    #[serde(default)]
    pub log_namespace: Option<bool>,
}
85
86impl Default for OktaConfig {
87 fn default() -> Self {
88 Self {
89 domain: "".to_string(),
90 token: "".to_string(),
91 interval: default_interval(),
92 timeout: default_timeout(),
93 since: None,
94 tls: None,
95 log_namespace: None,
96 }
97 }
98}
99
// NOTE(review): presumably wires up config generation for `OktaConfig` on top of its `Default`
// impl — the macro is defined outside this file, confirm there.
impl_generate_config_from_default!(OktaConfig);
101
/// Extracts the target URL of the first `next` relation from an HTTP `Link` header value.
///
/// The header is parsed as a list of `<target>; param=value; ...` link-values:
///
/// * the target is taken verbatim from between `<` and `>`, so commas and semicolons inside the
///   URL do not truncate it;
/// * only the `rel` parameter is inspected (never the URL text), and it matches whether it is
///   quoted or not, regardless of case, and when it lists several relation types
///   (`rel="prev next"`);
/// * link-values without a `<target>` are ignored.
///
/// Returns `None` when no link-value carries the `next` relation.
fn find_rel_next_link(header: &str) -> Option<String> {
    let mut rest = header;
    while let Some((_, after_open)) = rest.split_once('<') {
        let (target, tail) = after_open.split_once('>')?;
        // Parameters of this link-value run up to the next comma that is not inside quotes.
        let (params, remainder) = split_once_unquoted(tail, ',');
        let target = target.trim();
        if !target.is_empty() && link_params_have_next_rel(params) {
            return Some(target.to_string());
        }
        rest = remainder?;
    }
    None
}

/// Reports whether the `rel` parameter of a link-value lists the `next` relation type.
///
/// Only the first `rel` parameter is considered; later duplicates are ignored.
fn link_params_have_next_rel(params: &str) -> bool {
    let mut remaining = Some(params);
    while let Some(current) = remaining {
        let (param, rest) = split_once_unquoted(current, ';');
        remaining = rest;
        match param.split_once('=') {
            Some((name, value)) if name.trim().eq_ignore_ascii_case("rel") => {
                return value
                    .trim()
                    .trim_matches('"')
                    .split_ascii_whitespace()
                    .any(|relation| relation.eq_ignore_ascii_case("next"));
            }
            _ => {}
        }
    }
    false
}

/// Splits `input` at the first `delimiter` that is not inside a double-quoted string, returning
/// the text before it and, when the delimiter was found, the text after it.
fn split_once_unquoted(input: &str, delimiter: char) -> (&str, Option<&str>) {
    let mut in_quotes = false;
    let mut escaped = false;
    for (idx, ch) in input.char_indices() {
        if escaped {
            // The previous character was a backslash inside quotes; this one is literal.
            escaped = false;
            continue;
        }
        match ch {
            '\\' if in_quotes => escaped = true,
            '"' => in_quotes = !in_quotes,
            c if c == delimiter && !in_quotes => {
                return (&input[..idx], Some(&input[idx + c.len_utf8()..]));
            }
            _ => {}
        }
    }
    (input, None)
}
115
116#[async_trait::async_trait]
117#[typetag::serde(name = "okta")]
118impl SourceConfig for OktaConfig {
119 async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
120 let since = match self.since {
121 Some(since) => Utc::now() - Duration::from_secs(since),
122 _ => Utc::now(),
123 };
124
125 let path_and_query = format!(
126 "/api/v1/logs?since={}",
127 utf8_percent_encode(&since.to_rfc3339(), percent_encoding::NON_ALPHANUMERIC)
128 );
129
130 let mut url_parts = Uri::try_from(&self.domain)
131 .map_err(|_| {
132 format!(
133 "Invalid domain: {}. Must be a valid Okta subdomain.",
134 self.domain
135 )
136 })?
137 .into_parts();
138
139 url_parts.path_and_query = Some(path_and_query.parse()?);
140 if url_parts.scheme.is_none() {
141 url_parts.scheme = Some(http::uri::Scheme::HTTPS);
142 }
143
144 let url = Uri::from_parts(url_parts).map_err(|_| {
145 format!(
146 "Invalid domain: {}. Must be a valid Okta subdomain.",
147 self.domain
148 )
149 })?;
150
151 let tls = TlsSettings::from_options(self.tls.as_ref())?;
152
153 let log_namespace = cx.log_namespace(self.log_namespace);
154
155 warn_if_interval_too_low(self.timeout, self.interval);
156
157 Ok(run(
158 url,
159 tls,
160 cx.proxy,
161 self.token.clone(),
162 self.interval,
163 self.timeout,
164 log_namespace,
165 cx.shutdown,
166 cx.out,
167 )
168 .boxed())
169 }
170
171 fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
172 let log_namespace = global_log_namespace.merge(self.log_namespace);
175
176 vec![SourceOutput::new_maybe_logs(
177 JsonDeserializerConfig::default().output_type(),
178 JsonDeserializerConfig::default().schema_definition(log_namespace),
179 )]
180 }
181
182 fn can_acknowledge(&self) -> bool {
183 false
184 }
185}
186
187fn enrich_events(events: &mut Vec<Event>, log_namespace: LogNamespace) {
188 let now = Utc::now();
189 for event in events {
190 log_namespace.insert_standard_vector_source_metadata(
191 event.as_mut_log(),
192 OktaConfig::NAME,
193 now,
194 );
195 }
196}
197
/// Outcome of one request: on success the response head, the fully buffered body, and the
/// `rel="next"` link when one was present and parsed as a URI; otherwise a boxed error.
type OktaRunResult =
    Result<(http::response::Parts, Bytes, Option<Uri>), Box<dyn std::error::Error + Send + Sync>>;

/// Result of sending a request under a timeout: the outer `Err` means the timeout elapsed, the
/// inner `Result` is the HTTP client's own outcome.
type OktaTimeoutResult =
    Result<Result<http::Response<Body>, HttpError>, tokio::time::error::Elapsed>;
203
204async fn run_once(url: String, result: OktaTimeoutResult, timeout: Duration) -> OktaRunResult {
205 let mut next: Option<Uri> = None;
206 match result {
207 Ok(Ok(response)) => {
208 let (header, body) = response.into_parts();
209 if let Some(next_url) = header
210 .headers
211 .get_all("link")
212 .iter()
213 .filter_map(|v| v.to_str().ok())
214 .filter_map(find_rel_next_link)
215 .next()
216 .and_then(|next| Uri::try_from(next).ok())
217 {
218 next = Some(next_url);
219 };
220
221 let body = hyper::body::to_bytes(body).await?;
222
223 emit!(EndpointBytesReceived {
224 byte_size: body.len(),
225 protocol: "http",
226 endpoint: &url,
227 });
228 Ok((header, body, next))
229 }
230 Ok(Err(error)) => Err(error.into()),
231 Err(_) => Err(format!("Timeout error: request exceeded {}s", timeout.as_secs_f64()).into()),
232 }
233}
234
235fn handle_response(
236 response: OktaRunResult,
237 decoder: Decoder,
238 log_namespace: LogNamespace,
239 url: String,
240) -> Option<impl Stream<Item = Event> + Send + use<>> {
241 match response {
242 Ok((header, body, _)) if header.status == hyper::StatusCode::OK => {
243 let mut buf = BytesMut::new();
244 buf.extend_from_slice(&body);
245 let mut events = decode_events(&mut buf, decoder);
246 let byte_size = if events.is_empty() {
247 JsonSize::zero()
248 } else {
249 events.estimated_json_encoded_size_of()
250 };
251
252 emit!(HttpClientEventsReceived {
253 byte_size,
254 count: events.len(),
255 url,
256 });
257
258 if events.is_empty() {
259 return None;
260 }
261
262 enrich_events(&mut events, log_namespace);
263
264 Some(stream::iter(events))
265 }
266 Ok((header, _, _)) => {
267 emit!(HttpClientHttpResponseError {
268 code: header.status,
269 url,
270 });
271 None
272 }
273 Err(error) => {
274 emit!(HttpClientHttpError { error, url });
275 None
276 }
277 }
278}
279
280#[allow(clippy::too_many_arguments)] async fn run(
289 url: Uri,
290 tls: TlsSettings,
291 proxy: ProxyConfig,
292 token: String,
293 interval: Duration,
294 timeout: Duration,
295 log_namespace: LogNamespace,
296 shutdown: ShutdownSignal,
297 mut out: SourceSender,
298) -> Result<(), ()> {
299 let url_mutex = Arc::new(Mutex::new(url.clone()));
300 let decoder = DecodingConfig::new(
301 FramingConfig::Bytes,
302 DeserializerConfig::Json(JsonDeserializerConfig::default()),
303 log_namespace,
304 )
305 .build()
306 .map_err(|ref e| {
307 emit!(DecoderDeserializeError { error: e });
308 })?;
309
310 let client = HttpClient::new(tls, &proxy).map_err(|e| {
311 emit!(HttpClientHttpError {
312 error: Box::new(e),
313 url: url.to_string()
314 });
315 })?;
316
317 let mut stream = IntervalStream::new(tokio::time::interval(interval))
318 .take_until(shutdown)
319 .then(move |_| {
320 let client = client.clone();
321 let url_mutex = Arc::clone(&url_mutex);
322 let token = token.clone();
323 let decoder = decoder.clone();
324
325 async move {
326 stream::unfold((), move |_| {
327 let url_mutex = Arc::clone(&url_mutex);
328 let token = token.clone();
329 let decoder = decoder.clone();
330 let client = client.clone();
331
332 async move {
333 let (run_url, response): (String, OktaRunResult) = {
334 let mut url_lock = url_mutex.lock().await;
337 let url = url_lock.to_string();
338
339 let mut request = match Request::get(&url).body(Body::empty()) {
340 Ok(request) => request,
341 Err(e) => {
342 emit!(HttpClientHttpError {
343 error: e.into(),
344 url: url.clone(),
345 });
346 return None;
347 }
348 };
349
350 let headers = request.headers_mut();
351 headers.insert(
352 http::header::AUTHORIZATION,
353 format!("SSWS {token}").parse().unwrap(),
354 );
355 headers
356 .insert(http::header::ACCEPT, "application/json".parse().unwrap());
357 headers.insert(
358 http::header::CONTENT_TYPE,
359 "application/json".parse().unwrap(),
360 );
361
362 let client = client.clone();
363 let response = tokio::time::timeout(timeout, client.send(request))
364 .then({
365 let url = url.clone();
366 move |result| run_once(url, result, timeout)
367 })
368 .await;
369
370 if let Ok((_, _, Some(ref next))) = response {
371 *url_lock = next.clone();
372 }
373 let new_url = url_lock.to_string();
374
375 (new_url, response)
376 };
377
378 handle_response(response, decoder, log_namespace, run_url)
379 .map(|events| (events, ()))
380 }
381 })
382 .flatten()
383 .boxed()
384 }
385 })
386 .flatten_unordered(None)
387 .boxed();
388
389 match out.send_event_stream(&mut stream).await {
390 Ok(()) => {
391 debug!("Finished sending.");
392 Ok(())
393 }
394 Err(_) => {
395 let (count, _) = stream.size_hint();
396 emit!(StreamClosedError { count });
397 Err(())
398 }
399 }
400}
401
/// Decodes every event available in `buf` using `decoder`.
///
/// `decode_eof` is called repeatedly until it reports that nothing is left (`Ok(None)`) or
/// returns an error; the events decoded up to that point are returned.
fn decode_events(buf: &mut BytesMut, mut decoder: Decoder) -> Vec<Event> {
    let mut events = Vec::new();
    loop {
        match decoder.decode_eof(buf) {
            // The second tuple element returned alongside the events is not used here.
            Ok(Some((next, _))) => {
                events.extend(next);
            }
            Ok(None) => break,
            Err(error) => {
                // NOTE(review): both paths below `break`, so the `can_continue` check has no
                // effect — any decoding error ends the loop, recoverable or not.
                if !error.can_continue() {
                    break;
                }
                break;
            }
        }
    }
    events
}