diff --git a/CHANGELOG.md b/CHANGELOG.md index 9018890dc6..f50d28aa73 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ Changelog for NeoFS Node ### Added ### Fixed +- Policer removes redundant local shard copies that could remain on disk forever (#3908) ### Changed diff --git a/pkg/local_object_storage/engine/delete.go b/pkg/local_object_storage/engine/delete.go index afde81f3dc..c08106dadb 100644 --- a/pkg/local_object_storage/engine/delete.go +++ b/pkg/local_object_storage/engine/delete.go @@ -27,6 +27,62 @@ func (e *StorageEngine) Delete(addr oid.Address) error { }) } +// DeleteRedundantCopies marks redundant object copies to be removed from all +// listed shards except the most preferred one according to HRW ordering. +// +// Returns an error if executions are blocked (see BlockExecution) or if none of +// the provided shards is found in the engine. +func (e *StorageEngine) DeleteRedundantCopies(addr oid.Address, shardIDs []string) error { + if e.metrics != nil { + defer elapsed(e.metrics.AddDeleteDuration)() + } + + e.blockMtx.RLock() + defer e.blockMtx.RUnlock() + + if e.blockErr != nil { + return e.blockErr + } + + if len(shardIDs) < 2 { + return nil + } + + allowed := make(map[string]struct{}, len(shardIDs)) + for i := range shardIDs { + allowed[shardIDs[i]] = struct{}{} + } + + var deleteShards []shardWrapper + keep := true + for _, sh := range e.sortedShards(addr.Object()) { + if _, ok := allowed[sh.ID().String()]; !ok { + continue + } + + if keep { + keep = false + delete(allowed, sh.ID().String()) + continue + } + + deleteShards = append(deleteShards, sh) + delete(allowed, sh.ID().String()) + } + + if keep { + return errShardNotFound + } + + if len(deleteShards) == 0 { + return nil + } + + return e.processAddrDeleteOnShards(deleteShards, addr, func(sh *shard.Shard, addrs []oid.Address) error { + return sh.MarkGarbage(addrs...) + }) +} + // Drop removes an object from the storage engine from all shards. This // function bypasses any lock checks or other reasons to keep the object and // performs immediate removal, not just marking it to be removed by GC later diff --git a/pkg/local_object_storage/engine/inhume.go b/pkg/local_object_storage/engine/inhume.go index 3e3fdab645..c50d45b700 100644 --- a/pkg/local_object_storage/engine/inhume.go +++ b/pkg/local_object_storage/engine/inhume.go @@ -44,12 +44,15 @@ func (e *StorageEngine) InhumeContainer(cID cid.ID) error { // processAddrDelete processes deletion (inhume or immediate delete) of an object by its address. func (e *StorageEngine) processAddrDelete(addr oid.Address, deleteFunc func(*shard.Shard, []oid.Address) error) error { + return e.processAddrDeleteOnShards(e.sortedShards(addr.Object()), addr, deleteFunc) +} + +func (e *StorageEngine) processAddrDeleteOnShards(shards []shardWrapper, addr oid.Address, deleteFunc func(*shard.Shard, []oid.Address) error) error { var ( children []oid.Address err error root bool siNoLink *object.SplitInfo - shards = e.sortedShards(addr.Object()) ) // see if the object is root diff --git a/pkg/services/policer/check.go b/pkg/services/policer/check.go index 50ff8c843f..fd1938a153 100644 --- a/pkg/services/policer/check.go +++ b/pkg/services/policer/check.go @@ -216,7 +216,10 @@ func (p *Policer) processObject(ctx context.Context, addrWithAttrs objectcore.Ad } p.dropRedundantLocalObject(addr) + return } + + p.dropRedundantLocalCopies(addrWithAttrs) } type processPlacementContext struct { @@ -369,6 +372,26 @@ func (p *Policer) dropRedundantLocalObject(addr oid.Address) { } } +func (p *Policer) dropRedundantLocalCopies(obj objectcore.AddressWithAttributes) { + if len(obj.ShardIDs) < 2 { + return + } + + switch obj.Type { + case object.TypeTombstone, object.TypeLock, object.TypeLink: + return + default: + } + + err := p.localStorage.DeleteRedundantCopies(obj.Address, obj.ShardIDs) + if err != nil { + p.log.Warn("could not mark redundant local shard copies as garbage", + zap.Stringer("object", obj.Address), + zap.Strings("shards", obj.ShardIDs), + zap.Error(err)) + } +} + func (p *Policer) tryToReplicate(ctx context.Context, addr oid.Address, shortage uint32, candidates []netmap.NodeInfo, res replicator.TaskResult) { var task replicator.Task task.SetObjectAddress(addr) diff --git a/pkg/services/policer/policer.go b/pkg/services/policer/policer.go index b6c0885d16..73633938d1 100644 --- a/pkg/services/policer/policer.go +++ b/pkg/services/policer/policer.go @@ -31,6 +31,7 @@ type replicatorIface interface { type localStorage interface { ListWithCursor(uint32, *engine.Cursor, ...string) ([]objectcore.AddressWithAttributes, *engine.Cursor, error) Delete(oid.Address) error + DeleteRedundantCopies(oid.Address, []string) error Put(*object.Object, []byte) error Head(oid.Address, bool) (*object.Object, error) HeadECPart(cid.ID, oid.ID, iec.PartInfo) (object.Object, error) diff --git a/pkg/services/policer/policer_test.go b/pkg/services/policer/policer_test.go index 3e5df6a15d..8beaf42a4c 100644 --- a/pkg/services/policer/policer_test.go +++ b/pkg/services/policer/policer_test.go @@ -60,6 +60,10 @@ func (s *storageListerWithDelay) Delete(address oid.Address) error { panic("do not call me") } +func (s *storageListerWithDelay) DeleteRedundantCopies(address oid.Address, _ []string) error { + panic("do not call me") +} + func (s *storageListerWithDelay) Put(o *object.Object, i []byte) error { panic("do not call me") } @@ -1229,6 +1233,89 @@ func testECCheckWithNetworkAndShortage(t *testing.T, mockNet *mockNetwork, local return lb } +func TestPolicer_DropShardDuplicates(t *testing.T) { + t.Run("regular", func(t *testing.T) { + cnr := cidtest.ID() + objID := oidtest.ID() + addr := oid.NewAddress(cnr, objID) + nodes := testutil.Nodes(2) + + localObj := objectcore.AddressWithAttributes{ + Address: addr, + Type: object.TypeRegular, + Attributes: make([]string, 3), + ShardIDs: []string{"redundant", "keeper"}, + } + + localNode := newTestLocalNode() + localNode.deleteRedundantCopies = func(got oid.Address, shardIDs []string) error { + require.Equal(t, addr, got) + require.ElementsMatch(t, []string{"redundant", "keeper"}, shardIDs) + localNode.delMtx.Lock() + localNode.delByShard[addr] = []string{"redundant"} + localNode.delMtx.Unlock() + return nil + } + + mockNet := newMockNetwork() + mockNet.pubKey = nodes[0].PublicKey() + mockNet.setObjectNodesRepResult(cnr, objID, nodes, 1) + + p := New(neofscryptotest.Signer(), + WithNetwork(mockNet), + WithLogger(zap.NewNop()), + ) + p.localStorage = localNode + + p.processObject(context.Background(), localObj) + + require.Empty(t, localNode.deletedObjects()) + require.Equal(t, []string{"redundant"}, localNode.deletedShardCopies(addr)) + }) + + t.Run("broadcast", func(t *testing.T) { + for _, typ := range []object.Type{object.TypeTombstone, object.TypeLock, object.TypeLink} { + t.Run(typ.String(), func(t *testing.T) { + cnr := cidtest.ID() + objID := oidtest.ID() + addr := oid.NewAddress(cnr, objID) + nodes := testutil.Nodes(2) + + localObj := objectcore.AddressWithAttributes{ + Address: addr, + Type: typ, + Attributes: make([]string, 3), + ShardIDs: []string{"A", "B"}, + } + + localNode := newTestLocalNode() + localNode.deleteRedundantCopies = func(oid.Address, []string) error { + t.Fatal("DeleteRedundantCopies must not be called for broadcast objects") + return nil + } + + mockNet := newMockNetwork() + mockNet.pubKey = nodes[0].PublicKey() + mockNet.setObjectNodesRepResult(cnr, objID, nodes, 1) + conns := newMockAPIConnections() + conns.setHeadResult(nodes[1], addr, nil) + + p := New(neofscryptotest.Signer(), + WithNetwork(mockNet), + WithLogger(zap.NewNop()), + ) + p.localStorage = localNode + p.apiConns = conns + + p.processObject(context.Background(), localObj) + + require.Empty(t, localNode.deletedObjects()) + require.Empty(t, localNode.deletedShardCopies(addr)) + }) + } + }) +} + func waitForPolicerResult(t *testing.T, p *Policer, lb *testutil.LogBuffer, mockNet *mockNetwork, localNode *testLocalNode, addr oid.Address, expRedundant bool, expectTask bool, r *testReplicator, waitLog ...*testutil.LogEntry) { t.Helper() @@ -1301,13 +1388,16 @@ func (x *testReplicator) HandleTask(ctx context.Context, task replicator.Task, r type testLocalNode struct { objList []objectcore.AddressWithAttributes - delMtx sync.RWMutex - del map[oid.Address]struct{} + delMtx sync.RWMutex + del map[oid.Address]struct{} + delByShard map[oid.Address][]string + deleteRedundantCopies func(oid.Address, []string) error } func newTestLocalNode() *testLocalNode { return &testLocalNode{ - del: make(map[oid.Address]struct{}), + del: make(map[oid.Address]struct{}), + delByShard: make(map[oid.Address][]string), } } @@ -1396,6 +1486,13 @@ func (x *testLocalNode) deletedObjects() []oid.Address { return res } +func (x *testLocalNode) deletedShardCopies(addr oid.Address) []string { + x.delMtx.RLock() + res := slices.Clone(x.delByShard[addr]) + x.delMtx.RUnlock() + return res +} + func (x *testLocalNode) Delete(addr oid.Address) error { x.delMtx.Lock() x.del[addr] = struct{}{} @@ -1419,6 +1516,21 @@ func (x *testLocalNode) GetRange(oid.Address, uint64, uint64) ([]byte, error) { panic("unimplemented") } +func (x *testLocalNode) DeleteRedundantCopies(addr oid.Address, shardIDs []string) error { + if x.deleteRedundantCopies != nil { + return x.deleteRedundantCopies(addr, shardIDs) + } + + if len(shardIDs) < 2 { + return nil + } + + x.delMtx.Lock() + x.delByShard[addr] = append(x.delByShard[addr], shardIDs[1:]...) + x.delMtx.Unlock() + return nil +} + type getNodesKey struct { cnr cid.ID obj oid.ID