Skip to content

Commit

Permalink
adding batching options for RPC client
Browse files Browse the repository at this point in the history
Signed-off-by: Chengxuan Xing <chengxuan.xing@kaleido.io>
  • Loading branch information
Chengxuan committed Aug 12, 2024
1 parent c144978 commit f92917d
Show file tree
Hide file tree
Showing 4 changed files with 756 additions and 73 deletions.
4 changes: 2 additions & 2 deletions internal/rpcserver/server.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright © 2023 Kaleido, Inc.
// Copyright © 2024 Kaleido, Inc.
//
// SPDX-License-Identifier: Apache-2.0
//
Expand Down Expand Up @@ -45,7 +45,7 @@ func NewServer(ctx context.Context, wallet ethsigner.Wallet) (ss Server, err err
return nil, err
}
s := &rpcServer{
backend: rpcbackend.NewRPCClient(httpClient),
backend: rpcbackend.NewRPCClient(ctx, httpClient),
apiServerDone: make(chan error),
wallet: wallet,
chainID: config.GetInt64(signerconfig.BackendChainID),
Expand Down
1 change: 1 addition & 0 deletions internal/signermsgs/en_error_messges.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,5 @@ var (
MsgInvalidEIP1559Transaction = ffe("FF22084", "Transaction payload invalid (EIP-1559): %v")
MsgInvalidEIP155TransactionV = ffe("FF22085", "Invalid V value from EIP-155 transaction (chainId=%d)")
MsgInvalidChainID = ffe("FF22086", "Invalid chainId expected=%d actual=%d")
MsgRPCRequestBatchFailed = ffe("FF22087", "Received response doesn't match the number of batched requests.")
)
309 changes: 244 additions & 65 deletions pkg/rpcbackend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,12 @@ type Backend interface {
}

// NewRPCClient Constructor
func NewRPCClient(client *resty.Client) Backend {
return NewRPCClientWithOption(client, RPCClientOptions{})
func NewRPCClient(ctx context.Context, client *resty.Client) Backend {
return NewRPCClientWithOption(ctx, client, RPCClientOptions{})
}

// NewRPCClientWithOption Constructor
func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Backend {
func NewRPCClientWithOption(ctx context.Context, client *resty.Client, options RPCClientOptions) Backend {
rpcClient := &RPCClient{
client: client,
}
Expand All @@ -64,17 +64,44 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back
rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest)
}

if options.BatchOptions != nil {
batchDelay := 50 * time.Millisecond
batchSize := 1
batchWorkerCount := 1
if options.BatchOptions.BatchDelay != nil {
batchDelay = *options.BatchOptions.BatchDelay
}

if options.BatchOptions.BatchSize > 1 {
batchSize = options.BatchOptions.BatchSize
}
if options.BatchOptions.BatchWorkerCount > 1 {
batchWorkerCount = options.BatchOptions.BatchWorkerCount
}
rpcClient.requestBatchWorkerSlots = make(chan bool, batchWorkerCount)
rpcClient.startBatcher(ctx, batchDelay, batchSize)
}

return rpcClient
}

type RPCClient struct {
client *resty.Client
concurrencySlots chan bool
requestCounter int64
client *resty.Client
concurrencySlots chan bool
requestCounter int64
requestBatchQueue chan *batchRequest
requestBatchWorkerSlots chan bool
}

type RPCClientBatchOptions struct {
BatchDelay *time.Duration
BatchSize int
BatchWorkerCount int
}

type RPCClientOptions struct {
MaxConcurrentRequest int64
BatchOptions *RPCClientBatchOptions
}

type RPCRequest struct {
Expand Down Expand Up @@ -141,11 +168,137 @@ func (rc *RPCClient) CallRPC(ctx context.Context, result interface{}, method str
return nil
}

// SyncRequest sends an individual RPC request to the backend (always over HTTP currently),
// and waits synchronously for the response, or an error.
//
// In all return paths *including error paths* the RPCResponse is populated
// so the caller has an RPC structure to send back to the front-end caller.
type batchRequest struct {
rpcReq *RPCRequest
rpcRes chan *RPCResponse
rpcErr chan error
}

func (rc *RPCClient) startBatcher(ctx context.Context, batchDelay time.Duration, batchSize int) {
requestQueue := make(chan *batchRequest)

go func() {
ticker := time.NewTicker(batchDelay)
defer ticker.Stop()

var batch []*batchRequest

for {
select {
case req := <-requestQueue:
batch = append(batch, req)
if len(batch) >= batchSize {
rc.sendBatch(ctx, batch)
batch = nil
}
case <-ticker.C:
if len(batch) > 0 {
rc.sendBatch(ctx, batch)
batch = nil
}
case <-ctx.Done():
return
}
}
}()

rc.requestBatchQueue = requestQueue
}

func (rc *RPCClient) sendBatch(ctx context.Context, batch []*batchRequest) {
select {
case rc.requestBatchWorkerSlots <- true:
// wait for the worker slot and continue
case <-ctx.Done():
for _, req := range batch {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed)
req.rpcErr <- err
}
return
}
go func() {
defer func() {
<-rc.requestBatchWorkerSlots
}()

batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano())
traceIDs := make([]string, len(batch))

var rpcReqs []*RPCRequest
for i, req := range batch {
// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
var beReq = *req.rpcReq
beReq.JSONRpc = "2.0"
rpcTraceID := rc.allocateRequestID(&beReq)
if req.rpcReq.ID != nil {
// We're proxying a request with front-end RPC ID - log that as well
rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID)
}
traceIDs[i] = rpcTraceID
rpcReqs = append(rpcReqs, &beReq)
}
log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs))

responses := make([]*RPCResponse, len(batch))
res, err := rc.client.R().
SetContext(ctx).
SetBody(rpcReqs).
SetResult(&responses).
SetError(&responses).
Post("")

if err != nil {
log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err)
for _, req := range batch {
req.rpcErr <- err
}
return
}

if len(responses) != len(batch) {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed)
for _, req := range batch {
req.rpcErr <- err
}
return
}

for i, resp := range responses {
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonOutput, _ := json.Marshal(resp)
log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput)
}

// JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes
if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) {
rpcMsg := ""
errLog := ""
if resp != nil {
rpcMsg = resp.Message()
errLog = rpcMsg
}
if rpcMsg == "" {
// Log the raw result in the case of JSON parse error etc. (note that Resty no longer
// returns this as an error - rather the body comes back raw)
errLog = string(res.Body())
rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error()
}
traceID := traceIDs[i]
log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog)
batch[i].rpcErr <- fmt.Errorf(rpcMsg)
} else {
if resp.Result == nil {
// We don't want a result for errors, but a null success response needs to go in there
resp.Result = fftypes.JSONAnyPtr(fftypes.NullString)
}
batch[i].rpcRes <- resp

}
}
}()
}

func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) {
if rc.concurrencySlots != nil {
select {
Expand All @@ -160,63 +313,89 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe
}()
}

// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
var beReq = *rpcReq
beReq.JSONRpc = "2.0"
rpcTraceID := rc.allocateRequestID(&beReq)
if rpcReq.ID != nil {
// We're proxying a request with front-end RPC ID - log that as well
rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID)
}
if rc.requestBatchQueue != nil {
req := &batchRequest{
rpcReq: rpcReq,
rpcRes: make(chan *RPCResponse, 1),
rpcErr: make(chan error, 1),
}

rpcRes = new(RPCResponse)
select {
case rc.requestBatchQueue <- req:
case <-ctx.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
}

log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method)
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonInput, _ := json.Marshal(rpcReq)
log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput)
}
rpcStartTime := time.Now()
res, err := rc.client.R().
SetContext(ctx).
SetBody(beReq).
SetResult(&rpcRes).
SetError(rpcRes).
Post("")

// Restore the original ID
rpcRes.ID = rpcReq.ID
if err != nil {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err)
log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err)
rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError)
return rpcRes, err
}
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonOutput, _ := json.Marshal(rpcRes)
log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput)
}
// JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes
if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 {
rpcMsg := rpcRes.Message()
errLog := rpcMsg
if rpcMsg == "" {
// Log the raw result in the case of JSON parse error etc. (note that Resty no longer
// returns this as an error - rather the body comes back raw)
errLog = string(res.Body())
rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error()
}
log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog)
err := fmt.Errorf(rpcMsg)
return rpcRes, err
}
log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond))
if rpcRes.Result == nil {
// We don't want a result for errors, but a null success response needs to go in there
rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString)
select {
case rpcRes := <-req.rpcRes:
return rpcRes, nil
case err := <-req.rpcErr:
return nil, err
case <-ctx.Done():
err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID)
return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err
}
} else {
// We always set the back-end request ID - as we need to support requests coming in from
// multiple concurrent clients on our front-end that might use clashing IDs.
var beReq = *rpcReq
beReq.JSONRpc = "2.0"
rpcTraceID := rc.allocateRequestID(&beReq)
if rpcReq.ID != nil {
// We're proxying a request with front-end RPC ID - log that as well
rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID)
}

rpcRes = new(RPCResponse)

log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method)
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonInput, _ := json.Marshal(rpcReq)
log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput)
}
rpcStartTime := time.Now()
res, err := rc.client.R().
SetContext(ctx).
SetBody(beReq).
SetResult(&rpcRes).
SetError(&rpcRes).
Post("")

// Restore the original ID
rpcRes.ID = rpcReq.ID
if err != nil {
err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err)
log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err)
rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError)
return rpcRes, err
}
if logrus.IsLevelEnabled(logrus.TraceLevel) {
jsonOutput, _ := json.Marshal(rpcRes)
log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput)
}
// JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes
if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 {
rpcMsg := rpcRes.Message()
errLog := rpcMsg
if rpcMsg == "" {
// Log the raw result in the case of JSON parse error etc. (note that Resty no longer
// returns this as an error - rather the body comes back raw)
errLog = string(res.Body())
rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error()
}
log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog)
err := fmt.Errorf(rpcMsg)
return rpcRes, err
}
log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond))
if rpcRes.Result == nil {
// We don't want a result for errors, but a null success response needs to go in there
rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString)
}
return rpcRes, nil
}
return rpcRes, nil

}

func RPCErrorResponse(err error, id *fftypes.JSONAny, code RPCCode) *RPCResponse {
Expand Down
Loading

0 comments on commit f92917d

Please sign in to comment.