Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/httpd.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
-define(INTERNAL_SERVER_ERROR, 500).
-define(BAD_REQUEST, 400).
-define(NOT_FOUND, 404).
-define(NOT_ALLOWED, 405).
-define(OK, 200).
-define(CONTINUE, 100).
-define(SWITCHING_PROTOCOLS, 101).
92 changes: 73 additions & 19 deletions src/gen_tcp_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,23 @@
-callback handle_receive(Socket :: term(), Packet :: binary(), State :: term()) ->
{reply, Packet :: iolist(), NewState :: term()} | {noreply, NewState :: term()} | {close, Packet :: iolist()} | close.

-callback handle_tcp_closed(Socket :: term(), State :: term()) -> ok.
-callback handle_tcp_closed(Socket :: term(), State :: term()) -> NewState :: term().

%% Optional callback: invoked for messages that gen_tcp_server does not handle
%% itself (e.g. internal timer messages). Return {noreply, NewState} to keep
%% the connection open, or {close, Socket, NewState} to close a specific socket.
-callback handle_info(Msg :: term(), State :: term()) ->
{noreply, NewState :: term()} | {close, Socket :: term(), NewState :: term()}.
-optional_callbacks([handle_info/2]).

% -define(TRACE_ENABLED, true).
-include_lib("atomvm_httpd/include/trace.hrl").

-record(state, {
handler,
handler_state
handler_state,
connections = #{},
max_connections = 0
}).

-define(DEFAULT_BIND_OPTIONS, #{
Expand Down Expand Up @@ -78,17 +87,21 @@ stop(Server) ->
%% @hidden
init({BindOptions, SocketOptions, Handler, Args}) ->
Self = self(),
MaxConnections = maps:get(max_connections, SocketOptions, 0),
Comment on lines 89 to +90
%% Strip max_connections before passing to set_socket_options/2 so that
%% socket:setopt/3 is never called with an unknown option key.
CleanSocketOptions = maps:remove(max_connections, SocketOptions),
case socket:open(inet, stream, tcp) of
{ok, Socket} ->
ok = set_socket_options(Socket, SocketOptions),
ok = set_socket_options(Socket, CleanSocketOptions),
case socket:bind(Socket, BindOptions) of
ok ->
case socket:listen(Socket) of
ok ->
spawn(fun() -> accept(Self, Socket) end),
spawn_link(fun() -> accept(Self, Socket) end),
case Handler:init(Args) of
{ok, HandlerState} ->
{ok, #state{handler = Handler, handler_state = HandlerState}};
{ok, #state{handler = Handler, handler_state = HandlerState, max_connections = MaxConnections}};
HandlerError ->
try_close(Socket),
{stop, {handler_error, HandlerError}}
Expand All @@ -108,7 +121,7 @@ init({Socket, Handler, Args}) ->
Self = self(),
case Handler:init(Args) of
Comment on lines 121 to 122
{ok, HandlerState} ->
spawn(fun() -> loop(Self, Socket) end),
spawn_link(fun() -> loop(Self, Socket) end),
{ok, #state{handler = Handler, handler_state = HandlerState}};
HandlerError ->
{stop, {handler_error, HandlerError}}
Expand All @@ -123,14 +136,55 @@ handle_cast(_Msg, State) ->
{noreply, State}.

%% @hidden
handle_info({new_connection, Socket}, State) ->
#state{connections=Conns, max_connections=MaxConns} = State,
case MaxConns > 0 andalso map_size(Conns) >= MaxConns of
true ->
?TRACE("Connection limit reached (~p), rejecting ~p at accept", [MaxConns, Socket]),
try_close(Socket),
{noreply, State};
false ->
?TRACE("Tracking new connection ~p (~p/~p)", [Socket, map_size(Conns) + 1, MaxConns]),
{noreply, State#state{connections = Conns#{Socket => true}}}
end;
handle_info({tcp_closed, Socket}, State) ->
?TRACE("TCP Socket closed ~p", [Socket]),
#state{handler=Handler, handler_state=HandlerState} = State,
#state{handler=Handler, handler_state=HandlerState, connections=Conns} = State,
NewHandlerState = Handler:handle_tcp_closed(Socket, HandlerState),
{noreply, State#state{handler_state=NewHandlerState}};
NewConns = maps:remove(Socket, Conns),
{noreply, State#state{handler_state=NewHandlerState, connections=NewConns}};
handle_info({tcp, Socket, Packet}, State) ->
#state{handler=Handler, handler_state=HandlerState} = State,
?TRACE("received packet: len(~p) from ~p", [erlang:byte_size(Packet), socket:peername(Socket)]),
handle_tcp_data(Socket, Packet, State);
handle_info({'EXIT', _Pid, _Reason}, State) ->
?TRACE("Linked process ~p exited: ~p", [_Pid, _Reason]),
{noreply, State};
handle_info(Info, State) ->
%% Forward unrecognised messages to the handler if it exports handle_info/2.
%% The handler may return {noreply, NewState} or {close, Socket, NewState}.
#state{handler=Handler, handler_state=HandlerState} = State,
case erlang:function_exported(Handler, handle_info, 2) of
true ->
case Handler:handle_info(Info, HandlerState) of
{noreply, NewHandlerState} ->
{noreply, State#state{handler_state = NewHandlerState}};
{close, Socket, NewHandlerState} ->
?TRACE("handle_info requested close for socket ~p", [Socket]),
try_close(Socket),
{noreply, State#state{handler_state = NewHandlerState}}
end;
false ->
io:format("Received spurious info msg: ~p~n", [Info]),
{noreply, State}
end.

%% @hidden
terminate(_Reason, _State) ->
ok.

%% @private
handle_tcp_data(Socket, Packet, State) ->
#state{handler=Handler, handler_state=HandlerState} = State,
case Handler:handle_receive(Socket, Packet, HandlerState) of
{reply, ResponsePacket, ResponseState} ->
?TRACE("Sending reply to endpoint ~p", [socket:peername(Socket)]),
Expand All @@ -153,7 +207,7 @@ handle_info({tcp, Socket, Packet}, State) ->
ok ->
try_close(Socket);
{error, closed} ->
ok; %% Already closed, nothing to do
ok;
{error, _Reason} ->
try_close(Socket)
end,
Expand All @@ -166,14 +220,7 @@ handle_info({tcp, Socket, Packet}, State) ->
?TRACE("Unexpected response from handler ~p: ~p", [Handler, _SomethingElse]),
try_close(Socket),
{noreply, State}
end;
handle_info(Info, State) ->
io:format("Received spurious info msg: ~p~n", [Info]),
{noreply, State}.

%% @hidden
terminate(_Reason, _State) ->
ok.
end.

%%
%% internal functions
Expand Down Expand Up @@ -275,7 +322,11 @@ accept(ControllingProcess, ListenSocket) ->
case socket:accept(ListenSocket) of
{ok, Connection} ->
?TRACE("Accepted connection from ~p", [socket:peername(Connection)]),
spawn(fun() -> accept(ControllingProcess, ListenSocket) end),
%% Notify controlling process immediately so max_connections is enforced
%% at accept time (before any data arrives). The controlling process may
%% close the socket if the limit is exceeded; loop/2 will detect the close.
ControllingProcess ! {new_connection, Connection},
spawn_link(fun() -> accept(ControllingProcess, ListenSocket) end),
loop(ControllingProcess, Connection);
_Error ->
?TRACE("Error accepting connection: ~p", [_Error]),
Expand All @@ -295,6 +346,9 @@ loop(ControllingProcess, Connection) ->
?TRACE("Peer closed connection ~p", [Connection]),
ControllingProcess ! {tcp_closed, Connection},
ok;
{error, timeout} ->
?TRACE("Timeout on recv from ~p, retrying", [Connection]),
loop(ControllingProcess, Connection);
{error, _SomethingElse} ->
?TRACE("Some other error occurred ~p: ~p", [Connection, _SomethingElse]),
try_close(Connection)
Expand Down
Loading