Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/change/direct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl<'a> DirectApi<'a> {

/// Start the SCTP over DTLS.
pub fn start_sctp(&mut self, client: bool) {
self.rtc.init_sctp(client)
self.rtc.init_sctp(client, None)
}

/// Create a new data channel.
Expand Down
167 changes: 156 additions & 11 deletions src/change/sdp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

use std::fmt;
use std::ops::{Deref, DerefMut};
use std::slice::Iter;

use crate::Rtc;
use crate::RtcError;
Expand Down Expand Up @@ -99,13 +100,15 @@ impl<'a> SdpApi<'a> {
// Ensure setup=active/passive is corresponding remote and init dtls.
init_dtls(self.rtc, &offer)?;

let remote_max_message_size = extract_max_message_size(offer.media_lines.iter());

// Modify session with offer
apply_offer(&mut self.rtc.session, offer)?;
apply_offer(self.rtc, offer)?;

// Handle potentially new m=application line.
let client = self.rtc.dtls.is_active().expect("DTLS active to be set");
if self.rtc.session.app().is_some() {
self.rtc.init_sctp(client);
self.rtc.init_sctp(client, remote_max_message_size);
}

let params = AsSdpParams::new(self.rtc, None);
Expand Down Expand Up @@ -174,13 +177,15 @@ impl<'a> SdpApi<'a> {
// Split out new channels, since that is not handled by the Session.
let new_channels = pending.changes.take_new_channels();

let remote_max_message_size = extract_max_message_size(answer.media_lines.iter());

// Modify session with answer
apply_answer(&mut self.rtc.session, pending.changes, answer)?;
apply_answer(self.rtc, pending.changes, answer)?;

// Handle potentially new m=application line.
let client = self.rtc.dtls.is_active().expect("DTLS to be inited");
if self.rtc.session.app().is_some() {
self.rtc.init_sctp(client);
self.rtc.init_sctp(client, remote_max_message_size);
}

for (id, config) in new_channels {
Expand Down Expand Up @@ -808,9 +813,10 @@ fn as_sdp(session: &Session, params: AsSdpParams) -> Sdp {
}
}

fn apply_offer(session: &mut Session, offer: SdpOffer) -> Result<(), RtcError> {
fn apply_offer(rtc: &mut Rtc, offer: SdpOffer) -> Result<(), RtcError> {
offer.assert_consistency()?;

let session = &mut rtc.session;
update_session(session, &offer);

let bundle_mids = offer.bundle_mids();
Expand All @@ -823,13 +829,10 @@ fn apply_offer(session: &mut Session, offer: SdpOffer) -> Result<(), RtcError> {
Ok(())
}

fn apply_answer(
session: &mut Session,
pending: Changes,
answer: SdpAnswer,
) -> Result<(), RtcError> {
fn apply_answer(rtc: &mut Rtc, pending: Changes, answer: SdpAnswer) -> Result<(), RtcError> {
answer.assert_consistency()?;

let session = &mut rtc.session;
update_session(session, &answer);

let bundle_mids = answer.bundle_mids();
Expand Down Expand Up @@ -1235,6 +1238,16 @@ fn update_media(
}
}

fn extract_max_message_size(mut media_lines: Iter<MediaLine>) -> Option<u32> {
if let Some(app_line) = media_lines.find(|m| m.typ.is_channel()) {
if let Some(max_size) = app_line.max_message_size() {
return Some(max_size as u32);
}
}

None
}

trait AsSdpMediaLine {
fn mid(&self) -> Mid;
fn msid(&self) -> Option<&Msid>;
Expand Down Expand Up @@ -1271,7 +1284,9 @@ impl AsSdpMediaLine for (Mid, usize) {
) -> MediaLine {
attrs.push(MediaAttribute::Mid(self.0));
attrs.push(MediaAttribute::SctpPort(5000));
attrs.push(MediaAttribute::MaxMessageSize(262144));
attrs.push(MediaAttribute::MaxMessageSize(
crate::sctp::LOCAL_MAX_MESSAGE_SIZE as usize,
));

MediaLine {
typ: sdp::MediaType::Application,
Expand Down Expand Up @@ -2231,4 +2246,134 @@ mod test {
// No space at the end
assert_eq!(count_lines(&line_string, "a=rid:no_attrs send"), 1);
}

#[test]
fn test_local_max_message_size_advertised() {
crate::init_crypto_default();

let now = Instant::now();
let mut rtc = Rtc::new(now);

// Create an offer with a data channel
let mut change = rtc.sdp_api();
change.add_channel("test-channel".into());
let (offer, _) = change.apply().unwrap();

// Find the application m-line in the offer
let app_line = offer
.media_lines
.iter()
.find(|m| m.typ.is_channel())
.expect("should have application m-line");

// Verify that max-message-size attribute is present and matches LOCAL_MAX_MESSAGE_SIZE
let max_size = app_line
.max_message_size()
.expect("max-message-size attribute should be present");

assert_eq!(
max_size,
crate::sctp::LOCAL_MAX_MESSAGE_SIZE as usize,
"max-message-size should match LOCAL_MAX_MESSAGE_SIZE constant"
);

// Also verify it's in the SDP string output
let sdp_string = offer.to_sdp_string();
let expected_line = format!("a=max-message-size:{}", crate::sctp::LOCAL_MAX_MESSAGE_SIZE);
assert!(
sdp_string.contains(&expected_line),
"SDP should contain max-message-size attribute with LOCAL_MAX_MESSAGE_SIZE value"
);
}

#[test]
fn test_remote_max_message_size_parsing() {
// Parse SDP with max-message-size attribute and verify value is extracted correctly
let sdp = "v=0\r\n\
o=- 0 0 IN IP4 172.17.0.1\r\n\
s=-\r\n\
c=IN IP4 172.17.0.1\r\n\
t=0 0\r\n\
a=group:BUNDLE 0\r\n\
a=fingerprint:sha-256 B4:12:1C:7C:7D:ED:F1:FA:61:07:57:9C:29:BE:58:E3:BC:41:E7:13:8E:7D\
:D3:9D:1F:94:6E:A5:23:46:94:23\r\n\
m=application 9999 UDP/DTLS/SCTP webrtc-datachannel\r\n\
a=mid:0\r\n\
a=ice-ufrag:test\r\n\
a=ice-pwd:testpassword1234\r\n\
a=setup:actpass\r\n\
a=sctp-port:5000\r\n\
a=max-message-size:131072\r\n\
";

let offer = SdpOffer::from_sdp_string(sdp).expect("should parse");

// Find the application m-line
let app_line = offer
.media_lines
.iter()
.find(|m| m.typ.is_channel())
.expect("should have application m-line");

assert_eq!(
app_line.max_message_size(),
Some(131072),
"max-message-size should be parsed as 131072"
);
}

#[test]
fn test_remote_max_message_size_applied() {
crate::init_crypto_default();

let now = Instant::now();
let mut rtc1 = Rtc::new(now);
let mut rtc2 = Rtc::new(now);

// Create an offer from rtc1 with a channel
let mut change1 = rtc1.sdp_api();
change1.add_channel("test-channel".into());
let (offer1, pending1) = change1.apply().unwrap();

// Get the offer SDP string and modify it to have a custom max-message-size
let custom_max_size1 = 98304u32;
let sdp_string = offer1.to_sdp_string();
let modified_sdp = sdp_string.replace(
&format!("a=max-message-size:{}", crate::sctp::LOCAL_MAX_MESSAGE_SIZE),
&format!("a=max-message-size:{}", custom_max_size1),
);

let modified_offer =
SdpOffer::from_sdp_string(&modified_sdp).expect("modified SDP should parse");

let answer = rtc2.sdp_api().accept_offer(modified_offer).unwrap();

// Verify that rtc2's SCTP send limit is set to the custom value from rtc1's offer
assert_eq!(
rtc2.sctp.remote_max_message_size(),
custom_max_size1,
"rtc2 should have remote max message size set to rtc1's advertised value"
);

// Now verify the reverse: rtc1 accepts rtc2's answer and applies its max-message-size
let custom_max_size2 = 131072u32;
let sdp_string = answer.to_sdp_string();
let modified_sdp = sdp_string.replace(
&format!("a=max-message-size:{}", crate::sctp::LOCAL_MAX_MESSAGE_SIZE),
&format!("a=max-message-size:{}", custom_max_size2),
);

let modified_answer =
SdpAnswer::from_sdp_string(&modified_sdp).expect("modified SDP should parse");

rtc1.sdp_api()
.accept_answer(pending1, modified_answer)
.unwrap();

assert_eq!(
rtc1.sctp.remote_max_message_size(),
custom_max_size2,
"rtc1 should have remote max message size set to rtc2's advertised value"
);
}
}
5 changes: 3 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1379,14 +1379,15 @@ impl Rtc {
Ok(())
}

fn init_sctp(&mut self, client: bool) {
fn init_sctp(&mut self, client: bool, remote_max_message_size: Option<u32>) {
// If we got an m=application line, ensure we have negotiated the
// SCTP association with the other side.
if self.sctp.is_inited() {
return;
}

self.sctp.init(client, self.last_now);
self.sctp
.init(client, self.last_now, remote_max_message_size);
}

/// Creates a new Mid that is not in the session already.
Expand Down
35 changes: 30 additions & 5 deletions src/sctp/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ pub use error::SctpError;
/// Bytes that can be buffered inside str0m across all streams.
const MAX_BUFFERED_ACROSS_STREAMS: usize = 128 * 1024;

/// Maximum message size we advertise in SDP (what we can receive)
pub const LOCAL_MAX_MESSAGE_SIZE: u32 = 256 * 1024;

/// Default max message size if remote doesn't advertise
pub const DEFAULT_REMOTE_MAX_MESSAGE_SIZE: u32 = 64 * 1024;

pub(crate) struct RtcSctp {
state: RtcSctpState,
endpoint: Endpoint,
Expand All @@ -34,6 +40,7 @@ pub(crate) struct RtcSctp {
pushed_back_transmit: Option<VecDeque<Vec<u8>>>,
last_now: Instant,
client: bool,
remote_max_message_size: u32,
}

/// This is okay because there is no way for a user of Rtc to interact with the Sctp subsystem
Expand Down Expand Up @@ -233,7 +240,12 @@ impl RtcSctp {
// DTLS above MTU 1200: 1277
// Let's try 1120, see if we can avoid warnings.
config.max_payload_size(1120);
let server_config = ServerConfig::default();
let mut server_config = ServerConfig::default();

server_config.transport = Arc::new(
TransportConfig::default().with_max_receive_message_size(LOCAL_MAX_MESSAGE_SIZE),
);

let endpoint = Endpoint::new(Arc::new(config), Some(Arc::new(server_config)));
let fake_addr = "1.1.1.1:5000".parse().unwrap();

Expand All @@ -247,26 +259,33 @@ impl RtcSctp {
pushed_back_transmit: None,
last_now: Instant::now(), // placeholder until init()
client: false,
remote_max_message_size: DEFAULT_REMOTE_MAX_MESSAGE_SIZE,
}
}

pub fn is_inited(&self) -> bool {
self.state != RtcSctpState::Uninited
}

pub fn init(&mut self, client: bool, now: Instant) {
pub fn init(&mut self, client: bool, now: Instant, remote_max_message_size: Option<u32>) {
assert!(self.state == RtcSctpState::Uninited);

self.client = client;
self.last_now = now;

if let Some(max_msg_size) = remote_max_message_size {
self.remote_max_message_size = max_msg_size;
}

if client {
// For WebRTC, we never want to give up retransmitting
// init and data packets. The connectivity is in ICE,
// and SCTP should not give up until ICE gives up.
let transport = TransportConfig::default()
.with_max_init_retransmits(None)
.with_max_data_retransmits(None);
.with_max_data_retransmits(None)
.with_max_receive_message_size(LOCAL_MAX_MESSAGE_SIZE)
.with_max_send_message_size(self.remote_max_message_size);

let config = ClientConfig {
transport: Arc::new(transport),
Expand Down Expand Up @@ -538,6 +557,7 @@ impl RtcSctp {

while let Some(e) = assoc.poll() {
if let Event::Connected = e {
assoc.set_max_send_message_size(self.remote_max_message_size);
set_state(&mut self.state, RtcSctpState::Established);
return self.poll();
}
Expand Down Expand Up @@ -812,6 +832,11 @@ impl RtcSctp {
.find(|s| s.id == sctp_stream_id)
.and_then(|s| s.config.as_ref())
}

#[cfg(test)]
pub(crate) fn remote_max_message_size(&self) -> u32 {
self.remote_max_message_size
}
}

fn transmit_to_vec(t: Transmit) -> Option<VecDeque<Vec<u8>>> {
Expand Down Expand Up @@ -977,8 +1002,8 @@ mod tests {
let mut client = RtcSctp::new();
let mut server = RtcSctp::new();

client.init(true, now);
server.init(false, now);
client.init(true, now, None);
server.init(false, now, None);

// Exchange packets until both are Established.
for _ in 0..20 {
Expand Down
Loading
Loading