diff --git a/Examples/CameraViewer/main.swift b/Examples/CameraViewer/main.swift index 54e32cf..cff1c25 100644 --- a/Examples/CameraViewer/main.swift +++ b/Examples/CameraViewer/main.swift @@ -330,7 +330,7 @@ final class CameraViewerDelegate: NSObject, NSApplicationDelegate { layer.enqueue(sample) } - case .rtcp: + case .metadata, .rtcp: break } } diff --git a/README.md b/README.md index a58431a..96f3cf8 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ A pure-Swift RTSP client library for streaming live video and audio from IP came - **H.264 and H.265/HEVC video** — depacketized to AVCC format, ready for VideoToolbox - **Audio** — AAC, PCMU, PCMA, G.722, G.726, L16, G.723.1 +- **ONVIF analytics metadata** — raw XML documents from the camera's `application` RTSP stream - **Zero dependencies** — only Foundation, Network, and CryptoKit - **Swift 6** — strict concurrency with async/await and AsyncThrowingStream @@ -46,6 +47,7 @@ let session = RTSPClientSession( let desc = try await session.start() // desc.videoCodec, desc.resolution, desc.sps, desc.pps, desc.vps // desc.audioCodec, desc.audioSampleRate, desc.audioChannels +// desc.metadataEncoding — non-nil if an ONVIF metadata stream is active // Consume depacketized frames for try await item in session.frames() { @@ -59,6 +61,10 @@ for try await item in session.frames() { // frame.data — raw audio bytes (codec-specific) // frame.codec, frame.sampleRate, frame.channels, frame.timestamp break + case .metadata(let frame): + // frame.data — raw payload (typically ONVIF XML, possibly GZIP-compressed) + // frame.encodingName, frame.timestamp, frame.loss + break case .rtcp: break } @@ -81,6 +87,11 @@ See [API.md](API.md) for the full API reference. - AAC (RFC 3640) with aggregation and fragmentation - PCMU (G.711 u-law), PCMA (G.711 A-law), L16, G.722, G.726, DVI4, G.723.1 +### Metadata +- ONVIF analytics metadata (`vnd.onvif.metadata`) per the ONVIF Streaming Specification +- Concatenates RTP payload fragments and emits a frame on the marker bit (end-of-document) +- Best-effort: malformed metadata SDP degrades to a diagnostic without aborting video/audio + ### Protocol - RTSP session management (DESCRIBE, SETUP, PLAY, TEARDOWN) - RTSP message parsing and serialization @@ -114,7 +125,7 @@ Sources/IPCamKit/ ├── RTSP/ RTSP message model, parser, serializer ├── SDP/ SDP session description parser (RFC 8866) ├── RTP/ RTP/RTCP packets, Timeline, ChannelMapping, InorderParser -├── Codec/ H.264/H.265 depacketizers, NAL/SPS/PPS parsing, audio depacketizers +├── Codec/ H.264/H.265 depacketizers, NAL/SPS/PPS parsing, audio + metadata depacketizers ├── Auth/ Basic and Digest authentication ├── Transport/ NWConnection TCP/UDP transport └── Client/ RTSP session, DESCRIBE/SETUP/PLAY parsers, Presentation @@ -122,7 +133,7 @@ Sources/IPCamKit/ ## Testing -90 tests across 15 suites covering RTSP parsing, SDP, RTP, H.264/H.265 depacketization, AAC, simple audio, authentication, and integration: +100+ tests across 15+ suites covering RTSP parsing, SDP, RTP, H.264/H.265 depacketization, AAC, simple audio, ONVIF metadata depacketization, authentication, and integration: ```bash swift test diff --git a/Sources/IPCamKit/Client/RTSPSession.swift b/Sources/IPCamKit/Client/RTSPSession.swift index e99afe3..eb9c251 100644 --- a/Sources/IPCamKit/Client/RTSPSession.swift +++ b/Sources/IPCamKit/Client/RTSPSession.swift @@ -68,6 +68,10 @@ public struct SessionDescription: Sendable { public let audioChannels: UInt16? /// Codec-specific extra data (e.g. AudioSpecificConfig for AAC). public let audioExtraData: Data? + + /// SDP encoding name of the analytics-metadata stream if one was set up + /// (e.g. `vnd.onvif.metadata`), or `nil` if no metadata stream is active. + public let metadataEncoding: String? } /// RTSP client session that manages the full RTSP lifecycle. @@ -139,10 +143,11 @@ public final class RTSPClientSession: Sendable { } } -/// A decoded frame (video or audio) exposed to consumers. +/// A decoded frame (video, audio, or metadata) exposed to consumers. public enum PublicCodecItem: Sendable { case video(PublicVideoFrame) case audio(PublicAudioFrame) + case metadata(PublicMetadataFrame) case rtcp(PublicRTCPPacket) } @@ -171,6 +176,23 @@ public struct PublicAudioFrame: Sendable { public let loss: UInt16 } +/// A metadata frame from an RTSP `application` stream (typically ONVIF analytics). +public struct PublicMetadataFrame: Sendable { + /// Raw payload bytes. For `vnd.onvif.metadata` this is a UTF-8 XML + /// document with root `tt:MetaDataStream`, optionally GZIP-compressed + /// (consult the SDP `encodingName` for the exact format). + public let data: Data + + /// Presentation timestamp in seconds, derived from the RTP timestamp. + public let timestamp: Double + + /// SDP encoding name (e.g. `vnd.onvif.metadata`). + public let encodingName: String + + /// Number of RTP packets lost before this frame. + public let loss: UInt16 +} + /// A video frame exposed to consumers. public struct PublicVideoFrame: Sendable { /// NAL units in AVCC format (4-byte big-endian length prefix + NAL bytes). @@ -243,12 +265,15 @@ actor SessionState { private var authenticator: RTSPAuthenticator? private var depacketizer: VideoDepacketizer? private var audioDepacketizer: AudioDepacketizer? + private var applicationDepacketizer: ApplicationDepacketizer? private var url: String? private var videoStreamIndex: Int? private var audioStreamIndex: Int? + private var applicationStreamIndex: Int? private var audioEncodingName: String? private var audioClockRate: UInt32? private var audioChannels: UInt16? + private var applicationEncodingName: String? private var channelMappings = ChannelMappings() private var inorderParsers: [Int: InorderParser] = [:] private var userAgent: String? @@ -375,6 +400,68 @@ actor SessionState { audioChannels = audioStream.channels } + // Find and SETUP application (metadata) stream — optional, best-effort. + // If SETUP fails (camera advertises the stream but rejects it, or any + // transport error), disable metadata locally rather than aborting the + // session. The channel slot stays assigned (no other streams follow). + var applicationIdx = presMut.streams.firstIndex(where: { s in + s.media == "application" && isApplicationEncodingSupported(s.encodingName) + }) + var applicationSetupSSRC: UInt32? + + if let idx = applicationIdx { + let applicationStream = presMut.streams[idx] + let applicationSetupURL = applicationStream.control ?? url + var applicationSetupHeaders: [(String, String)] = [] + if transport == .tcp { + let applicationChannelId = channelMappings.nextUnassigned() ?? 4 + applicationSetupHeaders.append( + ( + "Transport", + "RTP/AVP/TCP;unicast;interleaved=\(applicationChannelId)-\(applicationChannelId + 1)" + )) + try channelMappings.assign( + channelId: applicationChannelId, streamIndex: idx) + } else { + applicationSetupHeaders.append(("Transport", "RTP/AVP;unicast")) + } + if let sid = sessionId { + applicationSetupHeaders.append(("Session", sid)) + } + + do { + let applicationSetupResp = try await sendRequest( + method: .setup, url: applicationSetupURL, + extraHeaders: applicationSetupHeaders) + let applicationSetup = try parseSetup(response: applicationSetupResp) + if let prev = sessionId, prev != applicationSetup.session.id { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Camera issued a new Session ID at application SETUP " + + "(\(prev) -> \(applicationSetup.session.id)); rolling forward.")) + } + sessionId = applicationSetup.session.id + applicationSetupSSRC = applicationSetup.ssrc + presMut.streams[idx].state = .setup( + StreamStateInit( + ssrc: applicationSetup.ssrc, initialSeq: nil, + initialRtptime: nil, ctx: .dummy)) + + applicationStreamIndex = idx + applicationEncodingName = applicationStream.encodingName + } catch { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Application SETUP failed: \(error); " + + "metadata will not be delivered.")) + applicationIdx = nil + } + } + // PLAY var playHeaders: [(String, String)] = [] if let sid = sessionId { @@ -459,6 +546,44 @@ actor SessionState { } } + // Initialize application (metadata) depacketizer + inorder parser. + // Best-effort: if the timeline can't be built (e.g. malformed clock + // rate in SDP), disable the stream rather than failing the session. + if let applicationIdx = applicationIdx { + let applicationStream = presMut.streams[applicationIdx] + + var applicationStart: UInt32? + var applicationSeq: UInt16? + var resolvedApplicationSsrc = applicationSetupSSRC + + if case .setup(let init_) = presMut.streams[applicationIdx].state { + applicationStart = init_.initialRtptime + if let seq = init_.initialSeq, seq != 0, seq != 1 { + applicationSeq = seq + } + if let s = init_.ssrc { resolvedApplicationSsrc = s } + } + + do { + let applicationTimeline = try Timeline( + start: applicationStart, clockRate: applicationStream.clockRateHz) + inorderParsers[applicationIdx] = InorderParser( + ssrc: resolvedApplicationSsrc, nextSeq: applicationSeq, + isTcp: transport == .tcp, timeline: applicationTimeline, + onDiagnostic: onDiagnostic) + applicationDepacketizer = ApplicationDepacketizer(onDiagnostic: onDiagnostic) + } catch { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Failed to initialize application stream: \(error); " + + "metadata will not be delivered.")) + applicationStreamIndex = nil + applicationEncodingName = nil + } + } + isPlaying = true // Build session description @@ -498,7 +623,8 @@ actor SessionState { audioCodec: resolvedAudioCodec, audioSampleRate: resolvedAudioRate, audioChannels: resolvedAudioChannels, - audioExtraData: audioDepacketizer?.audioParameters?.extraData + audioExtraData: audioDepacketizer?.audioParameters?.extraData, + metadataEncoding: applicationEncodingName ) } @@ -577,6 +703,33 @@ actor SessionState { } } audioDepacketizer = depkt + } else if let applicationIdx = applicationStreamIndex, + mapping.streamIndex == applicationIdx + { + guard var depkt = applicationDepacketizer else { continue } + if let pkt = try parser.rtp( + data: interleaved.data, ctx: .dummy, + streamId: mapping.streamIndex, streamCtx: .dummy) + { + try depkt.push(pkt) + while let result = depkt.pull() { + switch result { + case .success(.metadataFrame(let frame)): + let publicFrame = PublicMetadataFrame( + data: frame.data, + timestamp: frame.timestamp.elapsedSeconds, + encodingName: applicationEncodingName ?? "", + loss: frame.loss + ) + continuation.yield(.metadata(publicFrame)) + case .failure(let err): + throw RTSPError.depacketizationError("Metadata depacketization failed: \(err)") + default: + break + } + } + } + applicationDepacketizer = depkt } inorderParsers[mapping.streamIndex] = parser @@ -735,6 +888,15 @@ actor SessionState { } } + private func isApplicationEncodingSupported(_ name: String) -> Bool { + switch name { + case "vnd.onvif.metadata": + return true + default: + return false + } + } + private func publicAudioCodec(from encoding: String) -> PublicAudioCodec { switch encoding { case "mpeg4-generic": return .aac diff --git a/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift b/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift new file mode 100644 index 0000000..02c43c0 --- /dev/null +++ b/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift @@ -0,0 +1,124 @@ +// Copyright (c) 2025 Steel Brain +// SPDX-License-Identifier: MIT +// Depacketizer for RTSP `application` streams carrying ONVIF analytics metadata. + +import Foundation + +/// Depacketizer for the ONVIF metadata RTP payload format (`vnd.onvif.metadata`). +/// +/// Per the ONVIF Streaming Specification, payloads concatenate across packets +/// until the RTP marker bit, which signals end-of-document. The loss count is +/// forwarded to the next completed frame so consumers can detect dropped events. +/// +/// Recovery semantics: +/// - Loss mid-document discards the buffered prefix and drops the rest of +/// that document (until the next marker). The loss surfaces on the next +/// clean frame. +/// - Buffer overflow (oversized document) discards the prefix, fires a +/// `warning` diagnostic, and drops the rest of that document until the +/// next marker — same as the loss case. The next document emits normally. +struct ApplicationDepacketizer: Sendable { + /// Hard cap on a single accumulated document. ONVIF places no limit on + /// document size, but real metadata documents are on the order of a few KB + /// per second; anything above this points at a malformed stream. + static let maxFragmentBytes = 1 << 20 // 1 MiB + + private var buffer = Data() + /// Packet context + timestamp of the last packet appended into `buffer`. + private var lastCtx: PacketContext = .dummy + private var lastStreamId: Int = 0 + private var lastTimestamp: Timestamp? + /// Loss accumulated across packets contributing to the in-progress document. + /// Cleared once a frame is emitted; carried across drops so the consumer + /// always sees a non-zero loss on the next good frame. + private var pendingLoss: UInt32 = 0 + /// True after a mid-document loss; the document under construction is + /// abandoned and we wait for the next marker before resuming. + private var dropUntilMark: Bool = false + /// True after an overflow diagnostic has fired since the last marker; used + /// to avoid spamming diagnostics if every packet in the same in-flight + /// document keeps overshooting the cap. + private var warnedSinceMark: Bool = false + private var ready: MetadataFrame? + private let onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? + + init(onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil) { + self.onDiagnostic = onDiagnostic + } + + mutating func push(_ pkt: ReceivedRTPPacket) throws { + precondition(ready == nil, "push() called before pull() drained the previous frame") + pendingLoss = pendingLoss + UInt32(pkt.loss) + + var skipAppend = false + + if pkt.loss > 0 && !buffer.isEmpty { + // Lost a packet mid-document — the buffered prefix is unusable. + buffer.removeAll(keepingCapacity: true) + lastTimestamp = nil + dropUntilMark = true + } + + if buffer.count + pkt.payload.count > Self.maxFragmentBytes { + if !warnedSinceMark { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Metadata document exceeded \(Self.maxFragmentBytes) bytes; " + + "dropping until next marker.")) + warnedSinceMark = true + } + buffer.removeAll(keepingCapacity: true) + lastTimestamp = nil + dropUntilMark = true + skipAppend = true + } + + if !dropUntilMark && !skipAppend { + buffer.append(pkt.payload) + lastCtx = pkt.ctx + lastStreamId = pkt.streamId + lastTimestamp = pkt.timestamp + } + + guard pkt.mark else { return } + + // Mark observed. Empty buffer means there's nothing to emit (drop, or + // marker on an empty payload). Carry `pendingLoss` forward. + if dropUntilMark || buffer.isEmpty { + buffer.removeAll(keepingCapacity: true) + lastTimestamp = nil + dropUntilMark = false + warnedSinceMark = false + return + } + + // `lastTimestamp` is set on every append, and we only reach here when + // `buffer` is non-empty — so the guard is unreachable in practice. + guard let ts = lastTimestamp else { + buffer.removeAll(keepingCapacity: true) + dropUntilMark = false + warnedSinceMark = false + return + } + + let loss = UInt16(clamping: pendingLoss) + pendingLoss = 0 + warnedSinceMark = false + ready = MetadataFrame( + ctx: lastCtx, + streamId: lastStreamId, + timestamp: ts, + loss: loss, + data: buffer + ) + buffer.removeAll(keepingCapacity: true) + } + + mutating func pull() -> Result? { + guard let frame = ready else { return nil } + ready = nil + return .success(.metadataFrame(frame)) + } +} diff --git a/Sources/IPCamKit/Codec/MetadataFrame.swift b/Sources/IPCamKit/Codec/MetadataFrame.swift new file mode 100644 index 0000000..986153c --- /dev/null +++ b/Sources/IPCamKit/Codec/MetadataFrame.swift @@ -0,0 +1,27 @@ +// Copyright (c) 2025 Steel Brain +// SPDX-License-Identifier: MIT +// Frame type for RTSP `application` media streams (ONVIF analytics metadata). + +import Foundation + +/// A depacketized metadata frame from an RTSP `application` stream. +/// +/// Per the ONVIF Streaming Specification, the payload is an XML document with +/// root node `tt:MetaDataStream`, optionally GZIP-compressed; the RTP marker +/// bit signals end-of-document. +struct MetadataFrame: Sendable, Equatable { + /// Context of the last packet in this frame. + var ctx: PacketContext + + /// Stream index. + var streamId: Int + + /// RTP timestamp of this frame. + var timestamp: Timestamp + + /// Number of RTP packets lost before or during this frame. + var loss: UInt16 + + /// Raw payload bytes — typically UTF-8 XML, may be GZIP-compressed. + var data: Data +} diff --git a/Sources/IPCamKit/Codec/VideoFrame.swift b/Sources/IPCamKit/Codec/VideoFrame.swift index 676ac28..3ad2440 100644 --- a/Sources/IPCamKit/Codec/VideoFrame.swift +++ b/Sources/IPCamKit/Codec/VideoFrame.swift @@ -69,4 +69,5 @@ struct DepacketizeError: Error, Sendable, Equatable, CustomStringConvertible { enum CodecItem: Sendable { case videoFrame(VideoFrame) case audioFrame(AudioFrame) + case metadataFrame(MetadataFrame) } diff --git a/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift new file mode 100644 index 0000000..f0a33d1 --- /dev/null +++ b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift @@ -0,0 +1,231 @@ +// Copyright (c) 2025 Steel Brain +// SPDX-License-Identifier: MIT +// Tests for ApplicationDepacketizer (ONVIF metadata RTP payload). + +import Foundation +import Testing + +@testable import IPCamKit + +@Suite("Application Depacketizer Tests") +struct ApplicationDepacketizerTests { + + // MARK: - Helpers + + private func ts(_ value: UInt32) -> Timestamp { + Timestamp(timestamp: Int64(value), clockRate: 90_000, start: 0)! + } + + private func makeMetadataPacket( + seq: UInt16, timestamp: UInt32, mark: Bool, loss: UInt16 = 0, + payload: Data + ) -> ReceivedRTPPacket { + let builder = ReceivedPacketBuilder( + ctx: .dummy, streamId: 0, sequenceNumber: seq, + timestamp: ts(timestamp), payloadType: 107, ssrc: 0x12_34_56_78, + mark: mark, loss: loss) + return try! builder.build(payload: payload).get() + } + + private func pullFrame( + _ d: inout ApplicationDepacketizer, _ comment: Comment? = nil + ) -> MetadataFrame? { + guard let result = d.pull() else { return nil } + switch result { + case .success(.metadataFrame(let frame)): + return frame + default: + Issue.record(comment ?? "Expected metadataFrame, got \(result)") + return nil + } + } + + // MARK: - Happy path + + @Test("Single-packet document emits one frame with marker bit") + func singlePacketDocument() throws { + var d = ApplicationDepacketizer() + let xml = Data("".utf8) + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: xml)) + let frame = pullFrame(&d) + #expect(frame?.data == xml) + #expect(frame?.loss == 0) + #expect(frame?.timestamp == ts(1000)) + #expect(d.pull() == nil) + } + + @Test("Multi-packet document concatenates payload and emits on marker") + func multiPacketDocument() throws { + var d = ApplicationDepacketizer() + try d.push( + makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("".utf8))) + #expect(d.pull() == nil) + try d.push(makeMetadataPacket(seq: 1, timestamp: 1000, mark: false, payload: Data("body".utf8))) + #expect(d.pull() == nil) + try d.push( + makeMetadataPacket(seq: 2, timestamp: 1000, mark: true, payload: Data("".utf8))) + let frame = pullFrame(&d) + #expect(frame?.data == Data("body".utf8)) + #expect(frame?.loss == 0) + // Timestamp reflects the last packet (marker packet). + #expect(frame?.timestamp == ts(1000)) + #expect(d.pull() == nil) + } + + @Test("Two consecutive documents emit independently") + func twoConsecutiveDocuments() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: Data("doc1".utf8))) + let f1 = pullFrame(&d) + #expect(f1?.data == Data("doc1".utf8)) + #expect(f1?.loss == 0) + try d.push( + makeMetadataPacket(seq: 1, timestamp: 91_000, mark: true, payload: Data("doc2".utf8))) + let f2 = pullFrame(&d) + #expect(f2?.data == Data("doc2".utf8)) + #expect(f2?.loss == 0) + } + + // MARK: - Loss + + @Test("Initial loss surfaces on the first emitted frame") + func initialLossOnFirstFrame() throws { + var d = ApplicationDepacketizer() + try d.push( + makeMetadataPacket(seq: 3, timestamp: 1000, mark: true, loss: 3, payload: Data("ok".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 3) + #expect(frame?.data == Data("ok".utf8)) + } + + @Test("Loss between two complete documents is reported on the second") + func lossBetweenDocuments() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: Data("a".utf8))) + #expect(pullFrame(&d)?.loss == 0) + try d.push( + makeMetadataPacket(seq: 5, timestamp: 2000, mark: true, loss: 4, payload: Data("b".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 4) + #expect(frame?.data == Data("b".utf8)) + } + + @Test("Loss on the marker packet with a non-empty prefix drops the doc") + func lossOnMarkerWithPrefix() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("aaa".utf8))) + // Marker packet itself carries loss > 0 — the prefix is unusable and + // we can't trust the marker packet's payload either. + try d.push( + makeMetadataPacket(seq: 2, timestamp: 1000, mark: true, loss: 2, payload: Data("bbb".utf8))) + #expect(d.pull() == nil) + // Loss is carried to the next clean document. + try d.push(makeMetadataPacket(seq: 3, timestamp: 2000, mark: true, payload: Data("ok".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 2) + #expect(frame?.data == Data("ok".utf8)) + } + + @Test("Mid-document loss discards prefix and carries loss to next clean frame") + func midDocumentLossDiscards() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("aaa".utf8))) + // Loss in the middle of the document — the buffered "aaa" is unusable. + try d.push( + makeMetadataPacket(seq: 2, timestamp: 1000, mark: false, loss: 1, payload: Data("bbb".utf8))) + // Marker closes the unusable document — no frame emitted. + try d.push(makeMetadataPacket(seq: 3, timestamp: 1000, mark: true, payload: Data("ccc".utf8))) + #expect(d.pull() == nil) + // Next clean document carries the loss=1. + try d.push(makeMetadataPacket(seq: 4, timestamp: 2000, mark: true, payload: Data("ok".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 1) + #expect(frame?.data == Data("ok".utf8)) + } + + // MARK: - Cap and recovery + + @Test("Document over cap fires diagnostic, drops, and recovers on next marker") + func overflowDiagnosticAndRecovery() throws { + final class Box: @unchecked Sendable { + var diagnostics: [RTSPDiagnostic] = [] + } + let box = Box() + var d = ApplicationDepacketizer { box.diagnostics.append($0) } + + // RTP packets are capped at 64 KiB by the transport, so simulate + // overflow with many 60 KiB unmarked packets that together exceed + // the 1 MiB depacketizer cap. + let chunk = Data(repeating: 0x42, count: 60_000) + for seq in 0..<20 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 1000, mark: false, payload: chunk)) + } + #expect(d.pull() == nil) + #expect(box.diagnostics.count == 1) + #expect(box.diagnostics.first?.severity == .warning) + + // Marker on the abandoned document — no emit, depacketizer resets. + try d.push( + makeMetadataPacket(seq: 100, timestamp: 1000, mark: true, payload: Data("end".utf8))) + #expect(d.pull() == nil) + + // Next clean document emits normally. + try d.push( + makeMetadataPacket(seq: 101, timestamp: 2000, mark: true, payload: Data("clean".utf8))) + let frame = pullFrame(&d) + #expect(frame?.data == Data("clean".utf8)) + } + + @Test("Repeated overflow within one in-flight document emits only one warning") + func overflowWarningIsRateLimited() throws { + final class Box: @unchecked Sendable { + var diagnostics: [RTSPDiagnostic] = [] + } + let box = Box() + var d = ApplicationDepacketizer { box.diagnostics.append($0) } + + let chunk = Data(repeating: 0x00, count: 60_000) + // First overflow cycle: accumulate past 1 MiB, see one warning. + for seq in 0..<20 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 1000, mark: false, payload: chunk)) + } + #expect(box.diagnostics.count == 1) + + // Continue piling on more packets in the same in-flight document — + // each one is dropped while waiting for the marker, no new warnings. + for seq in 20..<40 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 1000, mark: false, payload: chunk)) + } + #expect(box.diagnostics.count == 1) + + // Marker resets the depacketizer. A fresh overflow then fires a new warning. + try d.push( + makeMetadataPacket(seq: 100, timestamp: 1000, mark: true, payload: Data("end".utf8))) + for seq in 101..<121 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 2000, mark: false, payload: chunk)) + } + #expect(box.diagnostics.count == 2) + } + + // MARK: - Edge cases + + @Test("Marker on empty payload at idle emits nothing") + func emptyPayloadWithMarkerAtIdle() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: Data())) + #expect(d.pull() == nil) + } + + @Test("Marker on empty payload mid-document still emits buffered bytes") + func emptyPayloadMarkerEmitsBufferedBytes() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("xyz".utf8))) + try d.push(makeMetadataPacket(seq: 1, timestamp: 1000, mark: true, payload: Data())) + let frame = pullFrame(&d) + #expect(frame?.data == Data("xyz".utf8)) + } +}