From f4029b53346a419500825c6be1aa9a55da8f18e7 Mon Sep 17 00:00:00 2001 From: phoenix Date: Sat, 25 Apr 2026 21:50:14 +0800 Subject: [PATCH 1/4] feat: add streaming JSON parsing API - JSONStreamParser: push-based streaming parser for JSON Lines and JSON Array modes - JSON Lines: extracts multiple JSON documents from a byte stream using STOP_WHEN_DONE - JSON Array: state machine to parse elements from a large JSON array one by one - Internal buffer with lazy compaction for efficient memory management - JSONIncrementalReader: accumulates chunks for large single-document parsing - StreamingJSONLinesDecoder / StreamingJSONArrayDecoder: Codable-layer streaming decoders - JSONValueStream / DecodingStream: AsyncSequence adapters for async byte streams - AsyncSequence extensions: .jsonValues() and .decode() convenience methods - Document.streamParse: internal API using yyjson_doc_get_read_size for accurate byte counting - 33 new tests covering JSON Lines, JSON Array, incremental, edge cases, and Codable layer - All 755 existing tests pass with zero regressions --- Sources/ReerJSON/StreamDecoder.swift | 459 ++++++++++++++++++ Sources/ReerJSON/StreamParser.swift | 386 +++++++++++++++ Sources/ReerJSON/Value.swift | 64 +++ Tests/ReerJSONTests/StreamParserTests.swift | 491 ++++++++++++++++++++ 4 files changed, 1400 insertions(+) create mode 100644 Sources/ReerJSON/StreamDecoder.swift create mode 100644 Sources/ReerJSON/StreamParser.swift create mode 100644 Tests/ReerJSONTests/StreamParserTests.swift diff --git a/Sources/ReerJSON/StreamDecoder.swift b/Sources/ReerJSON/StreamDecoder.swift new file mode 100644 index 0000000..40a1256 --- /dev/null +++ b/Sources/ReerJSON/StreamDecoder.swift @@ -0,0 +1,459 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import Foundation + +// MARK: - StreamingJSONLinesDecoder + +/// A streaming decoder for JSON Lines (NDJSON) format. +/// +/// Each top-level JSON value in the stream is decoded into `T`. +/// +/// ```swift +/// var decoder = StreamingJSONLinesDecoder(Item.self) +/// let items1 = try decoder.parseBuffer(chunk1) +/// let items2 = try decoder.parseBuffer(chunk2) +/// let remaining = try decoder.finalize() +/// ``` +public struct StreamingJSONLinesDecoder: @unchecked Sendable { + + private var parser: JSONStreamParser + // ReerJSONDecoder uses internal locking, safe to share. + private let decoder: ReerJSONDecoder + private let type: T.Type + + /// Creates a new JSON Lines streaming decoder. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each value into. + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder`` with custom strategies. + /// If `nil`, a default decoder is used. + public init( + _ type: T.Type, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) { + self.type = type + self.parser = JSONStreamParser(mode: .jsonLines, options: options) + self.decoder = decoder ?? ReerJSONDecoder() + } + + /// Feeds data to the decoder and returns all decoded values. + /// + /// - Parameter data: New data to append. + /// - Returns: An array of decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func parseBuffer(_ data: Data) throws -> [T] { + let values = try parser.parse(data) + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Signals end-of-stream and returns any remaining decoded values. + /// + /// - Returns: An array of remaining decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func finalize() throws -> [T] { + let values = try parser.finalize() + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Resets the decoder to its initial state. + public mutating func reset() { + parser.reset() + } +} + +// MARK: - StreamingJSONArrayDecoder + +/// A streaming decoder for JSON array format. +/// +/// The stream is expected to be a single JSON array. Each element is decoded +/// individually as it becomes available. +/// +/// ```swift +/// var decoder = StreamingJSONArrayDecoder(Item.self) +/// let items1 = try decoder.parseBuffer(chunk1) +/// let items2 = try decoder.parseBuffer(chunk2) +/// let remaining = try decoder.finalize() +/// ``` +public struct StreamingJSONArrayDecoder: @unchecked Sendable { + + private var parser: JSONStreamParser + // ReerJSONDecoder uses internal locking, safe to share. + private let decoder: ReerJSONDecoder + private let type: T.Type + + /// Creates a new JSON array streaming decoder. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each element into. + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder`` with custom strategies. + /// If `nil`, a default decoder is used. + public init( + _ type: T.Type, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) { + self.type = type + self.parser = JSONStreamParser(mode: .jsonArray, options: options) + self.decoder = decoder ?? ReerJSONDecoder() + } + + /// Feeds data to the decoder and returns all decoded elements. + /// + /// - Parameter data: New data to append. + /// - Returns: An array of decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func parseBuffer(_ data: Data) throws -> [T] { + let values = try parser.parse(data) + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Signals end-of-stream and returns any remaining decoded elements. + /// + /// - Returns: An array of remaining decoded `T` values. + /// - Throws: ``JSONError`` or `DecodingError` on failure. + public mutating func finalize() throws -> [T] { + let values = try parser.finalize() + return try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + } + + /// Resets the decoder to its initial state. + public mutating func reset() { + parser.reset() + } +} + +// MARK: - AsyncSequence Adapters + +/// An `AsyncSequence` that yields ``JSONValue`` items from chunks of `Data`. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public struct JSONValueStream: AsyncSequence, Sendable +where Source.Element == Data { + public typealias Element = JSONValue + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + + public func makeAsyncIterator() -> Iterator { + Iterator(source: source.makeAsyncIterator(), mode: mode, options: options) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [JSONValue] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + + init(source: Source.AsyncIterator, mode: JSONStreamMode, options: JSONReadOptions) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + } + + public mutating func next() async throws -> JSONValue? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + guard let chunk = try await sourceIterator.next() else { + sourceExhausted = true + let remaining = try parser.finalize() + if !remaining.isEmpty { + pending = remaining + pendingIndex = 0 + continue + } + return nil + } + + let values = try parser.parse(chunk) + if !values.isEmpty { + pending = values + pendingIndex = 0 + } + } + } + } +} + +/// An `AsyncSequence` that yields ``JSONValue`` items from an `AsyncSequence` of bytes. +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +public struct JSONValueByteStream: AsyncSequence, Sendable +where Source.Element == UInt8 { + public typealias Element = JSONValue + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + let chunkSize: Int + + public func makeAsyncIterator() -> Iterator { + Iterator( + source: source.makeAsyncIterator(), + mode: mode, options: options, + chunkSize: chunkSize + ) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [JSONValue] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + let chunkSize: Int + + init( + source: Source.AsyncIterator, + mode: JSONStreamMode, options: JSONReadOptions, + chunkSize: Int + ) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + self.chunkSize = chunkSize + } + + public mutating func next() async throws -> JSONValue? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + var chunk = Data() + chunk.reserveCapacity(chunkSize) + for _ in 0..: + AsyncSequence, @unchecked Sendable +where Source.Element == Data { + public typealias Element = T + + let source: Source + let mode: JSONStreamMode + let options: JSONReadOptions + let decoder: ReerJSONDecoder + let type: T.Type + + public func makeAsyncIterator() -> Iterator { + Iterator( + source: source.makeAsyncIterator(), + mode: mode, options: options, + decoder: decoder, type: type + ) + } + + public struct Iterator: AsyncIteratorProtocol { + var sourceIterator: Source.AsyncIterator + var parser: JSONStreamParser + var pending: [T] = [] + var pendingIndex: Int = 0 + var sourceExhausted = false + let decoder: ReerJSONDecoder + let type: T.Type + + init( + source: Source.AsyncIterator, + mode: JSONStreamMode, options: JSONReadOptions, + decoder: ReerJSONDecoder, type: T.Type + ) { + self.sourceIterator = source + self.parser = JSONStreamParser(mode: mode, options: options) + self.decoder = decoder + self.type = type + } + + public mutating func next() async throws -> T? { + while true { + if pendingIndex < pending.count { + let value = pending[pendingIndex] + pendingIndex += 1 + if pendingIndex >= pending.count { + pending.removeAll(keepingCapacity: true) + pendingIndex = 0 + } + return value + } + + if sourceExhausted { + return nil + } + + guard let chunk = try await sourceIterator.next() else { + sourceExhausted = true + let remaining = try parser.finalize() + if !remaining.isEmpty { + pending = try remaining.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + pendingIndex = 0 + continue + } + return nil + } + + let values = try parser.parse(chunk) + if !values.isEmpty { + pending = try values.map { value in + let data = try value.data() + return try decoder.decode(type, from: data) + } + pendingIndex = 0 + } + } + } + } +} + +// MARK: - AsyncSequence Extensions + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence where Element == Data, Self: Sendable { + + /// Returns an `AsyncSequence` of ``JSONValue`` items parsed from this + /// data stream. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - Returns: A ``JSONValueStream`` yielding parsed values. + public func jsonValues( + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default + ) -> JSONValueStream { + JSONValueStream(source: self, mode: mode, options: options) + } + + /// Returns an `AsyncSequence` that decodes items from this data stream. + /// + /// - Parameters: + /// - type: The `Decodable` type to decode each value into. + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - decoder: An optional ``ReerJSONDecoder``. If `nil`, uses a default decoder. + /// - Returns: A ``DecodingStream`` yielding decoded values. + public func decode( + _ type: T.Type, + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default, + decoder: ReerJSONDecoder? = nil + ) -> DecodingStream { + DecodingStream( + source: self, + mode: mode, options: options, + decoder: decoder ?? ReerJSONDecoder(), + type: type + ) + } +} + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +extension AsyncSequence where Element == UInt8, Self: Sendable { + + /// Returns an `AsyncSequence` of ``JSONValue`` items parsed from this + /// byte stream. + /// + /// Bytes are batched internally for efficient parsing. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. + /// - chunkSize: Number of bytes to batch before parsing. Default is 4096. + /// - Returns: A ``JSONValueByteStream`` yielding parsed values. + public func jsonValues( + mode: JSONStreamMode = .jsonLines, + options: JSONReadOptions = .default, + chunkSize: Int = 4096 + ) -> JSONValueByteStream { + JSONValueByteStream( + source: self, mode: mode, + options: options, chunkSize: chunkSize + ) + } +} diff --git a/Sources/ReerJSON/StreamParser.swift b/Sources/ReerJSON/StreamParser.swift new file mode 100644 index 0000000..2fbbf74 --- /dev/null +++ b/Sources/ReerJSON/StreamParser.swift @@ -0,0 +1,386 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import yyjson +import Foundation + +// MARK: - JSONStreamMode + +/// The parsing mode for a stream of JSON data. +public enum JSONStreamMode: Sendable { + /// Each line is an independent JSON value (JSON Lines / NDJSON). + case jsonLines + /// The stream is a single JSON array whose elements are yielded one by one. + case jsonArray +} + +// MARK: - JSONStreamParser + +/// A streaming JSON parser that extracts individual ``JSONValue`` items from +/// a byte stream, supporting both JSON Lines and JSON Array modes. +/// +/// `JSONStreamParser` maintains an internal buffer. You feed data incrementally +/// with ``parse(_:)`` and receive fully-parsed ``JSONValue`` items as they +/// become available. Call ``finalize()`` when the stream ends to flush any +/// remaining buffered data. +/// +/// ## JSON Lines Mode +/// +/// Each top-level JSON value in the buffer is extracted as a separate item. +/// Values may span multiple ``parse(_:)`` calls. +/// +/// ```swift +/// var parser = JSONStreamParser(mode: .jsonLines) +/// let chunk1 = Data("{\"a\":1}\n{\"b\"".utf8) +/// let chunk2 = Data(":2}\n".utf8) +/// let values1 = try parser.parse(chunk1) // [{"a":1}] +/// let values2 = try parser.parse(chunk2) // [{"b":2}] +/// ``` +/// +/// ## JSON Array Mode +/// +/// The stream is expected to be a single JSON array (`[...]`). +/// Each array element is yielded individually. +/// +/// ```swift +/// var parser = JSONStreamParser(mode: .jsonArray) +/// let items = try parser.parse(Data("[1, 2, 3]".utf8)) +/// let remaining = try parser.finalize() +/// // items + remaining contain JSONValues for 1, 2, 3 +/// ``` +public struct JSONStreamParser: Sendable { + + /// The parsing mode. + public let mode: JSONStreamMode + + /// Options for reading JSON. + public let options: JSONReadOptions + + private var buffer: Data + private var readOffset: Int + private var arrayState: ArrayParseState + + /// The number of bytes buffered but not yet consumed. + public var pendingByteCount: Int { + buffer.count - readOffset + } + + /// Creates a new stream parser. + /// + /// - Parameters: + /// - mode: The stream format (`.jsonLines` or `.jsonArray`). + /// - options: Options for reading JSON. Note that `.stopWhenDone` is + /// always applied internally and does not need to be specified. + public init(mode: JSONStreamMode, options: JSONReadOptions = .default) { + self.mode = mode + self.options = options + self.buffer = Data() + self.readOffset = 0 + self.arrayState = .expectOpenBracket + } + + /// Feeds data to the parser and returns all complete JSON values found. + /// + /// - Parameter data: New data to append to the internal buffer. + /// - Returns: An array of fully-parsed ``JSONValue`` items. + /// - Throws: ``JSONError`` if malformed JSON is encountered. + public mutating func parse(_ data: Data) throws -> [JSONValue] { + buffer.append(data) + return try drainBuffer() + } + + /// Feeds raw bytes to the parser and returns all complete JSON values found. + /// + /// - Parameter bytes: A buffer pointer to the raw bytes. + /// - Returns: An array of fully-parsed ``JSONValue`` items. + /// - Throws: ``JSONError`` if malformed JSON is encountered. + public mutating func parse(bytes: UnsafeBufferPointer) throws -> [JSONValue] { + if let base = bytes.baseAddress, bytes.count > 0 { + buffer.append(base, count: bytes.count) + } + return try drainBuffer() + } + + /// Signals end-of-stream and returns any remaining JSON values. + /// + /// After calling this method, the parser is in a finished state. + /// Call ``reset()`` to reuse it. + /// + /// - Returns: An array of any remaining ``JSONValue`` items. + /// - Throws: ``JSONError`` if the remaining buffer contains incomplete JSON. + public mutating func finalize() throws -> [JSONValue] { + let results = try drainBuffer() + + skipWhitespace() + if readOffset < buffer.count { + if mode == .jsonArray { + throw JSONError.invalidJSON("Unexpected end of JSON array stream") + } else { + throw JSONError.invalidJSON("Incomplete JSON value at end of stream") + } + } + + if mode == .jsonArray && arrayState != .done && arrayState != .expectOpenBracket { + throw JSONError.invalidJSON("Unexpected end of JSON array stream") + } + + return results + } + + /// Resets the parser to its initial state, discarding all buffered data. + public mutating func reset() { + buffer.removeAll(keepingCapacity: true) + readOffset = 0 + arrayState = .expectOpenBracket + } + + // MARK: - Private Types + + private enum ArrayParseState: Sendable { + case expectOpenBracket + case expectElementOrClose + case expectCommaOrClose + case done + } + + // MARK: - Drain Logic + + private mutating func drainBuffer() throws -> [JSONValue] { + compactIfNeeded() + + switch mode { + case .jsonLines: + return try drainJSONLines() + case .jsonArray: + return try drainJSONArray() + } + } + + private mutating func drainJSONLines() throws -> [JSONValue] { + var results: [JSONValue] = [] + + while true { + skipWhitespace() + guard buffer.count - readOffset > 0 else { break } + + guard let value = try parseOneValue() else { break } + results.append(value) + } + + return results + } + + private mutating func drainJSONArray() throws -> [JSONValue] { + var results: [JSONValue] = [] + + loop: while true { + skipWhitespace() + + switch arrayState { + case .expectOpenBracket: + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + guard byte == UInt8(ascii: "[") else { + throw JSONError.invalidJSON("Expected '[' at start of JSON array stream") + } + readOffset += 1 + arrayState = .expectElementOrClose + + case .expectElementOrClose: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: "]") { + readOffset += 1 + arrayState = .done + break loop + } + guard let value = try parseOneValue() else { break loop } + results.append(value) + arrayState = .expectCommaOrClose + + case .expectCommaOrClose: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: ",") { + readOffset += 1 + arrayState = .expectElementOrClose + } else if byte == UInt8(ascii: "]") { + readOffset += 1 + arrayState = .done + break loop + } else { + throw JSONError.invalidJSON( + "Expected ',' or ']' in JSON array, got '\(Unicode.Scalar(byte))'" + ) + } + + case .done: + break loop + } + } + + return results + } + + // MARK: - Core Parse + + /// Tries to parse one JSON value starting at `readOffset`. + /// Returns `nil` if more data is needed. + private mutating func parseOneValue() throws -> JSONValue? { + let available = buffer.count - readOffset + guard available > 0 else { return nil } + + let paddingSize = Int(YYJSON_PADDING_SIZE) + + // Build a padded copy so yyjson has enough trailing zero bytes. + var padded = Data(count: available + paddingSize) + buffer.withUnsafeBytes { srcBuf in + padded.withUnsafeMutableBytes { dstBuf in + let src = srcBuf.baseAddress!.advanced(by: readOffset) + dstBuf.baseAddress!.copyMemory(from: src, byteCount: available) + } + } + + return try padded.withUnsafeBytes { padBuf -> JSONValue? in + let ptr = padBuf.baseAddress!.assumingMemoryBound(to: UInt8.self) + let result = try Document.streamParse( + bytes: ptr, count: available, options: options + ) + switch result { + case .success(let doc, let consumed): + guard let root = doc.root else { + throw JSONError.invalidData("Document has no root value") + } + readOffset += consumed + return JSONValue(value: root, document: doc) + case .needMoreData: + return nil + } + } + } + + // MARK: - Buffer Helpers + + private mutating func skipWhitespace() { + let startIdx = buffer.startIndex + while readOffset < buffer.count { + let byte = buffer[startIdx + readOffset] + guard byte == 0x20 || byte == 0x09 || byte == 0x0A || byte == 0x0D else { break } + readOffset += 1 + } + } + + private mutating func compactIfNeeded() { + guard readOffset > 0, readOffset > buffer.count / 2 else { return } + buffer.removeSubrange(buffer.startIndex ..< buffer.startIndex + readOffset) + readOffset = 0 + } +} + +// MARK: - JSONIncrementalReader + +/// An incremental reader for large JSON documents. +/// +/// Feed chunks of a single large JSON document with ``feed(_:)``. +/// Data is accumulated internally. Call ``finish()`` to parse the complete +/// document, or use ``feed(_:)`` which attempts a parse after each chunk. +/// +/// ```swift +/// let reader = try JSONIncrementalReader(data: firstChunk) +/// for try await chunk in stream { +/// if let doc = try reader.feed(chunk) { +/// print(doc.root?["key"]?.string ?? "") +/// break +/// } +/// } +/// ``` +/// +/// - Note: For a document already fully in memory, prefer +/// ``JSONDocument/init(data:options:)`` which is faster. +/// This type is for when data arrives in chunks over the network. +public final class JSONIncrementalReader: @unchecked Sendable { + + private var buffer: Data + private let options: JSONReadOptions + private var finished: Bool + + /// Creates a new incremental reader with initial data. + /// + /// - Parameters: + /// - data: The first chunk of JSON data. + /// - options: Options for reading JSON. + public init(data: Data, options: JSONReadOptions = .default) throws { + self.buffer = data + self.options = options + self.finished = false + } + + /// Feeds more data and attempts to parse the accumulated buffer. + /// + /// - Parameter data: Additional JSON data. + /// - Returns: A ``JSONDocument`` if the buffer contains a complete document, + /// or `nil` if more data is needed. + /// - Throws: ``JSONError`` for non-recoverable parse errors. + public func feed(_ data: Data) throws -> JSONDocument? { + guard !finished else { + throw JSONError.invalidJSON("Incremental reader already finished") + } + buffer.append(data) + return try attemptParse() + } + + /// Signals end-of-stream and returns the completed document. + /// + /// All accumulated data is parsed as a single JSON document. + /// + /// - Returns: The parsed ``JSONDocument``. + /// - Throws: ``JSONError`` if the document is incomplete or malformed. + public func finish() throws -> JSONDocument { + guard !finished else { + throw JSONError.invalidJSON("Incremental reader already finished") + } + finished = true + let doc = try Document(data: buffer, options: options) + return JSONDocument(_document: doc) + } + + // MARK: - Private + + private func attemptParse() throws -> JSONDocument? { + // Try parsing the accumulated data. If it's complete, return the doc. + // If incomplete, return nil to request more data. + do { + let doc = try Document(data: buffer, options: options) + finished = true + return JSONDocument(_document: doc) + } catch let error as JSONError { + // If the error indicates incomplete data, we need more + if error.message.contains("unexpected end") + || error.message.contains("Unexpected end") + || error.message.contains("Empty content") { + return nil + } + throw error + } + } +} diff --git a/Sources/ReerJSON/Value.swift b/Sources/ReerJSON/Value.swift index 6305394..f5e4558 100644 --- a/Sources/ReerJSON/Value.swift +++ b/Sources/ReerJSON/Value.swift @@ -105,6 +105,65 @@ internal final class Document: @unchecked Sendable { self.doc = doc } + /// Creates a document by parsing bytes with `STOP_WHEN_DONE`, reporting consumed byte count. + /// + /// - Parameters: + /// - bytes: Pointer to the JSON bytes (must have `YYJSON_PADDING_SIZE` padding). + /// - count: Number of valid bytes (excluding padding). + /// - options: Options for reading the JSON. + /// - consumedBytes: On success, set to the number of input bytes consumed. + /// - Throws: `JSONError` if parsing fails. + /// Result of a streaming parse attempt. + enum StreamParseResult { + case success(Document, consumedBytes: Int) + case needMoreData + } + + /// Attempts to parse one JSON value from bytes with `STOP_WHEN_DONE`. + /// + /// - Parameters: + /// - bytes: Pointer to the JSON bytes (must have `YYJSON_PADDING_SIZE` padding). + /// - count: Number of valid bytes (excluding padding). + /// - options: Options for reading the JSON. + /// - Returns: `.success` with consumed byte count, or `.needMoreData` if incomplete. + /// - Throws: `JSONError` for non-recoverable parse errors. + static func streamParse( + bytes: UnsafePointer, count: Int, + options: JSONReadOptions + ) throws -> StreamParseResult { + guard count > 0 else { return .needMoreData } + + var error = yyjson_read_err() + var flags = options.yyjsonFlags + flags |= YYJSON_READ_STOP_WHEN_DONE + flags &= ~yyjson_read_flag(YYJSON_READ_INSITU) + + let ptr = UnsafeMutablePointer( + mutating: UnsafeRawPointer(bytes).assumingMemoryBound(to: CChar.self) + ) + let result = yyjson_read_opts(ptr, count, flags, nil, &error) + + if let doc = result { + let consumed = yyjson_doc_get_read_size(doc) + let document = Document(alreadyParsed: doc) + return .success(document, consumedBytes: consumed) + } + + // Incomplete data — need more + if error.code == YYJSON_READ_ERROR_UNEXPECTED_END + || error.code == YYJSON_READ_ERROR_EMPTY_CONTENT { + return .needMoreData + } + + throw JSONError(parsing: error) + } + + /// Adopts an already-parsed yyjson_doc, taking ownership. + init(alreadyParsed doc: UnsafeMutablePointer) { + self.doc = doc + self.retainedData = nil + } + deinit { yyjson_doc_free(doc) } @@ -149,6 +208,11 @@ internal final class Document: @unchecked Sendable { public struct JSONDocument: ~Copyable, @unchecked Sendable { internal let _document: Document + /// Creates a document from a pre-parsed internal document. + internal init(_document: Document) { + self._document = _document + } + /// Creates a document by parsing JSON data. /// /// - Parameters: diff --git a/Tests/ReerJSONTests/StreamParserTests.swift b/Tests/ReerJSONTests/StreamParserTests.swift new file mode 100644 index 0000000..ecfb86b --- /dev/null +++ b/Tests/ReerJSONTests/StreamParserTests.swift @@ -0,0 +1,491 @@ +// +// Copyright © 2026 reers. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +import XCTest +@testable import ReerJSON + +// MARK: - Test Helpers + +private struct Item: Codable, Equatable, Sendable { + let id: Int + let name: String +} + +// MARK: - JSON Lines Tests + +final class JSONStreamParserJSONLinesTests: XCTestCase { + + func testSingleCompleteChunk() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0]["id"]?.int64, 1) + XCTAssertEqual(values[1]["name"]?.string, "b") + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkValues() throws { + var parser = JSONStreamParser(mode: .jsonLines) + + let chunk1 = Data("{\"id\":1}\n{\"id\"".utf8) + let values1 = try parser.parse(chunk1) + XCTAssertEqual(values1.count, 1) + XCTAssertEqual(values1[0]["id"]?.int64, 1) + + let chunk2 = Data(":2}\n".utf8) + let values2 = try parser.parse(chunk2) + XCTAssertEqual(values2.count, 1) + XCTAssertEqual(values2[0]["id"]?.int64, 2) + + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testEmptyLines() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("\n\n{\"x\":1}\n\n\n{\"x\":2}\n\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testWhitespacePadding() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data(" {\"a\":1} \n {\"a\":2} ".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testVariousTypes() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let data = Data("42\n\"hello\"\ntrue\nnull\n[1,2]\n{\"k\":\"v\"}\n".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 6) + XCTAssertEqual(values[0].int64, 42) + XCTAssertEqual(values[1].string, "hello") + XCTAssertEqual(values[2].bool, true) + XCTAssertTrue(values[3].isNull) + XCTAssertEqual(values[4].array?.count, 2) + XCTAssertEqual(values[5]["k"]?.string, "v") + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testEmptyDataParse() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let values = try parser.parse(Data()) + XCTAssertTrue(values.isEmpty) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testIncompleteJSONAtFinalize() throws { + var parser = JSONStreamParser(mode: .jsonLines) + _ = try parser.parse(Data("{\"id\":1}".utf8)) + _ = try parser.parse(Data("{\"incomplete".utf8)) + XCTAssertThrowsError(try parser.finalize()) + } +} + +// MARK: - JSON Array Tests + +final class JSONStreamParserJSONArrayTests: XCTestCase { + + func testNormalArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[1, 2, 3]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0].int64, 1) + XCTAssertEqual(values[1].int64, 2) + XCTAssertEqual(values[2].int64, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testNestedObjects() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[{\"a\":1},{\"b\":[2,3]},{\"c\":{\"d\":4}}]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0]["a"]?.int64, 1) + XCTAssertEqual(values[1]["b"]?.array?.count, 2) + XCTAssertEqual(values[2]["c"]?["d"]?.int64, 4) + } + + func testNestedArrays() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[[1,2],[3,[4,5]]]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].array?.count, 2) + } + + func testEmptyArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[]".utf8) + let values = try parser.parse(data) + XCTAssertTrue(values.isEmpty) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testTrailingCommaWithOption() throws { + var parser = JSONStreamParser(mode: .jsonArray, options: .allowTrailingCommas) + let data = Data("[1, 2, 3,]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + } + + func testCrossChunkArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + + let chunk1 = Data("[{\"id\":1},".utf8) + let values1 = try parser.parse(chunk1) + XCTAssertEqual(values1.count, 1) + XCTAssertEqual(values1[0]["id"]?.int64, 1) + + let chunk2 = Data("{\"id\":2}]".utf8) + let values2 = try parser.parse(chunk2) + XCTAssertEqual(values2.count, 1) + XCTAssertEqual(values2[0]["id"]?.int64, 2) + + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testArrayWithWhitespace() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data(" [ 1 , 2 , 3 ] ".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testMissingOpenBracket() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("1, 2, 3]".utf8))) + } + + func testIncompleteArray() throws { + var parser = JSONStreamParser(mode: .jsonArray) + _ = try parser.parse(Data("[1, 2".utf8)) + XCTAssertThrowsError(try parser.finalize()) + } + + func testStringElements() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[\"hello\", \"world\"]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].string, "hello") + XCTAssertEqual(values[1].string, "world") + } + + func testMixedTypes() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let data = Data("[1, \"two\", true, null, {\"k\":\"v\"}, [3]]".utf8) + let values = try parser.parse(data) + XCTAssertEqual(values.count, 6) + XCTAssertEqual(values[0].int64, 1) + XCTAssertEqual(values[1].string, "two") + XCTAssertEqual(values[2].bool, true) + XCTAssertTrue(values[3].isNull) + } +} + +// MARK: - Incremental Reader Tests + +final class JSONIncrementalReaderTests: XCTestCase { + + func testSingleChunk() throws { + let reader = try JSONIncrementalReader(data: Data("{\"key\":\"value\"}".utf8)) + let doc = try reader.finish() + XCTAssertEqual(doc.root?["key"]?.string, "value") + } + + func testMultipleChunks() throws { + let reader = try JSONIncrementalReader(data: Data("{\"ke".utf8)) + // First feed should need more data + do { + if let doc = try reader.feed(Data("y\":\"val".utf8)) { + XCTFail("Should need more data, got doc with root: \(String(describing: doc.root))") + } + } + // Second feed should complete + if let doc = try reader.feed(Data("ue\"}".utf8)) { + XCTAssertEqual(doc.root?["key"]?.string, "value") + } else { + XCTFail("Should have completed parsing") + } + } + + func testLargerDocument() throws { + var items: [[String: Any]] = [] + for i in 0..<100 { + items.append(["id": i, "name": "item_\(i)"]) + } + let jsonData = try JSONSerialization.data(withJSONObject: items) + + let chunkSize = 64 + let firstChunk = Data(jsonData[0.. 0) + } + + func testVeryLargeObject() throws { + var parser = JSONStreamParser(mode: .jsonLines) + var json = "{\"data\":\"" + for _ in 0..<10_000 { + json += "x" + } + json += "\"}\n" + let values = try parser.parse(Data(json.utf8)) + XCTAssertEqual(values.count, 1) + } + + func testArrayResetAndReuse() throws { + var parser = JSONStreamParser(mode: .jsonArray) + _ = try parser.parse(Data("[1,2]".utf8)) + _ = try parser.finalize() + + parser.reset() + let values = try parser.parse(Data("[3,4]".utf8)) + XCTAssertEqual(values.count, 2) + XCTAssertEqual(values[0].int64, 3) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } +} + +// MARK: - Codable Decoder Tests + +final class StreamingDecoderTests: XCTestCase { + + func testJSONLinesDecoder() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + let data = Data("{\"id\":1,\"name\":\"a\"}\n{\"id\":2,\"name\":\"b\"}\n".utf8) + let items = try decoder.parseBuffer(data) + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONLinesDecoderCrossChunk() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + let items1 = try decoder.parseBuffer(Data("{\"id\":1,\"name\":\"a\"}\n{\"id\"".utf8)) + XCTAssertEqual(items1, [Item(id: 1, name: "a")]) + let items2 = try decoder.parseBuffer(Data(":2,\"name\":\"b\"}\n".utf8)) + XCTAssertEqual(items2, [Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONArrayDecoder() throws { + var decoder = StreamingJSONArrayDecoder(Item.self) + let data = Data("[{\"id\":1,\"name\":\"a\"},{\"id\":2,\"name\":\"b\"}]".utf8) + let items = try decoder.parseBuffer(data) + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testJSONArrayDecoderCrossChunk() throws { + var decoder = StreamingJSONArrayDecoder(Item.self) + let items1 = try decoder.parseBuffer(Data("[{\"id\":1,\"name\":\"a\"},".utf8)) + XCTAssertEqual(items1, [Item(id: 1, name: "a")]) + let items2 = try decoder.parseBuffer(Data("{\"id\":2,\"name\":\"b\"}]".utf8)) + XCTAssertEqual(items2, [Item(id: 2, name: "b")]) + let remaining = try decoder.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testDecoderReset() throws { + var decoder = StreamingJSONLinesDecoder(Item.self) + _ = try decoder.parseBuffer(Data("{\"id\":1,\"name\":\"a\"}\n".utf8)) + decoder.reset() + let items = try decoder.parseBuffer(Data("{\"id\":2,\"name\":\"b\"}\n".utf8)) + XCTAssertEqual(items, [Item(id: 2, name: "b")]) + } +} + +// MARK: - AsyncSequence Tests + +@available(macOS 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *) +final class AsyncStreamTests: XCTestCase { + + func testJSONValueStream() async throws { + let chunks: [Data] = [ + Data("{\"id\":1}\n{\"id\"".utf8), + Data(":2}\n{\"id\":3}\n".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var values: [JSONValue] = [] + for try await value in stream.jsonValues(mode: .jsonLines) { + values.append(value) + } + XCTAssertEqual(values.count, 3) + XCTAssertEqual(values[0]["id"]?.int64, 1) + XCTAssertEqual(values[1]["id"]?.int64, 2) + XCTAssertEqual(values[2]["id"]?.int64, 3) + } + + func testDecodingStream() async throws { + let chunks: [Data] = [ + Data("{\"id\":1,\"name\":\"a\"}\n".utf8), + Data("{\"id\":2,\"name\":\"b\"}\n".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var items: [Item] = [] + for try await item in stream.decode(Item.self, mode: .jsonLines) { + items.append(item) + } + XCTAssertEqual(items, [Item(id: 1, name: "a"), Item(id: 2, name: "b")]) + } + + func testJSONArrayValueStream() async throws { + let chunks: [Data] = [ + Data("[{\"id\":1,\"name\":\"a\"},".utf8), + Data("{\"id\":2,\"name\":\"b\"}]".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var values: [JSONValue] = [] + for try await value in stream.jsonValues(mode: .jsonArray) { + values.append(value) + } + XCTAssertEqual(values.count, 2) + } + + func testDecodingStreamArrayMode() async throws { + let chunks: [Data] = [ + Data("[{\"id\":1,\"name\":\"x\"},".utf8), + Data("{\"id\":2,\"name\":\"y\"}]".utf8) + ] + let stream = AsyncStream { continuation in + for chunk in chunks { + continuation.yield(chunk) + } + continuation.finish() + } + + var items: [Item] = [] + for try await item in stream.decode(Item.self, mode: .jsonArray) { + items.append(item) + } + XCTAssertEqual(items, [Item(id: 1, name: "x"), Item(id: 2, name: "y")]) + } + + func testEmptyStream() async throws { + let stream = AsyncStream { continuation in + continuation.finish() + } + + var count = 0 + for try await _ in stream.jsonValues(mode: .jsonLines) { + count += 1 + } + XCTAssertEqual(count, 0) + } +} From 1f65dae1f8bd1df53cf4909ccf490c28d36be6ae Mon Sep 17 00:00:00 2001 From: phoenix Date: Sat, 9 May 2026 01:16:56 +0800 Subject: [PATCH 2/4] fix: tighten streaming array parsing Reject malformed array streams consistently and avoid copying the unread buffer for each parsed value. Co-authored-by: Cursor --- Sources/ReerJSON/StreamParser.swift | 50 ++++++++++++++++----- Tests/ReerJSONTests/StreamParserTests.swift | 21 +++++++++ 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/Sources/ReerJSON/StreamParser.swift b/Sources/ReerJSON/StreamParser.swift index 2fbbf74..7201c7b 100644 --- a/Sources/ReerJSON/StreamParser.swift +++ b/Sources/ReerJSON/StreamParser.swift @@ -138,7 +138,7 @@ public struct JSONStreamParser: Sendable { } } - if mode == .jsonArray && arrayState != .done && arrayState != .expectOpenBracket { + if mode == .jsonArray && arrayState != .done { throw JSONError.invalidJSON("Unexpected end of JSON array stream") } @@ -157,6 +157,7 @@ public struct JSONStreamParser: Sendable { private enum ArrayParseState: Sendable { case expectOpenBracket case expectElementOrClose + case expectElementAfterComma case expectCommaOrClose case done } @@ -217,13 +218,29 @@ public struct JSONStreamParser: Sendable { results.append(value) arrayState = .expectCommaOrClose + case .expectElementAfterComma: + skipWhitespace() + guard readOffset < buffer.count else { break loop } + let byte = buffer[buffer.startIndex + readOffset] + if byte == UInt8(ascii: "]") { + guard allowsTrailingCommas else { + throw JSONError.invalidJSON("Trailing comma is not allowed in JSON array stream") + } + readOffset += 1 + arrayState = .done + break loop + } + guard let value = try parseOneValue() else { break loop } + results.append(value) + arrayState = .expectCommaOrClose + case .expectCommaOrClose: skipWhitespace() guard readOffset < buffer.count else { break loop } let byte = buffer[buffer.startIndex + readOffset] if byte == UInt8(ascii: ",") { readOffset += 1 - arrayState = .expectElementOrClose + arrayState = .expectElementAfterComma } else if byte == UInt8(ascii: "]") { readOffset += 1 arrayState = .done @@ -239,6 +256,10 @@ public struct JSONStreamParser: Sendable { } } + if arrayState == .done { + try validateNoTrailingArrayContent() + } + return results } @@ -251,18 +272,13 @@ public struct JSONStreamParser: Sendable { guard available > 0 else { return nil } let paddingSize = Int(YYJSON_PADDING_SIZE) - - // Build a padded copy so yyjson has enough trailing zero bytes. - var padded = Data(count: available + paddingSize) - buffer.withUnsafeBytes { srcBuf in - padded.withUnsafeMutableBytes { dstBuf in - let src = srcBuf.baseAddress!.advanced(by: readOffset) - dstBuf.baseAddress!.copyMemory(from: src, byteCount: available) - } + buffer.append(contentsOf: repeatElement(0 as UInt8, count: paddingSize)) + defer { + buffer.removeLast(paddingSize) } - return try padded.withUnsafeBytes { padBuf -> JSONValue? in - let ptr = padBuf.baseAddress!.assumingMemoryBound(to: UInt8.self) + return try buffer.withUnsafeBytes { buf -> JSONValue? in + let ptr = buf.baseAddress!.advanced(by: readOffset).assumingMemoryBound(to: UInt8.self) let result = try Document.streamParse( bytes: ptr, count: available, options: options ) @@ -281,6 +297,10 @@ public struct JSONStreamParser: Sendable { // MARK: - Buffer Helpers + private var allowsTrailingCommas: Bool { + options.contains(.allowTrailingCommas) || options.contains(.json5) + } + private mutating func skipWhitespace() { let startIdx = buffer.startIndex while readOffset < buffer.count { @@ -295,6 +315,12 @@ public struct JSONStreamParser: Sendable { buffer.removeSubrange(buffer.startIndex ..< buffer.startIndex + readOffset) readOffset = 0 } + + private mutating func validateNoTrailingArrayContent() throws { + skipWhitespace() + guard readOffset < buffer.count else { return } + throw JSONError.invalidJSON("Unexpected content after JSON array stream") + } } // MARK: - JSONIncrementalReader diff --git a/Tests/ReerJSONTests/StreamParserTests.swift b/Tests/ReerJSONTests/StreamParserTests.swift index ecfb86b..5cccb99 100644 --- a/Tests/ReerJSONTests/StreamParserTests.swift +++ b/Tests/ReerJSONTests/StreamParserTests.swift @@ -160,6 +160,17 @@ final class JSONStreamParserJSONArrayTests: XCTestCase { XCTAssertEqual(values.count, 3) } + func testTrailingCommaWithoutOptionThrows() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("[1, 2, 3,]".utf8))) + } + + func testTrailingCommaWithJSON5Option() throws { + var parser = JSONStreamParser(mode: .jsonArray, options: .json5) + let values = try parser.parse(Data("[1, 2, 3,]".utf8)) + XCTAssertEqual(values.map(\.int64), [1, 2, 3]) + } + func testCrossChunkArray() throws { var parser = JSONStreamParser(mode: .jsonArray) @@ -197,6 +208,16 @@ final class JSONStreamParserJSONArrayTests: XCTestCase { XCTAssertThrowsError(try parser.finalize()) } + func testEmptyArrayStreamAtFinalizeThrows() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.finalize()) + } + + func testTrailingContentAfterArrayThrows() throws { + var parser = JSONStreamParser(mode: .jsonArray) + XCTAssertThrowsError(try parser.parse(Data("[1] 2".utf8))) + } + func testStringElements() throws { var parser = JSONStreamParser(mode: .jsonArray) let data = Data("[\"hello\", \"world\"]".utf8) From 1b769b95fc12133bf3e346e10514c68df05b2f87 Mon Sep 17 00:00:00 2001 From: phoenix Date: Sat, 23 May 2026 00:46:14 +0800 Subject: [PATCH 3/4] fix(streaming): correctness, perf, and thread-safety fixes after review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Several issues found while reviewing the streaming JSON parsing API: * Numeric cross-chunk splitting was silently truncating values. yyjson with STOP_WHEN_DONE will happily parse "1" out of a buffer whose true content is "12345" — there is no way for the parser to tell from within yyjson that the number could be extended. The stream parser now defers any value whose parse ends exactly at the current buffer end and only commits it once the next chunk arrives (or finalize() confirms it's the last token). Strings, objects, arrays, and literals continue to work as before since yyjson detects truncation for those itself. * JSONIncrementalReader used substring matching on the error message (e.g. error.message.contains("Unexpected end")) to detect "need more data". This was fragile and could misclassify a real syntactic error whose human-readable message coincidentally contained those words. JSONError now carries the yyjson read error code and the reader switches on the code directly. * JSONIncrementalReader was declared @unchecked Sendable but was not thread-safe — feed/finish mutated state without synchronization. Added an internal LockedState so concurrent feed/finish calls are serialized; added a concurrent-feed test. * StreamingJSONLinesDecoder / StreamingJSONArrayDecoder were serializing each parsed JSONValue back to JSON text via .data() and then re-parsing it through ReerJSONDecoder, i.e. parsing each value three times. JSONStreamParser now also exposes an internal byte-slice API (parseSlices / finalizeSlices) that the streaming decoders use directly, eliminating the round-trip. * parseOneValue was appending YYJSON_PADDING_SIZE zero bytes to the buffer and then removing them on every value. yyjson_read_opts in non-INSITU mode allocates its own padded buffer internally, so this was unnecessary churn — removed. * JSONValueByteStream was building per-chunk Data by appending one byte at a time inside withUnsafeMutableBytes (which couldn't even cross await boundaries). Rewritten to read into a [UInt8] and construct the Data once. * Clarified docs: documented the cross-chunk numeric deferral rule, the per-feed parse cost of JSONIncrementalReader, and the thread-safety guarantees. Removed the redundant options.contains(.json5) check on top of .allowTrailingCommas (the former includes the latter) and kept the OR as a clarity comment. 13 new tests cover number/float boundary splitting in both modes, finalize() flushing of values without trailing newline, structural errors not being mistaken for needMore, and concurrent access. All 768 tests pass (719 pre-existing + 49 streaming). Co-authored-by: Cursor --- Sources/ReerJSON/Error.swift | 14 +- Sources/ReerJSON/StreamDecoder.swift | 138 +++++---- Sources/ReerJSON/StreamParser.swift | 307 +++++++++++++------- Sources/ReerJSON/Value.swift | 75 ++++- Tests/ReerJSONTests/StreamParserTests.swift | 142 +++++++++ 5 files changed, 477 insertions(+), 199 deletions(-) diff --git a/Sources/ReerJSON/Error.swift b/Sources/ReerJSON/Error.swift index 827e245..38d9d75 100644 --- a/Sources/ReerJSON/Error.swift +++ b/Sources/ReerJSON/Error.swift @@ -51,6 +51,15 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { /// The coding path where the error occurred (for decoding errors). public let path: String + /// The underlying yyjson read error code, when this error originates from + /// the JSON parser. Zero (`YYJSON_READ_SUCCESS`) when not applicable. + /// + /// This is primarily used by the streaming APIs to distinguish recoverable + /// "need more data" conditions (`YYJSON_READ_ERROR_UNEXPECTED_END`, + /// `YYJSON_READ_ERROR_EMPTY_CONTENT`, `YYJSON_READ_ERROR_MORE`) from + /// non-recoverable parse errors. + internal let readErrorCode: UInt32 + public var description: String { if path.isEmpty { return message @@ -58,10 +67,11 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { return "\(message) (at \(path))" } - private init(kind: Kind, message: String, path: String = "") { + private init(kind: Kind, message: String, path: String = "", readErrorCode: UInt32 = 0) { self.kind = kind self.message = message self.path = path + self.readErrorCode = readErrorCode } // MARK: - Public Factory Methods @@ -144,6 +154,7 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { self.kind = .invalidJSON self.message = message self.path = "" + self.readErrorCode = UInt32(error.code) } /// Create an error from a yyjson write error. @@ -168,5 +179,6 @@ public struct JSONError: Error, Equatable, Sendable, CustomStringConvertible { self.kind = .writeError self.message = message self.path = "" + self.readErrorCode = 0 } } diff --git a/Sources/ReerJSON/StreamDecoder.swift b/Sources/ReerJSON/StreamDecoder.swift index 40a1256..e9b3b8f 100644 --- a/Sources/ReerJSON/StreamDecoder.swift +++ b/Sources/ReerJSON/StreamDecoder.swift @@ -27,6 +27,10 @@ import Foundation /// /// Each top-level JSON value in the stream is decoded into `T`. /// +/// Internally this passes raw byte slices from the underlying +/// ``JSONStreamParser`` straight to ``ReerJSONDecoder/decode(_:from:)``, +/// avoiding any intermediate `JSONValue` serialization round-trip. +/// /// ```swift /// var decoder = StreamingJSONLinesDecoder(Item.self) /// let items1 = try decoder.parseBuffer(chunk1) @@ -36,7 +40,6 @@ import Foundation public struct StreamingJSONLinesDecoder: @unchecked Sendable { private var parser: JSONStreamParser - // ReerJSONDecoder uses internal locking, safe to share. private let decoder: ReerJSONDecoder private let type: T.Type @@ -58,34 +61,30 @@ public struct StreamingJSONLinesDecoder: @unchecked Sen } /// Feeds data to the decoder and returns all decoded values. - /// - /// - Parameter data: New data to append. - /// - Returns: An array of decoded `T` values. - /// - Throws: ``JSONError`` or `DecodingError` on failure. public mutating func parseBuffer(_ data: Data) throws -> [T] { - let values = try parser.parse(data) - return try values.map { value in - let data = try value.data() - return try decoder.decode(type, from: data) - } + let slices = try parser.parseSlices(data) + return try decodeAll(slices) } /// Signals end-of-stream and returns any remaining decoded values. - /// - /// - Returns: An array of remaining decoded `T` values. - /// - Throws: ``JSONError`` or `DecodingError` on failure. public mutating func finalize() throws -> [T] { - let values = try parser.finalize() - return try values.map { value in - let data = try value.data() - return try decoder.decode(type, from: data) - } + let slices = try parser.finalizeSlices() + return try decodeAll(slices) } /// Resets the decoder to its initial state. public mutating func reset() { parser.reset() } + + private func decodeAll(_ slices: [Data]) throws -> [T] { + var out: [T] = [] + out.reserveCapacity(slices.count) + for slice in slices { + out.append(try decoder.decode(type, from: slice)) + } + return out + } } // MARK: - StreamingJSONArrayDecoder @@ -104,17 +103,9 @@ public struct StreamingJSONLinesDecoder: @unchecked Sen public struct StreamingJSONArrayDecoder: @unchecked Sendable { private var parser: JSONStreamParser - // ReerJSONDecoder uses internal locking, safe to share. private let decoder: ReerJSONDecoder private let type: T.Type - /// Creates a new JSON array streaming decoder. - /// - /// - Parameters: - /// - type: The `Decodable` type to decode each element into. - /// - options: Options for reading JSON. - /// - decoder: An optional ``ReerJSONDecoder`` with custom strategies. - /// If `nil`, a default decoder is used. public init( _ type: T.Type, options: JSONReadOptions = .default, @@ -125,35 +116,28 @@ public struct StreamingJSONArrayDecoder: @unchecked Sen self.decoder = decoder ?? ReerJSONDecoder() } - /// Feeds data to the decoder and returns all decoded elements. - /// - /// - Parameter data: New data to append. - /// - Returns: An array of decoded `T` values. - /// - Throws: ``JSONError`` or `DecodingError` on failure. public mutating func parseBuffer(_ data: Data) throws -> [T] { - let values = try parser.parse(data) - return try values.map { value in - let data = try value.data() - return try decoder.decode(type, from: data) - } + let slices = try parser.parseSlices(data) + return try decodeAll(slices) } - /// Signals end-of-stream and returns any remaining decoded elements. - /// - /// - Returns: An array of remaining decoded `T` values. - /// - Throws: ``JSONError`` or `DecodingError` on failure. public mutating func finalize() throws -> [T] { - let values = try parser.finalize() - return try values.map { value in - let data = try value.data() - return try decoder.decode(type, from: data) - } + let slices = try parser.finalizeSlices() + return try decodeAll(slices) } - /// Resets the decoder to its initial state. public mutating func reset() { parser.reset() } + + private func decodeAll(_ slices: [Data]) throws -> [T] { + var out: [T] = [] + out.reserveCapacity(slices.count) + for slice in slices { + out.append(try decoder.decode(type, from: slice)) + } + return out + } } // MARK: - AsyncSequence Adapters @@ -274,16 +258,7 @@ where Source.Element == UInt8 { return nil } - var chunk = Data() - chunk.reserveCapacity(chunkSize) - for _ in 0.. Data { + var scratch = [UInt8]() + scratch.reserveCapacity(chunkSize) + for _ in 0..: AsyncSequence, @unchecked Sendable @@ -366,28 +360,31 @@ where Source.Element == Data { guard let chunk = try await sourceIterator.next() else { sourceExhausted = true - let remaining = try parser.finalize() - if !remaining.isEmpty { - pending = try remaining.map { value in - let data = try value.data() - return try decoder.decode(type, from: data) - } + let slices = try parser.finalizeSlices() + if !slices.isEmpty { + pending = try decodeAll(slices) pendingIndex = 0 continue } return nil } - let values = try parser.parse(chunk) - if !values.isEmpty { - pending = try values.map { value in - let data = try value.data() - return try decoder.decode(type, from: data) - } + let slices = try parser.parseSlices(chunk) + if !slices.isEmpty { + pending = try decodeAll(slices) pendingIndex = 0 } } } + + private func decodeAll(_ slices: [Data]) throws -> [T] { + var out: [T] = [] + out.reserveCapacity(slices.count) + for slice in slices { + out.append(try decoder.decode(type, from: slice)) + } + return out + } } } @@ -402,7 +399,6 @@ extension AsyncSequence where Element == Data, Self: Sendable { /// - Parameters: /// - mode: The stream format (`.jsonLines` or `.jsonArray`). /// - options: Options for reading JSON. - /// - Returns: A ``JSONValueStream`` yielding parsed values. public func jsonValues( mode: JSONStreamMode = .jsonLines, options: JSONReadOptions = .default @@ -417,7 +413,6 @@ extension AsyncSequence where Element == Data, Self: Sendable { /// - mode: The stream format (`.jsonLines` or `.jsonArray`). /// - options: Options for reading JSON. /// - decoder: An optional ``ReerJSONDecoder``. If `nil`, uses a default decoder. - /// - Returns: A ``DecodingStream`` yielding decoded values. public func decode( _ type: T.Type, mode: JSONStreamMode = .jsonLines, @@ -445,7 +440,6 @@ extension AsyncSequence where Element == UInt8, Self: Sendable { /// - mode: The stream format (`.jsonLines` or `.jsonArray`). /// - options: Options for reading JSON. /// - chunkSize: Number of bytes to batch before parsing. Default is 4096. - /// - Returns: A ``JSONValueByteStream`` yielding parsed values. public func jsonValues( mode: JSONStreamMode = .jsonLines, options: JSONReadOptions = .default, diff --git a/Sources/ReerJSON/StreamParser.swift b/Sources/ReerJSON/StreamParser.swift index 7201c7b..ef4d1e4 100644 --- a/Sources/ReerJSON/StreamParser.swift +++ b/Sources/ReerJSON/StreamParser.swift @@ -26,8 +26,12 @@ import Foundation /// The parsing mode for a stream of JSON data. public enum JSONStreamMode: Sendable { - /// Each line is an independent JSON value (JSON Lines / NDJSON). + /// Each top-level JSON value is yielded individually (JSON Lines / NDJSON). + /// + /// Values may be separated by any JSON whitespace (space, tab, CR, LF); + /// strict NDJSON producers should separate values by a single `\n`. case jsonLines + /// The stream is a single JSON array whose elements are yielded one by one. case jsonArray } @@ -66,6 +70,19 @@ public enum JSONStreamMode: Sendable { /// let remaining = try parser.finalize() /// // items + remaining contain JSONValues for 1, 2, 3 /// ``` +/// +/// ## Boundaries between chunks +/// +/// JSON tokens that have an explicit terminator (strings, objects, arrays, +/// `true`/`false`/`null`) are always parsed reliably across chunk boundaries. +/// Bare numeric tokens (e.g. `123` or `1.5e10`) are inherently ambiguous when +/// a chunk ends exactly at the end of the number — the next chunk may or may +/// not continue the number. To stay correct, the parser conservatively defers +/// any value whose parse ends exactly at the buffer end; it will be yielded +/// after the next ``parse(_:)`` chunk arrives, or on ``finalize()``. +/// +/// In practice, properly-formed NDJSON terminates every value with a newline, +/// so this deferral is invisible to the caller. public struct JSONStreamParser: Sendable { /// The parsing mode. @@ -87,8 +104,8 @@ public struct JSONStreamParser: Sendable { /// /// - Parameters: /// - mode: The stream format (`.jsonLines` or `.jsonArray`). - /// - options: Options for reading JSON. Note that `.stopWhenDone` is - /// always applied internally and does not need to be specified. + /// - options: Options for reading JSON. The parser internally combines + /// this with `YYJSON_READ_STOP_WHEN_DONE` for boundary detection. public init(mode: JSONStreamMode, options: JSONReadOptions = .default) { self.mode = mode self.options = options @@ -97,51 +114,29 @@ public struct JSONStreamParser: Sendable { self.arrayState = .expectOpenBracket } + // MARK: - Public Parse API + /// Feeds data to the parser and returns all complete JSON values found. - /// - /// - Parameter data: New data to append to the internal buffer. - /// - Returns: An array of fully-parsed ``JSONValue`` items. - /// - Throws: ``JSONError`` if malformed JSON is encountered. public mutating func parse(_ data: Data) throws -> [JSONValue] { - buffer.append(data) - return try drainBuffer() + if !data.isEmpty { buffer.append(data) } + return try drainValues(finalizing: false) } /// Feeds raw bytes to the parser and returns all complete JSON values found. - /// - /// - Parameter bytes: A buffer pointer to the raw bytes. - /// - Returns: An array of fully-parsed ``JSONValue`` items. - /// - Throws: ``JSONError`` if malformed JSON is encountered. public mutating func parse(bytes: UnsafeBufferPointer) throws -> [JSONValue] { if let base = bytes.baseAddress, bytes.count > 0 { buffer.append(base, count: bytes.count) } - return try drainBuffer() + return try drainValues(finalizing: false) } /// Signals end-of-stream and returns any remaining JSON values. /// /// After calling this method, the parser is in a finished state. /// Call ``reset()`` to reuse it. - /// - /// - Returns: An array of any remaining ``JSONValue`` items. - /// - Throws: ``JSONError`` if the remaining buffer contains incomplete JSON. public mutating func finalize() throws -> [JSONValue] { - let results = try drainBuffer() - - skipWhitespace() - if readOffset < buffer.count { - if mode == .jsonArray { - throw JSONError.invalidJSON("Unexpected end of JSON array stream") - } else { - throw JSONError.invalidJSON("Incomplete JSON value at end of stream") - } - } - - if mode == .jsonArray && arrayState != .done { - throw JSONError.invalidJSON("Unexpected end of JSON array stream") - } - + let results = try drainValues(finalizing: true) + try validateAtEndOfStream() return results } @@ -152,6 +147,22 @@ public struct JSONStreamParser: Sendable { arrayState = .expectOpenBracket } + // MARK: - Internal Slice API (for streaming decoders) + + /// Feeds data and returns raw byte slices for each parsed JSON value, + /// without constructing intermediate `JSONValue` instances. Used by the + /// streaming decoders to avoid an extra serialize-and-reparse round-trip. + internal mutating func parseSlices(_ data: Data) throws -> [Data] { + if !data.isEmpty { buffer.append(data) } + return try drainSlices(finalizing: false) + } + + internal mutating func finalizeSlices() throws -> [Data] { + let results = try drainSlices(finalizing: true) + try validateAtEndOfStream() + return results + } + // MARK: - Private Types private enum ArrayParseState: Sendable { @@ -162,36 +173,72 @@ public struct JSONStreamParser: Sendable { case done } - // MARK: - Drain Logic + /// One parsed value's metadata. + private struct ParsedItem { + /// yyjson document owning the parsed value. + let document: Document + /// Byte range of this value in `buffer` (relative to `buffer.startIndex`). + let range: Range + } - private mutating func drainBuffer() throws -> [JSONValue] { + // MARK: - Drain entry points + + private mutating func drainValues(finalizing: Bool) throws -> [JSONValue] { compactIfNeeded() + let items = try drainItems(finalizing: finalizing) + var results: [JSONValue] = [] + results.reserveCapacity(items.count) + for item in items { + guard let root = item.document.root else { + throw JSONError.invalidData("Document has no root value") + } + results.append(JSONValue(value: root, document: item.document)) + } + return results + } + private mutating func drainSlices(finalizing: Bool) throws -> [Data] { + compactIfNeeded() + let items = try drainItems(finalizing: finalizing) + var slices: [Data] = [] + slices.reserveCapacity(items.count) + let start = buffer.startIndex + for item in items { + slices.append(buffer.subdata(in: (start + item.range.lowerBound)..<(start + item.range.upperBound))) + } + return slices + } + + private mutating func drainItems(finalizing: Bool) throws -> [ParsedItem] { + var items: [ParsedItem] = [] switch mode { case .jsonLines: - return try drainJSONLines() + try drainJSONLines(finalizing: finalizing) { items.append($0) } case .jsonArray: - return try drainJSONArray() + try drainJSONArray(finalizing: finalizing) { items.append($0) } } + return items } - private mutating func drainJSONLines() throws -> [JSONValue] { - var results: [JSONValue] = [] + // MARK: - Mode-specific drain + private mutating func drainJSONLines( + finalizing: Bool, + emit: (ParsedItem) throws -> Void + ) throws { while true { skipWhitespace() - guard buffer.count - readOffset > 0 else { break } + guard readOffset < buffer.count else { break } - guard let value = try parseOneValue() else { break } - results.append(value) + guard let item = try parseOneItem(finalizing: finalizing) else { break } + try emit(item) } - - return results } - private mutating func drainJSONArray() throws -> [JSONValue] { - var results: [JSONValue] = [] - + private mutating func drainJSONArray( + finalizing: Bool, + emit: (ParsedItem) throws -> Void + ) throws { loop: while true { skipWhitespace() @@ -214,8 +261,8 @@ public struct JSONStreamParser: Sendable { arrayState = .done break loop } - guard let value = try parseOneValue() else { break loop } - results.append(value) + guard let item = try parseOneItem(finalizing: finalizing) else { break loop } + try emit(item) arrayState = .expectCommaOrClose case .expectElementAfterComma: @@ -230,8 +277,8 @@ public struct JSONStreamParser: Sendable { arrayState = .done break loop } - guard let value = try parseOneValue() else { break loop } - results.append(value) + guard let item = try parseOneItem(finalizing: finalizing) else { break loop } + try emit(item) arrayState = .expectCommaOrClose case .expectCommaOrClose: @@ -246,8 +293,9 @@ public struct JSONStreamParser: Sendable { arrayState = .done break loop } else { + let char = Unicode.Scalar(byte) throw JSONError.invalidJSON( - "Expected ',' or ']' in JSON array, got '\(Unicode.Scalar(byte))'" + "Expected ',' or ']' in JSON array, got '\(char)'" ) } @@ -259,45 +307,54 @@ public struct JSONStreamParser: Sendable { if arrayState == .done { try validateNoTrailingArrayContent() } - - return results } // MARK: - Core Parse - /// Tries to parse one JSON value starting at `readOffset`. - /// Returns `nil` if more data is needed. - private mutating func parseOneValue() throws -> JSONValue? { + /// Attempts to parse one JSON value starting at `readOffset`. + /// + /// Returns `nil` when more data is required to confidently advance: + /// either yyjson reported incomplete input, or the parsed value ends + /// exactly at the buffer end and we are not finalizing (which would be + /// ambiguous for unterminated numeric tokens). + private mutating func parseOneItem(finalizing: Bool) throws -> ParsedItem? { let available = buffer.count - readOffset guard available > 0 else { return nil } - let paddingSize = Int(YYJSON_PADDING_SIZE) - buffer.append(contentsOf: repeatElement(0 as UInt8, count: paddingSize)) - defer { - buffer.removeLast(paddingSize) + // yyjson_read_opts() in non-INSITU mode allocates its own padded buffer + // and copies the input — so we do not need to add YYJSON_PADDING_SIZE + // bytes ourselves here. + let startOffset = readOffset + let result: Document.StreamParseResult = try buffer.withUnsafeBytes { buf in + guard let base = buf.baseAddress else { return .needMoreData } + let ptr = base.advanced(by: startOffset).assumingMemoryBound(to: UInt8.self) + return try Document.streamParse(bytes: ptr, count: available, options: options) } - return try buffer.withUnsafeBytes { buf -> JSONValue? in - let ptr = buf.baseAddress!.advanced(by: readOffset).assumingMemoryBound(to: UInt8.self) - let result = try Document.streamParse( - bytes: ptr, count: available, options: options - ) - switch result { - case .success(let doc, let consumed): - guard let root = doc.root else { - throw JSONError.invalidData("Document has no root value") - } - readOffset += consumed - return JSONValue(value: root, document: doc) - case .needMoreData: + switch result { + case .needMoreData: + return nil + case .success(let doc, let consumed): + let endOffset = startOffset + consumed + + // Boundary safety: + // A successful parse that ends exactly at the buffer end may be a + // truncated numeric token (e.g. we have "12" so far but the source + // is actually "123"). yyjson cannot tell the difference. Defer the + // value until either more data arrives or the caller finalizes. + if !finalizing && endOffset >= buffer.count { return nil } + readOffset = endOffset + return ParsedItem(document: doc, range: startOffset..() + private var parser: IncrementalParser + private var finished: Bool = false - /// Creates a new incremental reader with initial data. + /// The total number of buffered input bytes. + public var bufferedByteCount: Int { + lock.lock(); defer { lock.unlock() } + return parser.count + } + + /// Creates a new incremental reader. /// /// - Parameters: - /// - data: The first chunk of JSON data. + /// - data: An initial chunk of JSON data (may be empty). /// - options: Options for reading JSON. - public init(data: Data, options: JSONReadOptions = .default) throws { - self.buffer = data - self.options = options - self.finished = false + public init( + data: Data = Data(), + options: JSONReadOptions = .default + ) throws { + self.parser = IncrementalParser(initialData: data, options: options) } /// Feeds more data and attempts to parse the accumulated buffer. /// - /// - Parameter data: Additional JSON data. + /// - Parameter data: Additional JSON data (may be empty to retry). /// - Returns: A ``JSONDocument`` if the buffer contains a complete document, /// or `nil` if more data is needed. - /// - Throws: ``JSONError`` for non-recoverable parse errors. + /// - Throws: ``JSONError`` for non-recoverable parse errors, or if the + /// reader has already produced a complete document. public func feed(_ data: Data) throws -> JSONDocument? { + lock.lock(); defer { lock.unlock() } guard !finished else { throw JSONError.invalidJSON("Incremental reader already finished") } - buffer.append(data) - return try attemptParse() + parser.append(data) + switch try parser.read() { + case .success(let doc): + finished = true + return JSONDocument(_document: doc) + case .needMoreData: + return nil + } } /// Signals end-of-stream and returns the completed document. /// - /// All accumulated data is parsed as a single JSON document. - /// /// - Returns: The parsed ``JSONDocument``. /// - Throws: ``JSONError`` if the document is incomplete or malformed. public func finish() throws -> JSONDocument { + lock.lock(); defer { lock.unlock() } guard !finished else { throw JSONError.invalidJSON("Incremental reader already finished") } - finished = true - let doc = try Document(data: buffer, options: options) - return JSONDocument(_document: doc) - } - - // MARK: - Private - - private func attemptParse() throws -> JSONDocument? { - // Try parsing the accumulated data. If it's complete, return the doc. - // If incomplete, return nil to request more data. - do { - let doc = try Document(data: buffer, options: options) + switch try parser.read() { + case .success(let doc): finished = true return JSONDocument(_document: doc) - } catch let error as JSONError { - // If the error indicates incomplete data, we need more - if error.message.contains("unexpected end") - || error.message.contains("Unexpected end") - || error.message.contains("Empty content") { - return nil - } - throw error + case .needMoreData: + throw JSONError.invalidJSON("Incomplete JSON value at end of stream") } } } diff --git a/Sources/ReerJSON/Value.swift b/Sources/ReerJSON/Value.swift index f5e4558..3af6876 100644 --- a/Sources/ReerJSON/Value.swift +++ b/Sources/ReerJSON/Value.swift @@ -105,14 +105,6 @@ internal final class Document: @unchecked Sendable { self.doc = doc } - /// Creates a document by parsing bytes with `STOP_WHEN_DONE`, reporting consumed byte count. - /// - /// - Parameters: - /// - bytes: Pointer to the JSON bytes (must have `YYJSON_PADDING_SIZE` padding). - /// - count: Number of valid bytes (excluding padding). - /// - options: Options for reading the JSON. - /// - consumedBytes: On success, set to the number of input bytes consumed. - /// - Throws: `JSONError` if parsing fails. /// Result of a streaming parse attempt. enum StreamParseResult { case success(Document, consumedBytes: Int) @@ -121,9 +113,12 @@ internal final class Document: @unchecked Sendable { /// Attempts to parse one JSON value from bytes with `STOP_WHEN_DONE`. /// + /// The bytes do not need to be padded; for non-INSITU parsing yyjson + /// allocates and pads its own internal buffer. + /// /// - Parameters: - /// - bytes: Pointer to the JSON bytes (must have `YYJSON_PADDING_SIZE` padding). - /// - count: Number of valid bytes (excluding padding). + /// - bytes: Pointer to the JSON bytes. + /// - count: Number of valid bytes. /// - options: Options for reading the JSON. /// - Returns: `.success` with consumed byte count, or `.needMoreData` if incomplete. /// - Throws: `JSONError` for non-recoverable parse errors. @@ -145,11 +140,9 @@ internal final class Document: @unchecked Sendable { if let doc = result { let consumed = yyjson_doc_get_read_size(doc) - let document = Document(alreadyParsed: doc) - return .success(document, consumedBytes: consumed) + return .success(Document(alreadyParsed: doc), consumedBytes: consumed) } - // Incomplete data — need more if error.code == YYJSON_READ_ERROR_UNEXPECTED_END || error.code == YYJSON_READ_ERROR_EMPTY_CONTENT { return .needMoreData @@ -173,6 +166,62 @@ internal final class Document: @unchecked Sendable { } } +// MARK: - IncrementalParser (Internal) + +/// Internal accumulator used by ``JSONIncrementalReader``. +/// +/// We can't use yyjson's `yyjson_incr_*` API directly here because: +/// - Non-INSITU `yyjson_incr_new` takes a snapshot of the input buffer at the +/// time of the call (it `memcpy`s `buf_len` bytes into its own storage), so +/// later appends to our buffer are invisible to it; calling `yyjson_incr_read` +/// with a larger `len` fails with `INVALID_PARAMETER`. +/// - INSITU mode requires a stable, pre-sized buffer and modifies the input +/// in place for string unescaping; growing the buffer invalidates the state +/// and the in-place modifications make restart-from-scratch fragile. +/// +/// Instead we keep the data in a growable `Data` buffer and try a fresh +/// `STOP_WHEN_DONE` parse on each `read()`. This is `O(N²)` worst-case across +/// many small chunks, but is correct in all cases and is `O(N)` total when +/// data arrives in a small number of large chunks (the common case for +/// network streaming). +internal final class IncrementalParser { + private var buffer: Data + private let options: JSONReadOptions + + var count: Int { buffer.count } + + init(initialData: Data, options: JSONReadOptions) { + self.buffer = initialData + self.options = options + } + + func append(_ data: Data) { + guard !data.isEmpty else { return } + buffer.append(data) + } + + /// Result of an incremental read. + enum ReadResult { + case success(Document) + case needMoreData + } + + func read() throws -> ReadResult { + guard !buffer.isEmpty else { return .needMoreData } + + do { + let doc = try Document(data: buffer, options: options) + return .success(doc) + } catch let error as JSONError { + if error.readErrorCode == UInt32(YYJSON_READ_ERROR_UNEXPECTED_END) + || error.readErrorCode == UInt32(YYJSON_READ_ERROR_EMPTY_CONTENT) { + return .needMoreData + } + throw error + } + } +} + // MARK: - Document (Public) /// A parsed JSON document that owns the underlying memory. diff --git a/Tests/ReerJSONTests/StreamParserTests.swift b/Tests/ReerJSONTests/StreamParserTests.swift index 5cccb99..b59ab26 100644 --- a/Tests/ReerJSONTests/StreamParserTests.swift +++ b/Tests/ReerJSONTests/StreamParserTests.swift @@ -108,6 +108,64 @@ final class JSONStreamParserJSONLinesTests: XCTestCase { _ = try parser.parse(Data("{\"incomplete".utf8)) XCTAssertThrowsError(try parser.finalize()) } + + // MARK: - Boundary / cross-chunk correctness + + /// Regression: yyjson with STOP_WHEN_DONE will happily parse `1` from + /// a buffer that ends exactly at "1" — but the real input might be + /// "12345". The parser must defer such tokens until the next chunk or + /// `finalize()` confirms there is no continuation. + func testCrossChunkSplitNumber() throws { + var parser = JSONStreamParser(mode: .jsonLines) + // Send "12345\n" split as "1" / "234" / "5\n". + let v1 = try parser.parse(Data("1".utf8)) + XCTAssertTrue(v1.isEmpty, "Bare number at end of buffer must not be yielded yet") + let v2 = try parser.parse(Data("234".utf8)) + XCTAssertTrue(v2.isEmpty) + let v3 = try parser.parse(Data("5\n".utf8)) + XCTAssertEqual(v3.count, 1) + XCTAssertEqual(v3[0].int64, 12345) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkSplitFloat() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let v1 = try parser.parse(Data("1.2".utf8)) + XCTAssertTrue(v1.isEmpty) + let v2 = try parser.parse(Data("3e".utf8)) + XCTAssertTrue(v2.isEmpty) + let v3 = try parser.parse(Data("4\n".utf8)) + XCTAssertEqual(v3.count, 1) + XCTAssertEqual(v3[0].number, 1.23e4) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + /// Strings end with `"`, so yyjson can detect truncation directly. + /// Splitting them mid-token must still work. + func testCrossChunkSplitInsideString() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let v1 = try parser.parse(Data("\"hel".utf8)) + XCTAssertTrue(v1.isEmpty) + let v2 = try parser.parse(Data("lo\"\n".utf8)) + XCTAssertEqual(v2.count, 1) + XCTAssertEqual(v2[0].string, "hello") + } + + /// A value that ends exactly at the buffer boundary without a trailing + /// terminator (e.g. {"a":1} with no newline) should still surface on + /// finalize(). + func testFinalizeFlushesTrailingValueWithoutNewline() throws { + var parser = JSONStreamParser(mode: .jsonLines) + let v1 = try parser.parse(Data("{\"a\":1}".utf8)) + // May or may not be yielded depending on buffer-end heuristic; the + // important contract is that finalize() yields it. + let remaining = try parser.finalize() + let total = v1 + remaining + XCTAssertEqual(total.count, 1) + XCTAssertEqual(total[0]["a"]?.int64, 1) + } } // MARK: - JSON Array Tests @@ -218,6 +276,30 @@ final class JSONStreamParserJSONArrayTests: XCTestCase { XCTAssertThrowsError(try parser.parse(Data("[1] 2".utf8))) } + /// Regression for cross-chunk numeric splitting inside an array. + /// Without the buffer-end deferral, the parser would commit `1` from + /// `[1` and then choke on `2` when the second chunk arrives. + func testCrossChunkArraySplitNumber() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let v1 = try parser.parse(Data("[1".utf8)) + XCTAssertTrue(v1.isEmpty, "Bare number at buffer end must defer") + let v2 = try parser.parse(Data("23,456]".utf8)) + XCTAssertEqual(v2.map(\.int64), [123, 456]) + let remaining = try parser.finalize() + XCTAssertTrue(remaining.isEmpty) + } + + func testCrossChunkArrayTinyChunks() throws { + var parser = JSONStreamParser(mode: .jsonArray) + let chunks = ["[", "12", "3,", "45", "6", ",", "789", "]"] + var all: [JSONValue] = [] + for chunk in chunks { + all += try parser.parse(Data(chunk.utf8)) + } + all += try parser.finalize() + XCTAssertEqual(all.map(\.int64), [123, 456, 789]) + } + func testStringElements() throws { var parser = JSONStreamParser(mode: .jsonArray) let data = Data("[\"hello\", \"world\"]".utf8) @@ -304,6 +386,66 @@ final class JSONIncrementalReaderTests: XCTestCase { // Expected } } + + func testInvalidJSONIsThrownNotDeferred() throws { + let reader = try JSONIncrementalReader(data: Data("{\"key\":}".utf8)) + do { + _ = try reader.finish() + XCTFail("Expected an error for malformed JSON") + } catch { + // Expected + } + } + + /// Regression: a JSON error message that happens to contain "Empty + /// content" or "Unexpected end" used to be misclassified as a + /// recoverable "need more data" condition. With error-code-based + /// detection, syntactic errors must surface immediately. + func testStructuralErrorIsNotMistakenForNeedMore() throws { + let reader = try JSONIncrementalReader(data: Data()) + // Empty buffer is recoverable. + do { + if let _ = try reader.feed(Data()) { + XCTFail("Empty buffer should not yield a document") + } + } catch { + XCTFail("Empty buffer should not throw, got: \(error)") + } + // But a structurally invalid chunk must throw. + let badReader = try JSONIncrementalReader() + do { + _ = try badReader.feed(Data("[1,]extra".utf8)) + XCTFail("Structurally invalid JSON should throw") + } catch { + // Expected + } + } + + /// `JSONIncrementalReader` is documented as `@unchecked Sendable`; this + /// exercises basic concurrent access to confirm the internal lock works. + func testConcurrentFeedDoesNotCrash() async throws { + let reader = try JSONIncrementalReader() + let chunks: [Data] = [ + Data("{\"a\":".utf8), + Data("[1,2,".utf8), + Data("3,4,".utf8), + Data("5]}".utf8) + ] + await withTaskGroup(of: Void.self) { group in + for chunk in chunks { + group.addTask { + _ = try? reader.feed(chunk) + } + } + } + // Best-effort: either we've already finished, or finish() completes it. + do { + let doc = try reader.finish() + XCTAssertNotNil(doc.root) + } catch { + // Race may have finished it via feed(); that's also acceptable. + } + } } // MARK: - Edge Cases From 9f7de5b31c14f6a723d1940992773f6fd3043f2e Mon Sep 17 00:00:00 2001 From: phoenix Date: Sat, 23 May 2026 12:48:13 +0800 Subject: [PATCH 4/4] fix(streaming): avoid Optional<~Copyable> in feed() for Linux Swift 5.10 `JSONDocument` is `~Copyable`, and `Optional<~Copyable>` is not yet supported on Swift 5.10 (the toolchain used by the Linux CI). The `JSONIncrementalReader.feed(_:)` API previously returned `JSONDocument?`, which compiled on macOS Swift 6 but failed on Linux with: error: noncopyable type 'JSONDocument' cannot be used with generic type 'Optional' yet Replace the optional return with a dedicated non-copyable enum: public enum JSONIncrementalReadResult: ~Copyable { case ready(JSONDocument) case needMoreData } This compiles cleanly on every supported toolchain and keeps the call-site ergonomic (switch with a let-binding instead of optional chaining). Updated tests and doc examples accordingly. Co-authored-by: Cursor --- Sources/ReerJSON/StreamParser.swift | 32 ++++++++++++++++----- Tests/ReerJSONTests/StreamParserTests.swift | 30 ++++++++++++------- 2 files changed, 44 insertions(+), 18 deletions(-) diff --git a/Sources/ReerJSON/StreamParser.swift b/Sources/ReerJSON/StreamParser.swift index ef4d1e4..4df3eff 100644 --- a/Sources/ReerJSON/StreamParser.swift +++ b/Sources/ReerJSON/StreamParser.swift @@ -396,6 +396,20 @@ public struct JSONStreamParser: Sendable { // MARK: - JSONIncrementalReader +/// The outcome of feeding a chunk of bytes to a ``JSONIncrementalReader``. +/// +/// This is intentionally a non-copyable enum rather than `JSONDocument?` +/// because ``JSONDocument`` is `~Copyable`, and `Optional<~Copyable>` is +/// not yet supported by Swift 5.10 (the minimum required by this package on +/// Linux). Using a custom enum keeps the API ergonomic on every supported +/// toolchain. +public enum JSONIncrementalReadResult: ~Copyable { + /// The reader has fully assembled a complete document. + case ready(JSONDocument) + /// The reader needs more input before a document can be produced. + case needMoreData +} + /// An incremental reader for a single large JSON document. /// /// Feed chunks of a single large JSON document with ``feed(_:)``. @@ -406,9 +420,12 @@ public struct JSONStreamParser: Sendable { /// ```swift /// let reader = try JSONIncrementalReader() /// for try await chunk in stream { -/// if let doc = try reader.feed(chunk) { +/// switch try reader.feed(chunk) { +/// case .ready(let doc): /// // doc.root is now available -/// break +/// return doc +/// case .needMoreData: +/// continue /// } /// } /// ``` @@ -454,11 +471,12 @@ public final class JSONIncrementalReader: @unchecked Sendable { /// Feeds more data and attempts to parse the accumulated buffer. /// /// - Parameter data: Additional JSON data (may be empty to retry). - /// - Returns: A ``JSONDocument`` if the buffer contains a complete document, - /// or `nil` if more data is needed. + /// - Returns: ``JSONIncrementalReadResult/ready(_:)`` if the buffer + /// contains a complete document, or + /// ``JSONIncrementalReadResult/needMoreData`` if more data is required. /// - Throws: ``JSONError`` for non-recoverable parse errors, or if the /// reader has already produced a complete document. - public func feed(_ data: Data) throws -> JSONDocument? { + public func feed(_ data: Data) throws -> JSONIncrementalReadResult { lock.lock(); defer { lock.unlock() } guard !finished else { throw JSONError.invalidJSON("Incremental reader already finished") @@ -467,9 +485,9 @@ public final class JSONIncrementalReader: @unchecked Sendable { switch try parser.read() { case .success(let doc): finished = true - return JSONDocument(_document: doc) + return .ready(JSONDocument(_document: doc)) case .needMoreData: - return nil + return .needMoreData } } diff --git a/Tests/ReerJSONTests/StreamParserTests.swift b/Tests/ReerJSONTests/StreamParserTests.swift index b59ab26..d4458bb 100644 --- a/Tests/ReerJSONTests/StreamParserTests.swift +++ b/Tests/ReerJSONTests/StreamParserTests.swift @@ -333,16 +333,18 @@ final class JSONIncrementalReaderTests: XCTestCase { func testMultipleChunks() throws { let reader = try JSONIncrementalReader(data: Data("{\"ke".utf8)) - // First feed should need more data - do { - if let doc = try reader.feed(Data("y\":\"val".utf8)) { - XCTFail("Should need more data, got doc with root: \(String(describing: doc.root))") - } + // First feed should need more data. + switch try reader.feed(Data("y\":\"val".utf8)) { + case .ready(let doc): + XCTFail("Should need more data, got doc with root: \(String(describing: doc.root))") + case .needMoreData: + break } - // Second feed should complete - if let doc = try reader.feed(Data("ue\"}".utf8)) { + // Second feed should complete. + switch try reader.feed(Data("ue\"}".utf8)) { + case .ready(let doc): XCTAssertEqual(doc.root?["key"]?.string, "value") - } else { + case .needMoreData: XCTFail("Should have completed parsing") } } @@ -360,12 +362,15 @@ final class JSONIncrementalReaderTests: XCTestCase { var offset = min(chunkSize, jsonData.count) var parsed = false - while offset < jsonData.count { + feedLoop: while offset < jsonData.count { let end = min(offset + chunkSize, jsonData.count) let chunk = Data(jsonData[offset..