Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
5190ab4
docs(otlp): design spec for OTLP HTTP/protobuf trace export
bm1549 Jun 12, 2026
946f116
docs(otlp): implementation plan for OTLP HTTP/protobuf trace export
bm1549 Jun 12, 2026
07c7296
feat(trace-protobuf): vendor + generate OTLP trace/collector prost types
bm1549 Jun 12, 2026
6f385ba
feat(trace-utils): add serde->prost OTLP converter
bm1549 Jun 12, 2026
a52e30b
refactor(trace-utils): clarify OTLP converter + add fallback/status t…
bm1549 Jun 12, 2026
4a4846a
feat(trace-utils): add encode_otlp_json/encode_otlp_protobuf
bm1549 Jun 12, 2026
54d8856
feat(data-pipeline): make OtlpProtocol public with FromStr
bm1549 Jun 12, 2026
772be3e
feat(data-pipeline): set OTLP content-type from protocol
bm1549 Jun 12, 2026
46d72a7
feat(data-pipeline): add TraceExporterBuilder::set_otlp_protocol
bm1549 Jun 12, 2026
8f3c38e
feat(data-pipeline): dispatch OTLP encoder by protocol + protobuf test
bm1549 Jun 12, 2026
2633427
refactor(data-pipeline): narrow otlp pub surface + exhaustive content…
bm1549 Jun 12, 2026
e493d8d
feat(data-pipeline-ffi): add ddog_trace_exporter_config_set_otlp_prot…
bm1549 Jun 12, 2026
4a81412
test(data-pipeline-ffi): cover set_otlp_protocol + clarify contract
bm1549 Jun 12, 2026
3091b57
fix(trace-protobuf): disable comments on vendored OTLP trace protos t…
bm1549 Jun 12, 2026
664f16f
docs(data-pipeline): clarify parse_u64/kind fallbacks and unreachable…
bm1549 Jun 12, 2026
9bc5cf2
chore: drop in-repo planning docs (moved to chonk)
bm1549 Jun 12, 2026
a8a305f
fix(data-pipeline): reject unsupported OTLP gRPC at build time
bm1549 Jun 12, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

102 changes: 102 additions & 0 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ pub struct TraceExporterConfig {
connection_timeout: Option<u64>,
shared_runtime: Option<Arc<SharedRuntime>>,
otlp_endpoint: Option<String>,
otlp_protocol: Option<String>,
}

#[no_mangle]
Expand Down Expand Up @@ -498,6 +499,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint(
)
}

/// Sets the OTLP export protocol. Accepts the OTel-standard values `http/json` (default) or
/// `http/protobuf`; `grpc` is rejected as not yet supported. The host language resolves the value
/// (e.g. from `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL`).
///
/// Returns `None` on success, `ErrorCode::InvalidArgument` for a null config or an unaccepted
/// value, and `ErrorCode::InvalidInput` for a non-UTF-8 string.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_protocol(
config: Option<&mut TraceExporterConfig>,
protocol: CharSlice,
) -> Option<Box<ExporterError>> {
catch_panic!(
if let Some(handle) = config {
let value = match sanitize_string(protocol) {
Ok(s) => s,
Err(e) => return Some(e),
};
match value.as_str() {
"http/json" | "http/protobuf" => {
handle.otlp_protocol = Some(value);
None
}
_ => gen_error!(ErrorCode::InvalidArgument),
}
} else {
gen_error!(ErrorCode::InvalidArgument)
},
gen_error!(ErrorCode::Panic)
)
}

/// Create a new TraceExporter instance.
///
/// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in
Expand Down Expand Up @@ -565,6 +597,13 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(

if let Some(ref url) = config.otlp_endpoint {
builder.set_otlp_endpoint(url);
if let Some(ref proto) = config.otlp_protocol {
// The FFI setter only stores "http/json"/"http/protobuf", so this parse always
// succeeds here; a parse failure just leaves the builder's default protocol.
if let Ok(p) = proto.parse::<libdd_data_pipeline::otlp::OtlpProtocol>() {
builder.set_otlp_protocol(p);
}
}
}

match builder.build() {
Expand Down Expand Up @@ -1283,6 +1322,69 @@ mod tests {
}
}

#[test]
fn config_otlp_protocol_test() {
unsafe {
// Null config → InvalidArgument
let error =
ddog_trace_exporter_config_set_otlp_protocol(None, CharSlice::from("http/json"));
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// "http/json" → success, stored
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("http/json"),
);
assert_eq!(error, None);
assert_eq!(
config.as_ref().unwrap().otlp_protocol.as_deref(),
Some("http/json")
);

// "http/protobuf" → success, stored
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("http/protobuf"),
);
assert_eq!(error, None);
assert_eq!(
config.as_ref().unwrap().otlp_protocol.as_deref(),
Some("http/protobuf")
);

// "grpc" → InvalidArgument
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("grpc"),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// Garbage value → InvalidArgument
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from("nonsense"),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// Non-UTF-8 input → InvalidInput
let mut config = Some(TraceExporterConfig::default());
let invalid: [u8; 2] = [0x80u8, 0xFFu8];
let error = ddog_trace_exporter_config_set_otlp_protocol(
config.as_mut(),
CharSlice::from_bytes(&invalid),
);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidInput);
ddog_trace_exporter_error_free(error);
}
}

#[cfg(all(feature = "catch_panic", panic = "unwind"))]
#[test]
fn catch_panic_test() {
Expand Down
1 change: 1 addition & 0 deletions libdd-data-pipeline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ libdd-trace-utils = { path = "../libdd-trace-utils", features = [
"test-utils",
] }
httpmock = "0.8.0-alpha.1"
prost = "0.14.1"
rand = "0.8.5"
tempfile = "3.3.0"
tokio = { version = "1.23", features = [
Expand Down
2 changes: 1 addition & 1 deletion libdd-data-pipeline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
pub mod agent_info;
mod health_metrics;
pub(crate) mod otlp;
pub mod otlp;
#[cfg(feature = "telemetry")]
pub(crate) mod telemetry;
#[cfg(not(target_arch = "wasm32"))]
Expand Down
47 changes: 38 additions & 9 deletions libdd-data-pipeline/src/otlp/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,31 @@
use http::HeaderMap;
use std::time::Duration;

/// OTLP trace export protocol. HTTP/JSON is currently supported.
/// OTLP trace export protocol.
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub(crate) enum OtlpProtocol {
pub enum OtlpProtocol {
/// HTTP with JSON body (Content-Type: application/json). Default for HTTP.
#[default]
HttpJson,
/// HTTP with protobuf body. (Not supported yet)
#[allow(dead_code)]
/// HTTP with protobuf body (Content-Type: application/x-protobuf).
HttpProtobuf,
/// gRPC. (Not supported yet)
#[allow(dead_code)]
/// gRPC. Parsed by `FromStr` so callers get a clean error, but rejected at export time
/// (unsupported).
Grpc,
}

impl std::str::FromStr for OtlpProtocol {
type Err = String;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"http/json" => Ok(OtlpProtocol::HttpJson),
"http/protobuf" => Ok(OtlpProtocol::HttpProtobuf),
"grpc" => Ok(OtlpProtocol::Grpc),
other => Err(format!("unknown OTLP protocol: {other}")),
}
}
}

/// Default timeout for OTLP export requests.
pub const DEFAULT_OTLP_TIMEOUT: Duration = Duration::from_secs(10);

Expand All @@ -32,7 +43,25 @@ pub struct OtlpTraceConfig {
pub headers: HeaderMap,
/// Request timeout.
pub timeout: Duration,
/// Protocol (for future use; currently only HttpJson is supported).
#[allow(dead_code)]
pub(crate) protocol: OtlpProtocol,
/// OTLP export protocol (selects body encoding and content-type).
pub protocol: OtlpProtocol,
}

#[cfg(test)]
mod tests {
use super::*;
use std::str::FromStr;
#[test]
fn protocol_from_str() {
assert_eq!(
OtlpProtocol::from_str("http/json").unwrap(),
OtlpProtocol::HttpJson
);
assert_eq!(
OtlpProtocol::from_str("http/protobuf").unwrap(),
OtlpProtocol::HttpProtobuf
);
assert_eq!(OtlpProtocol::from_str("grpc").unwrap(), OtlpProtocol::Grpc);
assert!(OtlpProtocol::from_str("nonsense").is_err());
}
}
24 changes: 17 additions & 7 deletions libdd-data-pipeline/src/otlp/exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ const OTLP_MAX_RETRIES: u32 = 4;
/// Initial backoff between retries (milliseconds).
const OTLP_RETRY_DELAY_MS: u64 = 100;

/// Send OTLP trace payload (JSON bytes) to the configured endpoint with retries.
/// Send an OTLP trace payload to the configured endpoint with retries.
///
/// The body encoding and `Content-Type` are selected from `config.protocol`.
///
/// Uses [`send_with_retry`] for consistent retry behaviour and observability across exporters.
///
Expand All @@ -26,7 +28,7 @@ pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
capabilities: &C,
config: &OtlpTraceConfig,
test_token: Option<&str>,
json_body: Vec<u8>,
body: Vec<u8>,
) -> Result<(), TraceExporterError> {
let url = libdd_common::parse_uri(&config.endpoint_url).map_err(|e| {
TraceExporterError::Internal(InternalErrorKind::InvalidWorkerState(format!(
Expand All @@ -41,11 +43,19 @@ pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
..Endpoint::default()
};

// `Grpc` is rejected earlier in `send_otlp_traces_inner` and never reaches this function, so it
// is grouped with the JSON content-type here only to keep the match exhaustive.
let content_type = match config.protocol {
crate::otlp::config::OtlpProtocol::HttpProtobuf => {
libdd_common::header::APPLICATION_PROTOBUF
}
crate::otlp::config::OtlpProtocol::HttpJson | crate::otlp::config::OtlpProtocol::Grpc => {
libdd_common::header::APPLICATION_JSON
}
};

let mut headers = config.headers.clone();
headers.insert(
http::header::CONTENT_TYPE,
libdd_common::header::APPLICATION_JSON,
);
headers.insert(http::header::CONTENT_TYPE, content_type);
if let Some(token) = test_token {
if let Ok(val) = http::HeaderValue::from_str(token) {
headers.insert(
Expand All @@ -62,7 +72,7 @@ pub async fn send_otlp_traces_http<C: HttpClientCapability + SleepCapability>(
None,
);

match send_with_retry(capabilities, &target, json_body, &headers, &retry_strategy).await {
match send_with_retry(capabilities, &target, body, &headers, &retry_strategy).await {
Ok(_) => Ok(()),
Err(e) => Err(map_send_error(e).await),
}
Expand Down
13 changes: 7 additions & 6 deletions libdd-data-pipeline/src/otlp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
//!
//! When an OTLP endpoint is configured via
//! [`crate::trace_exporter::TraceExporterBuilder::set_otlp_endpoint`], the trace exporter sends
//! traces in OTLP HTTP/JSON format to that endpoint instead of the Datadog agent. The host language
//! is responsible for resolving the endpoint from its own configuration (e.g.
//! traces in OTLP HTTP format to that endpoint instead of the Datadog agent; the wire encoding
//! (JSON or protobuf) is selected via [`OtlpProtocol`]. The host language is responsible for
//! resolving the endpoint from its own configuration (e.g.
//! `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`).
//!
//! ## Sampling
Expand All @@ -22,9 +23,9 @@
//! spans from a local trace are closed (i.e. send complete trace chunks). This crate does not
//! buffer or flush partially—it exports whatever trace chunks it receives.

pub mod config;
pub mod exporter;
pub(crate) mod config;
pub(crate) mod exporter;

pub use config::OtlpTraceConfig;
pub use exporter::send_otlp_traces_http;
pub use config::{OtlpProtocol, OtlpTraceConfig};
pub(crate) use exporter::send_otlp_traces_http;
pub use libdd_trace_utils::otlp_encoder::{map_traces_to_otlp, OtlpResourceInfo};
41 changes: 40 additions & 1 deletion libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ pub struct TraceExporterBuilder {
connection_timeout: Option<u64>,
otlp_endpoint: Option<String>,
otlp_headers: Vec<(String, String)>,
otlp_protocol: OtlpProtocol,
}

impl TraceExporterBuilder {
Expand Down Expand Up @@ -286,6 +287,17 @@ impl TraceExporterBuilder {
self
}

/// Selects the OTLP export protocol. Accepts `OtlpProtocol::HttpJson` (default) or
/// `OtlpProtocol::HttpProtobuf`. The host language resolves this from
/// `OTEL_EXPORTER_OTLP_TRACES_PROTOCOL` / `OTEL_EXPORTER_OTLP_PROTOCOL`.
///
/// `OtlpProtocol::Grpc` is not supported; selecting it makes [`build`](Self::build) /
/// [`build_async`](Self::build_async) fail with [`BuilderErrorKind::InvalidConfiguration`].
pub fn set_otlp_protocol(&mut self, protocol: OtlpProtocol) -> &mut Self {
self.otlp_protocol = protocol;
self
}

/// Sets additional HTTP headers to include in OTLP trace export requests.
///
/// Headers should be provided as key-value pairs. The host language is responsible for
Expand Down Expand Up @@ -332,6 +344,18 @@ impl TraceExporterBuilder {
));
}

// OTLP gRPC export is not implemented. Reject it here so a misconfigured exporter fails
// fast at build time with a clear `InvalidConfiguration` (FFI: `InvalidArgument`), matching
// the C FFI `set_otlp_protocol` setter, rather than erroring on every send. The send-time
// arm in `send_otlp_traces_inner` remains as a defensive guard.
if self.otlp_protocol == OtlpProtocol::Grpc {
return Err(TraceExporterError::Builder(
BuilderErrorKind::InvalidConfiguration(
"OTLP gRPC export is not supported".to_string(),
),
));
}

let shared_runtime = match self.shared_runtime {
Some(rt) => rt,
None => Self::new_shared_runtime()?,
Expand Down Expand Up @@ -451,7 +475,7 @@ impl TraceExporterBuilder {
.connection_timeout
.map(Duration::from_millis)
.unwrap_or(DEFAULT_OTLP_TIMEOUT),
protocol: OtlpProtocol::HttpJson,
protocol: self.otlp_protocol,
}
});

Expand Down Expand Up @@ -672,6 +696,21 @@ mod tests {
));
}

#[test]
fn test_otlp_grpc_protocol_rejected_at_build() {
// gRPC is unsupported and must fail fast at build time (not on the first send), with the
// same `InvalidConfiguration` category the C FFI setter uses.
let mut builder = TraceExporterBuilder::default();
builder.set_otlp_protocol(crate::otlp::OtlpProtocol::Grpc);
let result = builder.build::<NativeCapabilities>();
assert!(matches!(
result,
Err(TraceExporterError::Builder(
BuilderErrorKind::InvalidConfiguration(_)
))
));
}

#[cfg_attr(miri, ignore)]
#[test]
fn test_build_with_v1_starts_inactive() {
Expand Down
Loading
Loading