From b99d3ed90502a7b7b783da2986cd57c59744ac14 Mon Sep 17 00:00:00 2001 From: Seven Du <5564821+medz@users.noreply.github.com> Date: Tue, 23 Jun 2026 14:43:40 +0800 Subject: [PATCH 1/2] refactor(fetch): make Body extend Blob --- CHANGELOG.md | 3 + lib/src/fetch/body.dart | 220 +++++++++++++++++------------- test/body_test.dart | 24 +++- test/public_api_surface_test.dart | 5 + 4 files changed, 154 insertions(+), 98 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d8e429f..dbf5537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## Next +- BREAKING: `Body` now extends the platform `Blob` implementation and implements + `Stream` directly; use `body` as a stream or call `body.stream()` + instead of reading the previous `body.stream` getter. - Added `Body.size` for exposing known body byte lengths without consuming the body. - Fixed `Blob` byte snapshot semantics so byte-backed parts and read buffers are diff --git a/lib/src/fetch/body.dart b/lib/src/fetch/body.dart index 626582e..c37cb2b 100644 --- a/lib/src/fetch/body.dart +++ b/lib/src/fetch/body.dart @@ -26,122 +26,96 @@ import 'url_search_params.dart'; /// On IO, `dart:io File` values are supported as [Blob] parts. Wrap files in a /// [Blob] before passing them as [BodyInit], for example `Body(Blob([file]))`. /// -/// Native bodies normalize supported inputs into a detached [block.Block] -/// when possible. Platform implementations may accept additional host-backed -/// inputs before materialization. +/// Bodies normalize supported inputs into a detached [Blob] when +/// possible. Platform implementations may accept additional host-backed inputs +/// before materialization. typedef BodyInit = Object?; const _textPlainUtf8 = 'text/plain;charset=UTF-8'; const _urlEncodedUtf8 = 'application/x-www-form-urlencoded;charset=UTF-8'; +const _defaultBlobChunkSize = 16 * 1024; /// Native detached body implementation. /// /// This is the shared body baseline that web/io implementations align to. -class Body extends Stream { +class Body extends Blob with Stream implements Stream { Body._({ - block.Block? blockHost, + Iterable blobParts = const [], Stream? streamHost, - int? byteLength, + int? streamSize, + String type = '', this.contentType, - }) : assert(blockHost != null || streamHost != null), - assert(byteLength == null || byteLength >= 0), - size = byteLength ?? blockHost?.size, - _blockHost = blockHost, - _streamHost = streamHost; + }) : assert(streamSize == null || streamSize >= 0), + _streamHost = streamHost, + _streamSize = streamSize, + super(blobParts, type); factory Body([BodyInit? init]) { - switch (init) { - case null: - return Body._(blockHost: block.Block(const [])); - case final Body body: - return body.clone(); - case final String text: - return Body._( - blockHost: block.Block([text], type: _textPlainUtf8), - contentType: _textPlainUtf8, - ); - case final Uint8List bytes: - return Body._(blockHost: block.Block([bytes])); - case final ByteBuffer buffer: - return Body._(blockHost: block.Block([buffer.asUint8List()])); - case final List bytes: - return Body._(blockHost: block.Block([Uint8List.fromList(bytes)])); - case final Blob blob: - return Body._(blockHost: blob, contentType: _contentType(blob.type)); - case final block.Block blockHost: - return Body._( - blockHost: blockHost, - contentType: _contentType(blockHost.type), - ); - case final URLSearchParams params: - return Body._( - blockHost: block.Block([params.toString()], type: _urlEncodedUtf8), - contentType: _urlEncodedUtf8, - ); - case final FormData formData: - final encoded = formData.encodeMultipart(); - return Body._( - streamHost: encoded.stream, - byteLength: encoded.contentLength, - contentType: encoded.contentType, - ); - case final Stream> stream: - return Body._( - streamHost: stream.map( - (chunk) => chunk is Uint8List ? chunk : Uint8List.fromList(chunk), - ), - ); - default: - throw ArgumentError.value( - init, - 'init', - 'Unsupported body type: ${init.runtimeType}', - ); - } + return switch (init) { + final Body body => body.clone(), + final FormData formData => Body._fromFormData(formData), + final Stream> stream => Body._fromStream(stream), + _ => Body._fromBlobInit(init), + }; } - final block.Block? _blockHost; Stream? _streamHost; + int? _streamSize; bool _used = false; /// The body-derived media type, when extracting the body produced one. final String? contentType; - /// The byte length when it is known without consuming the body. - /// - /// Stream-backed bodies created from arbitrary [Stream] values return `null`. - final int? size; - - Stream get stream async* { - final blockHost = _blockHost; + @override + int get size { final streamHost = _streamHost; - if (blockHost == null && streamHost == null) { - return; + if (streamHost == null) { + return super.size; } - _startConsumption(); + final streamSize = _streamSize; + if (streamSize != null) { + return streamSize; + } - if (blockHost != null) { - yield* blockHost.stream(); - return; + throw UnsupportedError( + 'Body.size is unavailable for stream-backed bodies with unknown length.', + ); + } + + @override + Stream stream({int chunkSize = _defaultBlobChunkSize}) async* { + if (chunkSize <= 0) { + throw ArgumentError.value(chunkSize, 'chunkSize', 'Must be > 0'); } + _startConsumption(); + + final streamHost = _streamHost; if (streamHost != null) { yield* streamHost; + return; } + + yield* super.stream(chunkSize: chunkSize); } bool get bodyUsed => _used; - Future bytes() async { + @override + Future bytes() => arrayBuffer(); + + @override + Future arrayBuffer() async { final builder = BytesBuilder(copy: false); - await for (final chunk in stream) { + await for (final chunk in stream()) { builder.add(chunk); } return builder.takeBytes(); } + @override Future text([Encoding encoding = utf8]) async { return encoding.decode(await bytes()); } @@ -151,17 +125,12 @@ class Body extends Stream { } Future blob() async { - final blockHost = _blockHost; - if (blockHost != null) { + if (_streamHost == null) { _startConsumption(); - if (blockHost case final Blob blob) { - return blob; - } - - return Blob([blockHost], blockHost.type); + return Blob([super.slice(0, null, type)], type); } - return Blob([await bytes()]); + return Blob([await bytes()], type); } Body clone() { @@ -169,23 +138,35 @@ class Body extends Stream { throw StateError('Body has already been consumed.'); } - final blockHost = _blockHost; - if (blockHost != null) { - return Body._(blockHost: blockHost, contentType: contentType); - } - final streamHost = _streamHost; - if (streamHost != null) { - final (left, right) = streamTee(streamHost); - _streamHost = left; + if (streamHost == null) { return Body._( - streamHost: right, - byteLength: size, + blobParts: [super.slice(0, null, type)], + type: type, contentType: contentType, ); } - throw StateError('Body has no host.'); + final (left, right) = streamTee(streamHost); + _streamHost = left; + return Body._( + streamHost: right, + streamSize: _streamSize, + type: type, + contentType: contentType, + ); + } + + @override + Blob slice(int start, [int? end, String? contentType]) { + if (_streamHost != null) { + throw UnsupportedError( + 'Body.slice is unavailable for stream-backed bodies.', + ); + } + + final sliced = super.slice(start, end, contentType); + return Blob([sliced], sliced.type); } @override @@ -195,7 +176,7 @@ class Body extends Stream { void Function()? onDone, bool? cancelOnError, }) { - return stream.listen( + return stream().listen( onData, onError: onError, onDone: onDone, @@ -211,5 +192,54 @@ class Body extends Stream { _used = true; } + static Body _fromBlobInit(BodyInit init) { + final type = _blobInitType(init); + return Body._( + blobParts: _blobInitParts(init), + type: type, + contentType: _contentType(type), + ); + } + + static Body _fromFormData(FormData formData) { + final encoded = formData.encodeMultipart(); + return Body._( + streamHost: encoded.stream, + streamSize: encoded.contentLength, + type: encoded.contentType, + contentType: encoded.contentType, + ); + } + + static Body _fromStream(Stream> stream) { + return Body._( + streamHost: stream.map( + (chunk) => chunk is Uint8List ? chunk : Uint8List.fromList(chunk), + ), + ); + } + + static Iterable _blobInitParts(BodyInit init) { + return switch (init) { + null => const [], + final URLSearchParams params => [params.toString()], + final ByteBuffer buffer => [buffer.asUint8List()], + final List bytes when bytes is! Uint8List => [ + Uint8List.fromList(bytes), + ], + _ => [init], + }; + } + + static String _blobInitType(BodyInit init) { + return switch (init) { + final String _ => _textPlainUtf8, + final URLSearchParams _ => _urlEncodedUtf8, + final Blob blob => blob.type, + final block.Block blockHost => blockHost.type, + _ => '', + }; + } + static String? _contentType(String type) => type.isEmpty ? null : type; } diff --git a/test/body_test.dart b/test/body_test.dart index 7a274f8..0939db6 100644 --- a/test/body_test.dart +++ b/test/body_test.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:typed_data'; import 'package:block/block.dart' as block; +import 'package:ht/src/fetch/blob.dart' as platform_blob; import 'package:ht/src/fetch/body.dart'; import 'package:ht/src/fetch/form_data.native.dart'; import 'package:ht/src/fetch/url_search_params.dart'; @@ -55,6 +56,23 @@ void main() { expect(Body([1, 2, 3]).contentType, isNull); }); + test('extends platform Blob and implements Stream', () async { + final body = Body('hello'); + + expect(body, isA()); + expect(body, isA>()); + expect(body.type, 'text/plain;charset=utf-8'); + expect(body.size, 5); + + final slice = body.slice(1, 4); + expect(await slice.text(), 'ell'); + expect(body.bodyUsed, isFalse); + + final chunks = await expectNonNullableStream(body).toList(); + expect(chunks.expand((chunk) => chunk).toList(), utf8.encode('hello')); + expect(body.bodyUsed, isTrue); + }); + test('exposes known byte size without consuming the body', () { final params = URLSearchParams({'a': '1', 'b': '2'}); final formData = FormData()..append('a', Multipart.text('1')); @@ -74,10 +92,10 @@ void main() { expect(formBody.bodyUsed, isFalse); }); - test('reports null size for arbitrary stream bodies', () { + test('rejects size reads for arbitrary stream bodies', () { final body = Body(Stream>.value(utf8.encode('stream'))); - expect(body.size, isNull); + expect(() => body.size, throwsUnsupportedError); expect(body.bodyUsed, isFalse); }); @@ -145,7 +163,7 @@ void main() { 'empty bodies return empty bytes and become used when consumed', () async { final body = Body(); - final stream = expectNonNullableStream(body.stream); + final stream = expectNonNullableStream(body); expect(body.bodyUsed, isFalse); expect(await stream.toList(), isEmpty); diff --git a/test/public_api_surface_test.dart b/test/public_api_surface_test.dart index 82c305b..a34d1f1 100644 --- a/test/public_api_surface_test.dart +++ b/test/public_api_surface_test.dart @@ -1,3 +1,6 @@ +import 'dart:async'; +import 'dart:typed_data'; + import 'package:block/block.dart' as block; import 'package:ht/ht.dart'; import 'package:test/test.dart'; @@ -42,6 +45,8 @@ void main() { expect(requestInit.method, 'POST'); expect(requestInit.priority, RequestPriority.high); expect(responseInit.status, 200); + expect(body, isA()); + expect(body, isA>()); expect(body.size, 6); expect(request.headers.has('content-type'), isTrue); expect(await multipart.bytes(), isNotEmpty); From 8349abd38699f75bb0d24518e50fe51bfe32acf8 Mon Sep 17 00:00:00 2001 From: Seven Du <5564821+medz@users.noreply.github.com> Date: Tue, 23 Jun 2026 16:36:13 +0800 Subject: [PATCH 2/2] refactor(fetch): simplify Body blob fallback --- lib/src/fetch/body.dart | 58 ++++++++++++++++-------------- test/body_test.dart | 79 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 110 insertions(+), 27 deletions(-) diff --git a/lib/src/fetch/body.dart b/lib/src/fetch/body.dart index c37cb2b..af93544 100644 --- a/lib/src/fetch/body.dart +++ b/lib/src/fetch/body.dart @@ -15,6 +15,7 @@ import 'url_search_params.dart'; /// - [String] /// - [Uint8List] /// - [ByteBuffer] +/// - [ByteData] /// - [List] /// - [Stream>] /// - [Blob] @@ -23,8 +24,8 @@ import 'url_search_params.dart'; /// - [FormData] /// - [URLSearchParams] /// -/// On IO, `dart:io File` values are supported as [Blob] parts. Wrap files in a -/// [Blob] before passing them as [BodyInit], for example `Body(Blob([file]))`. +/// On IO, `dart:io File` values are supported through the platform [Blob] +/// implementation. /// /// Bodies normalize supported inputs into a detached [Blob] when /// possible. Platform implementations may accept additional host-backed inputs @@ -35,7 +36,7 @@ const _textPlainUtf8 = 'text/plain;charset=UTF-8'; const _urlEncodedUtf8 = 'application/x-www-form-urlencoded;charset=UTF-8'; const _defaultBlobChunkSize = 16 * 1024; -/// Native detached body implementation. +/// Detached body implementation. /// /// This is the shared body baseline that web/io implementations align to. class Body extends Blob with Stream implements Stream { @@ -55,7 +56,12 @@ class Body extends Blob with Stream implements Stream { final Body body => body.clone(), final FormData formData => Body._fromFormData(formData), final Stream> stream => Body._fromStream(stream), - _ => Body._fromBlobInit(init), + final String text => Body._fromBlobInit(text, _textPlainUtf8), + final URLSearchParams params => Body._fromBlobInit( + params.toString(), + _urlEncodedUtf8, + ), + _ => Body._fromBlobInit(init, _blobInitType(init)), }; } @@ -84,11 +90,15 @@ class Body extends Blob with Stream implements Stream { } @override - Stream stream({int chunkSize = _defaultBlobChunkSize}) async* { + Stream stream({int chunkSize = _defaultBlobChunkSize}) { if (chunkSize <= 0) { throw ArgumentError.value(chunkSize, 'chunkSize', 'Must be > 0'); } + return _stream(chunkSize); + } + + Stream _stream(int chunkSize) async* { _startConsumption(); final streamHost = _streamHost; @@ -107,12 +117,14 @@ class Body extends Blob with Stream implements Stream { @override Future arrayBuffer() async { - final builder = BytesBuilder(copy: false); - await for (final chunk in stream()) { - builder.add(chunk); + _startConsumption(); + + final streamHost = _streamHost; + if (streamHost == null) { + return super.arrayBuffer(); } - return builder.takeBytes(); + return _readStream(streamHost); } @override @@ -192,10 +204,9 @@ class Body extends Blob with Stream implements Stream { _used = true; } - static Body _fromBlobInit(BodyInit init) { - final type = _blobInitType(init); + static Body _fromBlobInit(BodyInit init, String type) { return Body._( - blobParts: _blobInitParts(init), + blobParts: init == null ? const [] : [init], type: type, contentType: _contentType(type), ); @@ -212,23 +223,16 @@ class Body extends Blob with Stream implements Stream { } static Body _fromStream(Stream> stream) { - return Body._( - streamHost: stream.map( - (chunk) => chunk is Uint8List ? chunk : Uint8List.fromList(chunk), - ), - ); + return Body._(streamHost: stream.map(Uint8List.fromList)); } - static Iterable _blobInitParts(BodyInit init) { - return switch (init) { - null => const [], - final URLSearchParams params => [params.toString()], - final ByteBuffer buffer => [buffer.asUint8List()], - final List bytes when bytes is! Uint8List => [ - Uint8List.fromList(bytes), - ], - _ => [init], - }; + static Future _readStream(Stream stream) async { + final builder = BytesBuilder(copy: false); + await for (final chunk in stream) { + builder.add(chunk); + } + + return builder.takeBytes(); } static String _blobInitType(BodyInit init) { diff --git a/test/body_test.dart b/test/body_test.dart index 0939db6..0ed16b5 100644 --- a/test/body_test.dart +++ b/test/body_test.dart @@ -11,6 +11,31 @@ import 'package:test/test.dart'; Stream expectNonNullableStream(Stream stream) => stream; +final class _ThrowingReadBlob extends platform_blob.Blob { + _ThrowingReadBlob(List bytes, {String type = ''}) + : super([Uint8List.fromList(bytes)], type); + + @override + Future arrayBuffer() { + throw StateError('arrayBuffer should not be called'); + } + + @override + Future bytes() { + throw StateError('bytes should not be called'); + } + + @override + Future text() { + throw StateError('text should not be called'); + } + + @override + Stream stream({int chunkSize = 16 * 1024}) { + throw StateError('stream should not be called'); + } +} + void main() { group('Body', () { test('string bodies decode as text and bytes', () async { @@ -73,6 +98,27 @@ void main() { expect(body.bodyUsed, isTrue); }); + test('normalizes ordinary init values through Blob snapshots', () async { + final bytes = Uint8List.fromList([1, 2, 3]); + final body = Body(bytes); + + bytes[0] = 9; + + expect(await body.bytes(), [1, 2, 3]); + }); + + test('uses Blob backing without calling source read methods', () async { + final blob = _ThrowingReadBlob([ + 1, + 2, + 3, + ], type: 'application/octet-stream'); + final body = Body(blob); + + expect(body.contentType, 'application/octet-stream'); + expect(await body.bytes(), [1, 2, 3]); + }); + test('exposes known byte size without consuming the body', () { final params = URLSearchParams({'a': '1', 'b': '2'}); final formData = FormData()..append('a', Multipart.text('1')); @@ -99,6 +145,26 @@ void main() { expect(body.bodyUsed, isFalse); }); + test('validates stream chunk size before consumption starts', () async { + final body = Body('hello'); + + expect(() => body.stream(chunkSize: 0), throwsArgumentError); + expect(body.bodyUsed, isFalse); + + final stream = body.stream(chunkSize: 2); + + expect(body.bodyUsed, isFalse); + expect(await stream.map(utf8.decode).join(), 'hello'); + expect(body.bodyUsed, isTrue); + }); + + test('stream-backed bodies cannot be sliced', () { + final body = Body(Stream>.value(utf8.encode('stream'))); + + expect(() => body.slice(0), throwsUnsupportedError); + expect(body.bodyUsed, isFalse); + }); + test('block bodies can be converted back to Blob', () async { final body = Body(block.Block(['payload'], type: 'text/plain')); final blob = await body.blob(); @@ -128,6 +194,19 @@ void main() { expect(await clone.text(), 'hello world'); }); + test('stream chunks are copied when consumed', () async { + final controller = StreamController>(sync: true); + final chunk = Uint8List.fromList([1, 2, 3]); + final body = Body(controller.stream); + + final bytes = body.bytes(); + controller.add(chunk); + chunk[0] = 9; + await controller.close(); + + expect(await bytes, [1, 2, 3]); + }); + test('copying a stream-backed body preserves independent reads', () async { final controller = StreamController>(); scheduleMicrotask(() async {