Skip to content

Commit

Permalink
ir: maintain container placement in container contract
Browse files Browse the repository at this point in the history
After nspcc-dev/neofs-contract#438 Container contract
now has API for container-side placement/verification operations. But building
placement vectors and updating when needed is still a complex operation that
should be done on a Go application side. This commit adds such a responsibility
for Alphabet nodes. For simplicity, updating is done every epoch and once for
_every_ container if netmap has been changed. Additional optimization can be
considered.

Signed-off-by: Pavel Karpy <carpawell@nspcc.ru>
  • Loading branch information
carpawell committed Dec 12, 2024
1 parent 75eaea8 commit 4fc0b0d
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 9 deletions.
37 changes: 37 additions & 0 deletions pkg/innerring/processors/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package processors

import (
"fmt"

cnrcli "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
cid "github.com/nspcc-dev/neofs-sdk-go/container/id"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
)

// UpdatePlacementVectors updates placement vectors after a container placement
// change in Container contract. Empty vectors drops container vectors from the
// contract.
func UpdatePlacementVectors(cID cid.ID, cnrCli *cnrcli.Client, vectors [][]netmap.NodeInfo, replicas []uint32) error {
for i, vector := range vectors {
err := cnrCli.AddNextEpochNodes(cID, i, pubKeys(vector))
if err != nil {
return fmt.Errorf("can't add %d placement vector to Container contract: %w", i, err)
}
}

err := cnrCli.CommitContainerListUpdate(cID, replicas)
if err != nil {
return fmt.Errorf("can't commit container list to Container contract: %w", err)
}

return nil
}

func pubKeys(nodes []netmap.NodeInfo) [][]byte {
res := make([][]byte, 0, len(nodes))
for _, node := range nodes {
res = append(res, node.PublicKey())
}

return res
}
49 changes: 43 additions & 6 deletions pkg/innerring/processors/container/process_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/nspcc-dev/neo-go/pkg/network/payload"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
Expand All @@ -26,7 +27,10 @@ type putEvent interface {
type putContainerContext struct {
e putEvent

d containerSDK.Domain
// must be filled when verifying raw data from e
cID cid.ID
cnr containerSDK.Container
d containerSDK.Domain
}

// Process a new container from the user by checking the container sanity
Expand Down Expand Up @@ -55,15 +59,15 @@ func (cp *Processor) processContainerPut(put putEvent) {

func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
binCnr := ctx.e.Container()
var cnr containerSDK.Container
ctx.cID = cid.NewFromMarshalledContainer(binCnr)

err := cnr.Unmarshal(binCnr)
err := ctx.cnr.Unmarshal(binCnr)
if err != nil {
return fmt.Errorf("invalid binary container: %w", err)
}

err = cp.verifySignature(signatureVerificationData{
ownerContainer: cnr.Owner(),
ownerContainer: ctx.cnr.Owner(),
verb: session.VerbContainerPut,
binTokenSession: ctx.e.SessionToken(),
binPublicKey: ctx.e.PublicKey(),
Expand All @@ -75,13 +79,13 @@ func (cp *Processor) checkPutContainer(ctx *putContainerContext) error {
}

// check homomorphic hashing setting
err = checkHomomorphicHashing(cp.netState, cnr)
err = checkHomomorphicHashing(cp.netState, ctx.cnr)
if err != nil {
return fmt.Errorf("incorrect homomorphic hashing setting: %w", err)
}

// check native name and zone
err = checkNNS(ctx, cnr)
err = checkNNS(ctx, ctx.cnr)
if err != nil {
return fmt.Errorf("NNS: %w", err)
}
Expand Down Expand Up @@ -110,6 +114,31 @@ func (cp *Processor) approvePutContainer(ctx *putContainerContext) {
cp.log.Error("could not approve put container",
zap.Error(err),
)
return
}

nm, err := cp.netState.NetMap()
if err != nil {
cp.log.Error("could not get netmap for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err))
return
}

policy := ctx.cnr.PlacementPolicy()
vectors, err := nm.ContainerNodes(policy, ctx.cID)
if err != nil {
cp.log.Error("could not build placement for Container contract update", zap.Stringer("cid", ctx.cID), zap.Error(err))
return
}

replicas := make([]uint32, 0, policy.NumberOfReplicas())
for i := range vectors {
replicas = append(replicas, policy.ReplicaNumberByIndex(i))
}

err = processors.UpdatePlacementVectors(ctx.cID, cp.cnrClient, vectors, replicas)
if err != nil {
cp.log.Error("could not update Container contract", zap.Stringer("cid", ctx.cID), zap.Error(err))
return
}
}

Expand Down Expand Up @@ -181,6 +210,14 @@ func (cp *Processor) approveDeleteContainer(e *containerEvent.Delete) {
cp.log.Error("could not approve delete container",
zap.Error(err),
)
return
}

cID := cid.ID(e.ContainerID())
err = processors.UpdatePlacementVectors(cID, cp.cnrClient, nil, nil)
if err != nil {
cp.log.Error("could not drop placement in Container contact", zap.Stringer("cid", cID), zap.Error(err))
return
}
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/innerring/processors/container/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/nspcc-dev/neofs-node/pkg/morph/client/neofsid"
"github.com/nspcc-dev/neofs-node/pkg/morph/event"
containerEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/container"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"github.com/panjf2000/ants/v2"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -55,6 +56,9 @@ type NetworkState interface {
//
// which did not allow reading the value.
HomomorphicHashDisabled() (bool, error)

// NetMap must return actual network map.
NetMap() (*netmap.NetMap, error)
}

// New creates a container contract processor instance.
Expand Down
16 changes: 14 additions & 2 deletions pkg/innerring/processors/netmap/cleanup_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package netmap

import (
"bytes"
"slices"
"sync"

"github.com/nspcc-dev/neofs-sdk-go/netmap"
Expand All @@ -13,6 +14,8 @@ type (
enabled bool
threshold uint64
lastAccess map[string]epochStampWithNodeInfo

prev netmap.NetMap
}

epochStamp struct {
Expand All @@ -36,8 +39,9 @@ func newCleanupTable(enabled bool, threshold uint64) cleanupTable {
}
}

// Update cleanup table based on on-chain information about netmap.
func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) {
// Update cleanup table based on on-chain information about netmap. Returned
// value indicates if the composition of network map memebers has changed.
func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) bool {
c.Lock()
defer c.Unlock()

Expand All @@ -64,6 +68,14 @@ func (c *cleanupTable) update(snapshot netmap.NetMap, now uint64) {
}

c.lastAccess = newMap

// order is expected to be the same from epoch to epoch
mapChanged := !slices.EqualFunc(c.prev.Nodes(), nmNodes, func(i1 netmap.NodeInfo, i2 netmap.NodeInfo) bool {
return bytes.Equal(i1.PublicKey(), i2.PublicKey())
})
c.prev = snapshot

return mapChanged
}

// updates last access time of the netmap node by string public key.
Expand Down
56 changes: 55 additions & 1 deletion pkg/innerring/processors/netmap/process_epoch.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package netmap

import (
"fmt"

"github.com/nspcc-dev/neofs-node/pkg/innerring/processors"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/audit"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/governance"
"github.com/nspcc-dev/neofs-node/pkg/innerring/processors/settlement"
cntClient "github.com/nspcc-dev/neofs-node/pkg/morph/client/container"
netmapEvent "github.com/nspcc-dev/neofs-node/pkg/morph/event/netmap"
"github.com/nspcc-dev/neofs-sdk-go/netmap"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -65,14 +69,64 @@ func (np *Processor) processNewEpoch(ev netmapEvent.NewEpoch) {
}
}

np.netmapSnapshot.update(*networkMap, epoch)
if np.netmapSnapshot.update(*networkMap, epoch) {
l.Debug("updating placements in Container contract...")
err = np.updatePlacementInContract(*networkMap, l)
if err != nil {
l.Error("can't update placements in Container contract", zap.Error(err))
} else {
l.Debug("updated placements in Container contract")
}
}
np.handleCleanupTick(netmapCleanupTick{epoch: epoch, txHash: ev.TxHash()})
np.handleNewAudit(audit.NewAuditStartEvent(epoch))
np.handleAuditSettlements(settlement.NewAuditEvent(epoch))
np.handleAlphabetSync(governance.NewSyncEvent(ev.TxHash()))
np.handleNotaryDeposit(ev)
}

func (np *Processor) updatePlacementInContract(nm netmap.NetMap, l *zap.Logger) error {
// TODO: https://github.com/nspcc-dev/neofs-node/issues/3045
cids, err := np.containerWrp.List(nil)
if err != nil {
return fmt.Errorf("can't get containers list: %w", err)
}

for _, cID := range cids {
l := l.With(zap.Stringer("cid", cID))
l.Debug("updating container placement in Container contract...")

cnr, err := np.containerWrp.Get(cID[:])
if err != nil {
l.Error("can't get container to update its placement in Container contract", zap.Error(err))
continue
}

policy := cnr.Value.PlacementPolicy()

vectors, err := nm.ContainerNodes(policy, cID)
if err != nil {
l.Error("can't build placement vectors for update in Container contract", zap.Error(err))
continue
}

replicas := make([]uint32, 0, policy.NumberOfReplicas())
for i := range vectors {
replicas = append(replicas, policy.ReplicaNumberByIndex(i))
}

err = processors.UpdatePlacementVectors(cID, np.containerWrp, vectors, replicas)
if err != nil {
l.Error("can't put placement vectors to Container contract", zap.Error(err))
continue
}

l.Debug("updated container placement in Container contract")
}

return nil
}

// Process new epoch tick by invoking new epoch method in network map contract.
func (np *Processor) processNewEpochTick() {
if !np.alphabetState.IsAlphabet() {
Expand Down

0 comments on commit 4fc0b0d

Please sign in to comment.