diff --git a/Cargo.lock b/Cargo.lock index 9a0139f110..9847e7ef66 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2381,6 +2381,8 @@ dependencies = [ "async-trait", "chrono", "deadpool-postgres", + "futures 0.3.31", + "itertools 0.14.0", "metrics", "rand 0.8.5", "rusqlite", @@ -13469,6 +13471,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "inventory", + "ipnet", "libgssapi", "memchr", "metrics", diff --git a/lib/observo/chkpts/Cargo.toml b/lib/observo/chkpts/Cargo.toml index 434efc7715..95a3008cdb 100644 --- a/lib/observo/chkpts/Cargo.toml +++ b/lib/observo/chkpts/Cargo.toml @@ -27,6 +27,8 @@ rusqlite.workspace = true toml.workspace = true tokio.workspace = true tracing.workspace = true +futures.workspace = true +itertools.workspace = true [features] default = [] diff --git a/lib/observo/private b/lib/observo/private index 9005c77c59..79ffa9eea2 160000 --- a/lib/observo/private +++ b/lib/observo/private @@ -1 +1 @@ -Subproject commit 9005c77c5914342729d95905ee98232d38fdccbb +Subproject commit 79ffa9eea2302aa204ede33611768412cd5fc250 diff --git a/lib/observo/wef/Cargo.toml b/lib/observo/wef/Cargo.toml index 2d9dcec357..23f4484ab8 100644 --- a/lib/observo/wef/Cargo.toml +++ b/lib/observo/wef/Cargo.toml @@ -41,6 +41,7 @@ httparse = "1" mime = "0.3" memchr = "2" metrics.workspace = true +ipnet = { version = "2", default-features = false, features = ["serde"] } socket2 = "0.5" chrono.workspace = true tracing.workspace = true diff --git a/lib/vector-core/src/config/global_options.rs b/lib/vector-core/src/config/global_options.rs index 7c6ea2c074..ef3a08987a 100644 --- a/lib/vector-core/src/config/global_options.rs +++ b/lib/vector-core/src/config/global_options.rs @@ -3,6 +3,7 @@ use std::{fs::DirBuilder, path::PathBuf, time::Duration}; use snafu::{ResultExt, Snafu}; use vector_common::TimeZone; use crate::chkpts::StoreConfig as CheckpointConfig; +use crate::ipallowlist::IpAllowlistConfig; use vector_config::configurable_component; use super::super::default_data_dir; @@ -108,6 +109,12 @@ pub struct GlobalOptions { /// Configuration for the checkpoint store. #[serde(skip_serializing_if = "crate::serde::is_default", default)] pub checkpoint: CheckpointConfig, + + /// List of allowed origin IP networks applied to all sources that support IP allowlisting. + /// Acts as a site-level default; a source's own `permit_origin` setting takes precedence when set. + #[configurable(derived)] + #[serde(default, skip_serializing_if = "Option::is_none")] + pub permit_origin: Option, } impl GlobalOptions { @@ -233,6 +240,10 @@ impl GlobalOptions { } }; + if conflicts(self.permit_origin.as_ref(), with.permit_origin.as_ref()) { + errors.push("conflicting values for 'permit_origin' found".to_owned()); + } + if errors.is_empty() { Ok(Self { data_dir, @@ -244,6 +255,7 @@ impl GlobalOptions { expire_metrics: self.expire_metrics.or(with.expire_metrics), expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs), checkpoint, + permit_origin: self.permit_origin.clone().or(with.permit_origin), }) } else { Err(errors) diff --git a/src/config/source.rs b/src/config/source.rs index cd2231001e..e1790938ae 100644 --- a/src/config/source.rs +++ b/src/config/source.rs @@ -14,6 +14,7 @@ use vector_lib::{ AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig, SourceOutput, }, + ipallowlist::IpAllowlistConfig, source::Source, }; @@ -229,6 +230,17 @@ impl SourceContext { .enabled() } + /// Returns the effective `permit_origin` for this source. + /// + /// If the source provides its own `permit_origin`, that value is used. + /// Otherwise the site-level `permit_origin` from `GlobalOptions` is used as a fallback. + pub fn effective_permit_origin( + &self, + source_permit_origin: Option, + ) -> Option { + source_permit_origin.or_else(|| self.globals.permit_origin.clone()) + } + /// Gets the log namespacing to use. The passed in value is from the source itself /// and will override any global default if it's set. pub fn log_namespace(&self, namespace: Option) -> LogNamespace { diff --git a/src/sources/aws_kinesis_firehose/mod.rs b/src/sources/aws_kinesis_firehose/mod.rs index 4788692219..60dc4d9288 100644 --- a/src/sources/aws_kinesis_firehose/mod.rs +++ b/src/sources/aws_kinesis_firehose/mod.rs @@ -162,6 +162,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig { .build()?; let acknowledgements = cx.do_acknowledgements(self.acknowledgements); + let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into); if self.access_key.is_some() { warn!("DEPRECATION `access_key`, use `access_keys` instead.") @@ -187,7 +188,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig { let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?; let listener = tls.bind(&self.address).await?; let listener = listener - .with_allowlist(self.permit_origin.clone().map(Into::into)); + .with_allowlist(allowlist); let keepalive_settings = self.keepalive.clone(); let shutdown = cx.shutdown; diff --git a/src/sources/datadog_agent/mod.rs b/src/sources/datadog_agent/mod.rs index 7b3b3f85c7..bf97d3bd36 100644 --- a/src/sources/datadog_agent/mod.rs +++ b/src/sources/datadog_agent/mod.rs @@ -197,7 +197,7 @@ impl SourceConfig for DatadogAgentConfig { ); let listener = tls.bind(&self.address).await?; let listener = listener - .with_allowlist(self.permit_origin.clone().map(Into::into)); + .with_allowlist(cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into)); let acknowledgements = cx.do_acknowledgements(self.acknowledgements); let filters = source.build_warp_filters(cx.out, acknowledgements, self)?; let shutdown = cx.shutdown; diff --git a/src/sources/fluent/mod.rs b/src/sources/fluent/mod.rs index 47afb99ded..8f033eefac 100644 --- a/src/sources/fluent/mod.rs +++ b/src/sources/fluent/mod.rs @@ -49,7 +49,7 @@ pub struct FluentConfig { connection_limit: Option, #[configurable(derived)] - keepalive: Option, + keepalive: Option, #[configurable(derived)] pub permit_origin: Option, @@ -104,6 +104,7 @@ impl SourceConfig for FluentConfig { .and_then(|tls| tls.client_metadata_key.clone()) .and_then(|k| k.path); let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?; + let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into); source.run( self.address, self.keepalive, @@ -115,7 +116,7 @@ impl SourceConfig for FluentConfig { cx, self.acknowledgements, self.connection_limit, - self.permit_origin.clone().map(Into::into), + allowlist, FluentConfig::NAME, log_namespace, ) diff --git a/src/sources/heroku_logs.rs b/src/sources/heroku_logs.rs index 7373117bd6..27943ad5f0 100644 --- a/src/sources/heroku_logs.rs +++ b/src/sources/heroku_logs.rs @@ -196,6 +196,7 @@ impl SourceConfig for LogplexConfig { log_namespace, }; + let permit_origin = cx.effective_permit_origin(self.permit_origin.clone()); source.run( self.address, "events", @@ -207,7 +208,7 @@ impl SourceConfig for LogplexConfig { cx, self.acknowledgements, self.keepalive.clone(), - self.permit_origin.clone(), + permit_origin, ) } diff --git a/src/sources/http_server.rs b/src/sources/http_server.rs index 59599a17c5..c156449256 100644 --- a/src/sources/http_server.rs +++ b/src/sources/http_server.rs @@ -378,6 +378,7 @@ impl SourceConfig for SimpleHttpConfig { decoder, log_namespace, }; + let permit_origin = cx.effective_permit_origin(self.permit_origin.clone()); source.run( self.address, self.path.as_str(), @@ -389,7 +390,7 @@ impl SourceConfig for SimpleHttpConfig { cx, self.acknowledgements, self.keepalive.clone(), - self.permit_origin.clone(), + permit_origin, ) } diff --git a/src/sources/logstash.rs b/src/sources/logstash.rs index f5682f4464..1cea29bb29 100644 --- a/src/sources/logstash.rs +++ b/src/sources/logstash.rs @@ -156,6 +156,7 @@ impl SourceConfig for LogstashConfig { .and_then(|k| k.path); let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?; + let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into); source.run( self.address, self.keepalive, @@ -167,7 +168,7 @@ impl SourceConfig for LogstashConfig { cx, self.acknowledgements, self.connection_limit, - self.permit_origin.clone().map(Into::into), + allowlist, LogstashConfig::NAME, log_namespace, ) diff --git a/src/sources/opentelemetry/mod.rs b/src/sources/opentelemetry/mod.rs index ff750040a8..95240054e6 100644 --- a/src/sources/opentelemetry/mod.rs +++ b/src/sources/opentelemetry/mod.rs @@ -174,6 +174,7 @@ impl SourceConfig for OpentelemetryConfig { let acknowledgements = cx.do_acknowledgements(self.acknowledgements); let events_received = register!(EventsReceived); let log_namespace = cx.log_namespace(self.log_namespace); + let permit_origin = cx.effective_permit_origin(self.permit_origin.clone()); let log_service = LogsServiceServer::new(Service { pipeline: cx.out.clone(), @@ -216,7 +217,7 @@ impl SourceConfig for OpentelemetryConfig { grpc_tls_settings, builder.routes(), cx.shutdown.clone(), - self.permit_origin.clone(), + permit_origin.clone(), ) .map_err(|error| { error!(message = "Source future failed.", %error); @@ -245,7 +246,7 @@ impl SourceConfig for OpentelemetryConfig { filters, cx.shutdown, http_config.keepalive.clone(), - self.permit_origin.clone(), + permit_origin, )) } else { None diff --git a/src/sources/prometheus/pushgateway.rs b/src/sources/prometheus/pushgateway.rs index 768575962b..b5974c583b 100644 --- a/src/sources/prometheus/pushgateway.rs +++ b/src/sources/prometheus/pushgateway.rs @@ -112,6 +112,7 @@ impl SourceConfig for PrometheusPushgatewayConfig { let source = PushgatewaySource { aggregate_metrics: self.aggregate_metrics, }; + let permit_origin = cx.effective_permit_origin(self.permit_origin.clone()); source.run( self.address, "", @@ -123,7 +124,7 @@ impl SourceConfig for PrometheusPushgatewayConfig { cx, self.acknowledgements, self.keepalive.clone(), - self.permit_origin.clone(), + permit_origin, ) } diff --git a/src/sources/prometheus/remote_write.rs b/src/sources/prometheus/remote_write.rs index 4ba6b14498..e1c74dc0a0 100644 --- a/src/sources/prometheus/remote_write.rs +++ b/src/sources/prometheus/remote_write.rs @@ -101,6 +101,7 @@ impl GenerateConfig for PrometheusRemoteWriteConfig { impl SourceConfig for PrometheusRemoteWriteConfig { async fn build(&self, cx: SourceContext) -> crate::Result { let source = RemoteWriteSource; + let permit_origin = cx.effective_permit_origin(self.permit_origin.clone()); source.run( self.address, "", @@ -112,7 +113,7 @@ impl SourceConfig for PrometheusRemoteWriteConfig { cx, self.acknowledgements, self.keepalive.clone(), - self.permit_origin.clone(), + permit_origin, ) } diff --git a/src/sources/socket/mod.rs b/src/sources/socket/mod.rs index 9159161b26..8c32247b8e 100644 --- a/src/sources/socket/mod.rs +++ b/src/sources/socket/mod.rs @@ -132,6 +132,7 @@ impl SourceConfig for SocketConfig { .and_then(|tls| tls.client_metadata_key.clone()) .and_then(|k| k.path); let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?; + let allowlist = cx.effective_permit_origin(config.permit_origin.clone()).map(Into::into); tcp.run( config.address(), config.keepalive(), @@ -143,12 +144,12 @@ impl SourceConfig for SocketConfig { cx, false.into(), config.connection_limit, - config.permit_origin.map(Into::into), + allowlist, SocketConfig::NAME, log_namespace, ) } - Mode::Udp(config) => { + Mode::Udp(mut config) => { let log_namespace = cx.log_namespace(config.log_namespace); let decoding = config.decoding().clone(); let framing = config @@ -156,6 +157,7 @@ impl SourceConfig for SocketConfig { .clone() .unwrap_or_else(|| decoding.default_message_based_framing()); let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?; + config.permit_origin = cx.effective_permit_origin(config.permit_origin); Ok(udp::udp( config, decoder, diff --git a/src/sources/splunk_hec/mod.rs b/src/sources/splunk_hec/mod.rs index d313c34cee..b646aec3f9 100644 --- a/src/sources/splunk_hec/mod.rs +++ b/src/sources/splunk_hec/mod.rs @@ -154,6 +154,7 @@ impl SourceConfig for SplunkConfig { let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?; let shutdown = cx.shutdown.clone(); let out = cx.out.clone(); + let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into); let source = SplunkSource::new(self, tls.http_protocol_name(), cx); let event_service = source.event_service(out.clone()); @@ -178,7 +179,7 @@ impl SourceConfig for SplunkConfig { let listener = tls.bind(&self.address).await?; let listener = listener - .with_allowlist(self.permit_origin.clone().map(Into::into)); + .with_allowlist(allowlist); let keepalive_settings = self.keepalive.clone(); Ok(Box::pin(async move { diff --git a/src/sources/statsd/mod.rs b/src/sources/statsd/mod.rs index c573b8d86a..c76c27cecb 100644 --- a/src/sources/statsd/mod.rs +++ b/src/sources/statsd/mod.rs @@ -184,7 +184,7 @@ impl SourceConfig for StatsdConfig { let statsd_tcp_source = StatsdTcpSource { sanitize: config.sanitize, }; - + let allowlist = cx.effective_permit_origin(config.permit_origin.clone()).map(Into::into); statsd_tcp_source.run( config.address, config.keepalive, @@ -196,7 +196,7 @@ impl SourceConfig for StatsdConfig { cx, false.into(), config.connection_limit, - config.permit_origin.clone().map(Into::into), + allowlist, StatsdConfig::NAME, LogNamespace::Legacy, ) diff --git a/src/sources/syslog.rs b/src/sources/syslog.rs index ab2b2741cf..59a6147b55 100644 --- a/src/sources/syslog.rs +++ b/src/sources/syslog.rs @@ -196,6 +196,7 @@ impl SourceConfig for SyslogConfig { .and_then(|tls| tls.client_metadata_key.clone()) .and_then(|k| k.path); let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?; + let allowlist = cx.effective_permit_origin(permit_origin).map(Into::into); source.run( address, keepalive, @@ -207,7 +208,7 @@ impl SourceConfig for SyslogConfig { cx, false.into(), connection_limit, - permit_origin.map(Into::into), + allowlist, SyslogConfig::NAME, log_namespace, )