-
Notifications
You must be signed in to change notification settings - Fork 21
feat(data-pipeline)!: add stdout log trace exporter #2074
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,19 @@ | ||
| // Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ | ||
| // SPDX-License-Identifier: Apache-2.0 | ||
|
|
||
| //! Log-output capability trait. | ||
| //! | ||
| //! Lets the trace pipeline emit already-encoded log-exporter output to the | ||
| //! platform's log sink. On native targets this writes to process stdout; a wasm | ||
| //! consumer implements it by handing the bytes to the host (e.g. JavaScript), | ||
| //! since wasm cannot write to stdout directly. | ||
|
|
||
| /// Capability for writing encoded log-exporter output to the platform log sink. | ||
| pub trait LogWriterCapability { | ||
| /// Write a buffer of newline-delimited log output. | ||
| /// | ||
| /// `bytes` may contain one or more `\n`-terminated JSON lines. Implementations | ||
| /// should write the whole buffer (so individual lines are not interleaved with | ||
| /// other writers) and flush before returning. | ||
| fn write_log_output(&self, bytes: &[u8]) -> std::io::Result<()>; | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,7 @@ use crate::otlp::OtlpTraceConfig; | |
| use crate::telemetry::TelemetryClientBuilder; | ||
| use crate::trace_exporter::agent_response::AgentResponsePayloadVersion; | ||
| use crate::trace_exporter::error::BuilderErrorKind; | ||
| use crate::trace_exporter::log_writer::DEFAULT_LOG_MAX_LINE_SIZE; | ||
| #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] | ||
| use crate::trace_exporter::TelemetryConfig; | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
|
|
@@ -18,7 +19,7 @@ use crate::trace_exporter::{ | |
| TracerMetadata, INFO_ENDPOINT, | ||
| }; | ||
| use arc_swap::ArcSwap; | ||
| use libdd_capabilities::{HttpClientCapability, MaybeSend, SleepCapability}; | ||
| use libdd_capabilities::{HttpClientCapability, LogWriterCapability, MaybeSend, SleepCapability}; | ||
| use libdd_common::{parse_uri, tag, Endpoint}; | ||
| use libdd_dogstatsd_client::new; | ||
| use libdd_shared_runtime::SharedRuntime; | ||
|
|
@@ -65,6 +66,11 @@ pub struct TraceExporterBuilder { | |
| connection_timeout: Option<u64>, | ||
| otlp_endpoint: Option<String>, | ||
| otlp_headers: Vec<(String, String)>, | ||
| /// When true, traces are written as newline-delimited JSON to stdout (the | ||
| /// Datadog Forwarder "log exporter" path) instead of being sent to an agent. | ||
| output_to_log: bool, | ||
| /// Optional override for the maximum size of a single emitted log line. | ||
| log_max_line_size: Option<usize>, | ||
| } | ||
|
|
||
| impl TraceExporterBuilder { | ||
|
|
@@ -296,12 +302,40 @@ impl TraceExporterBuilder { | |
| self | ||
| } | ||
|
|
||
| /// Configure the exporter to write traces as newline-delimited JSON to stdout | ||
| /// (the Datadog Forwarder "log exporter" path) instead of sending them to a | ||
| /// Datadog agent. This is the transport used in serverless environments (e.g. | ||
| /// AWS Lambda) when no agent is reachable. | ||
| /// | ||
| /// `max_line_size` overrides the per-line byte cap; `None` (or `Some(0)`, | ||
| /// which is coerced to the default) uses the default of 256 KiB, the AWS | ||
| /// CloudWatch Logs per-event limit. When this is set, agent-specific behavior | ||
| /// (agent-info polling, client-side stats, V1 negotiation) is bypassed. | ||
| /// | ||
| /// In this mode each span's `meta` is serialized to process stdout (and thus | ||
| /// captured by CloudWatch Logs in Lambda); `meta_struct` is excluded because | ||
| /// it holds raw msgpack the log intake cannot interpret. Writes are | ||
| /// synchronous/blocking, so this mode targets single-threaded serverless | ||
| /// runtimes where blocking stdout writes do not stall a shared async reactor. | ||
| /// | ||
| /// Takes precedence over an OTLP endpoint: if both this and `set_otlp_endpoint` | ||
| /// are configured, traces are written to the log output and not sent via OTLP. | ||
| pub fn set_output_to_log(&mut self, max_line_size: Option<usize>) -> &mut Self { | ||
| self.output_to_log = true; | ||
| // Treat `Some(0)` as "use the default" (a 0 cap would drop every span); | ||
| // keeps parity with the FFI setter's 0-sentinel. | ||
| self.log_max_line_size = max_line_size.filter(|&n| n != 0); | ||
| self | ||
| } | ||
|
|
||
| /// Build the [`TraceExporter`] synchronously. | ||
| /// | ||
| /// Sync facade over [`Self::build_async`]; panics inside an existing tokio context. | ||
| /// Not available on wasm — use [`Self::build_async`] there. | ||
| #[cfg(not(target_arch = "wasm32"))] | ||
| pub fn build<C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static>( | ||
| pub fn build< | ||
| C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, | ||
| >( | ||
| mut self, | ||
| ) -> Result<TraceExporter<C>, TraceExporterError> { | ||
| let shared_runtime = match self.shared_runtime.as_ref() { | ||
|
|
@@ -320,7 +354,7 @@ impl TraceExporterBuilder { | |
| /// Awaits all async setup (e.g. telemetry start-up). Safe to drive from any async | ||
| /// context. | ||
| pub async fn build_async< | ||
| C: HttpClientCapability + SleepCapability + MaybeSend + Sync + 'static, | ||
| C: HttpClientCapability + SleepCapability + LogWriterCapability + MaybeSend + Sync + 'static, | ||
| >( | ||
| self, | ||
| ) -> Result<TraceExporter<C>, TraceExporterError> { | ||
|
|
@@ -365,19 +399,26 @@ impl TraceExporterBuilder { | |
| let info_endpoint = Endpoint::from_url(add_path(&agent_url, INFO_ENDPOINT)); | ||
| let (info_fetcher, info_response_observer) = | ||
| AgentInfoFetcher::<C>::new(info_endpoint, Duration::from_secs(5 * 60)); | ||
| let info_fetcher_handle = | ||
| shared_runtime | ||
| .spawn_worker(info_fetcher, false) | ||
| .map_err(|e| { | ||
| TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( | ||
| e.to_string(), | ||
| )) | ||
| })?; | ||
| // The handle is currently only tracked for shutdown on native; on wasm | ||
| // it is dropped here (the worker keeps running on the JS event loop | ||
| // until the page/module is torn down). | ||
| // In log-export mode there is no agent to poll; skip spawning the worker | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I left a comment about agent info and telemetry workers being unnecessarily spawned in @paullegranddc's agentless PR. Beyond the scope of this PR, but after these two PRs land we should discuss refactoring the builder a little bit to handle this a bit more elegantly. |
||
| // entirely so we don't make repeated failing `/info` calls (e.g. in Lambda). | ||
| let info_fetcher_handle = if self.output_to_log { | ||
| None | ||
| } else { | ||
| Some( | ||
| shared_runtime | ||
| .spawn_worker(info_fetcher, false) | ||
| .map_err(|e| { | ||
| TraceExporterError::Builder(BuilderErrorKind::InvalidConfiguration( | ||
| e.to_string(), | ||
| )) | ||
| })?, | ||
| ) | ||
| }; | ||
| // The handle is only tracked for shutdown on native; on wasm the `workers` | ||
| // field is cfg'd out, so drop it here (the worker keeps running on the JS | ||
| // event loop until the page/module is torn down). | ||
| #[cfg(target_arch = "wasm32")] | ||
| let _ = info_fetcher_handle; | ||
| drop(info_fetcher_handle); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Claude can't refrain from rewriting unrelated stuff with bad taste, can't he 😛 (I'm mostly joking, I don't care what syntax people use to drop value, although I do personally prefer the former) |
||
|
|
||
| #[allow(unused_mut)] | ||
| let mut stats = StatsComputationStatus::Disabled; | ||
|
|
@@ -389,7 +430,13 @@ impl TraceExporterBuilder { | |
| #[cfg(all(not(target_arch = "wasm32"), feature = "telemetry"))] | ||
| let (telemetry_client, telemetry_handle) = { | ||
| let sessions = self.telemetry_instrumentation_sessions; | ||
| let telemetry = self.telemetry.map(|telemetry_config| { | ||
| // Telemetry talks to the agent; disable it in log-export mode. | ||
| let telemetry = if self.output_to_log { | ||
| None | ||
| } else { | ||
| self.telemetry | ||
| } | ||
| .map(|telemetry_config| { | ||
| let mut builder = TelemetryClientBuilder::default() | ||
| .set_language(&self.language) | ||
| .set_language_version(&self.language_version) | ||
|
|
@@ -455,6 +502,10 @@ impl TraceExporterBuilder { | |
| } | ||
| }); | ||
|
|
||
| let log_output = self | ||
| .output_to_log | ||
| .then(|| self.log_max_line_size.unwrap_or(DEFAULT_LOG_MAX_LINE_SIZE)); | ||
|
|
||
| Ok(TraceExporter { | ||
| endpoint: Endpoint { | ||
| url: agent_url, | ||
|
|
@@ -514,6 +565,7 @@ impl TraceExporterBuilder { | |
| .agent_rates_payload_version_enabled | ||
| .then(AgentResponsePayloadVersion::new), | ||
| otlp_config, | ||
| log_output, | ||
| }) | ||
| } | ||
|
|
||
|
|
@@ -545,6 +597,38 @@ mod tests { | |
| use crate::trace_exporter::error::BuilderErrorKind; | ||
| use libdd_capabilities_impl::NativeCapabilities; | ||
|
|
||
| #[cfg_attr(miri, ignore)] | ||
| #[test] | ||
| fn test_log_output_mode() { | ||
| let mut builder = TraceExporterBuilder::default(); | ||
| builder | ||
| .set_service("test") | ||
| .set_input_format(TraceExporterInputFormat::V04) | ||
| .set_output_to_log(None); | ||
| let exporter = builder.build::<NativeCapabilities>().unwrap(); | ||
| // Log-output mode is enabled and the agent-info worker is not spawned. | ||
| // (End-to-end send -> bytes is covered by the capability-injecting test in | ||
| // `mod.rs` and the `log_writer` unit tests.) | ||
| assert!( | ||
| exporter.log_output.is_some(), | ||
| "log_output should be set in log-output mode" | ||
| ); | ||
| assert!( | ||
| exporter.workers.info_fetcher.is_none(), | ||
| "no agent-info worker should be spawned in log mode" | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn set_output_to_log_some_zero_uses_default() { | ||
| // `Some(0)` is coerced to "use the default cap" (a 0 cap would drop every | ||
| // span), which is represented as `None` on the builder field. | ||
| let mut builder = TraceExporterBuilder::default(); | ||
| builder.set_output_to_log(Some(0)); | ||
| assert!(builder.output_to_log); | ||
| assert_eq!(builder.log_max_line_size, None); | ||
| } | ||
|
|
||
| #[cfg_attr(miri, ignore)] | ||
| #[test] | ||
| fn test_new() { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is non-blocking and can be addressed in a separate PR...
I'm not a fan of the exporter containing precedence logic for configuration. I left a similar comment on @paullegranddc's agentless PR. I believe config logic should live in the SDKs as much as possible, and if the builder receives conflicting config settings, we should return an error. If we want to support precedence to avoid setup friction for customers that should be handled by the SDKs. I don't think there is any harm in merging it as is for now. Let's just capture it in a Jira ticket.