Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,140 changes: 487 additions & 653 deletions Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["io", "tim
bytesize = { version = "1.3.0", default-features = false }
bytes = { version = "1.9.0", default-features = false, features = ["serde"] }
async-compression = { version = "0.4.18", default-features = false, features = ["tokio", "gzip", "zstd"] }
object_store = { version = "0.6.1", dependencies = { url = "=2.3.1" }, features = ["gcp"], default-features = false }
object_store = { version = "0.10.2", features = ["gcp"], default-features = false }
once_cell = { version = "1.20.2", default-features = false }
smallvec = { version = "1", default-features = false, features = ["union", "serde"] }
h2 = { version = "0.4.7", default-features = false }
Expand Down Expand Up @@ -391,9 +391,9 @@ k8s-openapi = { version = "0.22.0", default-features = false, features = ["v1_26
kube = { version = "0.93.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
listenfd = { version = "1.0.1", default-features = false, optional = true }
lru = { version = "0.12.5", default-features = false, optional = true }
maxminddb = { version = "0.24.0", default-features = false, optional = true }
maxminddb = { version = "0.27.0", default-features = false, optional = true }
md-5 = { version = "0.10", default-features = false, optional = true }
mongodb = { version = "2.8.2", default-features = false, features = ["tokio-runtime"], optional = true }
mongodb = { version = "3.2.5", default-features = false, features = ["rustls-tls", "compat-3-0-0"], optional = true }
async-nats = { version = "0.33.0", default-features = false, optional = true }
nkeys = { version = "0.4.4", default-features = false, optional = true }
nom = { version = "7.1.3", default-features = false, optional = true }
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ netflow_parser = { version = "=0.5.2", features = ["parse_unknown_fields"]}
memchr = { version = "2", default-features = false }
metrics.workspace = true
ordered-float = { version = "4.5.0", default-features = false }
parquet = {version = "39.0.0", default-feature = false}
parquet = { version = "55.2.0", default-features = false, features = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd"] }
prost.workspace = true
prost-reflect.workspace = true
rand.workspace = true
Expand Down
44 changes: 28 additions & 16 deletions lib/codecs/src/encoding/format/parquet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -857,28 +857,40 @@ mod tests {
) where
<T as DataType>::T: Default,
{
let mut values = Vec::new();
values.resize(count, <T as DataType>::T::default());
let mut def_levels = Vec::new();
def_levels.resize(count, 0);
let mut rep_levels = Vec::new();
rep_levels.resize(count, 0);
let (read, level) = column_reader
.read_batch(
let mut values: Vec<<T as DataType>::T> = Vec::with_capacity(count);
let mut def_levels: Vec<i16> = Vec::with_capacity(count);
let mut rep_levels: Vec<i16> = Vec::with_capacity(count);
let (_records_read, _values_read, levels_read) = column_reader
.read_records(
count,
Some(def_levels.as_mut_slice()).filter(|_| expect_def_levels.is_some()),
Some(rep_levels.as_mut_slice()).filter(|_| expect_rep_levels.is_some()),
if expect_def_levels.is_some() {
Some(&mut def_levels)
} else {
None
},
if expect_rep_levels.is_some() {
Some(&mut rep_levels)
} else {
None
},
&mut values,
)
.unwrap();

assert_eq!(level, count);
assert_eq!(&values[..read], expect_values);
if expect_rep_levels.is_some() {
assert_eq!(rep_levels, expect_rep_levels.unwrap());
assert_eq!(levels_read, count);
assert_eq!(values.as_slice(), expect_values);
if let Some(expected) = expect_rep_levels {
// In parquet 55, columns with max_rep_level = 0 leave the rep_levels
// buffer empty rather than writing zeros. Treat empty as all zeros
// to keep tests independent of buffer-fill behavior.
if rep_levels.is_empty() && expected.iter().all(|&l| l == 0) {
// ok
} else {
assert_eq!(rep_levels, expected);
}
}
if expect_def_levels.is_some() {
assert_eq!(def_levels, expect_def_levels.unwrap());
if let Some(expected) = expect_def_levels {
assert_eq!(def_levels, expected);
}
}

Expand Down
4 changes: 2 additions & 2 deletions lib/vector-config-common/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -187,9 +187,9 @@ pub enum Validation {
///
/// Can only be used for numbers.
Range {
#[darling(default, rename = "min", with = "maybe_float_or_int")]
#[darling(default, rename = "min", with = maybe_float_or_int)]
minimum: Option<f64>,
#[darling(default, rename = "max", with = "maybe_float_or_int")]
#[darling(default, rename = "max", with = maybe_float_or_int)]
maximum: Option<f64>,
},

Expand Down
29 changes: 27 additions & 2 deletions lib/vector-core/src/tls/settings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use lookup::lookup_v2::OptionalValuePath;
use openssl::{
pkcs12::{ParsedPkcs12_2, Pkcs12},
pkey::{PKey, Private},
ssl::{select_next_proto, AlpnError, ConnectConfiguration, SslContextBuilder, SslVerifyMode},
ssl::{AlpnError, ConnectConfiguration, SslContextBuilder, SslVerifyMode},
stack::Stack,
x509::{store::X509StoreBuilder, X509},
};
Expand Down Expand Up @@ -331,7 +331,32 @@ impl TlsSettings {
if for_server {
let server_proto = alpn.clone();
context.set_alpn_select_callback(move |_, client_proto| {
select_next_proto(server_proto.as_slice(), client_proto).ok_or(AlpnError::NOACK)
// Walk the server's preference list and return the first
// protocol that also appears in the client's list, returned
// as a borrow of `client_proto` so the lifetime escapes the
// closure as required by the callback signature.
let mut server = server_proto.as_slice();
while let Some((&len, rest)) = server.split_first() {
let len = len as usize;
if rest.len() < len {
break;
}
let (sproto, after) = rest.split_at(len);
let mut client = client_proto;
while let Some((&clen, crest)) = client.split_first() {
let clen = clen as usize;
if crest.len() < clen {
break;
}
let (cproto, cafter) = crest.split_at(clen);
if cproto == sproto {
return Ok(cproto);
}
client = cafter;
}
server = after;
}
Err(AlpnError::NOACK)
});
} else {
context
Expand Down
94 changes: 41 additions & 53 deletions src/enrichment_tables/geoip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@
//!
//! [maxmind]: https://dev.maxmind.com/geoip/geoip2/downloadable
//! [geolite]: https://dev.maxmind.com/geoip/geoip2/geolite2/#Download_Access
use std::{collections::BTreeMap, fs, net::IpAddr, sync::Arc, time::SystemTime};
use std::{fs, net::IpAddr, sync::Arc, time::SystemTime};

use maxminddb::{
geoip2::{AnonymousIp, City, ConnectionType, Isp},
MaxMindDBError, Reader,
geoip2::{AnonymousIp, City, ConnectionType, Isp, Names},
Reader,
};
use ordered_float::NotNan;
use vector_lib::configurable::configurable_component;
Expand Down Expand Up @@ -124,24 +124,16 @@ impl Geoip {
)
})?;

// Check if we can read database with dummy Ip.
// Verify the database is readable; missing-record is fine, not an error.
let ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
let result = match dbkind {
DatabaseKind::Asn | DatabaseKind::Isp => dbreader.lookup::<Isp>(ip).map(|_| ()),
DatabaseKind::ConnectionType => dbreader.lookup::<ConnectionType>(ip).map(|_| ()),
DatabaseKind::City => dbreader.lookup::<City>(ip).map(|_| ()),
DatabaseKind::AnonymousIp => dbreader.lookup::<AnonymousIp>(ip).map(|_| ()),
};
dbreader.lookup(ip)?;

match result {
Ok(_) | Err(MaxMindDBError::AddressNotFoundError(_)) => Ok(Geoip {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
dbkind,
config,
}),
Err(error) => Err(error.into()),
}
Ok(Geoip {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
dbkind,
config,
})
}

fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option<ObjectMap> {
Expand All @@ -163,7 +155,7 @@ impl Geoip {

match self.dbkind {
DatabaseKind::Asn | DatabaseKind::Isp => {
let data = self.dbreader.lookup::<Isp>(ip).ok()?;
let data: Isp = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!("autonomous_system_number", data.autonomous_system_number);
add_field!(
Expand All @@ -174,62 +166,53 @@ impl Geoip {
add_field!("organization", data.organization);
}
DatabaseKind::City => {
let data = self.dbreader.lookup::<City>(ip).ok()?;
let data: City = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!(
"city_name",
self.take_translation(data.city.as_ref().and_then(|c| c.names.as_ref()))
);
add_field!("city_name", self.take_translation(&data.city.names));

add_field!("continent_code", data.continent.and_then(|c| c.code));
add_field!("continent_code", data.continent.code);

let country = data.country.as_ref();
add_field!("country_code", country.and_then(|country| country.iso_code));
add_field!("country_code", data.country.iso_code);
add_field!(
"country_name",
self.take_translation(country.and_then(|c| c.names.as_ref()))
self.take_translation(&data.country.names)
);

let location = data.location.as_ref();
add_field!("timezone", location.and_then(|location| location.time_zone));
let location = &data.location;
add_field!("timezone", location.time_zone);
add_field!(
"latitude",
location
.and_then(|location| location.latitude)
.map(|latitude| Value::Float(
NotNan::new(latitude).expect("latitude cannot be Nan")
))
location.latitude.map(|latitude| Value::Float(
NotNan::new(latitude).expect("latitude cannot be Nan")
))
);
add_field!(
"longitude",
location
.and_then(|location| location.longitude)
.longitude
.map(|longitude| NotNan::new(longitude).expect("longitude cannot be Nan"))
);
add_field!(
"metro_code",
location.and_then(|location| location.metro_code)
);
add_field!("metro_code", location.metro_code);

// last subdivision is most specific per https://github.com/maxmind/GeoIP2-java/blob/39385c6ce645374039450f57208b886cf87ade47/src/main/java/com/maxmind/geoip2/model/AbstractCityResponse.java#L96-L107
let subdivision = data.subdivisions.as_ref().and_then(|s| s.last());
let subdivision = data.subdivisions.last();
add_field!(
"region_name",
self.take_translation(subdivision.and_then(|s| s.names.as_ref()))
subdivision.and_then(|s| self.take_translation(&s.names))
);
add_field!(
"region_code",
subdivision.and_then(|subdivision| subdivision.iso_code)
);
add_field!("postal_code", data.postal.and_then(|p| p.code));
add_field!("postal_code", data.postal.code);
}
DatabaseKind::ConnectionType => {
let data = self.dbreader.lookup::<ConnectionType>(ip).ok()?;
let data: ConnectionType = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!("connection_type", data.connection_type);
}
DatabaseKind::AnonymousIp => {
let data = self.dbreader.lookup::<AnonymousIp>(ip).ok()?;
let data: AnonymousIp = self.dbreader.lookup(ip).ok()?.decode().ok()??;

add_field!("is_anonymous", data.is_anonymous);
add_field!("is_anonymous_vpn", data.is_anonymous_vpn);
Expand All @@ -243,13 +226,18 @@ impl Geoip {
Some(map)
}

fn take_translation<'a>(
&self,
translations: Option<&BTreeMap<&str, &'a str>>,
) -> Option<&'a str> {
translations
.and_then(|translations| translations.get(&*self.config.locale))
.copied()
fn take_translation<'a>(&self, names: &Names<'a>) -> Option<&'a str> {
match self.config.locale.as_str() {
"de" => names.german,
"en" => names.english,
"es" => names.spanish,
"fr" => names.french,
"ja" => names.japanese,
"pt-BR" => names.brazilian_portuguese,
"ru" => names.russian,
"zh-CN" => names.simplified_chinese,
_ => names.english,
}
}
}

Expand Down
23 changes: 10 additions & 13 deletions src/enrichment_tables/mmdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
//! [maxmind]: https://maxmind.com
use std::{fs, net::IpAddr, sync::Arc, time::SystemTime};

use maxminddb::{MaxMindDBError, Reader};
use maxminddb::Reader;
use vector_lib::configurable::configurable_component;
use vector_lib::enrichment::{Case, Condition, IndexHandle, Table};
use vrl::value::{ObjectMap, Value};
Expand Down Expand Up @@ -52,22 +52,19 @@ impl Mmdb {
pub fn new(config: MmdbConfig) -> crate::Result<Self> {
let dbreader = Arc::new(Reader::open_readfile(config.path.clone())?);

// Check if we can read database with dummy Ip.
// Verify the database is readable; missing-record is fine, not an error.
let ip = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
let result = dbreader.lookup::<ObjectMap>(ip).map(|_| ());

match result {
Ok(_) | Err(MaxMindDBError::AddressNotFoundError(_)) => Ok(Mmdb {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
config,
}),
Err(error) => Err(error.into()),
}
dbreader.lookup(ip)?;

Ok(Mmdb {
last_modified: fs::metadata(&config.path)?.modified()?,
dbreader,
config,
})
}

fn lookup(&self, ip: IpAddr, select: Option<&[String]>) -> Option<ObjectMap> {
let data = self.dbreader.lookup::<ObjectMap>(ip).ok()?;
let data: ObjectMap = self.dbreader.lookup(ip).ok()?.decode().ok()??;

if let Some(fields) = select {
let mut filtered = Value::from(ObjectMap::new());
Expand Down
6 changes: 3 additions & 3 deletions src/sources/mongodb_metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ impl MongoDbMetrics {
let doc = self
.client
.database("admin")
.run_command(doc! { "isMaster": 1 }, None)
.run_command(doc! { "isMaster": 1 })
.await
.map_err(CollectError::Mongo)?;
let msg: CommandIsMaster = from_document(doc).map_err(CollectError::Bson)?;
Expand All @@ -215,7 +215,7 @@ impl MongoDbMetrics {
let doc = self
.client
.database("admin")
.run_command(doc! { "buildInfo": 1 }, None)
.run_command(doc! { "buildInfo": 1 })
.await
.map_err(CollectError::Mongo)?;
from_document(doc).map_err(CollectError::Bson)
Expand Down Expand Up @@ -281,7 +281,7 @@ impl MongoDbMetrics {
let command = doc! { "serverStatus": 1, "opLatencies": { "histograms": true }};
let db = self.client.database("admin");
let doc = db
.run_command(command, None)
.run_command(command)
.await
.map_err(CollectError::Mongo)?;
let byte_size = document_size(&doc);
Expand Down