diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 2044acf170a3..298bac37c5d3 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -11,8 +11,12 @@ local utils = require("kong.clustering.rpc.utils") local queue = require("kong.clustering.rpc.queue") local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") local constants = require("kong.constants") +local isarray = require("table.isarray") +local isempty = require("table.isempty") +local tb_insert = table.insert +local type = type local assert = assert local unpack = unpack local string_format = string.format @@ -59,7 +63,30 @@ function _M:_get_next_id() end -function _M._dispatch(premature, self, cb, payload) +function _M:push_request(msg) + return self.outgoing:push(msg) +end + + +-- collection is only for rpc batch call. +-- if collection is nil, it means the rpc is a single call. +function _M:push_response(msg, err_prefix, collection) + -- may be a batch + if collection then + tb_insert(collection, msg) + return true + end + + local res, err = self.outgoing:push(msg) + if not res then + return nil, err_prefix .. err + end + + return true +end + + +function _M._dispatch(premature, self, cb, payload, collection) if premature then return end @@ -73,10 +100,11 @@ function _M._dispatch(premature, self, cb, payload) return end - res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR, - err)) + res, err = self:push_response(new_error(payload.id, jsonrpc.SERVER_ERROR, err), + "[rpc] unable to push RPC call error: ", + collection) if not res then - ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err) + ngx_log(ngx_WARN, err) end return @@ -89,14 +117,85 @@ function _M._dispatch(premature, self, cb, payload) end -- success - res, err = self.outgoing:push({ + res, err = self:push_response({ jsonrpc = jsonrpc.VERSION, id = payload.id, result = res, - }) + }, "[rpc] unable to push RPC call result: ", collection) if not res then - ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err) + ngx_log(ngx_WARN, err) + end +end + + +function _M:process_rpc_msg(payload, collection) + if type(payload) ~= "table" then + local res, err = self:push_response( + new_error(nil, jsonrpc.INVALID_REQUEST, "not an valid object"), + collection) + if not res then + return nil, err + end + + return true end + + assert(payload.jsonrpc == jsonrpc.VERSION) + + local payload_id = payload.id + local payload_method = payload.method + + if payload_method then + -- invoke + + ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload_method, " (id: ", payload_id, ")") + + local dispatch_cb = self.manager.callbacks.callbacks[payload_method] + if not dispatch_cb and payload_id then + local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND), + "unable to send \"METHOD_NOT_FOUND\" error back to client: ", + collection) + if not res then + return nil, err + end + + return true + end + + -- call dispatch + local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", + self.node_id, payload_id or 0, payload_method), + 0, _M._dispatch, self, dispatch_cb, payload, collection) + if not res and payload_id then + local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR), + "unable to send \"INTERNAL_ERROR\" error back to client: ", + collection) + if not reso then + return nil, erro + end + + return nil, "unable to dispatch JSON-RPC callback: " .. err + end + + else + -- response + local interest_cb = self.interest[payload_id] + self.interest[payload_id] = nil -- edge trigger only once + + if not interest_cb then + ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload_id, ", dropping it") + + return true + end + + local res, err = interest_cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ", + payload_id, ", err: ", err) + end + end -- if payload.method + + return true end @@ -120,9 +219,9 @@ function _M:start() end if waited > CLUSTERING_PING_INTERVAL then - local res, err = self.outgoing:push(PING_TYPE) + local res, err = self:push_response(PING_TYPE, "unable to send ping: ") if not res then - return nil, "unable to send ping: " .. err + return nil, err end end @@ -133,9 +232,9 @@ function _M:start() last_seen = ngx_time() if typ == "ping" then - local res, err = self.outgoing:push(PONG_TYPE) + local res, err = self:push_response(PONG_TYPE, "unable to handle ping: ") if not res then - return nil, "unable to handle ping: " .. err + return nil, err end goto continue @@ -154,54 +253,52 @@ function _M:start() assert(typ == "binary") local payload = decompress_payload(data) - assert(payload.jsonrpc == jsonrpc.VERSION) - - if payload.method then - -- invoke - - ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") - local dispatch_cb = self.manager.callbacks.callbacks[payload.method] - if not dispatch_cb and payload.id then - local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) - if not res then - return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err - end - - goto continue + -- single rpc call + if not isarray(payload) then + local ok, err = self:process_rpc_msg(payload) + if not ok then + return nil, err end - -- call dispatch - local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", - self.node_id, payload.id or 0, payload.method), - 0, _M._dispatch, self, dispatch_cb, payload) - if not res and payload.id then - local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR)) - if not reso then - return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro - end + goto continue + end - return nil, "unable to dispatch JSON-RPC callback: " .. err + -- rpc call with an empty Array + if isempty(payload) then + local res, err = self:push_response( + new_error(nil, jsonrpc.INVALID_REQUEST, "empty batch array")) + if not res then + return nil, err end - else - -- response - local interest_cb = self.interest[payload.id] - self.interest[payload.id] = nil -- edge trigger only once + goto continue + end - if not interest_cb then - ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload.id, ", dropping it") + -- batch rpc call - goto continue - end + local collection = {} - local res, err = interest_cb(payload) - if not res then - ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ", - payload.id, ", err: ", err) + for _, v in ipairs(payload) do + local ok, err = self:process_rpc_msg(v, collection) + if not ok then + return nil, err end end + -- may be all notifications + if isempty(collection) then + goto continue + end + + assert(isarray(collection)) + + local res, err = self:push_response(collection, + "[rpc] unable to push RPC call result: ") + if not res then + return nil, err + end + ::continue:: end end) @@ -290,7 +387,7 @@ function _M:call(node_id, method, params, callback) self.interest[id] = callback end - return self.outgoing:push({ + return self:push_request({ jsonrpc = jsonrpc.VERSION, method = method, params = params,