Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ Changelog for NeoFS Node
### Added

### Fixed
- Policer removes redundant local shard copies that could remain on disk forever (#3908)

### Changed

Expand Down
56 changes: 56 additions & 0 deletions pkg/local_object_storage/engine/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,62 @@ func (e *StorageEngine) Delete(addr oid.Address) error {
})
}

// DeleteRedundantCopies marks redundant object copies to be removed from all
// listed shards except the most preferred one according to HRW ordering.
//
// Returns an error if executions are blocked (see BlockExecution) or if none of
// the provided shards is found in the engine.
// DeleteRedundantCopies marks redundant object copies to be removed from all
// listed shards except the most preferred one according to HRW ordering.
//
// Returns an error if executions are blocked (see BlockExecution) or if none of
// the provided shards is found in the engine.
func (e *StorageEngine) DeleteRedundantCopies(addr oid.Address, shardIDs []string) error {
	if e.metrics != nil {
		defer elapsed(e.metrics.AddDeleteDuration)()
	}

	e.blockMtx.RLock()
	defer e.blockMtx.RUnlock()

	if e.blockErr != nil {
		return e.blockErr
	}

	// Fewer than two listed shards means there is nothing redundant to drop.
	if len(shardIDs) < 2 {
		return nil
	}

	listed := make(map[string]struct{}, len(shardIDs))
	for _, id := range shardIDs {
		listed[id] = struct{}{}
	}

	var (
		redundant []shardWrapper
		found     bool
	)
	for _, sh := range e.sortedShards(addr.Object()) {
		id := sh.ID().String()
		if _, ok := listed[id]; !ok {
			continue
		}
		delete(listed, id)

		// The first listed shard in sorted order keeps its copy; every
		// following one is redundant.
		if !found {
			found = true
			continue
		}
		redundant = append(redundant, sh)
	}

	if !found {
		return errShardNotFound
	}

	if len(redundant) == 0 {
		return nil
	}

	return e.processAddrDeleteOnShards(redundant, addr, func(sh *shard.Shard, addrs []oid.Address) error {
		return sh.MarkGarbage(addrs...)
	})
}

// Drop removes an object from the storage engine from all shards. This
// function bypasses any lock checks or other reasons to keep the object and
// performs immediate removal, not just marking it to be removed by GC later
Expand Down
5 changes: 4 additions & 1 deletion pkg/local_object_storage/engine/inhume.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,15 @@ func (e *StorageEngine) InhumeContainer(cID cid.ID) error {

// processAddrDelete processes deletion (inhume or immediate delete) of an object by its address.
// It runs the deletion over all engine shards sorted for addr's object ID
// (see sortedShards); the concrete per-shard action is supplied via deleteFunc.
func (e *StorageEngine) processAddrDelete(addr oid.Address, deleteFunc func(*shard.Shard, []oid.Address) error) error {
	return e.processAddrDeleteOnShards(e.sortedShards(addr.Object()), addr, deleteFunc)
}

func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oid.Address, deleteFunc func(*shard.Shard, []oid.Address) error) error {
var (
children []oid.Address
err error
root bool
siNoLink *object.SplitInfo
shards = e.sortedShards(addr.Object())
)

// see if the object is root
Expand Down
23 changes: 23 additions & 0 deletions pkg/services/policer/check.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,10 @@ func (p *Policer) processObject(ctx context.Context, addrWithAttrs objectcore.Ad
}

p.dropRedundantLocalObject(addr)
return
}

p.dropRedundantLocalCopies(addrWithAttrs)
}

type processPlacementContext struct {
Expand Down Expand Up @@ -369,6 +372,26 @@ func (p *Policer) dropRedundantLocalObject(addr oid.Address) {
}
}

// dropRedundantLocalCopies asks the local storage to mark extra copies of the
// object as garbage when it is stored on more than one local shard. Tombstone,
// lock and link objects are skipped and keep all their local copies.
func (p *Policer) dropRedundantLocalCopies(obj objectcore.AddressWithAttributes) {
	// With a single shard there is no redundant copy to drop.
	if len(obj.ShardIDs) < 2 {
		return
	}

	// These object types are deliberately left untouched on every shard.
	if obj.Type == object.TypeTombstone || obj.Type == object.TypeLock || obj.Type == object.TypeLink {
		return
	}

	if err := p.localStorage.DeleteRedundantCopies(obj.Address, obj.ShardIDs); err != nil {
		p.log.Warn("could not mark redundant local shard copies as garbage",
			zap.Stringer("object", obj.Address),
			zap.Strings("shards", obj.ShardIDs),
			zap.Error(err))
	}
}

func (p *Policer) tryToReplicate(ctx context.Context, addr oid.Address, shortage uint32, candidates []netmap.NodeInfo, res replicator.TaskResult) {
var task replicator.Task
task.SetObjectAddress(addr)
Expand Down
1 change: 1 addition & 0 deletions pkg/services/policer/policer.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type replicatorIface interface {
type localStorage interface {
ListWithCursor(uint32, *engine.Cursor, ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error)
Delete(oid.Address) error
DeleteRedundantCopies(oid.Address, []string) error
Put(*object.Object, []byte) error
Head(oid.Address, bool) (*object.Object, error)
HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error)
Expand Down
118 changes: 115 additions & 3 deletions pkg/services/policer/policer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ func (s *storageListerWithDelay) Delete(address oid.Address) error {
panic("do not call me")
}

// DeleteRedundantCopies implements the localStorage interface. The delayed
// lister stub must never receive this call, so it panics to surface misuse
// in tests. Parameters are unnamed because they are never used.
func (s *storageListerWithDelay) DeleteRedundantCopies(oid.Address, []string) error {
	panic("do not call me")
}

// Put implements the localStorage interface. The delayed lister stub must
// never receive this call, so it panics to surface misuse in tests.
// Parameters are unnamed because they are never used.
func (s *storageListerWithDelay) Put(*object.Object, []byte) error {
	panic("do not call me")
}
Expand Down Expand Up @@ -1229,6 +1233,89 @@ func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, local
return lb
}

// TestPolicer_DropShardDuplicates verifies that processObject asks the local
// storage to drop redundant per-shard copies of a regular object, and that it
// never does so for tombstone, lock or link objects.
func TestPolicer_DropShardDuplicates(t *testing.T) {
	t.Run("regular", func(t *testing.T) {
		cnr := cidtest.ID()
		objID := oidtest.ID()
		addr := oid.NewAddress(cnr, objID)
		nodes := testutil.Nodes(2)

		// Regular object reported as stored on two local shards.
		localObj := objectcore.AddressWithAttributes{
			Address:    addr,
			Type:       object.TypeRegular,
			Attributes: make([]string, 3),
			ShardIDs:   []string{"redundant", "keeper"},
		}

		localNode := newTestLocalNode()
		// Hook asserts the exact call arguments and records "redundant"
		// as the shard whose copy got marked for deletion.
		localNode.deleteRedundantCopies = func(got oid.Address, shardIDs []string) error {
			require.Equal(t, addr, got)
			require.ElementsMatch(t, []string{"redundant", "keeper"}, shardIDs)
			localNode.delMtx.Lock()
			localNode.delByShard[addr] = []string{"redundant"}
			localNode.delMtx.Unlock()
			return nil
		}

		mockNet := newMockNetwork()
		mockNet.pubKey = nodes[0].PublicKey()
		mockNet.setObjectNodesRepResult(cnr, objID, nodes, 1)

		p := New(neofscryptotest.Signer(),
			WithNetwork(mockNet),
			WithLogger(zap.NewNop()),
		)
		p.localStorage = localNode

		p.processObject(context.Background(), localObj)

		// Only per-shard copies must be dropped, not the whole object.
		require.Empty(t, localNode.deletedObjects())
		require.Equal(t, []string{"redundant"}, localNode.deletedShardCopies(addr))
	})

	t.Run("broadcast", func(t *testing.T) {
		// These object types must keep all their local shard copies.
		for _, typ := range []object.Type{object.TypeTombstone, object.TypeLock, object.TypeLink} {
			t.Run(typ.String(), func(t *testing.T) {
				cnr := cidtest.ID()
				objID := oidtest.ID()
				addr := oid.NewAddress(cnr, objID)
				nodes := testutil.Nodes(2)

				localObj := objectcore.AddressWithAttributes{
					Address:    addr,
					Type:       typ,
					Attributes: make([]string, 3),
					ShardIDs:   []string{"A", "B"},
				}

				localNode := newTestLocalNode()
				// Any call here is a test failure by definition.
				localNode.deleteRedundantCopies = func(oid.Address, []string) error {
					t.Fatal("DeleteRedundantCopies must not be called for broadcast objects")
					return nil
				}

				mockNet := newMockNetwork()
				mockNet.pubKey = nodes[0].PublicKey()
				mockNet.setObjectNodesRepResult(cnr, objID, nodes, 1)
				conns := newMockAPIConnections()
				// Remote node reports it already has the object.
				conns.setHeadResult(nodes[1], addr, nil)

				p := New(neofscryptotest.Signer(),
					WithNetwork(mockNet),
					WithLogger(zap.NewNop()),
				)
				p.localStorage = localNode
				p.apiConns = conns

				p.processObject(context.Background(), localObj)

				require.Empty(t, localNode.deletedObjects())
				require.Empty(t, localNode.deletedShardCopies(addr))
			})
		}
	})
}

func waitForPolicerResult(t *testing.T, p *Policer, lb *testutil.LogBuffer, mockNet *mockNetwork, localNode *testLocalNode, addr oid.Address, expRedundant bool, expectTask bool, r *testReplicator, waitLog ...*testutil.LogEntry) {
t.Helper()

Expand Down Expand Up @@ -1301,13 +1388,16 @@ func (x *testReplicator) HandleTask(ctx context.Context, task replicator.Task, r
// testLocalNode is a localStorage mock that records deletion requests made by
// the policer.
//
// NOTE: the diff residue declared delMtx and del twice (old and new hunk lines
// merged together), which does not compile; only one declaration of each is
// kept here.
type testLocalNode struct {
	objList []objectcore.AddressWithAttributes

	// delMtx guards del and delByShard.
	delMtx sync.RWMutex
	// del records addresses passed to Delete.
	del map[oid.Address]struct{}
	// delByShard records, per address, the shard IDs whose copies were
	// marked redundant via DeleteRedundantCopies.
	delByShard map[oid.Address][]string
	// deleteRedundantCopies, when non-nil, overrides the default
	// DeleteRedundantCopies behavior.
	deleteRedundantCopies func(oid.Address, []string) error
}

// newTestLocalNode returns a testLocalNode with its bookkeeping maps
// initialized (the diff residue repeated the del key, which does not compile;
// a single initialization of each map is kept here).
func newTestLocalNode() *testLocalNode {
	return &testLocalNode{
		del:        make(map[oid.Address]struct{}),
		delByShard: make(map[oid.Address][]string),
	}
}

Expand Down Expand Up @@ -1396,6 +1486,13 @@ func (x *testLocalNode) deletedObjects() []oid.Address {
return res
}

// deletedShardCopies returns a copy of the shard ID list recorded for addr by
// DeleteRedundantCopies. Safe for concurrent use.
func (x *testLocalNode) deletedShardCopies(addr oid.Address) []string {
	x.delMtx.RLock()
	defer x.delMtx.RUnlock()
	return slices.Clone(x.delByShard[addr])
}

func (x *testLocalNode) Delete(addr oid.Address) error {
x.delMtx.Lock()
x.del[addr] = struct{}{}
Expand All @@ -1419,6 +1516,21 @@ func (x *testLocalNode) GetRange(oid.Address, uint64, uint64) ([]byte, error) {
panic("unimplemented")
}

// DeleteRedundantCopies implements the localStorage interface for tests. When
// the deleteRedundantCopies hook is set it takes over entirely; otherwise all
// shard IDs except the first one are recorded as dropped copies of addr.
func (x *testLocalNode) DeleteRedundantCopies(addr oid.Address, shardIDs []string) error {
	if hook := x.deleteRedundantCopies; hook != nil {
		return hook(addr, shardIDs)
	}

	// A single shard means there is nothing redundant to record.
	if len(shardIDs) < 2 {
		return nil
	}

	x.delMtx.Lock()
	defer x.delMtx.Unlock()
	x.delByShard[addr] = append(x.delByShard[addr], shardIDs[1:]...)
	return nil
}

type getNodesKey struct {
cnr cid.ID
obj oid.ID
Expand Down
Loading