Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(clustering/rpc): support rpc batching on dp side #14040

Open
wants to merge 18 commits into
base: master
Choose a base branch
from
191 changes: 143 additions & 48 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,12 @@ local utils = require("kong.clustering.rpc.utils")
local queue = require("kong.clustering.rpc.queue")
local jsonrpc = require("kong.clustering.rpc.json_rpc_v2")
local constants = require("kong.constants")
local isarray = require("table.isarray")
local isempty = require("table.isempty")
local tb_insert = table.insert


local type = type
local assert = assert
local unpack = unpack
local string_format = string.format
Expand Down Expand Up @@ -59,7 +63,28 @@ function _M:_get_next_id()
end


function _M._dispatch(premature, self, cb, payload)
function _M:push_request(msg)
return self.outgoing:push(msg)
end


function _M:push_response(msg, err_prefix, collection)
-- may be a batch
if collection then
tb_insert(collection, msg)
return true
end

local res, err = self.outgoing:push(msg)
if not res then
return nil, err_prefix .. err
end

return true
end


function _M._dispatch(premature, self, cb, payload, collection)
if premature then
return
end
Expand All @@ -73,10 +98,11 @@ function _M._dispatch(premature, self, cb, payload)
return
end

res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR,
err))
res, err = self:push_response(new_error(payload.id, jsonrpc.SERVER_ERROR, err),
"[rpc] unable to push RPC call error: ",
collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err)
ngx_log(ngx_WARN, err)
end

return
Expand All @@ -89,14 +115,85 @@ function _M._dispatch(premature, self, cb, payload)
end

-- success
res, err = self.outgoing:push({
res, err = self:push_response({
jsonrpc = jsonrpc.VERSION,
id = payload.id,
result = res,
})
}, "[rpc] unable to push RPC call result: ", collection)
if not res then
ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err)
ngx_log(ngx_WARN, err)
end
end


function _M:process_rpc_msg(payload, collection)
if type(payload) ~= "table" then
local res, err = self:push_response(
new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"),
collection)
if not res then
return nil, err
end

return true
end

assert(payload.jsonrpc == jsonrpc.VERSION)

local payload_id = payload.id
local payload_method = payload.method

if payload_method then
-- invoke

ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload_method, " (id: ", payload_id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload_method]
if not dispatch_cb and payload_id then
local res, err = self:push_response(new_error(payload_id, jsonrpc.METHOD_NOT_FOUND),
"unable to send \"METHOD_NOT_FOUND\" error back to client: ",
collection)
if not res then
return nil, err
end

return true
end

-- call dispatch
local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
self.node_id, payload_id or 0, payload_method),
0, _M._dispatch, self, dispatch_cb, payload, collection)
if not res and payload_id then
local reso, erro = self:push_response(new_error(payload_id, jsonrpc.INTERNAL_ERROR),
"unable to send \"INTERNAL_ERROR\" error back to client: ",
collection)
if not reso then
return nil, erro
end

return nil, "unable to dispatch JSON-RPC callback: " .. err
end

else
-- response
local interest_cb = self.interest[payload_id]
self.interest[payload_id] = nil -- edge trigger only once

if not interest_cb then
ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload_id, ", dropping it")

return true
end

local res, err = interest_cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ",
payload_id, ", err: ", err)
end
end -- if payload.method

return true
end


Expand All @@ -120,9 +217,9 @@ function _M:start()
end

if waited > CLUSTERING_PING_INTERVAL then
local res, err = self.outgoing:push(PING_TYPE)
local res, err = self:push_response(PING_TYPE, "unable to send ping: ")
if not res then
return nil, "unable to send ping: " .. err
return nil, err
end
end

Expand All @@ -133,9 +230,9 @@ function _M:start()
last_seen = ngx_time()

if typ == "ping" then
local res, err = self.outgoing:push(PONG_TYPE)
local res, err = self:push_response(PONG_TYPE, "unable to handle ping: ")
if not res then
return nil, "unable to handle ping: " .. err
return nil, err
end

goto continue
Expand All @@ -154,54 +251,52 @@ function _M:start()
assert(typ == "binary")

local payload = decompress_payload(data)
assert(payload.jsonrpc == jsonrpc.VERSION)

if payload.method then
-- invoke

ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb and payload.id then
local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
if not res then
return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err
end

goto continue
-- single rpc call
if not isarray(payload) then
local ok, err = self:process_rpc_msg(payload)
if not ok then
return nil, err
end

-- call dispatch
local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s",
self.node_id, payload.id or 0, payload.method),
0, _M._dispatch, self, dispatch_cb, payload)
if not res and payload.id then
local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR))
if not reso then
return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro
end
goto continue
end

return nil, "unable to dispatch JSON-RPC callback: " .. err
-- rpc call with an empty Array
if isempty(payload) then
local res, err = self:push_response(
new_error(nil, jsonrpc.INVALID_REQUEST, "Invalid Request"))
if not res then
return nil, err
end

else
-- response
local interest_cb = self.interest[payload.id]
self.interest[payload.id] = nil -- edge trigger only once
goto continue
end

if not interest_cb then
ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload.id, ", dropping it")
-- batch rpc call

goto continue
end
local collection = {}

local res, err = interest_cb(payload)
if not res then
ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ",
payload.id, ", err: ", err)
for _, v in ipairs(payload) do
local ok, err = self:process_rpc_msg(v, collection)
if not ok then
return nil, err
end
end

-- may be all notifications
if isempty(collection) then
goto continue
end

assert(isarray(collection))

local res, err = self:push_response(collection,
"[rpc] unable to push RPC call result: ")
if not res then
return nil, err
end

::continue::
end
end)
Expand Down Expand Up @@ -290,7 +385,7 @@ function _M:call(node_id, method, params, callback)
self.interest[id] = callback
end

return self.outgoing:push({
return self:push_request({
jsonrpc = jsonrpc.VERSION,
method = method,
params = params,
Expand Down
Loading