diff --git a/src/autocluster_consul.erl b/src/autocluster_consul.erl index 0659ee6..3ecbb5b 100644 --- a/src/autocluster_consul.erl +++ b/src/autocluster_consul.erl @@ -27,24 +27,29 @@ %% @end %% init() -> - io:format("~n Consul Cluster Initializing: "), - case register() of + application:ensure_started(inets), + case maybe_register() of ok -> - io:format("Node registered~n"); - {error, 400} -> - io:format("permission denied when registering node with Consul, did you provide a ACL token?~n"), - rabbit:stop_and_halt(); - {error, Error} -> - io:format("Error registering: ~p~n", [Error]), - rabbit:stop_and_halt() + CheckInterval = autocluster_consul_config:service_ttl() * 1000, + send_check_pass(), + {ok, _} = timer:apply_interval(CheckInterval, ?MODULE, send_check_pass, []); + error -> + warning("Failed to register or join cluster") end, - send_check_pass(), - Interval = autocluster_consul_config:service_ttl() * 1000, - {ok, _} = timer:apply_interval(Interval, ?MODULE, send_check_pass, []), - Nodes = cluster_nodes(), - case lists:member(node(), Nodes) of - true -> ok; - false -> join_cluster(Nodes) + ok. + + +%% @spec send_check_pass() -> ok +%% @doc Let Consul know that the health check should be passing +%% @end +%% +send_check_pass() -> + Service = list_to_atom("service:" ++ autocluster_consul_config:service()), + case autocluster_consul_client:get([agent, check, pass, Service], []) of + ok -> ok; + {error, Reason} -> + err("Error updating Consul health check: ~p~n", [Reason]), + ok end. @@ -54,32 +59,54 @@ init() -> %% @end %% shutdown() -> - io:format("~n Consul Cluster Deregistering: "), - case deregister() of - ok -> - io:format("Node deregistered~n"); - Other -> - io:format("Error deregistering: ~p~n", [Other]) - end, + case unregister() of + ok -> info("Unregistered"); + {error, Error} -> err("Error unregistering: ~p", [Error]) + end. + + +%% @private +%% @spec join_cluster(term()) -> ok +%% @doc Have the current node join a cluster using the specified discovery node +%% @end +%% +join_cluster([]) -> + debug("No nodes in existing cluster"), + ok; +join_cluster(Nodes) -> + debug("Joining existing cluster: ~p", [Nodes]), + application:stop(rabbit), + mnesia:stop(), + rabbit_mnesia:reset(), + rabbit_mnesia:join_cluster(lists:nth(1, Nodes), disc), + mnesia:start(), + rabbit:start(), + info("Cluster joined"), ok. %% @private -%% @spec extract_nodes() -> list() -%% @doc Fetch the list of cluster nodes from Consul, returning them as a list of -%% atoms. +%% @spec maybe_register() -> ok|error +%% @doc Register with Consul and join the cluster if needed %% @end %% -cluster_nodes() -> - {Path, Args} = case autocluster_consul_config:cluster_name() of - undefined -> {[catalog, service, autocluster_consul_config:service()], []}; - Name -> {[catalog, service, autocluster_consul_config:service()], [{tag, Name}]} - end, - case autocluster_consul_client:get(Path, Args) of - {ok, Nodes} -> extract_nodes(Nodes); - {error, Reason} -> - error_logger:error_msg("Error fetching nodes from consul: ~p~n", [Reason]), - [] +maybe_register() -> + Nodes = cluster_nodes(), + case lists:member(node(), Nodes) of + true -> + debug("Node is already registered"), + ok; + false -> + case register() of + ok -> + join_cluster(Nodes); + {error, 400} -> + err("Permission denied when registering node with Consul"), + error; + {error, Error} -> + err("Error registering: ~p", [Error]), + error + end end. @@ -89,39 +116,42 @@ cluster_nodes() -> %% @end %% register() -> + info("Registering node with Consul"), case autocluster_consul_client:post([agent, service, register], registration_body()) of ok -> ok; - {error, Reason} -> - error_logger:error_msg("Error registering node with consul: ~p~n", [Reason]), - {error, Reason} + Error -> Error end. %% @private -%% @spec register() -> mixed +%% @spec unregister() -> mixed %% @doc Deregister the rabbitmq service for this node from Consul %% @end %% -deregister() -> +unregister() -> + info("Unregistering node with Consul"), case autocluster_consul_client:get([agent, service, deregister, autocluster_consul_config:service()], []) of {ok, _} -> ok; - {error, Reason} -> - error_logger:error_msg("Error fetching deregistering node from consul: ~p~n", [Reason]), - [] + Error -> Error end. -%% @spec send_check_pass() -> ok -%% @doc Let Consul know that the health check should be passing +%% @private +%% @spec cluster_nodes() -> list() +%% @doc Fetch the list of cluster nodes from Consul, returning them as a list of +%% atoms. %% @end %% -send_check_pass() -> - Service = list_to_atom("service:" ++ autocluster_consul_config:service()), - case autocluster_consul_client:get([agent, check, pass, Service], []) of - ok -> ok; - {error, Reason} -> - error_logger:error_msg("Error updating Consul health check: ~p~n", [Reason]), - ok +cluster_nodes() -> + {Path, Args} = case autocluster_consul_config:cluster_name() of + undefined -> {[catalog, service, autocluster_consul_config:service()], []}; + Name -> {[catalog, service, autocluster_consul_config:service()], [{tag, Name}]} + end, + case autocluster_consul_client:get(Path, Args) of + {ok, Nodes} -> extract_nodes(Nodes); + {error, Error} -> + warning("Error fetching nodes from consul: ~p~n", [Error]), + [] end. @@ -149,39 +179,15 @@ filter_self(Addresses) -> %% @private -%% @spec join_cluster(term()) -> ok -%% @doc Have the current node join a cluster using the specified discovery node -%% @end -%% -join_cluster([]) -> ok; -join_cluster(Nodes) -> - io:format(" Joining existing cluster.~n~n"), - application:stop(rabbit), - mnesia:stop(), - rabbit_mnesia:reset(), - rabbit_mnesia:join_cluster(lists:nth(1, Nodes), disc), - mnesia:start(), - rabbit:start(), - ok. - - -%% @private -%% @spec registration_body() -> list() -%% @doc Return the appropriate registration body. +%% @spec host_sname(binary()) -> list() +%% @doc Return the hostname/sname from the specified value %% @end %% -registration_body() -> - {Service, Name, Port, TTL} = {autocluster_consul_config:service(), - autocluster_consul_config:cluster_name(), - autocluster_consul_config:service_port(), - autocluster_consul_config:service_ttl()}, - Payload = build_registration_body(list_to_atom(Service), Name, Port, TTL), - case rabbit_misc:json_encode(Payload) of - {ok, Body} -> - lists:flatten(Body); - {error, Error} -> - error_logger:error_msg("Could not JSON serialize the request body: ~p (~p)~n", [Error, Payload]), - rabbit:stop_and_halt() +host_sname(Value) -> + Parts = string:tokens(binary_to_list(Value), "."), + case length(Parts) of + 1 -> binary_to_list(Value); + _ -> binary_to_list(lists:nth(1, Parts)) end. @@ -202,15 +208,22 @@ build_registration_body(Service, Name, Port, TTL) -> %% @private -%% @spec host_sname(binary()) -> list() -%% @doc Return the hostname/sname from the specified value +%% @spec registration_body() -> list() +%% @doc Return the appropriate registration body. %% @end %% -host_sname(Value) -> - Parts = string:tokens(binary_to_list(Value), "."), - case length(Parts) of - 1 -> Value; - _ -> lists:nth(1, Parts) +registration_body() -> + {Service, Name, Port, TTL} = {autocluster_consul_config:service(), + autocluster_consul_config:cluster_name(), + autocluster_consul_config:service_port(), + autocluster_consul_config:service_ttl()}, + Payload = build_registration_body(list_to_atom(Service), Name, Port, TTL), + case rabbit_misc:json_encode(Payload) of + {ok, Body} -> + lists:flatten(Body); + {error, Error} -> + err("Could not JSON serialize the request body: ~p (~p)~n", [Error, Payload]), + {error, Error} end. @@ -221,3 +234,23 @@ host_sname(Value) -> %% ttl_string(Value) -> list_to_atom(integer_to_list(Value) ++ "s"). + + +%% @private +%% @spec log(Module, Function, Message, Args) -> ok +%% @doc Ensure all logged lines share the same format +%% @end +%% +log(Module, Fun, Message, Args) -> + Module:Fun(string:join(["autocluster_consul: ", Message, "~n"], ""), Args). + +%% Logging methods + +debug(Message) -> log(rabbit_log, debug, Message, []). +debug(Message, Args) -> log(rabbit_log, debug, Message, Args). +info(Message) -> log(rabbit_log, info, Message, []). +info(Message, Args) -> log(rabbit_log, info, Message, Args). +err(Message) -> log(rabbit_log, error, Message, []). +err(Message, Args) -> log(rabbit_log, error, Message, Args). +warning(Message) -> log(rabbit_log, error, Message, []). +warning(Message, Args) -> log(rabbit_log, error, Message, Args).