Skip to content

Commit

Permalink
feat: seed max concurrent (#3482)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <majinjing3@gmail.com>
  • Loading branch information
jim3ma authored Sep 23, 2024
1 parent b226996 commit 3e73231
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 5 deletions.
1 change: 1 addition & 0 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,6 +329,7 @@ type DownloadOption struct {
ResourceClients ResourceClientsOption `mapstructure:"resourceClients" yaml:"resourceClients"`

RecursiveConcurrent RecursiveConcurrent `mapstructure:"recursiveConcurrent" yaml:"recursiveConcurrent"`
SeedConcurrent int64 `mapstructure:"seedConcurrent" yaml:"seedConcurrent"`
CacheRecursiveMetadata time.Duration `mapstructure:"cacheRecursiveMetadata" yaml:"cacheRecursiveMetadata"`
}

Expand Down
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
}

rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, peerExchangeRPC, schedulerClient,
opt.Download.RecursiveConcurrent.GoroutineCount, opt.Download.CacheRecursiveMetadata, downloadServerOption, peerServerOption)
opt.Download.RecursiveConcurrent.GoroutineCount, opt.Download.SeedConcurrent,
opt.Download.CacheRecursiveMetadata, downloadServerOption, peerServerOption)
if err != nil {
return nil, err
}
Expand Down
7 changes: 5 additions & 2 deletions client/daemon/rpcserver/rpcserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,8 @@ func init() {
}

func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager,
storageManager storage.Manager, peerExchanger pex.PeerExchangeRPC, schedulerClient schedulerclient.V1, recursiveConcurrent int, cacheRecursiveMetadata time.Duration,
storageManager storage.Manager, peerExchanger pex.PeerExchangeRPC, schedulerClient schedulerclient.V1,
recursiveConcurrent int, seedConcurrent int64, cacheRecursiveMetadata time.Duration,
downloadOpts []grpc.ServerOption, peerOpts []grpc.ServerOption) (Server, error) {
s := &server{
KeepAlive: util.NewKeepAlive("rpc server"),
Expand All @@ -120,7 +121,9 @@ func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager,
}

sd := &seeder{
server: s,
server: s,
maxConcurrent: seedConcurrent,
concurrent: atomic.NewInt64(0),
}

// set not serving by default
Expand Down
2 changes: 1 addition & 1 deletion client/daemon/rpcserver/rpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func TestServer_New(t *testing.T) {
mockSchedulerClient := schedulerclientmocks.NewMockV1(ctrl)
var mockdownloadOpts []grpc.ServerOption
var mockpeerOpts []grpc.ServerOption
_, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, nil, mockSchedulerClient, 16, 0, mockdownloadOpts, mockpeerOpts)
_, err := New(mockpeerHost, mockpeerTaskManager, mockStorageManger, nil, mockSchedulerClient, 16, 0, 0, mockdownloadOpts, mockpeerOpts)
tc.expect(t, err)
})
}
Expand Down
15 changes: 14 additions & 1 deletion client/daemon/rpcserver/seeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"math"
"time"

"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"

Expand All @@ -39,7 +40,9 @@ import (
)

type seeder struct {
server *server
maxConcurrent int64
concurrent *atomic.Int64
server *server
}

func (s *seeder) GetPieceTasks(ctx context.Context, request *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) {
Expand All @@ -55,6 +58,16 @@ func (s *seeder) ObtainSeeds(seedRequest *cdnsystemv1.SeedRequest, seedsServer c
printAuthInfo(seedsServer.Context())
}

if s.maxConcurrent > 0 {
if s.concurrent.Inc() > s.maxConcurrent {
s.concurrent.Dec()
logger.Infof("seed peer is busying, return ResourceExhausted")
return status.Errorf(codes.ResourceExhausted, "seed peer is busying, limit is %d", s.maxConcurrent)
}

defer s.concurrent.Dec()
}

metrics.SeedPeerConcurrentDownloadGauge.Inc()
defer metrics.SeedPeerConcurrentDownloadGauge.Dec()
metrics.SeedPeerDownloadCount.Add(1)
Expand Down
18 changes: 18 additions & 0 deletions client/daemon/rpcserver/seeder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,13 @@ import (
testifyassert "github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"go.uber.org/mock/gomock"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/status"

cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1"
commonv1 "d7y.io/api/v2/pkg/apis/common/v1"
Expand Down Expand Up @@ -396,3 +399,18 @@ func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *t

return port, client
}

func Test_ObtainSeedsResourceExhausted(t *testing.T) {
sd := &seeder{
maxConcurrent: 10,
concurrent: atomic.NewInt64(10),
}

assert := testifyassert.New(t)

err := sd.ObtainSeeds(nil, nil)
assert.Error(err, "ObtainSeeds should return error")
st, ok := status.FromError(err)
assert.True(ok, "error should be status")
assert.Equal(codes.ResourceExhausted, st.Code())
}

0 comments on commit 3e73231

Please sign in to comment.