diff --git a/libdd-capabilities-impl/src/lib.rs b/libdd-capabilities-impl/src/lib.rs index d90d19034b..e2b48d7d74 100644 --- a/libdd-capabilities-impl/src/lib.rs +++ b/libdd-capabilities-impl/src/lib.rs @@ -15,7 +15,7 @@ use std::time::Duration; pub use http::NativeHttpClient; use libdd_capabilities::{http::HttpError, MaybeSend}; -pub use libdd_capabilities::{HttpClientCapability, SleepCapability}; +pub use libdd_capabilities::{HttpClientCapability, LogWriterCapability, SleepCapability}; pub use sleep::NativeSleepCapability; /// Bundle struct for native platform capabilities. @@ -64,6 +64,18 @@ impl HttpClientCapability for NativeCapabilities { } } +impl LogWriterCapability for NativeCapabilities { + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> { + use std::io::Write; + // `Stdout` is internally synchronized; lock once so the whole buffer + // (one or more newline-terminated JSON lines) is written without + // interleaving, then flush. + let mut out = std::io::stdout().lock(); + out.write_all(bytes)?; + out.flush() + } +} + impl SleepCapability for NativeCapabilities { fn new() -> Self { Self { diff --git a/libdd-capabilities/src/lib.rs b/libdd-capabilities/src/lib.rs index 340fbf2c6c..1798946057 100644 --- a/libdd-capabilities/src/lib.rs +++ b/libdd-capabilities/src/lib.rs @@ -4,11 +4,13 @@ //! Portable capability traits for cross-platform libdatadog. pub mod http; +pub mod log_output; pub mod maybe_send; pub mod sleep; pub mod spawn; pub use self::http::{HttpClientCapability, HttpError}; +pub use self::log_output::LogWriterCapability; pub use self::sleep::SleepCapability; pub use self::spawn::SpawnError; pub use ::http::{Request, Response}; diff --git a/libdd-capabilities/src/log_output.rs b/libdd-capabilities/src/log_output.rs new file mode 100644 index 0000000000..9b75e3124e --- /dev/null +++ b/libdd-capabilities/src/log_output.rs @@ -0,0 +1,19 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Log-output capability trait. +//! +//! Lets the trace pipeline emit already-encoded log-exporter output to the +//! platform's log sink. On native targets this writes to process stdout; a wasm +//! consumer implements it by handing the bytes to the host (e.g. JavaScript), +//! since wasm cannot write to stdout directly. + +/// Capability for writing encoded log-exporter output to the platform log sink. +pub trait LogWriterCapability { + /// Write a buffer of newline-delimited log output. + /// + /// `bytes` may contain one or more `\n`-terminated JSON lines. Implementations + /// should write the whole buffer (so individual lines are not interleaved with + /// other writers) and flush before returning. + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()>; +} diff --git a/libdd-data-pipeline-ffi/src/trace_exporter.rs b/libdd-data-pipeline-ffi/src/trace_exporter.rs index 5271c86e63..8e71458246 100644 --- a/libdd-data-pipeline-ffi/src/trace_exporter.rs +++ b/libdd-data-pipeline-ffi/src/trace_exporter.rs @@ -83,6 +83,8 @@ pub struct TraceExporterConfig { connection_timeout: Option, shared_runtime: Option>, otlp_endpoint: Option, + output_to_log: bool, + log_max_line_size: Option, } #[no_mangle] @@ -498,6 +500,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint( ) } +/// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog +/// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless +/// environments (e.g. AWS Lambda) when no agent is reachable. +/// +/// `max_line_size` overrides the per-line byte cap; pass `0` to use the default (256 KiB, the AWS +/// CloudWatch Logs limit). When enabled, agent-specific behavior (agent-info polling, client-side +/// stats, V1 negotiation) is bypassed. +/// +/// In this mode each span's `meta` is serialized to process stdout (and thus captured by CloudWatch +/// Logs in Lambda); `meta_struct` is excluded because it holds raw msgpack the log intake cannot +/// interpret. +/// +/// Writes are synchronous/blocking on stdout, so this mode targets single-threaded / current-thread +/// serverless runtimes (e.g. AWS Lambda) where a blocking write won't stall a shared async reactor. +#[no_mangle] +pub unsafe extern "C" fn ddog_trace_exporter_config_set_output_to_log( + config: Option<&mut TraceExporterConfig>, + max_line_size: usize, +) -> Option> { + catch_panic!( + if let Some(handle) = config { + handle.output_to_log = true; + handle.log_max_line_size = (max_line_size != 0).then_some(max_line_size); + None + } else { + gen_error!(ErrorCode::InvalidArgument) + }, + gen_error!(ErrorCode::Panic) + ) +} + /// Create a new TraceExporter instance. /// /// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in @@ -567,6 +600,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new( builder.set_otlp_endpoint(url); } + if config.output_to_log { + builder.set_output_to_log(config.log_max_line_size); + } + match builder.build() { Ok(exporter) => { out_handle.as_ptr().write(Box::new(exporter)); @@ -662,11 +699,37 @@ mod tests { assert!(cfg.process_tags.is_none()); assert!(cfg.test_session_token.is_none()); assert!(cfg.connection_timeout.is_none()); + assert!(!cfg.output_to_log); + assert_eq!(cfg.log_max_line_size, None); ddog_trace_exporter_config_free(cfg); } } + #[test] + fn config_output_to_log_test() { + unsafe { + // Null config handle -> InvalidArgument. + let error = ddog_trace_exporter_config_set_output_to_log(None, 0); + assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument); + ddog_trace_exporter_error_free(error); + + // 0 is a sentinel for "use the default cap" -> None. + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 0); + assert_eq!(error, None); + let cfg = config.unwrap(); + assert!(cfg.output_to_log); + assert_eq!(cfg.log_max_line_size, None); + + // Non-zero cap is stored as-is. + let mut config = Some(TraceExporterConfig::default()); + let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 4096); + assert_eq!(error, None); + assert_eq!(config.unwrap().log_max_line_size, Some(4096)); + } + } + #[test] fn config_url_test() { unsafe { diff --git a/libdd-data-pipeline/src/trace_buffer/mod.rs b/libdd-data-pipeline/src/trace_buffer/mod.rs index 09698eef18..0950ea4c3c 100644 --- a/libdd-data-pipeline/src/trace_buffer/mod.rs +++ b/libdd-data-pipeline/src/trace_buffer/mod.rs @@ -13,7 +13,7 @@ use std::{ time::{Duration, Instant}, }; -use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_shared_runtime::Worker; use crate::trace_exporter::{ @@ -640,18 +640,24 @@ pub trait Export: Send + Debug { } #[derive(Debug)] -pub struct DefaultExport { +pub struct DefaultExport< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, +> { trace_exporter: TraceExporter, } -impl DefaultExport { +impl< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + > DefaultExport +{ pub fn new(trace_exporter: TraceExporter) -> Self { Self { trace_exporter } } } -impl - Export for DefaultExport +impl< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + > Export for DefaultExport { fn export_trace_chunks( &mut self, diff --git a/libdd-data-pipeline/src/trace_exporter/builder.rs b/libdd-data-pipeline/src/trace_exporter/builder.rs index 3c0e1f14b5..a3b94d5478 100644 --- a/libdd-data-pipeline/src/trace_exporter/builder.rs +++ b/libdd-data-pipeline/src/trace_exporter/builder.rs @@ -8,6 +8,7 @@ use crate::otlp::OtlpTraceConfig; use crate::telemetry::TelemetryClientBuilder; use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; use crate::trace_exporter::error::BuilderErrorKind; +use crate::trace_exporter::log_writer::DEFAULT_LOG_MAX_LINE_SIZE; #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] use crate::trace_exporter::TelemetryConfig; #[cfg(not(target_arch = "wasm32"))] @@ -18,7 +19,7 @@ use crate::trace_exporter::{ TracerMetadata, INFO_ENDPOINT, }; use arc_swap::ArcSwap; -use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_common::{parse_uri, tag, Endpoint}; use libdd_dogstatsd_client::new; use libdd_shared_runtime::SharedRuntime; @@ -65,6 +66,11 @@ pub struct TraceExporterBuilder { connection_timeout: Option, otlp_endpoint: Option, otlp_headers: Vec<(String, String)>, + /// When true, traces are written as newline-delimited JSON to stdout (the + /// Datadog Forwarder "log exporter" path) instead of being sent to an agent. + output_to_log: bool, + /// Optional override for the maximum size of a single emitted log line. + log_max_line_size: Option, } impl TraceExporterBuilder { @@ -296,12 +302,40 @@ impl TraceExporterBuilder { self } + /// Configure the exporter to write traces as newline-delimited JSON to stdout + /// (the Datadog Forwarder "log exporter" path) instead of sending them to a + /// Datadog agent. This is the transport used in serverless environments (e.g. + /// AWS Lambda) when no agent is reachable. + /// + /// `max_line_size` overrides the per-line byte cap; `None` (or `Some(0)`, + /// which is coerced to the default) uses the default of 256 KiB, the AWS + /// CloudWatch Logs per-event limit. When this is set, agent-specific behavior + /// (agent-info polling, client-side stats, V1 negotiation) is bypassed. + /// + /// In this mode each span's `meta` is serialized to process stdout (and thus + /// captured by CloudWatch Logs in Lambda); `meta_struct` is excluded because + /// it holds raw msgpack the log intake cannot interpret. Writes are + /// synchronous/blocking, so this mode targets single-threaded serverless + /// runtimes where blocking stdout writes do not stall a shared async reactor. + /// + /// Takes precedence over an OTLP endpoint: if both this and `set_otlp_endpoint` + /// are configured, traces are written to the log output and not sent via OTLP. + pub fn set_output_to_log(&mut self, max_line_size: Option) -> &mut Self { + self.output_to_log = true; + // Treat `Some(0)` as "use the default" (a 0 cap would drop every span); + // keeps parity with the FFI setter's 0-sentinel. + self.log_max_line_size = max_line_size.filter(|&n| n != 0); + self + } + /// Build the [`TraceExporter`] synchronously. /// /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. /// Not available on wasm — use [`Self::build_async`] there. #[cfg(not(target_arch = "wasm32"))] - pub fn build( + pub fn build< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + >( mut self, ) -> Result, TraceExporterError> { let shared_runtime = match self.shared_runtime.as_ref() { @@ -320,7 +354,7 @@ impl TraceExporterBuilder { /// Awaits all async setup (e.g. telemetry start-up). Safe to drive from any async /// context. pub async fn build_async< - C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, >( self, ) -> Result, TraceExporterError> { @@ -365,19 +399,26 @@ impl TraceExporterBuilder { let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); let (info_fetcher, info_response_observer) = AgentInfoFetcher::::new(info_endpoint, Duration::from_secs(5 * 60)); - let info_fetcher_handle = - shared_runtime - .spawn_worker(info_fetcher, false) - .map_err(|e| { - TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( - e.to_string(), - )) - })?; - // The handle is currently only tracked for shutdown on native; on wasm - // it is dropped here (the worker keeps running on the JS event loop - // until the page/module is torn down). + // In log-export mode there is no agent to poll; skip spawning the worker + // entirely so we don't make repeated failing `/info` calls (e.g. in Lambda). + let info_fetcher_handle = if self.output_to_log { + None + } else { + Some( + shared_runtime + .spawn_worker(info_fetcher, false) + .map_err(|e| { + TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( + e.to_string(), + )) + })?, + ) + }; + // The handle is only tracked for shutdown on native; on wasm the `workers` + // field is cfg'd out, so drop it here (the worker keeps running on the JS + // event loop until the page/module is torn down). #[cfg(target_arch = "wasm32")] - let _ = info_fetcher_handle; + drop(info_fetcher_handle); #[allow(unused_mut)] let mut stats = StatsComputationStatus::Disabled; @@ -389,7 +430,13 @@ impl TraceExporterBuilder { #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] let (telemetry_client, telemetry_handle) = { let sessions = self.telemetry_instrumentation_sessions; - let telemetry = self.telemetry.map(|telemetry_config| { + // Telemetry talks to the agent; disable it in log-export mode. + let telemetry = if self.output_to_log { + None + } else { + self.telemetry + } + .map(|telemetry_config| { let mut builder = TelemetryClientBuilder::default() .set_language(&self.language) .set_language_version(&self.language_version) @@ -455,6 +502,10 @@ impl TraceExporterBuilder { } }); + let log_output = self + .output_to_log + .then(|| self.log_max_line_size.unwrap_or(DEFAULT_LOG_MAX_LINE_SIZE)); + Ok(TraceExporter { endpoint: Endpoint { url: agent_url, @@ -514,6 +565,7 @@ impl TraceExporterBuilder { .agent_rates_payload_version_enabled .then(AgentResponsePayloadVersion::new), otlp_config, + log_output, }) } @@ -545,6 +597,38 @@ mod tests { use crate::trace_exporter::error::BuilderErrorKind; use libdd_capabilities_impl::NativeCapabilities; + #[cfg_attr(miri, ignore)] + #[test] + fn test_log_output_mode() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_service("test") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_to_log(None); + let exporter = builder.build::().unwrap(); + // Log-output mode is enabled and the agent-info worker is not spawned. + // (End-to-end send -> bytes is covered by the capability-injecting test in + // `mod.rs` and the `log_writer` unit tests.) + assert!( + exporter.log_output.is_some(), + "log_output should be set in log-output mode" + ); + assert!( + exporter.workers.info_fetcher.is_none(), + "no agent-info worker should be spawned in log mode" + ); + } + + #[test] + fn set_output_to_log_some_zero_uses_default() { + // `Some(0)` is coerced to "use the default cap" (a 0 cap would drop every + // span), which is represented as `None` on the builder field. + let mut builder = TraceExporterBuilder::default(); + builder.set_output_to_log(Some(0)); + assert!(builder.output_to_log); + assert_eq!(builder.log_max_line_size, None); + } + #[cfg_attr(miri, ignore)] #[test] fn test_new() { diff --git a/libdd-data-pipeline/src/trace_exporter/log_writer.rs b/libdd-data-pipeline/src/trace_exporter/log_writer.rs new file mode 100644 index 0000000000..40b5ca2436 --- /dev/null +++ b/libdd-data-pipeline/src/trace_exporter/log_writer.rs @@ -0,0 +1,94 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Stdout "log exporter" trace transport. +//! +//! Encodes trace chunks as newline-delimited JSON in the format consumed by the +//! Datadog Forwarder and writes them through the [`LogWriterCapability`] (stdout +//! on native targets; host-provided, e.g. handed to JavaScript, on wasm). This +//! is used in serverless environments (primarily AWS Lambda) where no agent is +//! reachable; a downstream tool (the Datadog Forwarder) tails the platform logs +//! and submits the traces to the trace intake. + +use libdd_capabilities::LogWriterCapability; +use libdd_trace_utils::json_log_encoder::{encode_traces, EncodeStats}; +use libdd_trace_utils::span::{v04::Span, TraceData}; + +/// Default maximum size of a single emitted log line, in bytes. +/// +/// Matches the AWS CloudWatch Logs per-event limit (256 KiB), consistent with +/// dd-trace-go's `logBufferLimit`. Spans are packed greedily up to this size; +/// a single span that alone exceeds it is dropped (see [`encode_traces`]). +pub(crate) const DEFAULT_LOG_MAX_LINE_SIZE: usize = 256 * 1024; + +/// Encode `traces` to newline-delimited Forwarder JSON and write them through the +/// log-output capability. Returns counts of spans written/dropped. +/// +/// Writes are synchronous: on native targets this blocks on a stdout write, so +/// log-export mode is intended for single-threaded / current-thread serverless +/// runtimes (e.g. AWS Lambda) where there is no shared async reactor to stall. +pub(crate) fn write_log_traces( + capabilities: &C, + traces: &[Vec>], + max_line_size: usize, +) -> std::io::Result { + let mut buf: Vec = Vec::new(); + let stats = encode_traces(traces, &mut buf, max_line_size)?; + if !buf.is_empty() { + capabilities.write_log_output(&buf)?; + } + Ok(stats) +} + +#[cfg(test)] +mod tests { + use super::*; + use libdd_trace_utils::span::v04::Span; + use libdd_trace_utils::span::SliceData; + use std::sync::{Arc, Mutex}; + + /// Test capability that captures written bytes instead of touching stdout. + #[derive(Clone, Default)] + struct CapturingLog(Arc>>); + + impl LogWriterCapability for CapturingLog { + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> { + self.0 + .lock() + .expect("capture lock") + .extend_from_slice(bytes); + Ok(()) + } + } + + #[test] + fn encodes_and_writes_through_capability() { + let cap = CapturingLog::default(); + let traces = vec![vec![Span::> { + trace_id: 1, + span_id: 2, + ..Default::default() + }]]; + + let stats = write_log_traces(&cap, &traces, DEFAULT_LOG_MAX_LINE_SIZE).expect("write ok"); + assert_eq!(stats.spans_written, 1); + assert_eq!(stats.spans_dropped, 0); + + let out = cap.0.lock().expect("lock"); + let text = std::str::from_utf8(&out).expect("utf8"); + assert!(text.ends_with('\n'), "line must be newline-terminated"); + let v: serde_json::Value = serde_json::from_str(text.trim_end()).expect("valid json"); + // Forwarder is_trace contract. + assert!(v["traces"][0][0]["trace_id"].is_string()); + assert_eq!(v["traces"][0][0]["span_id"], "0000000000000002"); + } + + #[test] + fn empty_traces_do_not_call_capability() { + let cap = CapturingLog::default(); + let traces: Vec>>> = vec![]; + let stats = write_log_traces(&cap, &traces, DEFAULT_LOG_MAX_LINE_SIZE).expect("ok"); + assert_eq!(stats.spans_written, 0); + assert!(cap.0.lock().expect("lock").is_empty()); + } +} diff --git a/libdd-data-pipeline/src/trace_exporter/mod.rs b/libdd-data-pipeline/src/trace_exporter/mod.rs index 839c6b7247..b4dd6799ea 100644 --- a/libdd-data-pipeline/src/trace_exporter/mod.rs +++ b/libdd-data-pipeline/src/trace_exporter/mod.rs @@ -3,6 +3,7 @@ pub mod agent_response; pub mod builder; pub mod error; +mod log_writer; pub mod metrics; pub mod stats; mod trace_serializer; @@ -11,6 +12,7 @@ mod trace_serializer; pub use builder::TraceExporterBuilder; use self::agent_response::AgentResponse; +use self::log_writer::write_log_traces; use self::metrics::MetricsEmitter; use self::stats::StatsComputationStatus; use self::trace_serializer::TraceSerializer; @@ -35,7 +37,7 @@ use bytes::Bytes; use http::header::HeaderMap; use http::uri::PathAndQuery; use http::Uri; -use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; +use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; use libdd_common::tag::Tag; use libdd_common::Endpoint; use libdd_dogstatsd_client::Client; @@ -139,7 +141,8 @@ pub use libdd_trace_utils::tracer_metadata::TracerMetadata; #[cfg(not(target_arch = "wasm32"))] #[derive(Debug)] pub(crate) struct TraceExporterWorkers { - info_fetcher: WorkerHandle, + /// `None` in log-export mode, where no agent `/info` polling is performed. + info_fetcher: Option, #[cfg(feature = "telemetry")] telemetry: Option, } @@ -181,7 +184,9 @@ impl From for DeserInputFormat { /// pin it to a concrete type (`NativeCapabilities` or `WasmCapabilities`). /// Task spawning is handled internally by `SharedRuntime`. #[derive(Debug)] -pub struct TraceExporter { +pub struct TraceExporter< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, +> { endpoint: Endpoint, metadata: TracerMetadata, input_format: TraceExporterInputFormat, @@ -212,9 +217,17 @@ pub struct TraceExporter, /// When set, traces are exported via OTLP HTTP/JSON instead of the Datadog agent. otlp_config: Option, + /// When `Some(max_line_size)`, traces are written as newline-delimited JSON + /// through the [`LogWriterCapability`] (the Datadog Forwarder "log exporter" + /// path) instead of being sent to an agent. Used in serverless environments + /// where no agent is reachable. + log_output: Option, } -impl TraceExporter { +impl< + C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, + > TraceExporter +{ #[allow(missing_docs)] pub fn builder() -> TraceExporterBuilder { TraceExporterBuilder::default() @@ -267,8 +280,9 @@ impl Tra join_set.spawn(async move { handle.stop().await }); } - let info_fetcher = self.workers.info_fetcher; - join_set.spawn(async move { info_fetcher.stop().await }); + if let Some(info_fetcher) = self.workers.info_fetcher { + join_set.spawn(async move { info_fetcher.stop().await }); + } #[cfg(feature = "telemetry")] if let Some(telemetry) = self.workers.telemetry { @@ -302,7 +316,10 @@ impl Tra /// `data` must be encoded per the `input_format` given to the builder. /// [`Self::send`] is the sync facade over this method. pub async fn send_async(&self, data: &[u8]) -> Result { - self.check_agent_info().await; + // In log-export mode there is no agent to negotiate with; skip the poll. + if self.log_output.is_none() { + self.check_agent_info().await; + } let format: DeserInputFormat = self.input_format.into(); @@ -499,7 +516,10 @@ impl Tra &self, trace_chunks: Vec>>, ) -> Result { - self.check_agent_info().await; + // In log-export mode there is no agent to negotiate with; skip the poll. + if self.log_output.is_none() { + self.check_agent_info().await; + } self.send_trace_chunks_inner(trace_chunks).await } @@ -575,6 +595,25 @@ impl Tra &self, mut traces: Vec>>, ) -> Result { + // Log-export path: write traces as newline-delimited JSON through the + // log-output capability (stdout on native; host/JS on wasm) for the Datadog + // Forwarder. No agent, stats, OTLP, or telemetry involved. + // + // We emit all spans as-is and do NOT drop unsampled (p0) chunks here + // (unlike the OTLP path): the reference log exporters (JS/Go/Py/Java) write + // every span they are handed and defer sampling to the trace intake behind + // the Datadog Forwarder. + if let Some(max_line_size) = self.log_output { + let stats = write_log_traces(&self.capabilities, &traces, max_line_size) + .map_err(TraceExporterError::Io)?; + debug!( + spans_written = stats.spans_written, + spans_dropped = stats.spans_dropped, + "Wrote traces to log exporter" + ); + return Ok(AgentResponse::Unchanged); + } + let mut header_tags: TracerHeaderTags = self.metadata.borrow().into(); // Process stats computation and drop non-sampled (p0) chunks. @@ -1129,6 +1168,121 @@ mod tests { builder.build::().unwrap() } + // Capturing capabilities: delegate HTTP/sleep to the native impls, but capture + // log output into a thread-local buffer so log-mode tests can assert the emitted + // bytes without writing to real stdout. `build` constructs `C` via + // `C::new_client`, so the buffer is shared through a thread-local rather than an + // instance field; `new_client` clears it so each build starts fresh. + thread_local! { + static LOG_CAPTURE: std::cell::RefCell> = const { std::cell::RefCell::new(Vec::new()) }; + } + + #[derive(Clone, Debug)] + struct CapturingCapabilities(NativeCapabilities); + + impl HttpClientCapability for CapturingCapabilities { + fn new_client() -> Self { + LOG_CAPTURE.with(|c| c.borrow_mut().clear()); + Self(NativeCapabilities::new_client()) + } + fn request( + &self, + req: http::Request, + ) -> impl std::future::Future< + Output = Result, libdd_capabilities::http::HttpError>, + > + MaybeSend { + self.0.request(req) + } + } + + impl SleepCapability for CapturingCapabilities { + fn new() -> Self { + Self(NativeCapabilities::new()) + } + fn sleep(&self, duration: Duration) -> impl std::future::Future + MaybeSend { + self.0.sleep(duration) + } + } + + impl LogWriterCapability for CapturingCapabilities { + fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> { + LOG_CAPTURE.with(|c| c.borrow_mut().extend_from_slice(bytes)); + Ok(()) + } + } + + fn captured_log() -> Vec { + LOG_CAPTURE.with(|c| c.borrow().clone()) + } + + // The real `send` entry point decodes msgpack, hits the log branch, and writes + // Forwarder-format JSON bytes through the log-output capability. + #[test] + #[cfg_attr(miri, ignore)] + fn test_log_mode_send_writes_forwarder_json() { + let mut builder = TraceExporterBuilder::default(); + builder + .set_service("test") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_to_log(None); + let exporter = builder.build::().unwrap(); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"aws.lambda").unwrap(), + trace_id: 1, + span_id: 2, + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + + let resp = exporter.send(data.as_ref()).unwrap(); + assert!(matches!(resp, AgentResponse::Unchanged)); + + let text = String::from_utf8(captured_log()).unwrap(); + assert!(text.ends_with('\n'), "line must be newline-terminated"); + let line = text.trim_end(); + let v: serde_json::Value = serde_json::from_str(line).expect("valid json line"); + // Forwarder is_trace contract + the span actually round-tripped through + // msgpack decode -> log encode with hex ids. + assert!(v["traces"][0][0]["trace_id"].is_string()); + assert_eq!(v["traces"][0][0]["span_id"], "0000000000000002"); + assert_eq!(v["traces"][0][0]["name"], "aws.lambda"); + } + + // Log mode must make zero agent HTTP calls — also guards the worker-gating fix + // (info-fetcher is not spawned in log mode). + #[test] + #[cfg_attr(miri, ignore)] + fn test_log_mode_makes_no_agent_requests() { + let fake_agent = MockServer::start(); + // No `when` constraints => matches any request to any path. + let any = fake_agent.mock(|_when, then| { + then.status(200).body("{}"); + }); + + let mut builder = TraceExporterBuilder::default(); + builder + .set_url(&fake_agent.url("/")) + .set_service("test") + .set_input_format(TraceExporterInputFormat::V04) + .set_output_to_log(None); + let exporter = builder.build::().unwrap(); + // Structural guarantee: no agent-info worker is spawned in log mode. + assert!(exporter.workers.info_fetcher.is_none()); + + let traces: Vec> = vec![vec![SpanBytes { + name: BytesString::from_slice(b"test").unwrap(), + ..Default::default() + }]]; + let data = msgpack_encoder::v04::to_vec(&traces); + // `send` is synchronous and, in log mode, returns after writing through the + // capability without initiating any HTTP; combined with the structural assert + // above this is deterministic (no background worker can race the mock). + exporter.send(data.as_ref()).unwrap(); + + assert_eq!(any.calls(), 0, "log mode must not contact the agent"); + } + #[test] #[cfg_attr(miri, ignore)] fn test_health_metrics() { diff --git a/libdd-trace-utils/src/json_log_encoder/mod.rs b/libdd-trace-utils/src/json_log_encoder/mod.rs new file mode 100644 index 0000000000..fecf60be11 --- /dev/null +++ b/libdd-trace-utils/src/json_log_encoder/mod.rs @@ -0,0 +1,497 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! JSON "log exporter" trace encoder. +//! +//! Emits traces in the newline-delimited JSON format consumed by the Datadog +//! Forwarder Lambda (the legacy serverless path used when no Datadog Agent / +//! Lambda Extension is reachable). Each emitted line is a self-contained JSON +//! document of the form: +//! +//! ```text +//! {"traces":[[ {span}, {span}, ... ]]}\n +//! ``` +//! +//! Spans are greedily packed into size-bounded lines. A single span that alone +//! exceeds the line cap is dropped (and counted in [`EncodeStats::spans_dropped`]) +//! rather than emitted as a truncated, unparseable line. +//! +//! See the cross-language specification for the wire contract and the +//! Forwarder's `is_trace` detection requirements. + +mod span; + +use crate::span::v04::Span; +use crate::span::TraceData; +use span::LogSpan; +use std::io::Write; + +/// Opening bytes of every emitted line: `{"traces":[[`. +const TRACE_PREFIX: &[u8] = b"{\"traces\":[["; +/// Closing bytes of every emitted line: `]]}` plus the terminating newline. +const TRACE_SUFFIX: &[u8] = b"]]}\n"; +/// Fixed per-line overhead contributed by [`TRACE_PREFIX`] and [`TRACE_SUFFIX`]. +const TRACE_FORMAT_OVERHEAD: usize = TRACE_PREFIX.len() + TRACE_SUFFIX.len(); + +/// Statistics reported by [`encode_traces`]. +#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)] +pub struct EncodeStats { + /// Number of spans successfully written to `out`. + pub spans_written: usize, + /// Number of spans dropped because a single span exceeded `max_line_size`. + pub spans_dropped: usize, +} + +/// Encodes `traces` into newline-delimited JSON "log exporter" lines, writing +/// them to `out`. +/// +/// All spans across all input traces are flattened and greedily packed into +/// lines no larger than `max_line_size` bytes (including the `{"traces":[[` / +/// `]]}\n` framing). Each line contains a single inner trace array, matching the +/// reference dd-trace-js exporter. A span whose own serialized size plus the +/// framing overhead exceeds `max_line_size` is dropped (counted in +/// [`EncodeStats::spans_dropped`]) and never split across lines. +/// +/// The caller is responsible for flushing `out` (e.g. stdout) after this returns. +/// +/// # Errors +/// +/// Returns any [`std::io::Error`] produced while writing to `out`. +/// +/// # Examples +/// +/// ``` +/// use libdd_trace_utils::json_log_encoder::encode_traces; +/// use libdd_trace_utils::span::v04::SpanSlice; +/// +/// let span = SpanSlice { +/// service: "my-fn".into(), +/// name: "aws.lambda".into(), +/// resource: "my-fn".into(), +/// trace_id: 1, +/// span_id: 2, +/// ..Default::default() +/// }; +/// let traces = vec![vec![span]]; +/// +/// let mut out = Vec::new(); +/// let stats = encode_traces(&traces, &mut out, 64 * 1024).unwrap(); +/// +/// assert_eq!(stats.spans_written, 1); +/// assert!(out.ends_with(b"\n")); +/// ``` +pub fn encode_traces( + traces: &[Vec>], + out: &mut impl Write, + max_line_size: usize, +) -> std::io::Result { + let mut stats = EncodeStats::default(); + + // Reusable buffers: `span_buf` holds the JSON for the span currently being + // considered; `line` accumulates the complete current line. It is primed + // with `TRACE_PREFIX` so that a single `write_all` per line can emit + // prefix + spans + suffix in one syscall (see `flush_line`). + let mut span_buf: Vec = Vec::with_capacity(512); + let mut line: Vec = Vec::with_capacity(max_line_size.min(64 * 1024)); + line.extend_from_slice(TRACE_PREFIX); + let mut line_span_count: usize = 0; + + for trace in traces { + for span in trace { + span_buf.clear(); + serde_json::to_writer(&mut span_buf, &LogSpan(span)).map_err(std::io::Error::other)?; + let span_len = span_buf.len(); + + // A span that cannot fit on a line by itself is dropped rather than + // emitted as a truncated, unparseable line. + if span_len + TRACE_FORMAT_OVERHEAD > max_line_size { + stats.spans_dropped += 1; + tracing::debug!( + span_len, + max_line_size, + "Span too large to send to logs, dropping" + ); + continue; + } + + // Flush the current line if appending this span would overflow. + // `line` already contains `TRACE_PREFIX`; the emitted line will also + // gain `TRACE_SUFFIX`, so account for both here. + let comma = usize::from(line_span_count > 0); + if line_span_count > 0 + && line.len() + comma + span_len + TRACE_SUFFIX.len() > max_line_size + { + flush_line(out, &mut line)?; + line_span_count = 0; + } + + if line_span_count > 0 { + line.push(b','); + } + line.extend_from_slice(&span_buf); + line_span_count += 1; + stats.spans_written += 1; + } + } + + if line_span_count > 0 { + flush_line(out, &mut line)?; + } + + Ok(stats) +} + +/// Writes one complete line (`{"traces":[[` + joined spans + `]]}\n`) in a single +/// `write_all`, then resets the line buffer (re-primed with `TRACE_PREFIX`) for +/// reuse. +/// +/// `line` is expected to already contain `TRACE_PREFIX` followed by the +/// comma-joined spans; this appends `TRACE_SUFFIX` to complete the line. +fn flush_line(out: &mut impl Write, line: &mut Vec) -> std::io::Result<()> { + line.extend_from_slice(TRACE_SUFFIX); + out.write_all(line)?; + line.clear(); + line.extend_from_slice(TRACE_PREFIX); + Ok(()) +} + +#[cfg(test)] +// `SpanSlice` fields are `Cow<'a, str>` (SliceData::Text), so the `"literal".into()` +// conversions below are genuine `&str -> Cow` conversions required to compile. clippy on +// the CI target nonetheless reports them as `useless_conversion` to `&str` (a false positive +// not reproduced on all hosts); allow it here rather than dropping the necessary `.into()`. +#[allow(clippy::useless_conversion)] +mod tests { + use super::*; + use crate::span::v04::SpanSlice; + use serde_json::Value; + use std::collections::HashMap; + + const MAX: usize = 64 * 1024; + + fn lines(out: &[u8]) -> Vec { + String::from_utf8(out.to_vec()) + .unwrap() + .lines() + .map(|s| s.to_string()) + .collect() + } + + #[test] + fn golden_known_span() { + let span = SpanSlice { + service: "my-fn".into(), + name: "aws.lambda".into(), + resource: "my-fn".into(), + r#type: "serverless".into(), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 1717200000000000000, + duration: 1500000, + error: 0, + meta: HashMap::from([("env".into(), "prod".into())]), + metrics: HashMap::from([("_sampling_priority_v1".into(), 1.0)]), + ..Default::default() + }; + let mut out = Vec::new(); + let stats = encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + assert_eq!(stats.spans_written, 1); + assert_eq!(stats.spans_dropped, 0); + + let expected = concat!( + "{\"traces\":[[", + "{\"trace_id\":\"0000000000000001\",", + "\"span_id\":\"0000000000000002\",", + "\"parent_id\":\"0000000000000000\",", + "\"service\":\"my-fn\",", + "\"name\":\"aws.lambda\",", + "\"resource\":\"my-fn\",", + "\"type\":\"serverless\",", + "\"error\":0,", + "\"start\":1717200000000000000,", + "\"duration\":1500000,", + "\"meta\":{\"env\":\"prod\"},", + "\"metrics\":{\"_sampling_priority_v1\":1.0}", + "}]]}\n", + ); + assert_eq!(String::from_utf8(out).unwrap(), expected); + } + + #[test] + fn meta_struct_is_omitted() { + let span = SpanSlice { + trace_id: 1, + span_id: 2, + meta_struct: HashMap::from([("_dd.appsec.json".into(), [0x81u8, 0xa4].as_slice())]), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!( + !text.contains("meta_struct"), + "meta_struct must not be emitted: {text}" + ); + // Still a valid, Forwarder-parseable line. + let v: Value = serde_json::from_str(text.trim_end()).unwrap(); + assert!(v["traces"][0][0]["trace_id"].is_string()); + } + + #[test] + fn hex_high_bits_and_root_parent() { + // trace_id with the high 64 bits set => 32 hex chars. + let trace_id: u128 = (0xABCDu128 << 64) | 0x1; + let span = SpanSlice { + trace_id, + span_id: 0xFF, + parent_id: 0, + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!( + text.contains("\"trace_id\":\"000000000000abcd0000000000000001\""), + "got: {text}" + ); + assert!(text.contains("\"span_id\":\"00000000000000ff\"")); + assert!(text.contains("\"parent_id\":\"0000000000000000\"")); + } + + #[test] + fn error_is_integer() { + let span = SpanSlice { + error: 1, + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!(text.contains("\"error\":1,"), "got: {text}"); + } + + #[test] + fn empty_maps_and_type_omitted() { + let span = SpanSlice { + name: "op".into(), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let text = String::from_utf8(out).unwrap(); + assert!(!text.contains("\"meta\""), "got: {text}"); + assert!(!text.contains("\"metrics\"")); + assert!(!text.contains("\"meta_struct\"")); + assert!(!text.contains("\"span_links\"")); + assert!(!text.contains("\"span_events\"")); + assert!(!text.contains("\"type\"")); + } + + #[test] + fn string_escaping() { + let span = SpanSlice { + name: "say \"hi\"\n".into(), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + // Each line must remain valid JSON despite the embedded quote/newline. + for line in lines(&out) { + let parsed: Value = serde_json::from_str(&line).unwrap(); + assert_eq!( + parsed["traces"][0][0]["name"].as_str().unwrap(), + "say \"hi\"\n" + ); + } + } + + #[test] + fn size_cap_batches_into_multiple_lines() { + // Build several spans that individually fit but together exceed a small cap. + let make = |id: u64| SpanSlice { + name: "x".into(), + span_id: id, + ..Default::default() + }; + let trace: Vec = (1..=6).map(make).collect(); + + // Determine one span's serialized length to size the cap so ~2 fit per line. + let mut one = Vec::new(); + encode_traces(&[vec![make(1)]], &mut one, MAX).unwrap(); + // `one` = prefix + span + suffix. The bare span length: + let span_len = one.len() - TRACE_FORMAT_OVERHEAD; + // Cap fits two spans + a comma but not three. + let cap = TRACE_FORMAT_OVERHEAD + span_len * 2 + 1; + + let mut out = Vec::new(); + let stats = encode_traces(&[trace], &mut out, cap).unwrap(); + assert_eq!(stats.spans_written, 6); + assert_eq!(stats.spans_dropped, 0); + + let emitted = lines(&out); + assert_eq!(emitted.len(), 3, "expected 3 lines, got {emitted:?}"); + for line in &emitted { + assert!(line.len() <= cap, "line over cap: {} > {cap}", line.len()); + let parsed: Value = serde_json::from_str(line).unwrap(); + assert_eq!(parsed["traces"][0].as_array().unwrap().len(), 2); + } + } + + #[test] + fn oversize_single_span_dropped() { + let big = "a".repeat(10_000); + let span = SpanSlice { + name: big.as_str().into(), + span_id: 1, + ..Default::default() + }; + let small = SpanSlice { + name: "ok".into(), + span_id: 2, + ..Default::default() + }; + let mut out = Vec::new(); + // Cap large enough for `small` but far too small for `big`. + let stats = encode_traces(&[vec![span, small]], &mut out, 1024).unwrap(); + assert_eq!(stats.spans_dropped, 1); + assert_eq!(stats.spans_written, 1); + + let emitted = lines(&out); + assert_eq!(emitted.len(), 1); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + assert_eq!(parsed["traces"][0][0]["name"].as_str().unwrap(), "ok"); + } + + #[test] + fn metric_non_finite_serializes_null() { + // serde_json renders non-finite f64 as JSON null; the line must remain + // parseable and the metric values must be null (not NaN/Infinity tokens). + let span = SpanSlice { + span_id: 1, + metrics: HashMap::from([("nan".into(), f64::NAN), ("inf".into(), f64::INFINITY)]), + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let emitted = lines(&out); + assert_eq!(emitted.len(), 1); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + let metrics = &parsed["traces"][0][0]["metrics"]; + assert!(metrics["nan"].is_null(), "got: {metrics}"); + assert!(metrics["inf"].is_null(), "got: {metrics}"); + } + + #[test] + fn multi_trace_flattened_into_one_line() { + // Two input traces, both well under the cap, flatten into a single line + // whose single inner trace array holds both spans. + let span_a = SpanSlice { + name: "a".into(), + span_id: 1, + ..Default::default() + }; + let span_b = SpanSlice { + name: "b".into(), + span_id: 2, + ..Default::default() + }; + let mut out = Vec::new(); + let stats = encode_traces(&[vec![span_a], vec![span_b]], &mut out, MAX).unwrap(); + assert_eq!(stats.spans_written, 2); + let emitted = lines(&out); + assert_eq!(emitted.len(), 1, "expected one line, got {emitted:?}"); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + let inner = parsed["traces"][0].as_array().unwrap(); + assert_eq!(inner.len(), 2); + assert_eq!(parsed["traces"].as_array().unwrap().len(), 1); + } + + #[test] + fn span_links_and_events_emitted_when_present() { + use crate::span::v04::{SpanEvent, SpanLink}; + + let span = SpanSlice { + span_id: 1, + span_links: vec![SpanLink { + trace_id: 7, + span_id: 8, + ..Default::default() + }], + span_events: vec![SpanEvent { + time_unix_nano: 123, + name: "evt".into(), + ..Default::default() + }], + ..Default::default() + }; + let mut out = Vec::new(); + encode_traces(&[vec![span]], &mut out, MAX).unwrap(); + let emitted = lines(&out); + assert_eq!(emitted.len(), 1); + let parsed: Value = serde_json::from_str(&emitted[0]).unwrap(); + let span_json = &parsed["traces"][0][0]; + // Lock the inner wire shape (field names/values), not just presence. + assert_eq!(span_json["span_links"][0]["trace_id"], 7); + assert_eq!(span_json["span_links"][0]["span_id"], 8); + assert_eq!(span_json["span_events"][0]["name"], "evt"); + assert_eq!(span_json["span_events"][0]["time_unix_nano"], 123); + } + + #[test] + fn empty_inner_trace_writes_nothing() { + // One trace containing zero spans: nothing is emitted and no spans counted. + let traces: Vec> = vec![vec![]]; + let mut out = Vec::new(); + let stats = encode_traces(&traces, &mut out, MAX).unwrap(); + assert!(out.is_empty()); + assert_eq!(stats.spans_written, 0); + assert_eq!(stats.spans_dropped, 0); + } + + #[test] + fn empty_input_writes_nothing() { + let traces: Vec> = vec![]; + let mut out = Vec::new(); + let stats = encode_traces(&traces, &mut out, MAX).unwrap(); + assert_eq!(stats, EncodeStats::default()); + assert!(out.is_empty()); + } + + // Mirrors the Datadog Forwarder `is_trace` detection contract: every line + // parses as JSON, top-level `traces` is a non-empty array whose `[0]` is a + // non-empty array whose `[0]` has a non-null string `trace_id`, and every + // line ends with `\n`. + #[test] + fn forwarder_is_trace_contract() { + let trace: Vec = (1u64..=5) + .map(|i| SpanSlice { + service: "svc".into(), + name: "op".into(), + trace_id: u128::from(i), + span_id: i + 100, + ..Default::default() + }) + .collect(); + + let mut out = Vec::new(); + encode_traces(&[trace], &mut out, 200).unwrap(); + + let text = String::from_utf8(out).unwrap(); + assert!(!text.is_empty()); + // Every line is newline-terminated. + for chunk in text.split_inclusive('\n') { + assert!(chunk.ends_with('\n'), "line not newline-terminated"); + let line = chunk.trim_end_matches('\n'); + let parsed: Value = serde_json::from_str(line).unwrap(); + + let traces = parsed.get("traces").and_then(Value::as_array).unwrap(); + assert!(!traces.is_empty()); + let first = traces[0].as_array().unwrap(); + assert!(!first.is_empty()); + let trace_id = first[0].get("trace_id").unwrap(); + assert!(trace_id.is_string()); + assert!(!trace_id.as_str().unwrap().is_empty()); + } + } +} diff --git a/libdd-trace-utils/src/json_log_encoder/span.rs b/libdd-trace-utils/src/json_log_encoder/span.rs new file mode 100644 index 0000000000..c39363ed95 --- /dev/null +++ b/libdd-trace-utils/src/json_log_encoder/span.rs @@ -0,0 +1,200 @@ +// Copyright 2024-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +//! Serialization of a single [`Span`] into the Datadog Forwarder "log exporter" +//! JSON shape (APM v0.2 span schema). +//! +//! This deliberately does **not** reuse the derived [`serde::Serialize`] impl on +//! [`Span`], which targets the msgpack v04 wire format (numeric ids, different +//! skip rules). The log format requires `trace_id`/`span_id`/`parent_id` as +//! zero-padded lowercase hex strings, `error` always present, and `type`/`meta`/ +//! `metrics`/`meta_struct`/`span_links`/`span_events` omitted when empty. +//! +//! The wrappers below serialize via the span's associated text type +//! (`T::Text`, which is always [`serde::Serialize`] through the `SpanText` +//! trait bound) rather than the whole `T`, so the encoder does not require a +//! `T: Serialize` bound — keeping the public exporter API free of that bound. + +use crate::span::v04::{Span, SpanEvent, SpanLink}; +use crate::span::TraceData; +use serde::ser::{SerializeSeq, SerializeStruct}; +use serde::{Serialize, Serializer}; +use std::borrow::Borrow; + +const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef"; + +/// Writes `value` as 16 zero-padded lowercase hex bytes into the start of `out`. +/// +/// The caller guarantees `out` has room for at least 16 bytes. +fn fill_hex(out: &mut [u8], value: u64) { + for i in 0..16 { + out[15 - i] = HEX_DIGITS[((value >> (i * 4)) & 0xf) as usize]; + } +} + +/// Serializes `buf` as a JSON string. `buf` must only contain ASCII hex digits, +/// so the UTF-8 check below never fails in practice. +fn serialize_ascii_hex(serializer: S, buf: &[u8]) -> Result { + match std::str::from_utf8(buf) { + Ok(text) => serializer.serialize_str(text), + // Unreachable: `fill_hex` only writes ASCII hex digits. + Err(_) => serializer.serialize_str(""), + } +} + +/// A `u64` id rendered as a 16-char zero-padded lowercase hex JSON string. +struct HexU64(u64); + +impl Serialize for HexU64 { + fn serialize(&self, serializer: S) -> Result { + let mut buf = [0u8; 16]; + fill_hex(&mut buf, self.0); + serialize_ascii_hex(serializer, &buf) + } +} + +/// A 128-bit trace id rendered as hex. When the high 64 bits are zero it is +/// emitted as 16 hex chars (the low 64 bits); otherwise as the full 32 chars. +struct HexTraceId(u128); + +impl Serialize for HexTraceId { + fn serialize(&self, serializer: S) -> Result { + let high = (self.0 >> 64) as u64; + let low = self.0 as u64; + if high == 0 { + let mut buf = [0u8; 16]; + fill_hex(&mut buf, low); + serialize_ascii_hex(serializer, &buf) + } else { + let mut buf = [0u8; 32]; + fill_hex(&mut buf[0..16], high); + fill_hex(&mut buf[16..32], low); + serialize_ascii_hex(serializer, &buf) + } + } +} + +/// Serializes a [`SpanLink`] in the v04 JSON shape without requiring `T: +/// Serialize` (only `T::Text`, via the `SpanText` bound, is needed). +struct LogSpanLink<'a, T: TraceData>(&'a SpanLink); + +impl Serialize for LogSpanLink<'_, T> { + fn serialize(&self, serializer: S) -> Result { + let link = self.0; + let has_attributes = !link.attributes.is_empty(); + let has_tracestate = !Borrow::::borrow(&link.tracestate).is_empty(); + let has_flags = link.flags != 0; + + // Always: trace_id, trace_id_high, span_id. + let mut len = 3; + len += has_attributes as usize; + len += has_tracestate as usize; + len += has_flags as usize; + + let mut state = serializer.serialize_struct("span_link", len)?; + state.serialize_field("trace_id", &link.trace_id)?; + state.serialize_field("trace_id_high", &link.trace_id_high)?; + state.serialize_field("span_id", &link.span_id)?; + if has_attributes { + state.serialize_field("attributes", &link.attributes)?; + } + if has_tracestate { + state.serialize_field("tracestate", &link.tracestate)?; + } + if has_flags { + state.serialize_field("flags", &link.flags)?; + } + state.end() + } +} + +/// Serializes a [`SpanEvent`] in the v04 JSON shape without requiring `T: +/// Serialize`. The event's attribute values (`AttributeAnyValue`) already +/// serialize via a `T::Text`-bounded impl. +struct LogSpanEvent<'a, T: TraceData>(&'a SpanEvent); + +impl Serialize for LogSpanEvent<'_, T> { + fn serialize(&self, serializer: S) -> Result { + let event = self.0; + let has_attributes = !event.attributes.is_empty(); + + // Always: time_unix_nano, name. + let mut len = 2; + len += has_attributes as usize; + + let mut state = serializer.serialize_struct("span_event", len)?; + state.serialize_field("time_unix_nano", &event.time_unix_nano)?; + state.serialize_field("name", &event.name)?; + if has_attributes { + state.serialize_field("attributes", &event.attributes)?; + } + state.end() + } +} + +/// Serializes a slice of `Item` as a JSON array using the `Wrap` per-element +/// wrapper, avoiding a `T: Serialize` bound on the element type. +struct LogSeq<'a, I, W>(&'a [I], fn(&'a I) -> W); + +impl Serialize for LogSeq<'_, I, W> { + fn serialize(&self, serializer: S) -> Result { + let mut seq = serializer.serialize_seq(Some(self.0.len()))?; + for item in self.0 { + seq.serialize_element(&(self.1)(item))?; + } + seq.end() + } +} + +/// Wraps a [`Span`] so that [`serde_json`] emits the Datadog Forwarder log shape. +pub(crate) struct LogSpan<'a, T: TraceData>(pub &'a Span); + +impl Serialize for LogSpan<'_, T> { + fn serialize(&self, serializer: S) -> Result { + let span = self.0; + + let has_type = !Borrow::::borrow(&span.r#type).is_empty(); + let has_meta = !span.meta.is_empty(); + let has_metrics = !span.metrics.is_empty(); + let has_links = !span.span_links.is_empty(); + let has_events = !span.span_events.is_empty(); + + // Required: trace_id, span_id, parent_id, service, name, resource, error, + // start, duration. `meta_struct` is intentionally NOT emitted: it holds raw + // msgpack bytes that would serialize as a JSON number array the intake + // cannot interpret (the reference JS/Go/Py/Java exporters omit it too). + let mut len = 9; + len += has_type as usize; + len += has_meta as usize; + len += has_metrics as usize; + len += has_links as usize; + len += has_events as usize; + + let mut state = serializer.serialize_struct("span", len)?; + state.serialize_field("trace_id", &HexTraceId(span.trace_id))?; + state.serialize_field("span_id", &HexU64(span.span_id))?; + state.serialize_field("parent_id", &HexU64(span.parent_id))?; + state.serialize_field("service", &span.service)?; + state.serialize_field("name", &span.name)?; + state.serialize_field("resource", &span.resource)?; + if has_type { + state.serialize_field("type", &span.r#type)?; + } + state.serialize_field("error", &span.error)?; + state.serialize_field("start", &span.start)?; + state.serialize_field("duration", &span.duration)?; + if has_meta { + state.serialize_field("meta", &span.meta)?; + } + if has_metrics { + state.serialize_field("metrics", &span.metrics)?; + } + if has_links { + state.serialize_field("span_links", &LogSeq(&span.span_links, LogSpanLink))?; + } + if has_events { + state.serialize_field("span_events", &LogSeq(&span.span_events, LogSpanEvent))?; + } + state.end() + } +} diff --git a/libdd-trace-utils/src/lib.rs b/libdd-trace-utils/src/lib.rs index aa8d93c887..fbe798175b 100644 --- a/libdd-trace-utils/src/lib.rs +++ b/libdd-trace-utils/src/lib.rs @@ -8,6 +8,7 @@ #![cfg_attr(not(test), deny(clippy::unimplemented))] pub mod config_utils; +pub mod json_log_encoder; pub mod msgpack_decoder; pub mod msgpack_encoder; pub mod otlp_encoder;