Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
## Next

- BREAKING: `Body` now extends the platform `Blob` implementation and implements
`Stream<Uint8List>` directly; use `body` as a stream or call `body.stream()`
instead of reading the previous `body.stream` getter.
- Added `Body.size` for exposing known body byte lengths without consuming the
body.
- Fixed `Blob` byte snapshot semantics so byte-backed parts and read buffers are
Expand Down
236 changes: 135 additions & 101 deletions lib/src/fetch/body.dart
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import 'url_search_params.dart';
/// - [String]
/// - [Uint8List]
/// - [ByteBuffer]
/// - [ByteData]
/// - [List<int>]
/// - [Stream<List<int>>]
/// - [Blob]
Expand All @@ -23,125 +24,110 @@ import 'url_search_params.dart';
/// - [FormData]
/// - [URLSearchParams]
///
/// On IO, `dart:io File` values are supported as [Blob] parts. Wrap files in a
/// [Blob] before passing them as [BodyInit], for example `Body(Blob([file]))`.
/// On IO, `dart:io File` values are supported through the platform [Blob]
/// implementation.
///
/// Native bodies normalize supported inputs into a detached [block.Block]
/// when possible. Platform implementations may accept additional host-backed
/// inputs before materialization.
/// Bodies normalize supported inputs into a detached [Blob] when
/// possible. Platform implementations may accept additional host-backed inputs
/// before materialization.
typedef BodyInit = Object?;

const _textPlainUtf8 = 'text/plain;charset=UTF-8';
const _urlEncodedUtf8 = 'application/x-www-form-urlencoded;charset=UTF-8';
const _defaultBlobChunkSize = 16 * 1024;

/// Native detached body implementation.
/// Detached body implementation.
///
/// This is the shared body baseline that web/io implementations align to.
class Body extends Stream<Uint8List> {
class Body extends Blob with Stream<Uint8List> implements Stream<Uint8List> {
Body._({
block.Block? blockHost,
Iterable<BlobPart> blobParts = const <BlobPart>[],
Stream<Uint8List>? streamHost,
int? byteLength,
int? streamSize,
String type = '',
this.contentType,
}) : assert(blockHost != null || streamHost != null),
assert(byteLength == null || byteLength >= 0),
size = byteLength ?? blockHost?.size,
_blockHost = blockHost,
_streamHost = streamHost;
}) : assert(streamSize == null || streamSize >= 0),
_streamHost = streamHost,
_streamSize = streamSize,
super(blobParts, type);
Comment thread
medz marked this conversation as resolved.

factory Body([BodyInit? init]) {
switch (init) {
case null:
return Body._(blockHost: block.Block(const []));
case final Body body:
return body.clone();
case final String text:
return Body._(
blockHost: block.Block([text], type: _textPlainUtf8),
contentType: _textPlainUtf8,
);
case final Uint8List bytes:
return Body._(blockHost: block.Block([bytes]));
case final ByteBuffer buffer:
return Body._(blockHost: block.Block([buffer.asUint8List()]));
case final List<int> bytes:
return Body._(blockHost: block.Block([Uint8List.fromList(bytes)]));
case final Blob blob:
return Body._(blockHost: blob, contentType: _contentType(blob.type));
case final block.Block blockHost:
return Body._(
blockHost: blockHost,
contentType: _contentType(blockHost.type),
);
case final URLSearchParams params:
return Body._(
blockHost: block.Block([params.toString()], type: _urlEncodedUtf8),
contentType: _urlEncodedUtf8,
);
case final FormData formData:
final encoded = formData.encodeMultipart();
return Body._(
streamHost: encoded.stream,
byteLength: encoded.contentLength,
contentType: encoded.contentType,
);
case final Stream<List<int>> stream:
return Body._(
streamHost: stream.map(
(chunk) => chunk is Uint8List ? chunk : Uint8List.fromList(chunk),
),
);
default:
throw ArgumentError.value(
init,
'init',
'Unsupported body type: ${init.runtimeType}',
);
}
return switch (init) {
final Body body => body.clone(),
final FormData formData => Body._fromFormData(formData),
final Stream<List<int>> stream => Body._fromStream(stream),
final String text => Body._fromBlobInit(text, _textPlainUtf8),
final URLSearchParams params => Body._fromBlobInit(
params.toString(),
_urlEncodedUtf8,
),
_ => Body._fromBlobInit(init, _blobInitType(init)),
};
}

final block.Block? _blockHost;
Stream<Uint8List>? _streamHost;
int? _streamSize;
bool _used = false;

/// The body-derived media type, when extracting the body produced one.
final String? contentType;

/// The byte length when it is known without consuming the body.
///
/// Stream-backed bodies created from arbitrary [Stream] values return `null`.
final int? size;

Stream<Uint8List> get stream async* {
final blockHost = _blockHost;
@override
int get size {
final streamHost = _streamHost;
if (blockHost == null && streamHost == null) {
return;
if (streamHost == null) {
return super.size;
}

_startConsumption();
final streamSize = _streamSize;
if (streamSize != null) {
return streamSize;
}

if (blockHost != null) {
yield* blockHost.stream();
return;
throw UnsupportedError(
'Body.size is unavailable for stream-backed bodies with unknown length.',
);
}

@override
Stream<Uint8List> stream({int chunkSize = _defaultBlobChunkSize}) {
if (chunkSize <= 0) {
throw ArgumentError.value(chunkSize, 'chunkSize', 'Must be > 0');
}

return _stream(chunkSize);
}

Stream<Uint8List> _stream(int chunkSize) async* {
_startConsumption();

final streamHost = _streamHost;
if (streamHost != null) {
yield* streamHost;
return;
}

yield* super.stream(chunkSize: chunkSize);
}

bool get bodyUsed => _used;

Future<Uint8List> bytes() async {
final builder = BytesBuilder(copy: false);
await for (final chunk in stream) {
builder.add(chunk);
@override
Future<Uint8List> bytes() => arrayBuffer();

@override
Future<Uint8List> arrayBuffer() async {
_startConsumption();

final streamHost = _streamHost;
if (streamHost == null) {
return super.arrayBuffer();
}

return builder.takeBytes();
return _readStream(streamHost);
}

@override
Future<String> text([Encoding encoding = utf8]) async {
return encoding.decode(await bytes());
}
Expand All @@ -151,41 +137,48 @@ class Body extends Stream<Uint8List> {
}

Future<Blob> blob() async {
final blockHost = _blockHost;
if (blockHost != null) {
if (_streamHost == null) {
_startConsumption();
if (blockHost case final Blob blob) {
return blob;
}

return Blob(<Object>[blockHost], blockHost.type);
return Blob(<BlobPart>[super.slice(0, null, type)], type);
}

return Blob(<Object>[await bytes()]);
return Blob(<BlobPart>[await bytes()], type);
}

Body clone() {
if (_used) {
throw StateError('Body has already been consumed.');
}

final blockHost = _blockHost;
if (blockHost != null) {
return Body._(blockHost: blockHost, contentType: contentType);
}

final streamHost = _streamHost;
if (streamHost != null) {
final (left, right) = streamTee(streamHost);
_streamHost = left;
if (streamHost == null) {
return Body._(
streamHost: right,
byteLength: size,
blobParts: <BlobPart>[super.slice(0, null, type)],
type: type,
contentType: contentType,
);
}

throw StateError('Body has no host.');
final (left, right) = streamTee(streamHost);
_streamHost = left;
return Body._(
streamHost: right,
streamSize: _streamSize,
type: type,
contentType: contentType,
);
}

@override
Blob slice(int start, [int? end, String? contentType]) {
if (_streamHost != null) {
throw UnsupportedError(
'Body.slice is unavailable for stream-backed bodies.',
);
}

final sliced = super.slice(start, end, contentType);
return Blob(<BlobPart>[sliced], sliced.type);
}

@override
Expand All @@ -195,7 +188,7 @@ class Body extends Stream<Uint8List> {
void Function()? onDone,
bool? cancelOnError,
}) {
return stream.listen(
return stream().listen(
onData,
onError: onError,
onDone: onDone,
Expand All @@ -211,5 +204,46 @@ class Body extends Stream<Uint8List> {
_used = true;
}

static Body _fromBlobInit(BodyInit init, String type) {
return Body._(
blobParts: init == null ? const <BlobPart>[] : <BlobPart>[init],
type: type,
contentType: _contentType(type),
);
}

static Body _fromFormData(FormData formData) {
final encoded = formData.encodeMultipart();
return Body._(
streamHost: encoded.stream,
streamSize: encoded.contentLength,
type: encoded.contentType,
contentType: encoded.contentType,
);
}

static Body _fromStream(Stream<List<int>> stream) {
return Body._(streamHost: stream.map(Uint8List.fromList));
}

static Future<Uint8List> _readStream(Stream<Uint8List> stream) async {
final builder = BytesBuilder(copy: false);
await for (final chunk in stream) {
builder.add(chunk);
}

return builder.takeBytes();
}

static String _blobInitType(BodyInit init) {
return switch (init) {
final String _ => _textPlainUtf8,
final URLSearchParams _ => _urlEncodedUtf8,
final Blob blob => blob.type,
final block.Block blockHost => blockHost.type,
_ => '',

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Preserve native Blob content types

When users pass a native web.Blob or web.File directly as BodyInit on the web target, Blob now accepts and reads that host-backed part, but this fallback leaves the type as '', so Body(web.Blob(...type: 'text/plain')).contentType is null. Request/Response only auto-populate the content-type header from body.contentType, so these accepted native Blob/File bodies lose their MIME type even though package Blob bodies still preserve it; please extract the host Blob/File type when delegating these inputs.

Useful? React with 👍 / 👎.

};
}

static String? _contentType(String type) => type.isEmpty ? null : type;
}
Loading