diff --git a/internal/rpcserver/server.go b/internal/rpcserver/server.go index 31a6cc6f..48f5a05e 100644 --- a/internal/rpcserver/server.go +++ b/internal/rpcserver/server.go @@ -1,4 +1,4 @@ -// Copyright © 2023 Kaleido, Inc. +// Copyright © 2024 Kaleido, Inc. // // SPDX-License-Identifier: Apache-2.0 // @@ -45,7 +45,7 @@ func NewServer(ctx context.Context, wallet ethsigner.Wallet) (ss Server, err err return nil, err } s := &rpcServer{ - backend: rpcbackend.NewRPCClient(httpClient), + backend: rpcbackend.NewRPCClient(ctx, httpClient), apiServerDone: make(chan error), wallet: wallet, chainID: config.GetInt64(signerconfig.BackendChainID), diff --git a/internal/signermsgs/en_error_messges.go b/internal/signermsgs/en_error_messges.go index 4528d4e4..a9b28468 100644 --- a/internal/signermsgs/en_error_messges.go +++ b/internal/signermsgs/en_error_messges.go @@ -103,4 +103,5 @@ var ( MsgInvalidEIP1559Transaction = ffe("FF22084", "Transaction payload invalid (EIP-1559): %v") MsgInvalidEIP155TransactionV = ffe("FF22085", "Invalid V value from EIP-155 transaction (chainId=%d)") MsgInvalidChainID = ffe("FF22086", "Invalid chainId expected=%d actual=%d") + MsgRPCRequestBatchFailed = ffe("FF22087", "Received response doesn't match the number of batched requests.") ) diff --git a/pkg/rpcbackend/backend.go b/pkg/rpcbackend/backend.go index 36fd3a5d..ed4c70c9 100644 --- a/pkg/rpcbackend/backend.go +++ b/pkg/rpcbackend/backend.go @@ -50,12 +50,12 @@ type Backend interface { } // NewRPCClient Constructor -func NewRPCClient(client *resty.Client) Backend { - return NewRPCClientWithOption(client, RPCClientOptions{}) +func NewRPCClient(ctx context.Context, client *resty.Client) Backend { + return NewRPCClientWithOption(ctx, client, RPCClientOptions{}) } // NewRPCClientWithOption Constructor -func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Backend { +func NewRPCClientWithOption(ctx context.Context, client *resty.Client, options RPCClientOptions) Backend { rpcClient := &RPCClient{ client: client, } @@ -64,17 +64,44 @@ func NewRPCClientWithOption(client *resty.Client, options RPCClientOptions) Back rpcClient.concurrencySlots = make(chan bool, options.MaxConcurrentRequest) } + if options.BatchOptions != nil { + batchDelay := 50 * time.Millisecond + batchSize := 1 + batchWorkerCount := 1 + if options.BatchOptions.BatchDelay != nil { + batchDelay = *options.BatchOptions.BatchDelay + } + + if options.BatchOptions.BatchSize > 1 { + batchSize = options.BatchOptions.BatchSize + } + if options.BatchOptions.BatchWorkerCount > 1 { + batchWorkerCount = options.BatchOptions.BatchWorkerCount + } + rpcClient.requestBatchWorkerSlots = make(chan bool, batchWorkerCount) + rpcClient.startBatcher(ctx, batchDelay, batchSize) + } + return rpcClient } type RPCClient struct { - client *resty.Client - concurrencySlots chan bool - requestCounter int64 + client *resty.Client + concurrencySlots chan bool + requestCounter int64 + requestBatchQueue chan *batchRequest + requestBatchWorkerSlots chan bool +} + +type RPCClientBatchOptions struct { + BatchDelay *time.Duration + BatchSize int + BatchWorkerCount int } type RPCClientOptions struct { MaxConcurrentRequest int64 + BatchOptions *RPCClientBatchOptions } type RPCRequest struct { @@ -141,11 +168,137 @@ func (rc *RPCClient) CallRPC(ctx context.Context, result interface{}, method str return nil } -// SyncRequest sends an individual RPC request to the backend (always over HTTP currently), -// and waits synchronously for the response, or an error. 
-// -// In all return paths *including error paths* the RPCResponse is populated -// so the caller has an RPC structure to send back to the front-end caller. +type batchRequest struct { + rpcReq *RPCRequest + rpcRes chan *RPCResponse + rpcErr chan error +} + +func (rc *RPCClient) startBatcher(ctx context.Context, batchDelay time.Duration, batchSize int) { + requestQueue := make(chan *batchRequest) + + go func() { + ticker := time.NewTicker(batchDelay) + defer ticker.Stop() + + var batch []*batchRequest + + for { + select { + case req := <-requestQueue: + batch = append(batch, req) + if len(batch) >= batchSize { + rc.sendBatch(ctx, batch) + batch = nil + } + case <-ticker.C: + if len(batch) > 0 { + rc.sendBatch(ctx, batch) + batch = nil + } + case <-ctx.Done(): + return + } + } + }() + + rc.requestBatchQueue = requestQueue +} + +func (rc *RPCClient) sendBatch(ctx context.Context, batch []*batchRequest) { + select { + case rc.requestBatchWorkerSlots <- true: + // wait for the worker slot and continue + case <-ctx.Done(): + for _, req := range batch { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + req.rpcErr <- err + } + return + } + go func() { + defer func() { + <-rc.requestBatchWorkerSlots + }() + + batchRPCTraceID := fmt.Sprintf("batch-%d", time.Now().UnixNano()) + traceIDs := make([]string, len(batch)) + + var rpcReqs []*RPCRequest + for i, req := range batch { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *req.rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if req.rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s/%s", req.rpcReq.ID, batchRPCTraceID, rpcTraceID) + } + traceIDs[i] = rpcTraceID + rpcReqs = append(rpcReqs, &beReq) + } + log.L(ctx).Debugf("RPC[%s] --> BATCH %d requests", batchRPCTraceID, len(rpcReqs)) + + responses := make([]*RPCResponse, len(batch)) + res, err := rc.client.R(). + SetContext(ctx). + SetBody(rpcReqs). + SetResult(&responses). + SetError(&responses). + Post("") + + if err != nil { + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", batchRPCTraceID, err) + for _, req := range batch { + req.rpcErr <- err + } + return + } + + if len(responses) != len(batch) { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestBatchFailed) + for _, req := range batch { + req.rpcErr <- err + } + return + } + + for i, resp := range responses { + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(resp) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", batchRPCTraceID, jsonOutput) + } + + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || (resp != nil && resp.Error != nil && resp.Error.Code != 0) { + rpcMsg := "" + errLog := "" + if resp != nil { + rpcMsg = resp.Message() + errLog = rpcMsg + } + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. 
(note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + traceID := traceIDs[i] + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", traceID, res.StatusCode(), errLog) + batch[i].rpcErr <- fmt.Errorf(rpcMsg) + } else { + if resp.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + resp.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + batch[i].rpcRes <- resp + + } + } + }() +} + func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRes *RPCResponse, err error) { if rc.concurrencySlots != nil { select { @@ -160,63 +313,89 @@ func (rc *RPCClient) SyncRequest(ctx context.Context, rpcReq *RPCRequest) (rpcRe }() } - // We always set the back-end request ID - as we need to support requests coming in from - // multiple concurrent clients on our front-end that might use clashing IDs. - var beReq = *rpcReq - beReq.JSONRpc = "2.0" - rpcTraceID := rc.allocateRequestID(&beReq) - if rpcReq.ID != nil { - // We're proxying a request with front-end RPC ID - log that as well - rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID) - } + if rc.requestBatchQueue != nil { + req := &batchRequest{ + rpcReq: rpcReq, + rpcRes: make(chan *RPCResponse, 1), + rpcErr: make(chan error, 1), + } - rpcRes = new(RPCResponse) + select { + case rc.requestBatchQueue <- req: + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } - log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method) - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonInput, _ := json.Marshal(rpcReq) - log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput) - } - rpcStartTime := time.Now() - res, err := rc.client.R(). - SetContext(ctx). - SetBody(beReq). - SetResult(&rpcRes). - SetError(rpcRes). - Post("") - - // Restore the original ID - rpcRes.ID = rpcReq.ID - if err != nil { - err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err) - log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err) - rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError) - return rpcRes, err - } - if logrus.IsLevelEnabled(logrus.TraceLevel) { - jsonOutput, _ := json.Marshal(rpcRes) - log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput) - } - // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes - if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 { - rpcMsg := rpcRes.Message() - errLog := rpcMsg - if rpcMsg == "" { - // Log the raw result in the case of JSON parse error etc. 
(note that Resty no longer - // returns this as an error - rather the body comes back raw) - errLog = string(res.Body()) - rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() - } - log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog) - err := fmt.Errorf(rpcMsg) - return rpcRes, err - } - log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond)) - if rpcRes.Result == nil { - // We don't want a result for errors, but a null success response needs to go in there - rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString) + select { + case rpcRes := <-req.rpcRes: + return rpcRes, nil + case err := <-req.rpcErr: + return nil, err + case <-ctx.Done(): + err := i18n.NewError(ctx, signermsgs.MsgRequestCanceledContext, rpcReq.ID) + return RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError), err + } + } else { + // We always set the back-end request ID - as we need to support requests coming in from + // multiple concurrent clients on our front-end that might use clashing IDs. + var beReq = *rpcReq + beReq.JSONRpc = "2.0" + rpcTraceID := rc.allocateRequestID(&beReq) + if rpcReq.ID != nil { + // We're proxying a request with front-end RPC ID - log that as well + rpcTraceID = fmt.Sprintf("%s->%s", rpcReq.ID, rpcTraceID) + } + + rpcRes = new(RPCResponse) + + log.L(ctx).Debugf("RPC[%s] --> %s", rpcTraceID, rpcReq.Method) + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonInput, _ := json.Marshal(rpcReq) + log.L(ctx).Tracef("RPC[%s] INPUT: %s", rpcTraceID, jsonInput) + } + rpcStartTime := time.Now() + res, err := rc.client.R(). + SetContext(ctx). + SetBody(beReq). + SetResult(&rpcRes). + SetError(&rpcRes). + Post("") + + // Restore the original ID + rpcRes.ID = rpcReq.ID + if err != nil { + err := i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, err) + log.L(ctx).Errorf("RPC[%s] <-- ERROR: %s", rpcTraceID, err) + rpcRes = RPCErrorResponse(err, rpcReq.ID, RPCCodeInternalError) + return rpcRes, err + } + if logrus.IsLevelEnabled(logrus.TraceLevel) { + jsonOutput, _ := json.Marshal(rpcRes) + log.L(ctx).Tracef("RPC[%s] OUTPUT: %s", rpcTraceID, jsonOutput) + } + // JSON/RPC allows errors to be returned with a 200 status code, as well as other status codes + if res.IsError() || rpcRes.Error != nil && rpcRes.Error.Code != 0 { + rpcMsg := rpcRes.Message() + errLog := rpcMsg + if rpcMsg == "" { + // Log the raw result in the case of JSON parse error etc. 
(note that Resty no longer + // returns this as an error - rather the body comes back raw) + errLog = string(res.Body()) + rpcMsg = i18n.NewError(ctx, signermsgs.MsgRPCRequestFailed, res.Status()).Error() + } + log.L(ctx).Errorf("RPC[%s] <-- [%d]: %s", rpcTraceID, res.StatusCode(), errLog) + err := fmt.Errorf(rpcMsg) + return rpcRes, err + } + log.L(ctx).Infof("RPC[%s] <-- %s [%d] OK (%.2fms)", rpcTraceID, rpcReq.Method, res.StatusCode(), float64(time.Since(rpcStartTime))/float64(time.Millisecond)) + if rpcRes.Result == nil { + // We don't want a result for errors, but a null success response needs to go in there + rpcRes.Result = fftypes.JSONAnyPtr(fftypes.NullString) + } + return rpcRes, nil } - return rpcRes, nil + } func RPCErrorResponse(err error, id *fftypes.JSONAny, code RPCCode) *RPCResponse { diff --git a/pkg/rpcbackend/backend_test.go b/pkg/rpcbackend/backend_test.go index d8b49736..0ff66294 100644 --- a/pkg/rpcbackend/backend_test.go +++ b/pkg/rpcbackend/backend_test.go @@ -24,6 +24,7 @@ import ( "net/http/httptest" "strconv" "testing" + "time" "github.com/hyperledger/firefly-common/pkg/ffresty" "github.com/hyperledger/firefly-common/pkg/fftypes" @@ -33,9 +34,10 @@ import ( "github.com/stretchr/testify/assert" ) -type testRPCHander func(rpcReq *RPCRequest) (int, *RPCResponse) +type testRPCHandler func(rpcReq *RPCRequest) (int, *RPCResponse) +type testBatchRPCHandler func(rpcReq []*RPCRequest) (int, []*RPCResponse) -func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { +func newTestServer(t *testing.T, rpcHandler testRPCHandler, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { ctx, cancelCtx := context.WithCancel(context.Background()) server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -64,7 +66,56 @@ func newTestServer(t *testing.T, rpcHandler testRPCHander, options ...RPCClientO c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + var rb *RPCClient + + if len(options) == 1 { + rb = NewRPCClientWithOption(ctx, c, options[0]).(*RPCClient) + } else { + rb = NewRPCClient(ctx, c).(*RPCClient) + } + + return ctx, rb, func() { + cancelCtx() + server.Close() + } +} + +func newBatchTestServer(t *testing.T, rpcHandler testBatchRPCHandler, options ...RPCClientOptions) (context.Context, *RPCClient, func()) { + + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + status, rpcRes := rpcHandler(rpcReqs) + b := []byte(`[]`) + if rpcRes != nil { + b, err = json.Marshal(rpcRes) + assert.NoError(t, err) + } + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(b))) + w.WriteHeader(status) + w.Write(b) + + })) + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, fmt.Sprintf("http://%s", server.Listener.Addr())) + + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + var rb *RPCClient + + if len(options) == 1 { + rb = NewRPCClientWithOption(ctx, c, options[0]).(*RPCClient) + } else { + rb = NewRPCClientWithOption(ctx, c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{}}).(*RPCClient) + } return ctx, rb, func() { cancelCtx() @@ -218,7 +269,7 @@ func 
TestSyncRPCCallBadJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + rb := NewRPCClient(context.Background(), c).(*RPCClient) var txCount ethtypes.HexInteger rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -239,7 +290,7 @@ func TestSyncRPCCallFailParseJSONResponse(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClient(c).(*RPCClient) + rb := NewRPCClient(context.Background(), c).(*RPCClient) var mapResult map[string]interface{} rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") @@ -290,7 +341,7 @@ func TestSyncRequestConcurrency(t *testing.T) { prefix.Set(ffresty.HTTPConfigURL, server.URL) c, err := ffresty.New(ctx, signerconfig.BackendConfig) assert.NoError(t, err) - rb := NewRPCClientWithOption(c, RPCClientOptions{ + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ MaxConcurrentRequest: 1, }).(*RPCClient) @@ -308,5 +359,457 @@ func TestSyncRequestConcurrency(t *testing.T) { close(blocked) <-bgDone +} + +func TestBatchSyncRPCCallNullResponse(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { + rpcReq := rpcReqs[0] + assert.Equal(t, "2.0", rpcReq.JSONRpc) + assert.Equal(t, "eth_getTransactionReceipt", rpcReq.Method) + assert.Equal(t, `"000012346"`, rpcReq.ID.String()) + assert.Equal(t, `"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`, rpcReq.Params[0].String()) + return 200, []*RPCResponse{{ + JSONRpc: "2.0", + ID: rpcReq.ID, + Result: nil}} + }) + rb.requestCounter = 12345 + defer done() + + rpcRes, err := rb.SyncRequest(ctx, &RPCRequest{ + ID: fftypes.JSONAnyPtr("1"), + Method: "eth_getTransactionReceipt", + Params: []*fftypes.JSONAny{ + fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`), + }, + }) + assert.NoError(t, err) + assert.Equal(t, `null`, rpcRes.Result.String()) +} + +func TestBatchSyncRequestCanceledContext(t *testing.T) { + + blocked := make(chan struct{}) + ctx, cancelCtx := context.WithCancel(context.Background()) + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + <-blocked + cancelCtx() + w.Header().Add("Content-Type", "application/json") + w.WriteHeader(500) + w.Write([]byte(`[{}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + BatchOptions: &RPCClientBatchOptions{ + BatchSize: 1, + BatchWorkerCount: 1, + }, + }).(*RPCClient) + + checkDone := make(chan bool) + go func() { + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context + close(checkDone) + }() + close(blocked) + <-checkDone + + // this checks request hit cancel context + _, err = rb.SyncRequest(ctx, &RPCRequest{}) + assert.Regexp(t, "FF22063", err) + +} +func TestBatchSyncRequestCanceledContextWhenQueueingABatch(t *testing.T) 
{
+
+	blocked := make(chan struct{})
+	ctx, cancelCtx := context.WithCancel(context.Background())
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		<-blocked
+		w.Header().Add("Content-Type", "application/json")
+		w.WriteHeader(500)
+		w.Write([]byte(`[{}]`))
+	}))
+	defer server.Close()
+
+	signerconfig.Reset()
+	prefix := signerconfig.BackendConfig
+	prefix.Set(ffresty.HTTPConfigURL, server.URL)
+	c, err := ffresty.New(ctx, signerconfig.BackendConfig)
+	assert.NoError(t, err)
+	rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{
+		BatchOptions: &RPCClientBatchOptions{
+			BatchSize:        1,
+			BatchWorkerCount: 1,
+		},
+	}).(*RPCClient)
+
+	rb.requestBatchWorkerSlots <- true // fill the worker slot so all batches will be queued
+
+	checkDone := make(chan bool)
+	go func() {
+		_, err = rb.SyncRequest(ctx, &RPCRequest{})
+		assert.Regexp(t, "FF22063", err) // this checks the response hit cancel context
+		close(checkDone)
+	}()
+	time.Sleep(50 * time.Millisecond) // wait for the request to be queued by the other goroutine
+	cancelCtx()
+	<-checkDone
+
+}
+func TestBatchSyncRPCCallErrorResponse(t *testing.T) {
+
+	ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) {
+		assert.Equal(t, 1, len(rpcReqs))
+		return 500, []*RPCResponse{{
+			JSONRpc: "2.0",
+			ID:      rpcReqs[0].ID,
+			Error: &RPCError{
+				Message: "pop",
+			},
+		}}
+	}, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }})
+	rb.requestCounter = 12345
+	defer done()
+
+	var txCount ethtypes.HexInteger
+	err := rb.CallRPC(ctx, &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending")
+	assert.Regexp(t, "pop", err)
+}
+
+func TestBatchSyncRPCCallErrorCountMismatch(t *testing.T) {
+
+	ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) {
+		assert.Equal(t, 1, len(rpcReqs))
+		return 500, []*RPCResponse{}
+	}, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }})
+	rb.requestCounter = 12345
+	defer done()
+
+	var txCount ethtypes.HexInteger
+	err := rb.CallRPC(ctx, &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending")
+	assert.Regexp(t, "FF22087", err)
+}
+
+func TestBatchSyncRPCCallBadJSONResponse(t *testing.T) {
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Add("Content-Type", "application/json")
+		w.WriteHeader(500)
+		w.Write([]byte(`{!!!!`))
+	}))
+	defer server.Close()
+
+	signerconfig.Reset()
+	prefix := signerconfig.BackendConfig
+	prefix.Set(ffresty.HTTPConfigURL, server.URL)
+	c, err := ffresty.New(context.Background(), signerconfig.BackendConfig)
+	assert.NoError(t, err)
+	rb := NewRPCClientWithOption(context.Background(), c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}).(*RPCClient)
+
+	var txCount ethtypes.HexInteger
+	rpcErr := rb.CallRPC(context.Background(), &txCount, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending")
+	assert.Regexp(t, "FF22012", rpcErr.Error())
+}
+
+func TestBatchSyncRPCCallFailParseJSONResponse(t *testing.T) {
+
+	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		w.Header().Add("Content-Type", "application/json")
+		w.WriteHeader(200)
+		
w.Write([]byte(`[{"result":"not an object"}]`)) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(context.Background(), signerconfig.BackendConfig) + assert.NoError(t, err) + rb := NewRPCClientWithOption(context.Background(), c, RPCClientOptions{BatchOptions: &RPCClientBatchOptions{ /*use default configuration for batching*/ }}).(*RPCClient) + + var mapResult map[string]interface{} + rpcErr := rb.CallRPC(context.Background(), &mapResult, "eth_getTransactionCount", ethtypes.MustNewAddress("0xfb075bb99f2aa4c49955bf703509a227d7a12248"), "pending") + assert.Regexp(t, "FF22065", rpcErr.Error()) +} + +func TestBatchSyncRPCCallErrorBadInput(t *testing.T) { + + ctx, rb, done := newBatchTestServer(t, func(rpcReqs []*RPCRequest) (status int, rpcRes []*RPCResponse) { return 500, nil }) + defer done() + + var txCount ethtypes.HexInteger + err := rb.CallRPC(ctx, &txCount, "test-bad-params", map[bool]bool{false: true}) + assert.Regexp(t, "FF22011", err) +} + +func TestBatchRequestsOKWithBatchSize(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + tH := 2 * time.Hour + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + }, + }).(*RPCClient) + + round := 400 + + reqNumbers := 2*round - 1 + + requestCount := make(chan bool, reqNumbers) + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers-1; i++ { + <-requestCount + } + + _, err = rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + + <-requestCount +} + +func TestBatchRequestsTestWorkerCounts(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + 
"result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + time.Sleep(200 * time.Millisecond) // set 200s delay for each quest + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + + // only a single worker + tH := 2 * time.Hour + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + }, + }).(*RPCClient) + + round := 5 // doing first round, each round will have at least 200ms delay, so the whole flow will guaranteed to be more than 1s + + reqNumbers := 2 * round + + requestCount := make(chan bool, reqNumbers) + + requestStart := time.Now() + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + ID: fftypes.JSONAnyPtr("testId"), + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers; i++ { + <-requestCount + } + + assert.Greater(t, time.Since(requestStart), 1*time.Second) + + // number of worker equal to the number of rounds + // so the delay should be slightly greater than per request delay (200ms), but hopefully less than 300ms (with 100ms overhead) + rb = NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &tH, // very long delay, so need to rely on batch size to be hit for sending a batch + BatchSize: 2, + BatchWorkerCount: round, + }, + }).(*RPCClient) + + requestStart = time.Now() + + for i := 0; i < reqNumbers; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqNumbers; i++ { + <-requestCount + } + + assert.Greater(t, time.Since(requestStart), 200*time.Millisecond) + assert.Less(t, time.Since(requestStart), 300*time.Millisecond) + +} + +func TestBatchRequestsOKWithBatchDelay(t *testing.T) { + // Define the expected server response to the batch + rpcServerResponseBatchBytes := []byte(`[ + { + "jsonrpc": "2.0", + "id": 1, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x24" + } + }, + { + "jsonrpc": "2.0", + "id": 2, + "result": { + "hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", + "nonce": "0x10" + } + } + ]`) + + ctx := context.Background() + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var rpcReqs []*RPCRequest + err := 
json.NewDecoder(r.Body).Decode(&rpcReqs) + assert.NoError(t, err) + + w.Header().Add("Content-Type", "application/json") + w.Header().Add("Content-Length", strconv.Itoa(len(rpcServerResponseBatchBytes))) + w.WriteHeader(200) + w.Write(rpcServerResponseBatchBytes) + })) + defer server.Close() + + signerconfig.Reset() + prefix := signerconfig.BackendConfig + prefix.Set(ffresty.HTTPConfigURL, server.URL) + c, err := ffresty.New(ctx, signerconfig.BackendConfig) + assert.NoError(t, err) + hundredMs := 100 * time.Millisecond + rb := NewRPCClientWithOption(ctx, c, RPCClientOptions{ + MaxConcurrentRequest: 10, + BatchOptions: &RPCClientBatchOptions{ + BatchDelay: &hundredMs, + BatchSize: 2000, // very big batch size, so need to rely on batch delay to be hit for sending a batch + }, + }).(*RPCClient) + + round := 5 + + reqPerRound := 2 + + for i := 0; i < round; i++ { + requestCount := make(chan bool, reqPerRound) + for i := 0; i < reqPerRound; i++ { + go func() { + _, err := rb.SyncRequest(ctx, &RPCRequest{ + Method: "eth_getTransactionByHash", + Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)}, + }) + assert.Nil(t, err) + requestCount <- true + + }() + } + + for i := 0; i < reqPerRound; i++ { + <-requestCount + } + + } }
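
A minimal usage sketch of the new batching option, for reviewers of this change. The constructor, option structs, RPCRequest type and SyncRequest call below all come from this diff; the endpoint URL, the resty client construction (SetBaseURL is assumed available in the resty version in use), and the chosen option values are illustrative assumptions only - in this repo the client is normally built from ffresty configuration - and are not part of the change.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/go-resty/resty/v2"
	"github.com/hyperledger/firefly-common/pkg/fftypes"
	"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
)

func main() {
	// Cancelling this context stops the background batcher goroutine.
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	// Illustrative client setup - any *resty.Client pointed at a JSON/RPC
	// endpoint that supports batch requests will do.
	client := resty.New().SetBaseURL("http://localhost:8545")

	// Flush a batch when 10 requests are queued or after 50ms, whichever
	// comes first, with up to 5 batches in flight concurrently.
	batchDelay := 50 * time.Millisecond
	rpc := rpcbackend.NewRPCClientWithOption(ctx, client, rpcbackend.RPCClientOptions{
		MaxConcurrentRequest: 50,
		BatchOptions: &rpcbackend.RPCClientBatchOptions{
			BatchDelay:       &batchDelay,
			BatchSize:        10,
			BatchWorkerCount: 5,
		},
	}).(*rpcbackend.RPCClient)

	// Each SyncRequest call still blocks for its own response, but requests
	// arriving close together are coalesced into a single batch POST.
	res, err := rpc.SyncRequest(ctx, &rpcbackend.RPCRequest{
		Method: "eth_getTransactionByHash",
		Params: []*fftypes.JSONAny{fftypes.JSONAnyPtr(`"0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"`)},
	})
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	fmt.Println(res.Result.String())
}

When BatchOptions is left nil, SyncRequest keeps its existing per-request behaviour, so callers that do not opt in are unaffected.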
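
For reviewers less familiar with JSON-RPC batch semantics, a sketch of the payload shape sendBatch submits and what it expects back. The array shapes mirror the test fixtures above; the specific IDs, methods and results are invented for illustration. The count check is the important detail: if the returned array length differs from the submitted one, every caller in that batch receives the new FF22087 error.

package main

import "fmt"

// Illustrative only: a coalesced batch as sendBatch submits it. The "id"
// values are the backend-allocated request IDs (callers' original IDs are
// kept separately for trace logging), and "jsonrpc" is forced to "2.0".
const exampleBatchRequest = `[
  {"jsonrpc": "2.0", "id": "000012346", "method": "eth_getTransactionByHash", "params": ["0xf44d5387087f61237bdb5132e9cf0f38ab20437128f7291b8df595305a1a8284"]},
  {"jsonrpc": "2.0", "id": "000012347", "method": "eth_getTransactionCount", "params": ["0xfb075bb99f2aa4c49955bf703509a227d7a12248", "pending"]}
]`

// The backend is expected to return exactly one entry per request, matched
// back to callers by array position; a length mismatch fails the whole batch.
const exampleBatchResponse = `[
  {"jsonrpc": "2.0", "id": "000012346", "result": {"hash": "0x61ca9c99c1d752fb3bda568b8566edf33ba93585c64a970566e6dfb540a5cbc1", "nonce": "0x24"}},
  {"jsonrpc": "2.0", "id": "000012347", "result": "0x10"}
]`

func main() {
	fmt.Println(exampleBatchRequest)
	fmt.Println(exampleBatchResponse)
}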
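
Note that the constructor change is breaking for importers of pkg/rpcbackend: NewRPCClient and NewRPCClientWithOption now take a context, as the internal/rpcserver hunk at the top of this diff shows. A minimal before/after sketch for external callers follows; the variable names are illustrative.

package main

import (
	"context"

	"github.com/go-resty/resty/v2"
	"github.com/hyperledger/firefly-signer/pkg/rpcbackend"
)

func main() {
	httpClient := resty.New() // however the resty client is currently constructed

	// Before this change:
	//   backend := rpcbackend.NewRPCClient(httpClient)
	//
	// After this change a context is required; when batching is enabled the
	// same context bounds the lifetime of the background batcher, so cancel
	// it on shutdown.
	ctx, cancelCtx := context.WithCancel(context.Background())
	defer cancelCtx()

	backend := rpcbackend.NewRPCClient(ctx, httpClient)
	_ = backend
}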