Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion API.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,13 +39,29 @@ init(
url: String,
credentials: Credentials? = nil,
transport: Transport = .tcp,
userAgent: String = "IPCamKit")
userAgent: String = "IPCamKit",
onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil)

func start() async throws -> SessionDescription
func frames() -> AsyncThrowingStream<PublicCodecItem, Error>
func stop() async
```

### RTSPDiagnostic

Non-fatal anomalies observed during a session (e.g. cameras deviating from spec). The
stream stays alive; the callback is purely observational. `severity == .error` here
means real damage (e.g. dropped data) but the stream continues — distinct from a
thrown `RTSPError`, which means the stream is dead.

```swift
struct RTSPDiagnostic: Sendable {
enum Severity: Sendable { case info, warning, error }
let severity: Severity
let message: String
}
```

### Credentials

```swift
Expand Down
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,23 @@

- Remove `SessionIdPolicy` enum and the `sessionIdPolicy:` parameter from `RTSPClientSession.init`. Audio SETUP responses that return a different session ID are now always accepted (latest wins) instead of being a configurable choice.

### New

- `onDiagnostic` callback on `RTSPClientSession.init` for observing non-fatal anomalies (e.g. cameras deviating from spec). Emits `RTSPDiagnostic` values with `info` / `warning` / `error` severity. Initial events:
- `warning` when a camera issues a different Session ID at audio SETUP than at video SETUP.
- `warning` when an empty video RTP payload is received and skipped.
- `warning` when an out-of-order RTP packet is received on TCP-interleaved transport (and dropped).

### Improvements

- Add iOS 16, tvOS 16, and macCatalyst 16 to supported platforms (Thanks @brientim)
- Lower macOS minimum from 14 to 13

### Fixes

- Stop tearing down the video stream when a camera emits an empty (or, for H.265, sub-2-byte) RTP payload. Such packets are now skipped — matches GStreamer / Live555 behavior.
- Stop tearing down the session when an out-of-order RTP packet arrives on TCP-interleaved transport. Packet is now dropped to match UDP behavior, matching FFmpeg / GStreamer / Live555 / ExoPlayer (none of which abort on this case). TCP byte-stream ordering does not imply RTP-sequence ordering — buggy camera packetizers can write packets out-of-order before muxing.

## 0.1.1

### Improvements
Expand Down
52 changes: 47 additions & 5 deletions Sources/IPCamKit/Client/RTSPSession.swift
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,26 @@ public enum PublicAudioCodec: Sendable {
case other(String)
}

/// Diagnostic event emitted by `RTSPClientSession` when a non-fatal anomaly is observed.
///
/// Use the `onDiagnostic` callback on `RTSPClientSession.init` to receive these.
/// `severity == .error` here means real damage (e.g. dropped data) while the stream
/// is still alive — distinct from a thrown `RTSPError`, which means the stream is dead.
public struct RTSPDiagnostic: Sendable {
public enum Severity: Sendable {
case info
case warning
case error
}
public let severity: Severity
public let message: String

public init(severity: Severity, message: String) {
self.severity = severity
self.message = message
}
}

/// Parsed session description returned from `start()`.
public struct SessionDescription: Sendable {
public let videoCodec: VideoCodec
Expand Down Expand Up @@ -66,18 +86,21 @@ public final class RTSPClientSession: Sendable {
private let credentials: Credentials?
private let transport: Transport
private let userAgent: String
private let onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)?
private let state: SessionState

public init(
url: String,
credentials: Credentials? = nil,
transport: Transport = .tcp,
userAgent: String = "IPCamKit"
userAgent: String = "IPCamKit",
onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil
) {
self.url = url
self.credentials = credentials
self.transport = transport
self.userAgent = userAgent
self.onDiagnostic = onDiagnostic
self.state = SessionState()
}

Expand All @@ -89,7 +112,8 @@ public final class RTSPClientSession: Sendable {
url: url,
credentials: credentials,
transport: transport,
userAgent: userAgent
userAgent: userAgent,
onDiagnostic: onDiagnostic
)
}

Expand Down Expand Up @@ -229,13 +253,16 @@ actor SessionState {
private var inorderParsers: [Int: InorderParser] = [:]
private var userAgent: String?
private var isPlaying = false
private var onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)?

func start(
url: String,
credentials: Credentials?,
transport: Transport,
userAgent: String
userAgent: String,
onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)?
) async throws -> SessionDescription {
self.onDiagnostic = onDiagnostic
// Parse URL
guard let urlComponents = URLComponents(string: url) else {
throw RTSPError.connectionFailed("Invalid URL: \(url)")
Expand Down Expand Up @@ -329,6 +356,14 @@ actor SessionState {
method: .setup, url: audioSetupURL,
extraHeaders: audioSetupHeaders)
let audioSetup = try parseSetup(response: audioSetupResp)
if let prev = sessionId, prev != audioSetup.session.id {
onDiagnostic?(
RTSPDiagnostic(
severity: .warning,
message:
"Camera issued a new Session ID at audio SETUP "
+ "(\(prev) -> \(audioSetup.session.id)); rolling forward."))
}
sessionId = audioSetup.session.id
audioSetupSSRC = audioSetup.ssrc
presMut.streams[audioIdx].state = .setup(
Expand Down Expand Up @@ -382,7 +417,7 @@ actor SessionState {
let timeline = try Timeline(start: videoStart, clockRate: stream.clockRateHz)
inorderParsers[videoIdx] = InorderParser(
ssrc: videoSsrc, nextSeq: videoSeq, isTcp: transport == .tcp,
timeline: timeline)
timeline: timeline, onDiagnostic: onDiagnostic)

// Initialize audio depacketizer and inorder parser
var resolvedAudioCodec: PublicAudioCodec?
Expand Down Expand Up @@ -415,7 +450,8 @@ actor SessionState {
start: audioStart, clockRate: audioStream.clockRateHz)
inorderParsers[audioIdx] = InorderParser(
ssrc: resolvedAudioSsrc, nextSeq: audioSeq,
isTcp: transport == .tcp, timeline: audioTimeline)
isTcp: transport == .tcp, timeline: audioTimeline,
onDiagnostic: onDiagnostic)
resolvedAudioCodec = publicAudioCodec(
from: audioStream.encodingName)
resolvedAudioRate = audioStream.clockRateHz
Expand Down Expand Up @@ -489,6 +525,12 @@ actor SessionState {
data: interleaved.data, ctx: .dummy,
streamId: mapping.streamIndex, streamCtx: .dummy)
{
if pkt.payload.isEmpty {
onDiagnostic?(
RTSPDiagnostic(
severity: .warning,
message: "Empty video RTP payload from camera; packet skipped."))
}
do {
try depkt.push(pkt)
} catch {
Expand Down
2 changes: 1 addition & 1 deletion Sources/IPCamKit/Codec/H264Depacketizer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ struct H264Depacketizer: Sendable {
// Parse NAL header from payload
let payload = pkt.payload
guard !payload.isEmpty else {
return .failure(DepacketizeError("Empty NAL"))
return .success(())
}

let nalHeaderByte = payload[payload.startIndex]
Expand Down
2 changes: 1 addition & 1 deletion Sources/IPCamKit/Codec/H265Depacketizer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ struct H265Depacketizer: Sendable {
// Parse 2-byte NAL header from payload
let payload = pkt.payload
guard payload.count >= 2 else {
return .failure(DepacketizeError("Short NAL"))
return .success(())
}
let hdr: H265NALHeader
do {
Expand Down
20 changes: 14 additions & 6 deletions Sources/IPCamKit/RTP/InorderParser.swift
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ enum UnknownRtcpSsrcPolicy: Sendable {
/// Handles:
/// - SSRC validation (reject mismatched SSRCs)
/// - Sequence number tracking and loss detection
/// - Out-of-order packet detection (skip on UDP, error on TCP)
/// - Out-of-order packet detection (drop, emit diagnostic on TCP)
/// - Geovision PT=50 quirk (silently drop)
/// - RTCP compound packet validation and SR timestamp placement
struct InorderParser: Sendable {
Expand All @@ -30,6 +30,7 @@ struct InorderParser: Sendable {
var timeline: Timeline
private var unknownRtcpSsrcPolicy: UnknownRtcpSsrcPolicy
private var seenUnknownRtcpSession: Bool = false
private let onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)?

/// Number of RTP packets seen.
private(set) var seenRtpPackets: UInt64 = 0
Expand All @@ -39,13 +40,15 @@ struct InorderParser: Sendable {

init(
ssrc: UInt32?, nextSeq: UInt16?, isTcp: Bool, timeline: Timeline,
unknownRtcpSsrcPolicy: UnknownRtcpSsrcPolicy = .dropPackets
unknownRtcpSsrcPolicy: UnknownRtcpSsrcPolicy = .dropPackets,
onDiagnostic: (@Sendable (RTSPDiagnostic) -> Void)? = nil
) {
self.ssrc = ssrc
self.nextSeq = nextSeq
self.isTcp = isTcp
self.timeline = timeline
self.unknownRtcpSsrcPolicy = unknownRtcpSsrcPolicy
self.onDiagnostic = onDiagnostic
}

/// Process an incoming RTP packet.
Expand Down Expand Up @@ -87,12 +90,17 @@ struct InorderParser: Sendable {
if let expected = nextSeq {
let delta = raw.sequenceNumber &- expected
if delta > 0x8000 {
// Out of order
// Out of order. UDP reordering is normal and stays silent; TCP-interleaved
// reordering means the camera's packetizer wrote sequence-numbers out of
// order before muxing, which is camera misbehavior worth surfacing.
if isTcp {
throw RTSPError.depacketizationError(
"Out-of-order packet on TCP: seq=\(raw.sequenceNumber), expected=\(expected)")
onDiagnostic?(
RTSPDiagnostic(
severity: .warning,
message:
"Out-of-order RTP packet on TCP-interleaved transport: "
+ "seq=\(raw.sequenceNumber), expected=\(expected); packet dropped."))
}
// UDP: silently drop out-of-order packets
return nil
}
loss = delta
Expand Down
30 changes: 30 additions & 0 deletions Tests/IPCamKitTests/H264DepacketizerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -666,6 +666,36 @@ struct H264DepacketizerTests {
// Third pull: nothing
#expect(d.pull() == nil)
}

/// Empty RTP payload (zero bytes — no NAL header at all) is tolerated and
/// does not tear the stream down. Some camera firmwares emit such packets
/// as keep-alives or muxer artifacts; previously this surfaced as a
/// `DepacketizeError("Empty NAL")` that propagated up and ended the session.
/// See CHANGELOG 0.2.0.
@Test("Empty RTP payload tolerated")
func emptyRTPPayload() throws {
var d = try H264Depacketizer(clockRate: 90000, formatSpecificParams: dahuaFmtp)

// Empty payload — must not throw.
try d.push(makePacket(seq: 0, timestamp: ts0, mark: false, payload: Data()))
#expect(d.pull() == nil)

// A subsequent valid packet still produces a frame.
try d.push(
makePacket(
seq: 1, timestamp: ts0, mark: true, payload: Data([0x06]) + Data("plain".utf8)))

guard case .success(.videoFrame(let frame)) = d.pull() else {
Issue.record("Expected video frame after empty packet")
return
}

var expected = Data()
expected.append(contentsOf: [0x00, 0x00, 0x00, 0x06, 0x06])
expected.append(Data("plain".utf8))
#expect(frame.data == expected)
#expect(d.pull() == nil)
}
}

// MARK: - Test Data
Expand Down
17 changes: 17 additions & 0 deletions Tests/IPCamKitTests/H265DepacketizerTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,23 @@ struct H265DepacketizerTests {
#expect(p.genericParameters.pixelDimensions?.width == 2304)
#expect(p.genericParameters.pixelDimensions?.height == 1296)
}

/// Short RTP payloads (zero or one byte — too short for the 2-byte H.265 NAL
/// header) are tolerated and do not tear the stream down. Previously this
/// surfaced as a `DepacketizeError("Short NAL")` that propagated up and
/// ended the session. See CHANGELOG 0.2.0.
@Test("Short RTP payload tolerated")
func shortRTPPayload() throws {
var d = try H265Depacketizer(clockRate: 90000, formatSpecificParams: nil)

// Zero-byte payload — must not throw.
try d.push(makeH265Packet(seq: 0, timestamp: h265Ts0, mark: false, payload: Data()))
#expect(d.pull() == nil)

// One-byte payload (insufficient for H.265's 2-byte NAL header) — must not throw.
try d.push(makeH265Packet(seq: 1, timestamp: h265Ts0, mark: false, payload: Data([0x40])))
#expect(d.pull() == nil)
}
}

// MARK: - NAL Tests
Expand Down
54 changes: 54 additions & 0 deletions Tests/IPCamKitTests/RTPTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,58 @@ struct InorderParserTests {
#expect(result3 != nil)
#expect(result3!.timestamp.elapsed == 1) // delta from start=2 to 3
}

/// Out-of-order packets on TCP-interleaved transport are dropped (matching UDP)
/// and surfaced via the onDiagnostic callback as `.warning`. Previously this case
/// threw and tore down the session — see CHANGELOG 0.2.0.
@Test("Out-of-order packets dropped on TCP and diagnostic emitted")
func outOfOrderTcp() throws {
let captured = DiagnosticBox()

var timeline = try Timeline(start: nil, clockRate: 90_000, enforceMaxJumpSecs: nil)
var parser = InorderParser(
ssrc: 0x0D25_614E, nextSeq: nil, isTcp: true, timeline: timeline,
onDiagnostic: { captured.append($0) })

// Packet with seq=2 arrives first
let pkt1 = RTPPacketBuilder(
sequenceNumber: 2, timestamp: 2, payloadType: 96,
ssrc: 0x0D25_614E, mark: true)
let data1 = try pkt1.build(payload: Data("pkt 2".utf8)).get().data
let result1 = try parser.rtp(
data: data1, ctx: .dummy, streamId: 0, streamCtx: .dummy)
#expect(result1 != nil)
#expect(captured.events.isEmpty)

// Packet with seq=1 arrives late — must drop without throwing and emit a warning
let pkt2 = RTPPacketBuilder(
sequenceNumber: 1, timestamp: 1, payloadType: 96,
ssrc: 0x0D25_614E, mark: true)
let data2 = try pkt2.build(payload: Data("pkt 1".utf8)).get().data
let result2 = try parser.rtp(
data: data2, ctx: .dummy, streamId: 0, streamCtx: .dummy)
#expect(result2 == nil)
#expect(captured.events.count == 1)
#expect(captured.events.first?.severity == .warning)
#expect(captured.events.first?.message.contains("seq=1") == true)
#expect(captured.events.first?.message.contains("expected=3") == true)
}
}

/// Thread-safe collector for diagnostic events captured during a test.
private final class DiagnosticBox: @unchecked Sendable {
private let lock = NSLock()
private var _events: [RTSPDiagnostic] = []

func append(_ event: RTSPDiagnostic) {
lock.lock()
defer { lock.unlock() }
_events.append(event)
}

var events: [RTSPDiagnostic] {
lock.lock()
defer { lock.unlock() }
return _events
}
}
Loading