From 6b761fb23c672c0565a146a25dd6b97de9e69e58 Mon Sep 17 00:00:00 2001 From: paullegranddc Date: Fri, 12 Jun 2026 18:53:35 +0200 Subject: [PATCH] feat: agentless RC fetcher --- Cargo.lock | 223 +++-- libdd-common/src/lib.rs | 21 +- libdd-remote-config/Cargo.toml | 17 +- .../examples/remote_config_fetch.rs | 52 +- .../roots/prod/config_root.json | 63 ++ .../roots/prod/director_root.json | 63 ++ .../src/agentless_client/mod.rs | 766 ++++++++++++++++++ libdd-remote-config/src/fetch/fetcher.rs | 247 +++++- libdd-remote-config/src/fetch/shared.rs | 8 +- libdd-remote-config/src/fetch/single.rs | 35 +- libdd-remote-config/src/fetch/test_server.rs | 3 + libdd-remote-config/src/lib.rs | 2 + libdd-trace-protobuf/build.rs | 6 + .../src/pb/remoteconfig.proto | 210 +++++ libdd-trace-protobuf/src/remoteconfig.rs | 400 +++++++++ 15 files changed, 1994 insertions(+), 122 deletions(-) create mode 100644 libdd-remote-config/roots/prod/config_root.json create mode 100644 libdd-remote-config/roots/prod/director_root.json create mode 100644 libdd-remote-config/src/agentless_client/mod.rs diff --git a/Cargo.lock b/Cargo.lock index fff06dbaa3..8ccd0cae6a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -397,10 +397,10 @@ dependencies = [ "axum-core", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "itoa", + "itoa 1.0.11", "matchit", "memchr", "mime", @@ -421,8 +421,8 @@ checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" dependencies = [ "bytes", "futures-core", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "mime", "pin-project-lite", @@ -1226,7 +1226,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" dependencies = [ "csv-core", - "itoa", + "itoa 1.0.11", "ryu", "serde", ] @@ -1447,7 +1447,7 @@ dependencies = [ "bytes", "constcat", "futures", - "http", + "http 1.1.0", "http-body-util", "libdd-common", "libdd-data-pipeline", @@ -1506,7 +1506,7 @@ dependencies = [ "datadog-live-debugger", "datadog-sidecar-macros", "futures", - "http", + "http 1.1.0", "http-body-util", "httpmock", "libc", @@ -1558,7 +1558,7 @@ dependencies = [ "datadog-ipc", "datadog-live-debugger", "datadog-sidecar", - "http", + "http 1.1.0", "libc", "libdd-common", "libdd-common-ffi", @@ -1636,6 +1636,15 @@ dependencies = [ "syn 2.0.87", ] +[[package]] +name = "derp" +version = "0.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c9b84cfd9b6fa437e498215e5625e9e3ae3bf9bb54d623028a181c40820db169" +dependencies = [ + "untrusted 0.7.1", +] + [[package]] name = "diff" version = "0.1.13" @@ -2135,7 +2144,7 @@ dependencies = [ "fnv", "futures-core", "futures-sink", - "http", + "http 1.1.0", "indexmap 2.12.1", "slab", "tokio", @@ -2223,7 +2232,7 @@ dependencies = [ "base64 0.21.7", "bytes", "headers-core", - "http", + "http 1.1.0", "httpdate", "mime", "sha1", @@ -2235,7 +2244,7 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" dependencies = [ - "http", + "http 1.1.0", ] [[package]] @@ -2314,6 +2323,17 @@ dependencies = [ "tracing", ] +[[package]] +name = "http" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "601cbb57e577e2f5ef5be8e7b83f0f63994f25aa94d673e54a92d5c516d101f1" +dependencies = [ + "bytes", + "fnv", + "itoa 1.0.11", +] + [[package]] name = "http" version = "1.1.0" @@ -2322,7 +2342,18 @@ checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ "bytes", "fnv", - "itoa", + "itoa 1.0.11", +] + +[[package]] +name = "http-body" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ceab25649e9960c0311ea418d17bee82c0dcec1bd053b5f9a66e265a693bed2" +dependencies = [ + "bytes", + "http 0.2.12", + "pin-project-lite", ] [[package]] @@ -2332,7 +2363,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1efedce1fb8e6913f23e0c92de8e62cd5b772a67e7b3946df930a62566c93184" dependencies = [ "bytes", - "http", + "http 1.1.0", ] [[package]] @@ -2343,8 +2374,8 @@ checksum = "793429d76616a256bcb62c2a2ec2bed781c8307e797e2598c50010f2bee2544f" dependencies = [ "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "pin-project-lite", ] @@ -2377,9 +2408,9 @@ dependencies = [ "futures-timer", "futures-util", "headers", - "http", + "http 1.1.0", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", "lazy_static", "log", @@ -2402,6 +2433,29 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" +[[package]] +name = "hyper" +version = "0.14.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41dfc780fdec9373c01bae43289ea34c972e40ee3c9f6b3c8801a35f35586ce7" +dependencies = [ + "bytes", + "futures-channel", + "futures-core", + "futures-util", + "http 0.2.12", + "http-body 0.4.6", + "httparse", + "httpdate", + "itoa 1.0.11", + "pin-project-lite", + "socket2 0.5.10", + "tokio", + "tower-service", + "tracing", + "want", +] + [[package]] name = "hyper" version = "1.6.0" @@ -2412,11 +2466,11 @@ dependencies = [ "futures-channel", "futures-util", "h2", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "httparse", "httpdate", - "itoa", + "itoa 1.0.11", "pin-project-lite", "smallvec", "tokio", @@ -2429,8 +2483,8 @@ version = "0.27.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c93eb611681b207e1fe55d5a71ecf91572ec8a6705cdb6857f7d8d5242cf58" dependencies = [ - "http", - "hyper", + "http 1.1.0", + "hyper 1.6.0", "hyper-util", "rustls", "rustls-native-certs", @@ -2447,7 +2501,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" dependencies = [ - "hyper", + "hyper 1.6.0", "hyper-util", "pin-project-lite", "tokio", @@ -2465,9 +2519,9 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "http", - "http-body", - "hyper", + "http 1.1.0", + "http-body 1.0.1", + "hyper 1.6.0", "ipnet", "libc", "percent-encoding", @@ -2734,6 +2788,12 @@ dependencies = [ "either", ] +[[package]] +name = "itoa" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" + [[package]] name = "itoa" version = "1.0.11" @@ -2846,7 +2906,7 @@ version = "2.0.0" dependencies = [ "anyhow", "bytes", - "http", + "http 1.1.0", "thiserror 1.0.68", ] @@ -2855,7 +2915,7 @@ name = "libdd-capabilities-impl" version = "2.0.0" dependencies = [ "bytes", - "http", + "http 1.1.0", "http-body-util", "libdd-capabilities", "libdd-common", @@ -2875,11 +2935,11 @@ dependencies = [ "futures-core", "futures-util", "hex", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "httparse", - "hyper", + "hyper 1.6.0", "hyper-rustls", "hyper-util", "indexmap 2.12.1", @@ -2916,7 +2976,7 @@ dependencies = [ "chrono", "crossbeam-queue", "function_name", - "hyper", + "hyper 1.6.0", "libdd-common", "serde", ] @@ -2934,7 +2994,7 @@ dependencies = [ "cxx-build", "errno", "goblin", - "http", + "http 1.1.0", "libc", "libdd-common", "libdd-libunwind-sys", @@ -2990,7 +3050,7 @@ dependencies = [ "duplicate 2.0.1", "either", "getrandom 0.2.15", - "http", + "http 1.1.0", "http-body-util", "httpmock", "libdd-capabilities", @@ -3060,7 +3120,7 @@ version = "3.0.0" dependencies = [ "anyhow", "cadence", - "http", + "http 1.1.0", "libdd-common", "serde", "tokio", @@ -3075,7 +3135,7 @@ dependencies = [ "fastrand", "http-body-util", "httpmock", - "hyper", + "hyper 1.6.0", "hyper-util", "libdd-common", "reqwest", @@ -3182,7 +3242,7 @@ dependencies = [ "cxx-build", "futures", "hashbrown 0.16.1", - "http", + "http 1.1.0", "http-body-util", "httparse", "indexmap 2.12.1", @@ -3221,7 +3281,7 @@ dependencies = [ "function_name", "futures", "http-body-util", - "hyper", + "hyper 1.6.0", "libc", "libdd-common", "libdd-common-ffi", @@ -3257,14 +3317,17 @@ dependencies = [ "futures", "futures-util", "hashbrown 0.15.1", - "http", + "http 1.1.0", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-util", + "libdd-capabilities", + "libdd-capabilities-impl", "libdd-common", "libdd-remote-config", "libdd-trace-protobuf", "manual_future", + "prost", "serde", "serde_json", "serde_with", @@ -3274,6 +3337,7 @@ dependencies = [ "tokio", "tokio-util", "tracing", + "tuf", "uuid", ] @@ -3324,7 +3388,7 @@ dependencies = [ "bytes", "futures", "hashbrown 0.15.1", - "http", + "http 1.1.0", "http-body-util", "httpmock", "libc", @@ -3423,7 +3487,7 @@ dependencies = [ "async-trait", "criterion", "hashbrown 0.15.1", - "http", + "http 1.1.0", "httpmock", "libdd-capabilities", "libdd-capabilities-impl", @@ -3455,11 +3519,11 @@ dependencies = [ "flate2", "futures", "getrandom 0.2.15", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", "httpmock", - "hyper", + "hyper 1.6.0", "indexmap 2.12.1", "libdd-capabilities", "libdd-capabilities-impl", @@ -3488,7 +3552,7 @@ version = "0.1.0" dependencies = [ "anyhow", "bytes", - "http", + "http 1.1.0", "httpmock", "libdd-common", "libdd-remote-config", @@ -3764,7 +3828,7 @@ dependencies = [ "bytes", "encoding_rs", "futures-util", - "http", + "http 1.1.0", "httparse", "memchr", "mime", @@ -4762,10 +4826,10 @@ dependencies = [ "futures-core", "futures-util", "hickory-resolver", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-rustls", "hyper-util", "js-sys", @@ -4805,7 +4869,7 @@ dependencies = [ "cfg-if", "getrandom 0.2.15", "libc", - "untrusted", + "untrusted 0.9.0", "windows-sys 0.52.0", ] @@ -4969,7 +5033,7 @@ dependencies = [ "aws-lc-rs", "ring", "rustls-pki-types", - "untrusted", + "untrusted 0.9.0", ] [[package]] @@ -5213,7 +5277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d726bfaff4b320266d395898905d0eba0345aae23b54aee3a737e260fd46db03" dependencies = [ "indexmap 2.12.1", - "itoa", + "itoa 1.0.11", "memchr", "ryu", "serde", @@ -5275,7 +5339,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ "indexmap 2.12.1", - "itoa", + "itoa 1.0.11", "ryu", "serde", "unsafe-libyaml", @@ -5534,7 +5598,7 @@ version = "2.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6fe17b8deb33a9441280b4266c2d257e166bafbaea6e66b4b34ca139c91766d9" dependencies = [ - "itoa", + "itoa 1.0.11", "ryu", "sval", ] @@ -5545,7 +5609,7 @@ version = "2.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "854addb048a5bafb1f496c98e0ab5b9b581c3843f03ca07c034ae110d3b7c623" dependencies = [ - "itoa", + "itoa 1.0.11", "ryu", "sval", ] @@ -5829,7 +5893,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8a7619e19bc266e0f9c5e6686659d394bc57973859340060a69221e57dbc0c40" dependencies = [ "deranged", - "itoa", + "itoa 1.0.11", "num-conv", "powerfmt", "serde", @@ -6018,10 +6082,10 @@ dependencies = [ "base64 0.22.1", "bytes", "h2", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "http-body-util", - "hyper", + "hyper 1.6.0", "hyper-timeout", "hyper-util", "percent-encoding", @@ -6089,8 +6153,8 @@ dependencies = [ "bitflags", "bytes", "futures-util", - "http", - "http-body", + "http 1.1.0", + "http-body 1.0.1", "iri-string", "pin-project-lite", "tower", @@ -6202,6 +6266,31 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" +[[package]] +name = "tuf" +version = "0.3.0-beta10" +source = "git+https://github.com/DataDog/rust-tuf/?tag=0.3.0-beta10-opw-3#9e8d6077b0e67f13233ad0a347bb7d640705da04" +dependencies = [ + "chrono", + "data-encoding", + "derp", + "futures-io", + "futures-util", + "http 0.2.12", + "hyper 0.14.32", + "itoa 0.4.8", + "log", + "percent-encoding", + "ring", + "serde", + "serde_derive", + "serde_json", + "tempfile", + "thiserror 1.0.68", + "untrusted 0.7.1", + "url", +] + [[package]] name = "twox-hash" version = "1.6.3" @@ -6260,6 +6349,12 @@ version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" +[[package]] +name = "untrusted" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" + [[package]] name = "untrusted" version = "0.9.0" @@ -6362,7 +6457,7 @@ checksum = "9170e001f458781e92711d2ad666110f153e4e50bfd5cbd02db6547625714187" dependencies = [ "float-cmp", "halfbrown", - "itoa", + "itoa 1.0.11", "ryu", ] diff --git a/libdd-common/src/lib.rs b/libdd-common/src/lib.rs index dbd2e4a090..54a6f03193 100644 --- a/libdd-common/src/lib.rs +++ b/libdd-common/src/lib.rs @@ -7,7 +7,8 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] use anyhow::Context; -use http::uri; +use http::uri::PathAndQuery; +use http::{uri, Uri}; use serde::de::Error; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::sync::{Mutex, MutexGuard, RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -332,6 +333,24 @@ impl Endpoint { /// Default value for the timeout field in milliseconds. pub const DEFAULT_TIMEOUT: u64 = 3_000; + pub fn agentless(site: &str, api_key: String) -> anyhow::Result { + Ok(Self { + url: Uri::builder() + .scheme("https") + .authority( + uri::Authority::try_from(site) + .with_context(|| format!("dd_site is an invalid url: {site}"))?, + ) + .path_and_query(PathAndQuery::from_static("")) + .build() + .unwrap(), + api_key: Some(api_key.into()), + timeout_ms: Self::DEFAULT_TIMEOUT, + test_token: None, + use_system_resolver: true, + }) + } + /// Returns an iterator of optional endpoint-specific headers (api-key, test-token) /// as (header_name, header_value) string tuples for any that are available. pub fn get_optional_headers(&self) -> impl Iterator { diff --git a/libdd-remote-config/Cargo.toml b/libdd-remote-config/Cargo.toml index ae7988ceef..a42ff0030b 100644 --- a/libdd-remote-config/Cargo.toml +++ b/libdd-remote-config/Cargo.toml @@ -23,7 +23,7 @@ client = [ "tokio-util", "manual_future", "time", - "tracing" + "tracing", ] regex-lite = ["libdd-common/regex-lite"] @@ -36,24 +36,33 @@ test = ["hyper/server", "hyper-util"] [dependencies] anyhow = { version = "1.0" } libdd-common = { path = "../libdd-common", version = "4.2.0", default-features = false } +libdd-capabilities = { path = "../libdd-capabilities" } +libdd-capabilities-impl = { version = "2.0.0", path = "../libdd-capabilities-impl", features = ["https"]} libdd-trace-protobuf = { path = "../libdd-trace-protobuf", version = "3.0.2", optional = true } hyper = { workspace = true, optional = true, default-features = false } -http-body-util = {version = "0.1", optional = true } +http-body-util = { version = "0.1", optional = true } http = { version = "1.1", optional = true } base64 = { version = "0.22.1", optional = true } sha2 = { version = "0.10", optional = true } uuid = { version = "1.7.0", features = ["v4"], optional = true } futures-util = { version = "0.3", optional = true } tokio = { version = "1.36.0", optional = true } -tokio-util = { version = "0.7.10", optional = true } +tokio-util = { version = "0.7.10", optional = true } manual_future = { version = "0.1.1", optional = true } -time = { version = "0.3", features = ["parsing", "serde", "formatting"], optional = true } +time = { version = "0.3", features = [ + "parsing", + "serde", + "formatting", +], optional = true } tracing = { version = "0.1", default-features = false, optional = true } serde = "1.0" serde_json = { version = "1.0", features = ["raw_value"] } serde_with = "3" thiserror = "2" hashbrown = "0.15" +tuf = { git = "https://github.com/DataDog/rust-tuf/", tag = "0.3.0-beta10-opw-3" } +prost = "0.14.1" +futures = "0.3" # Test feature hyper-util = { workspace = true, features = ["service"], optional = true } diff --git a/libdd-remote-config/examples/remote_config_fetch.rs b/libdd-remote-config/examples/remote_config_fetch.rs index eb0c79ae95..66ac2033c6 100644 --- a/libdd-remote-config/examples/remote_config_fetch.rs +++ b/libdd-remote-config/examples/remote_config_fetch.rs @@ -8,6 +8,7 @@ use libdd_remote_config::file_change_tracker::{Change, FilePath}; use libdd_remote_config::file_storage::ParsedFileStorage; use libdd_remote_config::RemoteConfigProduct::ApmTracing; use libdd_remote_config::{RemoteConfigParsed, Target}; +use std::process::Command; use std::time::Duration; use tokio::time::sleep; @@ -16,8 +17,45 @@ const SERVICE: &str = "testservice"; const ENV: &str = "testenv"; const VERSION: &str = "1.2.3"; +fn get_hostname() -> String { + Command::new("hostname") + .output() + .ok() + .and_then(|o| String::from_utf8(o.stdout).ok()) + .map(|s| s.trim().to_string()) + .unwrap_or_else(|| "unknown".to_string()) +} + #[tokio::main(flavor = "current_thread")] async fn main() { + let hostname = get_hostname(); + println!("Hostname: {hostname}"); + + let dd_api_key = std::env::var("DD_API_KEY").ok(); + let dd_site = std::env::var("DD_SITE").ok(); + + let (endpoint, agentless_enabled) = match (dd_api_key, dd_site) { + (Some(api_key), Some(site)) => { + println!("DD_API_KEY and DD_SITE are set — enabling agentless mode (site: {site})"); + let endpoint = Endpoint::agentless(&site, api_key) + .expect("Failed to build agentless endpoint from DD_SITE"); + (endpoint, true) + } + _ => { + println!("DD_API_KEY / DD_SITE not set — connecting to local agent"); + ( + Endpoint { + url: http::Uri::from_static("http://localhost:8126"), + api_key: None, + timeout_ms: 5000, // custom timeout, defaults to 3 seconds + test_token: None, + ..Default::default() + }, + false, + ) + } + }; + // SingleChangesFetcher is ideal for a single static (runtime_id, service, env, version) tuple // Otherwise a SharedFetcher (or even a MultiTargetFetcher for a potentially high number of // targets) for multiple targets is needed. These can be manually wired together with a @@ -40,18 +78,16 @@ async fn main() { invariants: ConfigInvariants { language: "awesomelang".to_string(), tracer_version: "99.10.5".to_string(), - endpoint: Endpoint { - url: http::Uri::from_static("http://localhost:8126"), - api_key: None, - timeout_ms: 5000, // custom timeout, defaults to 3 seconds - test_token: None, - ..Default::default() - }, + endpoint, + hostname, + agentless_enabled, }, products: vec![ApmTracing], capabilities: vec![], }, - ); + ) + .await + .expect("Failed to create SingleChangesFetcher"); loop { match fetcher.fetch_changes().await { diff --git a/libdd-remote-config/roots/prod/config_root.json b/libdd-remote-config/roots/prod/config_root.json new file mode 100644 index 0000000000..d5a5eb7894 --- /dev/null +++ b/libdd-remote-config/roots/prod/config_root.json @@ -0,0 +1,63 @@ +{ + "signed": { + "_type": "root", + "spec_version": "1.0", + "version": 16, + "expires": "2026-10-31T17:00:00Z", + "keys": { + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "91d413c791907aae0be739d94a1e5e59c5d5ba65a8bbc1fb2153a5680f2d5958" } + }, + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "9323800f89d833ee263d3661c2616da89e405b92beeec334f21d54b5f60fbd85" } + } + }, + "roles": { + "root": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db" + ], + "threshold": 2 + } + }, + "consistent_snapshot": true + }, + "signatures": [ + { + "keyid": "620dacb7dc843acc731e4483c24ceb4121f4de5545f92d15dc2b13299b660e01", + "sig": "a8b4ee59576c82bc1bc944df014bbeb90f5cba4ffbd8b7878461da2c934fd3bf93ac4c3b85a7936584da4a5a0cfe93b7150b559fc96423a98a70a11fc844f208" + }, + { + "keyid": "e1fdd5827bb44defe9b87ed835c854be4b78a86ded013d1646bc416c1c89a9db", + "sig": "3332e240a023dc267e87e210c7b46b9fa5772932d84936e3a7a5b5018b0f45fbf068ce60b97beb6e7e6c0c12a68d68a44461e590a934b577c71d4ff6dd94db09" + } + ] +} diff --git a/libdd-remote-config/roots/prod/director_root.json b/libdd-remote-config/roots/prod/director_root.json new file mode 100644 index 0000000000..f3882d62a0 --- /dev/null +++ b/libdd-remote-config/roots/prod/director_root.json @@ -0,0 +1,63 @@ +{ + "signed": { + "_type": "root", + "spec_version": "1.0", + "version": 15, + "expires": "2026-10-31T17:00:00Z", + "keys": { + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "286d6ae328365afec0f92519ceab68cd627e34072cde90b2f5d167badea970f2" } + }, + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8": { + "keytype": "ed25519", + "scheme": "ed25519", + "keyid_hash_algorithms": ["sha256", "sha512"], + "keyval": { "public": "afdd68be53815d67f8fa99cf101aac4589a358c660adf7dd4e179fe96834d3c9" } + } + }, + "roles": { + "root": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + }, + "snapshot": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + }, + "targets": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + }, + "timestamp": { + "keyids": [ + "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8" + ], + "threshold": 2 + } + }, + "consistent_snapshot": true + }, + "signatures": [ + { + "keyid": "b2b93a6dccc96d053e6db39181124c85ba4156d43503d4351b5500316fa084e8", + "sig": "ccbe8cdd7dfb9a9d6b4bef8075a7aaf9baafe69a07100f22c04677a9737a23b24055ac3a0776c7021ae6a2fd175a251c0604164ea6705a0a896844766d2ecd07" + }, + { + "keyid": "44d70fa8eae4c07f26c2767270827b6b9e11e7972926b3b419b5ea14ec32f796", + "sig": "068a2e37e93688702e75ebb328b74cd8879832a63179ba1c54976aae4ee03a5e936c7b7274d4a6aa6755c27cfe800097984d94c83be901bde72103dccebcc008" + } + ] +} diff --git a/libdd-remote-config/src/agentless_client/mod.rs b/libdd-remote-config/src/agentless_client/mod.rs new file mode 100644 index 0000000000..32b91af183 --- /dev/null +++ b/libdd-remote-config/src/agentless_client/mod.rs @@ -0,0 +1,766 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use std::{ + fmt, + future::Future, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; + +use base64::Engine; +use futures::AsyncReadExt as _; +use hashbrown::{HashMap, HashSet}; +use http::{ + header, + uri::{Authority, PathAndQuery}, + Method, Request, Uri, +}; +use libdd_capabilities::{Bytes, HttpClientCapability}; +use libdd_common::Endpoint; +use libdd_trace_protobuf::remoteconfig; +use prost::Message; +use serde_json::Value; +use tracing::debug; +use tuf::repository::RepositoryStorage; +use tuf::{ + metadata::{ + Metadata, MetadataPath, MetadataVersion, RawSignedMetadata, TargetDescription, TargetPath, + }, + repository::RepositoryProvider as _, +}; + +#[allow(dead_code)] // used in tests and reserved for TUF config-repo init +const CONFIG_ROOT: &[u8] = include_bytes!("../../roots/prod/config_root.json"); +const CONFIG_ROOT_VERSION: u64 = 16; +const DIRECTOR_ROOT: &[u8] = include_bytes!("../../roots/prod/director_root.json"); +const DIRECTOR_ROOT_VERSION: u64 = 15; + +const FAKE_AGENT_VERSION: &'static str = "7.78.4"; + +type TUFRepo = tuf::repository::EphemeralRepository; +type TUFClient = tuf::client::Client; + +// Make a remote config API endpoint from and endpoint where `e.url` is the base dd site +// If the endpoint is not suitable (api key not set, not https), returns N +pub fn make_agentless_configs_endpoint(e: Endpoint) -> Option { + dbg!(&e); + if !(e.url.scheme_str().is_some_and(|s| s == "https") + && e.url.authority().is_some() + && e.api_key.is_some()) + { + return None; + } + + let mut parts = e.url.into_parts(); + parts.authority = + Some(Authority::try_from(format!("config.{}", parts.authority?.as_str())).ok()?); + parts.path_and_query = Some(PathAndQuery::from_static("/api/v0.1/configurations")); + + Some(Endpoint { + url: Uri::from_parts(parts).ok()?, + ..e + }) +} + +#[derive(Clone)] +pub struct AgentlessConfig { + pub hostname: String, +} + +struct CachedFile { + hash: Vec<(&'static tuf::crypto::HashAlgorithm, tuf::crypto::HashValue)>, + target_file: Vec, + version: u64, +} + +pub type NativeAgentlessFetcher = AgentlessFetcher; + +pub struct AgentlessFetcher { + http: C, + initialized: bool, + opaque_backend_state: Vec, + director_client: TUFClient, + config_client: TUFClient, + hostname: String, + products: HashSet, + refresh_interval: Duration, + endpoint: Endpoint, + // TODO: Not sure this is needed if the wrapper client already caches files? + target_cache: HashMap, +} + +pub struct ClientResponse<'a> { + pub root_version: u64, + pub target_version: u64, + pub opaque_backend_state: Vec, + pub targets: Vec<( + &'a str, + &'a [u8], + u64, + &'a Vec<(&'static tuf::crypto::HashAlgorithm, tuf::crypto::HashValue)>, + )>, +} + +struct BorrowedTarget<'a> { + pub path: &'a TargetPath, + pub desc: &'a TargetDescription, +} + +impl<'a> BorrowedTarget<'a> { + pub fn try_create(path: &'a TargetPath, desc: &'a TargetDescription) -> anyhow::Result { + if let Some(expiry) = desc.custom().get(CUSTOM_METADATA_EXPIRY_PATH) { + let expiry_ts = expiry + .as_u64() + .ok_or_else(|| anyhow::format_err!("expiry not a number"))?; + + if expiry_ts * 1000 <= now_unix_milli_ts() { + anyhow::bail!("expired target at path: {path}") + } + } + + Ok(Self { path, desc }) + } +} + +#[derive(Clone, Debug, Eq, PartialEq)] +pub struct Target { + pub path: TargetPath, + pub desc: TargetDescription, +} + +const CUSTOM_METADATA_EXPIRY_PATH: &str = "expires"; + +impl Target { + /// Returns Ok(Target) when valid and unexpired; Err(Error) otherwise. + pub fn try_create(path: &TargetPath, desc: &TargetDescription) -> anyhow::Result { + if let Some(expiry) = desc.custom().get(CUSTOM_METADATA_EXPIRY_PATH) { + let expiry_ts = expiry + .as_u64() + .ok_or_else(|| anyhow::format_err!("expiry not a number"))?; + + if expiry_ts * 1000 <= now_unix_milli_ts() { + anyhow::bail!("expired target at path: {path}") + } + } + + Ok(Self { + path: path.clone(), + desc: desc.clone(), + }) + } +} + +enum FetchTargetResult { + Cached, + New(CachedFile), +} + +impl AgentlessFetcher { + /// Create a new `RemoteConfig` client. + /// + /// # Errors + /// Returns an error if TUF root initialization fails. + pub async fn new(cfg: AgentlessConfig, endpoint: Endpoint) -> anyhow::Result { + Ok(Self { + endpoint, + http: C::new_client(), + director_client: TUFClient::with_trusted_root( + tuf::client::Config::default(), + &RawSignedMetadata::new(DIRECTOR_ROOT.to_vec()), + TUFRepo::new(), + TUFRepo::new(), + ) + .await?, + config_client: TUFClient::with_trusted_root( + tuf::client::Config::default(), + &RawSignedMetadata::new(CONFIG_ROOT.to_vec()), + TUFRepo::new(), + TUFRepo::new(), + ) + .await?, + hostname: cfg.hostname, + products: HashSet::new(), + target_cache: HashMap::new(), + + opaque_backend_state: Vec::new(), + refresh_interval: Duration::from_secs(5), + initialized: false, + }) + } + + /// Return the value of a particular target , checking both its length and + /// hashes against the metadata in the config repo. + /// + /// If it is already in the cache, return `Cached` + async fn fetch_target(&self, target: &BorrowedTarget<'_>) -> anyhow::Result { + let expected_hashes = tuf::crypto::retain_supported_hashes(target.desc.hashes()); + if expected_hashes.is_empty() { + anyhow::bail!("no supported hash for path: {}", target.path); + } + let (target_hash_algo, target_hash) = &expected_hashes[0]; + let target_path = target.path; + + dbg!(target.desc.custom()); + let version = target + .desc + .custom() + .get("v") + .and_then(|v| v.as_u64()) + .unwrap_or(0); + + if let Some(item) = self.target_cache.get(target_path) { + if item + .hash + .iter() + .find(|(alg, _)| alg == target_hash_algo) + .is_some_and(|(_, h)| h == target_hash) + && item.target_file.len() as u64 == target.desc.length() + { + return Ok(FetchTargetResult::Cached); + } + } + + // Fetch from the content from the remote __Unverified__ repo + // This is fine as we are comparing the (hash + len) with a validated + // target + let mut read = self + .director_client + .remote_repo() + .fetch_target(target_path) + .await?; + let mut buf = Vec::new(); + read.read_to_end(&mut buf).await?; + + let expected_len = target.desc.length() as usize; + if buf.len() != expected_len { + anyhow::bail!("bad length for file at path: {}", target.path) + } + + { + let hash_algs = expected_hashes + .iter() + .map(|(alg, _val)| (*alg).clone()) + .collect::>(); + let actual_hashes = + tuf::crypto::calculate_hashes_from_slice(&buf, hash_algs.as_slice())?; + let expected: HashMap<_, _> = expected_hashes + .iter() + .map(|(alg, val)| (alg, val)) + .collect(); + + if !(actual_hashes.len() == expected.len() + && actual_hashes + .iter() + .all(|(k, v)| expected.get(&k).is_some_and(|e| *e == v))) + { + anyhow::bail!("hash did not match: {}", target.path) + } + } + + Ok(FetchTargetResult::New(CachedFile { + hash: expected_hashes, + target_file: buf, + version, + })) + } + + pub async fn fetch_config( + &mut self, + c: remoteconfig::Client, + ) -> anyhow::Result { + let ( + current_config_snapshot_version, + current_config_root_version, + current_director_root_version, + ) = if self.initialized { + ( + u64::from( + self.config_client + .database() + .trusted_snapshot() + .ok_or(anyhow::anyhow!("Missing snapshot data"))? + .version(), + ), + u64::from(self.config_client.database().trusted_root().version()), + u64::from(self.director_client.database().trusted_root().version()), + ) + } else { + (0, 0, 0) + }; + + let all_products = c.products.iter().fold(HashSet::new(), |mut acc, p| { + acc.get_or_insert_with(p, String::clone); + acc + }); + let new_products = all_products + .difference(&self.products) + .cloned() + .collect::>(); + let old_products = self + .products + .intersection(&all_products) + .cloned() + .collect::>(); + + let request = remoteconfig::LatestConfigsRequest { + hostname: self.hostname.clone(), + current_config_snapshot_version, + current_config_root_version, + current_director_root_version, + products: old_products, + new_products: new_products, + backend_client_state: self.opaque_backend_state.clone(), + active_clients: vec![c.clone()], + agent_version: FAKE_AGENT_VERSION.to_owned(), + has_error: false, + error: String::new(), + trace_agent_env: String::new(), + org_uuid: String::new(), + tags: vec![], + agent_uuid: String::new(), + }; + let response = self.get_latest_config(request).await?; + if !self.initialized { + self.initialized = true; + } + + self.apply(&response).await?; + self.products = all_products; + + // TODO: filter predicates ? + + Ok(ClientResponse { + root_version: u64::from(self.config_client.database().trusted_root().version()), + target_version: u64::from( + self.config_client + .database() + .trusted_targets() + .ok_or(anyhow::anyhow!("Missing target data"))? + .version(), + ), + opaque_backend_state: self.opaque_backend_state.clone(), + targets: self + .target_cache + .iter() + .map(|(p, t)| (p.as_str(), t.target_file.as_slice(), t.version, &t.hash)) + .collect(), + }) + } + + /// Query the Remote Config org-status endpoint. + /// + /// # Errors + /// Returns an error if the HTTP request fails or the response cannot be decoded. + pub async fn get_org_status(&self) -> anyhow::Result { + let path = PathAndQuery::from_static("/api/v0.1/status"); + let res = self.send_request(Method::GET, path, Bytes::new()).await?; + parse_rc_response(res) + } + + pub async fn get_org_data(&self) -> anyhow::Result { + let path = PathAndQuery::from_static("/api/v0.1/org"); + let res = self.send_request(Method::GET, path, Bytes::new()).await?; + parse_rc_response(res) + } + + pub async fn init(&mut self) -> anyhow::Result<()> { + let response = self.initial_config().await?; + Ok(self.apply(&response).await?) + } + + /// Fetch the initial Remote Config payload for this tracer. + /// + /// # Errors + /// Returns an error if the HTTP request fails or the response cannot be decoded. + pub fn initial_config( + &self, + ) -> impl Future> + Send + use<'_, C> + { + let initial_request = remoteconfig::LatestConfigsRequest { + hostname: self.hostname.clone(), + current_config_snapshot_version: 0, + current_config_root_version: CONFIG_ROOT_VERSION, + current_director_root_version: DIRECTOR_ROOT_VERSION, + new_products: vec![], + active_clients: vec![], + backend_client_state: self.opaque_backend_state.clone(), + agent_version: FAKE_AGENT_VERSION.to_owned(), + products: vec![], + has_error: false, + error: String::new(), + trace_agent_env: String::new(), + tags: vec![], + agent_uuid: String::new(), + org_uuid: String::new(), + }; + self.get_latest_config(initial_request) + } + + /// Fetch the latest Remote Config for this client. + /// + /// # Errors + /// Returns an error if the HTTP request fails or the response cannot be decoded. + #[allow(clippy::future_not_send)] + pub async fn get_latest_config( + &self, + req: remoteconfig::LatestConfigsRequest, + ) -> anyhow::Result { + dbg!(&req); + let path = PathAndQuery::from_static("/api/v0.1/configurations"); + let body = Bytes::from(req.encode_to_vec()); + let res = self.send_request(Method::POST, path, body).await?; + let res = parse_rc_response(res)?; + dbg!(debug_latest_configs_response(&res)); + Ok(res) + } + + #[allow(clippy::future_not_send)] + async fn send_request( + &self, + method: Method, + path: PathAndQuery, + body: Bytes, + ) -> anyhow::Result> { + let req = self + .endpoint + .set_standard_headers( + Request::builder(), + concat!("Libdatadog/", env!("CARGO_PKG_VERSION")), + ) + .header(header::CONTENT_TYPE, "application/x-protobuf") + .uri(url_with_path(self.endpoint.url.clone(), path)?) + .method(method) + .body(body)?; + Ok(self.http.request(req).await?) + } + + async fn apply( + &mut self, + response: &remoteconfig::LatestConfigsResponse, + ) -> anyhow::Result<()> { + // At a high level, what we're doing here is populating the "remote" repos with the metadata + // that we received from upstream (which does not validate it), and then using the clients' + // `update` methods to synchronize that metadata to the "local" repos, during which + // validation is performed. + + let root_path = MetadataPath::root(); + let timestamp_path = MetadataPath::timestamp(); + let snapshot_path = MetadataPath::snapshot(); + let targets_path = MetadataPath::targets(); + + let repo = self.director_client.remote_repo_mut(); + *repo = TUFRepo::new(); + for target_file in &response.target_files { + // let trimmed_path = trim_hash_target_path(&target_file.path)?; + // let trimmed_target_path = TargetPath::new(&trimmed_path)?; + repo.store_target( + &TargetPath::new(&target_file.path)?, + &mut target_file.raw.as_slice(), + ) + .await?; + } + + let config_repo_mut = self.config_client.remote_repo_mut(); + *config_repo_mut = TUFRepo::new(); + let Some(metas) = response.config_metas.as_ref() else { + anyhow::bail!("missing config meta from LatestConfigsResponse") + }; + + store(config_repo_mut, &root_path, &metas.roots).await?; + store_noversion(config_repo_mut, ×tamp_path, &metas.timestamp).await?; + store(config_repo_mut, &snapshot_path, &metas.snapshot).await?; + store(config_repo_mut, &targets_path, &metas.top_targets).await?; + // TODO: We do not store the delegated targets metadata + // This will need to be revisited in order to support proper Uptane + // verification of the full configuration data. + // store(repo, &targets_path, &metas.delegated_targets).await?; + + let director_remote_repo = self.director_client.remote_repo_mut(); + let Some(metas) = response.director_metas.as_ref() else { + anyhow::bail!("missing director meta from LatestConfigsResponse") + }; + + store(director_remote_repo, &root_path, &metas.roots).await?; + store_noversion(director_remote_repo, ×tamp_path, &metas.timestamp).await?; + store(director_remote_repo, &snapshot_path, &metas.snapshot).await?; + store(director_remote_repo, &targets_path, &metas.targets).await?; + + self.config_client.update().await?; + self.director_client.update().await?; + + let mut new_target_path_set = HashSet::new(); + for target in trusted_targets(&self.director_client)? { + new_target_path_set.insert(target.path); + match self.fetch_target(&target).await? { + FetchTargetResult::Cached => {} + FetchTargetResult::New(cached_target) => { + self.target_cache.insert(target.path.clone(), cached_target); + } + } + } + self.target_cache + .retain(|key, _| new_target_path_set.contains(key)); + + // The Remote Config service uses a `custom` field at the top-level of the targets metadata + // to store this field which we are supposed to echo back to the server. That `custom` field + // is not explicitly part of the TUF spec, which is why we need to pull it out of the + // `additional_fields` catch-all here. + if let Some((opaque_backend_state, refresh_interval)) = + get_director_custom(&self.director_client) + { + if let Some(opaque_backend_state) = opaque_backend_state { + self.opaque_backend_state = opaque_backend_state; + } + if let Some(refresh_interval) = refresh_interval { + self.refresh_interval = refresh_interval; + } + } + + Ok(()) + } +} + +fn get_director_custom(director_client: &TUFClient) -> Option<(Option>, Option)> { + let custom = director_client + .database() + .trusted_targets()? + .additional_fields() + .get("custom")?; + + Some(( + custom + .get("opaque_backend_state") + .and_then(Value::as_str) + .and_then(|s| base64::engine::general_purpose::STANDARD.decode(s).ok()), + custom + .get("agent_refresh_interval") + .and_then(Value::as_u64) + .map(Duration::from_secs), + )) +} + +fn url_with_path(base: http::Uri, path: PathAndQuery) -> anyhow::Result { + let mut parts = base.into_parts(); + parts.path_and_query = Some(path); + Ok(http::Uri::from_parts(parts)?) +} + +fn parse_rc_response( + response: http::Response, +) -> anyhow::Result { + let status = response.status().as_u16(); + let body = response.into_body(); + if !(200..300).contains(&status) { + anyhow::bail!( + "Non 2XX status code: {}\n{}", + status, + String::from_utf8_lossy(&body) + ) + } + + Ok(T::decode(body)?) +} + +fn now_unix_milli_ts() -> u64 { + u64::try_from( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_millis(), + ) + .unwrap_or(u64::MAX) +} + +/// Return the available, unexpired target paths and their descriptions based on the current metadata. +fn trusted_targets( + director_client: &TUFClient, +) -> anyhow::Result> + '_> { + Ok(director_client + .database() + .trusted_targets() + .ok_or_else(|| anyhow::format_err!("missing targets from TUF director client"))? + .targets() + .iter() + .filter_map(|(path, desc)| { + BorrowedTarget::try_create(path, desc) + .inspect_err(|e| { + debug!(%path, "Skipping target: error {}", e); + }) + .ok() + })) +} + +async fn store<'a, T>(repo: &mut TUFRepo, path: &MetadataPath, tms: T) -> anyhow::Result<()> +where + T: IntoIterator + 'a, +{ + for tm in tms { + repo.store_metadata( + path, + MetadataVersion::Number(tm.version as u32), + &mut tm.raw.as_slice(), + ) + .await?; + } + Ok(()) +} + +async fn store_noversion( + repo: &mut TUFRepo, + path: &MetadataPath, + tms: &Option, +) -> anyhow::Result<()> { + if let Some(tm) = tms { + repo.store_metadata(path, MetadataVersion::None, &mut tm.raw.as_slice()) + .await?; + } + Ok(()) +} + +// ── Debug helpers: render `raw: Vec` fields as JSON ──────────────────── + +struct RawJson<'a>(&'a [u8]); + +impl fmt::Debug for RawJson<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let RawJson(bytes) = self; + match serde_json::from_slice::(bytes) { + Ok(v) => write!(f, "{v:#}"), + Err(_) => write!(f, "<{} non-JSON bytes>", bytes.len()), + } + } +} + +struct DebugTopMeta<'a>(&'a remoteconfig::TopMeta); + +impl fmt::Debug for DebugTopMeta<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::TopMeta { version, raw } = self.0; + f.debug_struct("TopMeta") + .field("version", version) + .field("raw", &RawJson(raw)) + .finish() + } +} + +struct DebugDelegatedMeta<'a>(&'a remoteconfig::DelegatedMeta); + +impl fmt::Debug for DebugDelegatedMeta<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::DelegatedMeta { version, role, raw } = self.0; + f.debug_struct("DelegatedMeta") + .field("version", version) + .field("role", role) + .field("raw", &RawJson(raw)) + .finish() + } +} + +struct DebugFile<'a>(&'a remoteconfig::File); + +impl fmt::Debug for DebugFile<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::File { path, raw } = self.0; + f.debug_struct("File") + .field("path", path) + .field("raw", &RawJson(raw)) + .finish() + } +} + +struct DebugConfigMetas<'a>(&'a remoteconfig::ConfigMetas); + +impl fmt::Debug for DebugConfigMetas<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::ConfigMetas { + roots, + timestamp, + snapshot, + top_targets, + delegated_targets, + } = self.0; + f.debug_struct("ConfigMetas") + .field("roots", &roots.iter().map(DebugTopMeta).collect::>()) + .field("timestamp", ×tamp.as_ref().map(DebugTopMeta)) + .field("snapshot", &snapshot.as_ref().map(DebugTopMeta)) + .field("top_targets", &top_targets.as_ref().map(DebugTopMeta)) + .field( + "delegated_targets", + &delegated_targets + .iter() + .map(DebugDelegatedMeta) + .collect::>(), + ) + .finish() + } +} + +struct DebugDirectorMetas<'a>(&'a remoteconfig::DirectorMetas); + +impl fmt::Debug for DebugDirectorMetas<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::DirectorMetas { + roots, + timestamp, + snapshot, + targets, + } = &self.0; + f.debug_struct("DirectorMetas") + .field("roots", &roots.iter().map(DebugTopMeta).collect::>()) + .field("timestamp", ×tamp.as_ref().map(DebugTopMeta)) + .field("snapshot", &snapshot.as_ref().map(DebugTopMeta)) + .field("targets", &targets.as_ref().map(DebugTopMeta)) + .finish() + } +} + +struct DebugLatestConfigsResponse<'a>(&'a remoteconfig::LatestConfigsResponse); + +impl fmt::Debug for DebugLatestConfigsResponse<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let remoteconfig::LatestConfigsResponse { + config_metas, + director_metas, + target_files, + } = &self.0; + f.debug_struct("LatestConfigsResponse") + .field("config_metas", &config_metas.as_ref().map(DebugConfigMetas)) + .field( + "director_metas", + &director_metas.as_ref().map(DebugDirectorMetas), + ) + .field( + "target_files", + &target_files.iter().map(DebugFile).collect::>(), + ) + .finish() + } +} + +/// Returns a value that implements [`fmt::Debug`] for [`remoteconfig::LatestConfigsResponse`], +/// rendering every `raw` byte field as a parsed JSON value instead of a raw byte array. +/// +/// Use with the standard formatting machinery: +/// +/// ```rust,ignore +/// println!("{:#?}", debug_latest_configs_response(&response)); +/// ``` +pub fn debug_latest_configs_response( + resp: &remoteconfig::LatestConfigsResponse, +) -> impl fmt::Debug + '_ { + DebugLatestConfigsResponse(resp) +} + +#[cfg(test)] +mod tests { + use super::{CONFIG_ROOT, CONFIG_ROOT_VERSION, DIRECTOR_ROOT, DIRECTOR_ROOT_VERSION}; + + #[test] + fn test_root_version_match() { + let config_root: serde_json::Value = serde_json::from_slice(CONFIG_ROOT).unwrap(); + assert_eq!(config_root["signed"]["version"], CONFIG_ROOT_VERSION); + + let director_root: serde_json::Value = serde_json::from_slice(DIRECTOR_ROOT).unwrap(); + assert_eq!(director_root["signed"]["version"], DIRECTOR_ROOT_VERSION); + } +} diff --git a/libdd-remote-config/src/fetch/fetcher.rs b/libdd-remote-config/src/fetch/fetcher.rs index ffa55def0e..78c9260fed 100644 --- a/libdd-remote-config/src/fetch/fetcher.rs +++ b/libdd-remote-config/src/fetch/fetcher.rs @@ -1,6 +1,9 @@ // Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 +use crate::agentless_client::{ + make_agentless_configs_endpoint, AgentlessConfig, AgentlessFetcher, NativeAgentlessFetcher, +}; use crate::targets::{Root, TargetsList}; use crate::{RemoteConfigCapabilities, RemoteConfigPath, RemoteConfigProduct, Target}; use base64::Engine; @@ -50,7 +53,9 @@ pub trait FileStorage { pub struct ConfigInvariants { pub language: String, pub tracer_version: String, + pub hostname: String, pub endpoint: Endpoint, + pub agentless_enabled: bool, } struct StoredTargetFile { @@ -156,7 +161,15 @@ impl ConfigFetcherState { pub fn new(invariants: ConfigInvariants) -> Self { ConfigFetcherState { target_files_by_path: Default::default(), - endpoint: get_agent_configs_endpoint(&invariants.endpoint), + endpoint: if invariants.agentless_enabled { + if let Some(e) = make_agentless_configs_endpoint(invariants.endpoint.clone()) { + e + } else { + make_agent_configs_endpoint(&invariants.endpoint) + } + } else { + make_agent_configs_endpoint(&invariants.endpoint) + }, invariants, expire_unused_files: true, } @@ -201,9 +214,15 @@ impl ConfigFetcherState { } } +enum FetcherMode { + Agent, + Agentless(NativeAgentlessFetcher), +} + pub struct ConfigFetcher { pub file_storage: S, state: Arc>, + mode: FetcherMode, } pub struct ConfigClientState { @@ -237,11 +256,29 @@ impl ConfigClientState { } impl ConfigFetcher { - pub fn new(file_storage: S, state: Arc>) -> Self { - ConfigFetcher { + pub async fn new( + file_storage: S, + state: Arc>, + ) -> anyhow::Result { + let mode: FetcherMode = if dbg!(state.invariants.agentless_enabled) { + FetcherMode::Agentless( + AgentlessFetcher::new( + AgentlessConfig { + hostname: state.invariants.hostname.clone(), + }, + state.endpoint.clone(), + ) + .await?, + ) + } else { + FetcherMode::Agent + }; + + Ok(ConfigFetcher { file_storage, state, - } + mode, + }) } /// Sets the apply state on a stored file. @@ -317,39 +354,20 @@ impl ConfigFetcher { client_agent: None, last_seen: 0, capabilities: product_capabilities.encoded_capabilities.clone(), + is_updater: false, + client_updater: None, }), cached_target_files, } } - /// Quite generic fetching implementation: - /// - runs a request against the Remote Config Server, - /// - validates the data, - /// - removes unused files - /// - checks if the files are already known, - /// - stores new files, - /// - returns all currently active files. - /// - /// It also makes sure that old files are dropped before new files are inserted. - /// - /// Returns None if nothing changed. Otherwise Some(active configs). - pub async fn fetch_once( + async fn fetch_agent( &mut self, - runtime_id: &str, + config_req: ClientGetConfigsRequest, target: &Target, - product_capabilities: &ConfigProductCapabilities, - client_id: &str, client_state: &mut ConfigClientState, ) -> anyhow::Result>>> { - let config_req = self.build_config_request( - runtime_id, - target, - product_capabilities, - client_id, - &*client_state, - ); trace!("Submitting remote config request: {config_req:?}"); - let req = self .state .endpoint @@ -555,9 +573,164 @@ impl ConfigFetcher { client_state.last_config_paths = config_paths; Ok(Some(configs)) } + + /// Quite generic fetching implementation: + /// - runs a request against the Remote Config Server, + /// - validates the data, + /// - removes unused files + /// - checks if the files are already known, + /// - stores new files, + /// - returns all currently active files. + /// + /// It also makes sure that old files are dropped before new files are inserted. + /// + /// Returns None if nothing changed. Otherwise Some(active configs). + pub async fn fetch_once( + &mut self, + runtime_id: &str, + target: &Target, + product_capabilities: &ConfigProductCapabilities, + client_id: &str, + client_state: &mut ConfigClientState, + ) -> anyhow::Result>>> { + let config_req = self.build_config_request( + runtime_id, + target, + product_capabilities, + client_id, + &*client_state, + ); + match &mut self.mode { + FetcherMode::Agent => self.fetch_agent(config_req, target, client_state).await, + FetcherMode::Agentless(agentless_fetcher) => { + let res = agentless_fetcher + .fetch_config(config_req.client.unwrap()) + .await?; + + client_state.root_version = res.root_version; + client_state.targets_version = res.target_version; + client_state.opaque_backend_state = res.opaque_backend_state; + client_state.last_error = None; + + let mut target_files = self.state.target_files_by_path.lock_or_panic(); + let mut config_paths = HashSet::new(); + for &(path, _, _, _) in &res.targets { + match RemoteConfigPath::try_parse(path) { + Ok(parsed) => { + config_paths.insert(parsed.into()); + } + Err(e) => warn!("Failed parsing remote config path: {path} - {e:?}"), + } + } + + if self.state.expire_unused_files { + target_files.retain(|k, _| config_paths.contains(k.as_ref())); + } + + for (path, target_file, version, hashes) in res.targets { + let parsed_path = match RemoteConfigPath::try_parse(path) { + Ok(parsed_path) => parsed_path, + Err(e) => { + warn!("Failed parsing remote config path: {path} - {e:?}"); + continue; + } + }; + let Some((_, hash)) = hashes + .iter() + .find(|(h, _)| *h == &tuf::crypto::HashAlgorithm::Sha256) + .or_else(|| { + hashes + .iter() + .find(|(h, _)| *h == &tuf::crypto::HashAlgorithm::Sha512) + }) + else { + // todo no supported hash algorithm? + continue; + }; + let hash = hash.to_string(); + + let handle = if let Some(StoredTargetFile { + hash: old_hash, + handle, + .. + }) = target_files.get(&parsed_path) + { + if old_hash == &hash { + continue; + } + Some(handle.clone()) + } else { + None + }; + + let parsed_path: Arc = Arc::new(parsed_path.into()); + target_files.insert( + parsed_path.clone(), + StoredTargetFile { + hash, + state: ConfigState { + id: parsed_path.config_id.to_string(), + version, + product: parsed_path.product.to_string(), + apply_state: 2, // Acknowledged + apply_error: "".to_string(), + }, + meta: TargetFileMeta { + path: path.to_string(), + length: target_file.len() as i64, + hashes: hashes + .iter() + .map(|(algorithm, hash)| { + Ok(TargetFileHash { + algorithm: match algorithm { + tuf::crypto::HashAlgorithm::Sha256 => { + "sha256".to_string() + } + tuf::crypto::HashAlgorithm::Sha512 => { + "sha512".to_string() + } + tuf::crypto::HashAlgorithm::Unknown(u) => u.clone(), + _ => anyhow::bail!("unhandled has algorithm"), + }, + hash: hash.to_string(), + }) + }) + .collect::>()?, + }, + handle: if let Some(handle) = handle { + self.file_storage + .update(&handle, version, target_file.to_vec())?; + handle + } else { + self.file_storage.store( + version, + parsed_path, + target_file.to_vec(), + )? + }, + expiring: false, + }, + ); + } + let mut configs = Vec::with_capacity(config_paths.len()); + for config in config_paths.iter() { + if let Some(target_file) = target_files.get_mut(config) { + target_file.expiring = false; + configs.push(target_file.handle.clone()); + } else { + anyhow::bail!( + "Found {config} in client_configs response, but it isn't stored." + ); + } + } + client_state.last_config_paths = config_paths; + Ok(Some(configs)) + } + } + } } -fn get_agent_configs_endpoint(endpoint: &Endpoint) -> Endpoint { +fn make_agent_configs_endpoint(endpoint: &Endpoint) -> Endpoint { let mut parts = endpoint.url.clone().into_parts(); parts.path_and_query = Some(PathAndQuery::from_static("/v0.7/config")); #[allow(clippy::unwrap_used)] @@ -689,7 +862,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage.clone(), Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); let mut response = http_common::empty_response(Response::builder()).unwrap(); @@ -727,6 +902,8 @@ pub mod tests { language: "php".to_string(), tracer_version: "1.2.3".to_string(), endpoint: server.endpoint.clone(), + hostname: "host".to_string(), + agentless_enabled: false, }; let product_capabilities = ConfigProductCapabilities::new( vec![ @@ -739,7 +916,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage.clone(), Arc::new(ConfigFetcherState::new(invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); { @@ -923,7 +1102,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage, Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); // Default: nothing set, agent receives an empty list. @@ -1021,7 +1202,9 @@ pub mod tests { let mut fetcher = ConfigFetcher::new( storage, Arc::new(ConfigFetcherState::new(server.dummy_options().invariants)), - ); + ) + .await + .unwrap(); let mut opaque_state = ConfigClientState::default(); let fetched = fetcher diff --git a/libdd-remote-config/src/fetch/shared.rs b/libdd-remote-config/src/fetch/shared.rs index baf70b2c46..f618b992f8 100644 --- a/libdd-remote-config/src/fetch/shared.rs +++ b/libdd-remote-config/src/fetch/shared.rs @@ -275,7 +275,13 @@ impl SharedFetcher { S::StoredFile: RefcountedFile, { let state = storage.state.clone(); - let mut fetcher = ConfigFetcher::new(storage, state); + let mut fetcher = match ConfigFetcher::new(storage, state).await { + Ok(f) => f, + Err(e) => { + error!("failed to create the fetcher{:?}", e); + return; + } + }; let mut opaque_state = ConfigClientState::default(); diff --git a/libdd-remote-config/src/fetch/single.rs b/libdd-remote-config/src/fetch/single.rs index 3d48c69183..f5baf16908 100644 --- a/libdd-remote-config/src/fetch/single.rs +++ b/libdd-remote-config/src/fetch/single.rs @@ -16,7 +16,7 @@ pub struct SingleFetcher { product_capabilities: ConfigProductCapabilities, runtime_id: String, client_id: String, - opaque_state: ConfigClientState, + client_state: ConfigClientState, } #[derive(Clone, Debug)] @@ -27,12 +27,18 @@ pub struct ConfigOptions { } impl SingleFetcher { - pub fn new(sink: S, target: Target, runtime_id: String, options: ConfigOptions) -> Self { - SingleFetcher { + pub async fn new( + sink: S, + target: Target, + runtime_id: String, + options: ConfigOptions, + ) -> anyhow::Result { + Ok(SingleFetcher { fetcher: ConfigFetcher::new( sink, Arc::new(ConfigFetcherState::new(options.invariants)), - ), + ) + .await?, target: Arc::new(target), product_capabilities: ConfigProductCapabilities::new( options.products, @@ -40,8 +46,8 @@ impl SingleFetcher { ), runtime_id, client_id: uuid::Uuid::new_v4().to_string(), - opaque_state: ConfigClientState::default(), - } + client_state: ConfigClientState::default(), + }) } pub fn with_client_id(mut self, client_id: String) -> Self { @@ -57,7 +63,7 @@ impl SingleFetcher { &self.target, &self.product_capabilities, self.client_id.as_str(), - &mut self.opaque_state, + &mut self.client_state, ) .await } @@ -75,7 +81,7 @@ impl SingleFetcher { /// Sent to the agent on each subsequent poll so it can route configs targeting those /// services to this client. Replace-semantics: the new vec fully overrides the previous one. pub fn set_extra_services(&mut self, services: Vec) { - self.opaque_state.set_extra_services(services); + self.client_state.set_extra_services(services); } } @@ -91,11 +97,16 @@ impl SingleChangesFetcher where S::StoredFile: FilePath, { - pub fn new(sink: S, target: Target, runtime_id: String, options: ConfigOptions) -> Self { - SingleChangesFetcher { + pub async fn new( + sink: S, + target: Target, + runtime_id: String, + options: ConfigOptions, + ) -> anyhow::Result { + Ok(SingleChangesFetcher { changes: ChangeTracker::default(), - fetcher: SingleFetcher::new(sink, target, runtime_id, options), - } + fetcher: SingleFetcher::new(sink, target, runtime_id, options).await?, + }) } pub fn with_client_id(mut self, client_id: String) -> Self { diff --git a/libdd-remote-config/src/fetch/test_server.rs b/libdd-remote-config/src/fetch/test_server.rs index 1ed16ad0f3..ad25972b51 100644 --- a/libdd-remote-config/src/fetch/test_server.rs +++ b/libdd-remote-config/src/fetch/test_server.rs @@ -155,6 +155,7 @@ impl RemoteConfigServer { }) .collect(), client_configs: applied_files.keys().map(|k| k.to_string()).collect(), + config_status: 0, }; Response::new(http_common::Body::from( serde_json::to_vec(&response).unwrap(), @@ -215,6 +216,8 @@ impl RemoteConfigServer { language: "php".to_string(), tracer_version: "1.2.3".to_string(), endpoint: self.endpoint.clone(), + hostname: "localhost".to_string(), + agentless_enabled: false, }, products: vec![ RemoteConfigProduct::ApmTracing, diff --git a/libdd-remote-config/src/lib.rs b/libdd-remote-config/src/lib.rs index 462362d160..01ca7c4fc7 100644 --- a/libdd-remote-config/src/lib.rs +++ b/libdd-remote-config/src/lib.rs @@ -7,6 +7,8 @@ #![cfg_attr(not(test), deny(clippy::todo))] #![cfg_attr(not(test), deny(clippy::unimplemented))] +pub mod agentless_client; + pub mod config; #[cfg(feature = "client")] pub mod fetch; diff --git a/libdd-trace-protobuf/build.rs b/libdd-trace-protobuf/build.rs index c9c891a681..06714040f1 100644 --- a/libdd-trace-protobuf/build.rs +++ b/libdd-trace-protobuf/build.rs @@ -308,6 +308,12 @@ fn generate_protobuf() { "#[serde(default)]", ); + config.type_attribute("ClientUpdater", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("PackageState", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("PackageStateTask", "#[derive(Deserialize, Serialize)]"); + config.type_attribute("TaskError", "#[derive(Deserialize, Serialize)]"); + + config.include_file("_includes.rs"); config diff --git a/libdd-trace-protobuf/src/pb/remoteconfig.proto b/libdd-trace-protobuf/src/pb/remoteconfig.proto index 606bc851e7..cae7be854e 100644 --- a/libdd-trace-protobuf/src/pb/remoteconfig.proto +++ b/libdd-trace-protobuf/src/pb/remoteconfig.proto @@ -6,11 +6,73 @@ option go_package = "pkg/proto/pbgo/core"; // golang // Backend definitions +message ConfigMetas { + repeated TopMeta roots = 1; + TopMeta timestamp = 2; + TopMeta snapshot = 3; + TopMeta topTargets = 4; + repeated DelegatedMeta delegatedTargets = 5; +} + +message DirectorMetas { + repeated TopMeta roots = 1; + TopMeta timestamp = 2; + TopMeta snapshot = 3; + TopMeta targets = 4; +} + +message DelegatedMeta { + uint64 version = 1; + string role = 2; + bytes raw = 3; +} + +message TopMeta { + uint64 version = 1; + bytes raw = 2; +} + message File { string path = 1; bytes raw = 2; } +// Backend queries + +message LatestConfigsRequest { + string hostname = 1; + string agentVersion = 2; + // timestamp and snapshot versions move in tandem so they are the same. + uint64 current_config_snapshot_version = 3; + uint64 current_config_root_version = 9; + uint64 current_director_root_version = 8; + repeated string products = 4; + repeated string new_products = 5; + repeated Client active_clients = 6; + bytes backend_client_state = 10; + bool has_error = 11; + string error = 12; + string trace_agent_env = 13; + string org_uuid = 14; + repeated string tags = 15; + string agent_uuid = 16; +} + +message LatestConfigsResponse { + ConfigMetas config_metas = 1; + DirectorMetas director_metas = 2; + repeated File target_files = 3; +} + +message OrgDataResponse { + string uuid = 1; +} + +message OrgStatusResponse { + bool enabled = 1; + bool authorized = 2; +} + // Client definitions message Client { @@ -24,6 +86,9 @@ message Client { ClientAgent client_agent = 9; uint64 last_seen = 10; bytes capabilities = 11; + reserved 12, 13; + bool is_updater = 14; + ClientUpdater client_updater = 15; } message ClientTracer { @@ -47,6 +112,46 @@ message ClientAgent { repeated string cws_workloads = 5; } +message ClientUpdater { + repeated string tags = 1; + repeated PackageState packages = 2; + uint64 available_disk_space = 3; + string secrets_pub_key = 4; +} + +message PackageState { + string package = 1; + string stable_version = 2; + string experiment_version = 3; + PackageStateTask task = 4; + reserved 5, 6, 7, 8, 9, 10; + string stable_config_version = 11; + string experiment_config_version = 12; + string running_version = 13; + string running_config_version = 14; + uint64 heartbeat_timestamp = 15; + float completion = 16; +} + +message PackageStateTask { + string id = 1; + TaskState state = 2; + TaskError error = 3; +} + +enum TaskState { + IDLE = 0; + RUNNING = 1; + DONE = 2; + INVALID_STATE = 3; + ERROR = 4; +} + +message TaskError { + uint64 code = 1; + string message = 2; +} + message ConfigState { string id = 1; uint64 version = 2; @@ -78,14 +183,119 @@ message TargetFileMeta { repeated TargetFileHash hashes = 3; } +// ConfigSubscriptionProducts is used to targets specific products for tracking +// with a ConfigSubscriptionRequest. +enum ConfigSubscriptionProducts { + INVALID = 0; + + // LIVE_DEBUGGING corresponds to the LIVE_DEBUGING and LIVE_DEBUGGING_SYMBOLDB + // products. + LIVE_DEBUGGING = 1; +} + +// ConfigSubscriptionRequest is used to manage the state of the stream created +// using CreateConfigSubscription. +message ConfigSubscriptionRequest { + + enum Action { + INVALID = 0; + TRACK = 1; + UNTRACK = 2; + } + + + // RuntimeID of the client to track or untrack. + string runtime_id = 1; + + // Action indicates the action to take for the client with the given + // runtime_id. + Action action = 2; + + // If action is TRACK, products indicates the set of products for which the + // client is interested in receiving updates. + ConfigSubscriptionProducts products = 3; +} + +// ConfigSubscriptionResponse is streamed from CreateConfigSubscription with +// updates for matching clients and products. +message ConfigSubscriptionResponse { + + // Client is the client that was tracked or untracked. + Client client = 1; + + // Matched configs are all configs that were matched for the client given + // the subscription request. + // + // If a previously reported config is no longer matched, it will not be + // included in the response. + repeated string matched_configs = 2; + + // Target files are the target files that needs to be sent to the client. + repeated File target_files = 3; +} + message ClientGetConfigsRequest { Client client = 1; repeated TargetFileMeta cached_target_files = 2; } +enum ConfigStatus { + CONFIG_STATUS_OK = 0; + CONFIG_STATUS_EXPIRED = 1; +} + message ClientGetConfigsResponse { repeated bytes roots = 1; bytes targets = 2; repeated File target_files = 3; repeated string client_configs = 4; + ConfigStatus config_status = 5; +} + +// Full state + +message FileMetaState { + uint64 version = 1; + string hash = 2; +} + +message GetStateConfigResponse { + map config_state = 1; + map director_state = 2; + map target_filenames = 3; + repeated Client active_clients = 4; + repeated ConfigSubscriptionState config_subscription_states = 5; +} + +// ConfigSubscriptionState describes the state of a config subscription. +message ConfigSubscriptionState { + message TrackedClient { + string runtime_id = 1; + bool seen_any = 2; + ConfigSubscriptionProducts products = 3; + } + + // SubscriptionID is a process-unique identifier for the subscription. + uint64 subscription_id = 1; + + // TrackedClients is the list of clients that are currently tracked by the + // subscription. + repeated TrackedClient tracked_clients = 2; +} + +message ResetStateConfigResponse {} + + +message TracerPredicateV1 { + string clientID = 1; + string service = 2; + string environment = 3; + string appVersion = 4; + string tracerVersion = 5; + string language = 6; + string runtimeID = 7; +} + +message TracerPredicates { + repeated TracerPredicateV1 tracer_predicates_v1 = 1; } diff --git a/libdd-trace-protobuf/src/remoteconfig.rs b/libdd-trace-protobuf/src/remoteconfig.rs index 9e0a62494a..894da87077 100644 --- a/libdd-trace-protobuf/src/remoteconfig.rs +++ b/libdd-trace-protobuf/src/remoteconfig.rs @@ -3,6 +3,46 @@ use serde::{Deserialize, Serialize}; // This file is @generated by prost-build. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfigMetas { + #[prost(message, repeated, tag = "1")] + pub roots: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub timestamp: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub snapshot: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub top_targets: ::core::option::Option, + #[prost(message, repeated, tag = "5")] + pub delegated_targets: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct DirectorMetas { + #[prost(message, repeated, tag = "1")] + pub roots: ::prost::alloc::vec::Vec, + #[prost(message, optional, tag = "2")] + pub timestamp: ::core::option::Option, + #[prost(message, optional, tag = "3")] + pub snapshot: ::core::option::Option, + #[prost(message, optional, tag = "4")] + pub targets: ::core::option::Option, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct DelegatedMeta { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(string, tag = "2")] + pub role: ::prost::alloc::string::String, + #[prost(bytes = "vec", tag = "3")] + pub raw: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TopMeta { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(bytes = "vec", tag = "2")] + pub raw: ::prost::alloc::vec::Vec, +} #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct File { @@ -12,6 +52,61 @@ pub struct File { #[serde(with = "serde_bytes")] pub raw: ::prost::alloc::vec::Vec, } +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LatestConfigsRequest { + #[prost(string, tag = "1")] + pub hostname: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub agent_version: ::prost::alloc::string::String, + /// timestamp and snapshot versions move in tandem so they are the same. + #[prost(uint64, tag = "3")] + pub current_config_snapshot_version: u64, + #[prost(uint64, tag = "9")] + pub current_config_root_version: u64, + #[prost(uint64, tag = "8")] + pub current_director_root_version: u64, + #[prost(string, repeated, tag = "4")] + pub products: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, repeated, tag = "5")] + pub new_products: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, repeated, tag = "6")] + pub active_clients: ::prost::alloc::vec::Vec, + #[prost(bytes = "vec", tag = "10")] + pub backend_client_state: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "11")] + pub has_error: bool, + #[prost(string, tag = "12")] + pub error: ::prost::alloc::string::String, + #[prost(string, tag = "13")] + pub trace_agent_env: ::prost::alloc::string::String, + #[prost(string, tag = "14")] + pub org_uuid: ::prost::alloc::string::String, + #[prost(string, repeated, tag = "15")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(string, tag = "16")] + pub agent_uuid: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct LatestConfigsResponse { + #[prost(message, optional, tag = "1")] + pub config_metas: ::core::option::Option, + #[prost(message, optional, tag = "2")] + pub director_metas: ::core::option::Option, + #[prost(message, repeated, tag = "3")] + pub target_files: ::prost::alloc::vec::Vec, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct OrgDataResponse { + #[prost(string, tag = "1")] + pub uuid: ::prost::alloc::string::String, +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct OrgStatusResponse { + #[prost(bool, tag = "1")] + pub enabled: bool, + #[prost(bool, tag = "2")] + pub authorized: bool, +} #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Client { @@ -34,6 +129,10 @@ pub struct Client { pub last_seen: u64, #[prost(bytes = "vec", tag = "11")] pub capabilities: ::prost::alloc::vec::Vec, + #[prost(bool, tag = "14")] + pub is_updater: bool, + #[prost(message, optional, tag = "15")] + pub client_updater: ::core::option::Option, } #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] @@ -74,6 +173,60 @@ pub struct ClientAgent { pub cws_workloads: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, } #[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ClientUpdater { + #[prost(string, repeated, tag = "1")] + pub tags: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(message, repeated, tag = "2")] + pub packages: ::prost::alloc::vec::Vec, + #[prost(uint64, tag = "3")] + pub available_disk_space: u64, + #[prost(string, tag = "4")] + pub secrets_pub_key: ::prost::alloc::string::String, +} +#[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct PackageState { + #[prost(string, tag = "1")] + pub package: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub stable_version: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub experiment_version: ::prost::alloc::string::String, + #[prost(message, optional, tag = "4")] + pub task: ::core::option::Option, + #[prost(string, tag = "11")] + pub stable_config_version: ::prost::alloc::string::String, + #[prost(string, tag = "12")] + pub experiment_config_version: ::prost::alloc::string::String, + #[prost(string, tag = "13")] + pub running_version: ::prost::alloc::string::String, + #[prost(string, tag = "14")] + pub running_config_version: ::prost::alloc::string::String, + #[prost(uint64, tag = "15")] + pub heartbeat_timestamp: u64, + #[prost(float, tag = "16")] + pub completion: f32, +} +#[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct PackageStateTask { + #[prost(string, tag = "1")] + pub id: ::prost::alloc::string::String, + #[prost(enumeration = "TaskState", tag = "2")] + pub state: i32, + #[prost(message, optional, tag = "3")] + pub error: ::core::option::Option, +} +#[derive(Deserialize, Serialize)] +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TaskError { + #[prost(uint64, tag = "1")] + pub code: u64, + #[prost(string, tag = "2")] + pub message: ::prost::alloc::string::String, +} +#[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] pub struct ConfigState { #[prost(string, tag = "1")] @@ -121,6 +274,82 @@ pub struct TargetFileMeta { #[prost(message, repeated, tag = "3")] pub hashes: ::prost::alloc::vec::Vec, } +/// ConfigSubscriptionRequest is used to manage the state of the stream created +/// using CreateConfigSubscription. +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ConfigSubscriptionRequest { + /// RuntimeID of the client to track or untrack. + #[prost(string, tag = "1")] + pub runtime_id: ::prost::alloc::string::String, + /// Action indicates the action to take for the client with the given + /// runtime_id. + #[prost(enumeration = "config_subscription_request::Action", tag = "2")] + pub action: i32, + /// If action is TRACK, products indicates the set of products for which the + /// client is interested in receiving updates. + #[prost(enumeration = "ConfigSubscriptionProducts", tag = "3")] + pub products: i32, +} +/// Nested message and enum types in `ConfigSubscriptionRequest`. +pub mod config_subscription_request { + #[derive( + Clone, + Copy, + Debug, + PartialEq, + Eq, + Hash, + PartialOrd, + Ord, + ::prost::Enumeration + )] + #[repr(i32)] + pub enum Action { + Invalid = 0, + Track = 1, + Untrack = 2, + } + impl Action { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Invalid => "INVALID", + Self::Track => "TRACK", + Self::Untrack => "UNTRACK", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INVALID" => Some(Self::Invalid), + "TRACK" => Some(Self::Track), + "UNTRACK" => Some(Self::Untrack), + _ => None, + } + } + } +} +/// ConfigSubscriptionResponse is streamed from CreateConfigSubscription with +/// updates for matching clients and products. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfigSubscriptionResponse { + /// Client is the client that was tracked or untracked. + #[prost(message, optional, tag = "1")] + pub client: ::core::option::Option, + /// Matched configs are all configs that were matched for the client given + /// the subscription request. + /// + /// If a previously reported config is no longer matched, it will not be + /// included in the response. + #[prost(string, repeated, tag = "2")] + pub matched_configs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + /// Target files are the target files that needs to be sent to the client. + #[prost(message, repeated, tag = "3")] + pub target_files: ::prost::alloc::vec::Vec, +} #[derive(Deserialize, Serialize)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct ClientGetConfigsRequest { @@ -146,4 +375,175 @@ pub struct ClientGetConfigsResponse { #[prost(string, repeated, tag = "4")] #[serde(default)] pub client_configs: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, + #[prost(enumeration = "ConfigStatus", tag = "5")] + pub config_status: i32, +} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct FileMetaState { + #[prost(uint64, tag = "1")] + pub version: u64, + #[prost(string, tag = "2")] + pub hash: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct GetStateConfigResponse { + #[prost(map = "string, message", tag = "1")] + pub config_state: ::std::collections::HashMap< + ::prost::alloc::string::String, + FileMetaState, + >, + #[prost(map = "string, message", tag = "2")] + pub director_state: ::std::collections::HashMap< + ::prost::alloc::string::String, + FileMetaState, + >, + #[prost(map = "string, string", tag = "3")] + pub target_filenames: ::std::collections::HashMap< + ::prost::alloc::string::String, + ::prost::alloc::string::String, + >, + #[prost(message, repeated, tag = "4")] + pub active_clients: ::prost::alloc::vec::Vec, + #[prost(message, repeated, tag = "5")] + pub config_subscription_states: ::prost::alloc::vec::Vec, +} +/// ConfigSubscriptionState describes the state of a config subscription. +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct ConfigSubscriptionState { + /// SubscriptionID is a process-unique identifier for the subscription. + #[prost(uint64, tag = "1")] + pub subscription_id: u64, + /// TrackedClients is the list of clients that are currently tracked by the + /// subscription. + #[prost(message, repeated, tag = "2")] + pub tracked_clients: ::prost::alloc::vec::Vec< + config_subscription_state::TrackedClient, + >, +} +/// Nested message and enum types in `ConfigSubscriptionState`. +pub mod config_subscription_state { + #[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] + pub struct TrackedClient { + #[prost(string, tag = "1")] + pub runtime_id: ::prost::alloc::string::String, + #[prost(bool, tag = "2")] + pub seen_any: bool, + #[prost(enumeration = "super::ConfigSubscriptionProducts", tag = "3")] + pub products: i32, + } +} +#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Message)] +pub struct ResetStateConfigResponse {} +#[derive(Clone, PartialEq, Eq, Hash, ::prost::Message)] +pub struct TracerPredicateV1 { + #[prost(string, tag = "1")] + pub client_id: ::prost::alloc::string::String, + #[prost(string, tag = "2")] + pub service: ::prost::alloc::string::String, + #[prost(string, tag = "3")] + pub environment: ::prost::alloc::string::String, + #[prost(string, tag = "4")] + pub app_version: ::prost::alloc::string::String, + #[prost(string, tag = "5")] + pub tracer_version: ::prost::alloc::string::String, + #[prost(string, tag = "6")] + pub language: ::prost::alloc::string::String, + #[prost(string, tag = "7")] + pub runtime_id: ::prost::alloc::string::String, +} +#[derive(Clone, PartialEq, ::prost::Message)] +pub struct TracerPredicates { + #[prost(message, repeated, tag = "1")] + pub tracer_predicates_v1: ::prost::alloc::vec::Vec, +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum TaskState { + Idle = 0, + Running = 1, + Done = 2, + InvalidState = 3, + Error = 4, +} +impl TaskState { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Idle => "IDLE", + Self::Running => "RUNNING", + Self::Done => "DONE", + Self::InvalidState => "INVALID_STATE", + Self::Error => "ERROR", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "IDLE" => Some(Self::Idle), + "RUNNING" => Some(Self::Running), + "DONE" => Some(Self::Done), + "INVALID_STATE" => Some(Self::InvalidState), + "ERROR" => Some(Self::Error), + _ => None, + } + } +} +/// ConfigSubscriptionProducts is used to targets specific products for tracking +/// with a ConfigSubscriptionRequest. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ConfigSubscriptionProducts { + Invalid = 0, + /// LIVE_DEBUGGING corresponds to the LIVE_DEBUGING and LIVE_DEBUGGING_SYMBOLDB + /// products. + LiveDebugging = 1, +} +impl ConfigSubscriptionProducts { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Invalid => "INVALID", + Self::LiveDebugging => "LIVE_DEBUGGING", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "INVALID" => Some(Self::Invalid), + "LIVE_DEBUGGING" => Some(Self::LiveDebugging), + _ => None, + } + } +} +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration)] +#[repr(i32)] +pub enum ConfigStatus { + Ok = 0, + Expired = 1, +} +impl ConfigStatus { + /// String value of the enum field names used in the ProtoBuf definition. + /// + /// The values are not transformed in any way and thus are considered stable + /// (if the ProtoBuf definition does not change) and safe for programmatic use. + pub fn as_str_name(&self) -> &'static str { + match self { + Self::Ok => "CONFIG_STATUS_OK", + Self::Expired => "CONFIG_STATUS_EXPIRED", + } + } + /// Creates an enum from field names used in the ProtoBuf definition. + pub fn from_str_name(value: &str) -> ::core::option::Option { + match value { + "CONFIG_STATUS_OK" => Some(Self::Ok), + "CONFIG_STATUS_EXPIRED" => Some(Self::Expired), + _ => None, + } + } }