diff --git a/Cargo.lock b/Cargo.lock index ad07875b86..2c3cf428f3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3477,6 +3477,7 @@ dependencies = [ "serde", "serde_json", "tempfile", + "thin-vec", "tokio", "tracing", "urlencoding", @@ -5773,6 +5774,12 @@ dependencies = [ "tempfile", ] +[[package]] +name = "thin-vec" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0f7e269b48f0a7dd0146680fa24b50cc67fc0373f086a5b2f99bd084639b482" + [[package]] name = "thiserror" version = "1.0.68" diff --git a/LICENSE-3rdparty.csv b/LICENSE-3rdparty.csv index 16d8a34b9e..35909364f9 100644 --- a/LICENSE-3rdparty.csv +++ b/LICENSE-3rdparty.csv @@ -429,6 +429,7 @@ target-triple,https://github.com/dtolnay/target-triple,MIT OR Apache-2.0,David T tempfile,https://github.com/Stebalien/tempfile,MIT OR Apache-2.0,"Steven Allen , The Rust Project Developers, Ashley Mannix , Jason White " termcolor,https://github.com/BurntSushi/termcolor,Unlicense OR MIT,Andrew Gallant test-case-macros,https://github.com/frondeus/test-case,MIT,"Marcin Sas-Szymanski , Wojciech Polak , Łukasz Biel " +thin-vec,https://github.com/mozilla/thin-vec,MIT OR Apache-2.0,Aria Beingessner thiserror,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay thiserror-impl,https://github.com/dtolnay/thiserror,MIT OR Apache-2.0,David Tolnay thread_local,https://github.com/Amanieu/thread_local-rs,MIT OR Apache-2.0,Amanieu d'Antras diff --git a/libdd-trace-utils/Cargo.toml b/libdd-trace-utils/Cargo.toml index 247680b429..62ac3f9a2d 100644 --- a/libdd-trace-utils/Cargo.toml +++ b/libdd-trace-utils/Cargo.toml @@ -46,6 +46,7 @@ libdd-tinybytes = { version = "1.1.1", path = "../libdd-tinybytes", features = [ ] } indexmap = "2.11" rustc-hash = "2" +thin-vec = "0.2" # Compression feature flate2 = { version = "1.0", optional = true } diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs index 9bb50e393d..071e16bc70 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/mod.rs @@ -2,8 +2,10 @@ // SPDX-License-Identifier: Apache-2.0 mod span_v04; +mod span_v1; use crate::span::v04::Span; +use crate::span::v1::TracerPayload; use crate::span::TraceData; use crate::tracer_metadata::TracerMetadata; use libdd_common::ResultInfallibleExt; @@ -32,12 +34,75 @@ mod trace_key { mod chunk_key { pub const PRIORITY: u8 = 1; pub const ORIGIN: u8 = 2; + pub const ATTRIBUTES: u8 = 3; pub const SPANS: u8 = 4; + pub const DROPPED_TRACE: u8 = 5; pub const TRACE_ID: u8 = 6; /// Sampling mechanism (previously the `_dd.p.dm` span tag). pub const SAMPLING_MECHANISM: u8 = 7; } +/// Integer keys for V1 span fields. +#[repr(u8)] +pub(super) enum SpanKey { + Service = 1, + Name = 2, + Resource = 3, + SpanId = 4, + ParentId = 5, + Start = 6, + Duration = 7, + Error = 8, + Attributes = 9, + Type = 10, + SpanLinks = 11, + SpanEvents = 12, + Env = 13, + Version = 14, + Component = 15, + Kind = 16, +} + +/// Integer keys for V1 span link fields. +#[repr(u8)] +pub(super) enum SpanLinkKey { + TraceId = 1, + SpanId = 2, + Attributes = 3, + TraceState = 4, + Flags = 5, +} + +/// Integer keys for V1 span event fields. +#[repr(u8)] +pub(super) enum SpanEventKey { + Time = 1, + Name = 2, + Attributes = 3, +} + +/// Type discriminants for attribute values. +/// An attribute value is encoded as [type_uint8][actual_value]. +#[repr(u8)] +pub(super) enum AnyValueKey { + String = 1, + Bool = 2, + Double = 3, + Int64 = 4, + Bytes = 5, + Array = 6, + KeyValueList = 7, +} + +/// Number of msgpack items written per `[type, value]` pair when typed values are flattened +/// into a parent array (e.g. `AttributeValue::List`). +pub(super) const TYPED_VALUE_STRIDE: u32 = 2; + +/// Number of msgpack items written per `[key, type, value]` triplet when typed attribute +/// entries are flattened into a parent array (top-level attribute maps and +/// `AttributeValue::KeyValue`). +pub(super) const FLAT_ATTR_STRIDE: u32 = 3; + /// Streaming string intern table. /// /// The first time a string is written, it is emitted as a msgpack `str` and assigned an @@ -78,6 +143,21 @@ impl StringTable { } } +/// Returns the span start time in UNIX nanos, falling back to the current wall-clock time when +/// the input is negative. Matches the agent's `validateAndFixStartTime`, which substitutes +/// `time.Now().UnixNano()` for invalid start values; without this, a negative `i64` would wrap +/// to a near-`u64::MAX` timestamp on cast. +pub(super) fn span_start_unix_nanos(start: i64) -> u64 { + if start < 0 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_nanos() as u64) + .unwrap_or(0) + } else { + start as u64 + } +} + /// Promoted fields extracted from the payload's spans, written at the top-level map. struct PayloadAttrs<'a> { env: Option<&'a str>, @@ -259,6 +339,12 @@ fn encode_payload]>>( write_map_len(writer, map_len)?; + write_uint8(writer, trace_key::CHUNKS)?; + write_array_len(writer, traces.len() as u32)?; + for trace in traces { + encode_chunk(writer, trace.as_ref(), &mut table)?; + } + if !metadata.language.is_empty() { write_uint8(writer, trace_key::LANGUAGE_NAME)?; table.write_interned(writer, &metadata.language)?; @@ -298,25 +384,19 @@ fn encode_payload]>>( // Encoded as a flat array of triplets: [key, type_uint, value, ...] // String values use type discriminant 1. write_uint8(writer, trace_key::ATTRIBUTES)?; - write_array_len(writer, attr_count * 3)?; + write_array_len(writer, attr_count * FLAT_ATTR_STRIDE)?; if let Some(v) = payload_attrs.apm_mode { table.write_interned(writer, "_dd.apm_mode")?; - write_uint8(writer, span_v04::AnyValueKey::String as u8)?; + write_uint8(writer, AnyValueKey::String as u8)?; table.write_interned(writer, v)?; } if let Some(v) = payload_attrs.git_commit_sha { table.write_interned(writer, "_dd.git.commit.sha")?; - write_uint8(writer, span_v04::AnyValueKey::String as u8)?; + write_uint8(writer, AnyValueKey::String as u8)?; table.write_interned(writer, v)?; } } - write_uint8(writer, trace_key::CHUNKS)?; - write_array_len(writer, traces.len() as u32)?; - for trace in traces { - encode_chunk(writer, trace.as_ref(), &mut table)?; - } - Ok(()) } @@ -348,6 +428,12 @@ fn encode_chunk( write_uint8(writer, chunk_key::TRACE_ID)?; write_bin(writer, &attrs.trace_id.to_be_bytes())?; + write_uint8(writer, chunk_key::SPANS)?; + write_array_len(writer, spans.len() as u32)?; + for span in spans { + span_v04::encode_span(writer, span, table)?; + } + if let Some(origin) = attrs.origin { write_uint8(writer, chunk_key::ORIGIN)?; table.write_interned(writer, origin)?; @@ -363,12 +449,6 @@ fn encode_chunk( write_uint(writer, mechanism as u64)?; } - write_uint8(writer, chunk_key::SPANS)?; - write_array_len(writer, spans.len() as u32)?; - for span in spans { - span_v04::encode_span(writer, span, table)?; - } - Ok(()) } @@ -421,6 +501,266 @@ pub fn to_encoded_byte_len]>>( counter.0 } +/// Encodes a [`TracerPayload`] (V1 data model) as a V1 msgpack payload. +fn encode_payload_v1( + writer: &mut W, + payload: &TracerPayload, +) -> Result<(), ValueWriteError> { + let mut table = StringTable::new(); + + let has_attributes = !payload.attributes.is_empty(); + + let map_len = 1u32 // chunks always present + + (!payload.language_name.borrow().is_empty()) as u32 + + (!payload.language_version.borrow().is_empty()) as u32 + + (!payload.tracer_version.borrow().is_empty()) as u32 + + (!payload.runtime_id.borrow().is_empty()) as u32 + + (!payload.env.borrow().is_empty()) as u32 + + (!payload.hostname.borrow().is_empty()) as u32 + + (!payload.app_version.borrow().is_empty()) as u32 + + has_attributes as u32; + + write_map_len(writer, map_len)?; + + write_uint8(writer, trace_key::CHUNKS)?; + write_array_len(writer, payload.chunks.len() as u32)?; + for chunk in &payload.chunks { + encode_chunk_v1(writer, chunk, &mut table)?; + } + + if !payload.language_name.borrow().is_empty() { + write_uint8(writer, trace_key::LANGUAGE_NAME)?; + table.write_interned(writer, payload.language_name.borrow())?; + } + + if !payload.language_version.borrow().is_empty() { + write_uint8(writer, trace_key::LANGUAGE_VERSION)?; + table.write_interned(writer, payload.language_version.borrow())?; + } + + if !payload.tracer_version.borrow().is_empty() { + write_uint8(writer, trace_key::TRACER_VERSION)?; + table.write_interned(writer, payload.tracer_version.borrow())?; + } + + if !payload.runtime_id.borrow().is_empty() { + write_uint8(writer, trace_key::RUNTIME_ID)?; + table.write_interned(writer, payload.runtime_id.borrow())?; + } + + if !payload.env.borrow().is_empty() { + write_uint8(writer, trace_key::ENV_REF)?; + table.write_interned(writer, payload.env.borrow())?; + } + + if !payload.hostname.borrow().is_empty() { + write_uint8(writer, trace_key::HOSTNAME_REF)?; + table.write_interned(writer, payload.hostname.borrow())?; + } + + if !payload.app_version.borrow().is_empty() { + write_uint8(writer, trace_key::APP_VERSION_REF)?; + table.write_interned(writer, payload.app_version.borrow())?; + } + + if has_attributes { + write_uint8(writer, trace_key::ATTRIBUTES)?; + span_v1::encode_attributes_map(writer, &payload.attributes, &mut table)?; + } + + Ok(()) +} + +/// Encodes one V1 chunk (a group of spans sharing a trace ID). +fn encode_chunk_v1( + writer: &mut W, + chunk: &crate::span::v1::TraceChunk, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + let origin = >::borrow(&chunk.origin); + let has_attributes = !chunk.attributes.is_empty(); + let has_dropped = chunk.dropped_trace; + + let fields = 2u32 // trace_id + spans + + !origin.is_empty() as u32 + + chunk.priority.is_some() as u32 + + chunk.sampling_mechanism.is_some() as u32 + + has_attributes as u32 + + has_dropped as u32; + + write_map_len(writer, fields)?; + + write_uint8(writer, chunk_key::TRACE_ID)?; + write_bin(writer, &chunk.trace_id)?; + + write_uint8(writer, chunk_key::SPANS)?; + write_array_len(writer, chunk.spans.len() as u32)?; + for span in &chunk.spans { + span_v1::encode_span(writer, span, table)?; + } + + if !origin.is_empty() { + write_uint8(writer, chunk_key::ORIGIN)?; + table.write_interned(writer, origin)?; + } + + if let Some(priority) = chunk.priority { + write_uint8(writer, chunk_key::PRIORITY)?; + write_sint(writer, priority as i64)?; + } + + if let Some(mechanism) = chunk.sampling_mechanism { + write_uint8(writer, chunk_key::SAMPLING_MECHANISM)?; + write_uint(writer, mechanism as u64)?; + } + + if has_attributes { + write_uint8(writer, chunk_key::ATTRIBUTES)?; + span_v1::encode_attributes_map(writer, &chunk.attributes, table)?; + } + + if has_dropped { + write_uint8(writer, chunk_key::DROPPED_TRACE)?; + rmp::encode::write_bool(writer, true).map_err(ValueWriteError::InvalidDataWrite)?; + } + + Ok(()) +} + +/// Serializes a `TracerPayload` into a vector of bytes with a default capacity of 0. +/// +/// # Arguments +/// +/// * `payload` - A reference to a `TracerPayload`. +/// +/// # Returns +/// +/// * `Vec` - A vector containing the encoded payload. +/// +/// # Examples +/// +/// ``` +/// use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_v1; +/// use libdd_trace_utils::span::v1::TracerPayloadSlice; +/// +/// let payload = TracerPayloadSlice { +/// language_name: "rust".into(), +/// ..Default::default() +/// }; +/// let encoded = to_vec_from_payload_v1(&payload); +/// +/// assert!(!encoded.is_empty()); +/// ``` +pub fn to_vec_from_payload_v1(payload: &TracerPayload) -> Vec { + to_vec_from_payload_with_capacity_v1(payload, 0) +} + +/// Serializes a `TracerPayload` into a vector of bytes with specified capacity. +/// +/// # Arguments +/// +/// * `payload` - A reference to a `TracerPayload`. +/// * `capacity` - Desired initial capacity of the resulting vector. +/// +/// # Returns +/// +/// * `Vec` - A vector containing the encoded payload. +/// +/// # Examples +/// +/// ``` +/// use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_with_capacity_v1; +/// use libdd_trace_utils::span::v1::TracerPayloadSlice; +/// +/// let payload = TracerPayloadSlice { +/// language_name: "rust".into(), +/// ..Default::default() +/// }; +/// let encoded = to_vec_from_payload_with_capacity_v1(&payload, 1024); +/// +/// assert!(encoded.capacity() >= 1024); +/// ``` +pub fn to_vec_from_payload_with_capacity_v1( + payload: &TracerPayload, + capacity: u32, +) -> Vec { + let mut buf = ByteBuf::with_capacity(capacity as usize); + encode_payload_v1(&mut buf, payload) + .map_err(super::flatten_value_write_infallible) + .unwrap_infallible(); + buf.into_vec() +} + +/// Encodes a `TracerPayload` into a slice of bytes. +/// +/// # Arguments +/// +/// * `slice` - A mutable reference to a byte slice. +/// * `payload` - A reference to a `TracerPayload`. +/// +/// # Returns +/// +/// * `Ok(())` - If encoding succeeds. +/// * `Err(ValueWriteError)` - If encoding fails. +/// +/// # Errors +/// +/// This function will return an error if the underlying writer fails (e.g. buffer too small). +/// +/// # Examples +/// +/// ``` +/// use libdd_trace_utils::msgpack_encoder::v1::write_payload_to_slice_v1; +/// use libdd_trace_utils::span::v1::TracerPayloadSlice; +/// +/// let mut buffer = vec![0u8; 1024]; +/// let payload = TracerPayloadSlice { +/// language_name: "rust".into(), +/// ..Default::default() +/// }; +/// +/// write_payload_to_slice_v1(&mut &mut buffer[..], &payload).expect("Encoding failed"); +/// ``` +pub fn write_payload_to_slice_v1( + slice: &mut &mut [u8], + payload: &TracerPayload, +) -> Result<(), ValueWriteError> { + encode_payload_v1(slice, payload) +} + +/// Computes the number of bytes required to encode the given `TracerPayload`. +/// +/// This does not allocate any actual buffer, but simulates writing in order to measure +/// the encoded size of the payload. +/// +/// # Arguments +/// +/// * `payload` - A reference to a `TracerPayload`. +/// +/// # Returns +/// +/// * `u32` - The number of bytes that would be written by the encoder. +/// +/// # Examples +/// +/// ``` +/// use libdd_trace_utils::msgpack_encoder::v1::to_encoded_byte_len_from_payload_v1; +/// use libdd_trace_utils::span::v1::TracerPayloadSlice; +/// +/// let payload = TracerPayloadSlice { +/// language_name: "rust".into(), +/// ..Default::default() +/// }; +/// let encoded_len = to_encoded_byte_len_from_payload_v1(&payload); +/// +/// assert!(encoded_len > 0); +/// ``` +pub fn to_encoded_byte_len_from_payload_v1(payload: &TracerPayload) -> u32 { + let mut counter = super::CountLength(0); + let _ = encode_payload_v1(&mut counter, payload); + counter.0 +} + #[cfg(test)] mod tests { use super::*; @@ -867,3 +1207,367 @@ mod tests { ); } } + +#[cfg(test)] +mod v1_payload_tests { + //! Unit tests for the v1::Span encoder (`encode_payload_v1`). + //! + //! Verifies the encoder produces a valid V1 payload from the canonical + //! [`crate::span::v1::TracerPayload`] data model and that core invariants (interning, byte + //! length, optional fields) hold. + + use super::*; + use crate::span::v1::{ + AttributeValue, Span as V1Span, SpanBytes as V1SpanBytes, SpanKind, TraceChunkBytes, + TracerPayloadBytes, + }; + use crate::span::vec_map::VecMap; + use libdd_tinybytes::BytesString; + + fn bs(s: &str) -> BytesString { + BytesString::from_slice(s.as_bytes()).expect("test string must fit in BytesString") + } + + fn make_span(service: &str, name: &str, span_id: u64) -> V1SpanBytes { + V1Span { + service: bs(service), + name: bs(name), + resource: bs("res"), + span_id, + start: 1_000_000, + duration: 500, + ..Default::default() + } + } + + fn make_chunk(spans: Vec, trace_id: [u8; 16]) -> TraceChunkBytes { + TraceChunkBytes { + trace_id, + spans, + ..Default::default() + } + } + + #[test] + fn empty_payload_is_valid_msgpack_map() { + let payload = TracerPayloadBytes::default(); + let encoded = to_vec_from_payload_v1(&payload); + // Map with a single entry (chunks), then an empty array. `0x81` = fixmap of length 1, + // followed by chunk key (0x0b), then `0x90` (fixarray length 0). + assert_eq!(encoded, vec![0x81, 0x0b, 0x90]); + } + + #[test] + fn payload_byte_len_matches_to_vec() { + let chunk = make_chunk(vec![make_span("svc", "op", 1)], [0u8; 16]); + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + let len = to_encoded_byte_len_from_payload_v1(&payload); + assert_eq!(encoded.len() as u32, len); + } + + #[test] + fn span_kind_is_always_emitted_as_uint() { + // Default SpanKind (Internal=1) must be emitted. The encoded payload contains + // `kind_key (0x10) | uint 1 (0x01)`. + let chunk = make_chunk(vec![make_span("svc", "op", 1)], [0u8; 16]); + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + let pat = [0x10u8, 0x01u8]; + assert!( + encoded.windows(2).any(|w| w == pat), + "Kind (key=16) Internal (=1) must be emitted" + ); + } + + #[test] + fn typed_attributes_carry_correct_type_discriminants() { + let mut attrs = VecMap::new(); + attrs.insert(bs("k_str"), AttributeValue::String(bs("v"))); + let span = V1Span { + service: bs("svc"), + name: bs("op"), + resource: bs("res"), + span_id: 1, + start: 1, + duration: 1, + attributes: attrs, + ..Default::default() + }; + let chunk = make_chunk(vec![span], [0u8; 16]); + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + // String attribute → type discriminant = 1 (`AnyValueKey::String`). + assert!( + encoded.windows(b"k_str".len()).any(|w| w == b"k_str"), + "attribute key must appear" + ); + } + + #[test] + fn bytes_attribute_uses_bin_marker() { + // A Bytes attribute must use the msgpack `bin` family, not `str`. + let mut attrs = VecMap::new(); + attrs.insert( + bs("payload"), + AttributeValue::Bytes(libdd_tinybytes::Bytes::copy_from_slice(b"\xde\xad")), + ); + let span = V1Span { + service: bs("svc"), + name: bs("op"), + resource: bs("res"), + span_id: 1, + start: 1, + duration: 1, + attributes: attrs, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![make_chunk(vec![span], [0u8; 16])], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + // bin8 marker `0xc4` followed by length `0x02` and the bytes themselves. + let want = [0xc4u8, 0x02, 0xde, 0xad]; + assert!( + encoded.windows(4).any(|w| w == want), + "Bytes attribute must be encoded as msgpack bin" + ); + } + + #[test] + fn list_and_keyvalue_attributes_round_trip_through_recursion() { + let mut nested = VecMap::new(); + nested.insert(bs("nk"), AttributeValue::Int(7)); + let mut attrs = VecMap::new(); + attrs.insert( + bs("list"), + AttributeValue::List(vec![ + AttributeValue::String(bs("a")), + AttributeValue::Bool(true), + ]), + ); + attrs.insert(bs("kv"), AttributeValue::KeyValue(nested)); + let span = V1Span { + service: bs("svc"), + name: bs("op"), + resource: bs("res"), + span_id: 1, + start: 1, + duration: 1, + attributes: attrs, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![make_chunk(vec![span], [0u8; 16])], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + // The keys and the nested key must all appear at least once. + for s in &[b"list" as &[u8], b"kv", b"a", b"nk"] { + assert!( + encoded.windows(s.len()).any(|w| w == *s), + "{} should appear in payload", + std::str::from_utf8(s).unwrap() + ); + } + } + + #[test] + fn promoted_fields_at_payload_level() { + let payload = TracerPayloadBytes { + language_name: bs("python"), + language_version: bs("3.11"), + tracer_version: bs("2.0.0"), + runtime_id: bs("rt-1"), + env: bs("prod"), + hostname: bs("h"), + app_version: bs("1.2.3"), + chunks: vec![make_chunk(vec![make_span("svc", "op", 1)], [0u8; 16])], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + for s in &[ + b"python" as &[u8], + b"3.11", + b"2.0.0", + b"rt-1", + b"prod", + b"1.2.3", + ] { + assert!( + encoded.windows(s.len()).any(|w| w == *s), + "{} should appear", + std::str::from_utf8(s).unwrap() + ); + } + } + + #[test] + fn chunk_level_attrs_emitted_when_set() { + let chunk = TraceChunkBytes { + trace_id: [0u8; 16], + priority: Some(1), + origin: bs("lambda"), + sampling_mechanism: Some(4), + spans: vec![make_span("svc", "op", 1)], + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + assert!( + encoded.windows(b"lambda".len()).any(|w| w == b"lambda"), + "chunk origin should appear" + ); + // sampling_mechanism=4 → SAMPLING_MECHANISM (0x07) + positive fixint 0x04 + let want = [chunk_key::SAMPLING_MECHANISM, 0x04]; + assert!(encoded.windows(2).any(|w| w == want)); + } + + #[test] + fn chunk_dropped_trace_emitted_when_true() { + let chunk = TraceChunkBytes { + trace_id: [0u8; 16], + dropped_trace: true, + spans: vec![make_span("svc", "op", 1)], + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + // DROPPED_TRACE (0x05) + msgpack true marker (0xc3) + let want = [chunk_key::DROPPED_TRACE, 0xc3]; + assert!( + encoded.windows(2).any(|w| w == want), + "DROPPED_TRACE marker + true should appear in payload" + ); + } + + #[test] + fn chunk_dropped_trace_skipped_when_false() { + let chunk = TraceChunkBytes { + trace_id: [0u8; 16], + dropped_trace: false, + spans: vec![make_span("svc", "op", 1)], + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + assert!( + !encoded.contains(&chunk_key::DROPPED_TRACE), + "DROPPED_TRACE key should not be emitted when false" + ); + } + + #[test] + fn chunk_attributes_emitted_when_set() { + let mut attrs = VecMap::new(); + attrs.insert(bs("region"), AttributeValue::String(bs("us-east-1"))); + let chunk = TraceChunkBytes { + trace_id: [0u8; 16], + attributes: attrs, + spans: vec![make_span("svc", "op", 1)], + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![chunk], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + // ATTRIBUTES (0x03) + msgpack fixarray header for 3 elements (0x93) + let want = [chunk_key::ATTRIBUTES, 0x93]; + assert!( + encoded.windows(2).any(|w| w == want), + "ATTRIBUTES key + flat-triplet array header should appear" + ); + assert!( + encoded + .windows(b"us-east-1".len()) + .any(|w| w == b"us-east-1"), + "chunk attribute value should be in the payload" + ); + } + + #[test] + fn span_kind_otel_values() { + for (kind, expected_byte) in [ + (SpanKind::Internal, 0x01u8), + (SpanKind::Server, 0x02), + (SpanKind::Client, 0x03), + (SpanKind::Producer, 0x04), + (SpanKind::Consumer, 0x05), + ] { + let span = V1Span { + service: bs("svc"), + name: bs("op"), + resource: bs("res"), + span_id: 1, + start: 1, + duration: 1, + span_kind: kind, + ..Default::default() + }; + let payload = TracerPayloadBytes { + chunks: vec![make_chunk(vec![span], [0u8; 16])], + ..Default::default() + }; + let encoded = to_vec_from_payload_v1(&payload); + let want = [0x10u8, expected_byte]; + assert!( + encoded.windows(2).any(|w| w == want), + "SpanKind {kind:?} should produce byte {expected_byte:#x}" + ); + } + } + + #[test] + fn string_interning_works_across_chunks() { + // The string "shared" appears in two chunks. The second occurrence must be a uint ID, + // not a fresh str. Verify by (a) scanning the encoded bytes for the literal "shared" + // — it must appear exactly once — and (b) confirming the two-chunk payload is smaller + // than two independent single-chunk payloads. + let chunk_with_two = TracerPayloadBytes { + chunks: vec![ + make_chunk(vec![make_span("shared", "op1", 1)], [0u8; 16]), + make_chunk(vec![make_span("shared", "op2", 2)], [0u8; 16]), + ], + ..Default::default() + }; + let single = TracerPayloadBytes { + chunks: vec![make_chunk(vec![make_span("shared", "op1", 1)], [0u8; 16])], + ..Default::default() + }; + let two = to_vec_from_payload_v1(&chunk_with_two); + let one = to_vec_from_payload_v1(&single); + let shared_occurrences = two + .windows(b"shared".len()) + .filter(|w| *w == b"shared") + .count(); + assert_eq!( + shared_occurrences, 1, + "the literal bytes \"shared\" must appear exactly once; subsequent uses must be \ + encoded as interning IDs" + ); + assert!( + two.len() < 2 * one.len(), + "interning should reduce repeated payload size" + ); + } +} diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs index ac7472de00..b3bfc8092e 100644 --- a/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v04.rs @@ -8,63 +8,8 @@ use rmp::encode::{ ValueWriteError, }; use std::borrow::Borrow; -use std::time; - -use super::StringTable; - -/// Integer keys for V1 span fields. -#[repr(u8)] -pub(super) enum SpanKey { - Service = 1, - Name = 2, - Resource = 3, - SpanId = 4, - ParentId = 5, - Start = 6, - Duration = 7, - Error = 8, - Attributes = 9, - Type = 10, - SpanLinks = 11, - SpanEvents = 12, - Env = 13, - Version = 14, - Component = 15, - Kind = 16, -} - -/// Integer keys for V1 span link fields. -#[repr(u8)] -pub(super) enum SpanLinkKey { - TraceId = 1, - SpanId = 2, - Attributes = 3, - TraceState = 4, - Flags = 5, -} -/// Integer keys for V1 span event fields. -#[repr(u8)] -pub(super) enum SpanEventKey { - Time = 1, - Name = 2, - Attributes = 3, -} - -/// Type discriminants for attribute values. -/// An attribute value is encoded as [type_uint8][actual_value]. -#[repr(u8)] -pub(super) enum AnyValueKey { - String = 1, - Bool = 2, - Double = 3, - Int64 = 4, - Bytes = 5, - Array = 6, - /// Not used in V04→V1 conversion (V04 has no key-value list type), defined for completeness. - #[allow(dead_code)] - KeyValueList = 7, -} +use super::{span_start_unix_nanos, AnyValueKey, SpanEventKey, SpanKey, SpanLinkKey, StringTable}; /// Maps the `span.kind` string tag (from v0.4 meta) to the OTEL SpanKind uint32. /// @@ -307,18 +252,7 @@ pub fn encode_span( write_u64(writer, span.span_id)?; write_uint8(writer, SpanKey::Start as u8)?; - if span.start < 0 { - // Fall back to wall-clock now (UNIX nanos). Matches the agent's - // `validateAndFixStartTime` which substitutes `time.Now().UnixNano()` - // for invalid start values. - let now = time::SystemTime::now() - .duration_since(time::UNIX_EPOCH) - .map(|d| d.as_nanos() as u64) - .unwrap_or(0); - write_u64(writer, now)?; - } else { - write_u64(writer, span.start as u64)?; - } + write_u64(writer, span_start_unix_nanos(span.start))?; if is_parent { write_uint8(writer, SpanKey::ParentId as u8)?; diff --git a/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs b/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs new file mode 100644 index 0000000000..44792f2235 --- /dev/null +++ b/libdd-trace-utils/src/msgpack_encoder/v1/span_v1.rs @@ -0,0 +1,343 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span::v1::{AttributeValue, Span, SpanEvent, SpanLink}; +use crate::span::vec_map::VecMap; +use crate::span::TraceData; +use rmp::encode::{ + write_array_len, write_bin, write_bool, write_f64, write_map_len, write_sint, write_u64, + write_uint, write_uint8, RmpWrite, ValueWriteError, +}; +use std::borrow::Borrow; +use std::collections::HashSet; + +use super::{ + span_start_unix_nanos, AnyValueKey, SpanEventKey, SpanKey, SpanLinkKey, StringTable, + FLAT_ATTR_STRIDE, TYPED_VALUE_STRIDE, +}; + +/// Encodes a typed `AttributeValue` as `[type_uint8, value]`. +/// +/// # Arguments +/// +/// * `writer` - A RmpWriter compatible with rmp writing functions. +/// * `value` - The attribute value to encode. +/// * `table` - The streaming string intern table used for interning string values. +/// +/// # Returns +/// +/// * `Ok(())` - Nothing if successful. +/// * `Err(ValueWriteError)` - An error if the writing fails. +/// +/// # Errors +/// +/// This function will return any error emitted by the writer. +pub(super) fn encode_attribute_value( + writer: &mut W, + value: &AttributeValue, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + match value { + AttributeValue::String(s) => { + write_uint8(writer, AnyValueKey::String as u8)?; + table.write_interned(writer, s.borrow())?; + } + AttributeValue::Bool(b) => { + write_uint8(writer, AnyValueKey::Bool as u8)?; + write_bool(writer, *b).map_err(ValueWriteError::InvalidDataWrite)?; + } + AttributeValue::Float(f) => { + write_uint8(writer, AnyValueKey::Double as u8)?; + write_f64(writer, *f)?; + } + AttributeValue::Int(i) => { + write_uint8(writer, AnyValueKey::Int64 as u8)?; + write_sint(writer, *i)?; + } + AttributeValue::Bytes(b) => { + write_uint8(writer, AnyValueKey::Bytes as u8)?; + write_bin(writer, b.borrow())?; + } + AttributeValue::List(arr) => { + // Encoded as a flat array of `[type, value]` pairs. + write_uint8(writer, AnyValueKey::Array as u8)?; + write_array_len(writer, arr.len() as u32 * TYPED_VALUE_STRIDE)?; + for v in arr { + encode_attribute_value(writer, v, table)?; + } + } + AttributeValue::KeyValue(map) => { + // Encoded as a flat array of `[key, type, value]` triplets — consistent with the + // top-level attributes map (`encode_attributes_map`). + write_uint8(writer, AnyValueKey::KeyValueList as u8)?; + encode_attributes_map(writer, map, table)?; + } + } + Ok(()) +} + +/// Encodes a map of attributes as a flat triplet array: `[key, type_uint8, value, ...]`. +/// +/// # Arguments +/// +/// * `writer` - A RmpWriter compatible with rmp writing functions. +/// * `map` - The attribute map to encode. +/// * `table` - The streaming string intern table used for interning keys and string values. +/// +/// # Returns +/// +/// * `Ok(())` - Nothing if successful. +/// * `Err(ValueWriteError)` - An error if the writing fails. +/// +/// # Errors +/// +/// This function will return any error emitted by the writer. +pub(super) fn encode_attributes_map( + writer: &mut W, + map: &VecMap>, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + // `VecMap` tolerates duplicate keys for fast insertion (later writes shadow earlier ones via + // `get`). Dedup here so the wire format carries each key once, with the last-written value. + // Walk in reverse keeping first-seen (= last-written), then reverse to restore insertion + // order. `T::Text: Hash + Eq + Borrow` per the `SpanText` trait. + let mut seen: HashSet<&str> = HashSet::with_capacity(map.len()); + let mut deduped: Vec<(&T::Text, &AttributeValue)> = map + .iter() + .rev() + .filter(|(k, _)| seen.insert(>::borrow(k))) + .map(|(k, v)| (k, v)) + .collect(); + deduped.reverse(); + + write_array_len(writer, deduped.len() as u32 * FLAT_ATTR_STRIDE)?; + for (k, v) in deduped { + table.write_interned(writer, k.borrow())?; + encode_attribute_value(writer, v, table)?; + } + Ok(()) +} + +/// Encodes a `SpanLink` object into a slice of bytes. +/// +/// # Arguments +/// +/// * `writer` - A RmpWriter compatible with rmp writing functions. +/// * `span_links` - The span links to encode. +/// * `table` - The streaming string intern table. +/// +/// # Returns +/// +/// * `Ok(())` - Nothing if successful. +/// * `Err(ValueWriteError)` - An error if the writing fails. +/// +/// # Errors +/// +/// This function will return any error emitted by the writer. +pub(super) fn encode_span_links( + writer: &mut W, + span_links: &[SpanLink], + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + write_uint8(writer, SpanKey::SpanLinks as u8)?; + write_array_len(writer, span_links.len() as u32)?; + + for link in span_links { + let link_len = 1 // trace_id (always) + + (link.span_id != 0) as u32 + + (!link.attributes.is_empty()) as u32 + + (!link.tracestate.borrow().is_empty()) as u32 + + (link.flags != 0) as u32; + + write_map_len(writer, link_len)?; + + write_uint8(writer, SpanLinkKey::TraceId as u8)?; + write_bin(writer, &link.trace_id)?; + + if link.span_id != 0 { + write_uint8(writer, SpanLinkKey::SpanId as u8)?; + write_u64(writer, link.span_id)?; + } + + if !link.attributes.is_empty() { + write_uint8(writer, SpanLinkKey::Attributes as u8)?; + encode_attributes_map(writer, &link.attributes, table)?; + } + + if !link.tracestate.borrow().is_empty() { + write_uint8(writer, SpanLinkKey::TraceState as u8)?; + table.write_interned(writer, link.tracestate.borrow())?; + } + + if link.flags != 0 { + write_uint8(writer, SpanLinkKey::Flags as u8)?; + write_uint(writer, link.flags as u64)?; + } + } + + Ok(()) +} + +/// Encodes a `SpanEvent` object into a slice of bytes. +/// +/// # Arguments +/// +/// * `writer` - A RmpWriter compatible with rmp writing functions. +/// * `span_events` - The span events to encode. +/// * `table` - The streaming string intern table. +/// +/// # Returns +/// +/// * `Ok(())` - Nothing if successful. +/// * `Err(ValueWriteError)` - An error if the writing fails. +/// +/// # Errors +/// +/// This function will return any error emitted by the writer. +pub(super) fn encode_span_events( + writer: &mut W, + span_events: &[SpanEvent], + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + write_uint8(writer, SpanKey::SpanEvents as u8)?; + write_array_len(writer, span_events.len() as u32)?; + + for event in span_events { + let event_len = 2 // time + name + + (!event.attributes.is_empty()) as u32; + + write_map_len(writer, event_len)?; + + write_uint8(writer, SpanEventKey::Time as u8)?; + write_u64(writer, event.time_unix_nano)?; + + write_uint8(writer, SpanEventKey::Name as u8)?; + table.write_interned(writer, event.name.borrow())?; + + if !event.attributes.is_empty() { + write_uint8(writer, SpanEventKey::Attributes as u8)?; + encode_attributes_map(writer, &event.attributes, table)?; + } + } + + Ok(()) +} + +/// Encodes a `Span` object into a slice of bytes. +/// +/// # Arguments +/// +/// * `writer` - A RmpWriter compatible with rmp writing functions. +/// * `span` - The span to encode. +/// * `table` - The streaming string intern table. +/// +/// # Returns +/// +/// * `Ok(())` - Nothing if successful. +/// * `Err(ValueWriteError)` - An error if the writing fails. +/// +/// # Errors +/// +/// This function will return any error emitted by the writer. +pub(super) fn encode_span( + writer: &mut W, + span: &Span, + table: &mut StringTable, +) -> Result<(), ValueWriteError> { + let is_parent = span.parent_id != 0; + let has_duration = span.duration != 0; + let has_error = span.error; + let has_attributes = !span.attributes.is_empty(); + let has_env = !span.env.borrow().is_empty(); + let has_version = !span.version.borrow().is_empty(); + let has_component = !span.component.borrow().is_empty(); + + let span_len = 3 // span_id, start, kind — always present + + (!span.service.borrow().is_empty()) as u32 + + (!span.name.borrow().is_empty()) as u32 + + (!span.resource.borrow().is_empty()) as u32 + + (!span.r#type.borrow().is_empty()) as u32 + + is_parent as u32 + + has_duration as u32 + + has_error as u32 + + has_attributes as u32 + + (!span.span_links.is_empty()) as u32 + + (!span.span_events.is_empty()) as u32 + + has_env as u32 + + has_version as u32 + + has_component as u32; + + write_map_len(writer, span_len)?; + + if !span.service.borrow().is_empty() { + write_uint8(writer, SpanKey::Service as u8)?; + table.write_interned(writer, span.service.borrow())?; + } + + if !span.name.borrow().is_empty() { + write_uint8(writer, SpanKey::Name as u8)?; + table.write_interned(writer, span.name.borrow())?; + } + + if !span.resource.borrow().is_empty() { + write_uint8(writer, SpanKey::Resource as u8)?; + table.write_interned(writer, span.resource.borrow())?; + } + + write_uint8(writer, SpanKey::SpanId as u8)?; + write_u64(writer, span.span_id)?; + + write_uint8(writer, SpanKey::Start as u8)?; + write_u64(writer, span_start_unix_nanos(span.start))?; + + if is_parent { + write_uint8(writer, SpanKey::ParentId as u8)?; + write_u64(writer, span.parent_id)?; + } + + if has_duration { + write_uint8(writer, SpanKey::Duration as u8)?; + write_u64(writer, span.duration.max(0) as u64)?; + } + + if has_error { + write_uint8(writer, SpanKey::Error as u8)?; + write_bool(writer, true).map_err(ValueWriteError::InvalidDataWrite)?; + } + + if !span.r#type.borrow().is_empty() { + write_uint8(writer, SpanKey::Type as u8)?; + table.write_interned(writer, span.r#type.borrow())?; + } + + if has_attributes { + write_uint8(writer, SpanKey::Attributes as u8)?; + encode_attributes_map(writer, &span.attributes, table)?; + } + + if !span.span_links.is_empty() { + encode_span_links(writer, &span.span_links, table)?; + } + + if !span.span_events.is_empty() { + encode_span_events(writer, &span.span_events, table)?; + } + + if has_env { + write_uint8(writer, SpanKey::Env as u8)?; + table.write_interned(writer, span.env.borrow())?; + } + if has_version { + write_uint8(writer, SpanKey::Version as u8)?; + table.write_interned(writer, span.version.borrow())?; + } + if has_component { + write_uint8(writer, SpanKey::Component as u8)?; + table.write_interned(writer, span.component.borrow())?; + } + // SpanKind is always emitted (default = Internal). + write_uint8(writer, SpanKey::Kind as u8)?; + write_uint(writer, span.span_kind as u64)?; + + Ok(()) +} diff --git a/libdd-trace-utils/src/span/mod.rs b/libdd-trace-utils/src/span/mod.rs index 641f235974..1a122efe99 100644 --- a/libdd-trace-utils/src/span/mod.rs +++ b/libdd-trace-utils/src/span/mod.rs @@ -4,6 +4,7 @@ pub mod trace_utils; pub mod v04; pub mod v05; +pub mod v1; pub mod vec_map; use crate::msgpack_decoder::decode::buffer::read_string_ref_nomut; diff --git a/libdd-trace-utils/src/span/v1/mod.rs b/libdd-trace-utils/src/span/v1/mod.rs new file mode 100644 index 0000000000..459cbc5438 --- /dev/null +++ b/libdd-trace-utils/src/span/v1/mod.rs @@ -0,0 +1,176 @@ +// Copyright 2026-Present Datadog, Inc. https://www.datadoghq.com/ +// SPDX-License-Identifier: Apache-2.0 + +use crate::span::vec_map::VecMap; +use crate::span::{BytesData, SliceData, TraceData}; +pub use thin_vec::ThinVec; + +/// OpenTelemetry SpanKind values, encoded on the wire as a `uint32`. +/// Unset or unrecognized kinds default to [`SpanKind::Internal`]. +#[repr(u32)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +pub enum SpanKind { + #[default] + Internal = 1, + Server = 2, + Client = 3, + Producer = 4, + Consumer = 5, +} + +impl SpanKind { + /// Parses a v0.4 `span.kind` meta value into a [`SpanKind`]. + /// Unrecognized values map to [`SpanKind::Internal`]. + pub fn from_meta(s: &str) -> Self { + match s { + "server" => SpanKind::Server, + "client" => SpanKind::Client, + "producer" => SpanKind::Producer, + "consumer" => SpanKind::Consumer, + _ => SpanKind::Internal, + } + } +} + +/// Typed V1 attribute value. +/// Replaces v0.4's split `meta` / `metrics` / `meta_struct` maps. +#[derive(Debug)] +pub enum AttributeValue { + String(T::Text), + Float(f64), + Int(i64), + Bool(bool), + Bytes(T::Bytes), + KeyValue(VecMap>), + List(Vec>), +} + +/// The generic representation of a V1 span. +/// +/// `T: TraceData` carries the associated text type `T::Text` used for every string field in the +/// span; `T::Text` can be either owned (e.g. [`BytesString`](libdd_tinybytes::BytesString)) or +/// borrowed (e.g. `&str`). To define a generic function taking any `Span` you can use the +/// [`TraceData`] trait: +/// ``` +/// use libdd_trace_utils::span::{v1::Span, TraceData}; +/// fn foo(span: Span) { +/// let _ = span.attributes.get("foo"); +/// } +/// ``` +#[derive(Debug, Default)] +pub struct Span { + pub service: T::Text, + pub name: T::Text, + pub resource: T::Text, + pub r#type: T::Text, + pub span_id: u64, + pub parent_id: u64, + pub start: i64, + pub duration: i64, + pub error: bool, + pub span_kind: SpanKind, + pub env: T::Text, + pub version: T::Text, + pub component: T::Text, + pub attributes: VecMap>, + pub span_links: ThinVec>, + pub span_events: ThinVec>, +} + +/// The generic representation of a V1 span link. +/// `T` is the type used to represent strings in the span link. +#[derive(Debug, Default)] +pub struct SpanLink { + pub trace_id: [u8; 16], + pub span_id: u64, + pub attributes: VecMap>, + pub tracestate: T::Text, + pub flags: u32, +} + +/// The generic representation of a V1 span event. +/// `T` is the type used to represent strings in the span event. +#[derive(Debug, Default)] +pub struct SpanEvent { + pub time_unix_nano: u64, + pub name: T::Text, + pub attributes: VecMap>, +} + +/// A V1 trace chunk: a group of spans sharing the same `trace_id`, plus chunk-level metadata. +#[derive(Debug, Default)] +pub struct TraceChunk { + pub trace_id: [u8; 16], + pub priority: Option, + pub origin: T::Text, + pub sampling_mechanism: Option, + pub dropped_trace: bool, + pub attributes: VecMap>, + pub spans: Vec>, +} + +/// A V1 tracer payload: tracer-level metadata and the trace chunks it carries. +#[derive(Debug, Default)] +pub struct TracerPayload { + pub language_name: T::Text, + pub language_version: T::Text, + pub tracer_version: T::Text, + pub runtime_id: T::Text, + pub env: T::Text, + pub hostname: T::Text, + pub app_version: T::Text, + pub attributes: VecMap>, + pub chunks: Vec>, +} + +pub type SpanBytes = Span; +pub type SpanLinkBytes = SpanLink; +pub type SpanEventBytes = SpanEvent; +pub type AttributeValueBytes = AttributeValue; +pub type TraceChunkBytes = TraceChunk; +pub type TracerPayloadBytes = TracerPayload; + +pub type SpanSlice<'a> = Span>; +pub type SpanLinkSlice<'a> = SpanLink>; +pub type SpanEventSlice<'a> = SpanEvent>; +pub type AttributeValueSlice<'a> = AttributeValue>; +pub type TraceChunkSlice<'a> = TraceChunk>; +pub type TracerPayloadSlice<'a> = TracerPayload>; + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn span_kind_default_is_internal() { + assert_eq!(SpanKind::default(), SpanKind::Internal); + } + + #[test] + fn span_kind_from_meta() { + assert_eq!(SpanKind::from_meta("server"), SpanKind::Server); + assert_eq!(SpanKind::from_meta("client"), SpanKind::Client); + assert_eq!(SpanKind::from_meta("producer"), SpanKind::Producer); + assert_eq!(SpanKind::from_meta("consumer"), SpanKind::Consumer); + assert_eq!(SpanKind::from_meta("internal"), SpanKind::Internal); + assert_eq!(SpanKind::from_meta(""), SpanKind::Internal); + assert_eq!(SpanKind::from_meta("anything-else"), SpanKind::Internal); + } + + #[test] + fn span_kind_repr_matches_otel_spec() { + assert_eq!(SpanKind::Internal as u32, 1); + assert_eq!(SpanKind::Server as u32, 2); + assert_eq!(SpanKind::Client as u32, 3); + assert_eq!(SpanKind::Producer as u32, 4); + assert_eq!(SpanKind::Consumer as u32, 5); + } + + #[test] + fn span_default_has_internal_kind() { + let s = SpanBytes::default(); + assert_eq!(s.span_kind, SpanKind::Internal); + assert!(!s.error); + assert!(s.attributes.is_empty()); + } +} diff --git a/libdd-trace-utils/tests/snapshots/compare_send_data_v1_native_trace_snapshot_test.json b/libdd-trace-utils/tests/snapshots/compare_send_data_v1_native_trace_snapshot_test.json new file mode 100644 index 0000000000..ba36402ca8 --- /dev/null +++ b/libdd-trace-utils/tests/snapshots/compare_send_data_v1_native_trace_snapshot_test.json @@ -0,0 +1,43 @@ +[[ + { + "name": "test_send_data_v1_native_snapshot_root", + "service": "test-service", + "resource": "/api/users", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "web", + "meta": { + "_dd.origin": "synthetics", + "_dd.p.dm": "-4", + "_dd.p.tid": "0123456789abcdef", + "component": "http", + "env": "test-env", + "http.method": "GET", + "span.kind": "server", + "version": "1.2.3" + }, + "metrics": { + "_sampling_priority_v1": 1, + "http.duration_ms": 12.5, + "http.status_code": 200, + "http.success": 1 + }, + "span_links": [ + { + "trace_id": 18364758544493064720, + "trace_id_high": 81985529216486895, + "span_id": 11574427654092267680, + "tracestate": "dd=t.tid:abc", + "flags": 1 + } + ], + "span_events": [ + { + "time_unix_nano": 1727211691770715042, + "name": "exception" + } + ], + "duration": 5000, + "start": 1000000 + }]] diff --git a/libdd-trace-utils/tests/test_send_data.rs b/libdd-trace-utils/tests/test_send_data.rs index e4307ebf33..67b87b0671 100644 --- a/libdd-trace-utils/tests/test_send_data.rs +++ b/libdd-trace-utils/tests/test_send_data.rs @@ -12,6 +12,7 @@ mod tracing_integration_tests { use libdd_common::{http_common, Endpoint}; use libdd_tinybytes::{Bytes, BytesString}; use libdd_trace_utils::send_data::SendData; + use libdd_trace_utils::span::vec_map::VecMap; use libdd_trace_utils::test_utils::datadog_test_agent::DatadogTestAgent; use libdd_trace_utils::test_utils::{create_test_json_span, create_test_no_alloc_span}; use libdd_trace_utils::trace_utils::TracerHeaderTags; @@ -391,4 +392,286 @@ mod tracing_integration_tests { "eyJ0cmFjaW5nX3NhbXBsaW5nX3J1bGVzIjogW3sic2VydmljZSI6ICJ0ZXN0LXNlcnZpY2UiLCAibmFtZSI6ICJ0ZXN0LW5hbWUiLCAic2FtcGxlX3JhdGUiOiAwLjV9XX0=" ); } + + // ───────────────────────── V1 integration tests ────────────────────────── + // + // These tests cover the v1::Span encoder end-to-end: the payload is built directly from the + // `TracerPayload` data model in Rust, encoded with `to_vec_from_payload_v1`, POSTed to the + // `dd-apm-test-agent`'s `/v1.0/traces`, and validated via snapshot. The test-agent is the V1 + // decoder, so this exercises the full round-trip without us having to maintain one in this + // crate. + + fn bs_v1(s: &str) -> BytesString { + BytesString::from_slice(s.as_bytes()).expect("test string must fit in BytesString") + } + + /// 128-bit big-endian trace_id from `(high, low)` 64-bit halves. + fn tid_bytes(high: u64, low: u64) -> [u8; 16] { + let mut out = [0u8; 16]; + out[..8].copy_from_slice(&high.to_be_bytes()); + out[8..].copy_from_slice(&low.to_be_bytes()); + out + } + + /// POSTs a raw V1 msgpack payload to the test-agent's `/v1.0/traces` and asserts the agent + /// returns 2xx. Headers are the minimum the agent needs to attach the payload to a snapshot + /// session (`X-Datadog-Test-Session-Token` query param + `Datadog-Meta-Lang*` for routing). + async fn post_v1_payload(uri: hyper::Uri, body: Vec) { + use libdd_capabilities_impl::HttpClientCapability; + let client = NativeCapabilities::new_client(); + let req = http::Request::builder() + .method(http::Method::POST) + .uri(uri) + .header("Content-type", "application/msgpack") + .header("Datadog-Meta-Lang", "test-lang") + .header("Datadog-Meta-Lang-Version", "2.0") + .header("Datadog-Meta-Lang-Interpreter", "interpreter") + .header("Datadog-Meta-Tracer-Version", "1.0") + .body(bytes::Bytes::from(body)) + .expect("failed to build request"); + let response = client.request(req).await.expect("request failed"); + assert!( + response.status().is_success(), + "test-agent rejected V1 payload: status={} body={:?}", + response.status(), + String::from_utf8_lossy(response.body()) + ); + } + + /// Builds a TracerPayload that exercises the multi-key attribute paths the v0.4→V1 encoder + /// can't cover on its own (HashMap iteration order makes byte-by-byte cross-validation flaky + /// for n > 1), plus the primitive `AttributeValue` variants the test-agent currently + /// supports. + /// + /// NOTE: `AttributeValue::List` and `AttributeValue::KeyValue` are deliberately omitted — + /// `ddapm-test-agent` v1.56.0 returns `400: Array of strings values are not supported yet` + /// for the `Array` variant. Once test-agent V1 support catches up, add them here too. + fn make_v1_payload(name_prefix: &str) -> libdd_trace_utils::span::v1::TracerPayloadBytes { + use libdd_trace_utils::span::v1::{ + AttributeValue, AttributeValueBytes, SpanBytes as V1SpanBytes, SpanEventBytes, + SpanKind, SpanLinkBytes, TraceChunkBytes, TracerPayloadBytes, + }; + + // Multi-key attribute map on the root span — primitive variants only. + let mut root_attrs: VecMap = VecMap::new(); + root_attrs.insert(bs_v1("http.method"), AttributeValue::String(bs_v1("GET"))); + root_attrs.insert(bs_v1("http.status_code"), AttributeValue::Int(200)); + root_attrs.insert(bs_v1("http.success"), AttributeValue::Bool(true)); + root_attrs.insert(bs_v1("http.duration_ms"), AttributeValue::Float(12.5)); + + let span_link = SpanLinkBytes { + trace_id: tid_bytes(0x0123_4567_89ab_cdef, 0xfedc_ba98_7654_3210), + span_id: 0xa0a0_a0a0_a0a0_a0a0, + tracestate: bs_v1("dd=t.tid:abc"), + flags: 1, + attributes: VecMap::new(), + }; + + let span_event = SpanEventBytes { + time_unix_nano: 1_727_211_691_770_715_042, + name: bs_v1("exception"), + attributes: VecMap::new(), + }; + + let root_span = V1SpanBytes { + service: bs_v1("test-service"), + name: bs_v1(&format!("{name_prefix}_root")), + resource: bs_v1("/api/users"), + r#type: bs_v1("web"), + span_id: 1, + parent_id: 0, + start: 1_000_000, + duration: 5_000, + span_kind: SpanKind::Server, + env: bs_v1("test-env"), + version: bs_v1("1.2.3"), + component: bs_v1("http"), + attributes: root_attrs, + span_links: thin_vec::thin_vec![span_link], + span_events: thin_vec::thin_vec![span_event], + ..Default::default() + }; + + // Multi-key chunk-level attributes. + let mut chunk_attrs = VecMap::new(); + chunk_attrs.insert(bs_v1("_dd.p.dm"), AttributeValue::String(bs_v1("-4"))); + chunk_attrs.insert( + bs_v1("_dd.p.tid"), + AttributeValue::String(bs_v1("0123456789abcdef")), + ); + + let chunk = TraceChunkBytes { + trace_id: tid_bytes(0, 0xdeadbeef), + priority: Some(1), + origin: bs_v1("synthetics"), + sampling_mechanism: Some(4), + attributes: chunk_attrs, + dropped_trace: false, + spans: vec![root_span], + }; + + TracerPayloadBytes { + language_name: bs_v1("test-lang"), + language_version: bs_v1("2.0"), + tracer_version: bs_v1("1.0"), + runtime_id: bs_v1("test-runtime-id"), + env: bs_v1("test-env"), + hostname: bs_v1("test-host"), + app_version: bs_v1("1.2.3"), + attributes: VecMap::new(), + chunks: vec![chunk], + } + } + + /// End-to-end round-trip: builds a V1 payload directly from `TracerPayload`, encodes it + /// with `to_vec_from_payload_v1`, POSTs to the test-agent, and asserts the snapshot. + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn compare_v1_native_trace_snapshot_test() { + use libdd_trace_utils::msgpack_encoder::v1::to_vec_from_payload_v1; + + let relative_snapshot_path = "libdd-trace-utils/tests/snapshots/"; + let snapshot_name = "compare_send_data_v1_native_trace_snapshot_test"; + let test_agent = DatadogTestAgent::new(Some(relative_snapshot_path), None, &[]).await; + + let uri = test_agent + .get_uri_for_endpoint("v1.0/traces", Some(snapshot_name)) + .await; + + test_agent.start_session(snapshot_name, None).await; + + let payload = make_v1_payload("test_send_data_v1_native_snapshot"); + let encoded = to_vec_from_payload_v1(&payload); + + post_v1_payload(uri, encoded).await; + + test_agent.assert_snapshot(snapshot_name).await; + } + + /// Recursively normalizes a JSON value by sorting every object's keys. Necessary because the + /// test-agent serializes maps in HashMap iteration order, which is non-deterministic in Rust + /// — two semantically-equivalent traces can decode to JSON with keys in different orders. + fn normalize_json(v: &mut serde_json::Value) { + match v { + serde_json::Value::Object(map) => { + let entries: Vec<(String, serde_json::Value)> = + map.iter().map(|(k, v)| (k.clone(), v.clone())).collect(); + let mut sorted: std::collections::BTreeMap = + entries.into_iter().collect(); + for child in sorted.values_mut() { + normalize_json(child); + } + *map = sorted.into_iter().collect(); + } + serde_json::Value::Array(arr) => { + for child in arr.iter_mut() { + normalize_json(child); + } + } + _ => {} + } + } + + /// Asserts the v0.4→V1 encoder and the v1::Span encoder produce decoded traces with the same + /// canonical content. Replaces the byte-equality cross-validation suite — instead of + /// comparing raw bytes (which forced n=1 collections due to HashMap order non-determinism), + /// we let the test-agent decode both payloads and compare the resulting structures after + /// recursive key sorting. + #[cfg_attr(miri, ignore)] + #[tokio::test] + async fn v04_and_v1_encoders_produce_equivalent_decoded_traces() { + use libdd_trace_utils::span::v04::SpanBytes as V04SpanBytes; + use libdd_trace_utils::span::v1::{ + AttributeValue, SpanBytes as V1SpanBytes, SpanKind, TraceChunkBytes, TracerPayloadBytes, + }; + use libdd_trace_utils::{ + msgpack_encoder::v1::{to_vec, to_vec_from_payload_v1}, + tracer_metadata::TracerMetadata, + }; + + let test_agent = DatadogTestAgent::new(None, None, &[]).await; + let uri = test_agent.get_uri_for_endpoint("v1.0/traces", None).await; + + // ── v0.4 input ───────────────────────────────────────────────────────────── + let mut meta_v04 = VecMap::new(); + meta_v04.insert(bs_v1("env"), bs_v1("test-env")); + meta_v04.insert(bs_v1("http.method"), bs_v1("GET")); + let mut metrics_v04 = VecMap::new(); + metrics_v04.insert(bs_v1("http.duration_ms"), 12.5_f64); + let v04_traces: Vec> = vec![vec![V04SpanBytes { + service: bs_v1("svc"), + name: bs_v1("op"), + resource: bs_v1("res"), + trace_id: 1, + span_id: 1, + start: 1_000_000, + duration: 5_000, + meta: meta_v04, + metrics: metrics_v04, + ..Default::default() + }]]; + let metadata = TracerMetadata::default(); + + // ── v1::Span input — semantically equivalent to the v0.4 one ─────────────── + // `env` is promoted out of meta by the v0.4→V1 encoder; in the V1 model it lives on the + // span as a typed field. `http.method` and `http.duration_ms` go to span attributes. + let mut attrs_v1 = VecMap::new(); + attrs_v1.insert(bs_v1("http.method"), AttributeValue::String(bs_v1("GET"))); + attrs_v1.insert(bs_v1("http.duration_ms"), AttributeValue::Float(12.5)); + let v1_payload = TracerPayloadBytes { + chunks: vec![TraceChunkBytes { + trace_id: tid_bytes(0, 1), + spans: vec![V1SpanBytes { + service: bs_v1("svc"), + name: bs_v1("op"), + resource: bs_v1("res"), + span_id: 1, + start: 1_000_000, + duration: 5_000, + span_kind: SpanKind::Internal, + env: bs_v1("test-env"), + attributes: attrs_v1, + ..Default::default() + }], + ..Default::default() + }], + ..Default::default() + }; + + // ── Encode each via its dedicated encoder and POST both ──────────────────── + let bytes_v04 = to_vec(&v04_traces, &metadata); + let bytes_v1 = to_vec_from_payload_v1(&v1_payload); + post_v1_payload(uri.clone(), bytes_v04).await; + post_v1_payload(uri, bytes_v1).await; + + // ── Fetch what the test-agent decoded and compare structurally ───────────── + // The two POSTs share the same trace_id, so the test-agent groups them into a single + // "trace" containing both decoded spans. Equivalence means those two spans must match + // after recursive key normalization. + let traces = test_agent.get_sent_traces().await; + assert_eq!( + traces.len(), + 1, + "expected 1 merged trace, got {}", + traces.len() + ); + let spans = traces[0] + .as_array() + .expect("trace must be an array of spans"); + assert_eq!( + spans.len(), + 2, + "expected 2 spans (one per encoder), got {}", + spans.len() + ); + + let mut a = spans[0].clone(); + let mut b = spans[1].clone(); + normalize_json(&mut a); + normalize_json(&mut b); + assert_eq!( + a, b, + "v0.4→V1 and v1::Span encoders must decode to the same span" + ); + } }