diff --git a/client/config/peerhost.go b/client/config/peerhost.go index 68bc8bc7f75..5ecadf1ae4d 100644 --- a/client/config/peerhost.go +++ b/client/config/peerhost.go @@ -329,6 +329,7 @@ type DownloadOption struct { ResourceClients ResourceClientsOption `mapstructure:"resourceClients" yaml:"resourceClients"` RecursiveConcurrent RecursiveConcurrent `mapstructure:"recursiveConcurrent" yaml:"recursiveConcurrent"` + SeedConcurrent int64 `mapstructure:"seedConcurrent" yaml:"seedConcurrent"` CacheRecursiveMetadata time.Duration `mapstructure:"cacheRecursiveMetadata" yaml:"cacheRecursiveMetadata"` } diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index ab2cd5f9edf..afb7fa130ed 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -360,7 +360,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, peerExchangeRPC, schedulerClient, - opt.Download.RecursiveConcurrent.GoroutineCount, opt.Download.CacheRecursiveMetadata, downloadServerOption, peerServerOption) + opt.Download.RecursiveConcurrent.GoroutineCount, opt.Download.SeedConcurrent, + opt.Download.CacheRecursiveMetadata, downloadServerOption, peerServerOption) if err != nil { return nil, err } diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index f7539e4c81c..5046e9a2e8f 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -103,7 +103,8 @@ func init() { } func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager, - storageManager storage.Manager, peerExchanger pex.PeerExchangeRPC, schedulerClient schedulerclient.V1, recursiveConcurrent int, cacheRecursiveMetadata time.Duration, + storageManager storage.Manager, peerExchanger pex.PeerExchangeRPC, schedulerClient schedulerclient.V1, + recursiveConcurrent int, seedConcurrent int64, cacheRecursiveMetadata time.Duration, downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) { s := &server{ KeepAlive: util.NewKeepAlive("rpc server"), @@ -120,7 +121,9 @@ func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager, } sd := &seeder{ - server: s, + server: s, + maxConcurrent: seedConcurrent, + concurrent: atomic.NewInt64(0), } // set not serving by default diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index f67714698d6..1ec635596c0 100644 --- a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -76,7 +76,7 @@ func TestServer_New(t *testing.T) { mockSchedulerClient := schedulerclientmocks.NewMockV1(ctrl) var mockdownloadOpts []grpc.ServerOption var mockpeerOpts []grpc.ServerOption - _, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, nil, mockSchedulerClient, 16, 0, mockdownloadOpts, mockpeerOpts) + _, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, nil, mockSchedulerClient, 16, 0, 0, mockdownloadOpts, mockpeerOpts) tc.expect(t, err) }) } diff --git a/client/daemon/rpcserver/seeder.go b/client/daemon/rpcserver/seeder.go index a9d370c1baf..a47d902a2a3 100644 --- a/client/daemon/rpcserver/seeder.go +++ b/client/daemon/rpcserver/seeder.go @@ -22,6 +22,7 @@ import ( "math" "time" + "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -39,7 +40,9 @@ import ( ) type seeder struct { - server *server + maxConcurrent int64 + concurrent *atomic.Int64 + server *server } func (s *seeder) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { @@ -55,6 +58,16 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystemv1.SeedRequest, seedsServer c printAuthInfo(seedsServer.Context()) } + if s.maxConcurrent > 0 { + if s.concurrent.Inc() > s.maxConcurrent { + s.concurrent.Dec() + logger.Infof("seed peer is busying, return ResourceExhausted") + return status.Errorf(codes.ResourceExhausted, "seed peer is busying, limit is %d", s.maxConcurrent) + } + + defer s.concurrent.Dec() + } + metrics.SeedPeerConcurrentDownloadGauge.Inc() defer metrics.SeedPeerConcurrentDownloadGauge.Dec() metrics.SeedPeerDownloadCount.Add(1) diff --git a/client/daemon/rpcserver/seeder_test.go b/client/daemon/rpcserver/seeder_test.go index 13f7c62b914..3ed690ec10d 100644 --- a/client/daemon/rpcserver/seeder_test.go +++ b/client/daemon/rpcserver/seeder_test.go @@ -28,10 +28,13 @@ import ( testifyassert "github.com/stretchr/testify/assert" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/trace" + "go.uber.org/atomic" "go.uber.org/mock/gomock" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/health" + "google.golang.org/grpc/status" cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" @@ -396,3 +399,18 @@ func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *t return port, client } + +func Test_ObtainSeedsResourceExhausted(t *testing.T) { + sd := &seeder{ + maxConcurrent: 10, + concurrent: atomic.NewInt64(10), + } + + assert := testifyassert.New(t) + + err := sd.ObtainSeeds(nil, nil) + assert.Error(err, "ObtainSeeds should return error") + st, ok := status.FromError(err) + assert.True(ok, "error should be status") + assert.Equal(codes.ResourceExhausted, st.Code()) +}