diff --git a/Sources/ReerJSON/Error.swift b/Sources/ReerJSON/Error.swift index 827e245..38d9d75 100644 --- a/Sources/ReerJSON/Error.swift +++ b/Sources/ReerJSON/Error.swift @@ -51,6 +51,15 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { /// The coding path where the error occurred (for decoding errors). public let path: String + /// The underlying yyjson read error code, when this error originates from + /// the JSON parser. Zero (`YYJSON_READ_SUCCESS`) when not applicable. + /// + /// This is primarily used by the streaming APIs to distinguish recoverable + /// "need more data" conditions (`YYJSON_READ_ERROR_UNEXPECTED_END`, + /// `YYJSON_READ_ERROR_EMPTY_CONTENT`, `YYJSON_READ_ERROR_MORE`) from + /// non-recoverable parse errors. + internal let readErrorCode: UInt32 + public var description: String { if path.isEmpty { return message @@ -58,10 +67,11 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { return "\(message) (at \(path))" } - private init(kind: Kind, message: String, path: String = "") { + private init(kind: Kind, message: String, path: String = "", readErrorCode: UInt32 = 0) { self.kind = kind self.message = message self.path = path + self.readErrorCode = readErrorCode } // MARK: - Public Factory Methods @@ -144,6 +154,7 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { self.kind = .invalidJSON self.message = message self.path = "" + self.readErrorCode = UInt32(error.code) } /// Create an error from a yyjson write error. @@ -168,5 +179,6 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { self.kind = .writeError self.message = message self.path = "" + self.readErrorCode = 0 } } diff --git a/Sources/ReerJSON/StreamDecoder.swift b/Sources/ReerJSON/StreamDecoder.swift new file mode 100644 index 0000000..e9b3b8f --- /dev/null +++ b/Sources/ReerJSON/StreamDecoder.swift @@ -0,0 +1,453 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import Foundation + +// MARK: - StreamingJSONLinesDecoder + +/// A streaming decoder for JSON Lines (NDJSON) format. +/// +/// Each top-level JSON value in the stream is decoded into `T`. +/// +/// Internally this passes raw byte slices from the underlying +/// ``JSONStreamParser`` straight to ``ReerJSONDecoder/decode(_:from:)``, +/// avoiding any intermediate `JSONValue` serialization round-trip. +/// +/// ```swift +/// var decoder = StreamingJSONLinesDecoder(Item.self) +/// let items1 = try decoder.parseBuffer(chunk1) +/// let items2 = try decoder.parseBuffer(chunk2) +/// let remaining = try decoder.finalize() +/// ``` +public struct StreamingJSONLinesDecoder: @unchecked Sendable { + + private var parser: JSONStreamParser + private let decoder: ReerJSONDecoder + private let type: T.Type + + /// Creates a new JSON Lines streaming decoder. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each value into. + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder`` with custom strategies. + /// If `nil`, a default decoder is used. + public init( + _ type: T.Type, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) { + self.type = type + self.parser = JSONStreamParser(mode: .jsonLines, options: options) + self.decoder = decoder ?? ReerJSONDecoder() + } + + /// Feeds data to the decoder and returns all decoded values. + public mutating func parseBuffer(_ data: Data) throws -> [T] { + let slices = try parser.parseSlices(data) + return try decodeAll(slices) + } + + /// Signals end-of-stream and returns any remaining decoded values. + public mutating func finalize() throws -> [T] { + let slices = try parser.finalizeSlices() + return try decodeAll(slices) + } + + /// Resets the decoder to its initial state. + public mutating func reset() { + parser.reset() + } + + private func decodeAll(_ slices: [Data]) throws -> [T] { + var out: [T] = [] + out.reserveCapacity(slices.count) + for slice in slices { + out.append(try decoder.decode(type, from: slice)) + } + return out + } +} + +// MARK: - StreamingJSONArrayDecoder + +/// A streaming decoder for JSON array format. +/// +/// The stream is expected to be a single JSON array. Each element is decoded +/// individually as it becomes available. +/// +/// ```swift +/// var decoder = StreamingJSONArrayDecoder(Item.self) +/// let items1 = try decoder.parseBuffer(chunk1) +/// let items2 = try decoder.parseBuffer(chunk2) +/// let remaining = try decoder.finalize() +/// ``` +public struct StreamingJSONArrayDecoder: @unchecked Sendable { + + private var parser: JSONStreamParser + private let decoder: ReerJSONDecoder + private let type: T.Type + + public init( + _ type: T.Type, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) { + self.type = type + self.parser = JSONStreamParser(mode: .jsonArray, options: options) + self.decoder = decoder ?? ReerJSONDecoder() + } + + public mutating func parseBuffer(_ data: Data) throws -> [T] { + let slices = try parser.parseSlices(data) + return try decodeAll(slices) + } + + public mutating func finalize() throws -> [T] { + let slices = try parser.finalizeSlices() + return try decodeAll(slices) + } + + public mutating func reset() { + parser.reset() + } + + private func decodeAll(_ slices: [Data]) throws -> [T] { + var out: [T] = [] + out.reserveCapacity(slices.count) + for slice in slices { + out.append(try decoder.decode(type, from: slice)) + } + return out + } +} + +// MARK: - AsyncSequence Adapters + +/// An `AsyncSequence` that yields ``JSONValue`` items from chunks of `Data`. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public struct JSONValueStream: AsyncSequence, Sendable +where Source.Element == Data { + public typealias Element = JSONValue + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + + public func makeAsyncIterator() -> Iterator { + Iterator(source: source.makeAsyncIterator(), mode: mode, options: options) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [JSONValue] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + + init(source: Source.AsyncIterator, mode: JSONStreamMode, options: JSONReadOptions) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + } + + public mutating func next() async throws -> JSONValue? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + guard let chunk = try await sourceIterator.next() else { + sourceExhausted = true + let remaining = try parser.finalize() + if !remaining.isEmpty { + pending = remaining + pendingIndex = 0 + continue + } + return nil + } + + let values = try parser.parse(chunk) + if !values.isEmpty { + pending = values + pendingIndex = 0 + } + } + } + } +} + +/// An `AsyncSequence` that yields ``JSONValue`` items from an `AsyncSequence` of bytes. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public struct JSONValueByteStream: AsyncSequence, Sendable +where Source.Element == UInt8 { + public typealias Element = JSONValue + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + let chunkSize: Int + + public func makeAsyncIterator() -> Iterator { + Iterator( + source: source.makeAsyncIterator(), + mode: mode, options: options, + chunkSize: chunkSize + ) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [JSONValue] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + let chunkSize: Int + + init( + source: Source.AsyncIterator, + mode: JSONStreamMode, options: JSONReadOptions, + chunkSize: Int + ) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + self.chunkSize = chunkSize + } + + public mutating func next() async throws -> JSONValue? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + let chunk = try await readChunk() + if !chunk.isEmpty { + let values = try parser.parse(chunk) + if !values.isEmpty { + pending = values + pendingIndex = 0 + continue + } + } + + if sourceExhausted { + let remaining = try parser.finalize() + if !remaining.isEmpty { + pending = remaining + pendingIndex = 0 + continue + } + return nil + } + } + } + + /// Reads up to `chunkSize` bytes from the byte source into a + /// pre-allocated buffer to avoid byte-at-a-time `Data.append`. + private mutating func readChunk() async throws -> Data { + var scratch = [UInt8]() + scratch.reserveCapacity(chunkSize) + for _ in 0..: + AsyncSequence, @unchecked Sendable +where Source.Element == Data { + public typealias Element = T + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + let decoder: ReerJSONDecoder + let type: T.Type + + public func makeAsyncIterator() -> Iterator { + Iterator( + source: source.makeAsyncIterator(), + mode: mode, options: options, + decoder: decoder, type: type + ) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [T] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + let decoder: ReerJSONDecoder + let type: T.Type + + init( + source: Source.AsyncIterator, + mode: JSONStreamMode, options: JSONReadOptions, + decoder: ReerJSONDecoder, type: T.Type + ) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + self.decoder = decoder + self.type = type + } + + public mutating func next() async throws -> T? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + guard let chunk = try await sourceIterator.next() else { + sourceExhausted = true + let slices = try parser.finalizeSlices() + if !slices.isEmpty { + pending = try decodeAll(slices) + pendingIndex = 0 + continue + } + return nil + } + + let slices = try parser.parseSlices(chunk) + if !slices.isEmpty { + pending = try decodeAll(slices) + pendingIndex = 0 + } + } + } + + private func decodeAll(_ slices: [Data]) throws -> [T] { + var out: [T] = [] + out.reserveCapacity(slices.count) + for slice in slices { + out.append(try decoder.decode(type, from: slice)) + } + return out + } + } +} + +// MARK: - AsyncSequence Extensions + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence where Element == Data, Self: Sendable { + + /// Returns an `AsyncSequence` of ``JSONValue`` items parsed from this + /// data stream. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + public func jsonValues( + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default + ) -> JSONValueStream { + JSONValueStream(source: self, mode: mode, options: options) + } + + /// Returns an `AsyncSequence` that decodes items from this data stream. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each value into. + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder``. If `nil`, uses a default decoder. + public func decode( + _ type: T.Type, + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) -> DecodingStream { + DecodingStream( + source: self, + mode: mode, options: options, + decoder: decoder ?? ReerJSONDecoder(), + type: type + ) + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence where Element == UInt8, Self: Sendable { + + /// Returns an `AsyncSequence` of ``JSONValue`` items parsed from this + /// byte stream. + /// + /// Bytes are batched internally for efficient parsing. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - chunkSize: Number of bytes to batch before parsing. Default is 4096. + public func jsonValues( + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default, + chunkSize: Int = 4096 + ) -> JSONValueByteStream { + JSONValueByteStream( + source: self, mode: mode, + options: options, chunkSize: chunkSize + ) + } +} diff --git a/Sources/ReerJSON/StreamParser.swift b/Sources/ReerJSON/StreamParser.swift new file mode 100644 index 0000000..4df3eff --- /dev/null +++ b/Sources/ReerJSON/StreamParser.swift @@ -0,0 +1,511 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import yyjson +import Foundation + +// MARK: - JSONStreamMode + +/// The parsing mode for a stream of JSON data. +public enum JSONStreamMode: Sendable { + /// Each top-level JSON value is yielded individually (JSON Lines / NDJSON). + /// + /// Values may be separated by any JSON whitespace (space, tab, CR, LF); + /// strict NDJSON producers should separate values by a single `\n`. + case jsonLines + + /// The stream is a single JSON array whose elements are yielded one by one. + case jsonArray +} + +// MARK: - JSONStreamParser + +/// A streaming JSON parser that extracts individual ``JSONValue`` items from +/// a byte stream, supporting both JSON Lines and JSON Array modes. +/// +/// `JSONStreamParser` maintains an internal buffer. You feed data incrementally +/// with ``parse(_:)`` and receive fully-parsed ``JSONValue`` items as they +/// become available. Call ``finalize()`` when the stream ends to flush any +/// remaining buffered data. +/// +/// ## JSON Lines Mode +/// +/// Each top-level JSON value in the buffer is extracted as a separate item. +/// Values may span multiple ``parse(_:)`` calls. +/// +/// ```swift +/// var parser = JSONStreamParser(mode: .jsonLines) +/// let chunk1 = Data("{\"a\":1}\n{\"b\"".utf8) +/// let chunk2 = Data(":2}\n".utf8) +/// let values1 = try parser.parse(chunk1) // [{"a":1}] +/// let values2 = try parser.parse(chunk2) // [{"b":2}] +/// ``` +/// +/// ## JSON Array Mode +/// +/// The stream is expected to be a single JSON array (`[...]`). +/// Each array element is yielded individually. +/// +/// ```swift +/// var parser = JSONStreamParser(mode: .jsonArray) +/// let items = try parser.parse(Data("[1, 2, 3]".utf8)) +/// let remaining = try parser.finalize() +/// // items + remaining contain JSONValues for 1, 2, 3 +/// ``` +/// +/// ## Boundaries between chunks +/// +/// JSON tokens that have an explicit terminator (strings, objects, arrays, +/// `true`/`false`/`null`) are always parsed reliably across chunk boundaries. +/// Bare numeric tokens (e.g. `123` or `1.5e10`) are inherently ambiguous when +/// a chunk ends exactly at the end of the number — the next chunk may or may +/// not continue the number. To stay correct, the parser conservatively defers +/// any value whose parse ends exactly at the buffer end; it will be yielded +/// after the next ``parse(_:)`` chunk arrives, or on ``finalize()``. +/// +/// In practice, properly-formed NDJSON terminates every value with a newline, +/// so this deferral is invisible to the caller. +public struct JSONStreamParser: Sendable { + + /// The parsing mode. + public let mode: JSONStreamMode + + /// Options for reading JSON. + public let options: JSONReadOptions + + private var buffer: Data + private var readOffset: Int + private var arrayState: ArrayParseState + + /// The number of bytes buffered but not yet consumed. + public var pendingByteCount: Int { + buffer.count - readOffset + } + + /// Creates a new stream parser. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. The parser internally combines + /// this with `YYJSON_READ_STOP_WHEN_DONE` for boundary detection. + public init(mode: JSONStreamMode, options: JSONReadOptions = .default) { + self.mode = mode + self.options = options + self.buffer = Data() + self.readOffset = 0 + self.arrayState = .expectOpenBracket + } + + // MARK: - Public Parse API + + /// Feeds data to the parser and returns all complete JSON values found. + public mutating func parse(_ data: Data) throws -> [JSONValue] { + if !data.isEmpty { buffer.append(data) } + return try drainValues(finalizing: false) + } + + /// Feeds raw bytes to the parser and returns all complete JSON values found. + public mutating func parse(bytes: UnsafeBufferPointer) throws -> [JSONValue] { + if let base = bytes.baseAddress, bytes.count > 0 { + buffer.append(base, count: bytes.count) + } + return try drainValues(finalizing: false) + } + + /// Signals end-of-stream and returns any remaining JSON values. + /// + /// After calling this method, the parser is in a finished state. + /// Call ``reset()`` to reuse it. + public mutating func finalize() throws -> [JSONValue] { + let results = try drainValues(finalizing: true) + try validateAtEndOfStream() + return results + } + + /// Resets the parser to its initial state, discarding all buffered data. + public mutating func reset() { + buffer.removeAll(keepingCapacity: true) + readOffset = 0 + arrayState = .expectOpenBracket + } + + // MARK: - Internal Slice API (for streaming decoders) + + /// Feeds data and returns raw byte slices for each parsed JSON value, + /// without constructing intermediate `JSONValue` instances. Used by the + /// streaming decoders to avoid an extra serialize-and-reparse round-trip. + internal mutating func parseSlices(_ data: Data) throws -> [Data] { + if !data.isEmpty { buffer.append(data) } + return try drainSlices(finalizing: false) + } + + internal mutating func finalizeSlices() throws -> [Data] { + let results = try drainSlices(finalizing: true) + try validateAtEndOfStream() + return results + } + + // MARK: - Private Types + + private enum ArrayParseState: Sendable { + case expectOpenBracket + case expectElementOrClose + case expectElementAfterComma + case expectCommaOrClose + case done + } + + /// One parsed value's metadata. + private struct ParsedItem { + /// yyjson document owning the parsed value. + let document: Document + /// Byte range of this value in `buffer` (relative to `buffer.startIndex`). + let range: Range + } + + // MARK: - Drain entry points + + private mutating func drainValues(finalizing: Bool) throws -> [JSONValue] { + compactIfNeeded() + let items = try drainItems(finalizing: finalizing) + var results: [JSONValue] = [] + results.reserveCapacity(items.count) + for item in items { + guard let root = item.document.root else { + throw JSONError.invalidData("Document has no root value") + } + results.append(JSONValue(value: root, document: item.document)) + } + return results + } + + private mutating func drainSlices(finalizing: Bool) throws -> [Data] { + compactIfNeeded() + let items = try drainItems(finalizing: finalizing) + var slices: [Data] = [] + slices.reserveCapacity(items.count) + let start = buffer.startIndex + for item in items { + slices.append(buffer.subdata(in: (start + item.range.lowerBound)..<(start + item.range.upperBound))) + } + return slices + } + + private mutating func drainItems(finalizing: Bool) throws -> [ParsedItem] { + var items: [ParsedItem] = [] + switch mode { + case .jsonLines: + try drainJSONLines(finalizing: finalizing) { items.append($0) } + case .jsonArray: + try drainJSONArray(finalizing: finalizing) { items.append($0) } + } + return items + } + + // MARK: - Mode-specific drain + + private mutating func drainJSONLines( + finalizing: Bool, + emit: (ParsedItem) throws -> Void + ) throws { + while true { + skipWhitespace() + guard readOffset < buffer.count else { break } + + guard let item = try parseOneItem(finalizing: finalizing) else { break } + try emit(item) + } + } + + private mutating func drainJSONArray( + finalizing: Bool, + emit: (ParsedItem) throws -> Void + ) throws { + loop: while true { + skipWhitespace() + + switch arrayState { + case .expectOpenBracket: + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + guard byte == UInt8(ascii: "[") else { + throw JSONError.invalidJSON("Expected '[' at start of JSON array stream") + } + readOffset += 1 + arrayState = .expectElementOrClose + + case .expectElementOrClose: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: "]") { + readOffset += 1 + arrayState = .done + break loop + } + guard let item = try parseOneItem(finalizing: finalizing) else { break loop } + try emit(item) + arrayState = .expectCommaOrClose + + case .expectElementAfterComma: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: "]") { + guard allowsTrailingCommas else { + throw JSONError.invalidJSON("Trailing comma is not allowed in JSON array stream") + } + readOffset += 1 + arrayState = .done + break loop + } + guard let item = try parseOneItem(finalizing: finalizing) else { break loop } + try emit(item) + arrayState = .expectCommaOrClose + + case .expectCommaOrClose: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: ",") { + readOffset += 1 + arrayState = .expectElementAfterComma + } else if byte == UInt8(ascii: "]") { + readOffset += 1 + arrayState = .done + break loop + } else { + let char = Unicode.Scalar(byte) + throw JSONError.invalidJSON( + "Expected ',' or ']' in JSON array, got '\(char)'" + ) + } + + case .done: + break loop + } + } + + if arrayState == .done { + try validateNoTrailingArrayContent() + } + } + + // MARK: - Core Parse + + /// Attempts to parse one JSON value starting at `readOffset`. + /// + /// Returns `nil` when more data is required to confidently advance: + /// either yyjson reported incomplete input, or the parsed value ends + /// exactly at the buffer end and we are not finalizing (which would be + /// ambiguous for unterminated numeric tokens). + private mutating func parseOneItem(finalizing: Bool) throws -> ParsedItem? { + let available = buffer.count - readOffset + guard available > 0 else { return nil } + + // yyjson_read_opts() in non-INSITU mode allocates its own padded buffer + // and copies the input — so we do not need to add YYJSON_PADDING_SIZE + // bytes ourselves here. + let startOffset = readOffset + let result: Document.StreamParseResult = try buffer.withUnsafeBytes { buf in + guard let base = buf.baseAddress else { return .needMoreData } + let ptr = base.advanced(by: startOffset).assumingMemoryBound(to: UInt8.self) + return try Document.streamParse(bytes: ptr, count: available, options: options) + } + + switch result { + case .needMoreData: + return nil + case .success(let doc, let consumed): + let endOffset = startOffset + consumed + + // Boundary safety: + // A successful parse that ends exactly at the buffer end may be a + // truncated numeric token (e.g. we have "12" so far but the source + // is actually "123"). yyjson cannot tell the difference. Defer the + // value until either more data arrives or the caller finalizes. + if !finalizing && endOffset >= buffer.count { + return nil + } + readOffset = endOffset + return ParsedItem(document: doc, range: startOffset.. 0, readOffset > buffer.count / 2 else { return } + buffer.removeSubrange(buffer.startIndex ..< buffer.startIndex + readOffset) + readOffset = 0 + } + + private mutating func validateNoTrailingArrayContent() throws { + skipWhitespace() + guard readOffset < buffer.count else { return } + throw JSONError.invalidJSON("Unexpected content after JSON array stream") + } + + private mutating func validateAtEndOfStream() throws { + skipWhitespace() + if readOffset < buffer.count { + if mode == .jsonArray { + throw JSONError.invalidJSON("Unexpected end of JSON array stream") + } else { + throw JSONError.invalidJSON("Incomplete JSON value at end of stream") + } + } + if mode == .jsonArray && arrayState != .done { + throw JSONError.invalidJSON("Unexpected end of JSON array stream") + } + } +} + +// MARK: - JSONIncrementalReader + +/// The outcome of feeding a chunk of bytes to a ``JSONIncrementalReader``. +/// +/// This is intentionally a non-copyable enum rather than `JSONDocument?` +/// because ``JSONDocument`` is `~Copyable`, and `Optional<~Copyable>` is +/// not yet supported by Swift 5.10 (the minimum required by this package on +/// Linux). Using a custom enum keeps the API ergonomic on every supported +/// toolchain. +public enum JSONIncrementalReadResult: ~Copyable { + /// The reader has fully assembled a complete document. + case ready(JSONDocument) + /// The reader needs more input before a document can be produced. + case needMoreData +} + +/// An incremental reader for a single large JSON document. +/// +/// Feed chunks of a single large JSON document with ``feed(_:)``. +/// Data is accumulated internally. ``feed(_:)`` will attempt to parse the +/// accumulated buffer after each chunk; ``finish()`` parses any remaining +/// buffered data and finalizes the reader. +/// +/// ```swift +/// let reader = try JSONIncrementalReader() +/// for try await chunk in stream { +/// switch try reader.feed(chunk) { +/// case .ready(let doc): +/// // doc.root is now available +/// return doc +/// case .needMoreData: +/// continue +/// } +/// } +/// ``` +/// +/// - Note: For a document already fully in memory, prefer +/// ``JSONDocument/init(data:options:)`` which is faster. +/// This type is for when data arrives in chunks over the network. +/// +/// - Note: This type is internally synchronized; `feed`, `finish`, and +/// property access are safe to call concurrently from multiple threads. +/// +/// - Important: Internally, ``feed(_:)`` attempts a fresh parse over the +/// entire accumulated buffer on each call. For a document of total size +/// `N` arriving as `K` chunks, the cost is `O(N · K)`. When `K` is small +/// (e.g. a handful of large network chunks) this is effectively `O(N)`. +/// If you expect a very large number of small chunks, consider buffering +/// them yourself and constructing a single ``JSONDocument`` once you've +/// received the entire document. +public final class JSONIncrementalReader: @unchecked Sendable { + + private let lock = LockedState() + private var parser: IncrementalParser + private var finished: Bool = false + + /// The total number of buffered input bytes. + public var bufferedByteCount: Int { + lock.lock(); defer { lock.unlock() } + return parser.count + } + + /// Creates a new incremental reader. + /// + /// - Parameters: + /// - data: An initial chunk of JSON data (may be empty). + /// - options: Options for reading JSON. + public init( + data: Data = Data(), + options: JSONReadOptions = .default + ) throws { + self.parser = IncrementalParser(initialData: data, options: options) + } + + /// Feeds more data and attempts to parse the accumulated buffer. + /// + /// - Parameter data: Additional JSON data (may be empty to retry). + /// - Returns: ``JSONIncrementalReadResult/ready(_:)`` if the buffer + /// contains a complete document, or + /// ``JSONIncrementalReadResult/needMoreData`` if more data is required. + /// - Throws: ``JSONError`` for non-recoverable parse errors, or if the + /// reader has already produced a complete document. + public func feed(_ data: Data) throws -> JSONIncrementalReadResult { + lock.lock(); defer { lock.unlock() } + guard !finished else { + throw JSONError.invalidJSON("Incremental reader already finished") + } + parser.append(data) + switch try parser.read() { + case .success(let doc): + finished = true + return .ready(JSONDocument(_document: doc)) + case .needMoreData: + return .needMoreData + } + } + + /// Signals end-of-stream and returns the completed document. + /// + /// - Returns: The parsed ``JSONDocument``. + /// - Throws: ``JSONError`` if the document is incomplete or malformed. + public func finish() throws -> JSONDocument { + lock.lock(); defer { lock.unlock() } + guard !finished else { + throw JSONError.invalidJSON("Incremental reader already finished") + } + switch try parser.read() { + case .success(let doc): + finished = true + return JSONDocument(_document: doc) + case .needMoreData: + throw JSONError.invalidJSON("Incomplete JSON value at end of stream") + } + } +} diff --git a/Sources/ReerJSON/Value.swift b/Sources/ReerJSON/Value.swift index 6305394..3af6876 100644 --- a/Sources/ReerJSON/Value.swift +++ b/Sources/ReerJSON/Value.swift @@ -105,6 +105,58 @@ internal final class Document: @unchecked Sendable { self.doc = doc } + /// Result of a streaming parse attempt. + enum StreamParseResult { + case success(Document, consumedBytes: Int) + case needMoreData + } + + /// Attempts to parse one JSON value from bytes with `STOP_WHEN_DONE`. + /// + /// The bytes do not need to be padded; for non-INSITU parsing yyjson + /// allocates and pads its own internal buffer. + /// + /// - Parameters: + /// - bytes: Pointer to the JSON bytes. + /// - count: Number of valid bytes. + /// - options: Options for reading the JSON. + /// - Returns: `.success` with consumed byte count, or `.needMoreData` if incomplete. + /// - Throws: `JSONError` for non-recoverable parse errors. + static func streamParse( + bytes: UnsafePointer, count: Int, + options: JSONReadOptions + ) throws -> StreamParseResult { + guard count > 0 else { return .needMoreData } + + var error = yyjson_read_err() + var flags = options.yyjsonFlags + flags |= YYJSON_READ_STOP_WHEN_DONE + flags &= ~yyjson_read_flag(YYJSON_READ_INSITU) + + let ptr = UnsafeMutablePointer( + mutating: UnsafeRawPointer(bytes).assumingMemoryBound(to: CChar.self) + ) + let result = yyjson_read_opts(ptr, count, flags, nil, &error) + + if let doc = result { + let consumed = yyjson_doc_get_read_size(doc) + return .success(Document(alreadyParsed: doc), consumedBytes: consumed) + } + + if error.code == YYJSON_READ_ERROR_UNEXPECTED_END + || error.code == YYJSON_READ_ERROR_EMPTY_CONTENT { + return .needMoreData + } + + throw JSONError(parsing: error) + } + + /// Adopts an already-parsed yyjson_doc, taking ownership. + init(alreadyParsed doc: UnsafeMutablePointer) { + self.doc = doc + self.retainedData = nil + } + deinit { yyjson_doc_free(doc) } @@ -114,6 +166,62 @@ internal final class Document: @unchecked Sendable { } } +// MARK: - IncrementalParser (Internal) + +/// Internal accumulator used by ``JSONIncrementalReader``. +/// +/// We can't use yyjson's `yyjson_incr_*` API directly here because: +/// - Non-INSITU `yyjson_incr_new` takes a snapshot of the input buffer at the +/// time of the call (it `memcpy`s `buf_len` bytes into its own storage), so +/// later appends to our buffer are invisible to it; calling `yyjson_incr_read` +/// with a larger `len` fails with `INVALID_PARAMETER`. +/// - INSITU mode requires a stable, pre-sized buffer and modifies the input +/// in place for string unescaping; growing the buffer invalidates the state +/// and the in-place modifications make restart-from-scratch fragile. +/// +/// Instead we keep the data in a growable `Data` buffer and try a fresh +/// `STOP_WHEN_DONE` parse on each `read()`. This is `O(N²)` worst-case across +/// many small chunks, but is correct in all cases and is `O(N)` total when +/// data arrives in a small number of large chunks (the common case for +/// network streaming). +internal final class IncrementalParser { + private var buffer: Data + private let options: JSONReadOptions + + var count: Int { buffer.count } + + init(initialData: Data, options: JSONReadOptions) { + self.buffer = initialData + self.options = options + } + + func append(_ data: Data) { + guard !data.isEmpty else { return } + buffer.append(data) + } + + /// Result of an incremental read. + enum ReadResult { + case success(Document) + case needMoreData + } + + func read() throws -> ReadResult { + guard !buffer.isEmpty else { return .needMoreData } + + do { + let doc = try Document(data: buffer, options: options) + return .success(doc) + } catch let error as JSONError { + if error.readErrorCode == UInt32(YYJSON_READ_ERROR_UNEXPECTED_END) + || error.readErrorCode == UInt32(YYJSON_READ_ERROR_EMPTY_CONTENT) { + return .needMoreData + } + throw error + } + } +} + // MARK: - Document (Public) /// A parsed JSON document that owns the underlying memory. @@ -149,6 +257,11 @@ internal final class Document: @unchecked Sendable { public struct JSONDocument: ~Copyable, @unchecked Sendable { internal let _document: Document + /// Creates a document from a pre-parsed internal document. + internal init(_document: Document) { + self._document = _document + } + /// Creates a document by parsing JSON data. /// /// - Parameters: diff --git a/Tests/ReerJSONTests/StreamParserTests.swift b/Tests/ReerJSONTests/StreamParserTests.swift new file mode 100644 index 0000000..d4458bb --- /dev/null +++ b/Tests/ReerJSONTests/StreamParserTests.swift @@ -0,0 +1,662 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import XCTest +@testable import ReerJSON + +// MARK: - Test Helpers + +private struct Item: Codable, Equatable, Sendable { + let id: Int + let name: String +} + +// MARK: - JSON Lines Tests + +final class JSONStreamParserJSONLinesTests: XCTestCase { + + func testSingleCompleteChunk() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0]["id"]?.int64, 1) + XCTAssertEqual(values[1]["name"]?.string, "b") + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkValues() throws { + var parser = JSONStreamParser(mode: .jsonLines) + + let chunk1 = Data("{\"id\":1}\n{\"id\"".utf8) + let values1 = try parser.parse(chunk1) + XCTAssertEqual(values1.count, 1) + XCTAssertEqual(values1[0]["id"]?.int64, 1) + + let chunk2 = Data(":2}\n".utf8) + let values2 = try parser.parse(chunk2) + XCTAssertEqual(values2.count, 1) + XCTAssertEqual(values2[0]["id"]?.int64, 2) + + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testEmptyLines() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("\n\n{\"x\":1}\n\n\n{\"x\":2}\n\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testWhitespacePadding() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data(" {\"a\":1} \n {\"a\":2} ".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testVariousTypes() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("42\n\"hello\"\ntrue\nnull\n[1,2]\n{\"k\":\"v\"}\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 6) + XCTAssertEqual(values[0].int64, 42) + XCTAssertEqual(values[1].string, "hello") + XCTAssertEqual(values[2].bool, true) + XCTAssertTrue(values[3].isNull) + XCTAssertEqual(values[4].array?.count, 2) + XCTAssertEqual(values[5]["k"]?.string, "v") + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testEmptyDataParse() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let values = try parser.parse(Data()) + XCTAssertTrue(values.isEmpty) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testIncompleteJSONAtFinalize() throws { + var parser = JSONStreamParser(mode: .jsonLines) + _ = try parser.parse(Data("{\"id\":1}".utf8)) + _ = try parser.parse(Data("{\"incomplete".utf8)) + XCTAssertThrowsError(try parser.finalize()) + } + + // MARK: - Boundary / cross-chunk correctness + + /// Regression: yyjson with STOP_WHEN_DONE will happily parse `1` from + /// a buffer that ends exactly at "1" — but the real input might be + /// "12345". The parser must defer such tokens until the next chunk or + /// `finalize()` confirms there is no continuation. + func testCrossChunkSplitNumber() throws { + var parser = JSONStreamParser(mode: .jsonLines) + // Send "12345\n" split as "1" / "234" / "5\n". + let v1 = try parser.parse(Data("1".utf8)) + XCTAssertTrue(v1.isEmpty, "Bare number at end of buffer must not be yielded yet") + let v2 = try parser.parse(Data("234".utf8)) + XCTAssertTrue(v2.isEmpty) + let v3 = try parser.parse(Data("5\n".utf8)) + XCTAssertEqual(v3.count, 1) + XCTAssertEqual(v3[0].int64, 12345) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkSplitFloat() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let v1 = try parser.parse(Data("1.2".utf8)) + XCTAssertTrue(v1.isEmpty) + let v2 = try parser.parse(Data("3e".utf8)) + XCTAssertTrue(v2.isEmpty) + let v3 = try parser.parse(Data("4\n".utf8)) + XCTAssertEqual(v3.count, 1) + XCTAssertEqual(v3[0].number, 1.23e4) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + /// Strings end with `"`, so yyjson can detect truncation directly. + /// Splitting them mid-token must still work. + func testCrossChunkSplitInsideString() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let v1 = try parser.parse(Data("\"hel".utf8)) + XCTAssertTrue(v1.isEmpty) + let v2 = try parser.parse(Data("lo\"\n".utf8)) + XCTAssertEqual(v2.count, 1) + XCTAssertEqual(v2[0].string, "hello") + } + + /// A value that ends exactly at the buffer boundary without a trailing + /// terminator (e.g. {"a":1} with no newline) should still surface on + /// finalize(). + func testFinalizeFlushesTrailingValueWithoutNewline() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let v1 = try parser.parse(Data("{\"a\":1}".utf8)) + // May or may not be yielded depending on buffer-end heuristic; the + // important contract is that finalize() yields it. + let remaining = try parser.finalize() + let total = v1 + remaining + XCTAssertEqual(total.count, 1) + XCTAssertEqual(total[0]["a"]?.int64, 1) + } +} + +// MARK: - JSON Array Tests + +final class JSONStreamParserJSONArrayTests: XCTestCase { + + func testNormalArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[1, 2, 3]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0].int64, 1) + XCTAssertEqual(values[1].int64, 2) + XCTAssertEqual(values[2].int64, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testNestedObjects() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[{\"a\":1},{\"b\":[2,3]},{\"c\":{\"d\":4}}]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0]["a"]?.int64, 1) + XCTAssertEqual(values[1]["b"]?.array?.count, 2) + XCTAssertEqual(values[2]["c"]?["d"]?.int64, 4) + } + + func testNestedArrays() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[[1,2],[3,[4,5]]]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].array?.count, 2) + } + + func testEmptyArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[]".utf8) + let values = try parser.parse(data) + XCTAssertTrue(values.isEmpty) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testTrailingCommaWithOption() throws { + var parser = JSONStreamParser(mode: .jsonArray, options: .allowTrailingCommas) + let data = Data("[1, 2, 3,]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + } + + func testTrailingCommaWithoutOptionThrows() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("[1, 2, 3,]".utf8))) + } + + func testTrailingCommaWithJSON5Option() throws { + var parser = JSONStreamParser(mode: .jsonArray, options: .json5) + let values = try parser.parse(Data("[1, 2, 3,]".utf8)) + XCTAssertEqual(values.map(\.int64), [1, 2, 3]) + } + + func testCrossChunkArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + + let chunk1 = Data("[{\"id\":1},".utf8) + let values1 = try parser.parse(chunk1) + XCTAssertEqual(values1.count, 1) + XCTAssertEqual(values1[0]["id"]?.int64, 1) + + let chunk2 = Data("{\"id\":2}]".utf8) + let values2 = try parser.parse(chunk2) + XCTAssertEqual(values2.count, 1) + XCTAssertEqual(values2[0]["id"]?.int64, 2) + + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testArrayWithWhitespace() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data(" [ 1 , 2 , 3 ] ".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testMissingOpenBracket() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("1, 2, 3]".utf8))) + } + + func testIncompleteArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + _ = try parser.parse(Data("[1, 2".utf8)) + XCTAssertThrowsError(try parser.finalize()) + } + + func testEmptyArrayStreamAtFinalizeThrows() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.finalize()) + } + + func testTrailingContentAfterArrayThrows() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("[1] 2".utf8))) + } + + /// Regression for cross-chunk numeric splitting inside an array. + /// Without the buffer-end deferral, the parser would commit `1` from + /// `[1` and then choke on `2` when the second chunk arrives. + func testCrossChunkArraySplitNumber() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let v1 = try parser.parse(Data("[1".utf8)) + XCTAssertTrue(v1.isEmpty, "Bare number at buffer end must defer") + let v2 = try parser.parse(Data("23,456]".utf8)) + XCTAssertEqual(v2.map(\.int64), [123, 456]) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkArrayTinyChunks() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let chunks = ["[", "12", "3,", "45", "6", ",", "789", "]"] + var all: [JSONValue] = [] + for chunk in chunks { + all += try parser.parse(Data(chunk.utf8)) + } + all += try parser.finalize() + XCTAssertEqual(all.map(\.int64), [123, 456, 789]) + } + + func testStringElements() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[\"hello\", \"world\"]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].string, "hello") + XCTAssertEqual(values[1].string, "world") + } + + func testMixedTypes() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[1, \"two\", true, null, {\"k\":\"v\"}, [3]]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 6) + XCTAssertEqual(values[0].int64, 1) + XCTAssertEqual(values[1].string, "two") + XCTAssertEqual(values[2].bool, true) + XCTAssertTrue(values[3].isNull) + } +} + +// MARK: - Incremental Reader Tests + +final class JSONIncrementalReaderTests: XCTestCase { + + func testSingleChunk() throws { + let reader = try JSONIncrementalReader(data: Data("{\"key\":\"value\"}".utf8)) + let doc = try reader.finish() + XCTAssertEqual(doc.root?["key"]?.string, "value") + } + + func testMultipleChunks() throws { + let reader = try JSONIncrementalReader(data: Data("{\"ke".utf8)) + // First feed should need more data. + switch try reader.feed(Data("y\":\"val".utf8)) { + case .ready(let doc): + XCTFail("Should need more data, got doc with root: \(String(describing: doc.root))") + case .needMoreData: + break + } + // Second feed should complete. + switch try reader.feed(Data("ue\"}".utf8)) { + case .ready(let doc): + XCTAssertEqual(doc.root?["key"]?.string, "value") + case .needMoreData: + XCTFail("Should have completed parsing") + } + } + + func testLargerDocument() throws { + var items: [[String: Any]] = [] + for i in 0..<100 { + items.append(["id": i, "name": "item_\(i)"]) + } + let jsonData = try JSONSerialization.data(withJSONObject: items) + + let chunkSize = 64 + let firstChunk = Data(jsonData[0.. 0) + } + + func testVeryLargeObject() throws { + var parser = JSONStreamParser(mode: .jsonLines) + var json = "{\"data\":\"" + for _ in 0..<10_000 { + json += "x" + } + json += "\"}\n" + let values = try parser.parse(Data(json.utf8)) + XCTAssertEqual(values.count, 1) + } + + func testArrayResetAndReuse() throws { + var parser = JSONStreamParser(mode: .jsonArray) + _ = try parser.parse(Data("[1,2]".utf8)) + _ = try parser.finalize() + + parser.reset() + let values = try parser.parse(Data("[3,4]".utf8)) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].int64, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } +} + +// MARK: - Codable Decoder Tests + +final class StreamingDecoderTests: XCTestCase { + + func testJSONLinesDecoder() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + let data = Data("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n".utf8) + let items = try decoder.parseBuffer(data) + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONLinesDecoderCrossChunk() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + let items1 = try decoder.parseBuffer(Data("{\"id\":1,\"name\":\"a\"}\n{\"id\"".utf8)) + XCTAssertEqual(items1, [Item(id: 1, name: "a")]) + let items2 = try decoder.parseBuffer(Data(":2,\"name\":\"b\"}\n".utf8)) + XCTAssertEqual(items2, [Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONArrayDecoder() throws { + var decoder = StreamingJSONArrayDecoder(Item.self) + let data = Data("[{\"id\":1,\"name\":\"a\"},{\"id\":2,\"name\":\"b\"}]".utf8) + let items = try decoder.parseBuffer(data) + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONArrayDecoderCrossChunk() throws { + var decoder = StreamingJSONArrayDecoder(Item.self) + let items1 = try decoder.parseBuffer(Data("[{\"id\":1,\"name\":\"a\"},".utf8)) + XCTAssertEqual(items1, [Item(id: 1, name: "a")]) + let items2 = try decoder.parseBuffer(Data("{\"id\":2,\"name\":\"b\"}]".utf8)) + XCTAssertEqual(items2, [Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testDecoderReset() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + _ = try decoder.parseBuffer(Data("{\"id\":1,\"name\":\"a\"}\n".utf8)) + decoder.reset() + let items = try decoder.parseBuffer(Data("{\"id\":2,\"name\":\"b\"}\n".utf8)) + XCTAssertEqual(items, [Item(id: 2, name: "b")]) + } +} + +// MARK: - AsyncSequence Tests + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class AsyncStreamTests: XCTestCase { + + func testJSONValueStream() async throws { + let chunks: [Data] = [ + Data("{\"id\":1}\n{\"id\"".utf8), + Data(":2}\n{\"id\":3}\n".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var values: [JSONValue] = [] + for try await value in stream.jsonValues(mode: .jsonLines) { + values.append(value) + } + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0]["id"]?.int64, 1) + XCTAssertEqual(values[1]["id"]?.int64, 2) + XCTAssertEqual(values[2]["id"]?.int64, 3) + } + + func testDecodingStream() async throws { + let chunks: [Data] = [ + Data("{\"id\":1,\"name\":\"a\"}\n".utf8), + Data("{\"id\":2,\"name\":\"b\"}\n".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var items: [Item] = [] + for try await item in stream.decode(Item.self, mode: .jsonLines) { + items.append(item) + } + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + } + + func testJSONArrayValueStream() async throws { + let chunks: [Data] = [ + Data("[{\"id\":1,\"name\":\"a\"},".utf8), + Data("{\"id\":2,\"name\":\"b\"}]".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var values: [JSONValue] = [] + for try await value in stream.jsonValues(mode: .jsonArray) { + values.append(value) + } + XCTAssertEqual(values.count, 2) + } + + func testDecodingStreamArrayMode() async throws { + let chunks: [Data] = [ + Data("[{\"id\":1,\"name\":\"x\"},".utf8), + Data("{\"id\":2,\"name\":\"y\"}]".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var items: [Item] = [] + for try await item in stream.decode(Item.self, mode: .jsonArray) { + items.append(item) + } + XCTAssertEqual(items, [Item(id: 1, name: "x"), Item(id: 2, name: "y")]) + } + + func testEmptyStream() async throws { + let stream = AsyncStream { continuation in + continuation.finish() + } + + var count = 0 + for try await _ in stream.jsonValues(mode: .jsonLines) { + count += 1 + } + XCTAssertEqual(count, 0) + } +}