Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion libdd-capabilities-impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::time::Duration;

pub use http::NativeHttpClient;
use libdd_capabilities::{http::HttpError, MaybeSend};
pub use libdd_capabilities::{HttpClientCapability, SleepCapability};
pub use libdd_capabilities::{HttpClientCapability, LogWriterCapability, SleepCapability};
pub use sleep::NativeSleepCapability;

/// Bundle struct for native platform capabilities.
Expand Down Expand Up @@ -64,6 +64,18 @@ impl HttpClientCapability for NativeCapabilities {
}
}

impl LogWriterCapability for NativeCapabilities {
fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()> {
use std::io::Write;
// `Stdout` is internally synchronized; lock once so the whole buffer
// (one or more newline-terminated JSON lines) is written without
// interleaving, then flush.
let mut out = std::io::stdout().lock();
out.write_all(bytes)?;
out.flush()
}
}

impl SleepCapability for NativeCapabilities {
fn new() -> Self {
Self {
Expand Down
2 changes: 2 additions & 0 deletions libdd-capabilities/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
//! Portable capability traits for cross-platform libdatadog.

pub mod http;
pub mod log_output;
pub mod maybe_send;
pub mod sleep;
pub mod spawn;

pub use self::http::{HttpClientCapability, HttpError};
pub use self::log_output::LogWriterCapability;
pub use self::sleep::SleepCapability;
pub use self::spawn::SpawnError;
pub use ::http::{Request, Response};
Expand Down
19 changes: 19 additions & 0 deletions libdd-capabilities/src/log_output.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/
// SPDX-License-Identifier: Apache-2.0

//! Log-output capability trait.
//!
//! Lets the trace pipeline emit already-encoded log-exporter output to the
//! platform's log sink. On native targets this writes to process stdout; a wasm
//! consumer implements it by handing the bytes to the host (e.g. JavaScript),
//! since wasm cannot write to stdout directly.

/// Capability for writing encoded log-exporter output to the platform log sink.
pub trait LogWriterCapability {
/// Write a buffer of newline-delimited log output.
///
/// `bytes` may contain one or more `\n`-terminated JSON lines. Implementations
/// should write the whole buffer (so individual lines are not interleaved with
/// other writers) and flush before returning.
fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()>;
}
63 changes: 63 additions & 0 deletions libdd-data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ pub struct TraceExporterConfig {
connection_timeout: Option<u64>,
shared_runtime: Option<Arc<SharedRuntime>>,
otlp_endpoint: Option<String>,
output_to_log: bool,
log_max_line_size: Option<usize>,
}

#[no_mangle]
Expand Down Expand Up @@ -498,6 +500,37 @@ pub unsafe extern "C" fn ddog_trace_exporter_config_set_otlp_endpoint(
)
}

/// Configure the exporter to write traces as newline-delimited JSON to stdout (the Datadog
/// Forwarder "log exporter" path) instead of sending them to a Datadog agent. Used in serverless
/// environments (e.g. AWS Lambda) when no agent is reachable.
///
/// `max_line_size` overrides the per-line byte cap; pass `0` to use the default (256 KiB, the AWS
/// CloudWatch Logs limit). When enabled, agent-specific behavior (agent-info polling, client-side
/// stats, V1 negotiation) is bypassed.
///
/// In this mode each span's `meta` is serialized to process stdout (and thus captured by CloudWatch
/// Logs in Lambda); `meta_struct` is excluded because it holds raw msgpack the log intake cannot
/// interpret.
///
/// Writes are synchronous/blocking on stdout, so this mode targets single-threaded / current-thread
/// serverless runtimes (e.g. AWS Lambda) where a blocking write won't stall a shared async reactor.
#[no_mangle]
pub unsafe extern "C" fn ddog_trace_exporter_config_set_output_to_log(
config: Option<&mut TraceExporterConfig>,
max_line_size: usize,
) -> Option<Box<ExporterError>> {
catch_panic!(
if let Some(handle) = config {
handle.output_to_log = true;
handle.log_max_line_size = (max_line_size != 0).then_some(max_line_size);
None
} else {
gen_error!(ErrorCode::InvalidArgument)
},
gen_error!(ErrorCode::Panic)
)
}

/// Create a new TraceExporter instance.
///
/// When an OTLP endpoint is configured via `TraceExporterConfig`, the exporter sends traces in
Expand Down Expand Up @@ -567,6 +600,10 @@ pub unsafe extern "C" fn ddog_trace_exporter_new(
builder.set_otlp_endpoint(url);
}

if config.output_to_log {
builder.set_output_to_log(config.log_max_line_size);
}

match builder.build() {
Ok(exporter) => {
out_handle.as_ptr().write(Box::new(exporter));
Expand Down Expand Up @@ -662,11 +699,37 @@ mod tests {
assert!(cfg.process_tags.is_none());
assert!(cfg.test_session_token.is_none());
assert!(cfg.connection_timeout.is_none());
assert!(!cfg.output_to_log);
assert_eq!(cfg.log_max_line_size, None);

ddog_trace_exporter_config_free(cfg);
}
}

#[test]
fn config_output_to_log_test() {
unsafe {
// Null config handle -> InvalidArgument.
let error = ddog_trace_exporter_config_set_output_to_log(None, 0);
assert_eq!(error.as_ref().unwrap().code, ErrorCode::InvalidArgument);
ddog_trace_exporter_error_free(error);

// 0 is a sentinel for "use the default cap" -> None.
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 0);
assert_eq!(error, None);
let cfg = config.unwrap();
assert!(cfg.output_to_log);
assert_eq!(cfg.log_max_line_size, None);

// Non-zero cap is stored as-is.
let mut config = Some(TraceExporterConfig::default());
let error = ddog_trace_exporter_config_set_output_to_log(config.as_mut(), 4096);
assert_eq!(error, None);
assert_eq!(config.unwrap().log_max_line_size, Some(4096));
}
}

#[test]
fn config_url_test() {
unsafe {
Expand Down
16 changes: 11 additions & 5 deletions libdd-data-pipeline/src/trace_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use std::{
time::{Duration, Instant},
};

use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability};
use libdd_shared_runtime::Worker;

use crate::trace_exporter::{
Expand Down Expand Up @@ -640,18 +640,24 @@ pub trait Export<T>: Send + Debug {
}

#[derive(Debug)]
pub struct DefaultExport<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> {
pub struct DefaultExport<
C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
> {
trace_exporter: TraceExporter<C>,
}

impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static> DefaultExport<C> {
impl<
C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
> DefaultExport<C>
{
pub fn new(trace_exporter: TraceExporter<C>) -> Self {
Self { trace_exporter }
}
}

impl<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>
Export<libdd_trace_utils::span::v04::SpanBytes> for DefaultExport<C>
impl<
C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
> Export<libdd_trace_utils::span::v04::SpanBytes> for DefaultExport<C>
{
fn export_trace_chunks(
&mut self,
Expand Down
116 changes: 100 additions & 16 deletions libdd-data-pipeline/src/trace_exporter/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::otlp::OtlpTraceConfig;
use crate::telemetry::TelemetryClientBuilder;
use crate::trace_exporter::agent_response::AgentResponsePayloadVersion;
use crate::trace_exporter::error::BuilderErrorKind;
use crate::trace_exporter::log_writer::DEFAULT_LOG_MAX_LINE_SIZE;
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
use crate::trace_exporter::TelemetryConfig;
#[cfg(not(target_arch = "wasm32"))]
Expand All @@ -18,7 +19,7 @@ use crate::trace_exporter::{
TracerMetadata, INFO_ENDPOINT,
};
use arc_swap::ArcSwap;
use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability};
use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability};
use libdd_common::{parse_uri, tag, Endpoint};
use libdd_dogstatsd_client::new;
use libdd_shared_runtime::SharedRuntime;
Expand Down Expand Up @@ -65,6 +66,11 @@ pub struct TraceExporterBuilder {
connection_timeout: Option<u64>,
otlp_endpoint: Option<String>,
otlp_headers: Vec<(String, String)>,
/// When true, traces are written as newline-delimited JSON to stdout (the
/// Datadog Forwarder "log exporter" path) instead of being sent to an agent.
output_to_log: bool,
/// Optional override for the maximum size of a single emitted log line.
log_max_line_size: Option<usize>,
}

impl TraceExporterBuilder {
Expand Down Expand Up @@ -296,12 +302,40 @@ impl TraceExporterBuilder {
self
}

/// Configure the exporter to write traces as newline-delimited JSON to stdout
/// (the Datadog Forwarder "log exporter" path) instead of sending them to a
/// Datadog agent. This is the transport used in serverless environments (e.g.
/// AWS Lambda) when no agent is reachable.
///
/// `max_line_size` overrides the per-line byte cap; `None` (or `Some(0)`,
/// which is coerced to the default) uses the default of 256 KiB, the AWS
/// CloudWatch Logs per-event limit. When this is set, agent-specific behavior
/// (agent-info polling, client-side stats, V1 negotiation) is bypassed.
///
/// In this mode each span's `meta` is serialized to process stdout (and thus
/// captured by CloudWatch Logs in Lambda); `meta_struct` is excluded because
/// it holds raw msgpack the log intake cannot interpret. Writes are
/// synchronous/blocking, so this mode targets single-threaded serverless
/// runtimes where blocking stdout writes do not stall a shared async reactor.
///
/// Takes precedence over an OTLP endpoint: if both this and `set_otlp_endpoint`

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is non-blocking and can be addressed in a separate PR...

I'm not a fan of the exporter containing precedence logic for configuration. I left a similar comment on @paullegranddc's agentless PR. I believe config logic should live in the SDKs as much as possible, and if the builder receives conflicting config settings, we should return an error. If we want to support precedence to avoid setup friction for customers that should be handled by the SDKs. I don't think there is any harm in merging it as is for now. Let's just capture it in a Jira ticket.

/// are configured, traces are written to the log output and not sent via OTLP.
pub fn set_output_to_log(&mut self, max_line_size: Option<usize>) -> &mut Self {
self.output_to_log = true;
// Treat `Some(0)` as "use the default" (a 0 cap would drop every span);
// keeps parity with the FFI setter's 0-sentinel.
self.log_max_line_size = max_line_size.filter(|&n| n != 0);
self
}

/// Build the [`TraceExporter`] synchronously.
///
/// Sync facade over [`Self::build_async`]; panics inside an existing tokio context.
/// Not available on wasm — use [`Self::build_async`] there.
#[cfg(not(target_arch = "wasm32"))]
pub fn build<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>(
pub fn build<
C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
>(
mut self,
) -> Result<TraceExporter<C>, TraceExporterError> {
let shared_runtime = match self.shared_runtime.as_ref() {
Expand All @@ -320,7 +354,7 @@ impl TraceExporterBuilder {
/// Awaits all async setup (e.g. telemetry start-up). Safe to drive from any async
/// context.
pub async fn build_async<
C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static,
C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static,
>(
self,
) -> Result<TraceExporter<C>, TraceExporterError> {
Expand Down Expand Up @@ -365,19 +399,26 @@ impl TraceExporterBuilder {
let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT));
let (info_fetcher, info_response_observer) =
AgentInfoFetcher::<C>::new(info_endpoint, Duration::from_secs(5 * 60));
let info_fetcher_handle =
shared_runtime
.spawn_worker(info_fetcher, false)
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?;
// The handle is currently only tracked for shutdown on native; on wasm
// it is dropped here (the worker keeps running on the JS event loop
// until the page/module is torn down).
// In log-export mode there is no agent to poll; skip spawning the worker

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I left a comment about agent info and telemetry workers being unnecessarily spawned in @paullegranddc's agentless PR. Beyond the scope of this PR, but after these two PRs land we should discuss refactoring the builder a little bit to handle this a bit more elegantly.

// entirely so we don't make repeated failing `/info` calls (e.g. in Lambda).
let info_fetcher_handle = if self.output_to_log {
None
} else {
Some(
shared_runtime
.spawn_worker(info_fetcher, false)
.map_err(|e| {
TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration(
e.to_string(),
))
})?,
)
};
// The handle is only tracked for shutdown on native; on wasm the `workers`
// field is cfg'd out, so drop it here (the worker keeps running on the JS
// event loop until the page/module is torn down).
#[cfg(target_arch = "wasm32")]
let _ = info_fetcher_handle;
drop(info_fetcher_handle);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Claude can't refrain from rewriting unrelated stuff with bad taste, can't he 😛 (I'm mostly joking, I don't care what syntax people use to drop value, although I do personally prefer the former)


#[allow(unused_mut)]
let mut stats = StatsComputationStatus::Disabled;
Expand All @@ -389,7 +430,13 @@ impl TraceExporterBuilder {
#[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))]
let (telemetry_client, telemetry_handle) = {
let sessions = self.telemetry_instrumentation_sessions;
let telemetry = self.telemetry.map(|telemetry_config| {
// Telemetry talks to the agent; disable it in log-export mode.
let telemetry = if self.output_to_log {
None
} else {
self.telemetry
}
.map(|telemetry_config| {
let mut builder = TelemetryClientBuilder::default()
.set_language(&self.language)
.set_language_version(&self.language_version)
Expand Down Expand Up @@ -455,6 +502,10 @@ impl TraceExporterBuilder {
}
});

let log_output = self
.output_to_log
.then(|| self.log_max_line_size.unwrap_or(DEFAULT_LOG_MAX_LINE_SIZE));

Ok(TraceExporter {
endpoint: Endpoint {
url: agent_url,
Expand Down Expand Up @@ -514,6 +565,7 @@ impl TraceExporterBuilder {
.agent_rates_payload_version_enabled
.then(AgentResponsePayloadVersion::new),
otlp_config,
log_output,
})
}

Expand Down Expand Up @@ -545,6 +597,38 @@ mod tests {
use crate::trace_exporter::error::BuilderErrorKind;
use libdd_capabilities_impl::NativeCapabilities;

#[cfg_attr(miri, ignore)]
#[test]
fn test_log_output_mode() {
let mut builder = TraceExporterBuilder::default();
builder
.set_service("test")
.set_input_format(TraceExporterInputFormat::V04)
.set_output_to_log(None);
let exporter = builder.build::<NativeCapabilities>().unwrap();
// Log-output mode is enabled and the agent-info worker is not spawned.
// (End-to-end send -> bytes is covered by the capability-injecting test in
// `mod.rs` and the `log_writer` unit tests.)
assert!(
exporter.log_output.is_some(),
"log_output should be set in log-output mode"
);
assert!(
exporter.workers.info_fetcher.is_none(),
"no agent-info worker should be spawned in log mode"
);
}

#[test]
fn set_output_to_log_some_zero_uses_default() {
// `Some(0)` is coerced to "use the default cap" (a 0 cap would drop every
// span), which is represented as `None` on the builder field.
let mut builder = TraceExporterBuilder::default();
builder.set_output_to_log(Some(0));
assert!(builder.output_to_log);
assert_eq!(builder.log_max_line_size, None);
}

#[cfg_attr(miri, ignore)]
#[test]
fn test_new() {
Expand Down
Loading
Loading