From c71fb77dce142e64fa90cae38d932c86f3b06a46 Mon Sep 17 00:00:00 2001 From: Bryan English Date: Tue, 2 Jun 2026 15:59:51 -0400 Subject: [PATCH] feat(data-pipeline)!: add stdout log trace exporter Add a "log exporter" transport that writes traces as newline-delimited JSON to stdout in the format consumed by the Datadog Forwarder, instead of sending them to an agent over HTTP. This is the serverless fallback used in AWS Lambda when no Datadog agent or Lambda extension is reachable, matching the log writers in dd-trace-js/py/go/java. - json_log_encoder (libdd-trace-utils): encodes spans to {"traces":[[...]]} lines with lowercase hex ids (incl. 128-bit), integer error, ns start/duration, empty-field omission, and greedy 256 KiB line batching that drops oversize single spans. - TraceExporterBuilder::set_output_to_log selects a stdout destination that bypasses agent-info polling, client-side stats, V1 negotiation, and telemetry. Exposed over FFI as ddog_trace_exporter_config_set_output_to_log. - The write goes through a new LogWriterCapability so the transport also works on wasm (where the host, e.g. JavaScript, supplies the sink); the native capability writes to stdout. Writes are synchronous and intended for single-threaded serverless runtimes. meta_struct is omitted (raw msgpack the log intake can't read). BREAKING CHANGE: TraceExporter (and TraceExporterBuilder::build / build_async and trace_buffer::DefaultExport) now require C: LogWriterCapability. Callers using a custom capability bundle must implement libdd_capabilities::LogWriterCapability; NativeCapabilities already implements it, so callers using NativeCapabilities are unaffected. --- libdd-capabilities-impl/src/lib.rs | 14 +- libdd-capabilities/src/lib.rs | 2 + libdd-capabilities/src/log_output.rs | 19 + libdd-data-pipeline-ffi/src/trace_exporter.rs | 63 +++ libdd-data-pipeline/src/trace_buffer/mod.rs | 16 +- .../src/trace_exporter/builder.rs | 116 +++- .../src/trace_exporter/log_writer.rs | 94 ++++ libdd-data-pipeline/src/trace_exporter/mod.rs | 170 +++++- libdd-trace-utils/src/json_log_encoder/mod.rs | 497 ++++++++++++++++++ .../src/json_log_encoder/span.rs | 200 +++++++ libdd-trace-utils/src/lib.rs | 1 + 11 files changed, 1162 insertions(+), 30 deletions(-) create mode 100644 libdd-capabilities/src/log_output.rs create mode 100644 libdd-data-pipeline/src/trace_exporter/log_writer.rs create mode 100644 libdd-trace-utils/src/json_log_encoder/mod.rs create mode 100644 libdd-trace-utils/src/json_log_encoder/span.rs diff --git a/libdd-capabilities-impl/src/lib.rs b/libdd-capabilities-impl/src/lib.rs index d90d19034b..e2b48d7d74 100644 --- a/libdd-capabilities-impl/src/lib.rs +++ b/libdd-capabilities-impl/src/lib.rs @@ -15,7 +15,7 @@ use std::time::Duration; pub use http::NativeHttpClient; use libdd_capabilities::{http::HttpError, MaybeSend}; -pub use libdd_capabilities::{HttpClientCapability, SleepCapability}; +pub use libdd_capabilities::{HttpClientCapability, LogWriterCapability, SleepCapability}; pub use sleep::NativeSleepCapability; /// Bundle struct for native platform capabilities. @@ -64,6 +64,18 @@ impl HttpClientCapability for NativeCapabilities { } } +impl LogWriterCapability for NativeCapabilities { + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> { + use std::io::Write; + // `Stdout` is internally synchronized; lock once so the whole buffer + // (one or more newline-terminated JSON lines) is written without + // interleaving, then flush. + let mut out = std::io::stdout().lock(); + out.write_all(bytes)?; + out.flush() + } +} + impl SleepCapability for NativeCapabilities { fn new() -> Self { Self { diff --git a/libdd-capabilities/src/lib.rs b/libdd-capabilities/src/lib.rs index 340fbf2c6c..1798946057 100644 --- a/libdd-capabilities/src/lib.rs +++ b/libdd-capabilities/src/lib.rs @@ -4,11 +4,13 @@ //! Portable capability traits for cross-platform libdatadog. pub mod http; +pub mod log_output; pub mod maybe_send; pub mod sleep; pub mod spawn; pub use self::http::{HttpClientCapability, HttpError}; +pub use self::log_output::LogWriterCapability; pub use self::sleep::SleepCapability; pub use self::spawn::SpawnError; pub use ::http::{Request, Response}; diff --git a/libdd-capabilities/src/log_output.rs b/libdd-capabilities/src/log_output.rs new file mode 100644 index 0000000000..9b75e3124e --- /dev/null +++ b/libdd-capabilities/src/log_output.rs @@ -0,0 +1,19 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Log-output capability trait. +//! +//! Lets the trace pipeline emit already-encoded log-exporter output to the +//! platform's log sink. On native targets this writes to process stdout; a wasm +//! consumer implements it by handing the bytes to the host (e.g. JavaScript), +//! since wasm cannot write to stdout directly. + +/// Capability for writing encoded log-exporter output to the platform log sink. +pub trait LogWriterCapability { + /// Write a buffer of newline-delimited log output. + /// + /// `bytes` may contain one or more `\n`-terminated JSON lines. Implementations + /// should write the whole buffer (so individual lines are not interleaved with + /// other writers) and flush before returning. + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()>; +} diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 5271c86e63..8e71458246 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -83,6 +83,8 @@ pub struct TraceExporterConfig { connection_timeout: Option, shared_runtime: Option>, otlp_endpoint: Option, + output_to_log: bool, + log_max_line_size: Option, } #[no_mangle] @@ -498,6 +500,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( ) } +/// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog +/// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless +/// environments (e.g. AWS Lambda) when no agent is reachable. +/// +/// `max_line_size` overrides the per-line byte cap; pass `0` to use the default (256 KiB, the AWS +/// CloudWatch Logs limit). When enabled, agent-specific behavior (agent-info polling, client-side +/// stats, V1 negotiation) is bypassed. +/// +/// In this mode each span's `meta` is serialized to process stdout (and thus captured by CloudWatch +/// Logs in Lambda); `meta_struct` is excluded because it holds raw msgpack the log intake cannot +/// interpret. +/// +/// Writes are synchronous/blocking on stdout, so this mode targets single-threaded / current-thread +/// serverless runtimes (e.g. AWS Lambda) where a blocking write won't stall a shared async reactor. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_output_to_log( + config: Option<&mut TraceExporterConfig>, + max_line_size: usize, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + handle.output_to_log = true; + handle.log_max_line_size = (max_line_size != 0).then_some(max_line_size); + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Create a new TraceExporter instance. /// /// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in @@ -567,6 +600,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( builder.set_otlp_endpoint(url); } + if config.output_to_log { + builder.set_output_to_log(config.log_max_line_size); + } + match builder.build() { Ok(exporter) => { out_handle.as_ptr().write(Box::new(exporter)); @@ -662,11 +699,37 @@ mod tests { assert!(cfg.process_tags.is_none()); assert!(cfg.test_session_token.is_none()); assert!(cfg.connection_timeout.is_none()); + assert!(!cfg.output_to_log); + assert_eq!(cfg.log_max_line_size, None); ddog_trace_exporter_config_free(cfg); } } + #[test] + fn config_output_to_log_test() { + unsafe { + // Null config handle -> InvalidArgument. + let error = ddog_trace_exporter_config_set_output_to_log(None, 0); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // 0 is a sentinel for "use the default cap" -> None. + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 0); + assert_eq!(error, None); + let cfg = config.unwrap(); + assert!(cfg.output_to_log); + assert_eq!(cfg.log_max_line_size, None); + + // Non-zero cap is stored as-is. + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 4096); + assert_eq!(error, None); + assert_eq!(config.unwrap().log_max_line_size, Some(4096)); + } + } + #[test] fn config_url_test() { unsafe { diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 09698eef18..0950ea4c3c 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -13,7 +13,7 @@ use std::{ time::{Duration, Instant}, }; -use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_shared_runtime::Worker; use crate::trace_exporter::{ @@ -640,18 +640,24 @@ pub trait Export: Send + Debug { } #[derive(Debug)] -pub struct DefaultExport { +pub struct DefaultExport< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, +> { trace_exporter: TraceExporter, } -impl DefaultExport { +impl< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + > DefaultExport +{ pub fn new(trace_exporter: TraceExporter) -> Self { Self { trace_exporter } } } -impl - Export for DefaultExport +impl< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + > Export for DefaultExport { fn export_trace_chunks( &mut self, diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 3c0e1f14b5..a3b94d5478 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -8,6 +8,7 @@ use crate::otlp::OtlpTraceConfig; use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; use crate::trace_exporter::error::BuilderErrorKind; +use crate::trace_exporter::log_writer::DEFAULT_LOG_MAX_LINE_SIZE; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::trace_exporter::TelemetryConfig; #[cfg(not(target_arch = "wasm32"))] @@ -18,7 +19,7 @@ use crate::trace_exporter::{ TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; -use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_common::{parse_uri, tag, Endpoint}; use libdd_dogstatsd_client::new; use libdd_shared_runtime::SharedRuntime; @@ -65,6 +66,11 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + /// When true, traces are written as newline-delimited JSON to stdout (the + /// Datadog Forwarder "log exporter" path) instead of being sent to an agent. + output_to_log: bool, + /// Optional override for the maximum size of a single emitted log line. + log_max_line_size: Option, } impl TraceExporterBuilder { @@ -296,12 +302,40 @@ impl TraceExporterBuilder { self } + /// Configure the exporter to write traces as newline-delimited JSON to stdout + /// (the Datadog Forwarder "log exporter" path) instead of sending them to a + /// Datadog agent. This is the transport used in serverless environments (e.g. + /// AWS Lambda) when no agent is reachable. + /// + /// `max_line_size` overrides the per-line byte cap; `None` (or `Some(0)`, + /// which is coerced to the default) uses the default of 256 KiB, the AWS + /// CloudWatch Logs per-event limit. When this is set, agent-specific behavior + /// (agent-info polling, client-side stats, V1 negotiation) is bypassed. + /// + /// In this mode each span's `meta` is serialized to process stdout (and thus + /// captured by CloudWatch Logs in Lambda); `meta_struct` is excluded because + /// it holds raw msgpack the log intake cannot interpret. Writes are + /// synchronous/blocking, so this mode targets single-threaded serverless + /// runtimes where blocking stdout writes do not stall a shared async reactor. + /// + /// Takes precedence over an OTLP endpoint: if both this and `set_otlp_endpoint` + /// are configured, traces are written to the log output and not sent via OTLP. + pub fn set_output_to_log(&mut self, max_line_size: Option) -> &mut Self { + self.output_to_log = true; + // Treat `Some(0)` as "use the default" (a 0 cap would drop every span); + // keeps parity with the FFI setter's 0-sentinel. + self.log_max_line_size = max_line_size.filter(|&n| n != 0); + self + } + /// Build the [`TraceExporter`] synchronously. /// /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. /// Not available on wasm — use [`Self::build_async`] there. #[cfg(not(target_arch = "wasm32"))] - pub fn build( + pub fn build< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + >( mut self, ) -> Result, TraceExporterError> { let shared_runtime = match self.shared_runtime.as_ref() { @@ -320,7 +354,7 @@ impl TraceExporterBuilder { /// Awaits all async setup (e.g. telemetry start-up). Safe to drive from any async /// context. pub async fn build_async< - C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, >( self, ) -> Result, TraceExporterError> { @@ -365,19 +399,26 @@ impl TraceExporterBuilder { let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); let (info_fetcher, info_response_observer) = AgentInfoFetcher::::new(info_endpoint, Duration::from_secs(5 * 60)); - let info_fetcher_handle = - shared_runtime - .spawn_worker(info_fetcher, false) - .map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( - e.to_string(), - )) - })?; - // The handle is currently only tracked for shutdown on native; on wasm - // it is dropped here (the worker keeps running on the JS event loop - // until the page/module is torn down). + // In log-export mode there is no agent to poll; skip spawning the worker + // entirely so we don't make repeated failing `/info` calls (e.g. in Lambda). + let info_fetcher_handle = if self.output_to_log { + None + } else { + Some( + shared_runtime + .spawn_worker(info_fetcher, false) + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })?, + ) + }; + // The handle is only tracked for shutdown on native; on wasm the `workers` + // field is cfg'd out, so drop it here (the worker keeps running on the JS + // event loop until the page/module is torn down). #[cfg(target_arch = "wasm32")] - let _ = info_fetcher_handle; + drop(info_fetcher_handle); #[allow(unused_mut)] let mut stats = StatsComputationStatus::Disabled; @@ -389,7 +430,13 @@ impl TraceExporterBuilder { #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] let (telemetry_client, telemetry_handle) = { let sessions = self.telemetry_instrumentation_sessions; - let telemetry = self.telemetry.map(|telemetry_config| { + // Telemetry talks to the agent; disable it in log-export mode. + let telemetry = if self.output_to_log { + None + } else { + self.telemetry + } + .map(|telemetry_config| { let mut builder = TelemetryClientBuilder::default() .set_language(&self.language) .set_language_version(&self.language_version) @@ -455,6 +502,10 @@ impl TraceExporterBuilder { } }); + let log_output = self + .output_to_log + .then(|| self.log_max_line_size.unwrap_or(DEFAULT_LOG_MAX_LINE_SIZE)); + Ok(TraceExporter { endpoint: Endpoint { url: agent_url, @@ -514,6 +565,7 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), otlp_config, + log_output, }) } @@ -545,6 +597,38 @@ mod tests { use crate::trace_exporter::error::BuilderErrorKind; use libdd_capabilities_impl::NativeCapabilities; + #[cfg_attr(miri, ignore)] + #[test] + fn test_log_output_mode() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_service("test") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_to_log(None); + let exporter = builder.build::().unwrap(); + // Log-output mode is enabled and the agent-info worker is not spawned. + // (End-to-end send -> bytes is covered by the capability-injecting test in + // `mod.rs` and the `log_writer` unit tests.) + assert!( + exporter.log_output.is_some(), + "log_output should be set in log-output mode" + ); + assert!( + exporter.workers.info_fetcher.is_none(), + "no agent-info worker should be spawned in log mode" + ); + } + + #[test] + fn set_output_to_log_some_zero_uses_default() { + // `Some(0)` is coerced to "use the default cap" (a 0 cap would drop every + // span), which is represented as `None` on the builder field. + let mut builder = TraceExporterBuilder::default(); + builder.set_output_to_log(Some(0)); + assert!(builder.output_to_log); + assert_eq!(builder.log_max_line_size, None); + } + #[cfg_attr(miri, ignore)] #[test] fn test_new() { diff --git a/libdd-data-pipeline/src/trace_exporter/log_writer.rs b/libdd-data-pipeline/src/trace_exporter/log_writer.rs new file mode 100644 index 0000000000..40b5ca2436 --- /dev/null +++ b/libdd-data-pipeline/src/trace_exporter/log_writer.rs @@ -0,0 +1,94 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Stdout "log exporter" trace transport. +//! +//! Encodes trace chunks as newline-delimited JSON in the format consumed by the +//! Datadog Forwarder and writes them through the [`LogWriterCapability`] (stdout +//! on native targets; host-provided, e.g. handed to JavaScript, on wasm). This +//! is used in serverless environments (primarily AWS Lambda) where no agent is +//! reachable; a downstream tool (the Datadog Forwarder) tails the platform logs +//! and submits the traces to the trace intake. + +use libdd_capabilities::LogWriterCapability; +use libdd_trace_utils::json_log_encoder::{encode_traces, EncodeStats}; +use libdd_trace_utils::span::{v04::Span, TraceData}; + +/// Default maximum size of a single emitted log line, in bytes. +/// +/// Matches the AWS CloudWatch Logs per-event limit (256 KiB), consistent with +/// dd-trace-go's `logBufferLimit`. Spans are packed greedily up to this size; +/// a single span that alone exceeds it is dropped (see [`encode_traces`]). +pub(crate) const DEFAULT_LOG_MAX_LINE_SIZE: usize = 256 * 1024; + +/// Encode `traces` to newline-delimited Forwarder JSON and write them through the +/// log-output capability. Returns counts of spans written/dropped. +/// +/// Writes are synchronous: on native targets this blocks on a stdout write, so +/// log-export mode is intended for single-threaded / current-thread serverless +/// runtimes (e.g. AWS Lambda) where there is no shared async reactor to stall. +pub(crate) fn write_log_traces( + capabilities: &C, + traces: &[Vec>], + max_line_size: usize, +) -> std::io::Result { + let mut buf: Vec = Vec::new(); + let stats = encode_traces(traces, &mut buf, max_line_size)?; + if !buf.is_empty() { + capabilities.write_log_output(&buf)?; + } + Ok(stats) +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_utils::span::v04::Span; + use libdd_trace_utils::span::SliceData; + use std::sync::{Arc, Mutex}; + + /// Test capability that captures written bytes instead of touching stdout. + #[derive(Clone, Default)] + struct CapturingLog(Arc>>); + + impl LogWriterCapability for CapturingLog { + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> { + self.0 + .lock() + .expect("capture lock") + .extend_from_slice(bytes); + Ok(()) + } + } + + #[test] + fn encodes_and_writes_through_capability() { + let cap = CapturingLog::default(); + let traces = vec![vec![Span::> { + trace_id: 1, + span_id: 2, + ..Default::default() + }]]; + + let stats = write_log_traces(&cap, &traces, DEFAULT_LOG_MAX_LINE_SIZE).expect("write ok"); + assert_eq!(stats.spans_written, 1); + assert_eq!(stats.spans_dropped, 0); + + let out = cap.0.lock().expect("lock"); + let text = std::str::from_utf8(&out).expect("utf8"); + assert!(text.ends_with('\n'), "line must be newline-terminated"); + let v: serde_json::Value = serde_json::from_str(text.trim_end()).expect("valid json"); + // Forwarder is_trace contract. + assert!(v["traces"][0][0]["trace_id"].is_string()); + assert_eq!(v["traces"][0][0]["span_id"], "0000000000000002"); + } + + #[test] + fn empty_traces_do_not_call_capability() { + let cap = CapturingLog::default(); + let traces: Vec>>> = vec![]; + let stats = write_log_traces(&cap, &traces, DEFAULT_LOG_MAX_LINE_SIZE).expect("ok"); + assert_eq!(stats.spans_written, 0); + assert!(cap.0.lock().expect("lock").is_empty()); + } +} diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 839c6b7247..b4dd6799ea 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -3,6 +3,7 @@ pub mod agent_response; pub mod builder; pub mod error; +mod log_writer; pub mod metrics; pub mod stats; mod trace_serializer; @@ -11,6 +12,7 @@ mod trace_serializer; pub use builder::TraceExporterBuilder; use self::agent_response::AgentResponse; +use self::log_writer::write_log_traces; use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; @@ -35,7 +37,7 @@ use bytes::Bytes; use http::header::HeaderMap; use http::uri::PathAndQuery; use http::Uri; -use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_common::tag::Tag; use libdd_common::Endpoint; use libdd_dogstatsd_client::Client; @@ -139,7 +141,8 @@ pub use libdd_trace_utils::tracer_metadata::TracerMetadata; #[cfg(not(target_arch = "wasm32"))] #[derive(Debug)] pub(crate) struct TraceExporterWorkers { - info_fetcher: WorkerHandle, + /// `None` in log-export mode, where no agent `/info` polling is performed. + info_fetcher: Option, #[cfg(feature = "telemetry")] telemetry: Option, } @@ -181,7 +184,9 @@ impl From for DeserInputFormat { /// pin it to a concrete type (`NativeCapabilities` or `WasmCapabilities`). /// Task spawning is handled internally by `SharedRuntime`. #[derive(Debug)] -pub struct TraceExporter { +pub struct TraceExporter< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, +> { endpoint: Endpoint, metadata: TracerMetadata, input_format: TraceExporterInputFormat, @@ -212,9 +217,17 @@ pub struct TraceExporter, /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. otlp_config: Option, + /// When `Some(max_line_size)`, traces are written as newline-delimited JSON + /// through the [`LogWriterCapability`] (the Datadog Forwarder "log exporter" + /// path) instead of being sent to an agent. Used in serverless environments + /// where no agent is reachable. + log_output: Option, } -impl TraceExporter { +impl< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + > TraceExporter +{ #[allow(missing_docs)] pub fn builder() -> TraceExporterBuilder { TraceExporterBuilder::default() @@ -267,8 +280,9 @@ impl Tra join_set.spawn(async move { handle.stop().await }); } - let info_fetcher = self.workers.info_fetcher; - join_set.spawn(async move { info_fetcher.stop().await }); + if let Some(info_fetcher) = self.workers.info_fetcher { + join_set.spawn(async move { info_fetcher.stop().await }); + } #[cfg(feature = "telemetry")] if let Some(telemetry) = self.workers.telemetry { @@ -302,7 +316,10 @@ impl Tra /// `data` must be encoded per the `input_format` given to the builder. /// [`Self::send`] is the sync facade over this method. pub async fn send_async(&self, data: &[u8]) -> Result { - self.check_agent_info().await; + // In log-export mode there is no agent to negotiate with; skip the poll. + if self.log_output.is_none() { + self.check_agent_info().await; + } let format: DeserInputFormat = self.input_format.into(); @@ -499,7 +516,10 @@ impl Tra &self, trace_chunks: Vec>>, ) -> Result { - self.check_agent_info().await; + // In log-export mode there is no agent to negotiate with; skip the poll. + if self.log_output.is_none() { + self.check_agent_info().await; + } self.send_trace_chunks_inner(trace_chunks).await } @@ -575,6 +595,25 @@ impl Tra &self, mut traces: Vec>>, ) -> Result { + // Log-export path: write traces as newline-delimited JSON through the + // log-output capability (stdout on native; host/JS on wasm) for the Datadog + // Forwarder. No agent, stats, OTLP, or telemetry involved. + // + // We emit all spans as-is and do NOT drop unsampled (p0) chunks here + // (unlike the OTLP path): the reference log exporters (JS/Go/Py/Java) write + // every span they are handed and defer sampling to the trace intake behind + // the Datadog Forwarder. + if let Some(max_line_size) = self.log_output { + let stats = write_log_traces(&self.capabilities, &traces, max_line_size) + .map_err(TraceExporterError::Io)?; + debug!( + spans_written = stats.spans_written, + spans_dropped = stats.spans_dropped, + "Wrote traces to log exporter" + ); + return Ok(AgentResponse::Unchanged); + } + let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); // Process stats computation and drop non-sampled (p0) chunks. @@ -1129,6 +1168,121 @@ mod tests { builder.build::().unwrap() } + // Capturing capabilities: delegate HTTP/sleep to the native impls, but capture + // log output into a thread-local buffer so log-mode tests can assert the emitted + // bytes without writing to real stdout. `build` constructs `C` via + // `C::new_client`, so the buffer is shared through a thread-local rather than an + // instance field; `new_client` clears it so each build starts fresh. + thread_local! { + static LOG_CAPTURE: std::cell::RefCell> = const { std::cell::RefCell::new(Vec::new()) }; + } + + #[derive(Clone, Debug)] + struct CapturingCapabilities(NativeCapabilities); + + impl HttpClientCapability for CapturingCapabilities { + fn new_client() -> Self { + LOG_CAPTURE.with(|c| c.borrow_mut().clear()); + Self(NativeCapabilities::new_client()) + } + fn request( + &self, + req: http::Request, + ) -> impl std::future::Future< + Output = Result, libdd_capabilities::http::HttpError>, + > + MaybeSend { + self.0.request(req) + } + } + + impl SleepCapability for CapturingCapabilities { + fn new() -> Self { + Self(NativeCapabilities::new()) + } + fn sleep(&self, duration: Duration) -> impl std::future::Future + MaybeSend { + self.0.sleep(duration) + } + } + + impl LogWriterCapability for CapturingCapabilities { + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> { + LOG_CAPTURE.with(|c| c.borrow_mut().extend_from_slice(bytes)); + Ok(()) + } + } + + fn captured_log() -> Vec { + LOG_CAPTURE.with(|c| c.borrow().clone()) + } + + // The real `send` entry point decodes msgpack, hits the log branch, and writes + // Forwarder-format JSON bytes through the log-output capability. + #[test] + #[cfg_attr(miri, ignore)] + fn test_log_mode_send_writes_forwarder_json() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_service("test") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_to_log(None); + let exporter = builder.build::().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"aws.lambda").unwrap(), + trace_id: 1, + span_id: 2, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + + let resp = exporter.send(data.as_ref()).unwrap(); + assert!(matches!(resp, AgentResponse::Unchanged)); + + let text = String::from_utf8(captured_log()).unwrap(); + assert!(text.ends_with('\n'), "line must be newline-terminated"); + let line = text.trim_end(); + let v: serde_json::Value = serde_json::from_str(line).expect("valid json line"); + // Forwarder is_trace contract + the span actually round-tripped through + // msgpack decode -> log encode with hex ids. + assert!(v["traces"][0][0]["trace_id"].is_string()); + assert_eq!(v["traces"][0][0]["span_id"], "0000000000000002"); + assert_eq!(v["traces"][0][0]["name"], "aws.lambda"); + } + + // Log mode must make zero agent HTTP calls — also guards the worker-gating fix + // (info-fetcher is not spawned in log mode). + #[test] + #[cfg_attr(miri, ignore)] + fn test_log_mode_makes_no_agent_requests() { + let fake_agent = MockServer::start(); + // No `when` constraints => matches any request to any path. + let any = fake_agent.mock(|_when, then| { + then.status(200).body("{}"); + }); + + let mut builder = TraceExporterBuilder::default(); + builder + .set_url(&fake_agent.url("/")) + .set_service("test") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_to_log(None); + let exporter = builder.build::().unwrap(); + // Structural guarantee: no agent-info worker is spawned in log mode. + assert!(exporter.workers.info_fetcher.is_none()); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"test").unwrap(), + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + // `send` is synchronous and, in log mode, returns after writing through the + // capability without initiating any HTTP; combined with the structural assert + // above this is deterministic (no background worker can race the mock). + exporter.send(data.as_ref()).unwrap(); + + assert_eq!(any.calls(), 0, "log mode must not contact the agent"); + } + #[test] #[cfg_attr(miri, ignore)] fn test_health_metrics() { diff --git a/libdd-trace-utils/src/json_log_encoder/mod.rs b/libdd-trace-utils/src/json_log_encoder/mod.rs new file mode 100644 index 0000000000..fecf60be11 --- /dev/null +++ b/libdd-trace-utils/src/json_log_encoder/mod.rs @@ -0,0 +1,497 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! JSON "log exporter" trace encoder. +//! +//! Emits traces in the newline-delimited JSON format consumed by the Datadog +//! Forwarder Lambda (the legacy serverless path used when no Datadog Agent / +//! Lambda Extension is reachable). Each emitted line is a self-contained JSON +//! document of the form: +//! +//! ```text +//! {"traces":[[ {span}, {span}, ... ]]}\n +//! ``` +//! +//! Spans are greedily packed into size-bounded lines. A single span that alone +//! exceeds the line cap is dropped (and counted in [`EncodeStats::spans_dropped`]) +//! rather than emitted as a truncated, unparseable line. +//! +//! See the cross-language specification for the wire contract and the +//! Forwarder's `is_trace` detection requirements. + +mod span; + +use crate::span::v04::Span; +use crate::span::TraceData; +use span::LogSpan; +use std::io::Write; + +/// Opening bytes of every emitted line: `{"traces":[[`. +const TRACE_PREFIX: &[u8] = b"{\"traces\":[["; +/// Closing bytes of every emitted line: `]]}` plus the terminating newline. +const TRACE_SUFFIX: &[u8] = b"]]}\n"; +/// Fixed per-line overhead contributed by [`TRACE_PREFIX`] and [`TRACE_SUFFIX`]. +const TRACE_FORMAT_OVERHEAD: usize = TRACE_PREFIX.len() + TRACE_SUFFIX.len(); + +/// Statistics reported by [`encode_traces`]. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct EncodeStats { + /// Number of spans successfully written to `out`. + pub spans_written: usize, + /// Number of spans dropped because a single span exceeded `max_line_size`. + pub spans_dropped: usize, +} + +/// Encodes `traces` into newline-delimited JSON "log exporter" lines, writing +/// them to `out`. +/// +/// All spans across all input traces are flattened and greedily packed into +/// lines no larger than `max_line_size` bytes (including the `{"traces":[[` / +/// `]]}\n` framing). Each line contains a single inner trace array, matching the +/// reference dd-trace-js exporter. A span whose own serialized size plus the +/// framing overhead exceeds `max_line_size` is dropped (counted in +/// [`EncodeStats::spans_dropped`]) and never split across lines. +/// +/// The caller is responsible for flushing `out` (e.g. stdout) after this returns. +/// +/// # Errors +/// +/// Returns any [`std::io::Error`] produced while writing to `out`. +/// +/// # Examples +/// +/// ``` +/// use libdd_trace_utils::json_log_encoder::encode_traces; +/// use libdd_trace_utils::span::v04::SpanSlice; +/// +/// let span = SpanSlice { +/// service: "my-fn".into(), +/// name: "aws.lambda".into(), +/// resource: "my-fn".into(), +/// trace_id: 1, +/// span_id: 2, +/// ..Default::default() +/// }; +/// let traces = vec![vec![span]]; +/// +/// let mut out = Vec::new(); +/// let stats = encode_traces(&traces, &mut out, 64 * 1024).unwrap(); +/// +/// assert_eq!(stats.spans_written, 1); +/// assert!(out.ends_with(b"\n")); +/// ``` +pub fn encode_traces( + traces: &[Vec>], + out: &mut impl Write, + max_line_size: usize, +) -> std::io::Result { + let mut stats = EncodeStats::default(); + + // Reusable buffers: `span_buf` holds the JSON for the span currently being + // considered; `line` accumulates the complete current line. It is primed + // with `TRACE_PREFIX` so that a single `write_all` per line can emit + // prefix + spans + suffix in one syscall (see `flush_line`). + let mut span_buf: Vec = Vec::with_capacity(512); + let mut line: Vec = Vec::with_capacity(max_line_size.min(64 * 1024)); + line.extend_from_slice(TRACE_PREFIX); + let mut line_span_count: usize = 0; + + for trace in traces { + for span in trace { + span_buf.clear(); + serde_json::to_writer(&mut span_buf, &LogSpan(span)).map_err(std::io::Error::other)?; + let span_len = span_buf.len(); + + // A span that cannot fit on a line by itself is dropped rather than + // emitted as a truncated, unparseable line. + if span_len + TRACE_FORMAT_OVERHEAD > max_line_size { + stats.spans_dropped += 1; + tracing::debug!( + span_len, + max_line_size, + "Span too large to send to logs, dropping" + ); + continue; + } + + // Flush the current line if appending this span would overflow. + // `line` already contains `TRACE_PREFIX`; the emitted line will also + // gain `TRACE_SUFFIX`, so account for both here. + let comma = usize::from(line_span_count > 0); + if line_span_count > 0 + && line.len() + comma + span_len + TRACE_SUFFIX.len() > max_line_size + { + flush_line(out, &mut line)?; + line_span_count = 0; + } + + if line_span_count > 0 { + line.push(b','); + } + line.extend_from_slice(&span_buf); + line_span_count += 1; + stats.spans_written += 1; + } + } + + if line_span_count > 0 { + flush_line(out, &mut line)?; + } + + Ok(stats) +} + +/// Writes one complete line (`{"traces":[[` + joined spans + `]]}\n`) in a single +/// `write_all`, then resets the line buffer (re-primed with `TRACE_PREFIX`) for +/// reuse. +/// +/// `line` is expected to already contain `TRACE_PREFIX` followed by the +/// comma-joined spans; this appends `TRACE_SUFFIX` to complete the line. +fn flush_line(out: &mut impl Write, line: &mut Vec) -> std::io::Result<()> { + line.extend_from_slice(TRACE_SUFFIX); + out.write_all(line)?; + line.clear(); + line.extend_from_slice(TRACE_PREFIX); + Ok(()) +} + +#[cfg(test)] +// `SpanSlice` fields are `Cow<'a, str>` (SliceData::Text), so the `"literal".into()` +// conversions below are genuine `&str -> Cow` conversions required to compile. clippy on +// the CI target nonetheless reports them as `useless_conversion` to `&str` (a false positive +// not reproduced on all hosts); allow it here rather than dropping the necessary `.into()`. +#[allow(clippy::useless_conversion)] +mod tests { + use super::*; + use crate::span::v04::SpanSlice; + use serde_json::Value; + use std::collections::HashMap; + + const MAX: usize = 64 * 1024; + + fn lines(out: &[u8]) -> Vec { + String::from_utf8(out.to_vec()) + .unwrap() + .lines() + .map(|s| s.to_string()) + .collect() + } + + #[test] + fn golden_known_span() { + let span = SpanSlice { + service: "my-fn".into(), + name: "aws.lambda".into(), + resource: "my-fn".into(), + r#type: "serverless".into(), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 1717200000000000000, + duration: 1500000, + error: 0, + meta: HashMap::from([("env".into(), "prod".into())]), + metrics: HashMap::from([("_sampling_priority_v1".into(), 1.0)]), + ..Default::default() + }; + let mut out = Vec::new(); + let stats = encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + assert_eq!(stats.spans_written, 1); + assert_eq!(stats.spans_dropped, 0); + + let expected = concat!( + "{\"traces\":[[", + "{\"trace_id\":\"0000000000000001\",", + "\"span_id\":\"0000000000000002\",", + "\"parent_id\":\"0000000000000000\",", + "\"service\":\"my-fn\",", + "\"name\":\"aws.lambda\",", + "\"resource\":\"my-fn\",", + "\"type\":\"serverless\",", + "\"error\":0,", + "\"start\":1717200000000000000,", + "\"duration\":1500000,", + "\"meta\":{\"env\":\"prod\"},", + "\"metrics\":{\"_sampling_priority_v1\":1.0}", + "}]]}\n", + ); + assert_eq!(String::from_utf8(out).unwrap(), expected); + } + + #[test] + fn meta_struct_is_omitted() { + let span = SpanSlice { + trace_id: 1, + span_id: 2, + meta_struct: HashMap::from([("_dd.appsec.json".into(), [0x81u8, 0xa4].as_slice())]), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!( + !text.contains("meta_struct"), + "meta_struct must not be emitted: {text}" + ); + // Still a valid, Forwarder-parseable line. + let v: Value = serde_json::from_str(text.trim_end()).unwrap(); + assert!(v["traces"][0][0]["trace_id"].is_string()); + } + + #[test] + fn hex_high_bits_and_root_parent() { + // trace_id with the high 64 bits set => 32 hex chars. + let trace_id: u128 = (0xABCDu128 << 64) | 0x1; + let span = SpanSlice { + trace_id, + span_id: 0xFF, + parent_id: 0, + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!( + text.contains("\"trace_id\":\"000000000000abcd0000000000000001\""), + "got: {text}" + ); + assert!(text.contains("\"span_id\":\"00000000000000ff\"")); + assert!(text.contains("\"parent_id\":\"0000000000000000\"")); + } + + #[test] + fn error_is_integer() { + let span = SpanSlice { + error: 1, + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!(text.contains("\"error\":1,"), "got: {text}"); + } + + #[test] + fn empty_maps_and_type_omitted() { + let span = SpanSlice { + name: "op".into(), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!(!text.contains("\"meta\""), "got: {text}"); + assert!(!text.contains("\"metrics\"")); + assert!(!text.contains("\"meta_struct\"")); + assert!(!text.contains("\"span_links\"")); + assert!(!text.contains("\"span_events\"")); + assert!(!text.contains("\"type\"")); + } + + #[test] + fn string_escaping() { + let span = SpanSlice { + name: "say \"hi\"\n".into(), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + // Each line must remain valid JSON despite the embedded quote/newline. + for line in lines(&out) { + let parsed: Value = serde_json::from_str(&line).unwrap(); + assert_eq!( + parsed["traces"][0][0]["name"].as_str().unwrap(), + "say \"hi\"\n" + ); + } + } + + #[test] + fn size_cap_batches_into_multiple_lines() { + // Build several spans that individually fit but together exceed a small cap. + let make = |id: u64| SpanSlice { + name: "x".into(), + span_id: id, + ..Default::default() + }; + let trace: Vec = (1..=6).map(make).collect(); + + // Determine one span's serialized length to size the cap so ~2 fit per line. + let mut one = Vec::new(); + encode_traces(&[vec![make(1)]], &mut one, MAX).unwrap(); + // `one` = prefix + span + suffix. The bare span length: + let span_len = one.len() - TRACE_FORMAT_OVERHEAD; + // Cap fits two spans + a comma but not three. + let cap = TRACE_FORMAT_OVERHEAD + span_len * 2 + 1; + + let mut out = Vec::new(); + let stats = encode_traces(&[trace], &mut out, cap).unwrap(); + assert_eq!(stats.spans_written, 6); + assert_eq!(stats.spans_dropped, 0); + + let emitted = lines(&out); + assert_eq!(emitted.len(), 3, "expected 3 lines, got {emitted:?}"); + for line in &emitted { + assert!(line.len() <= cap, "line over cap: {} > {cap}", line.len()); + let parsed: Value = serde_json::from_str(line).unwrap(); + assert_eq!(parsed["traces"][0].as_array().unwrap().len(), 2); + } + } + + #[test] + fn oversize_single_span_dropped() { + let big = "a".repeat(10_000); + let span = SpanSlice { + name: big.as_str().into(), + span_id: 1, + ..Default::default() + }; + let small = SpanSlice { + name: "ok".into(), + span_id: 2, + ..Default::default() + }; + let mut out = Vec::new(); + // Cap large enough for `small` but far too small for `big`. + let stats = encode_traces(&[vec![span, small]], &mut out, 1024).unwrap(); + assert_eq!(stats.spans_dropped, 1); + assert_eq!(stats.spans_written, 1); + + let emitted = lines(&out); + assert_eq!(emitted.len(), 1); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + assert_eq!(parsed["traces"][0][0]["name"].as_str().unwrap(), "ok"); + } + + #[test] + fn metric_non_finite_serializes_null() { + // serde_json renders non-finite f64 as JSON null; the line must remain + // parseable and the metric values must be null (not NaN/Infinity tokens). + let span = SpanSlice { + span_id: 1, + metrics: HashMap::from([("nan".into(), f64::NAN), ("inf".into(), f64::INFINITY)]), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let emitted = lines(&out); + assert_eq!(emitted.len(), 1); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + let metrics = &parsed["traces"][0][0]["metrics"]; + assert!(metrics["nan"].is_null(), "got: {metrics}"); + assert!(metrics["inf"].is_null(), "got: {metrics}"); + } + + #[test] + fn multi_trace_flattened_into_one_line() { + // Two input traces, both well under the cap, flatten into a single line + // whose single inner trace array holds both spans. + let span_a = SpanSlice { + name: "a".into(), + span_id: 1, + ..Default::default() + }; + let span_b = SpanSlice { + name: "b".into(), + span_id: 2, + ..Default::default() + }; + let mut out = Vec::new(); + let stats = encode_traces(&[vec![span_a], vec![span_b]], &mut out, MAX).unwrap(); + assert_eq!(stats.spans_written, 2); + let emitted = lines(&out); + assert_eq!(emitted.len(), 1, "expected one line, got {emitted:?}"); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + let inner = parsed["traces"][0].as_array().unwrap(); + assert_eq!(inner.len(), 2); + assert_eq!(parsed["traces"].as_array().unwrap().len(), 1); + } + + #[test] + fn span_links_and_events_emitted_when_present() { + use crate::span::v04::{SpanEvent, SpanLink}; + + let span = SpanSlice { + span_id: 1, + span_links: vec![SpanLink { + trace_id: 7, + span_id: 8, + ..Default::default() + }], + span_events: vec![SpanEvent { + time_unix_nano: 123, + name: "evt".into(), + ..Default::default() + }], + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let emitted = lines(&out); + assert_eq!(emitted.len(), 1); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + let span_json = &parsed["traces"][0][0]; + // Lock the inner wire shape (field names/values), not just presence. + assert_eq!(span_json["span_links"][0]["trace_id"], 7); + assert_eq!(span_json["span_links"][0]["span_id"], 8); + assert_eq!(span_json["span_events"][0]["name"], "evt"); + assert_eq!(span_json["span_events"][0]["time_unix_nano"], 123); + } + + #[test] + fn empty_inner_trace_writes_nothing() { + // One trace containing zero spans: nothing is emitted and no spans counted. + let traces: Vec> = vec![vec![]]; + let mut out = Vec::new(); + let stats = encode_traces(&traces, &mut out, MAX).unwrap(); + assert!(out.is_empty()); + assert_eq!(stats.spans_written, 0); + assert_eq!(stats.spans_dropped, 0); + } + + #[test] + fn empty_input_writes_nothing() { + let traces: Vec> = vec![]; + let mut out = Vec::new(); + let stats = encode_traces(&traces, &mut out, MAX).unwrap(); + assert_eq!(stats, EncodeStats::default()); + assert!(out.is_empty()); + } + + // Mirrors the Datadog Forwarder `is_trace` detection contract: every line + // parses as JSON, top-level `traces` is a non-empty array whose `[0]` is a + // non-empty array whose `[0]` has a non-null string `trace_id`, and every + // line ends with `\n`. + #[test] + fn forwarder_is_trace_contract() { + let trace: Vec = (1u64..=5) + .map(|i| SpanSlice { + service: "svc".into(), + name: "op".into(), + trace_id: u128::from(i), + span_id: i + 100, + ..Default::default() + }) + .collect(); + + let mut out = Vec::new(); + encode_traces(&[trace], &mut out, 200).unwrap(); + + let text = String::from_utf8(out).unwrap(); + assert!(!text.is_empty()); + // Every line is newline-terminated. + for chunk in text.split_inclusive('\n') { + assert!(chunk.ends_with('\n'), "line not newline-terminated"); + let line = chunk.trim_end_matches('\n'); + let parsed: Value = serde_json::from_str(line).unwrap(); + + let traces = parsed.get("traces").and_then(Value::as_array).unwrap(); + assert!(!traces.is_empty()); + let first = traces[0].as_array().unwrap(); + assert!(!first.is_empty()); + let trace_id = first[0].get("trace_id").unwrap(); + assert!(trace_id.is_string()); + assert!(!trace_id.as_str().unwrap().is_empty()); + } + } +} diff --git a/libdd-trace-utils/src/json_log_encoder/span.rs b/libdd-trace-utils/src/json_log_encoder/span.rs new file mode 100644 index 0000000000..c39363ed95 --- /dev/null +++ b/libdd-trace-utils/src/json_log_encoder/span.rs @@ -0,0 +1,200 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Serialization of a single [`Span`] into the Datadog Forwarder "log exporter" +//! JSON shape (APM v0.2 span schema). +//! +//! This deliberately does **not** reuse the derived [`serde::Serialize`] impl on +//! [`Span`], which targets the msgpack v04 wire format (numeric ids, different +//! skip rules). The log format requires `trace_id`/`span_id`/`parent_id` as +//! zero-padded lowercase hex strings, `error` always present, and `type`/`meta`/ +//! `metrics`/`meta_struct`/`span_links`/`span_events` omitted when empty. +//! +//! The wrappers below serialize via the span's associated text type +//! (`T::Text`, which is always [`serde::Serialize`] through the `SpanText` +//! trait bound) rather than the whole `T`, so the encoder does not require a +//! `T: Serialize` bound — keeping the public exporter API free of that bound. + +use crate::span::v04::{Span, SpanEvent, SpanLink}; +use crate::span::TraceData; +use serde::ser::{SerializeSeq, SerializeStruct}; +use serde::{Serialize, Serializer}; +use std::borrow::Borrow; + +const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef"; + +/// Writes `value` as 16 zero-padded lowercase hex bytes into the start of `out`. +/// +/// The caller guarantees `out` has room for at least 16 bytes. +fn fill_hex(out: &mut [u8], value: u64) { + for i in 0..16 { + out[15 - i] = HEX_DIGITS[((value >> (i * 4)) & 0xf) as usize]; + } +} + +/// Serializes `buf` as a JSON string. `buf` must only contain ASCII hex digits, +/// so the UTF-8 check below never fails in practice. +fn serialize_ascii_hex(serializer: S, buf: &[u8]) -> Result { + match std::str::from_utf8(buf) { + Ok(text) => serializer.serialize_str(text), + // Unreachable: `fill_hex` only writes ASCII hex digits. + Err(_) => serializer.serialize_str(""), + } +} + +/// A `u64` id rendered as a 16-char zero-padded lowercase hex JSON string. +struct HexU64(u64); + +impl Serialize for HexU64 { + fn serialize(&self, serializer: S) -> Result { + let mut buf = [0u8; 16]; + fill_hex(&mut buf, self.0); + serialize_ascii_hex(serializer, &buf) + } +} + +/// A 128-bit trace id rendered as hex. When the high 64 bits are zero it is +/// emitted as 16 hex chars (the low 64 bits); otherwise as the full 32 chars. +struct HexTraceId(u128); + +impl Serialize for HexTraceId { + fn serialize(&self, serializer: S) -> Result { + let high = (self.0 >> 64) as u64; + let low = self.0 as u64; + if high == 0 { + let mut buf = [0u8; 16]; + fill_hex(&mut buf, low); + serialize_ascii_hex(serializer, &buf) + } else { + let mut buf = [0u8; 32]; + fill_hex(&mut buf[0..16], high); + fill_hex(&mut buf[16..32], low); + serialize_ascii_hex(serializer, &buf) + } + } +} + +/// Serializes a [`SpanLink`] in the v04 JSON shape without requiring `T: +/// Serialize` (only `T::Text`, via the `SpanText` bound, is needed). +struct LogSpanLink<'a, T: TraceData>(&'a SpanLink); + +impl Serialize for LogSpanLink<'_, T> { + fn serialize(&self, serializer: S) -> Result { + let link = self.0; + let has_attributes = !link.attributes.is_empty(); + let has_tracestate = !Borrow::::borrow(&link.tracestate).is_empty(); + let has_flags = link.flags != 0; + + // Always: trace_id, trace_id_high, span_id. + let mut len = 3; + len += has_attributes as usize; + len += has_tracestate as usize; + len += has_flags as usize; + + let mut state = serializer.serialize_struct("span_link", len)?; + state.serialize_field("trace_id", &link.trace_id)?; + state.serialize_field("trace_id_high", &link.trace_id_high)?; + state.serialize_field("span_id", &link.span_id)?; + if has_attributes { + state.serialize_field("attributes", &link.attributes)?; + } + if has_tracestate { + state.serialize_field("tracestate", &link.tracestate)?; + } + if has_flags { + state.serialize_field("flags", &link.flags)?; + } + state.end() + } +} + +/// Serializes a [`SpanEvent`] in the v04 JSON shape without requiring `T: +/// Serialize`. The event's attribute values (`AttributeAnyValue`) already +/// serialize via a `T::Text`-bounded impl. +struct LogSpanEvent<'a, T: TraceData>(&'a SpanEvent); + +impl Serialize for LogSpanEvent<'_, T> { + fn serialize(&self, serializer: S) -> Result { + let event = self.0; + let has_attributes = !event.attributes.is_empty(); + + // Always: time_unix_nano, name. + let mut len = 2; + len += has_attributes as usize; + + let mut state = serializer.serialize_struct("span_event", len)?; + state.serialize_field("time_unix_nano", &event.time_unix_nano)?; + state.serialize_field("name", &event.name)?; + if has_attributes { + state.serialize_field("attributes", &event.attributes)?; + } + state.end() + } +} + +/// Serializes a slice of `Item` as a JSON array using the `Wrap` per-element +/// wrapper, avoiding a `T: Serialize` bound on the element type. +struct LogSeq<'a, I, W>(&'a [I], fn(&'a I) -> W); + +impl Serialize for LogSeq<'_, I, W> { + fn serialize(&self, serializer: S) -> Result { + let mut seq = serializer.serialize_seq(Some(self.0.len()))?; + for item in self.0 { + seq.serialize_element(&(self.1)(item))?; + } + seq.end() + } +} + +/// Wraps a [`Span`] so that [`serde_json`] emits the Datadog Forwarder log shape. +pub(crate) struct LogSpan<'a, T: TraceData>(pub &'a Span); + +impl Serialize for LogSpan<'_, T> { + fn serialize(&self, serializer: S) -> Result { + let span = self.0; + + let has_type = !Borrow::::borrow(&span.r#type).is_empty(); + let has_meta = !span.meta.is_empty(); + let has_metrics = !span.metrics.is_empty(); + let has_links = !span.span_links.is_empty(); + let has_events = !span.span_events.is_empty(); + + // Required: trace_id, span_id, parent_id, service, name, resource, error, + // start, duration. `meta_struct` is intentionally NOT emitted: it holds raw + // msgpack bytes that would serialize as a JSON number array the intake + // cannot interpret (the reference JS/Go/Py/Java exporters omit it too). + let mut len = 9; + len += has_type as usize; + len += has_meta as usize; + len += has_metrics as usize; + len += has_links as usize; + len += has_events as usize; + + let mut state = serializer.serialize_struct("span", len)?; + state.serialize_field("trace_id", &HexTraceId(span.trace_id))?; + state.serialize_field("span_id", &HexU64(span.span_id))?; + state.serialize_field("parent_id", &HexU64(span.parent_id))?; + state.serialize_field("service", &span.service)?; + state.serialize_field("name", &span.name)?; + state.serialize_field("resource", &span.resource)?; + if has_type { + state.serialize_field("type", &span.r#type)?; + } + state.serialize_field("error", &span.error)?; + state.serialize_field("start", &span.start)?; + state.serialize_field("duration", &span.duration)?; + if has_meta { + state.serialize_field("meta", &span.meta)?; + } + if has_metrics { + state.serialize_field("metrics", &span.metrics)?; + } + if has_links { + state.serialize_field("span_links", &LogSeq(&span.span_links, LogSpanLink))?; + } + if has_events { + state.serialize_field("span_events", &LogSeq(&span.span_events, LogSpanEvent))?; + } + state.end() + } +} diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index aa8d93c887..fbe798175b 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -8,6 +8,7 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod config_utils; +pub mod json_log_encoder; pub mod msgpack_decoder; pub mod msgpack_encoder; pub mod otlp_encoder;