From 8d03924f4c39c9a617ca5faa69e7b4918e02c5d4 Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 05:03:41 +0200 Subject: [PATCH 1/7] :new: Add ApplicationDepacketizer for ONVIF metadata streams Accumulates RTP payload bytes across packets and emits a MetadataFrame when the marker bit fires (per ONVIF Streaming Spec: marker = end of XML document). Mid-document loss and oversized documents both discard the in-flight prefix and drop until the next marker; loss is preserved across drops so it surfaces on the next clean frame. Extends internal CodecItem with .metadataFrame; the public surface is wired up in subsequent commits. --- .../Codec/ApplicationDepacketizer.swift | 119 ++++++++++++++++++ Sources/IPCamKit/Codec/MetadataFrame.swift | 27 ++++ Sources/IPCamKit/Codec/VideoFrame.swift | 1 + 3 files changed, 147 insertions(+) create mode 100644 Sources/IPCamKit/Codec/ApplicationDepacketizer.swift create mode 100644 Sources/IPCamKit/Codec/MetadataFrame.swift diff --git a/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift b/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift new file mode 100644 index 0000000..3c342ac --- /dev/null +++ b/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift @@ -0,0 +1,119 @@ +// Copyright (c) 2025 Steel Brain +// SPDX-License-Identifier: MIT +// Depacketizer for RTSP `application` streams carrying ONVIF analytics metadata. + +import Foundation + +/// Depacketizer for the ONVIF metadata RTP payload format (`vnd.onvif.metadata`). +/// +/// Per the ONVIF Streaming Specification, payloads concatenate across packets +/// until the RTP marker bit, which signals end-of-document. The loss count is +/// forwarded to the next completed frame so consumers can detect dropped events. +/// +/// Recovery semantics: +/// - Loss mid-document discards the buffered prefix and drops the rest of +/// that document (until the next marker). The loss surfaces on the next +/// clean frame. 
+/// - Buffer overflow (oversized document) discards the prefix, fires a +/// `warning` diagnostic, and drops the rest of that document until the +/// next marker — same as the loss case. The next document emits normally. +struct ApplicationDepacketizer: Sendable { + /// Hard cap on a single accumulated document. ONVIF places no limit on + /// document size, but real metadata documents are on the order of a few KB + /// per second; anything above this points at a malformed stream. + static let maxFragmentBytes = 1 << 20 // 1 MiB + + private var buffer = Data() + /// Packet context + timestamp of the last packet appended into `buffer`. + private var lastCtx: PacketContext = .dummy + private var lastStreamId: Int = 0 + private var lastTimestamp: Timestamp? + /// Loss accumulated across packets contributing to the in-progress document. + /// Cleared once a frame is emitted; carried across drops so the consumer + /// always sees a non-zero loss on the next good frame. + private var pendingLoss: UInt32 = 0 + /// True after a mid-document loss; the document under construction is + /// abandoned and we wait for the next marker before resuming. + private var dropUntilMark: Bool = false + /// True after an overflow diagnostic has fired since the last marker; used + /// to avoid spamming diagnostics if every packet in the same in-flight + /// document keeps overshooting the cap. + private var warnedSinceMark: Bool = false + private var ready: MetadataFrame? + private let onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? + + init(onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil) { + self.onDiagnostic = onDiagnostic + } + + mutating func push(_ pkt: ReceivedRTPPacket) throws { + pendingLoss = pendingLoss + UInt32(pkt.loss) + + var skipAppend = false + + if pkt.loss > 0 && !buffer.isEmpty { + // Lost a packet mid-document — the buffered prefix is unusable. 
+ buffer.removeAll(keepingCapacity: true) + dropUntilMark = true + } + + if buffer.count + pkt.payload.count > Self.maxFragmentBytes { + if !warnedSinceMark { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Metadata document exceeded \(Self.maxFragmentBytes) bytes; " + + "dropping until next marker.")) + warnedSinceMark = true + } + buffer.removeAll(keepingCapacity: true) + dropUntilMark = true + skipAppend = true + } + + if !dropUntilMark && !skipAppend { + buffer.append(pkt.payload) + lastCtx = pkt.ctx + lastStreamId = pkt.streamId + lastTimestamp = pkt.timestamp + } + + guard pkt.mark else { return } + + // Mark observed. Empty buffer means there's nothing to emit (drop, or + // marker on an empty payload). Carry `pendingLoss` forward. + if dropUntilMark || buffer.isEmpty { + buffer.removeAll(keepingCapacity: true) + dropUntilMark = false + warnedSinceMark = false + return + } + + // `lastTimestamp` is set on every append, and we only reach here when + // `buffer` is non-empty — so the guard is unreachable in practice. + guard let ts = lastTimestamp else { + buffer.removeAll(keepingCapacity: true) + warnedSinceMark = false + return + } + + let loss = UInt16(clamping: pendingLoss) + pendingLoss = 0 + warnedSinceMark = false + ready = MetadataFrame( + ctx: lastCtx, + streamId: lastStreamId, + timestamp: ts, + loss: loss, + data: buffer + ) + buffer.removeAll(keepingCapacity: true) + } + + mutating func pull() -> Result<CodecItem, DepacketizeError>? { + guard let frame = ready else { return nil } + ready = nil + return .success(.metadataFrame(frame)) + } +} diff --git a/Sources/IPCamKit/Codec/MetadataFrame.swift b/Sources/IPCamKit/Codec/MetadataFrame.swift new file mode 100644 index 0000000..986153c --- /dev/null +++ b/Sources/IPCamKit/Codec/MetadataFrame.swift @@ -0,0 +1,27 @@ +// Copyright (c) 2025 Steel Brain +// SPDX-License-Identifier: MIT +// Frame type for RTSP `application` media streams (ONVIF analytics metadata). 
+ +import Foundation + +/// A depacketized metadata frame from an RTSP `application` stream. +/// +/// Per the ONVIF Streaming Specification, the payload is an XML document with +/// root node `tt:MetaDataStream`, optionally GZIP-compressed; the RTP marker +/// bit signals end-of-document. +struct MetadataFrame: Sendable, Equatable { + /// Context of the last packet in this frame. + var ctx: PacketContext + + /// Stream index. + var streamId: Int + + /// RTP timestamp of this frame. + var timestamp: Timestamp + + /// Number of RTP packets lost before or during this frame. + var loss: UInt16 + + /// Raw payload bytes — typically UTF-8 XML, may be GZIP-compressed. + var data: Data +} diff --git a/Sources/IPCamKit/Codec/VideoFrame.swift b/Sources/IPCamKit/Codec/VideoFrame.swift index 676ac28..3ad2440 100644 --- a/Sources/IPCamKit/Codec/VideoFrame.swift +++ b/Sources/IPCamKit/Codec/VideoFrame.swift @@ -69,4 +69,5 @@ struct DepacketizeError: Error, Sendable, Equatable, CustomStringConvertible { enum CodecItem: Sendable { case videoFrame(VideoFrame) case audioFrame(AudioFrame) + case metadataFrame(MetadataFrame) } From 5b19f26cd8c22a94564f27445ae8c5726d4db704 Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 05:07:31 +0200 Subject: [PATCH 2/7] :new: Wire ONVIF metadata streams into RTSPSession MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Discover the application m-line during DESCRIBE, SETUP it alongside audio (best-effort — the rest of the session continues if the metadata stream can't be initialized), and dispatch incoming RTP packets to the ApplicationDepacketizer. Surface frames as PublicCodecItem.metadata carrying raw bytes, timestamp, encoding name, and loss count. Limited to vnd.onvif.metadata encoding for now; other application encodings fall through to the same not-supported path as unknown audio. 
--- Examples/CameraViewer/main.swift | 2 +- Sources/IPCamKit/Client/RTSPSession.swift | 145 +++++++++++++++++++++- 2 files changed, 145 insertions(+), 2 deletions(-) diff --git a/Examples/CameraViewer/main.swift b/Examples/CameraViewer/main.swift index 54e32cf..cff1c25 100644 --- a/Examples/CameraViewer/main.swift +++ b/Examples/CameraViewer/main.swift @@ -330,7 +330,7 @@ final class CameraViewerDelegate: NSObject, NSApplicationDelegate { layer.enqueue(sample) } - case .rtcp: + case .metadata, .rtcp: break } } diff --git a/Sources/IPCamKit/Client/RTSPSession.swift b/Sources/IPCamKit/Client/RTSPSession.swift index e99afe3..e562054 100644 --- a/Sources/IPCamKit/Client/RTSPSession.swift +++ b/Sources/IPCamKit/Client/RTSPSession.swift @@ -139,10 +139,11 @@ public final class RTSPClientSession: Sendable { } } -/// A decoded frame (video or audio) exposed to consumers. +/// A decoded frame (video, audio, or metadata) exposed to consumers. public enum PublicCodecItem: Sendable { case video(PublicVideoFrame) case audio(PublicAudioFrame) + case metadata(PublicMetadataFrame) case rtcp(PublicRTCPPacket) } @@ -171,6 +172,23 @@ public struct PublicAudioFrame: Sendable { public let loss: UInt16 } +/// A metadata frame from an RTSP `application` stream (typically ONVIF analytics). +public struct PublicMetadataFrame: Sendable { + /// Raw payload bytes. For `vnd.onvif.metadata` this is a UTF-8 XML + /// document with root `tt:MetaDataStream`, optionally GZIP-compressed + /// (consult the SDP `encodingName` for the exact format). + public let data: Data + + /// Presentation timestamp in seconds, derived from the RTP timestamp. + public let timestamp: Double + + /// SDP encoding name (e.g. `vnd.onvif.metadata`). + public let encodingName: String + + /// Number of RTP packets lost before this frame. + public let loss: UInt16 +} + /// A video frame exposed to consumers. 
public struct PublicVideoFrame: Sendable { /// NAL units in AVCC format (4-byte big-endian length prefix + NAL bytes). @@ -243,12 +261,15 @@ actor SessionState { private var authenticator: RTSPAuthenticator? private var depacketizer: VideoDepacketizer? private var audioDepacketizer: AudioDepacketizer? + private var applicationDepacketizer: ApplicationDepacketizer? private var url: String? private var videoStreamIndex: Int? private var audioStreamIndex: Int? + private var applicationStreamIndex: Int? private var audioEncodingName: String? private var audioClockRate: UInt32? private var audioChannels: UInt16? + private var applicationEncodingName: String? private var channelMappings = ChannelMappings() private var inorderParsers: [Int: InorderParser] = [:] private var userAgent: String? @@ -375,6 +396,54 @@ actor SessionState { audioChannels = audioStream.channels } + // Find and SETUP application (metadata) stream — optional, best-effort. + let applicationIdx = presMut.streams.firstIndex(where: { s in + s.media == "application" && isApplicationEncodingSupported(s.encodingName) + }) + var applicationSetupSSRC: UInt32? + + if let applicationIdx = applicationIdx { + let applicationStream = presMut.streams[applicationIdx] + let applicationSetupURL = applicationStream.control ?? url + var applicationSetupHeaders: [(String, String)] = [] + if transport == .tcp { + let applicationChannelId = channelMappings.nextUnassigned() ?? 
4 + applicationSetupHeaders.append( + ( + "Transport", + "RTP/AVP/TCP;unicast;interleaved=\(applicationChannelId)-\(applicationChannelId + 1)" + )) + try channelMappings.assign( + channelId: applicationChannelId, streamIndex: applicationIdx) + } else { + applicationSetupHeaders.append(("Transport", "RTP/AVP;unicast")) + } + if let sid = sessionId { + applicationSetupHeaders.append(("Session", sid)) + } + let applicationSetupResp = try await sendRequest( + method: .setup, url: applicationSetupURL, + extraHeaders: applicationSetupHeaders) + let applicationSetup = try parseSetup(response: applicationSetupResp) + if let prev = sessionId, prev != applicationSetup.session.id { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Camera issued a new Session ID at application SETUP " + + "(\(prev) -> \(applicationSetup.session.id)); rolling forward.")) + } + sessionId = applicationSetup.session.id + applicationSetupSSRC = applicationSetup.ssrc + presMut.streams[applicationIdx].state = .setup( + StreamStateInit( + ssrc: applicationSetup.ssrc, initialSeq: nil, + initialRtptime: nil, ctx: .dummy)) + + applicationStreamIndex = applicationIdx + applicationEncodingName = applicationStream.encodingName + } + // PLAY var playHeaders: [(String, String)] = [] if let sid = sessionId { @@ -459,6 +528,44 @@ actor SessionState { } } + // Initialize application (metadata) depacketizer + inorder parser. + // Best-effort: if the timeline can't be built (e.g. malformed clock + // rate in SDP), disable the stream rather than failing the session. + if let applicationIdx = applicationIdx { + let applicationStream = presMut.streams[applicationIdx] + + var applicationStart: UInt32? + var applicationSeq: UInt16? 
+ var resolvedApplicationSsrc = applicationSetupSSRC + + if case .setup(let init_) = presMut.streams[applicationIdx].state { + applicationStart = init_.initialRtptime + if let seq = init_.initialSeq, seq != 0, seq != 1 { + applicationSeq = seq + } + if let s = init_.ssrc { resolvedApplicationSsrc = s } + } + + do { + let applicationTimeline = try Timeline( + start: applicationStart, clockRate: applicationStream.clockRateHz) + inorderParsers[applicationIdx] = InorderParser( + ssrc: resolvedApplicationSsrc, nextSeq: applicationSeq, + isTcp: transport == .tcp, timeline: applicationTimeline, + onDiagnostic: onDiagnostic) + applicationDepacketizer = ApplicationDepacketizer(onDiagnostic: onDiagnostic) + } catch { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Failed to initialize application stream: \(error); " + + "metadata will not be delivered.")) + applicationStreamIndex = nil + applicationEncodingName = nil + } + } + isPlaying = true // Build session description @@ -577,6 +684,33 @@ actor SessionState { } } audioDepacketizer = depkt + } else if let applicationIdx = applicationStreamIndex, + mapping.streamIndex == applicationIdx + { + guard var depkt = applicationDepacketizer else { continue } + if let pkt = try parser.rtp( + data: interleaved.data, ctx: .dummy, + streamId: mapping.streamIndex, streamCtx: .dummy) + { + try depkt.push(pkt) + while let result = depkt.pull() { + switch result { + case .success(.metadataFrame(let frame)): + let publicFrame = PublicMetadataFrame( + data: frame.data, + timestamp: frame.timestamp.elapsedSeconds, + encodingName: applicationEncodingName ?? 
"", + loss: frame.loss + ) + continuation.yield(.metadata(publicFrame)) + case .failure(let err): + throw RTSPError.depacketizationError("Metadata depacketization failed: \(err)") + default: + break + } + } + } + applicationDepacketizer = depkt } inorderParsers[mapping.streamIndex] = parser @@ -735,6 +869,15 @@ actor SessionState { } } + private func isApplicationEncodingSupported(_ name: String) -> Bool { + switch name { + case "vnd.onvif.metadata": + return true + default: + return false + } + } + private func publicAudioCodec(from encoding: String) -> PublicAudioCodec { switch encoding { case "mpeg4-generic": return .aac From cff0e08dd4b88811d5cebb248229944b2c21f1b3 Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 05:09:43 +0200 Subject: [PATCH 3/7] :white_check_mark: Add ApplicationDepacketizer tests Cover the happy path (single + multi-packet documents, back-to-back documents), loss handling (initial, between, mid-document), the 1 MiB overflow cap with diagnostic rate-limiting and recovery, and the empty-payload edge cases at idle and mid-document. --- .../ApplicationDepacketizerTests.swift | 212 ++++++++++++++++++ 1 file changed, 212 insertions(+) create mode 100644 Tests/IPCamKitTests/ApplicationDepacketizerTests.swift diff --git a/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift new file mode 100644 index 0000000..7d783c2 --- /dev/null +++ b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift @@ -0,0 +1,212 @@ +// Copyright (c) 2025 Steel Brain +// SPDX-License-Identifier: MIT +// Tests for ApplicationDepacketizer (ONVIF metadata RTP payload). + +import Foundation +import Testing + +@testable import IPCamKit + +@Suite("Application Depacketizer Tests") +struct ApplicationDepacketizerTests { + + // MARK: - Helpers + + private func ts(_ value: UInt32) -> Timestamp { + Timestamp(timestamp: Int64(value), clockRate: 90_000, start: 0)! 
+ } + + private func makeMetadataPacket( + seq: UInt16, timestamp: UInt32, mark: Bool, loss: UInt16 = 0, + payload: Data + ) -> ReceivedRTPPacket { + let builder = ReceivedPacketBuilder( + ctx: .dummy, streamId: 0, sequenceNumber: seq, + timestamp: ts(timestamp), payloadType: 107, ssrc: 0x12_34_56_78, + mark: mark, loss: loss) + return try! builder.build(payload: payload).get() + } + + private func pullFrame( + _ d: inout ApplicationDepacketizer, _ comment: Comment? = nil + ) -> MetadataFrame? { + guard let result = d.pull() else { return nil } + switch result { + case .success(.metadataFrame(let frame)): + return frame + default: + Issue.record(comment ?? "Expected metadataFrame, got \(result)") + return nil + } + } + + // MARK: - Happy path + + @Test("Single-packet document emits one frame with marker bit") + func singlePacketDocument() throws { + var d = ApplicationDepacketizer() + let xml = Data("<tt:MetaDataStream/>".utf8) + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: xml)) + let frame = pullFrame(&d) + #expect(frame?.data == xml) + #expect(frame?.loss == 0) + #expect(frame?.timestamp == ts(1000)) + #expect(d.pull() == nil) + } + + @Test("Multi-packet document concatenates payload and emits on marker") + func multiPacketDocument() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("<a>".utf8))) + #expect(d.pull() == nil) + try d.push(makeMetadataPacket(seq: 1, timestamp: 1000, mark: false, payload: Data("body".utf8))) + #expect(d.pull() == nil) + try d.push(makeMetadataPacket(seq: 2, timestamp: 1000, mark: true, payload: Data("</a>".utf8))) + let frame = pullFrame(&d) + #expect(frame?.data == Data("<a>body</a>".utf8)) + #expect(frame?.loss == 0) + // Timestamp reflects the last packet (marker packet). 
+ #expect(frame?.timestamp == ts(1000)) + #expect(d.pull() == nil) + } + + @Test("Two consecutive documents emit independently") + func twoConsecutiveDocuments() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: Data("doc1".utf8))) + let f1 = pullFrame(&d) + #expect(f1?.data == Data("doc1".utf8)) + #expect(f1?.loss == 0) + try d.push(makeMetadataPacket(seq: 1, timestamp: 91_000, mark: true, payload: Data("doc2".utf8))) + let f2 = pullFrame(&d) + #expect(f2?.data == Data("doc2".utf8)) + #expect(f2?.loss == 0) + } + + // MARK: - Loss + + @Test("Initial loss surfaces on the first emitted frame") + func initialLossOnFirstFrame() throws { + var d = ApplicationDepacketizer() + try d.push( + makeMetadataPacket(seq: 3, timestamp: 1000, mark: true, loss: 3, payload: Data("ok".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 3) + #expect(frame?.data == Data("ok".utf8)) + } + + @Test("Loss between two complete documents is reported on the second") + func lossBetweenDocuments() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: Data("a".utf8))) + #expect(pullFrame(&d)?.loss == 0) + try d.push( + makeMetadataPacket(seq: 5, timestamp: 2000, mark: true, loss: 4, payload: Data("b".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 4) + #expect(frame?.data == Data("b".utf8)) + } + + @Test("Mid-document loss discards prefix and carries loss to next clean frame") + func midDocumentLossDiscards() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("aaa".utf8))) + // Loss in the middle of the document — the buffered "aaa" is unusable. + try d.push( + makeMetadataPacket(seq: 2, timestamp: 1000, mark: false, loss: 1, payload: Data("bbb".utf8))) + // Marker closes the unusable document — no frame emitted. 
+ try d.push(makeMetadataPacket(seq: 3, timestamp: 1000, mark: true, payload: Data("ccc".utf8))) + #expect(d.pull() == nil) + // Next clean document carries the loss=1. + try d.push(makeMetadataPacket(seq: 4, timestamp: 2000, mark: true, payload: Data("ok".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 1) + #expect(frame?.data == Data("ok".utf8)) + } + + // MARK: - Cap and recovery + + @Test("Document over cap fires diagnostic, drops, and recovers on next marker") + func overflowDiagnosticAndRecovery() throws { + final class Box: @unchecked Sendable { + var diagnostics: [RTSPDiagnostic] = [] + } + let box = Box() + var d = ApplicationDepacketizer { box.diagnostics.append($0) } + + // RTP packets are capped at 64 KiB by the transport, so simulate + // overflow with many 60 KiB unmarked packets that together exceed + // the 1 MiB depacketizer cap. + let chunk = Data(repeating: 0x42, count: 60_000) + for seq in 0..<20 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 1000, mark: false, payload: chunk)) + } + #expect(d.pull() == nil) + #expect(box.diagnostics.count == 1) + #expect(box.diagnostics.first?.severity == .warning) + + // Marker on the abandoned document — no emit, depacketizer resets. + try d.push( + makeMetadataPacket(seq: 100, timestamp: 1000, mark: true, payload: Data("end".utf8))) + #expect(d.pull() == nil) + + // Next clean document emits normally. 
+ try d.push( + makeMetadataPacket(seq: 101, timestamp: 2000, mark: true, payload: Data("clean".utf8))) + let frame = pullFrame(&d) + #expect(frame?.data == Data("clean".utf8)) + } + + @Test("Repeated overflow within one in-flight document emits only one warning") + func overflowWarningIsRateLimited() throws { + final class Box: @unchecked Sendable { + var diagnostics: [RTSPDiagnostic] = [] + } + let box = Box() + var d = ApplicationDepacketizer { box.diagnostics.append($0) } + + let chunk = Data(repeating: 0x00, count: 60_000) + // First overflow cycle: accumulate past 1 MiB, see one warning. + for seq in 0..<20 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 1000, mark: false, payload: chunk)) + } + #expect(box.diagnostics.count == 1) + + // Continue piling on more packets in the same in-flight document — + // each one is dropped while waiting for the marker, no new warnings. + for seq in 20..<40 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 1000, mark: false, payload: chunk)) + } + #expect(box.diagnostics.count == 1) + + // Marker resets the depacketizer. A fresh overflow then fires a new warning. 
+ try d.push( + makeMetadataPacket(seq: 100, timestamp: 1000, mark: true, payload: Data("end".utf8))) + for seq in 101..<121 { + try d.push( + makeMetadataPacket(seq: UInt16(seq), timestamp: 2000, mark: false, payload: chunk)) + } + #expect(box.diagnostics.count == 2) + } + + // MARK: - Edge cases + + @Test("Marker on empty payload at idle emits nothing") + func emptyPayloadWithMarkerAtIdle() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: true, payload: Data())) + #expect(d.pull() == nil) + } + + @Test("Marker on empty payload mid-document still emits buffered bytes") + func emptyPayloadMarkerEmitsBufferedBytes() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("xyz".utf8))) + try d.push(makeMetadataPacket(seq: 1, timestamp: 1000, mark: true, payload: Data())) + let frame = pullFrame(&d) + #expect(frame?.data == Data("xyz".utf8)) + } +} From 5b08c1ff9fabb3d5c3a8eaf6210a12416a5629ca Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 05:19:24 +0200 Subject: [PATCH 4/7] :memo: Document metadata stream support in README Adds the feature bullet, a .metadata arm in the usage example, a Metadata subsection, and an architecture-diagram tweak. Also catches up the test count (90 -> 103) that was already stale on main. 
--- README.md | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index a58431a..32e7352 100644 --- a/README.md +++ b/README.md @@ -4,6 +4,7 @@ A pure-Swift RTSP client library for streaming live video and audio from IP came - **H.264 and H.265/HEVC video** — depacketized to AVCC format, ready for VideoToolbox - **Audio** — AAC, PCMU, PCMA, G.722, G.726, L16, G.723.1 +- **ONVIF analytics metadata** — raw XML documents from the camera's `application` RTSP stream - **Zero dependencies** — only Foundation, Network, and CryptoKit - **Swift 6** — strict concurrency with async/await and AsyncThrowingStream @@ -59,6 +60,10 @@ for try await item in session.frames() { // frame.data — raw audio bytes (codec-specific) // frame.codec, frame.sampleRate, frame.channels, frame.timestamp break + case .metadata(let frame): + // frame.data — raw payload (typically ONVIF XML, possibly GZIP-compressed) + // frame.encodingName, frame.timestamp, frame.loss + break case .rtcp: break } @@ -81,6 +86,11 @@ See [API.md](API.md) for the full API reference. 
- AAC (RFC 3640) with aggregation and fragmentation - PCMU (G.711 u-law), PCMA (G.711 A-law), L16, G.722, G.726, DVI4, G.723.1 +### Metadata +- ONVIF analytics metadata (`vnd.onvif.metadata`) per the ONVIF Streaming Specification +- Concatenates RTP payload fragments and emits a frame on the marker bit (end-of-document) +- Best-effort: malformed metadata SDP degrades to a diagnostic without aborting video/audio + ### Protocol - RTSP session management (DESCRIBE, SETUP, PLAY, TEARDOWN) - RTSP message parsing and serialization @@ -114,7 +124,7 @@ Sources/IPCamKit/ ├── RTSP/ RTSP message model, parser, serializer ├── SDP/ SDP session description parser (RFC 8866) ├── RTP/ RTP/RTCP packets, Timeline, ChannelMapping, InorderParser -├── Codec/ H.264/H.265 depacketizers, NAL/SPS/PPS parsing, audio depacketizers +├── Codec/ H.264/H.265 depacketizers, NAL/SPS/PPS parsing, audio + metadata depacketizers ├── Auth/ Basic and Digest authentication ├── Transport/ NWConnection TCP/UDP transport └── Client/ RTSP session, DESCRIBE/SETUP/PLAY parsers, Presentation @@ -122,7 +132,7 @@ Sources/IPCamKit/ ## Testing -90 tests across 15 suites covering RTSP parsing, SDP, RTP, H.264/H.265 depacketization, AAC, simple audio, authentication, and integration: +103 tests across 16 suites covering RTSP parsing, SDP, RTP, H.264/H.265 depacketization, AAC, simple audio, ONVIF metadata depacketization, authentication, and integration: ```bash swift test From cd42f2e55156b62ef97e7fd7320f39a10602de75 Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 05:20:09 +0200 Subject: [PATCH 5/7] :memo: Soften README test count to avoid drift [ci skip] --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 32e7352..ff2c66f 100644 --- a/README.md +++ b/README.md @@ -132,7 +132,7 @@ Sources/IPCamKit/ ## Testing -103 tests across 16 suites covering RTSP parsing, SDP, RTP, H.264/H.265 depacketization, AAC, simple audio, ONVIF metadata 
depacketization, authentication, and integration: +100+ tests across 15+ suites covering RTSP parsing, SDP, RTP, H.264/H.265 depacketization, AAC, simple audio, ONVIF metadata depacketization, authentication, and integration: ```bash swift test From 415c9fbb29f1ca96190f28c94178106568f12a82 Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 05:35:16 +0200 Subject: [PATCH 6/7] :art: Harden metadata pipeline from Opus review - Add precondition(ready == nil) to ApplicationDepacketizer.push so push-without-drain is a hard failure rather than silent data loss. - Nil out lastTimestamp on all drop paths so the "buffer non-empty iff lastTimestamp set" invariant is local rather than global. - Wrap application SETUP in do/catch so a 4xx response degrades to a diagnostic instead of aborting the whole session. - Expose metadataEncoding on SessionDescription so consumers can discover metadata availability without waiting for the first packet. - Add depacketizer test for the marker-packet-carries-loss path. 
--- README.md | 1 + Sources/IPCamKit/Client/RTSPSession.swift | 61 ++++++++++++------- .../Codec/ApplicationDepacketizer.swift | 5 ++ .../ApplicationDepacketizerTests.swift | 16 +++++ 4 files changed, 62 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index ff2c66f..96f3cf8 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ let session = RTSPClientSession( let desc = try await session.start() // desc.videoCodec, desc.resolution, desc.sps, desc.pps, desc.vps // desc.audioCodec, desc.audioSampleRate, desc.audioChannels +// desc.metadataEncoding — non-nil if an ONVIF metadata stream is active // Consume depacketized frames for try await item in session.frames() { diff --git a/Sources/IPCamKit/Client/RTSPSession.swift b/Sources/IPCamKit/Client/RTSPSession.swift index e562054..eb9c251 100644 --- a/Sources/IPCamKit/Client/RTSPSession.swift +++ b/Sources/IPCamKit/Client/RTSPSession.swift @@ -68,6 +68,10 @@ public struct SessionDescription: Sendable { public let audioChannels: UInt16? /// Codec-specific extra data (e.g. AudioSpecificConfig for AAC). public let audioExtraData: Data? + + /// SDP encoding name of the analytics-metadata stream if one was set up + /// (e.g. `vnd.onvif.metadata`), or `nil` if no metadata stream is active. + public let metadataEncoding: String? } /// RTSP client session that manages the full RTSP lifecycle. @@ -397,13 +401,16 @@ actor SessionState { } // Find and SETUP application (metadata) stream — optional, best-effort. - let applicationIdx = presMut.streams.firstIndex(where: { s in + // If SETUP fails (camera advertises the stream but rejects it, or any + // transport error), disable metadata locally rather than aborting the + // session. The channel slot stays assigned (no other streams follow). + var applicationIdx = presMut.streams.firstIndex(where: { s in s.media == "application" && isApplicationEncodingSupported(s.encodingName) }) var applicationSetupSSRC: UInt32? 
- if let applicationIdx = applicationIdx { - let applicationStream = presMut.streams[applicationIdx] + if let idx = applicationIdx { + let applicationStream = presMut.streams[idx] let applicationSetupURL = applicationStream.control ?? url var applicationSetupHeaders: [(String, String)] = [] if transport == .tcp { @@ -414,34 +421,45 @@ actor SessionState { "RTP/AVP/TCP;unicast;interleaved=\(applicationChannelId)-\(applicationChannelId + 1)" )) try channelMappings.assign( - channelId: applicationChannelId, streamIndex: applicationIdx) + channelId: applicationChannelId, streamIndex: idx) } else { applicationSetupHeaders.append(("Transport", "RTP/AVP;unicast")) } if let sid = sessionId { applicationSetupHeaders.append(("Session", sid)) } - let applicationSetupResp = try await sendRequest( - method: .setup, url: applicationSetupURL, - extraHeaders: applicationSetupHeaders) - let applicationSetup = try parseSetup(response: applicationSetupResp) - if let prev = sessionId, prev != applicationSetup.session.id { + + do { + let applicationSetupResp = try await sendRequest( + method: .setup, url: applicationSetupURL, + extraHeaders: applicationSetupHeaders) + let applicationSetup = try parseSetup(response: applicationSetupResp) + if let prev = sessionId, prev != applicationSetup.session.id { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Camera issued a new Session ID at application SETUP " + + "(\(prev) -> \(applicationSetup.session.id)); rolling forward.")) + } + sessionId = applicationSetup.session.id + applicationSetupSSRC = applicationSetup.ssrc + presMut.streams[idx].state = .setup( + StreamStateInit( + ssrc: applicationSetup.ssrc, initialSeq: nil, + initialRtptime: nil, ctx: .dummy)) + + applicationStreamIndex = idx + applicationEncodingName = applicationStream.encodingName + } catch { onDiagnostic?( RTSPDiagnostic( severity: .warning, message: - "Camera issued a new Session ID at application SETUP " - + "(\(prev) -> 
\(applicationSetup.session.id)); rolling forward.")) + "Application SETUP failed: \(error); " + + "metadata will not be delivered.")) + applicationIdx = nil } - sessionId = applicationSetup.session.id - applicationSetupSSRC = applicationSetup.ssrc - presMut.streams[applicationIdx].state = .setup( - StreamStateInit( - ssrc: applicationSetup.ssrc, initialSeq: nil, - initialRtptime: nil, ctx: .dummy)) - - applicationStreamIndex = applicationIdx - applicationEncodingName = applicationStream.encodingName } // PLAY @@ -605,7 +623,8 @@ actor SessionState { audioCodec: resolvedAudioCodec, audioSampleRate: resolvedAudioRate, audioChannels: resolvedAudioChannels, - audioExtraData: audioDepacketizer?.audioParameters?.extraData + audioExtraData: audioDepacketizer?.audioParameters?.extraData, + metadataEncoding: applicationEncodingName ) } diff --git a/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift b/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift index 3c342ac..02c43c0 100644 --- a/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift +++ b/Sources/IPCamKit/Codec/ApplicationDepacketizer.swift @@ -47,6 +47,7 @@ struct ApplicationDepacketizer: Sendable { } mutating func push(_ pkt: ReceivedRTPPacket) throws { + precondition(ready == nil, "push() called before pull() drained the previous frame") pendingLoss = pendingLoss + UInt32(pkt.loss) var skipAppend = false @@ -54,6 +55,7 @@ struct ApplicationDepacketizer: Sendable { if pkt.loss > 0 && !buffer.isEmpty { // Lost a packet mid-document — the buffered prefix is unusable. buffer.removeAll(keepingCapacity: true) + lastTimestamp = nil dropUntilMark = true } @@ -68,6 +70,7 @@ struct ApplicationDepacketizer: Sendable { warnedSinceMark = true } buffer.removeAll(keepingCapacity: true) + lastTimestamp = nil dropUntilMark = true skipAppend = true } @@ -85,6 +88,7 @@ struct ApplicationDepacketizer: Sendable { // marker on an empty payload). Carry `pendingLoss` forward. 
if dropUntilMark || buffer.isEmpty { buffer.removeAll(keepingCapacity: true) + lastTimestamp = nil dropUntilMark = false warnedSinceMark = false return @@ -94,6 +98,7 @@ struct ApplicationDepacketizer: Sendable { // `buffer` is non-empty — so the guard is unreachable in practice. guard let ts = lastTimestamp else { buffer.removeAll(keepingCapacity: true) + dropUntilMark = false warnedSinceMark = false return } diff --git a/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift index 7d783c2..325fa46 100644 --- a/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift +++ b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift @@ -107,6 +107,22 @@ struct ApplicationDepacketizerTests { #expect(frame?.data == Data("b".utf8)) } + @Test("Loss on the marker packet with a non-empty prefix drops the doc") + func lossOnMarkerWithPrefix() throws { + var d = ApplicationDepacketizer() + try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("aaa".utf8))) + // Marker packet itself carries loss > 0 — the prefix is unusable and + // we can't trust the marker packet's payload either. + try d.push( + makeMetadataPacket(seq: 2, timestamp: 1000, mark: true, loss: 2, payload: Data("bbb".utf8))) + #expect(d.pull() == nil) + // Loss is carried to the next clean document. 
+ try d.push(makeMetadataPacket(seq: 3, timestamp: 2000, mark: true, payload: Data("ok".utf8))) + let frame = pullFrame(&d) + #expect(frame?.loss == 2) + #expect(frame?.data == Data("ok".utf8)) + } + @Test("Mid-document loss discards prefix and carries loss to next clean frame") func midDocumentLossDiscards() throws { var d = ApplicationDepacketizer() From c79edbb6cfbc422f11a76764391ac777adf4db4a Mon Sep 17 00:00:00 2001 From: Anees Iqbal Date: Wed, 20 May 2026 16:12:59 +0200 Subject: [PATCH 7/7] :shirt: Wrap long lines in ApplicationDepacketizerTests --- Tests/IPCamKitTests/ApplicationDepacketizerTests.swift | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift index 325fa46..f0a33d1 100644 --- a/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift +++ b/Tests/IPCamKitTests/ApplicationDepacketizerTests.swift @@ -57,11 +57,13 @@ struct ApplicationDepacketizerTests { @Test("Multi-packet document concatenates payload and emits on marker") func multiPacketDocument() throws { var d = ApplicationDepacketizer() - try d.push(makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("".utf8))) + try d.push( + makeMetadataPacket(seq: 0, timestamp: 1000, mark: false, payload: Data("".utf8))) #expect(d.pull() == nil) try d.push(makeMetadataPacket(seq: 1, timestamp: 1000, mark: false, payload: Data("body".utf8))) #expect(d.pull() == nil) - try d.push(makeMetadataPacket(seq: 2, timestamp: 1000, mark: true, payload: Data("".utf8))) + try d.push( + makeMetadataPacket(seq: 2, timestamp: 1000, mark: true, payload: Data("".utf8))) let frame = pullFrame(&d) #expect(frame?.data == Data("body".utf8)) #expect(frame?.loss == 0) @@ -77,7 +79,8 @@ struct ApplicationDepacketizerTests { let f1 = pullFrame(&d) #expect(f1?.data == Data("doc1".utf8)) #expect(f1?.loss == 0) - try d.push(makeMetadataPacket(seq: 1, timestamp: 91_000, mark: true, 
payload: Data("doc2".utf8))) + try d.push( + makeMetadataPacket(seq: 1, timestamp: 91_000, mark: true, payload: Data("doc2".utf8))) let f2 = pullFrame(&d) #expect(f2?.data == Data("doc2".utf8)) #expect(f2?.loss == 0)