diff --git a/CHANGELOG.md b/CHANGELOG.md index d8e429f..dbf5537 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,8 @@ ## Next +- BREAKING: `Body` now extends the platform `Blob` implementation and implements + `Stream` directly; use `body` as a stream or call `body.stream()` + instead of reading the previous `body.stream` getter. - Added `Body.size` for exposing known body byte lengths without consuming the body. - Fixed `Blob` byte snapshot semantics so byte-backed parts and read buffers are diff --git a/lib/src/fetch/body.dart b/lib/src/fetch/body.dart index 626582e..af93544 100644 --- a/lib/src/fetch/body.dart +++ b/lib/src/fetch/body.dart @@ -15,6 +15,7 @@ import 'url_search_params.dart'; /// - [String] /// - [Uint8List] /// - [ByteBuffer] +/// - [ByteData] /// - [List] /// - [Stream>] /// - [Blob] @@ -23,125 +24,110 @@ import 'url_search_params.dart'; /// - [FormData] /// - [URLSearchParams] /// -/// On IO, `dart:io File` values are supported as [Blob] parts. Wrap files in a -/// [Blob] before passing them as [BodyInit], for example `Body(Blob([file]))`. +/// On IO, `dart:io File` values are supported through the platform [Blob] +/// implementation. /// -/// Native bodies normalize supported inputs into a detached [block.Block] -/// when possible. Platform implementations may accept additional host-backed -/// inputs before materialization. +/// Bodies normalize supported inputs into a detached [Blob] when +/// possible. Platform implementations may accept additional host-backed inputs +/// before materialization. typedef BodyInit = Object?; const _textPlainUtf8 = 'text/plain;charset=UTF-8'; const _urlEncodedUtf8 = 'application/x-www-form-urlencoded;charset=UTF-8'; +const _defaultBlobChunkSize = 16 * 1024; -/// Native detached body implementation. +/// Detached body implementation. /// /// This is the shared body baseline that web/io implementations align to. -class Body extends Stream { +class Body extends Blob with Stream implements Stream { Body._({ - block.Block? blockHost, + Iterable blobParts = const [], Stream? streamHost, - int? byteLength, + int? streamSize, + String type = '', this.contentType, - }) : assert(blockHost != null || streamHost != null), - assert(byteLength == null || byteLength >= 0), - size = byteLength ?? blockHost?.size, - _blockHost = blockHost, - _streamHost = streamHost; + }) : assert(streamSize == null || streamSize >= 0), + _streamHost = streamHost, + _streamSize = streamSize, + super(blobParts, type); factory Body([BodyInit? init]) { - switch (init) { - case null: - return Body._(blockHost: block.Block(const [])); - case final Body body: - return body.clone(); - case final String text: - return Body._( - blockHost: block.Block([text], type: _textPlainUtf8), - contentType: _textPlainUtf8, - ); - case final Uint8List bytes: - return Body._(blockHost: block.Block([bytes])); - case final ByteBuffer buffer: - return Body._(blockHost: block.Block([buffer.asUint8List()])); - case final List bytes: - return Body._(blockHost: block.Block([Uint8List.fromList(bytes)])); - case final Blob blob: - return Body._(blockHost: blob, contentType: _contentType(blob.type)); - case final block.Block blockHost: - return Body._( - blockHost: blockHost, - contentType: _contentType(blockHost.type), - ); - case final URLSearchParams params: - return Body._( - blockHost: block.Block([params.toString()], type: _urlEncodedUtf8), - contentType: _urlEncodedUtf8, - ); - case final FormData formData: - final encoded = formData.encodeMultipart(); - return Body._( - streamHost: encoded.stream, - byteLength: encoded.contentLength, - contentType: encoded.contentType, - ); - case final Stream> stream: - return Body._( - streamHost: stream.map( - (chunk) => chunk is Uint8List ? chunk : Uint8List.fromList(chunk), - ), - ); - default: - throw ArgumentError.value( - init, - 'init', - 'Unsupported body type: ${init.runtimeType}', - ); - } + return switch (init) { + final Body body => body.clone(), + final FormData formData => Body._fromFormData(formData), + final Stream> stream => Body._fromStream(stream), + final String text => Body._fromBlobInit(text, _textPlainUtf8), + final URLSearchParams params => Body._fromBlobInit( + params.toString(), + _urlEncodedUtf8, + ), + _ => Body._fromBlobInit(init, _blobInitType(init)), + }; } - final block.Block? _blockHost; Stream? _streamHost; + int? _streamSize; bool _used = false; /// The body-derived media type, when extracting the body produced one. final String? contentType; - /// The byte length when it is known without consuming the body. - /// - /// Stream-backed bodies created from arbitrary [Stream] values return `null`. - final int? size; - - Stream get stream async* { - final blockHost = _blockHost; + @override + int get size { final streamHost = _streamHost; - if (blockHost == null && streamHost == null) { - return; + if (streamHost == null) { + return super.size; } - _startConsumption(); + final streamSize = _streamSize; + if (streamSize != null) { + return streamSize; + } - if (blockHost != null) { - yield* blockHost.stream(); - return; + throw UnsupportedError( + 'Body.size is unavailable for stream-backed bodies with unknown length.', + ); + } + + @override + Stream stream({int chunkSize = _defaultBlobChunkSize}) { + if (chunkSize <= 0) { + throw ArgumentError.value(chunkSize, 'chunkSize', 'Must be > 0'); } + return _stream(chunkSize); + } + + Stream _stream(int chunkSize) async* { + _startConsumption(); + + final streamHost = _streamHost; if (streamHost != null) { yield* streamHost; + return; } + + yield* super.stream(chunkSize: chunkSize); } bool get bodyUsed => _used; - Future bytes() async { - final builder = BytesBuilder(copy: false); - await for (final chunk in stream) { - builder.add(chunk); + @override + Future bytes() => arrayBuffer(); + + @override + Future arrayBuffer() async { + _startConsumption(); + + final streamHost = _streamHost; + if (streamHost == null) { + return super.arrayBuffer(); } - return builder.takeBytes(); + return _readStream(streamHost); } + @override Future text([Encoding encoding = utf8]) async { return encoding.decode(await bytes()); } @@ -151,17 +137,12 @@ class Body extends Stream { } Future blob() async { - final blockHost = _blockHost; - if (blockHost != null) { + if (_streamHost == null) { _startConsumption(); - if (blockHost case final Blob blob) { - return blob; - } - - return Blob([blockHost], blockHost.type); + return Blob([super.slice(0, null, type)], type); } - return Blob([await bytes()]); + return Blob([await bytes()], type); } Body clone() { @@ -169,23 +150,35 @@ class Body extends Stream { throw StateError('Body has already been consumed.'); } - final blockHost = _blockHost; - if (blockHost != null) { - return Body._(blockHost: blockHost, contentType: contentType); - } - final streamHost = _streamHost; - if (streamHost != null) { - final (left, right) = streamTee(streamHost); - _streamHost = left; + if (streamHost == null) { return Body._( - streamHost: right, - byteLength: size, + blobParts: [super.slice(0, null, type)], + type: type, contentType: contentType, ); } - throw StateError('Body has no host.'); + final (left, right) = streamTee(streamHost); + _streamHost = left; + return Body._( + streamHost: right, + streamSize: _streamSize, + type: type, + contentType: contentType, + ); + } + + @override + Blob slice(int start, [int? end, String? contentType]) { + if (_streamHost != null) { + throw UnsupportedError( + 'Body.slice is unavailable for stream-backed bodies.', + ); + } + + final sliced = super.slice(start, end, contentType); + return Blob([sliced], sliced.type); } @override @@ -195,7 +188,7 @@ class Body extends Stream { void Function()? onDone, bool? cancelOnError, }) { - return stream.listen( + return stream().listen( onData, onError: onError, onDone: onDone, @@ -211,5 +204,46 @@ class Body extends Stream { _used = true; } + static Body _fromBlobInit(BodyInit init, String type) { + return Body._( + blobParts: init == null ? const [] : [init], + type: type, + contentType: _contentType(type), + ); + } + + static Body _fromFormData(FormData formData) { + final encoded = formData.encodeMultipart(); + return Body._( + streamHost: encoded.stream, + streamSize: encoded.contentLength, + type: encoded.contentType, + contentType: encoded.contentType, + ); + } + + static Body _fromStream(Stream> stream) { + return Body._(streamHost: stream.map(Uint8List.fromList)); + } + + static Future _readStream(Stream stream) async { + final builder = BytesBuilder(copy: false); + await for (final chunk in stream) { + builder.add(chunk); + } + + return builder.takeBytes(); + } + + static String _blobInitType(BodyInit init) { + return switch (init) { + final String _ => _textPlainUtf8, + final URLSearchParams _ => _urlEncodedUtf8, + final Blob blob => blob.type, + final block.Block blockHost => blockHost.type, + _ => '', + }; + } + static String? _contentType(String type) => type.isEmpty ? null : type; } diff --git a/test/body_test.dart b/test/body_test.dart index 7a274f8..0ed16b5 100644 --- a/test/body_test.dart +++ b/test/body_test.dart @@ -3,6 +3,7 @@ import 'dart:convert'; import 'dart:typed_data'; import 'package:block/block.dart' as block; +import 'package:ht/src/fetch/blob.dart' as platform_blob; import 'package:ht/src/fetch/body.dart'; import 'package:ht/src/fetch/form_data.native.dart'; import 'package:ht/src/fetch/url_search_params.dart'; @@ -10,6 +11,31 @@ import 'package:test/test.dart'; Stream expectNonNullableStream(Stream stream) => stream; +final class _ThrowingReadBlob extends platform_blob.Blob { + _ThrowingReadBlob(List bytes, {String type = ''}) + : super([Uint8List.fromList(bytes)], type); + + @override + Future arrayBuffer() { + throw StateError('arrayBuffer should not be called'); + } + + @override + Future bytes() { + throw StateError('bytes should not be called'); + } + + @override + Future text() { + throw StateError('text should not be called'); + } + + @override + Stream stream({int chunkSize = 16 * 1024}) { + throw StateError('stream should not be called'); + } +} + void main() { group('Body', () { test('string bodies decode as text and bytes', () async { @@ -55,6 +81,44 @@ void main() { expect(Body([1, 2, 3]).contentType, isNull); }); + test('extends platform Blob and implements Stream', () async { + final body = Body('hello'); + + expect(body, isA()); + expect(body, isA>()); + expect(body.type, 'text/plain;charset=utf-8'); + expect(body.size, 5); + + final slice = body.slice(1, 4); + expect(await slice.text(), 'ell'); + expect(body.bodyUsed, isFalse); + + final chunks = await expectNonNullableStream(body).toList(); + expect(chunks.expand((chunk) => chunk).toList(), utf8.encode('hello')); + expect(body.bodyUsed, isTrue); + }); + + test('normalizes ordinary init values through Blob snapshots', () async { + final bytes = Uint8List.fromList([1, 2, 3]); + final body = Body(bytes); + + bytes[0] = 9; + + expect(await body.bytes(), [1, 2, 3]); + }); + + test('uses Blob backing without calling source read methods', () async { + final blob = _ThrowingReadBlob([ + 1, + 2, + 3, + ], type: 'application/octet-stream'); + final body = Body(blob); + + expect(body.contentType, 'application/octet-stream'); + expect(await body.bytes(), [1, 2, 3]); + }); + test('exposes known byte size without consuming the body', () { final params = URLSearchParams({'a': '1', 'b': '2'}); final formData = FormData()..append('a', Multipart.text('1')); @@ -74,10 +138,30 @@ void main() { expect(formBody.bodyUsed, isFalse); }); - test('reports null size for arbitrary stream bodies', () { + test('rejects size reads for arbitrary stream bodies', () { final body = Body(Stream>.value(utf8.encode('stream'))); - expect(body.size, isNull); + expect(() => body.size, throwsUnsupportedError); + expect(body.bodyUsed, isFalse); + }); + + test('validates stream chunk size before consumption starts', () async { + final body = Body('hello'); + + expect(() => body.stream(chunkSize: 0), throwsArgumentError); + expect(body.bodyUsed, isFalse); + + final stream = body.stream(chunkSize: 2); + + expect(body.bodyUsed, isFalse); + expect(await stream.map(utf8.decode).join(), 'hello'); + expect(body.bodyUsed, isTrue); + }); + + test('stream-backed bodies cannot be sliced', () { + final body = Body(Stream>.value(utf8.encode('stream'))); + + expect(() => body.slice(0), throwsUnsupportedError); expect(body.bodyUsed, isFalse); }); @@ -110,6 +194,19 @@ void main() { expect(await clone.text(), 'hello world'); }); + test('stream chunks are copied when consumed', () async { + final controller = StreamController>(sync: true); + final chunk = Uint8List.fromList([1, 2, 3]); + final body = Body(controller.stream); + + final bytes = body.bytes(); + controller.add(chunk); + chunk[0] = 9; + await controller.close(); + + expect(await bytes, [1, 2, 3]); + }); + test('copying a stream-backed body preserves independent reads', () async { final controller = StreamController>(); scheduleMicrotask(() async { @@ -145,7 +242,7 @@ void main() { 'empty bodies return empty bytes and become used when consumed', () async { final body = Body(); - final stream = expectNonNullableStream(body.stream); + final stream = expectNonNullableStream(body); expect(body.bodyUsed, isFalse); expect(await stream.toList(), isEmpty); diff --git a/test/public_api_surface_test.dart b/test/public_api_surface_test.dart index 82c305b..a34d1f1 100644 --- a/test/public_api_surface_test.dart +++ b/test/public_api_surface_test.dart @@ -1,3 +1,6 @@ +import 'dart:async'; +import 'dart:typed_data'; + import 'package:block/block.dart' as block; import 'package:ht/ht.dart'; import 'package:test/test.dart'; @@ -42,6 +45,8 @@ void main() { expect(requestInit.method, 'POST'); expect(requestInit.priority, RequestPriority.high); expect(responseInit.status, 200); + expect(body, isA()); + expect(body, isA>()); expect(body.size, 6); expect(request.headers.has('content-type'), isTrue); expect(await multipart.bytes(), isNotEmpty);