From a7af0ddce2c1d61d7549d12d51b20761f06c6b95 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20Rubinos=20Rodr=C3=ADguez?= Date: Fri, 17 Apr 2026 16:04:20 +0200 Subject: [PATCH 1/2] feat: streaming responses support. --- src/erf.erl | 11 +- src/erf_http_server/erf_http_server_elli.erl | 62 +++- src/erf_postprocess_middleware.erl | 1 - src/erf_preprocess_middleware.erl | 1 - src/erf_router.erl | 2 + test/erf_SUITE.erl | 315 ++++++++++++++++++- 6 files changed, 386 insertions(+), 6 deletions(-) diff --git a/src/erf.erl b/src/erf.erl index ba37463..203a085 100644 --- a/src/erf.erl +++ b/src/erf.erl @@ -88,12 +88,16 @@ -type response() :: { StatusCode :: pos_integer(), Headers :: [header()], - Body :: body() | {file, binary()} + Body :: body() | {file, binary()} | stream_body() }. -type route_patterns() :: [{Route :: binary(), RouteRegEx :: binary()}]. +-type send_chunk_fun() :: fun((iodata()) -> ok | {error, closed | timeout}). -type static_dir() :: {dir, binary()}. -type static_file() :: {file, binary()}. -type static_route() :: {Path :: binary(), Resource :: static_file() | static_dir()}. +-type stream_body() :: {stream, stream_producer()}. +%% Push-style streaming body. +-type stream_producer() :: fun((send_chunk_fun()) -> any()). %%% TYPE EXPORTS -export_type([ @@ -107,7 +111,10 @@ request/0, response/0, route_patterns/0, - static_route/0 + send_chunk_fun/0, + static_route/0, + stream_body/0, + stream_producer/0 ]). %%% MACROS diff --git a/src/erf_http_server/erf_http_server_elli.erl b/src/erf_http_server/erf_http_server_elli.erl index 3ed75ab..d28fce7 100644 --- a/src/erf_http_server/erf_http_server_elli.erl +++ b/src/erf_http_server/erf_http_server_elli.erl @@ -91,7 +91,12 @@ init(Req, [Name]) -> handle(ElliRequest, [Name]) -> ErfRequest = preprocess(Name, ElliRequest), ErfResponse = erf_router:handle(Name, ErfRequest), - postprocess(ErfRequest, ErfResponse). + case ErfResponse of + {Status, Headers, {stream, Producer}} -> + handle_stream(Name, ElliRequest, Status, Headers, Producer); + _ -> + postprocess(ErfRequest, ErfResponse) + end. -spec handle_event(Event, Args, CallbackArgs) -> ok when Event :: elli_handler:event(), @@ -131,6 +136,61 @@ handle_event(_Event, _Args, _CallbackArgs) -> %%%----------------------------------------------------------------------------- %%% INTERNAL FUNCTIONS %%%----------------------------------------------------------------------------- +-spec handle_stream(Name, ElliRequest, Status, Headers, Producer) -> Result when + Name :: atom(), + ElliRequest :: elli:req(), + Status :: pos_integer(), + Headers :: [erf:header()], + Producer :: erf:stream_producer(), + Result :: elli_handler:result(). +handle_stream(Name, ElliRequest, Status, Headers, Producer) -> + case elli_request:chunk_ref(ElliRequest) of + {error, not_supported} -> + {ok, LogLevel} = erf_conf:log_level(Name), + ?LOG( + LogLevel, + "[erf] Streaming response requested on HTTP/1.0; returning 505" + ), + {505, [], <<>>}; + ChunkRef -> + case Status of + 200 -> + ok; + _ -> + {ok, LogLevel} = erf_conf:log_level(Name), + ?LOG( + LogLevel, + "[erf] elli backend forces chunked responses to HTTP 200; " + "status ~p from callback ignored", + [Status] + ) + end, + _ = erlang:spawn(fun() -> drive_stream(Name, ChunkRef, Producer) end), + {chunk, Headers} + end. + +-spec drive_stream(Name, ChunkRef, Producer) -> ok when + Name :: atom(), + ChunkRef :: pid(), + Producer :: erf:stream_producer(). +drive_stream(Name, ChunkRef, Producer) -> + Send = fun(Data) -> elli_request:send_chunk(ChunkRef, Data) end, + try + _ = Producer(Send), + ok + catch + Class:Reason:Stack -> + {ok, LogLevel} = erf_conf:log_level(Name), + ?LOG( + LogLevel, + "[erf] Stream producer crashed ~p:~p~nStacktrace:~n~p", + [Class, Reason, Stack] + ), + ok + after + _ = elli_request:close_chunk(ChunkRef) + end. + -spec build_elli_conf(Name, HTTPServerConf, ExtraElliConf) -> ElliConf when Name :: atom(), HTTPServerConf :: erf_http_server:conf(), diff --git a/src/erf_postprocess_middleware.erl b/src/erf_postprocess_middleware.erl index 17c65b0..ec08fb5 100644 --- a/src/erf_postprocess_middleware.erl +++ b/src/erf_postprocess_middleware.erl @@ -12,7 +12,6 @@ %% See the License for the specific language governing permissions and %% limitations under the License -%% @doc Behaviour for erf's postprocessing middlewares. -module(erf_postprocess_middleware). %%% TYPES diff --git a/src/erf_preprocess_middleware.erl b/src/erf_preprocess_middleware.erl index e377ef5..c4ae6b4 100644 --- a/src/erf_preprocess_middleware.erl +++ b/src/erf_preprocess_middleware.erl @@ -12,7 +12,6 @@ %% See the License for the specific language governing permissions and %% limitations under the License -%% @doc Behaviour for erf's preprocessing middlewares. -module(erf_preprocess_middleware). %%% TYPES diff --git a/src/erf_router.erl b/src/erf_router.erl index 35affe5..40bc26e 100644 --- a/src/erf_router.erl +++ b/src/erf_router.erl @@ -879,6 +879,8 @@ load_binary(ModuleName, Bin) -> Response :: erf:response(). postprocess(_Request, {_Status, _Headers, {file, _Path}} = Response) -> Response; +postprocess(_Request, {_Status, _Headers, {stream, _Producer}} = Response) -> + Response; postprocess(_Request, {Status, RawHeaders, RawBody}) -> ContentTypeHeader = string:casefold(<<"content-type">>), case proplists:get_value(ContentTypeHeader, RawHeaders, undefined) of diff --git a/test/erf_SUITE.erl b/test/erf_SUITE.erl index c4c07fa..3bef6a9 100644 --- a/test/erf_SUITE.erl +++ b/test/erf_SUITE.erl @@ -29,7 +29,13 @@ all() -> statics, swagger_ui, start_stop, - reload_conf + reload_conf, + stream, + stream_http_1_0, + stream_with_postprocess_middleware, + stream_producer_crash, + stream_non_200_status, + callback_crash ]. %%%----------------------------------------------------------------------------- @@ -577,3 +583,310 @@ reload_conf(_Conf) -> meck:unload(erf_callback_2), ok. + +stream(_Conf) -> + meck:new([erf_callback], [non_strict, no_link]), + + Chunks = [<<"chunk-1">>, <<"chunk-2">>, <<"chunk-3">>], + Producer = + fun(Send) -> + lists:foreach(fun(Chunk) -> ok = Send(Chunk) end, Chunks) + end, + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + {200, [{<<"content-type">>, <<"text/plain">>}], {stream, Producer}} + end + ), + + {ok, _Pid} = erf:start_link(#{ + spec_path => filename:join( + [code:lib_dir(erf), "test", <<"fixtures/with_refs_oas_3_0_spec.json">>] + ), + callback => erf_callback, + port => 8789, + name => erf_server + }), + + {ok, {{"HTTP/1.1", 200, "OK"}, Headers, Body}} = + httpc:request( + get, + {"http://localhost:8789/1/foo", []}, + [], + [{body_format, binary}] + ), + + ?assertEqual(iolist_to_binary(Chunks), Body), + ?assertEqual( + "text/plain", + proplists:get_value("content-type", Headers) + ), + + RawBytes = raw_http_1_1_get(8789, <<"/1/foo">>), + ?assertMatch({match, _}, re:run(RawBytes, <<"Transfer-Encoding: chunked">>, [caseless])), + ?assertMatch({match, _}, re:run(RawBytes, <<"\r\n7\r\nchunk-1\r\n">>)), + ?assertMatch({match, _}, re:run(RawBytes, <<"\r\n7\r\nchunk-2\r\n">>)), + ?assertMatch({match, _}, re:run(RawBytes, <<"\r\n7\r\nchunk-3\r\n">>)), + ?assertMatch({match, _}, re:run(RawBytes, <<"\r\n0\r\n\r\n$">>)), + + ok = erf:stop(erf_server), + + meck:unload(erf_callback), + + ok. + +stream_http_1_0(_Conf) -> + meck:new([erf_callback], [non_strict, no_link]), + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + Producer = fun(Send) -> ok = Send(<<"unreachable">>) end, + {200, [{<<"content-type">>, <<"text/plain">>}], {stream, Producer}} + end + ), + + {ok, _Pid} = erf:start_link(#{ + spec_path => filename:join( + [code:lib_dir(erf), "test", <<"fixtures/with_refs_oas_3_0_spec.json">>] + ), + callback => erf_callback, + port => 8789, + name => erf_server + }), + + {ok, Sock} = gen_tcp:connect( + "localhost", + 8789, + [binary, {active, false}, {packet, http_bin}] + ), + ok = gen_tcp:send( + Sock, + <<"GET /1/foo HTTP/1.0\r\nHost: localhost:8789\r\n\r\n">> + ), + {ok, {http_response, _Version, Status, _Reason}} = gen_tcp:recv(Sock, 0, 5000), + gen_tcp:close(Sock), + + ?assertEqual(505, Status), + + ok = erf:stop(erf_server), + + meck:unload(erf_callback), + + ok. + +stream_with_postprocess_middleware(_Conf) -> + meck:new( + [erf_callback, erf_postprocess_middleware], + [non_strict, no_link] + ), + + Chunks = [<<"alpha">>, <<"beta">>], + Producer = + fun(Send) -> + lists:foreach(fun(Chunk) -> ok = Send(Chunk) end, Chunks) + end, + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + {200, [{<<"content-type">>, <<"text/plain">>}], {stream, Producer}} + end + ), + + meck:expect( + erf_postprocess_middleware, + postprocess, + fun(_Req, {Status, Headers, Body}) -> + {Status, [{<<"x-touched-by-middleware">>, <<"yes">>} | Headers], Body} + end + ), + + {ok, _Pid} = erf:start_link(#{ + spec_path => filename:join( + [code:lib_dir(erf), "test", <<"fixtures/with_refs_oas_3_0_spec.json">>] + ), + callback => erf_callback, + postprocess_middlewares => [erf_postprocess_middleware], + port => 8789, + name => erf_server + }), + + {ok, {{"HTTP/1.1", 200, "OK"}, Headers, Body}} = + httpc:request( + get, + {"http://localhost:8789/1/foo", []}, + [], + [{body_format, binary}] + ), + + ?assertEqual(iolist_to_binary(Chunks), Body), + ?assertEqual( + "yes", + proplists:get_value("x-touched-by-middleware", Headers) + ), + + ok = erf:stop(erf_server), + + meck:unload([erf_callback, erf_postprocess_middleware]), + + ok. + +stream_producer_crash(_Conf) -> + meck:new([erf_callback], [non_strict, no_link]), + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + BoomProducer = fun(_Send) -> erlang:error(boom) end, + {200, [{<<"content-type">>, <<"text/plain">>}], {stream, BoomProducer}} + end + ), + + {ok, _Pid} = erf:start_link(#{ + spec_path => filename:join( + [code:lib_dir(erf), "test", <<"fixtures/with_refs_oas_3_0_spec.json">>] + ), + callback => erf_callback, + port => 8789, + name => erf_server + }), + + {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, Body}} = + httpc:request( + get, + {"http://localhost:8789/1/foo", []}, + [], + [{body_format, binary}] + ), + ?assertEqual(<<>>, Body), + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + {200, [], <<"bar">>} + end + ), + ?assertMatch( + {ok, {{"HTTP/1.1", 200, "OK"}, _, <<"\"bar\"">>}}, + httpc:request( + get, + {"http://localhost:8789/1/foo", []}, + [], + [{body_format, binary}] + ) + ), + + ok = erf:stop(erf_server), + + meck:unload(erf_callback), + + ok. + +stream_non_200_status(_Conf) -> + meck:new([erf_callback], [non_strict, no_link]), + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + Producer = fun(Send) -> ok = Send(<<"teapot-body">>) end, + {418, [{<<"content-type">>, <<"text/plain">>}], {stream, Producer}} + end + ), + + {ok, _Pid} = erf:start_link(#{ + spec_path => filename:join( + [code:lib_dir(erf), "test", <<"fixtures/with_refs_oas_3_0_spec.json">>] + ), + callback => erf_callback, + port => 8789, + name => erf_server + }), + + {ok, {{"HTTP/1.1", 200, "OK"}, _Headers, Body}} = + httpc:request( + get, + {"http://localhost:8789/1/foo", []}, + [], + [{body_format, binary}] + ), + ?assertEqual(<<"teapot-body">>, Body), + + ok = erf:stop(erf_server), + + meck:unload(erf_callback), + + ok. + +callback_crash(_Conf) -> + meck:new([erf_callback], [non_strict, no_link]), + + meck:expect( + erf_callback, + get_foo, + fun(_Request) -> + erlang:error(kaboom) + end + ), + + {ok, _Pid} = erf:start_link(#{ + spec_path => filename:join( + [code:lib_dir(erf), "test", <<"fixtures/with_refs_oas_3_0_spec.json">>] + ), + callback => erf_callback, + port => 8789, + name => erf_server + }), + + %% Elli catches the crash and returns a 500. This exercises + %% handle_event(request_error, ...) and handle_exception/3. + {ok, {{"HTTP/1.1", Status, _}, _Headers, _Body}} = + httpc:request( + get, + {"http://localhost:8789/1/foo", []}, + [], + [{body_format, binary}] + ), + ?assertEqual(500, Status), + + ok = erf:stop(erf_server), + + meck:unload(erf_callback), + + ok. + +%%%----------------------------------------------------------------------------- +%%% INTERNAL HELPERS +%%%----------------------------------------------------------------------------- +raw_http_1_1_get(Port, Path) -> + {ok, Sock} = gen_tcp:connect( + "localhost", + Port, + [binary, {active, false}, {packet, raw}] + ), + ok = gen_tcp:send( + Sock, + << + "GET ", + Path/binary, + " HTTP/1.1\r\nHost: localhost:", + (integer_to_binary(Port))/binary, + "\r\nConnection: close\r\n\r\n" + >> + ), + All = raw_recv(Sock, <<>>), + gen_tcp:close(Sock), + All. + +raw_recv(Sock, Acc) -> + case gen_tcp:recv(Sock, 0, 5000) of + {ok, Bytes} -> raw_recv(Sock, <>); + {error, closed} -> Acc + end. From 945b31d4ba4deaf63ca791aeed11708ff9774feb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Miguel=20Rubinos=20Rodr=C3=ADguez?= Date: Sat, 18 Apr 2026 09:26:24 +0200 Subject: [PATCH 2/2] fix: comments reviewed. too agressive on previous commit. --- src/erf.erl | 1 - src/erf_postprocess_middleware.erl | 1 + src/erf_preprocess_middleware.erl | 1 + 3 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/erf.erl b/src/erf.erl index 203a085..d6cc072 100644 --- a/src/erf.erl +++ b/src/erf.erl @@ -96,7 +96,6 @@ -type static_file() :: {file, binary()}. -type static_route() :: {Path :: binary(), Resource :: static_file() | static_dir()}. -type stream_body() :: {stream, stream_producer()}. -%% Push-style streaming body. -type stream_producer() :: fun((send_chunk_fun()) -> any()). %%% TYPE EXPORTS diff --git a/src/erf_postprocess_middleware.erl b/src/erf_postprocess_middleware.erl index ec08fb5..17c65b0 100644 --- a/src/erf_postprocess_middleware.erl +++ b/src/erf_postprocess_middleware.erl @@ -12,6 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License +%% @doc Behaviour for erf's postprocessing middlewares. -module(erf_postprocess_middleware). %%% TYPES diff --git a/src/erf_preprocess_middleware.erl b/src/erf_preprocess_middleware.erl index c4ae6b4..e377ef5 100644 --- a/src/erf_preprocess_middleware.erl +++ b/src/erf_preprocess_middleware.erl @@ -12,6 +12,7 @@ %% See the License for the specific language governing permissions and %% limitations under the License +%% @doc Behaviour for erf's preprocessing middlewares. -module(erf_preprocess_middleware). %%% TYPES