diff --git a/API.md b/API.md index 2ae6f55..6eaa88a 100644 --- a/API.md +++ b/API.md @@ -39,13 +39,29 @@ init( url: String, credentials: Credentials? = nil, transport: Transport = .tcp, - userAgent: String = "IPCamKit") + userAgent: String = "IPCamKit", + onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil) func start() async throws -> SessionDescription func frames() -> AsyncThrowingStream func stop() async ``` +### RTSPDiagnostic + +Non-fatal anomalies observed during a session (e.g. cameras deviating from spec). The +stream stays alive; the callback is purely observational. `severity == .error` here +means real damage (e.g. dropped data) but the stream continues — distinct from a +thrown `RTSPError`, which means the stream is dead. + +```swift +struct RTSPDiagnostic: Sendable { + enum Severity: Sendable { case info, warning, error } + let severity: Severity + let message: String +} +``` + ### Credentials ```swift diff --git a/CHANGELOG.md b/CHANGELOG.md index d022b1d..8289175 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,11 +6,23 @@ - Remove `SessionIdPolicy` enum and the `sessionIdPolicy:` parameter from `RTSPClientSession.init`. Audio SETUP responses that return a different session ID are now always accepted (latest wins) instead of being a configurable choice. +### New + +- `onDiagnostic` callback on `RTSPClientSession.init` for observing non-fatal anomalies (e.g. cameras deviating from spec). Emits `RTSPDiagnostic` values with `info` / `warning` / `error` severity. Initial events: + - `warning` when a camera issues a different Session ID at audio SETUP than at video SETUP. + - `warning` when an empty video RTP payload is received and skipped. + - `warning` when an out-of-order RTP packet is received on TCP-interleaved transport (and dropped). + ### Improvements - Add iOS 16, tvOS 16, and macCatalyst 16 to supported platforms (Thanks @brientim) - Lower macOS minimum from 14 to 13 +### Fixes + +- Stop tearing down the video stream when a camera emits an empty (or, for H.265, sub-2-byte) RTP payload. Such packets are now skipped — matches GStreamer / Live555 behavior. +- Stop tearing down the session when an out-of-order RTP packet arrives on TCP-interleaved transport. Packet is now dropped to match UDP behavior, matching FFmpeg / GStreamer / Live555 / ExoPlayer (none of which abort on this case). TCP byte-stream ordering does not imply RTP-sequence ordering — buggy camera packetizers can write packets out-of-order before muxing. + ## 0.1.1 ### Improvements diff --git a/Sources/IPCamKit/Client/RTSPSession.swift b/Sources/IPCamKit/Client/RTSPSession.swift index 2e659e7..e99afe3 100644 --- a/Sources/IPCamKit/Client/RTSPSession.swift +++ b/Sources/IPCamKit/Client/RTSPSession.swift @@ -30,6 +30,26 @@ public enum PublicAudioCodec: Sendable { case other(String) } +/// Diagnostic event emitted by `RTSPClientSession` when a non-fatal anomaly is observed. +/// +/// Use the `onDiagnostic` callback on `RTSPClientSession.init` to receive these. +/// `severity == .error` here means real damage (e.g. dropped data) while the stream +/// is still alive — distinct from a thrown `RTSPError`, which means the stream is dead. +public struct RTSPDiagnostic: Sendable { + public enum Severity: Sendable { + case info + case warning + case error + } + public let severity: Severity + public let message: String + + public init(severity: Severity, message: String) { + self.severity = severity + self.message = message + } +} + /// Parsed session description returned from `start()`. public struct SessionDescription: Sendable { public let videoCodec: VideoCodec @@ -66,18 +86,21 @@ public final class RTSPClientSession: Sendable { private let credentials: Credentials? private let transport: Transport private let userAgent: String + private let onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? private let state: SessionState public init( url: String, credentials: Credentials? = nil, transport: Transport = .tcp, - userAgent: String = "IPCamKit" + userAgent: String = "IPCamKit", + onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil ) { self.url = url self.credentials = credentials self.transport = transport self.userAgent = userAgent + self.onDiagnostic = onDiagnostic self.state = SessionState() } @@ -89,7 +112,8 @@ public final class RTSPClientSession: Sendable { url: url, credentials: credentials, transport: transport, - userAgent: userAgent + userAgent: userAgent, + onDiagnostic: onDiagnostic ) } @@ -229,13 +253,16 @@ actor SessionState { private var inorderParsers: [Int: InorderParser] = [:] private var userAgent: String? private var isPlaying = false + private var onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? func start( url: String, credentials: Credentials?, transport: Transport, - userAgent: String + userAgent: String, + onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? ) async throws -> SessionDescription { + self.onDiagnostic = onDiagnostic // Parse URL guard let urlComponents = URLComponents(string: url) else { throw RTSPError.connectionFailed("Invalid URL: \(url)") @@ -329,6 +356,14 @@ actor SessionState { method: .setup, url: audioSetupURL, extraHeaders: audioSetupHeaders) let audioSetup = try parseSetup(response: audioSetupResp) + if let prev = sessionId, prev != audioSetup.session.id { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Camera issued a new Session ID at audio SETUP " + + "(\(prev) -> \(audioSetup.session.id)); rolling forward.")) + } sessionId = audioSetup.session.id audioSetupSSRC = audioSetup.ssrc presMut.streams[audioIdx].state = .setup( @@ -382,7 +417,7 @@ actor SessionState { let timeline = try Timeline(start: videoStart, clockRate: stream.clockRateHz) inorderParsers[videoIdx] = InorderParser( ssrc: videoSsrc, nextSeq: videoSeq, isTcp: transport == .tcp, - timeline: timeline) + timeline: timeline, onDiagnostic: onDiagnostic) // Initialize audio depacketizer and inorder parser var resolvedAudioCodec: PublicAudioCodec? @@ -415,7 +450,8 @@ actor SessionState { start: audioStart, clockRate: audioStream.clockRateHz) inorderParsers[audioIdx] = InorderParser( ssrc: resolvedAudioSsrc, nextSeq: audioSeq, - isTcp: transport == .tcp, timeline: audioTimeline) + isTcp: transport == .tcp, timeline: audioTimeline, + onDiagnostic: onDiagnostic) resolvedAudioCodec = publicAudioCodec( from: audioStream.encodingName) resolvedAudioRate = audioStream.clockRateHz @@ -489,6 +525,12 @@ actor SessionState { data: interleaved.data, ctx: .dummy, streamId: mapping.streamIndex, streamCtx: .dummy) { + if pkt.payload.isEmpty { + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: "Empty video RTP payload from camera; packet skipped.")) + } do { try depkt.push(pkt) } catch { diff --git a/Sources/IPCamKit/Codec/H264Depacketizer.swift b/Sources/IPCamKit/Codec/H264Depacketizer.swift index 5d9babc..9650d0e 100644 --- a/Sources/IPCamKit/Codec/H264Depacketizer.swift +++ b/Sources/IPCamKit/Codec/H264Depacketizer.swift @@ -193,7 +193,7 @@ struct H264Depacketizer: Sendable { // Parse NAL header from payload let payload = pkt.payload guard !payload.isEmpty else { - return .failure(DepacketizeError("Empty NAL")) + return .success(()) } let nalHeaderByte = payload[payload.startIndex] diff --git a/Sources/IPCamKit/Codec/H265Depacketizer.swift b/Sources/IPCamKit/Codec/H265Depacketizer.swift index 1ab2adc..beaf855 100644 --- a/Sources/IPCamKit/Codec/H265Depacketizer.swift +++ b/Sources/IPCamKit/Codec/H265Depacketizer.swift @@ -169,7 +169,7 @@ struct H265Depacketizer: Sendable { // Parse 2-byte NAL header from payload let payload = pkt.payload guard payload.count >= 2 else { - return .failure(DepacketizeError("Short NAL")) + return .success(()) } let hdr: H265NALHeader do { diff --git a/Sources/IPCamKit/RTP/InorderParser.swift b/Sources/IPCamKit/RTP/InorderParser.swift index a5b0835..7bf1a1e 100644 --- a/Sources/IPCamKit/RTP/InorderParser.swift +++ b/Sources/IPCamKit/RTP/InorderParser.swift @@ -20,7 +20,7 @@ enum UnknownRtcpSsrcPolicy: Sendable { /// Handles: /// - SSRC validation (reject mismatched SSRCs) /// - Sequence number tracking and loss detection -/// - Out-of-order packet detection (skip on UDP, error on TCP) +/// - Out-of-order packet detection (drop, emit diagnostic on TCP) /// - Geovision PT=50 quirk (silently drop) /// - RTCP compound packet validation and SR timestamp placement struct InorderParser: Sendable { @@ -30,6 +30,7 @@ struct InorderParser: Sendable { var timeline: Timeline private var unknownRtcpSsrcPolicy: UnknownRtcpSsrcPolicy private var seenUnknownRtcpSession: Bool = false + private let onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? /// Number of RTP packets seen. private(set) var seenRtpPackets: UInt64 = 0 @@ -39,13 +40,15 @@ struct InorderParser: Sendable { init( ssrc: UInt32?, nextSeq: UInt16?, isTcp: Bool, timeline: Timeline, - unknownRtcpSsrcPolicy: UnknownRtcpSsrcPolicy = .dropPackets + unknownRtcpSsrcPolicy: UnknownRtcpSsrcPolicy = .dropPackets, + onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil ) { self.ssrc = ssrc self.nextSeq = nextSeq self.isTcp = isTcp self.timeline = timeline self.unknownRtcpSsrcPolicy = unknownRtcpSsrcPolicy + self.onDiagnostic = onDiagnostic } /// Process an incoming RTP packet. @@ -87,12 +90,17 @@ struct InorderParser: Sendable { if let expected = nextSeq { let delta = raw.sequenceNumber &- expected if delta > 0x8000 { - // Out of order + // Out of order. UDP reordering is normal and stays silent; TCP-interleaved + // reordering means the camera's packetizer wrote sequence-numbers out of + // order before muxing, which is camera misbehavior worth surfacing. if isTcp { - throw RTSPError.depacketizationError( - "Out-of-order packet on TCP: seq=\(raw.sequenceNumber), expected=\(expected)") + onDiagnostic?( + RTSPDiagnostic( + severity: .warning, + message: + "Out-of-order RTP packet on TCP-interleaved transport: " + + "seq=\(raw.sequenceNumber), expected=\(expected); packet dropped.")) } - // UDP: silently drop out-of-order packets return nil } loss = delta diff --git a/Tests/IPCamKitTests/H264DepacketizerTests.swift b/Tests/IPCamKitTests/H264DepacketizerTests.swift index 55e93f5..9a5fc72 100644 --- a/Tests/IPCamKitTests/H264DepacketizerTests.swift +++ b/Tests/IPCamKitTests/H264DepacketizerTests.swift @@ -666,6 +666,36 @@ struct H264DepacketizerTests { // Third pull: nothing #expect(d.pull() == nil) } + + /// Empty RTP payload (zero bytes — no NAL header at all) is tolerated and + /// does not tear the stream down. Some camera firmwares emit such packets + /// as keep-alives or muxer artifacts; previously this surfaced as a + /// `DepacketizeError("Empty NAL")` that propagated up and ended the session. + /// See CHANGELOG 0.2.0. + @Test("Empty RTP payload tolerated") + func emptyRTPPayload() throws { + var d = try H264Depacketizer(clockRate: 90000, formatSpecificParams: dahuaFmtp) + + // Empty payload — must not throw. + try d.push(makePacket(seq: 0, timestamp: ts0, mark: false, payload: Data())) + #expect(d.pull() == nil) + + // A subsequent valid packet still produces a frame. + try d.push( + makePacket( + seq: 1, timestamp: ts0, mark: true, payload: Data([0x06]) + Data("plain".utf8))) + + guard case .success(.videoFrame(let frame)) = d.pull() else { + Issue.record("Expected video frame after empty packet") + return + } + + var expected = Data() + expected.append(contentsOf: [0x00, 0x00, 0x00, 0x06, 0x06]) + expected.append(Data("plain".utf8)) + #expect(frame.data == expected) + #expect(d.pull() == nil) + } } // MARK: - Test Data diff --git a/Tests/IPCamKitTests/H265DepacketizerTests.swift b/Tests/IPCamKitTests/H265DepacketizerTests.swift index e1fd712..63a7334 100644 --- a/Tests/IPCamKitTests/H265DepacketizerTests.swift +++ b/Tests/IPCamKitTests/H265DepacketizerTests.swift @@ -248,6 +248,23 @@ struct H265DepacketizerTests { #expect(p.genericParameters.pixelDimensions?.width == 2304) #expect(p.genericParameters.pixelDimensions?.height == 1296) } + + /// Short RTP payloads (zero or one byte — too short for the 2-byte H.265 NAL + /// header) are tolerated and do not tear the stream down. Previously this + /// surfaced as a `DepacketizeError("Short NAL")` that propagated up and + /// ended the session. See CHANGELOG 0.2.0. + @Test("Short RTP payload tolerated") + func shortRTPPayload() throws { + var d = try H265Depacketizer(clockRate: 90000, formatSpecificParams: nil) + + // Zero-byte payload — must not throw. + try d.push(makeH265Packet(seq: 0, timestamp: h265Ts0, mark: false, payload: Data())) + #expect(d.pull() == nil) + + // One-byte payload (insufficient for H.265's 2-byte NAL header) — must not throw. + try d.push(makeH265Packet(seq: 1, timestamp: h265Ts0, mark: false, payload: Data([0x40]))) + #expect(d.pull() == nil) + } } // MARK: - NAL Tests diff --git a/Tests/IPCamKitTests/RTPTests.swift b/Tests/IPCamKitTests/RTPTests.swift index cf0f4c3..bd49809 100644 --- a/Tests/IPCamKitTests/RTPTests.swift +++ b/Tests/IPCamKitTests/RTPTests.swift @@ -330,4 +330,58 @@ struct InorderParserTests { #expect(result3 != nil) #expect(result3!.timestamp.elapsed == 1) // delta from start=2 to 3 } + + /// Out-of-order packets on TCP-interleaved transport are dropped (matching UDP) + /// and surfaced via the onDiagnostic callback as `.warning`. Previously this case + /// threw and tore down the session — see CHANGELOG 0.2.0. + @Test("Out-of-order packets dropped on TCP and diagnostic emitted") + func outOfOrderTcp() throws { + let captured = DiagnosticBox() + + var timeline = try Timeline(start: nil, clockRate: 90_000, enforceMaxJumpSecs: nil) + var parser = InorderParser( + ssrc: 0x0D25_614E, nextSeq: nil, isTcp: true, timeline: timeline, + onDiagnostic: { captured.append($0) }) + + // Packet with seq=2 arrives first + let pkt1 = RTPPacketBuilder( + sequenceNumber: 2, timestamp: 2, payloadType: 96, + ssrc: 0x0D25_614E, mark: true) + let data1 = try pkt1.build(payload: Data("pkt 2".utf8)).get().data + let result1 = try parser.rtp( + data: data1, ctx: .dummy, streamId: 0, streamCtx: .dummy) + #expect(result1 != nil) + #expect(captured.events.isEmpty) + + // Packet with seq=1 arrives late — must drop without throwing and emit a warning + let pkt2 = RTPPacketBuilder( + sequenceNumber: 1, timestamp: 1, payloadType: 96, + ssrc: 0x0D25_614E, mark: true) + let data2 = try pkt2.build(payload: Data("pkt 1".utf8)).get().data + let result2 = try parser.rtp( + data: data2, ctx: .dummy, streamId: 0, streamCtx: .dummy) + #expect(result2 == nil) + #expect(captured.events.count == 1) + #expect(captured.events.first?.severity == .warning) + #expect(captured.events.first?.message.contains("seq=1") == true) + #expect(captured.events.first?.message.contains("expected=3") == true) + } +} + +/// Thread-safe collector for diagnostic events captured during a test. +private final class DiagnosticBox: @unchecked Sendable { + private let lock = NSLock() + private var _events: [RTSPDiagnostic] = [] + + func append(_ event: RTSPDiagnostic) { + lock.lock() + defer { lock.unlock() } + _events.append(event) + } + + var events: [RTSPDiagnostic] { + lock.lock() + defer { lock.unlock() } + return _events + } }