Skip to content
This repository has been archived by the owner on Feb 15, 2018. It is now read-only.

Commit

Permalink
Rework the startup of the plugin
Browse files Browse the repository at this point in the history
- Addresses #5 by ensuring inets is running
- Only attempt to register if not already in Consul
- Only attempt to join cluster if not already a member of the cluster
- Simplify the startup flow and the submission of Consul health check on startup
- Wrap the rabbit_log methods for a consistent prefix
  • Loading branch information
gmr committed Jul 30, 2015
1 parent 09b31b0 commit 9262115
Showing 1 changed file with 124 additions and 91 deletions.
215 changes: 124 additions & 91 deletions src/autocluster_consul.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,24 +27,29 @@
%% @end
%%
init() ->
io:format("~n Consul Cluster Initializing: "),
case register() of
application:ensure_started(inets),
case maybe_register() of
ok ->
io:format("Node registered~n");
{error, 400} ->
io:format("permission denied when registering node with Consul, did you provide a ACL token?~n"),
rabbit:stop_and_halt();
{error, Error} ->
io:format("Error registering: ~p~n", [Error]),
rabbit:stop_and_halt()
CheckInterval = autocluster_consul_config:service_ttl() * 1000,
send_check_pass(),
{ok, _} = timer:apply_interval(CheckInterval, ?MODULE, send_check_pass, []);
error ->
warning("Failed to register or join cluster")
end,
send_check_pass(),
Interval = autocluster_consul_config:service_ttl() * 1000,
{ok, _} = timer:apply_interval(Interval, ?MODULE, send_check_pass, []),
Nodes = cluster_nodes(),
case lists:member(node(), Nodes) of
true -> ok;
false -> join_cluster(Nodes)
ok.


%% @spec send_check_pass() -> ok
%% @doc Let Consul know that the health check should be passing
%% @end
%%
send_check_pass() ->
Service = list_to_atom("service:" ++ autocluster_consul_config:service()),
case autocluster_consul_client:get([agent, check, pass, Service], []) of
ok -> ok;
{error, Reason} ->
err("Error updating Consul health check: ~p~n", [Reason]),
ok
end.


Expand All @@ -54,32 +59,54 @@ init() ->
%% @end
%%
shutdown() ->
io:format("~n Consul Cluster Deregistering: "),
case deregister() of
ok ->
io:format("Node deregistered~n");
Other ->
io:format("Error deregistering: ~p~n", [Other])
end,
case unregister() of
ok -> info("Unregistered");
{error, Error} -> err("Error unregistering: ~p", [Error])
end.


%% @private
%% @spec join_cluster(term()) -> ok
%% @doc Have the current node join a cluster using the specified discovery node
%% @end
%%
join_cluster([]) ->
debug("No nodes in existing cluster"),
ok;
join_cluster(Nodes) ->
debug("Joining existing cluster: ~p", [Nodes]),
application:stop(rabbit),
mnesia:stop(),
rabbit_mnesia:reset(),
rabbit_mnesia:join_cluster(lists:nth(1, Nodes), disc),
mnesia:start(),
rabbit:start(),
info("Cluster joined"),
ok.


%% @private
%% @spec extract_nodes() -> list()
%% @doc Fetch the list of cluster nodes from Consul, returning them as a list of
%% atoms.
%% @spec maybe_register() -> ok|error
%% @doc Register with Consul and join the cluster if needed
%% @end
%%
cluster_nodes() ->
{Path, Args} = case autocluster_consul_config:cluster_name() of
undefined -> {[catalog, service, autocluster_consul_config:service()], []};
Name -> {[catalog, service, autocluster_consul_config:service()], [{tag, Name}]}
end,
case autocluster_consul_client:get(Path, Args) of
{ok, Nodes} -> extract_nodes(Nodes);
{error, Reason} ->
error_logger:error_msg("Error fetching nodes from consul: ~p~n", [Reason]),
[]
maybe_register() ->
Nodes = cluster_nodes(),
case lists:member(node(), Nodes) of
true ->
debug("Node is already registered"),
ok;
false ->
case register() of
ok ->
join_cluster(Nodes);
{error, 400} ->
err("Permission denied when registering node with Consul"),
error;
{error, Error} ->
err("Error registering: ~p", [Error]),
error
end
end.


Expand All @@ -89,39 +116,42 @@ cluster_nodes() ->
%% @end
%%
register() ->
info("Registering node with Consul"),
case autocluster_consul_client:post([agent, service, register], registration_body()) of
ok -> ok;
{error, Reason} ->
error_logger:error_msg("Error registering node with consul: ~p~n", [Reason]),
{error, Reason}
Error -> Error
end.


%% @private
%% @spec register() -> mixed
%% @spec unregister() -> mixed
%% @doc Deregister the rabbitmq service for this node from Consul
%% @end
%%
deregister() ->
unregister() ->
info("Unregistering node with Consul"),
case autocluster_consul_client:get([agent, service, deregister, autocluster_consul_config:service()], []) of
{ok, _} -> ok;
{error, Reason} ->
error_logger:error_msg("Error fetching deregistering node from consul: ~p~n", [Reason]),
[]
Error -> Error
end.


%% @spec send_check_pass() -> ok
%% @doc Let Consul know that the health check should be passing
%% @private
%% @spec cluster_nodes() -> list()
%% @doc Fetch the list of cluster nodes from Consul, returning them as a list of
%% atoms.
%% @end
%%
send_check_pass() ->
Service = list_to_atom("service:" ++ autocluster_consul_config:service()),
case autocluster_consul_client:get([agent, check, pass, Service], []) of
ok -> ok;
{error, Reason} ->
error_logger:error_msg("Error updating Consul health check: ~p~n", [Reason]),
ok
cluster_nodes() ->
{Path, Args} = case autocluster_consul_config:cluster_name() of
undefined -> {[catalog, service, autocluster_consul_config:service()], []};
Name -> {[catalog, service, autocluster_consul_config:service()], [{tag, Name}]}
end,
case autocluster_consul_client:get(Path, Args) of
{ok, Nodes} -> extract_nodes(Nodes);
{error, Error} ->
warning("Error fetching nodes from consul: ~p~n", [Error]),
[]
end.


Expand Down Expand Up @@ -149,39 +179,15 @@ filter_self(Addresses) ->


%% @private
%% @spec join_cluster(term()) -> ok
%% @doc Have the current node join a cluster using the specified discovery node
%% @end
%%
join_cluster([]) -> ok;
join_cluster(Nodes) ->
io:format(" Joining existing cluster.~n~n"),
application:stop(rabbit),
mnesia:stop(),
rabbit_mnesia:reset(),
rabbit_mnesia:join_cluster(lists:nth(1, Nodes), disc),
mnesia:start(),
rabbit:start(),
ok.


%% @private
%% @spec registration_body() -> list()
%% @doc Return the appropriate registration body.
%% @spec host_sname(binary()) -> list()
%% @doc Return the hostname/sname from the specified value
%% @end
%%
registration_body() ->
{Service, Name, Port, TTL} = {autocluster_consul_config:service(),
autocluster_consul_config:cluster_name(),
autocluster_consul_config:service_port(),
autocluster_consul_config:service_ttl()},
Payload = build_registration_body(list_to_atom(Service), Name, Port, TTL),
case rabbit_misc:json_encode(Payload) of
{ok, Body} ->
lists:flatten(Body);
{error, Error} ->
error_logger:error_msg("Could not JSON serialize the request body: ~p (~p)~n", [Error, Payload]),
rabbit:stop_and_halt()
host_sname(Value) ->
Parts = string:tokens(binary_to_list(Value), "."),
case length(Parts) of
1 -> binary_to_list(Value);
_ -> binary_to_list(lists:nth(1, Parts))
end.


Expand All @@ -202,15 +208,22 @@ build_registration_body(Service, Name, Port, TTL) ->


%% @private
%% @spec host_sname(binary()) -> list()
%% @doc Return the hostname/sname from the specified value
%% @spec registration_body() -> list()
%% @doc Return the appropriate registration body.
%% @end
%%
host_sname(Value) ->
Parts = string:tokens(binary_to_list(Value), "."),
case length(Parts) of
1 -> Value;
_ -> lists:nth(1, Parts)
registration_body() ->
{Service, Name, Port, TTL} = {autocluster_consul_config:service(),
autocluster_consul_config:cluster_name(),
autocluster_consul_config:service_port(),
autocluster_consul_config:service_ttl()},
Payload = build_registration_body(list_to_atom(Service), Name, Port, TTL),
case rabbit_misc:json_encode(Payload) of
{ok, Body} ->
lists:flatten(Body);
{error, Error} ->
err("Could not JSON serialize the request body: ~p (~p)~n", [Error, Payload]),
{error, Error}
end.


Expand All @@ -221,3 +234,23 @@ host_sname(Value) ->
%%
ttl_string(Value) ->
list_to_atom(integer_to_list(Value) ++ "s").


%% @private
%% @spec log(Module, Function, Message, Args) -> ok
%% @doc Ensure all logged lines share the same format
%% @end
%%
log(Module, Fun, Message, Args) ->
Module:Fun(string:join(["autocluster_consul: ", Message, "~n"], ""), Args).

%% Logging methods

debug(Message) -> log(rabbit_log, debug, Message, []).
debug(Message, Args) -> log(rabbit_log, debug, Message, Args).
info(Message) -> log(rabbit_log, info, Message, []).
info(Message, Args) -> log(rabbit_log, info, Message, Args).
err(Message) -> log(rabbit_log, error, Message, []).
err(Message, Args) -> log(rabbit_log, error, Message, Args).
warning(Message) -> log(rabbit_log, error, Message, []).
warning(Message, Args) -> log(rabbit_log, error, Message, Args).

0 comments on commit 9262115

Please sign in to comment.