Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion src/ecpool.appup.src
Original file line number Diff line number Diff line change
@@ -1,11 +1,22 @@
%% -*-: erlang -*-
{"0.5.11",
{"0.5.12",
[
{"0.5.11", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}
]},
{"0.5.10", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
]},
{"0.5.9", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
]},
Expand All @@ -17,6 +28,7 @@
{apply, {ecpool_monitor, update_clients_global, []}},
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, upgrade_clients_state, []}},
{load_module, ecpool, brutal_purge, soft_purge, []}
Expand All @@ -27,6 +39,7 @@
{apply, {ecpool_monitor, update_clients_global, []}},
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, upgrade_clients_state, []}},
{load_module, ecpool, brutal_purge, soft_purge, []},
Expand All @@ -41,21 +54,34 @@
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []}
]}
],
[
{"0.5.11", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []}
]},
{"0.5.10", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
]},
{"0.5.9", [
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_monitor, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []}
]},
{<<"0\\.5\\.[3-8]">>, [
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, ensure_monitor_stopped, []}},
Expand All @@ -64,6 +90,7 @@
{<<"0\\.5\\.[0-2]">>, [
{load_module, ecpool_pool, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_worker, brutal_purge, soft_purge, []},
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
Expand All @@ -76,6 +103,7 @@
{load_module, ecpool, brutal_purge, soft_purge, []},
{load_module, ecpool_worker_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_sup, brutal_purge, soft_purge, []},
{load_module, ecpool_pool_sup, brutal_purge, soft_purge, []},
{apply, {ecpool_monitor, ensure_monitor_stopped, []}},
{delete_module,ecpool_monitor}
]}
Expand Down
4 changes: 4 additions & 0 deletions src/ecpool.erl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
, start_pool/3
, start_sup_pool/3
, stop_sup_pool/1
, stop_sup_pool/2
, get_client/1
, get_client/2
, pick_and_do/3
Expand Down Expand Up @@ -91,6 +92,9 @@ start_sup_pool(Pool, Mod, Opts) ->
stop_sup_pool(Pool) ->
ecpool_sup:stop_pool(Pool).

stop_sup_pool(Pool, Opts) ->
ecpool_sup:stop_pool(Pool, Opts).

%% @doc Get client/connection
-spec(get_client(pool_name()) -> get_client_ret()).
get_client(Pool) ->
Expand Down
30 changes: 29 additions & 1 deletion src/ecpool_pool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,41 @@
%% Supervisor callbacks
-export([init/1]).

-export([clear_init_incomplete/0]).

start_link(Pool, Mod, Opts) ->
supervisor:start_link(?MODULE, [Pool, Mod, Opts]).
case supervisor:start_link(?MODULE, [Pool, Mod, Opts]) of
{ok, SupPid} = Ret ->
_ = init_complete(SupPid),
Ret;
Error ->
Error
end.

init([Pool, Mod, Opts]) ->
set_init_incomplete(),
{ok, { {one_for_all, 10, 100}, [
{pool, {ecpool_pool, start_link, [Pool, Opts]},
transient, 16#ffff, worker, [ecpool_pool]},
{worker_sup, {ecpool_worker_sup, start_link, [Pool, Mod, Opts]},
transient, infinity, supervisor, [ecpool_worker_sup]}] }}.

init_complete(SupPid) ->
%% Not a real supervisor child, just for clearing the process dict with key `init_incomplete`
DummySpec = #{
id => clear_init_incomplete,
start => {?MODULE, clear_init_incomplete, []},
restart => temporary,
shutdown => brutal_kill,
type => worker,
modules => [?MODULE]
},
supervisor:start_child(SupPid, DummySpec).

set_init_incomplete() ->
_ = erlang:put(init_incomplete, true),
ok.

clear_init_incomplete() ->
_ = erlang:erase(init_incomplete),
{error, dummy_child}.
77 changes: 71 additions & 6 deletions src/ecpool_sup.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
%% API
-export([ start_pool/3
, stop_pool/1
, stop_pool/2
, get_pool/1
]).

Expand All @@ -32,6 +33,9 @@
-export([init/1]).

-type pool_name() :: ecpool:pool_name().
-type stop_opts() :: #{timeout => non_neg_integer()}.

-define(STOP_TIMOEUT, infinity).

%% @doc Start supervisor.
-spec(start_link() -> {ok, pid()} | {error, term()}).
Expand All @@ -50,13 +54,30 @@ start_pool(Pool, Mod, Opts) ->
%% @doc Stop a pool.
-spec(stop_pool(Pool :: pool_name()) -> ok | {error, term()}).
stop_pool(Pool) ->
stop_pool(Pool, #{}).

-spec(stop_pool(Pool :: pool_name(), stop_opts()) -> ok | {error, term()}).
stop_pool(Pool, Opts) ->
ChildId = child_id(Pool),
case supervisor:terminate_child(?MODULE, ChildId) of
ok ->
supervisor:delete_child(?MODULE, ChildId);
{error, Reason} ->
{error, Reason}
end.
Timeout = maps:get(timeout, Opts, ?STOP_TIMOEUT),
try gen_server:call(?MODULE, {terminate_child, ChildId}, Timeout) of
ok -> delete_child(ChildId, Timeout);
{error, Reason} -> {error, Reason}
catch
exit:{R, _} when R == noproc; R == normal; R == shutdown ->
{error, not_found};
exit:{timeout, _} ->
%% Sometimes the `ecpool_sup` is not responding to terminate request as the `ecpool_pool_sup`
%% process got stuck in connecting. In this case, we need to cancel the connection
%% by force killing it so the `ecpool_sup` won't be stuck.
_ = kill_ecpool_pool_sup_if_stuck(),
%% Now the `ecpool_pool_sup` process can be in one of the following state:
%% - has been force killed, or
%% - has gone down by itself or by the `terminate_child` call, or
%% - is still running normally
%% In any case we try to remove it from childspec as we have sent a `terminate_child`.
delete_child(ChildId, Timeout)
end.

%% @doc Get a pool.
-spec(get_pool(pool_name()) -> undefined | pid()).
Expand Down Expand Up @@ -90,3 +111,47 @@ pool_spec(Pool, Mod, Opts) ->

child_id(Pool) -> {pool_sup, Pool}.

%%--------------------------------------------------------------------
%% Internal functions
%%--------------------------------------------------------------------

kill_ecpool_pool_sup_if_stuck() ->
case process_info(whereis(?MODULE), links) of
{links, LinkedPids} ->
case search_stuck_ecpool_pool_sup(LinkedPids) of
{ok, Pid} ->
exit(Pid, kill),
{ok, Pid};
Err ->
Err
end;
undefined ->
{error, not_found}
end.

search_stuck_ecpool_pool_sup([]) ->
{error, not_found};
search_stuck_ecpool_pool_sup([Pid | Rest]) ->
case process_info(Pid, dictionary) of
{dictionary, Dicts} ->
case proplists:get_value('$initial_call', Dicts) of
{supervisor, ecpool_pool_sup, _} ->
case proplists:get_value(init_incomplete, Dicts) of
true -> {ok, Pid};
_ -> {error, {not_stuck_in_init, Pid}}
end;
_ ->
search_stuck_ecpool_pool_sup(Rest)
end;
undefined ->
search_stuck_ecpool_pool_sup(Rest)
end.

delete_child(ChildId, Timeout) ->
try gen_server:call(?MODULE, {delete_child, ChildId}, Timeout)
catch
exit:{R, _} when R == noproc; R == normal; R == shutdown ->
{error, not_found};
exit:{timeout, _} ->
{error, timeout}
end.
14 changes: 14 additions & 0 deletions test/ecpool_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ groups() ->
[t_start_pool,
t_start_pool_any_name,
t_start_sup_pool,
t_start_sup_pool_timeout,
t_empty_pool,
t_empty_hash_pool,
t_restart_client,
Expand Down Expand Up @@ -113,6 +114,19 @@ t_start_sup_pool(_Config) ->
ecpool:stop_sup_pool(xpool),
?assertEqual([], ecpool_sup:pools()).

t_start_sup_pool_timeout(_Config) ->
spawn_link(fun() ->
?assertMatch({error, {killed, _}},
ecpool:start_sup_pool(timeout_pool, test_timeout_client, ?POOL_OPTS))
end),
timer:sleep(100),
{Time, Val} = timer:tc(ecpool, stop_sup_pool, [timeout_pool, #{timeout => 200}]),
?assert(Time/1000 < 500),
%% The `ecpool:start_sup_pool/3` has not completed before it was cancelled (killed),
%% so `ecpool:stop_sup_pool/2` returns `{error, not_found}`.
?assertEqual({error, not_found}, Val),
?assertEqual([], ecpool_sup:pools()).

t_restart_client(_Config) ->
ecpool:start_pool(?POOL, test_client, [{pool_size, 4}]),
Workers1 = ecpool:workers(?POOL),
Expand Down
26 changes: 26 additions & 0 deletions test/test_timeout_client.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------

-module(test_timeout_client).

-behaviour(ecpool_worker).

-export([connect/1]).

connect(Options) ->
Delay = proplists:get_value(delay, Options, 5000),
timer:sleep(Delay),
{ok, erlang:spawn_link(fun() -> ok end)}.