Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions lib/observo/chkpts/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ rusqlite.workspace = true
toml.workspace = true
tokio.workspace = true
tracing.workspace = true
futures.workspace = true
itertools.workspace = true

[features]
default = []
Expand Down
2 changes: 1 addition & 1 deletion lib/observo/private
1 change: 1 addition & 0 deletions lib/observo/wef/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ httparse = "1"
mime = "0.3"
memchr = "2"
metrics.workspace = true
ipnet = { version = "2", default-features = false, features = ["serde"] }
socket2 = "0.5"
chrono.workspace = true
tracing.workspace = true
Expand Down
12 changes: 12 additions & 0 deletions lib/vector-core/src/config/global_options.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::{fs::DirBuilder, path::PathBuf, time::Duration};
use snafu::{ResultExt, Snafu};
use vector_common::TimeZone;
use crate::chkpts::StoreConfig as CheckpointConfig;
use crate::ipallowlist::IpAllowlistConfig;
use vector_config::configurable_component;

use super::super::default_data_dir;
Expand Down Expand Up @@ -108,6 +109,12 @@ pub struct GlobalOptions {
/// Configuration for the checkpoint store.
#[serde(skip_serializing_if = "crate::serde::is_default", default)]
pub checkpoint: CheckpointConfig,

/// List of allowed origin IP networks applied to all sources that support IP allowlisting.
/// Acts as a site-level default; a source's own `permit_origin` setting takes precedence when set.
#[configurable(derived)]
#[serde(default, skip_serializing_if = "Option::is_none")]
pub permit_origin: Option<IpAllowlistConfig>,
}

impl GlobalOptions {
Expand Down Expand Up @@ -233,6 +240,10 @@ impl GlobalOptions {
}
};

if conflicts(self.permit_origin.as_ref(), with.permit_origin.as_ref()) {
errors.push("conflicting values for 'permit_origin' found".to_owned());
}

if errors.is_empty() {
Ok(Self {
data_dir,
Expand All @@ -244,6 +255,7 @@ impl GlobalOptions {
expire_metrics: self.expire_metrics.or(with.expire_metrics),
expire_metrics_secs: self.expire_metrics_secs.or(with.expire_metrics_secs),
checkpoint,
permit_origin: self.permit_origin.clone().or(with.permit_origin),
})
} else {
Err(errors)
Expand Down
12 changes: 12 additions & 0 deletions src/config/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use vector_lib::{
AcknowledgementsConfig, GlobalOptions, LogNamespace, SourceAcknowledgementsConfig,
SourceOutput,
},
ipallowlist::IpAllowlistConfig,
source::Source,
};

Expand Down Expand Up @@ -229,6 +230,17 @@ impl SourceContext {
.enabled()
}

/// Returns the effective `permit_origin` for this source.
///
/// If the source provides its own `permit_origin`, that value is used.
/// Otherwise the site-level `permit_origin` from `GlobalOptions` is used as a fallback.
pub fn effective_permit_origin(
&self,
source_permit_origin: Option<IpAllowlistConfig>,
) -> Option<IpAllowlistConfig> {
source_permit_origin.or_else(|| self.globals.permit_origin.clone())
}

/// Gets the log namespacing to use. The passed in value is from the source itself
/// and will override any global default if it's set.
pub fn log_namespace(&self, namespace: Option<bool>) -> LogNamespace {
Expand Down
3 changes: 2 additions & 1 deletion src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
.build()?;

let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into);

if self.access_key.is_some() {
warn!("DEPRECATION `access_key`, use `access_keys` instead.")
Expand All @@ -187,7 +188,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
let listener = tls.bind(&self.address).await?;
let listener = listener
.with_allowlist(self.permit_origin.clone().map(Into::into));
.with_allowlist(allowlist);

let keepalive_settings = self.keepalive.clone();
let shutdown = cx.shutdown;
Expand Down
2 changes: 1 addition & 1 deletion src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl SourceConfig for DatadogAgentConfig {
);
let listener = tls.bind(&self.address).await?;
let listener = listener
.with_allowlist(self.permit_origin.clone().map(Into::into));
.with_allowlist(cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into));
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
let filters = source.build_warp_filters(cx.out, acknowledgements, self)?;
let shutdown = cx.shutdown;
Expand Down
5 changes: 3 additions & 2 deletions src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub struct FluentConfig {
connection_limit: Option<u32>,

#[configurable(derived)]
keepalive: Option<TcpKeepaliveConfig>,
keepalive: Option<TcpKeepaliveConfig>,

#[configurable(derived)]
pub permit_origin: Option<IpAllowlistConfig>,
Expand Down Expand Up @@ -104,6 +104,7 @@ impl SourceConfig for FluentConfig {
.and_then(|tls| tls.client_metadata_key.clone())
.and_then(|k| k.path);
let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into);
source.run(
self.address,
self.keepalive,
Expand All @@ -115,7 +116,7 @@ impl SourceConfig for FluentConfig {
cx,
self.acknowledgements,
self.connection_limit,
self.permit_origin.clone().map(Into::into),
allowlist,
FluentConfig::NAME,
log_namespace,
)
Expand Down
3 changes: 2 additions & 1 deletion src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl SourceConfig for LogplexConfig {
log_namespace,
};

let permit_origin = cx.effective_permit_origin(self.permit_origin.clone());
source.run(
self.address,
"events",
Expand All @@ -207,7 +208,7 @@ impl SourceConfig for LogplexConfig {
cx,
self.acknowledgements,
self.keepalive.clone(),
self.permit_origin.clone(),
permit_origin,
)
}

Expand Down
3 changes: 2 additions & 1 deletion src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,7 @@ impl SourceConfig for SimpleHttpConfig {
decoder,
log_namespace,
};
let permit_origin = cx.effective_permit_origin(self.permit_origin.clone());
source.run(
self.address,
self.path.as_str(),
Expand All @@ -389,7 +390,7 @@ impl SourceConfig for SimpleHttpConfig {
cx,
self.acknowledgements,
self.keepalive.clone(),
self.permit_origin.clone(),
permit_origin,
)
}

Expand Down
3 changes: 2 additions & 1 deletion src/sources/logstash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ impl SourceConfig for LogstashConfig {
.and_then(|k| k.path);

let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into);
source.run(
self.address,
self.keepalive,
Expand All @@ -167,7 +168,7 @@ impl SourceConfig for LogstashConfig {
cx,
self.acknowledgements,
self.connection_limit,
self.permit_origin.clone().map(Into::into),
allowlist,
LogstashConfig::NAME,
log_namespace,
)
Expand Down
5 changes: 3 additions & 2 deletions src/sources/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ impl SourceConfig for OpentelemetryConfig {
let acknowledgements = cx.do_acknowledgements(self.acknowledgements);
let events_received = register!(EventsReceived);
let log_namespace = cx.log_namespace(self.log_namespace);
let permit_origin = cx.effective_permit_origin(self.permit_origin.clone());

let log_service = LogsServiceServer::new(Service {
pipeline: cx.out.clone(),
Expand Down Expand Up @@ -216,7 +217,7 @@ impl SourceConfig for OpentelemetryConfig {
grpc_tls_settings,
builder.routes(),
cx.shutdown.clone(),
self.permit_origin.clone(),
permit_origin.clone(),
)
.map_err(|error| {
error!(message = "Source future failed.", %error);
Expand Down Expand Up @@ -245,7 +246,7 @@ impl SourceConfig for OpentelemetryConfig {
filters,
cx.shutdown,
http_config.keepalive.clone(),
self.permit_origin.clone(),
permit_origin,
))
} else {
None
Expand Down
3 changes: 2 additions & 1 deletion src/sources/prometheus/pushgateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ impl SourceConfig for PrometheusPushgatewayConfig {
let source = PushgatewaySource {
aggregate_metrics: self.aggregate_metrics,
};
let permit_origin = cx.effective_permit_origin(self.permit_origin.clone());
source.run(
self.address,
"",
Expand All @@ -123,7 +124,7 @@ impl SourceConfig for PrometheusPushgatewayConfig {
cx,
self.acknowledgements,
self.keepalive.clone(),
self.permit_origin.clone(),
permit_origin,
)
}

Expand Down
3 changes: 2 additions & 1 deletion src/sources/prometheus/remote_write.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ impl GenerateConfig for PrometheusRemoteWriteConfig {
impl SourceConfig for PrometheusRemoteWriteConfig {
async fn build(&self, cx: SourceContext) -> crate::Result<sources::Source> {
let source = RemoteWriteSource;
let permit_origin = cx.effective_permit_origin(self.permit_origin.clone());
source.run(
self.address,
"",
Expand All @@ -112,7 +113,7 @@ impl SourceConfig for PrometheusRemoteWriteConfig {
cx,
self.acknowledgements,
self.keepalive.clone(),
self.permit_origin.clone(),
permit_origin,
)
}

Expand Down
6 changes: 4 additions & 2 deletions src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ impl SourceConfig for SocketConfig {
.and_then(|tls| tls.client_metadata_key.clone())
.and_then(|k| k.path);
let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
let allowlist = cx.effective_permit_origin(config.permit_origin.clone()).map(Into::into);
tcp.run(
config.address(),
config.keepalive(),
Expand All @@ -143,19 +144,20 @@ impl SourceConfig for SocketConfig {
cx,
false.into(),
config.connection_limit,
config.permit_origin.map(Into::into),
allowlist,
SocketConfig::NAME,
log_namespace,
)
}
Mode::Udp(config) => {
Mode::Udp(mut config) => {
let log_namespace = cx.log_namespace(config.log_namespace);
let decoding = config.decoding().clone();
let framing = config
.framing()
.clone()
.unwrap_or_else(|| decoding.default_message_based_framing());
let decoder = DecodingConfig::new(framing, decoding, log_namespace).build()?;
config.permit_origin = cx.effective_permit_origin(config.permit_origin);
Ok(udp::udp(
config,
decoder,
Expand Down
3 changes: 2 additions & 1 deletion src/sources/splunk_hec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ impl SourceConfig for SplunkConfig {
let tls = MaybeTlsSettings::from_config(self.tls.as_ref(), true)?;
let shutdown = cx.shutdown.clone();
let out = cx.out.clone();
let allowlist = cx.effective_permit_origin(self.permit_origin.clone()).map(Into::into);
let source = SplunkSource::new(self, tls.http_protocol_name(), cx);

let event_service = source.event_service(out.clone());
Expand All @@ -178,7 +179,7 @@ impl SourceConfig for SplunkConfig {

let listener = tls.bind(&self.address).await?;
let listener = listener
.with_allowlist(self.permit_origin.clone().map(Into::into));
.with_allowlist(allowlist);

let keepalive_settings = self.keepalive.clone();
Ok(Box::pin(async move {
Expand Down
4 changes: 2 additions & 2 deletions src/sources/statsd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ impl SourceConfig for StatsdConfig {
let statsd_tcp_source = StatsdTcpSource {
sanitize: config.sanitize,
};

let allowlist = cx.effective_permit_origin(config.permit_origin.clone()).map(Into::into);
statsd_tcp_source.run(
config.address,
config.keepalive,
Expand All @@ -196,7 +196,7 @@ impl SourceConfig for StatsdConfig {
cx,
false.into(),
config.connection_limit,
config.permit_origin.clone().map(Into::into),
allowlist,
StatsdConfig::NAME,
LogNamespace::Legacy,
)
Expand Down
3 changes: 2 additions & 1 deletion src/sources/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ impl SourceConfig for SyslogConfig {
.and_then(|tls| tls.client_metadata_key.clone())
.and_then(|k| k.path);
let tls = MaybeTlsSettings::from_config(tls_config.as_ref(), true)?;
let allowlist = cx.effective_permit_origin(permit_origin).map(Into::into);
source.run(
address,
keepalive,
Expand All @@ -207,7 +208,7 @@ impl SourceConfig for SyslogConfig {
cx,
false.into(),
connection_limit,
permit_origin.map(Into::into),
allowlist,
SyslogConfig::NAME,
log_namespace,
)
Expand Down