From 3aa1f3d04f94c5c1929bface1b2962b0ce28caa2 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Tue, 17 Mar 2026 17:39:50 +0300 Subject: [PATCH 1/2] storage: move shard ID type into struct Extract shard ID into `pkg/local_object_storage/blobstor/common` and switch internal usages to `common.ID`. This moves the type closer to its storage domain and avoids base58-oriented handling in internal storage. Signed-off-by: Andrey Butusov --- cmd/neofs-lancet/internal/meta/resync.go | 12 +- .../blobstor/common/id.go | 87 ++++++++++++ .../blobstor/common/storage.go | 4 +- .../blobstor/fstree/control.go | 35 +++-- .../blobstor/fstree/fstree.go | 21 ++- .../blobstor/fstree/fstree_descriptor_test.go | 131 ++++++++++++------ pkg/local_object_storage/engine/dump.go | 4 +- pkg/local_object_storage/engine/ec_test.go | 58 ++++---- pkg/local_object_storage/engine/engine.go | 3 +- .../engine/engine_test.go | 12 +- pkg/local_object_storage/engine/error_test.go | 9 +- pkg/local_object_storage/engine/evacuate.go | 3 +- .../engine/evacuate_test.go | 5 +- pkg/local_object_storage/engine/restore.go | 4 +- pkg/local_object_storage/engine/revive.go | 2 +- pkg/local_object_storage/engine/shards.go | 26 ++-- pkg/local_object_storage/engine/status.go | 2 +- pkg/local_object_storage/engine/writecache.go | 4 +- pkg/local_object_storage/metabase/control.go | 11 +- .../shard/control_test.go | 12 +- pkg/local_object_storage/shard/id.go | 37 ++--- pkg/local_object_storage/shard/info.go | 3 +- pkg/local_object_storage/shard/shard.go | 2 +- .../shard/shard_internal_test.go | 4 +- pkg/local_object_storage/shard/shard_test.go | 9 +- .../writecache/storage.go | 10 +- pkg/services/control/server/dump.go | 7 +- pkg/services/control/server/evacuate.go | 7 +- pkg/services/control/server/flush_cache.go | 7 +- pkg/services/control/server/helpers.go | 23 ++- pkg/services/control/server/list_shards.go | 2 +- pkg/services/control/server/restore.go | 7 +- pkg/services/control/server/set_shard_mode.go | 7 +- 33 files changed, 383 insertions(+), 187 deletions(-) create mode 100644 pkg/local_object_storage/blobstor/common/id.go diff --git a/cmd/neofs-lancet/internal/meta/resync.go b/cmd/neofs-lancet/internal/meta/resync.go index d2440c2fd7..389ca523af 100644 --- a/cmd/neofs-lancet/internal/meta/resync.go +++ b/cmd/neofs-lancet/internal/meta/resync.go @@ -3,8 +3,8 @@ package meta import ( "fmt" - "github.com/mr-tron/base58" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal" + blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" @@ -84,8 +84,14 @@ func resyncFunc(cmd *cobra.Command, _ []string) error { if err != nil { return fmt.Errorf("read shard ID from metabase: %w", err) } - metaShardID := base58.Encode(idRaw) - if len(metaShardID) > 0 && metaShardID != blobstorShardID && !vForce { + var metaShardID blobstorcommon.ID + if len(idRaw) != 0 { + metaShardID, err = blobstorcommon.NewIDFromBytes(idRaw) + if err != nil { + return fmt.Errorf("decode metabase shard ID: %w", err) + } + } + if !metaShardID.IsZero() && !metaShardID.Equal(blobstorShardID) && !vForce { return fmt.Errorf("metabase shard ID %q does not match blobstor shard ID %q, use --%s to override", metaShardID, blobstorShardID, forceFlagName) } diff --git a/pkg/local_object_storage/blobstor/common/id.go b/pkg/local_object_storage/blobstor/common/id.go new file mode 100644 index 0000000000..ebf5528f4b --- /dev/null +++ b/pkg/local_object_storage/blobstor/common/id.go @@ -0,0 +1,87 @@ +package common + +import ( + "bytes" + "encoding/binary" + "fmt" + + "github.com/google/uuid" + "github.com/mr-tron/base58" +) + +// IDSize is the binary shard ID size in bytes. +const IDSize = 16 + +// ID represents a shard identifier. +type ID struct { + raw [IDSize]byte + str string + hash uint64 +} + +// NewID generates a new shard identifier. +func NewID() (ID, error) { + uid, err := uuid.NewRandom() + if err != nil { + return ID{}, err + } + + return NewIDFromBytes(uid[:]) +} + +// NewIDFromBytes constructs ID from fixed-size raw bytes. +func NewIDFromBytes(v []byte) (ID, error) { + if len(v) != IDSize { + return ID{}, fmt.Errorf("invalid shard ID length %d, expected %d", len(v), IDSize) + } + + var id ID + copy(id.raw[:], v) + id.str = base58.Encode(id.raw[:]) + id.hash = binary.BigEndian.Uint64(id.raw[:8]) + + return id, nil +} + +// DecodeIDString decodes base58 string into ID. +func DecodeIDString(s string) (ID, error) { + if s == "" { + return ID{}, fmt.Errorf("empty shard ID string") + } + + b, err := base58.Decode(s) + if err != nil { + return ID{}, err + } + + return NewIDFromBytes(b) +} + +// String encodes ID into base58 form. +func (id ID) String() string { + return id.str +} + +// Bytes returns a copy of the raw ID bytes. +func (id ID) Bytes() []byte { + if id.IsZero() { + return nil + } + + return bytes.Clone(id.raw[:]) +} + +// Equal reports whether two IDs are equal. +func (id ID) Equal(other ID) bool { + return id == other +} + +// Hash returns the cached HRW hash. +func (id ID) Hash() uint64 { + return id.hash +} + +// IsZero reports whether ID is empty. +func (id ID) IsZero() bool { + return id.str == "" +} diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index cba2814520..91f238f2a3 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -19,10 +19,10 @@ type Storage interface { Type() string Path() string - ShardID() string + ShardID() ID SetLogger(*zap.Logger) SetCompressor(cc *compression.Config) - SetShardID(id string) + SetShardID(id ID) // GetBytes reads object by address into memory buffer in a canonical NeoFS // binary format. Returns [apistatus.ObjectNotFound] if object is missing. diff --git a/pkg/local_object_storage/blobstor/fstree/control.go b/pkg/local_object_storage/blobstor/fstree/control.go index 721ca05435..e6639df061 100644 --- a/pkg/local_object_storage/blobstor/fstree/control.go +++ b/pkg/local_object_storage/blobstor/fstree/control.go @@ -8,6 +8,7 @@ import ( "os" "path/filepath" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/util" ) @@ -68,10 +69,14 @@ func (t *FSTree) checkConfig() error { return fmt.Errorf("descriptor %q is missing, can't open read-only storage", descPath) } // create new descriptor + var shardID string + if !t.shardID.IsZero() { + shardID = t.shardID.String() + } d := fsDescriptor{ Version: currentVersion, Depth: t.Depth, - ShardID: t.shardID, + ShardID: shardID, } data, err := json.Marshal(d) if err != nil { @@ -110,12 +115,21 @@ func (t *FSTree) checkConfig() error { t.Depth = d.Depth } if t.shardIDSet { - if d.ShardID != t.shardID { - return fmt.Errorf("shard ID mismatch: on-disk shard ID=%s, configured shard ID=%s", d.ShardID, t.shardID) + if d.ShardID != t.shardID.String() { + return fmt.Errorf("shard ID mismatch: on-disk shard ID=%s, configured shard ID=%s", d.ShardID, t.shardID.String()) } } else { - t.shardID = d.ShardID - t.shardIDSet = true + if d.ShardID == "" { + t.shardID = common.ID{} + t.shardIDSet = false + return nil + } + id, err := common.DecodeIDString(d.ShardID) + if err != nil { + return fmt.Errorf("invalid shard ID %q in descriptor: %w", d.ShardID, err) + } + t.shardID = id + t.shardIDSet = !id.IsZero() } return nil } @@ -131,15 +145,18 @@ func (t *FSTree) migrateDescriptorFrom1Version(d *fsDescriptor, descPath string) t.Depth = d.Depth } - if !t.shardIDSet { - t.shardID = d.ShardID - t.shardIDSet = true + if !t.shardIDSet && d.ShardID != "" { + id, err := common.DecodeIDString(d.ShardID) + if err == nil { + t.shardID = id + t.shardIDSet = !id.IsZero() + } } if !t.readOnly { d.Version = currentVersion // update shard ID - d.ShardID = t.shardID + d.ShardID = t.shardID.String() data, err := json.Marshal(d) if err != nil { return fmt.Errorf("encode descriptor to JSON during migration: %w", err) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index 0399c56406..b1cd3d58d0 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -38,7 +38,7 @@ type FSTree struct { shardIDSet bool noSync bool readOnly bool - shardID string + shardID common.ID combinedCountLimit int combinedSizeLimit int @@ -572,12 +572,12 @@ func (t *FSTree) Path() string { } // ShardID returns the shard ID associated with this FSTree. -func (t *FSTree) ShardID() string { +func (t *FSTree) ShardID() common.ID { if !t.shardIDSet { descPath := t.descriptorPath() f, err := os.Open(descPath) if err != nil { - return "" + return common.ID{} } defer f.Close() @@ -585,16 +585,25 @@ func (t *FSTree) ShardID() string { dec := json.NewDecoder(f) dec.DisallowUnknownFields() if err = dec.Decode(&d); err != nil { - return "" + return common.ID{} } - return d.ShardID + id, err := common.DecodeIDString(d.ShardID) + if err != nil { + return common.ID{} + } + return id } return t.shardID } // SetShardID sets the shard ID to be written to the on-disk descriptor. // Must be called after the shard ID was generated and before Init(). -func (t *FSTree) SetShardID(id string) { +func (t *FSTree) SetShardID(id common.ID) { + if id.IsZero() { + t.shardID = common.ID{} + t.shardIDSet = false + return + } t.shardID = id t.shardIDSet = true } diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go index 2b46aadec3..bfedb9b40e 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go @@ -5,6 +5,7 @@ import ( "path/filepath" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/stretchr/testify/require" ) @@ -15,20 +16,22 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(2), ) - fs1.SetShardID("shard1") + id1, err := common.NewID() + require.NoError(t, err) + fs1.SetShardID(id1) require.NoError(t, fs1.Init()) desc := filepath.Join(dir, ".fstree.json") b, err := os.ReadFile(desc) require.NoError(t, err) - require.JSONEq(t, `{"version": 2,"depth": 2,"shard_id": "shard1"}`, string(b)) + require.JSONEq(t, `{"version": 2,"depth": 2,"shard_id": "`+id1.String()+`"}`, string(b)) t.Run("same config", func(t *testing.T) { fs := New( WithPath(dir), WithDepth(2), ) - fs.SetShardID("shard1") + fs.SetShardID(id1) err = fs.Init() require.NoError(t, err) }) @@ -38,7 +41,7 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(3), // mismatch ) - fs.SetShardID("shard1") + fs.SetShardID(id1) err = fs.Init() require.EqualError(t, err, "layout mismatch: on-disk depth=2, configured depth=3") }) @@ -48,20 +51,22 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(2), ) - fs.SetShardID("shard2") // mismatch + id2, err := common.NewID() + require.NoError(t, err) + fs.SetShardID(id2) // mismatch err = fs.Init() - require.EqualError(t, err, "shard ID mismatch: on-disk shard ID=shard1, configured shard ID=shard2") + require.EqualError(t, err, "shard ID mismatch: on-disk shard ID="+id1.String()+", configured shard ID="+id2.String()) }) t.Run("version mismatch", func(t *testing.T) { - data := []byte(`{"version":3,"depth":2,"shard_id":"shard1"}`) // version mismatch + data := []byte(`{"version":3,"depth":2,"shard_id":"` + id1.String() + `"}`) // version mismatch require.NoError(t, os.WriteFile(desc, data, 0o600)) fs := New( WithPath(dir), WithDepth(2), ) - fs.SetShardID("shard1") + fs.SetShardID(id1) err = fs.Init() require.EqualError(t, err, "unsupported layout version: 3 (current version: 2)") }) @@ -73,20 +78,20 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(2), ) - fs.SetShardID("shard1") + fs.SetShardID(id1) err = fs.Init() require.ErrorContains(t, err, "decode descriptor from JSON:") }) t.Run("unknown fields", func(t *testing.T) { - data := []byte(`{"version":1,"depth":2,"shard_id":"shard1","extra":42}`) + data := []byte(`{"version":1,"depth":2,"shard_id":"` + id1.String() + `","extra":42}`) require.NoError(t, os.WriteFile(desc, data, 0o600)) fs := New( WithPath(dir), WithDepth(2), ) - fs.SetShardID("shard1") + fs.SetShardID(id1) err = fs.Init() require.ErrorContains(t, err, "decode descriptor from JSON:") require.ErrorContains(t, err, "unknown field \"extra\"") @@ -94,38 +99,80 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { } func TestFSTreeDescriptor_MigrationFrom1Version(t *testing.T) { - dir := t.TempDir() - desc := filepath.Join(dir, ".fstree.json") - - // old path-based ShardID - data := []byte(`{"version":1,"depth":2,"shard_id":"/storage/fstree1"}`) - require.NoError(t, os.WriteFile(desc, data, 0o600)) - - fs := New( - WithPath(dir), - WithDepth(2), - ) - fs.SetShardID("YZfSQhkAhFjXyyGReAEuTU") - require.NoError(t, fs.Init()) - require.NoError(t, fs.Close()) - - b, err := os.ReadFile(desc) + id1, err := common.NewID() require.NoError(t, err) - require.JSONEq(t, `{"version":2,"depth":2,"shard_id":"YZfSQhkAhFjXyyGReAEuTU"}`, string(b)) - fs2 := New( - WithPath(dir), - WithDepth(2), - ) - fs2.SetShardID("YZfSQhkAhFjXyyGReAEuTU") - require.NoError(t, fs2.Init()) - require.NoError(t, fs2.Close()) + id2, err := common.NewID() + require.NoError(t, err) - fs3 := New( - WithPath(dir), - WithDepth(2), - ) - fs3.SetShardID("DifferentShardID") - err = fs3.Init() - require.EqualError(t, err, "shard ID mismatch: on-disk shard ID=YZfSQhkAhFjXyyGReAEuTU, configured shard ID=DifferentShardID") + tests := []struct { + name string + initialShardID string + configuredShardID common.ID + expectedShardID string + checkMismatch bool + }{ + { + name: "path-based configured ID", + initialShardID: "/storage/fstree1", + configuredShardID: id1, + expectedShardID: id1.String(), + checkMismatch: true, + }, + { + name: "path-based without configured ID", + initialShardID: "/storage/fstree1", + expectedShardID: "", + }, + { + name: "empty shard ID configured ID", + initialShardID: "", + configuredShardID: id1, + expectedShardID: id1.String(), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + dir := t.TempDir() + desc := filepath.Join(dir, ".fstree.json") + + data := []byte(`{"version":1,"depth":2,"shard_id":"` + tc.initialShardID + `"}`) + require.NoError(t, os.WriteFile(desc, data, 0o600)) + + fs := New( + WithPath(dir), + WithDepth(2), + ) + if !tc.configuredShardID.IsZero() { + fs.SetShardID(tc.configuredShardID) + } + require.NoError(t, fs.Init()) + require.NoError(t, fs.Close()) + + b, err := os.ReadFile(desc) + require.NoError(t, err) + require.JSONEq(t, `{"version":2,"depth":2,"shard_id":"`+tc.expectedShardID+`"}`, string(b)) + + if !tc.configuredShardID.IsZero() { + fs2 := New( + WithPath(dir), + WithDepth(2), + ) + fs2.SetShardID(tc.configuredShardID) + require.NoError(t, fs2.Init()) + require.NoError(t, fs2.Close()) + } + + if tc.checkMismatch { + fs3 := New( + WithPath(dir), + WithDepth(2), + ) + fs3.SetShardID(id2) + err = fs3.Init() + require.EqualError(t, err, "shard ID mismatch: on-disk shard ID="+tc.expectedShardID+", configured shard ID="+id2.String()) + } + }) + } } diff --git a/pkg/local_object_storage/engine/dump.go b/pkg/local_object_storage/engine/dump.go index e3d00b4856..e5f72598d8 100644 --- a/pkg/local_object_storage/engine/dump.go +++ b/pkg/local_object_storage/engine/dump.go @@ -3,13 +3,13 @@ package engine import ( "io" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" ) // DumpShard dumps objects from the shard with provided identifier. // // Returns an error if shard is not read-only. -func (e *StorageEngine) DumpShard(id *shard.ID, w io.Writer, ignoreErrors bool) error { +func (e *StorageEngine) DumpShard(id common.ID, w io.Writer, ignoreErrors bool) error { e.mtx.RLock() defer e.mtx.RUnlock() diff --git a/pkg/local_object_storage/engine/ec_test.go b/pkg/local_object_storage/engine/ec_test.go index 77f38c2a93..4a475c9a9a 100644 --- a/pkg/local_object_storage/engine/ec_test.go +++ b/pkg/local_object_storage/engine/ec_test.go @@ -12,11 +12,11 @@ import ( "testing/synctest" "time" - "github.com/mr-tron/base58" iec "github.com/nspcc-dev/neofs-node/internal/ec" ierrors "github.com/nspcc-dev/neofs-node/internal/errors" "github.com/nspcc-dev/neofs-node/internal/testutil" objectcore "github.com/nspcc-dev/neofs-node/pkg/core/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-sdk-go/client" @@ -30,6 +30,14 @@ import ( "go.uber.org/zap" ) +func testShardIDString(n int) string { + id, err := common.NewIDFromBytes(fmt.Appendf(nil, "%016d", n)) + if err != nil { + panic(err) + } + return id.String() +} + func TestStorageEngine_GetECPart(t *testing.T) { cnr := cidtest.ID() parentID := oidtest.ID() @@ -231,7 +239,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -255,7 +263,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -279,7 +287,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -303,7 +311,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -336,7 +344,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }) @@ -365,7 +373,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }) @@ -399,7 +407,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }, { @@ -411,7 +419,7 @@ func TestStorageEngine_GetECPart(t *testing.T) { "ecRule": json.Number("123"), "partIdx": json.Number("456"), "partID": partID.String(), - "shardID": base58.Encode([]byte("2")), + "shardID": testShardIDString(2), "error": "some shard error", }, }}) @@ -697,7 +705,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some shard error", }, }) @@ -719,7 +727,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some shard error", }, }) @@ -741,7 +749,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some shard error", }, }) @@ -763,7 +771,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some shard error", }, }) @@ -795,7 +803,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "ecRule": json.Number("123"), "partIdx": json.Number("456"), "partID": partID.String(), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }) @@ -823,7 +831,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "ecRule": json.Number("123"), "partIdx": json.Number("456"), "partID": partID.String(), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }) @@ -856,7 +864,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "ecRule": json.Number("123"), "partIdx": json.Number("456"), "partID": partID.String(), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }, { @@ -868,7 +876,7 @@ func TestStorageEngine_GetECPartRange(t *testing.T) { "ecRule": json.Number("123"), "partIdx": json.Number("456"), "partID": partID.String(), - "shardID": base58.Encode([]byte("2")), + "shardID": testShardIDString(2), "error": "some shard error", }, }}) @@ -1143,7 +1151,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -1167,7 +1175,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -1191,7 +1199,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -1215,7 +1223,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("0")), + "shardID": testShardIDString(0), "error": "some shard error", }, }) @@ -1248,7 +1256,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }) @@ -1277,7 +1285,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }) @@ -1311,7 +1319,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "parent": parentID.String(), "ecRule": json.Number("123"), "partIdx": json.Number("456"), - "shardID": base58.Encode([]byte("1")), + "shardID": testShardIDString(1), "error": "some error: " + partID.String(), }, }, { @@ -1323,7 +1331,7 @@ func TestStorageEngine_HeadECPart(t *testing.T) { "ecRule": json.Number("123"), "partIdx": json.Number("456"), "partID": partID.String(), - "shardID": base58.Encode([]byte("2")), + "shardID": testShardIDString(2), "error": "some shard error", }, }}) diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index a308c21572..adab654e03 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -9,6 +9,7 @@ import ( iec "github.com/nspcc-dev/neofs-node/internal/ec" "github.com/nspcc-dev/neofs-node/pkg/core/container" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -41,7 +42,7 @@ type StorageEngine struct { // interface of [shard.Shard] used by [StorageEngine] for overriding in tests. type shardInterface interface { - ID() *shard.ID + ID() common.ID GetStream(oid.Address, bool) (*object.Object, io.ReadCloser, error) ReadObject(oid.Address, bool, []byte) (int, io.ReadCloser, error) GetRangeStream(cnr cid.ID, id oid.ID, off, ln int64) (uint64, io.ReadCloser, error) diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index af85990e15..09f1e25e69 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -7,7 +7,6 @@ import ( "fmt" "io" "path/filepath" - "strconv" "sync/atomic" "testing" "time" @@ -199,7 +198,7 @@ func newEngineWithFixedShardOrder(ss []shardInterface) *StorageEngine { type unimplementedShard struct{} -func (unimplementedShard) ID() *shard.ID { +func (unimplementedShard) ID() common.ID { panic("unimplemented") } @@ -314,9 +313,12 @@ type mockShard struct { headECPart map[headECPartKey]headECPartValue } -func (x *mockShard) ID() *shard.ID { - si := strconv.Itoa(x.i) - return shard.NewIDFromBytes([]byte(si)) +func (x *mockShard) ID() common.ID { + id, err := common.NewIDFromBytes(fmt.Appendf(nil, "%016d", x.i)) + if err != nil { + panic(err) + } + return id } func (x *mockShard) GetStream(addr oid.Address, skipMeta bool) (*object.Object, io.ReadCloser, error) { diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 3fd9d22c1c..201ad7f967 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" @@ -20,14 +21,14 @@ import ( const errSmallSize = 256 -func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string, [2]*shard.ID) { +func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string, [2]common.ID) { if dir == "" { dir = t.TempDir() } e := New(append([]Option{WithShardPoolSize(1)}, opts...)...) - var ids [2]*shard.ID + var ids [2]common.ID var err error for i := range ids { @@ -48,7 +49,7 @@ func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string return e, dir, ids } -func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]*shard.ID) { +func newEngineWithErrorThreshold(t testing.TB, dir string, errThreshold uint32) (*StorageEngine, string, [2]common.ID) { return newEngine(t, dir, WithLogger(zaptest.NewLogger(t)), WithErrorThreshold(errThreshold)) } @@ -181,7 +182,7 @@ func TestBlobstorFailback(t *testing.T) { checkShardState(t, e, id[1], 0, mode.ReadWrite) } -func checkShardState(t *testing.T, e *StorageEngine, id *shard.ID, errCount uint32, mode mode.Mode) { +func checkShardState(t *testing.T, e *StorageEngine, id common.ID, errCount uint32, mode mode.Mode) { e.mtx.RLock() sh := e.shards[id.String()] e.mtx.RUnlock() diff --git a/pkg/local_object_storage/engine/evacuate.go b/pkg/local_object_storage/engine/evacuate.go index 0c0c43cde5..fe1229716d 100644 --- a/pkg/local_object_storage/engine/evacuate.go +++ b/pkg/local_object_storage/engine/evacuate.go @@ -8,6 +8,7 @@ import ( "github.com/nspcc-dev/hrw/v2" iec "github.com/nspcc-dev/neofs-node/internal/ec" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -27,7 +28,7 @@ var errMustHaveTwoShards = errors.New("must have at least 1 spare shard") // (if provided, fails otherwise) which can return its own error to abort // evacuation (or nil to continue). Returns the number of evacuated objects // (which can be non-zero even in case of error). -func (e *StorageEngine) Evacuate(shardIDs []*shard.ID, ignoreErrors bool, faultHandler func(oid.Address, *object.Object) error) (int, error) { +func (e *StorageEngine) Evacuate(shardIDs []common.ID, ignoreErrors bool, faultHandler func(oid.Address, *object.Object) error) (int, error) { sidList := make([]string, len(shardIDs)) for i := range shardIDs { sidList[i] = shardIDs[i].String() diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 0e8824bc9b..9d49eb4f3a 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -8,6 +8,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" @@ -19,14 +20,14 @@ import ( "go.uber.org/zap/zaptest" ) -func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []*shard.ID, []*object.Object) { +func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEngine, []common.ID, []*object.Object) { var ( dir = t.TempDir() e = New( WithLogger(zaptest.NewLogger(t)), WithShardPoolSize(uint32(objPerShard))) err error - ids = make([]*shard.ID, shardNum) + ids = make([]common.ID, shardNum) ) for i := range ids { diff --git a/pkg/local_object_storage/engine/restore.go b/pkg/local_object_storage/engine/restore.go index 9d4f331dcb..fed3a99180 100644 --- a/pkg/local_object_storage/engine/restore.go +++ b/pkg/local_object_storage/engine/restore.go @@ -3,13 +3,13 @@ package engine import ( "io" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" ) // RestoreShard restores objects from dump to the shard with provided identifier. // // Returns an error if shard is not read-only. -func (e *StorageEngine) RestoreShard(id *shard.ID, r io.Reader, ignoreErrors bool) error { +func (e *StorageEngine) RestoreShard(id common.ID, r io.Reader, ignoreErrors bool) error { e.mtx.RLock() defer e.mtx.RUnlock() diff --git a/pkg/local_object_storage/engine/revive.go b/pkg/local_object_storage/engine/revive.go index 0f88f66252..24adc20216 100644 --- a/pkg/local_object_storage/engine/revive.go +++ b/pkg/local_object_storage/engine/revive.go @@ -24,7 +24,7 @@ func (e *StorageEngine) ReviveObject(address oid.Address) (ReviveStatus, error) for _, sh := range e.unsortedShards() { reviveStatus, err := sh.ReviveObject(address) - id := *sh.ID() + id := sh.ID() res.Shards = append(res.Shards, ReviveShardStatus{ ID: id.String(), Status: reviveStatus, diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index d19e146ddf..f43c118185 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -7,8 +7,8 @@ import ( "slices" "sync/atomic" - "github.com/google/uuid" "github.com/nspcc-dev/hrw/v2" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" @@ -61,15 +61,15 @@ func (m *metricsWithID) AddToPayloadSize(size int64) { // // Returns any error encountered that did not allow adding a shard. // Otherwise returns the ID of the added shard. -func (e *StorageEngine) AddShard(opts ...shard.Option) (*shard.ID, error) { +func (e *StorageEngine) AddShard(opts ...shard.Option) (common.ID, error) { sh, err := e.createShard(opts) if err != nil { - return nil, fmt.Errorf("could not create a shard: %w", err) + return common.ID{}, fmt.Errorf("could not create a shard: %w", err) } err = e.addShard(sh) if err != nil { - return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err) + return common.ID{}, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err) } if e.metrics != nil { @@ -172,18 +172,8 @@ func (e *StorageEngine) removeShards(ids ...string) { } } -func generateShardID() (*shard.ID, error) { - uid, err := uuid.NewRandom() - if err != nil { - return nil, err - } - - bin, err := uid.MarshalBinary() - if err != nil { - return nil, err - } - - return shard.NewIDFromBytes(bin), nil +func generateShardID() (common.ID, error) { + return common.NewID() } func (e *StorageEngine) sortedShards(id oid.ID) []shardWrapper { @@ -215,7 +205,7 @@ func (e *StorageEngine) getShard(id string) shardWrapper { // SetShardMode sets mode of the shard with provided identifier. // // Returns an error if shard mode was not set, or shard was not found in storage engine. -func (e *StorageEngine) SetShardMode(id *shard.ID, m mode.Mode, resetErrorCounter bool) error { +func (e *StorageEngine) SetShardMode(id common.ID, m mode.Mode, resetErrorCounter bool) error { e.mtx.RLock() defer e.mtx.RUnlock() @@ -248,7 +238,7 @@ func (e *StorageEngine) HandleNewEpoch(epoch uint64) { } func (s shardWrapper) Hash() uint64 { - return binary.BigEndian.Uint64(*s.ID()) + return s.ID().Hash() } type hrwOIDWrapper oid.ID diff --git a/pkg/local_object_storage/engine/status.go b/pkg/local_object_storage/engine/status.go index 906b0969a5..53889f56f1 100644 --- a/pkg/local_object_storage/engine/status.go +++ b/pkg/local_object_storage/engine/status.go @@ -22,7 +22,7 @@ func (e *StorageEngine) ObjectStatus(address oid.Address) (ObjectStatus, error) for _, sh := range e.sortedShards(address.Object()) { shardStatus, err := sh.ObjectStatus(address) - id := *sh.ID() + id := sh.ID() if err != nil { return res, err } diff --git a/pkg/local_object_storage/engine/writecache.go b/pkg/local_object_storage/engine/writecache.go index d6d1f6d393..3ce2546b56 100644 --- a/pkg/local_object_storage/engine/writecache.go +++ b/pkg/local_object_storage/engine/writecache.go @@ -3,11 +3,11 @@ package engine import ( "fmt" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" ) // FlushWriteCache flushes write-cache on a single shard with the given ID. -func (e *StorageEngine) FlushWriteCache(id *shard.ID) error { +func (e *StorageEngine) FlushWriteCache(id common.ID) error { e.mtx.RLock() sh, ok := e.shards[id.String()] e.mtx.RUnlock() diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index f9efbb7926..d0a58e3686 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -6,7 +6,6 @@ import ( "path/filepath" "slices" - "github.com/mr-tron/base58" "github.com/nspcc-dev/bbolt" bolterrors "github.com/nspcc-dev/bbolt/errors" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" @@ -232,13 +231,9 @@ func (db *DB) ResyncFromBlobstor(bs common.Storage, onIterationError func(oid.Ad return fmt.Errorf("could not reset metabase: %w", err) } - strBlobstorShardID := bs.ShardID() - if strBlobstorShardID != "" { - blobstorShardID, err := base58.Decode(strBlobstorShardID) - if err != nil { - return fmt.Errorf("invalid blobstor shard ID %q: %w", strBlobstorShardID, err) - } - err = db.WriteShardID(blobstorShardID) + blobstorShardID := bs.ShardID() + if !blobstorShardID.IsZero() { + err = db.WriteShardID(blobstorShardID.Bytes()) if err != nil { return fmt.Errorf("could not write shard ID: %w", err) } diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 3c0b858880..780eefd72e 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" @@ -124,13 +125,14 @@ func TestResyncMetabase(t *testing.T) { defer os.RemoveAll(p) - shID := ID("test") + shID, err := common.NewID() + require.NoError(t, err) sh := New( WithBlobstor(fstree.New( fstree.WithPath(filepath.Join(p, "fstree")), fstree.WithDepth(1)), ), - WithID(&shID), + WithID(shID), WithLogger(zaptest.NewLogger(t)), WithMetaBaseOptions( meta.WithPath(filepath.Join(p, "meta")), @@ -189,11 +191,11 @@ func TestResyncMetabase(t *testing.T) { tombedAddress := oid.NewAddress(tombObj.GetContainerID(), tombedID) for _, v := range mObjs { - err := sh.Put(v.obj, nil) + err = sh.Put(v.obj, nil) require.NoError(t, err) } - err := sh.Put(&tombObj, nil) + err = sh.Put(&tombObj, nil) require.NoError(t, err) // LOCK object handling @@ -269,7 +271,7 @@ func TestResyncMetabase(t *testing.T) { fstree.WithPath(filepath.Join(p, "fstree")), fstree.WithDepth(1)), ), - WithID(&shID), + WithID(shID), WithLogger(zaptest.NewLogger(t)), WithMetaBaseOptions( meta.WithPath(filepath.Join(p, "meta_restored")), diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go index 68707fa4b3..caa04aa140 100644 --- a/pkg/local_object_storage/shard/id.go +++ b/pkg/local_object_storage/shard/id.go @@ -1,27 +1,12 @@ package shard import ( - "github.com/mr-tron/base58" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "go.uber.org/zap" ) -// ID represents Shard identifier. -// -// Each shard should have the unique ID within -// a single instance of local storage. -type ID []byte - -// NewIDFromBytes constructs ID from byte slice. -func NewIDFromBytes(v []byte) *ID { - return (*ID)(&v) -} - -func (id ID) String() string { - return base58.Encode(id) -} - // ID returns Shard identifier. -func (s *Shard) ID() *ID { +func (s *Shard) ID() common.ID { return s.info.ID } @@ -41,20 +26,18 @@ func (s *Shard) UpdateID() (err error) { return err } if len(id) != 0 { - s.info.ID = NewIDFromBytes(id) + s.info.ID, err = common.NewIDFromBytes(id) + if err != nil { + return err + } if s.metricsWriter != nil { s.metricsWriter.SetShardID(s.info.ID.String()) } } else { blobShardID := s.blobStor.ShardID() - if blobShardID != "" { - var bBlobShardID []byte - bBlobShardID, err = base58.Decode(blobShardID) - if err != nil { - return err - } - s.info.ID = NewIDFromBytes(bBlobShardID) + if !blobShardID.IsZero() { + s.info.ID = blobShardID if s.metricsWriter != nil { s.metricsWriter.SetShardID(s.info.ID.String()) @@ -70,7 +53,7 @@ func (s *Shard) UpdateID() (err error) { s.gcCfg.log = s.gcCfg.log.With(zap.String("shard_id", sID)) s.metaBase.SetLogger(l) s.blobStor.SetLogger(l) - s.blobStor.SetShardID(sID) + s.blobStor.SetShardID(s.info.ID) if s.hasWriteCache() { s.writeCache.SetLogger(l) s.writeCache.SetShardIDMetrics(sID) @@ -79,5 +62,5 @@ func (s *Shard) UpdateID() (err error) { if len(id) != 0 { return nil } - return s.metaBase.WriteShardID(*s.info.ID) + return s.metaBase.WriteShardID(s.info.ID.Bytes()) } diff --git a/pkg/local_object_storage/shard/info.go b/pkg/local_object_storage/shard/info.go index 0e7b92e479..e2c8749bc0 100644 --- a/pkg/local_object_storage/shard/info.go +++ b/pkg/local_object_storage/shard/info.go @@ -1,6 +1,7 @@ package shard import ( + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/writecache" @@ -9,7 +10,7 @@ import ( // Info groups the information about Shard. type Info struct { // Identifier of the shard. - ID *ID + ID common.ID // Shard mode. Mode mode.Mode diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index fed00b6522..51052763c9 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -156,7 +156,7 @@ func New(opts ...Option) *Shard { } // WithID returns option to set the default shard identifier. -func WithID(id *ID) Option { +func WithID(id common.ID) Option { return func(c *cfg) { c.info.ID = id } diff --git a/pkg/local_object_storage/shard/shard_internal_test.go b/pkg/local_object_storage/shard/shard_internal_test.go index 50a5beaff2..b2b83324b1 100644 --- a/pkg/local_object_storage/shard/shard_internal_test.go +++ b/pkg/local_object_storage/shard/shard_internal_test.go @@ -263,7 +263,7 @@ func (unimplementedBLOBStore) Path() string { panic("unimplemented") } -func (unimplementedBLOBStore) ShardID() string { +func (unimplementedBLOBStore) ShardID() common.ID { panic("unimplemented") } @@ -275,7 +275,7 @@ func (unimplementedBLOBStore) SetCompressor(*compression.Config) { panic("unimplemented") } -func (unimplementedBLOBStore) SetShardID(string) { +func (unimplementedBLOBStore) SetShardID(common.ID) { panic("unimplemented") } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 2ac47c7cb8..5d51d899c1 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" @@ -55,8 +56,14 @@ func _newShardWithFSTree(t testing.TB, rootPath string, enableWriteCache bool, w fstree.WithPath(filepath.Join(rootPath, "fstree")), ) + newShardID := func() common.ID { + id, err := common.NewID() + require.NoError(t, err) + return id + } + opts := append([]shard.Option{ - shard.WithID(shard.NewIDFromBytes([]byte("testShard"))), + shard.WithID(newShardID()), shard.WithLogger(zap.L()), shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(rootPath, "meta")), diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index 1107d94451..f156164e70 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -4,6 +4,7 @@ import ( "fmt" "os" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/util" ) @@ -21,7 +22,14 @@ func (c *cache) openStore(readOnly bool) error { fstree.WithNoSync(c.noSync), fstree.WithCombinedCountLimit(1)) c.fsTree.SetLogger(c.log) - c.fsTree.SetShardID(c.metrics.id) + var id common.ID + if c.metrics.id != "" { + id, err = common.DecodeIDString(c.metrics.id) + if err != nil { + return fmt.Errorf("decode shard ID: %w", err) + } + } + c.fsTree.SetShardID(id) if err := c.fsTree.Open(readOnly); err != nil { return fmt.Errorf("could not open FSTree: %w", err) } diff --git a/pkg/services/control/server/dump.go b/pkg/services/control/server/dump.go index 41c69e6b14..0ff0816925 100644 --- a/pkg/services/control/server/dump.go +++ b/pkg/services/control/server/dump.go @@ -5,7 +5,7 @@ import ( "fmt" "os" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/services/control" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -23,7 +23,10 @@ func (s *Server) DumpShard(_ context.Context, req *control.DumpShardRequest) (*c return nil, err } - shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) + shardID, err := common.NewIDFromBytes(req.GetBody().GetShard_ID()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } f, err := os.Create(req.GetBody().GetFilepath()) if err != nil { diff --git a/pkg/services/control/server/evacuate.go b/pkg/services/control/server/evacuate.go index 36cd43361c..9b711442e2 100644 --- a/pkg/services/control/server/evacuate.go +++ b/pkg/services/control/server/evacuate.go @@ -31,7 +31,12 @@ func (s *Server) EvacuateShard(_ context.Context, req *control.EvacuateShardRequ return nil, err } - count, err := s.storage.Evacuate(s.getShardIDList(req.GetBody().GetShard_ID()), req.GetBody().GetIgnoreErrors(), s.replicate) + shardIDs, err := s.getShardIDList(req.GetBody().GetShard_ID()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + count, err := s.storage.Evacuate(shardIDs, req.GetBody().GetIgnoreErrors(), s.replicate) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } diff --git a/pkg/services/control/server/flush_cache.go b/pkg/services/control/server/flush_cache.go index a6305deefc..bee24424f1 100644 --- a/pkg/services/control/server/flush_cache.go +++ b/pkg/services/control/server/flush_cache.go @@ -20,7 +20,12 @@ func (s *Server) FlushCache(_ context.Context, req *control.FlushCacheRequest) ( return nil, err } - for _, shardID := range s.getShardIDList(req.GetBody().GetShard_ID()) { + shardIDs, err := s.getShardIDList(req.GetBody().GetShard_ID()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + for _, shardID := range shardIDs { err = s.storage.FlushWriteCache(shardID) if err != nil { return nil, status.Error(codes.Internal, err.Error()) diff --git a/pkg/services/control/server/helpers.go b/pkg/services/control/server/helpers.go index 930ffd4d48..40b4d6e841 100644 --- a/pkg/services/control/server/helpers.go +++ b/pkg/services/control/server/helpers.go @@ -1,27 +1,36 @@ package control import ( - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "fmt" + + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) // call only if `ready` returned no error. -func (s *Server) getShardIDList(raw [][]byte) []*shard.ID { +func (s *Server) getShardIDList(raw [][]byte) ([]common.ID, error) { if len(raw) != 0 { - res := make([]*shard.ID, 0, len(raw)) + res := make([]common.ID, 0, len(raw)) for i := range raw { - res = append(res, shard.NewIDFromBytes(raw[i])) + if len(raw[i]) == 0 { + return nil, fmt.Errorf("invalid shard ID #%d: empty shard ID", i) + } + id, err := common.NewIDFromBytes(raw[i]) + if err != nil { + return nil, fmt.Errorf("invalid shard ID #%d: %w", i, err) + } + res = append(res, id) } - return res + return res, nil } info := s.storage.DumpInfo() - res := make([]*shard.ID, 0, len(info.Shards)) + res := make([]common.ID, 0, len(info.Shards)) for i := range info.Shards { res = append(res, info.Shards[i].ID) } - return res + return res, nil } func (s *Server) ready() error { diff --git a/pkg/services/control/server/list_shards.go b/pkg/services/control/server/list_shards.go index 8ea2ce7efe..c86c0c467d 100644 --- a/pkg/services/control/server/list_shards.go +++ b/pkg/services/control/server/list_shards.go @@ -28,7 +28,7 @@ func (s *Server) ListShards(_ context.Context, req *control.ListShardsRequest) ( for _, sh := range info.Shards { si := new(control.ShardInfo) - si.SetID(*sh.ID) + si.SetID(sh.ID.Bytes()) si.SetMetabasePath(sh.MetaBaseInfo.Path) si.Blobstor = &control.BlobstorInfo{ Path: sh.BlobStorInfo.Path, diff --git a/pkg/services/control/server/restore.go b/pkg/services/control/server/restore.go index 82920b8182..70bf4fa250 100644 --- a/pkg/services/control/server/restore.go +++ b/pkg/services/control/server/restore.go @@ -4,7 +4,7 @@ import ( "context" "os" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/services/control" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -22,7 +22,10 @@ func (s *Server) RestoreShard(_ context.Context, req *control.RestoreShardReques return nil, err } - shardID := shard.NewIDFromBytes(req.GetBody().GetShard_ID()) + shardID, err := common.NewIDFromBytes(req.GetBody().GetShard_ID()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } f, err := os.Open(req.GetBody().GetFilepath()) if err != nil { diff --git a/pkg/services/control/server/set_shard_mode.go b/pkg/services/control/server/set_shard_mode.go index fc7d94bbf3..b1276b9044 100644 --- a/pkg/services/control/server/set_shard_mode.go +++ b/pkg/services/control/server/set_shard_mode.go @@ -42,7 +42,12 @@ func (s *Server) SetShardMode(_ context.Context, req *control.SetShardModeReques return nil, status.Error(codes.Internal, fmt.Sprintf("unknown shard mode: %s", requestedMode)) } - for _, shardID := range s.getShardIDList(req.Body.GetShard_ID()) { + shardIDs, err := s.getShardIDList(req.Body.GetShard_ID()) + if err != nil { + return nil, status.Error(codes.InvalidArgument, err.Error()) + } + + for _, shardID := range shardIDs { err = s.storage.SetShardMode(shardID, m, req.Body.GetResetErrorCounter()) if err != nil { return nil, status.Error(codes.Internal, err.Error()) From 4c4a082e7d88b36973eb28c5b846d83c8dcfb593 Mon Sep 17 00:00:00 2001 From: Andrey Butusov Date: Fri, 27 Mar 2026 19:43:24 +0300 Subject: [PATCH 2/2] storage: move shard ID ownership to fstree Make fstree/blobstor the canonical shard ID source and propagate the resulting ID to metabase and write-cache during shard initialization. Align shard and engine initialization with this flow. Store fstree subtype in the descriptor to distinguish blobstor from write-cache in recovery scenarios, with blobstor as the default subtype. Closes #3859. Signed-off-by: Andrey Butusov --- cmd/neofs-lancet/internal/fstree/get.go | 3 +- cmd/neofs-lancet/internal/fstree/list.go | 3 +- cmd/neofs-lancet/internal/fstree/remove.go | 3 +- cmd/neofs-lancet/internal/meta/put.go | 3 +- cmd/neofs-lancet/internal/meta/remove.go | 3 +- cmd/neofs-lancet/internal/meta/resync.go | 4 +- cmd/neofs-lancet/internal/storage/root.go | 3 - cmd/neofs-lancet/internal/storage/sanity.go | 4 +- cmd/neofs-node/main.go | 1 - cmd/neofs-node/storage.go | 1 + .../blobstor/common/storage.go | 9 +- .../blobstor/common/storage_test.go | 2 +- .../blobstor/fstree/common_test.go | 3 +- .../blobstor/fstree/control.go | 46 ++++++-- .../blobstor/fstree/fstree.go | 24 ++--- .../blobstor/fstree/fstree_descriptor_test.go | 57 +++++----- .../blobstor/fstree/getstream_test.go | 3 +- .../blobstor/fstree/option.go | 14 +++ .../blobstor/internal/storagetest/control.go | 2 +- .../blobstor/internal/storagetest/delete.go | 3 +- .../blobstor/internal/storagetest/exists.go | 3 +- .../blobstor/internal/storagetest/get.go | 3 +- .../internal/storagetest/get_range.go | 3 +- .../blobstor/internal/storagetest/iterate.go | 3 +- .../engine/container_test.go | 2 - pkg/local_object_storage/engine/control.go | 46 ++------ .../engine/control_test.go | 21 ++-- pkg/local_object_storage/engine/engine.go | 2 + .../engine/engine_test.go | 5 - pkg/local_object_storage/engine/error_test.go | 1 - .../engine/evacuate_test.go | 1 - pkg/local_object_storage/engine/gc_test.go | 4 - pkg/local_object_storage/engine/lock_test.go | 3 - pkg/local_object_storage/engine/shards.go | 46 ++++---- .../internal/storagetest/storage.go | 11 +- pkg/local_object_storage/metabase/control.go | 11 +- .../metabase/control_test.go | 3 +- pkg/local_object_storage/metabase/db.go | 5 - pkg/local_object_storage/metabase/db_test.go | 3 +- .../metabase/last_resync_epoch_test.go | 5 +- pkg/local_object_storage/metabase/mode.go | 3 +- pkg/local_object_storage/metabase/shard_id.go | 37 +++++++ .../metabase/version_test.go | 11 +- pkg/local_object_storage/shard/bench_test.go | 4 +- pkg/local_object_storage/shard/control.go | 100 +++++++++--------- .../shard/control_test.go | 10 +- pkg/local_object_storage/shard/ec_test.go | 9 +- pkg/local_object_storage/shard/gc_test.go | 4 +- .../shard/generic_test.go | 4 +- pkg/local_object_storage/shard/id.go | 66 ------------ pkg/local_object_storage/shard/mode.go | 3 +- pkg/local_object_storage/shard/shard.go | 10 +- .../shard/shard_internal_test.go | 21 +--- pkg/local_object_storage/shard/shard_test.go | 8 -- .../writecache/flush_test.go | 18 ++-- .../writecache/generic_test.go | 4 +- .../writecache/storage.go | 12 +-- .../writecache/writecache.go | 25 ++--- .../writecache/writecache_test.go | 3 +- pkg/services/object/server_test.go | 1 - 60 files changed, 332 insertions(+), 393 deletions(-) delete mode 100644 pkg/local_object_storage/shard/id.go diff --git a/cmd/neofs-lancet/internal/fstree/get.go b/cmd/neofs-lancet/internal/fstree/get.go index 37bc46664d..3eb56e4777 100644 --- a/cmd/neofs-lancet/internal/fstree/get.go +++ b/cmd/neofs-lancet/internal/fstree/get.go @@ -4,6 +4,7 @@ import ( "fmt" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal" + blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -30,7 +31,7 @@ func getFunc(cmd *cobra.Command, _ []string) error { } defer fst.Close() - err = fst.Init() + err = fst.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("failed to init FSTree: %w", err) } diff --git a/cmd/neofs-lancet/internal/fstree/list.go b/cmd/neofs-lancet/internal/fstree/list.go index 66d376c7ae..96aac2ea1a 100644 --- a/cmd/neofs-lancet/internal/fstree/list.go +++ b/cmd/neofs-lancet/internal/fstree/list.go @@ -5,6 +5,7 @@ import ( "io" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal" + blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -36,7 +37,7 @@ func listFunc(cmd *cobra.Command, _ []string) error { } defer fst.Close() - err = fst.Init() + err = fst.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("failed to init FSTree: %w", err) } diff --git a/cmd/neofs-lancet/internal/fstree/remove.go b/cmd/neofs-lancet/internal/fstree/remove.go index b624d144bc..3026a74563 100644 --- a/cmd/neofs-lancet/internal/fstree/remove.go +++ b/cmd/neofs-lancet/internal/fstree/remove.go @@ -5,6 +5,7 @@ import ( "fmt" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal" + blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -44,7 +45,7 @@ func removeFunc(cmd *cobra.Command, _ []string) error { } defer fst.Close() - err = fst.Init() + err = fst.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("failed to init FSTree: %w", err) } diff --git a/cmd/neofs-lancet/internal/meta/put.go b/cmd/neofs-lancet/internal/meta/put.go index f9c9763f0e..deadedfb90 100644 --- a/cmd/neofs-lancet/internal/meta/put.go +++ b/cmd/neofs-lancet/internal/meta/put.go @@ -6,6 +6,7 @@ import ( "os" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal" + blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-sdk-go/object" "github.com/spf13/cobra" ) @@ -30,7 +31,7 @@ func writeObject(cmd *cobra.Command, _ []string) error { } defer db.Close() - err = db.Init() + err = db.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("can't init metabase: %w", err) } diff --git a/cmd/neofs-lancet/internal/meta/remove.go b/cmd/neofs-lancet/internal/meta/remove.go index 491a9648c8..5ac206622b 100644 --- a/cmd/neofs-lancet/internal/meta/remove.go +++ b/cmd/neofs-lancet/internal/meta/remove.go @@ -4,6 +4,7 @@ import ( "fmt" common "github.com/nspcc-dev/neofs-node/cmd/neofs-lancet/internal" + blobstorcommon "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/spf13/cobra" ) @@ -43,7 +44,7 @@ func removeFunc(cmd *cobra.Command, _ []string) error { } defer db.Close() - err = db.Init() + err = db.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("can't init metabase: %w", err) } diff --git a/cmd/neofs-lancet/internal/meta/resync.go b/cmd/neofs-lancet/internal/meta/resync.go index 389ca523af..e68e0e738a 100644 --- a/cmd/neofs-lancet/internal/meta/resync.go +++ b/cmd/neofs-lancet/internal/meta/resync.go @@ -51,7 +51,7 @@ func resyncFunc(cmd *cobra.Command, _ []string) error { } defer db.Close() - err = db.Init() + err = db.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("init metabase: %w", err) } @@ -74,7 +74,7 @@ func resyncFunc(cmd *cobra.Command, _ []string) error { return fmt.Errorf("failed to open FSTree: %w", err) } - err = fst.Init() + err = fst.Init(blobstorcommon.ID{}) if err != nil { return fmt.Errorf("init blobstor: %w", err) } diff --git a/cmd/neofs-lancet/internal/storage/root.go b/cmd/neofs-lancet/internal/storage/root.go index 992b92189c..b8ceef0143 100644 --- a/cmd/neofs-lancet/internal/storage/root.go +++ b/cmd/neofs-lancet/internal/storage/root.go @@ -137,9 +137,6 @@ func openEngine(readOnly bool) (*engine.StorageEngine, error) { } } - if err := ls.Open(); err != nil { - return nil, err - } if err := ls.Init(); err != nil { return nil, err } diff --git a/cmd/neofs-lancet/internal/storage/sanity.go b/cmd/neofs-lancet/internal/storage/sanity.go index 524fd458a4..c59786eb2a 100644 --- a/cmd/neofs-lancet/internal/storage/sanity.go +++ b/cmd/neofs-lancet/internal/storage/sanity.go @@ -97,11 +97,11 @@ func sanityCheck(cmd *cobra.Command, _ []string) error { return fmt.Errorf("moving metabase in readonly mode: %w", err) } - if err := sh.m.Init(); err != nil { + if err := sh.m.Init(commonb.ID{}); err != nil { return fmt.Errorf("init metabase: %w", err) } if sh.fsT != nil { - if err := sh.fsT.Init(); err != nil { + if err := sh.fsT.Init(commonb.ID{}); err != nil { return fmt.Errorf("init fstree: %w", err) } } diff --git a/cmd/neofs-node/main.go b/cmd/neofs-node/main.go index 48330e1747..a3227be6fb 100644 --- a/cmd/neofs-node/main.go +++ b/cmd/neofs-node/main.go @@ -127,7 +127,6 @@ func initApp(c *cfg) { initAndLog(c, "container", initContainerService) initAndLog(c, "storage engine", func(c *cfg) { - fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Open()) fatalOnErr(c.cfgObject.cfgLocalStorage.localStorage.Init()) }) diff --git a/cmd/neofs-node/storage.go b/cmd/neofs-node/storage.go index 0d9c858e47..4faff3af39 100644 --- a/cmd/neofs-node/storage.go +++ b/cmd/neofs-node/storage.go @@ -112,6 +112,7 @@ func (c *cfg) shardOpts() []shardOptsWithID { wcMaxBatchCount = sRead.CombinedCountLimit wcMaxBatchThreshold = uint64(sRead.CombinedSizeThreshold) s = fstree.New( + fstree.WithLogger(c.log), fstree.WithPath(sRead.Path), fstree.WithPerm(sRead.Perm), fstree.WithDepth(sRead.Depth), diff --git a/pkg/local_object_storage/blobstor/common/storage.go b/pkg/local_object_storage/blobstor/common/storage.go index 91f238f2a3..ce2564aa8b 100644 --- a/pkg/local_object_storage/blobstor/common/storage.go +++ b/pkg/local_object_storage/blobstor/common/storage.go @@ -7,22 +7,19 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" ) // Storage represents key-value object storage. // It is used as a building block for a blobstor of a shard. type Storage interface { Open(readOnly bool) error - Init() error + Init(ID) error Close() error Type() string Path() string ShardID() ID - SetLogger(*zap.Logger) SetCompressor(cc *compression.Config) - SetShardID(id ID) // GetBytes reads object by address into memory buffer in a canonical NeoFS // binary format. Returns [apistatus.ObjectNotFound] if object is missing. @@ -58,7 +55,7 @@ func CopyBatched(dst, src Storage, batchSize int) error { defer func() { _ = src.Close() }() - err = src.Init() + err = src.Init(ID{}) if err != nil { return fmt.Errorf("initialize source sub-storage: %w", err) } @@ -70,7 +67,7 @@ func CopyBatched(dst, src Storage, batchSize int) error { defer func() { _ = dst.Close() }() - err = dst.Init() + err = dst.Init(ID{}) if err != nil { return fmt.Errorf("initialize destination sub-storage: %w", err) } diff --git a/pkg/local_object_storage/blobstor/common/storage_test.go b/pkg/local_object_storage/blobstor/common/storage_test.go index d503df8c9d..4695361c5c 100644 --- a/pkg/local_object_storage/blobstor/common/storage_test.go +++ b/pkg/local_object_storage/blobstor/common/storage_test.go @@ -29,7 +29,7 @@ func testCopy(t *testing.T, copier func(dst, src common.Storage) error) { src := fstree.New(fstree.WithPath(filepath.Join(dir, "src"))) require.NoError(t, src.Open(false)) - require.NoError(t, src.Init()) + require.NoError(t, src.Init(common.ID{})) mObjs := make(map[oid.Address][]byte, nObjects) diff --git a/pkg/local_object_storage/blobstor/fstree/common_test.go b/pkg/local_object_storage/blobstor/fstree/common_test.go index fef5c530a0..43c3f67ef5 100644 --- a/pkg/local_object_storage/blobstor/fstree/common_test.go +++ b/pkg/local_object_storage/blobstor/fstree/common_test.go @@ -5,6 +5,7 @@ import ( "fmt" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -26,7 +27,7 @@ var payloadSizes = []int{ func setupFSTree(tb testing.TB) *fstree.FSTree { fsTree := fstree.New(fstree.WithPath(tb.TempDir())) require.NoError(tb, fsTree.Open(false)) - require.NoError(tb, fsTree.Init()) + require.NoError(tb, fsTree.Init(common.ID{})) return fsTree } diff --git a/pkg/local_object_storage/blobstor/fstree/control.go b/pkg/local_object_storage/blobstor/fstree/control.go index e6639df061..096e64e960 100644 --- a/pkg/local_object_storage/blobstor/fstree/control.go +++ b/pkg/local_object_storage/blobstor/fstree/control.go @@ -10,6 +10,7 @@ import ( "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/util" + "go.uber.org/zap" ) // currentVersion contains current FSTree config version. @@ -26,6 +27,7 @@ type fsDescriptor struct { Version int `json:"version"` Depth uint64 `json:"depth"` ShardID string `json:"shard_id"` + Subtype string `json:"subtype"` } func (t *FSTree) descriptorPath() string { @@ -33,17 +35,27 @@ func (t *FSTree) descriptorPath() string { } // Init implements common.Storage. -func (t *FSTree) Init() error { +func (t *FSTree) Init(id common.ID) error { err := util.MkdirAllX(t.RootPath, t.Permissions) if err != nil { return fmt.Errorf("mkdir all for %q: %w", t.RootPath, err) } + if !id.IsZero() { + t.shardID = id + t.shardIDSet = true + } + err = t.checkConfig() if err != nil { return err } + t.log = t.log.With( + zap.String("substorage", t.subtype), + zap.String("shard_id", t.shardID.String()), + ) + if !t.readOnly { var w = newSpecificWriter(t) if w != nil { @@ -68,15 +80,19 @@ func (t *FSTree) checkConfig() error { if t.readOnly { return fmt.Errorf("descriptor %q is missing, can't open read-only storage", descPath) } - // create new descriptor - var shardID string - if !t.shardID.IsZero() { - shardID = t.shardID.String() + if !t.shardIDSet { + t.shardID, err = common.NewID() + if err != nil { + return fmt.Errorf("generate shard ID: %w", err) + } + t.shardIDSet = true } + // create new descriptor d := fsDescriptor{ Version: currentVersion, Depth: t.Depth, - ShardID: shardID, + ShardID: t.shardID.String(), + Subtype: t.subtype, } data, err := json.Marshal(d) if err != nil { @@ -107,6 +123,10 @@ func (t *FSTree) checkConfig() error { if d.Version != currentVersion { return fmt.Errorf("unsupported layout version: %d (current version: %d)", d.Version, currentVersion) } + if d.Subtype != t.subtype { + return fmt.Errorf("subtype mismatch: on-disk subtype=%s, configured subtype=%s", d.Subtype, t.subtype) + } + if t.depthSet { if d.Depth != t.Depth { return fmt.Errorf("layout mismatch: on-disk depth=%d, configured depth=%d", d.Depth, t.Depth) @@ -135,7 +155,8 @@ func (t *FSTree) checkConfig() error { } // migrateDescriptorFrom1Version migrates descriptor from version 1 to version 2. -// In version 1, ShardID was path-based and needs to be updated during migration. +// In version 1, ShardID was path-based and needs to be updated during migration, +// and a new subtype field was presented. func (t *FSTree) migrateDescriptorFrom1Version(d *fsDescriptor, descPath string) error { if t.depthSet { if d.Depth != t.Depth { @@ -153,10 +174,19 @@ func (t *FSTree) migrateDescriptorFrom1Version(d *fsDescriptor, descPath string) } } + if !t.shardIDSet { + id, err := common.NewID() + if err != nil { + return fmt.Errorf("generate shard ID during migration: %w", err) + } + t.shardID = id + t.shardIDSet = true + } + if !t.readOnly { d.Version = currentVersion - // update shard ID d.ShardID = t.shardID.String() + d.Subtype = t.subtype data, err := json.Marshal(d) if err != nil { return fmt.Errorf("encode descriptor to JSON during migration: %w", err) diff --git a/pkg/local_object_storage/blobstor/fstree/fstree.go b/pkg/local_object_storage/blobstor/fstree/fstree.go index b1cd3d58d0..14c5ee289f 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree.go @@ -39,6 +39,7 @@ type FSTree struct { noSync bool readOnly bool shardID common.ID + subtype string combinedCountLimit int combinedSizeLimit int @@ -111,6 +112,7 @@ func New(opts ...Option) *FSTree { combinedSizeThreshold: 128 * 1024, combinedWriteInterval: 10 * time.Millisecond, log: zap.NewNop(), + subtype: SubtypeBlobstor, } for i := range opts { opts[i](f) @@ -561,6 +563,11 @@ func (t *FSTree) GetRangeStream(addr oid.Address, off uint64, ln uint64) (io.Rea // Type is fstree storage type used in logs and configuration. const Type = "fstree" +const ( + SubtypeBlobstor = "blobstor" + SubtypeWriteCache = "write-cache" +) + // Type implements common.Storage. func (*FSTree) Type() string { return Type @@ -596,28 +603,11 @@ func (t *FSTree) ShardID() common.ID { return t.shardID } -// SetShardID sets the shard ID to be written to the on-disk descriptor. -// Must be called after the shard ID was generated and before Init(). -func (t *FSTree) SetShardID(id common.ID) { - if id.IsZero() { - t.shardID = common.ID{} - t.shardIDSet = false - return - } - t.shardID = id - t.shardIDSet = true -} - // SetCompressor implements common.Storage. func (t *FSTree) SetCompressor(cc *compression.Config) { t.Config = cc } -// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. -func (t *FSTree) SetLogger(l *zap.Logger) { - t.log = l.With(zap.String("substorage", Type)) -} - // CleanUpTmp removes all temporary files garbage. func (t *FSTree) CleanUpTmp() error { if t.readOnly { diff --git a/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go b/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go index bfedb9b40e..5170bdebde 100644 --- a/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go +++ b/pkg/local_object_storage/blobstor/fstree/fstree_descriptor_test.go @@ -18,21 +18,19 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { ) id1, err := common.NewID() require.NoError(t, err) - fs1.SetShardID(id1) - require.NoError(t, fs1.Init()) + require.NoError(t, fs1.Init(id1)) desc := filepath.Join(dir, ".fstree.json") b, err := os.ReadFile(desc) require.NoError(t, err) - require.JSONEq(t, `{"version": 2,"depth": 2,"shard_id": "`+id1.String()+`"}`, string(b)) + require.JSONEq(t, `{"version": 2,"depth": 2,"shard_id": "`+id1.String()+`","subtype":"blobstor"}`, string(b)) t.Run("same config", func(t *testing.T) { fs := New( WithPath(dir), WithDepth(2), ) - fs.SetShardID(id1) - err = fs.Init() + err = fs.Init(id1) require.NoError(t, err) }) @@ -41,8 +39,7 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(3), // mismatch ) - fs.SetShardID(id1) - err = fs.Init() + err = fs.Init(id1) require.EqualError(t, err, "layout mismatch: on-disk depth=2, configured depth=3") }) @@ -53,24 +50,35 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { ) id2, err := common.NewID() require.NoError(t, err) - fs.SetShardID(id2) // mismatch - err = fs.Init() + err = fs.Init(id2) require.EqualError(t, err, "shard ID mismatch: on-disk shard ID="+id1.String()+", configured shard ID="+id2.String()) }) t.Run("version mismatch", func(t *testing.T) { - data := []byte(`{"version":3,"depth":2,"shard_id":"` + id1.String() + `"}`) // version mismatch + data := []byte(`{"version":3,"depth":2,"shard_id":"` + id1.String() + `","subtype":"blobstor"}`) // version mismatch require.NoError(t, os.WriteFile(desc, data, 0o600)) fs := New( WithPath(dir), WithDepth(2), ) - fs.SetShardID(id1) - err = fs.Init() + err = fs.Init(id1) require.EqualError(t, err, "unsupported layout version: 3 (current version: 2)") }) + t.Run("subtype mismatch", func(t *testing.T) { + data := []byte(`{"version":2,"depth":2,"shard_id":"` + id1.String() + `","subtype":"blobstor"}`) + require.NoError(t, os.WriteFile(desc, data, 0o600)) + + fs := New( + WithPath(dir), + WithDepth(2), + WithSubtype(SubtypeWriteCache), + ) + err = fs.Init(id1) + require.EqualError(t, err, "subtype mismatch: on-disk subtype=blobstor, configured subtype=write-cache") + }) + t.Run("invalid Json", func(t *testing.T) { require.NoError(t, os.WriteFile(desc, []byte("{invalid"), 0o600)) @@ -78,8 +86,7 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(2), ) - fs.SetShardID(id1) - err = fs.Init() + err = fs.Init(id1) require.ErrorContains(t, err, "decode descriptor from JSON:") }) @@ -91,8 +98,7 @@ func TestFSTreeDescriptor_CreateAndValidate(t *testing.T) { WithPath(dir), WithDepth(2), ) - fs.SetShardID(id1) - err = fs.Init() + err = fs.Init(id1) require.ErrorContains(t, err, "decode descriptor from JSON:") require.ErrorContains(t, err, "unknown field \"extra\"") }) @@ -122,7 +128,7 @@ func TestFSTreeDescriptor_MigrationFrom1Version(t *testing.T) { { name: "path-based without configured ID", initialShardID: "/storage/fstree1", - expectedShardID: "", + expectedShardID: "generated", }, { name: "empty shard ID configured ID", @@ -144,23 +150,23 @@ func TestFSTreeDescriptor_MigrationFrom1Version(t *testing.T) { WithPath(dir), WithDepth(2), ) - if !tc.configuredShardID.IsZero() { - fs.SetShardID(tc.configuredShardID) - } - require.NoError(t, fs.Init()) + require.NoError(t, fs.Init(tc.configuredShardID)) require.NoError(t, fs.Close()) b, err := os.ReadFile(desc) require.NoError(t, err) - require.JSONEq(t, `{"version":2,"depth":2,"shard_id":"`+tc.expectedShardID+`"}`, string(b)) + if tc.expectedShardID == "generated" { + require.NotContains(t, string(b), `"shard_id":""`) + } else { + require.JSONEq(t, `{"version":2,"depth":2,"shard_id":"`+tc.expectedShardID+`","subtype":"blobstor"}`, string(b)) + } if !tc.configuredShardID.IsZero() { fs2 := New( WithPath(dir), WithDepth(2), ) - fs2.SetShardID(tc.configuredShardID) - require.NoError(t, fs2.Init()) + require.NoError(t, fs2.Init(tc.configuredShardID)) require.NoError(t, fs2.Close()) } @@ -169,8 +175,7 @@ func TestFSTreeDescriptor_MigrationFrom1Version(t *testing.T) { WithPath(dir), WithDepth(2), ) - fs3.SetShardID(id2) - err = fs3.Init() + err = fs3.Init(id2) require.EqualError(t, err, "shard ID mismatch: on-disk shard ID="+tc.expectedShardID+", configured shard ID="+id2.String()) } }) diff --git a/pkg/local_object_storage/blobstor/fstree/getstream_test.go b/pkg/local_object_storage/blobstor/fstree/getstream_test.go index 09ed03f0c6..6e9076458a 100644 --- a/pkg/local_object_storage/blobstor/fstree/getstream_test.go +++ b/pkg/local_object_storage/blobstor/fstree/getstream_test.go @@ -9,6 +9,7 @@ import ( "testing" iobject "github.com/nspcc-dev/neofs-node/internal/object" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/compression" "github.com/nspcc-dev/neofs-node/pkg/util" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" @@ -234,7 +235,7 @@ func TestGetStreamAfterErrors(t *testing.T) { func setupFSTree(t *testing.T) *FSTree { tree := New(WithPath(t.TempDir())) require.NoError(t, tree.Open(false)) - require.NoError(t, tree.Init()) + require.NoError(t, tree.Init(common.ID{})) t.Cleanup(func() { require.NoError(t, tree.Close()) }) return tree } diff --git a/pkg/local_object_storage/blobstor/fstree/option.go b/pkg/local_object_storage/blobstor/fstree/option.go index 1edc271472..9baf1e2285 100644 --- a/pkg/local_object_storage/blobstor/fstree/option.go +++ b/pkg/local_object_storage/blobstor/fstree/option.go @@ -3,6 +3,8 @@ package fstree import ( "io/fs" "time" + + "go.uber.org/zap" ) type Option func(*FSTree) @@ -26,6 +28,18 @@ func WithPath(p string) Option { } } +func WithLogger(l *zap.Logger) Option { + return func(f *FSTree) { + f.log = l + } +} + +func WithSubtype(st string) Option { + return func(f *FSTree) { + f.subtype = st + } +} + func WithNoSync(noSync bool) Option { return func(f *FSTree) { f.noSync = noSync diff --git a/pkg/local_object_storage/blobstor/internal/storagetest/control.go b/pkg/local_object_storage/blobstor/internal/storagetest/control.go index 2348603adb..3236a5a614 100644 --- a/pkg/local_object_storage/blobstor/internal/storagetest/control.go +++ b/pkg/local_object_storage/blobstor/internal/storagetest/control.go @@ -14,7 +14,7 @@ import ( func TestControl(t *testing.T, cons Constructor, minSize, maxSize uint64) { s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) objects := prepare(t, 10, s, minSize, maxSize) objectsBatch := prepareBatch(t, 10, s, minSize, maxSize) diff --git a/pkg/local_object_storage/blobstor/internal/storagetest/delete.go b/pkg/local_object_storage/blobstor/internal/storagetest/delete.go index 4a2ffed212..49c1f8982a 100644 --- a/pkg/local_object_storage/blobstor/internal/storagetest/delete.go +++ b/pkg/local_object_storage/blobstor/internal/storagetest/delete.go @@ -3,6 +3,7 @@ package storagetest import ( "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" @@ -11,7 +12,7 @@ import ( func TestDelete(t *testing.T, cons Constructor, minSize, maxSize uint64) { s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) t.Cleanup(func() { require.NoError(t, s.Close()) }) testDelete := func(t *testing.T, objects []objectDesc) { diff --git a/pkg/local_object_storage/blobstor/internal/storagetest/exists.go b/pkg/local_object_storage/blobstor/internal/storagetest/exists.go index 9c87f6c9a2..dd5bd3c2be 100644 --- a/pkg/local_object_storage/blobstor/internal/storagetest/exists.go +++ b/pkg/local_object_storage/blobstor/internal/storagetest/exists.go @@ -3,6 +3,7 @@ package storagetest import ( "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" ) @@ -10,7 +11,7 @@ import ( func TestExists(t *testing.T, cons Constructor, minSize, maxSize uint64) { s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) t.Cleanup(func() { require.NoError(t, s.Close()) }) objects := prepare(t, 1, s, minSize, maxSize) diff --git a/pkg/local_object_storage/blobstor/internal/storagetest/get.go b/pkg/local_object_storage/blobstor/internal/storagetest/get.go index 87318df308..65002ed528 100644 --- a/pkg/local_object_storage/blobstor/internal/storagetest/get.go +++ b/pkg/local_object_storage/blobstor/internal/storagetest/get.go @@ -3,6 +3,7 @@ package storagetest import ( "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" @@ -11,7 +12,7 @@ import ( func TestGet(t *testing.T, cons Constructor, minSize, maxSize uint64) { s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) t.Cleanup(func() { require.NoError(t, s.Close()) }) t.Run("missing object", func(t *testing.T) { diff --git a/pkg/local_object_storage/blobstor/internal/storagetest/get_range.go b/pkg/local_object_storage/blobstor/internal/storagetest/get_range.go index c91390f19b..365117646b 100644 --- a/pkg/local_object_storage/blobstor/internal/storagetest/get_range.go +++ b/pkg/local_object_storage/blobstor/internal/storagetest/get_range.go @@ -5,6 +5,7 @@ import ( "testing" "testing/iotest" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" "github.com/stretchr/testify/require" @@ -13,7 +14,7 @@ import ( func TestGetRangeStream(t *testing.T, cons Constructor, minSize, maxSize uint64) { s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) t.Cleanup(func() { require.NoError(t, s.Close()) }) t.Run("missing object", func(t *testing.T) { diff --git a/pkg/local_object_storage/blobstor/internal/storagetest/iterate.go b/pkg/local_object_storage/blobstor/internal/storagetest/iterate.go index 91c304bf85..c1e5f0349e 100644 --- a/pkg/local_object_storage/blobstor/internal/storagetest/iterate.go +++ b/pkg/local_object_storage/blobstor/internal/storagetest/iterate.go @@ -4,6 +4,7 @@ import ( "errors" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" "github.com/stretchr/testify/require" ) @@ -11,7 +12,7 @@ import ( func TestIterate(t *testing.T, cons Constructor, minSize, maxSize uint64) { s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) t.Cleanup(func() { require.NoError(t, s.Close()) }) objects := prepare(t, 10, s, minSize, maxSize) diff --git a/pkg/local_object_storage/engine/container_test.go b/pkg/local_object_storage/engine/container_test.go index f979b86d61..ad9d3f31db 100644 --- a/pkg/local_object_storage/engine/container_test.go +++ b/pkg/local_object_storage/engine/container_test.go @@ -43,8 +43,6 @@ func TestStorageEngine_ContainerCleanUp(t *testing.T) { ) require.NoError(t, err) } - require.NoError(t, e.Open()) - o1 := objecttest.Object() o2 := objecttest.Object() o2.SetPayload(make([]byte, errSmallSize+1)) diff --git a/pkg/local_object_storage/engine/control.go b/pkg/local_object_storage/engine/control.go index 0a686ed299..08b7c96d0f 100644 --- a/pkg/local_object_storage/engine/control.go +++ b/pkg/local_object_storage/engine/control.go @@ -10,11 +10,6 @@ import ( "go.uber.org/zap" ) -// Open opens all StorageEngine's components. -func (e *StorageEngine) Open() error { - return e.open() -} - func (e *StorageEngine) open() error { e.mtx.Lock() defer e.mtx.Unlock() @@ -35,23 +30,15 @@ func (e *StorageEngine) open() error { return nil } -// Init initializes all StorageEngine's components. +// Init initializes the engine runtime state for already attached shards. func (e *StorageEngine) Init() error { e.mtx.Lock() - defer e.mtx.Unlock() - - for id, sh := range e.shards { - if err := sh.Init(); err != nil { - if !e.isIgnoreUninitedShards { - return fmt.Errorf("init shard %s: %w", id, err) - } - e.log.Debug("could not init shard", - zap.String("id", id), - zap.Error(err), - ) - delete(e.shards, id) - } + if e.inited { + e.mtx.Unlock() + return nil } + e.inited = true + e.mtx.Unlock() err := e.deleteNotFoundContainers() if err != nil { @@ -251,29 +238,12 @@ loop: } for _, newID := range shardsToAdd { - sh, err := e.createShard(rcfg.shards[newID]) + sh, err := e.attachShard(rcfg.shards[newID]) if err != nil { return fmt.Errorf("could not add new shard with '%s' metabase path: %w", newID, err) } - idStr := sh.ID().String() - - err = sh.Open() - if err == nil { - err = sh.Init() - } - if err != nil { - _ = sh.Close() - return fmt.Errorf("could not init %s shard: %w", idStr, err) - } - - err = e.addShard(sh) - if err != nil { - _ = sh.Close() - return fmt.Errorf("could not add %s shard: %w", idStr, err) - } - - e.log.Info("added new shard", zap.String("id", idStr)) + e.log.Info("added new shard", zap.Stringer("id", sh.ID())) } return nil diff --git a/pkg/local_object_storage/engine/control_test.go b/pkg/local_object_storage/engine/control_test.go index 88bf4e8faa..0aeb3f6367 100644 --- a/pkg/local_object_storage/engine/control_test.go +++ b/pkg/local_object_storage/engine/control_test.go @@ -32,11 +32,7 @@ func TestInitializationFailure(t *testing.T) { badDir := filepath.Join(t.TempDir(), "missing") testShard := func(c paths) []shard.Option { - sid, err := generateShardID() - require.NoError(t, err) - return []shard.Option{ - shard.WithID(sid), shard.WithLogger(zaptest.NewLogger(t)), shard.WithBlobstor( newStorage(c.storage)), @@ -57,7 +53,7 @@ func TestInitializationFailure(t *testing.T) { badDir := filepath.Join(badDir, t.Name()) require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) require.NoError(t, os.Chmod(badDir, 0)) - testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + testEngineFailInitAndReload(t, badDir, true, testShard(paths{ storage: filepath.Join(badDir, "0"), metabase: filepath.Join(existsDir, t.Name(), "1"), writecache: filepath.Join(existsDir, t.Name(), "2"), @@ -67,7 +63,7 @@ func TestInitializationFailure(t *testing.T) { badDir := filepath.Join(badDir, t.Name()) require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) require.NoError(t, os.Chmod(badDir, 0)) - testEngineFailInitAndReload(t, badDir, true, testShard(paths{ + testEngineFailInitAndReload(t, badDir, false, testShard(paths{ storage: filepath.Join(existsDir, t.Name(), "0"), metabase: filepath.Join(badDir, "1"), writecache: filepath.Join(existsDir, t.Name(), "2"), @@ -77,7 +73,7 @@ func TestInitializationFailure(t *testing.T) { badDir := filepath.Join(badDir, t.Name()) require.NoError(t, os.MkdirAll(badDir, os.ModePerm)) require.NoError(t, os.Chmod(badDir, 0)) - testEngineFailInitAndReload(t, badDir, false, testShard(paths{ + testEngineFailInitAndReload(t, badDir, true, testShard(paths{ storage: filepath.Join(existsDir, t.Name(), "0"), metabase: filepath.Join(existsDir, t.Name(), "1"), writecache: filepath.Join(badDir, "2"), @@ -92,9 +88,8 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [ _, err := e.AddShard(s...) if errOnAdd { require.Error(t, err) - // This branch is only taken when we cannot update shard ID in the metabase. - // The id cannot be encountered during normal operation, but it is ok for tests: - // it is only compared for equality with other ids and we have 0 shards here. + // AddShard initializes the shard eagerly, so unrecoverable blobstor/write-cache + // failures surface here before the shard is attached. configID = "id" } else { require.NoError(t, err) @@ -107,10 +102,7 @@ func testEngineFailInitAndReload(t *testing.T, badDir string, errOnAdd bool, s [ configID = calculateShardID(e.shards[id].DumpInfo()) e.mtx.RUnlock() - err = e.Open() - if err == nil { - require.Error(t, e.Init()) - } + require.NoError(t, e.Init()) } require.NoError(t, os.Chmod(badDir, os.ModePerm)) @@ -265,7 +257,6 @@ func engineWithShards(t *testing.T, path string, num int) (*StorageEngine, []str require.Equal(t, num, len(e.shards)) - require.NoError(t, e.Open()) require.NoError(t, e.Init()) return e, currShards diff --git a/pkg/local_object_storage/engine/engine.go b/pkg/local_object_storage/engine/engine.go index adab654e03..cbc5563461 100644 --- a/pkg/local_object_storage/engine/engine.go +++ b/pkg/local_object_storage/engine/engine.go @@ -37,6 +37,8 @@ type StorageEngine struct { blockMtx sync.RWMutex blockErr error + inited bool + sortShardsFn func(*StorageEngine, oid.ID) []shardWrapper } diff --git a/pkg/local_object_storage/engine/engine_test.go b/pkg/local_object_storage/engine/engine_test.go index 09f1e25e69..a40c18aef0 100644 --- a/pkg/local_object_storage/engine/engine_test.go +++ b/pkg/local_object_storage/engine/engine_test.go @@ -99,13 +99,9 @@ func newStorage(root string) common.Storage { } func testNewShard(t testing.TB, id int) *shard.Shard { - sid, err := generateShardID() - require.NoError(t, err) - dir := t.TempDir() s := shard.New( - shard.WithID(sid), shard.WithLogger(zap.L()), shard.WithBlobstor( newStorage(filepath.Join(dir, fmt.Sprintf("%d.fstree", id)))), @@ -142,7 +138,6 @@ func testEngineFromShardOpts(t *testing.T, num int, extraOpts []shard.Option) *S require.NoError(t, err) } - require.NoError(t, engine.Open()) require.NoError(t, engine.Init()) return engine diff --git a/pkg/local_object_storage/engine/error_test.go b/pkg/local_object_storage/engine/error_test.go index 201ad7f967..60cff2be69 100644 --- a/pkg/local_object_storage/engine/error_test.go +++ b/pkg/local_object_storage/engine/error_test.go @@ -43,7 +43,6 @@ func newEngine(t testing.TB, dir string, opts ...Option) (*StorageEngine, string )) require.NoError(t, err) } - require.NoError(t, e.Open()) require.NoError(t, e.Init()) return e, dir, ids diff --git a/pkg/local_object_storage/engine/evacuate_test.go b/pkg/local_object_storage/engine/evacuate_test.go index 9d49eb4f3a..119e056e05 100644 --- a/pkg/local_object_storage/engine/evacuate_test.go +++ b/pkg/local_object_storage/engine/evacuate_test.go @@ -45,7 +45,6 @@ func newEngineEvacuate(t *testing.T, shardNum int, objPerShard int) (*StorageEng )) require.NoError(t, err) } - require.NoError(t, e.Open()) require.NoError(t, e.Init()) objects := make([]*object.Object, 0, objPerShard*len(ids)) diff --git a/pkg/local_object_storage/engine/gc_test.go b/pkg/local_object_storage/engine/gc_test.go index 677bdc2903..c1442ff70c 100644 --- a/pkg/local_object_storage/engine/gc_test.go +++ b/pkg/local_object_storage/engine/gc_test.go @@ -51,7 +51,6 @@ func TestChildrenExpiration(t *testing.T) { ) require.NoError(t, err) } - require.NoError(t, e.Open()) require.NoError(t, e.Init()) t.Cleanup(func() { _ = e.Close() @@ -184,7 +183,6 @@ func TestGC(t *testing.T) { ) require.NoError(t, err) } - require.NoError(t, e.Open()) require.NoError(t, e.Init()) t.Cleanup(func() { _ = e.Close() }) @@ -326,7 +324,6 @@ func TestSplitObjectExpirationWithoutLink(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, e.Open()) require.NoError(t, e.Init()) t.Cleanup(func() { _ = e.Close() }) @@ -409,7 +406,6 @@ func TestSplitObjectExpirationWithLinkNotFound(t *testing.T) { ) require.NoError(t, err) - require.NoError(t, e.Open()) require.NoError(t, e.Init()) t.Cleanup(func() { _ = e.Close() }) diff --git a/pkg/local_object_storage/engine/lock_test.go b/pkg/local_object_storage/engine/lock_test.go index db6441ffcd..a26e11d837 100644 --- a/pkg/local_object_storage/engine/lock_test.go +++ b/pkg/local_object_storage/engine/lock_test.go @@ -239,7 +239,6 @@ func testLockRemoved(t *testing.T, shardNum int) { require.NoError(t, err) } - require.NoError(t, s.Open()) require.NoError(t, s.Init()) return s @@ -355,7 +354,6 @@ func TestSplitObjectLockExpiration(t *testing.T) { ) require.NoError(t, err) } - require.NoError(t, e.Open()) require.NoError(t, e.Init()) t.Cleanup(func() { _ = e.Close() }) @@ -454,7 +452,6 @@ func TestSimpleLockExpiration(t *testing.T) { ) require.NoError(t, err) } - require.NoError(t, e.Open()) require.NoError(t, e.Init()) t.Cleanup(func() { _ = e.Close() }) diff --git a/pkg/local_object_storage/engine/shards.go b/pkg/local_object_storage/engine/shards.go index f43c118185..26cd6d2b69 100644 --- a/pkg/local_object_storage/engine/shards.go +++ b/pkg/local_object_storage/engine/shards.go @@ -62,14 +62,9 @@ func (m *metricsWithID) AddToPayloadSize(size int64) { // Returns any error encountered that did not allow adding a shard. // Otherwise returns the ID of the added shard. func (e *StorageEngine) AddShard(opts ...shard.Option) (common.ID, error) { - sh, err := e.createShard(opts) - if err != nil { - return common.ID{}, fmt.Errorf("could not create a shard: %w", err) - } - - err = e.addShard(sh) + sh, err := e.attachShard(opts) if err != nil { - return common.ID{}, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err) + return common.ID{}, err } if e.metrics != nil { @@ -79,18 +74,36 @@ func (e *StorageEngine) AddShard(opts ...shard.Option) (common.ID, error) { return sh.ID(), nil } -func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { - id, err := generateShardID() +func (e *StorageEngine) attachShard(opts []shard.Option) (*shard.Shard, error) { + sh, err := e.createShard(opts) + if err != nil { + return nil, fmt.Errorf("could not create a shard: %w", err) + } + + err = sh.Open() + if err == nil { + err = sh.Init() + } if err != nil { - return nil, fmt.Errorf("could not generate shard ID: %w", err) + _ = sh.Close() + return nil, fmt.Errorf("could not initialize shard: %w", err) } + err = e.addShard(sh) + if err != nil { + _ = sh.Close() + return nil, fmt.Errorf("could not add %s shard: %w", sh.ID().String(), err) + } + + return sh, nil +} + +func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { e.mtx.RLock() if e.metrics != nil { opts = append(opts, shard.WithMetricsWriter( &metricsWithID{ - id: id.String(), mw: e.metrics, }, )) @@ -99,16 +112,11 @@ func (e *StorageEngine) createShard(opts []shard.Option) (*shard.Shard, error) { e.mtx.RUnlock() sh := shard.New(append(opts, - shard.WithID(id), shard.WithExpiredObjectsCallback(e.processExpiredObjects), shard.WithReportErrorFunc(e.reportShardErrorBackground), )...) - if err := sh.UpdateID(); err != nil { - return nil, fmt.Errorf("could not update shard ID: %w", err) - } - - return sh, err + return sh, nil } func (e *StorageEngine) addShard(sh *shard.Shard) error { @@ -172,10 +180,6 @@ func (e *StorageEngine) removeShards(ids ...string) { } } -func generateShardID() (common.ID, error) { - return common.NewID() -} - func (e *StorageEngine) sortedShards(id oid.ID) []shardWrapper { shards := e.unsortedShards() diff --git a/pkg/local_object_storage/internal/storagetest/storage.go b/pkg/local_object_storage/internal/storagetest/storage.go index 3f9a01242e..fd473531f8 100644 --- a/pkg/local_object_storage/internal/storagetest/storage.go +++ b/pkg/local_object_storage/internal/storagetest/storage.go @@ -3,6 +3,7 @@ package storagetest import ( "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/stretchr/testify/require" ) @@ -11,7 +12,7 @@ import ( type Component interface { Open(bool) error SetMode(mode.Mode) error - Init() error + Init(common.ID) error Close() error } @@ -65,7 +66,7 @@ func TestCloseAfterOpen(t *testing.T, cons Constructor) { // Open in read-only must be done after the db is here. s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) require.NoError(t, s.Close()) require.NoError(t, s.Open(true)) @@ -78,7 +79,7 @@ func TestCloseTwice(t *testing.T, cons Constructor) { // Use-case: move to maintenance mode twice, first time failed. s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) require.NoError(t, s.Close()) require.NoError(t, s.Close()) // already closed, no-op } @@ -104,7 +105,7 @@ func TestSetMode(t *testing.T, cons Constructor, m mode.Mode) { s := cons(t) // Use-case: notmal node operation. require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) require.NoError(t, s.SetMode(m)) }) } @@ -113,7 +114,7 @@ func TestModeTransition(t *testing.T, cons Constructor, from, to mode.Mode) { // Use-case: normal node operation. s := cons(t) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) require.NoError(t, s.SetMode(from)) require.NoError(t, s.SetMode(to)) require.NoError(t, s.Close()) diff --git a/pkg/local_object_storage/metabase/control.go b/pkg/local_object_storage/metabase/control.go index d0a58e3686..a5a0576d41 100644 --- a/pkg/local_object_storage/metabase/control.go +++ b/pkg/local_object_storage/metabase/control.go @@ -70,8 +70,15 @@ func (db *DB) openBolt() error { // // Does nothing if metabase has already been initialized and filled. To roll back the database to its initial state, // use Reset. -func (db *DB) Init() error { - return db.init(false) +func (db *DB) Init(id common.ID) error { + if !id.IsZero() { + db.log = db.log.With(zap.Stringer("shard_id", id)) + } + if err := db.init(false); err != nil { + return err + } + + return db.ensureShardID(id) } // Reset resets metabase. Works similar to Init but cleans up all static buckets and diff --git a/pkg/local_object_storage/metabase/control_test.go b/pkg/local_object_storage/metabase/control_test.go index a8ff6a2b53..d0d55d7980 100644 --- a/pkg/local_object_storage/metabase/control_test.go +++ b/pkg/local_object_storage/metabase/control_test.go @@ -4,6 +4,7 @@ import ( "path/filepath" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" oidtest "github.com/nspcc-dev/neofs-sdk-go/object/id/test" @@ -60,7 +61,7 @@ func TestOpenRO(t *testing.T) { ) require.NoError(t, db.Open(false)) - require.NoError(t, db.Init()) + require.NoError(t, db.Init(common.ID{})) obj := generateObject(t) addr := obj.Address() diff --git a/pkg/local_object_storage/metabase/db.go b/pkg/local_object_storage/metabase/db.go index 7e5f23142e..675c0bc8cb 100644 --- a/pkg/local_object_storage/metabase/db.go +++ b/pkg/local_object_storage/metabase/db.go @@ -85,11 +85,6 @@ func New(opts ...Option) *DB { } } -// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. -func (db *DB) SetLogger(l *zap.Logger) { - db.log = l -} - // WithLogger returns option to set logger of DB. func WithLogger(l *zap.Logger) Option { return func(c *cfg) { diff --git a/pkg/local_object_storage/metabase/db_test.go b/pkg/local_object_storage/metabase/db_test.go index a7bf8fa351..a9ef6945c4 100644 --- a/pkg/local_object_storage/metabase/db_test.go +++ b/pkg/local_object_storage/metabase/db_test.go @@ -11,6 +11,7 @@ import ( "github.com/nspcc-dev/bbolt" "github.com/nspcc-dev/neofs-node/internal/testutil" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-sdk-go/checksum" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" @@ -61,7 +62,7 @@ func newDB(t testing.TB, opts ...meta.Option) *meta.DB { ) require.NoError(t, bdb.Open(false)) - require.NoError(t, bdb.Init()) + require.NoError(t, bdb.Init(common.ID{})) t.Cleanup(func() { bdb.Close() diff --git a/pkg/local_object_storage/metabase/last_resync_epoch_test.go b/pkg/local_object_storage/metabase/last_resync_epoch_test.go index 4d026f7510..3afcb8d652 100644 --- a/pkg/local_object_storage/metabase/last_resync_epoch_test.go +++ b/pkg/local_object_storage/metabase/last_resync_epoch_test.go @@ -4,6 +4,7 @@ import ( "path/filepath" "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/stretchr/testify/require" ) @@ -20,7 +21,7 @@ func TestDB_ReadLastResyncEpoch(t *testing.T) { }...) require.NoError(t, db.Open(false)) - require.NoError(t, db.Init()) + require.NoError(t, db.Init(common.ID{})) t.Cleanup(func() { db.Close() @@ -57,7 +58,7 @@ func TestDB_ReadLastResyncEpoch(t *testing.T) { // After reload, last resync epoch the same. require.NoError(t, db.Close()) require.NoError(t, db.Open(false)) - require.NoError(t, db.Init()) + require.NoError(t, db.Init(common.ID{})) checkEpoch(t, resyncEpoch) } diff --git a/pkg/local_object_storage/metabase/mode.go b/pkg/local_object_storage/metabase/mode.go index f5d4a84331..b7865e489d 100644 --- a/pkg/local_object_storage/metabase/mode.go +++ b/pkg/local_object_storage/metabase/mode.go @@ -3,6 +3,7 @@ package meta import ( "fmt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" ) @@ -32,7 +33,7 @@ func (db *DB) SetMode(m mode.Mode) error { err = db.Open(false) } if err == nil && !m.NoMetabase() && !m.ReadOnly() { - err = db.Init() + err = db.Init(common.ID{}) } if err != nil { diff --git a/pkg/local_object_storage/metabase/shard_id.go b/pkg/local_object_storage/metabase/shard_id.go index 627430db36..dbdb601aa3 100644 --- a/pkg/local_object_storage/metabase/shard_id.go +++ b/pkg/local_object_storage/metabase/shard_id.go @@ -2,8 +2,10 @@ package meta import ( "bytes" + "fmt" "github.com/nspcc-dev/bbolt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" ) var ( @@ -51,3 +53,38 @@ func (db *DB) WriteShardID(id []byte) error { return b.Put(shardIDKey, id) }) } + +func (db *DB) ensureShardID(id common.ID) error { + if id.IsZero() { + return nil + } + + db.modeMtx.RLock() + defer db.modeMtx.RUnlock() + + if db.mode.NoMetabase() { + return ErrDegradedMode + } else if db.mode.ReadOnly() { + return ErrReadOnlyMode + } + + return db.boltDB.Update(func(tx *bbolt.Tx) error { + b, err := tx.CreateBucketIfNotExists(shardInfoBucket) + if err != nil { + return err + } + + storedID := b.Get(shardIDKey) + switch { + case len(storedID) == 0: + err = b.Put(shardIDKey, id.Bytes()) + if err != nil { + return fmt.Errorf("store shard ID: %w", err) + } + case !bytes.Equal(storedID, id.Bytes()): + return fmt.Errorf("shard ID mismatch: in-metabase=%q, expected=%q", storedID, id.Bytes()) + } + + return nil + }) +} diff --git a/pkg/local_object_storage/metabase/version_test.go b/pkg/local_object_storage/metabase/version_test.go index 386f6dcd7c..322d1e3f58 100644 --- a/pkg/local_object_storage/metabase/version_test.go +++ b/pkg/local_object_storage/metabase/version_test.go @@ -12,6 +12,7 @@ import ( "testing" "github.com/nspcc-dev/bbolt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" checksumtest "github.com/nspcc-dev/neofs-sdk-go/checksum/test" cid "github.com/nspcc-dev/neofs-sdk-go/container/id" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" @@ -54,13 +55,13 @@ func TestVersion(t *testing.T) { t.Run("simple", func(t *testing.T) { db := newDB(t) require.NoError(t, db.Open(false)) - require.NoError(t, db.Init()) + require.NoError(t, db.Init(common.ID{})) check(t, db) require.NoError(t, db.Close()) t.Run("reopen", func(t *testing.T) { require.NoError(t, db.Open(false)) - require.NoError(t, db.Init()) + require.NoError(t, db.Init(common.ID{})) check(t, db) require.NoError(t, db.Close()) }) @@ -72,7 +73,7 @@ func TestVersion(t *testing.T) { require.NoError(t, db.Close()) require.NoError(t, db.Open(false)) - require.NoError(t, db.Init()) + require.NoError(t, db.Init(common.ID{})) check(t, db) require.NoError(t, db.Close()) }) @@ -85,7 +86,7 @@ func TestVersion(t *testing.T) { require.NoError(t, db.Close()) require.NoError(t, db.Open(false)) - require.Error(t, db.Init()) + require.Error(t, db.Init(common.ID{})) require.NoError(t, db.Close()) t.Run("reset", func(t *testing.T) { @@ -119,7 +120,7 @@ func newDB(t testing.TB, opts ...Option) *DB { ) require.NoError(t, bdb.Open(false)) - require.NoError(t, bdb.Init()) + require.NoError(t, bdb.Init(common.ID{})) t.Cleanup(func() { bdb.Close() diff --git a/pkg/local_object_storage/shard/bench_test.go b/pkg/local_object_storage/shard/bench_test.go index 7bcdaa8496..4d929f58a9 100644 --- a/pkg/local_object_storage/shard/bench_test.go +++ b/pkg/local_object_storage/shard/bench_test.go @@ -66,7 +66,7 @@ func BenchmarkPut(b *testing.B) { b.Run(name, func(b *testing.B) { ptt := creat(b) require.NoError(b, ptt.Open(false)) - require.NoError(b, ptt.Init()) + require.NoError(b, ptt.Init(common.ID{})) b.Cleanup(func() { _ = ptt.Close() }) benchmark(b, ptt, tc.objSize, tc.nThreads) @@ -141,7 +141,7 @@ func prepareObjects(b *testing.B, creat func(testing.TB) common.Storage, objSize ptt := creat(b) require.NoError(b, ptt.Open(false)) - require.NoError(b, ptt.Init()) + require.NoError(b, ptt.Init(common.ID{})) b.Cleanup(func() { _ = ptt.Close() }) obj := object.New(cid.ID{1, 2, 3}, usertest.ID()) diff --git a/pkg/local_object_storage/shard/control.go b/pkg/local_object_storage/shard/control.go index 21353a0e71..ba02c5f505 100644 --- a/pkg/local_object_storage/shard/control.go +++ b/pkg/local_object_storage/shard/control.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "go.uber.org/zap" @@ -34,74 +35,77 @@ func (s *Shard) handleMetabaseFailure(stage string, err error) error { // Open opens all Shard's components. func (s *Shard) Open() error { - components := []interface{ Open(bool) error }{ - s.blobStor, s.metaBase, + if err := s.blobStor.Open(false); err != nil { + return fmt.Errorf("could not open %T: %w", s.blobStor, err) } + metaErr := s.metaBase.Open(false) + if s.hasWriteCache() { - components = append(components, s.writeCache) + if err := s.writeCache.Open(false); err != nil { + return fmt.Errorf("could not open %T: %w", s.writeCache, err) + } } - for i, component := range components { - if err := component.Open(false); err != nil { - if component == s.metaBase { - // We must first open all other components to avoid - // opening non-existent DB in read-only mode. - for j := i + 1; j < len(components); j++ { - if err := components[j].Open(false); err != nil { - // Other components must be opened, fail. - return fmt.Errorf("could not open %T: %w", components[j], err) - } - } - err = s.handleMetabaseFailure("open", err) - if err != nil { - return err - } - - break - } + if metaErr == nil { + return nil + } - return fmt.Errorf("could not open %T: %w", component, err) - } + if !s.initedStorage { + s.metaBaseOpenErr = metaErr + return nil } - return nil + return s.handleMetabaseFailure("open", metaErr) } // Init initializes all Shard's components. func (s *Shard) Init() error { - type initializer interface { - Init() error + if err := s.compression.Init(); err != nil { + return fmt.Errorf("could not initialize %T: %w", &s.compression, err) } - var components = []initializer{&s.compression, s.blobStor} + if err := s.blobStor.Init(common.ID{}); err != nil { + return fmt.Errorf("could not initialize %T: %w", s.blobStor, err) + } + s.initedStorage = true - if !s.GetMode().NoMetabase() { - components = append(components, s.metaBase) + shardID := s.blobStor.ShardID() + if shardID.IsZero() { + return fmt.Errorf("%T returned empty shard ID after init", s.blobStor) + } + + s.info.ID = shardID + if s.metricsWriter != nil { + s.metricsWriter.SetShardID(shardID.String()) } + l := s.log.With(zap.String("shard_id", shardID.String())) + s.log = l + s.gcCfg.log = s.gcCfg.log.With(zap.String("shard_id", shardID.String())) if s.hasWriteCache() { - components = append(components, s.writeCache) + if err := s.writeCache.Init(shardID); err != nil { + return fmt.Errorf("could not initialize %T: %w", s.writeCache, err) + } } - for _, component := range components { - if err := component.Init(); err != nil { - if component == s.metaBase { - if errors.Is(err, meta.ErrOutdatedVersion) { - return fmt.Errorf("metabase initialization: %w", err) - } - - err = s.handleMetabaseFailure("init", err) - if err != nil { - return err - } - - break + if s.metaBaseOpenErr != nil { + err := s.handleMetabaseFailure("open", s.metaBaseOpenErr) + if err != nil { + return err + } + s.metaBaseOpenErr = nil + } + + if !s.GetMode().NoMetabase() { + if err := s.metaBase.Init(shardID); err != nil { + if errors.Is(err, meta.ErrOutdatedVersion) { + return fmt.Errorf("metabase initialization: %w", err) } - return fmt.Errorf("could not initialize %T: %w", component, err) - } - if component == s.blobStor { - s.initedStorage = true + err = s.handleMetabaseFailure("init", err) + if err != nil { + return err + } } } @@ -170,7 +174,7 @@ func (s *Shard) Reload(opts ...Option) error { return err } if ok { - err = s.metaBase.Init() + err = s.metaBase.Init(s.ID()) if err != nil { s.log.Error("can't initialize metabase, move to a degraded-read-only mode", zap.Error(err)) _ = s.setMode(mode.DegradedReadOnly) diff --git a/pkg/local_object_storage/shard/control_test.go b/pkg/local_object_storage/shard/control_test.go index 780eefd72e..fe859c98c7 100644 --- a/pkg/local_object_storage/shard/control_test.go +++ b/pkg/local_object_storage/shard/control_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" @@ -125,14 +124,11 @@ func TestResyncMetabase(t *testing.T) { defer os.RemoveAll(p) - shID, err := common.NewID() - require.NoError(t, err) sh := New( WithBlobstor(fstree.New( fstree.WithPath(filepath.Join(p, "fstree")), fstree.WithDepth(1)), ), - WithID(shID), WithLogger(zaptest.NewLogger(t)), WithMetaBaseOptions( meta.WithPath(filepath.Join(p, "meta")), @@ -146,8 +142,6 @@ func TestResyncMetabase(t *testing.T) { ), ) - require.NoError(t, sh.UpdateID()) - // open Blobstor require.NoError(t, sh.Open()) @@ -190,6 +184,7 @@ func TestResyncMetabase(t *testing.T) { tombObj.AssociateDeleted(tombedID) tombedAddress := oid.NewAddress(tombObj.GetContainerID(), tombedID) + var err error for _, v := range mObjs { err = sh.Put(v.obj, nil) require.NoError(t, err) @@ -271,7 +266,6 @@ func TestResyncMetabase(t *testing.T) { fstree.WithPath(filepath.Join(p, "fstree")), fstree.WithDepth(1)), ), - WithID(shID), WithLogger(zaptest.NewLogger(t)), WithMetaBaseOptions( meta.WithPath(filepath.Join(p, "meta_restored")), @@ -285,8 +279,6 @@ func TestResyncMetabase(t *testing.T) { ), ) - require.NoError(t, sh.UpdateID()) - // open Blobstor require.NoError(t, sh.Open()) diff --git a/pkg/local_object_storage/shard/ec_test.go b/pkg/local_object_storage/shard/ec_test.go index 235d5e6752..fca950b62e 100644 --- a/pkg/local_object_storage/shard/ec_test.go +++ b/pkg/local_object_storage/shard/ec_test.go @@ -12,6 +12,7 @@ import ( iec "github.com/nspcc-dev/neofs-node/internal/ec" ierrors "github.com/nspcc-dev/neofs-node/internal/errors" "github.com/nspcc-dev/neofs-node/internal/testutil" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" apistatus "github.com/nspcc-dev/neofs-sdk-go/client/status" cidtest "github.com/nspcc-dev/neofs-sdk-go/container/id/test" @@ -205,7 +206,7 @@ func TestShard_GetECPart(t *testing.T) { ) require.NoError(t, mb.Open(false)) t.Cleanup(func() { _ = mb.Close() }) - require.NoError(t, mb.Init()) + require.NoError(t, mb.Init(common.ID{})) sysObj := *newObject(t) sysObj.SetContainerID(cnr) @@ -243,7 +244,7 @@ func TestShard_GetECPart(t *testing.T) { ) require.NoError(t, mb.Open(false)) t.Cleanup(func() { _ = mb.Close() }) - require.NoError(t, mb.Init()) + require.NoError(t, mb.Init(common.ID{})) payload := testutil.RandByteSlice(32) // any @@ -523,7 +524,7 @@ func TestShard_GetECPartRange(t *testing.T) { ) require.NoError(t, mb.Open(false)) t.Cleanup(func() { _ = mb.Close() }) - require.NoError(t, mb.Init()) + require.NoError(t, mb.Init(common.ID{})) sysObj := *newObject(t) sysObj.SetContainerID(cnr) @@ -728,7 +729,7 @@ func TestShard_HeadECPart(t *testing.T) { ) require.NoError(t, mb.Open(false)) t.Cleanup(func() { _ = mb.Close() }) - require.NoError(t, mb.Init()) + require.NoError(t, mb.Init(common.ID{})) sysObj := *newObject(t) sysObj.SetContainerID(cnr) diff --git a/pkg/local_object_storage/shard/gc_test.go b/pkg/local_object_storage/shard/gc_test.go index 4e342fd067..27f7a779c7 100644 --- a/pkg/local_object_storage/shard/gc_test.go +++ b/pkg/local_object_storage/shard/gc_test.go @@ -309,6 +309,7 @@ func TestContainerPayments(t *testing.T) { Fields: map[string]any{ "epoch": json.Number(strconv.FormatInt(currEpoch, 10)), "cID": cID.String(), + "shard_id": sh.ID().String(), "unpaidSince": json.Number(strconv.FormatInt(unpaidSince, 10)), }, } @@ -333,7 +334,8 @@ func TestContainerPayments(t *testing.T) { Level: zap.DebugLevel, Message: "payments system is disabled, skipping container payments check", Fields: map[string]any{ - "epoch": json.Number(strconv.FormatInt(currEpoch, 10)), + "epoch": json.Number(strconv.FormatInt(currEpoch, 10)), + "shard_id": sh.ID().String(), }, } lb.AssertContains(expLog) diff --git a/pkg/local_object_storage/shard/generic_test.go b/pkg/local_object_storage/shard/generic_test.go index f0d66e9d36..42ca2174f2 100644 --- a/pkg/local_object_storage/shard/generic_test.go +++ b/pkg/local_object_storage/shard/generic_test.go @@ -33,7 +33,7 @@ func (m *ModeAwareStorage) SetMode(newMode mode.Mode) error { err := m.Close() if err == nil { if err = m.Open(newMode.ReadOnly()); err == nil { - err = m.Init() + err = m.Init(common.ID{}) } } @@ -59,7 +59,7 @@ func TestBlobstorGeneric(t *testing.T) { // fstree must be initialized to create a descriptor require.NoError(t, fsTree.Open(false)) - require.NoError(t, fsTree.Init()) + require.NoError(t, fsTree.Init(common.ID{})) require.NoError(t, fsTree.Close()) return NewModeAwareStorage(fsTree) diff --git a/pkg/local_object_storage/shard/id.go b/pkg/local_object_storage/shard/id.go deleted file mode 100644 index caa04aa140..0000000000 --- a/pkg/local_object_storage/shard/id.go +++ /dev/null @@ -1,66 +0,0 @@ -package shard - -import ( - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" - "go.uber.org/zap" -) - -// ID returns Shard identifier. -func (s *Shard) ID() common.ID { - return s.info.ID -} - -// UpdateID reads shard ID saved in the metabase and updates it if it is missing. -func (s *Shard) UpdateID() (err error) { - if err = s.metaBase.Open(false); err != nil { - return err - } - defer func() { - cErr := s.metaBase.Close() - if err == nil { - err = cErr - } - }() - id, err := s.metaBase.ReadShardID() - if err != nil { - return err - } - if len(id) != 0 { - s.info.ID, err = common.NewIDFromBytes(id) - if err != nil { - return err - } - - if s.metricsWriter != nil { - s.metricsWriter.SetShardID(s.info.ID.String()) - } - } else { - blobShardID := s.blobStor.ShardID() - if !blobShardID.IsZero() { - s.info.ID = blobShardID - - if s.metricsWriter != nil { - s.metricsWriter.SetShardID(s.info.ID.String()) - } - } - } - - var ( - sID = s.info.ID.String() - l = s.log.With(zap.String("shard_id", sID)) - ) - s.log = l - s.gcCfg.log = s.gcCfg.log.With(zap.String("shard_id", sID)) - s.metaBase.SetLogger(l) - s.blobStor.SetLogger(l) - s.blobStor.SetShardID(s.info.ID) - if s.hasWriteCache() { - s.writeCache.SetLogger(l) - s.writeCache.SetShardIDMetrics(sID) - } - - if len(id) != 0 { - return nil - } - return s.metaBase.WriteShardID(s.info.ID.Bytes()) -} diff --git a/pkg/local_object_storage/shard/mode.go b/pkg/local_object_storage/shard/mode.go index 1c7e8400a3..a75a47f142 100644 --- a/pkg/local_object_storage/shard/mode.go +++ b/pkg/local_object_storage/shard/mode.go @@ -3,6 +3,7 @@ package shard import ( "fmt" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/util/logicerr" "go.uber.org/zap" @@ -85,7 +86,7 @@ func (s *Shard) setModeStorage(m mode.Mode) error { err := s.blobStor.Close() if err == nil { if err = s.blobStor.Open(m.ReadOnly()); err == nil && s.initedStorage { - err = s.blobStor.Init() + err = s.blobStor.Init(common.ID{}) } } if err != nil { diff --git a/pkg/local_object_storage/shard/shard.go b/pkg/local_object_storage/shard/shard.go index 51052763c9..8206ab98de 100644 --- a/pkg/local_object_storage/shard/shard.go +++ b/pkg/local_object_storage/shard/shard.go @@ -32,6 +32,8 @@ type Shard struct { metaBase *meta.DB // TODO: make metaBase of metabase type metaBaseIface metabase + + metaBaseOpenErr error } // Option represents Shard's constructor option. @@ -155,11 +157,9 @@ func New(opts ...Option) *Shard { return s } -// WithID returns option to set the default shard identifier. -func WithID(id common.ID) Option { - return func(c *cfg) { - c.info.ID = id - } +// ID returns Shard identifier. +func (s *Shard) ID() common.ID { + return s.info.ID } // WithBlobstor provides storage. diff --git a/pkg/local_object_storage/shard/shard_internal_test.go b/pkg/local_object_storage/shard/shard_internal_test.go index b2b83324b1..d8113d0488 100644 --- a/pkg/local_object_storage/shard/shard_internal_test.go +++ b/pkg/local_object_storage/shard/shard_internal_test.go @@ -15,7 +15,6 @@ import ( cid "github.com/nspcc-dev/neofs-sdk-go/container/id" "github.com/nspcc-dev/neofs-sdk-go/object" oid "github.com/nspcc-dev/neofs-sdk-go/object/id" - "go.uber.org/zap" ) func newSimpleTestShard(_ testing.TB, bs common.Storage, mb metabase, wc writecache.Cache) *Shard { @@ -247,7 +246,7 @@ func (unimplementedBLOBStore) Open(bool) error { panic("unimplemented") } -func (unimplementedBLOBStore) Init() error { +func (unimplementedBLOBStore) Init(common.ID) error { panic("unimplemented") } @@ -267,18 +266,10 @@ func (unimplementedBLOBStore) ShardID() common.ID { panic("unimplemented") } -func (unimplementedBLOBStore) SetLogger(*zap.Logger) { - panic("unimplemented") -} - func (unimplementedBLOBStore) SetCompressor(*compression.Config) { panic("unimplemented") } -func (unimplementedBLOBStore) SetShardID(common.ID) { - panic("unimplemented") -} - func (unimplementedBLOBStore) GetBytes(oid.Address) ([]byte, error) { panic("unimplemented") } @@ -377,14 +368,6 @@ func (unimplementedWriteCache) SetMode(mode.Mode) error { panic("unimplemented") } -func (unimplementedWriteCache) SetLogger(*zap.Logger) { - panic("unimplemented") -} - -func (unimplementedWriteCache) SetShardIDMetrics(string) { - panic("unimplemented") -} - func (unimplementedWriteCache) DumpInfo() writecache.Info { panic("unimplemented") } @@ -393,7 +376,7 @@ func (unimplementedWriteCache) Flush(bool) error { panic("unimplemented") } -func (unimplementedWriteCache) Init() error { +func (unimplementedWriteCache) Init(common.ID) error { panic("unimplemented") } diff --git a/pkg/local_object_storage/shard/shard_test.go b/pkg/local_object_storage/shard/shard_test.go index 5d51d899c1..f1fe667885 100644 --- a/pkg/local_object_storage/shard/shard_test.go +++ b/pkg/local_object_storage/shard/shard_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" meta "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/metabase" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard" @@ -56,14 +55,7 @@ func _newShardWithFSTree(t testing.TB, rootPath string, enableWriteCache bool, w fstree.WithPath(filepath.Join(rootPath, "fstree")), ) - newShardID := func() common.ID { - id, err := common.NewID() - require.NoError(t, err) - return id - } - opts := append([]shard.Option{ - shard.WithID(newShardID()), shard.WithLogger(zap.L()), shard.WithMetaBaseOptions( meta.WithPath(filepath.Join(rootPath, "meta")), diff --git a/pkg/local_object_storage/writecache/flush_test.go b/pkg/local_object_storage/writecache/flush_test.go index c02bc9ccd2..6eca2712c0 100644 --- a/pkg/local_object_storage/writecache/flush_test.go +++ b/pkg/local_object_storage/writecache/flush_test.go @@ -170,7 +170,7 @@ func TestFlushPerformance(t *testing.T) { require.NoError(t, storageSetMode(s, mode.ReadWrite)) require.NoError(t, wc.Open(false)) - require.NoError(t, wc.Init()) + require.NoError(t, wc.Init(common.ID{})) start := time.Now() waitForFlush(t, wc, objects) duration := time.Since(start) @@ -211,7 +211,7 @@ func TestFlushErrorRetry(t *testing.T) { s.SetCompressor(comp) require.NoError(t, s.Open(false)) - require.NoError(t, s.Init()) + require.NoError(t, s.Init(common.ID{})) logger, logBuf := testutil.NewBufferedLogger(t, zap.DebugLevel) wc := New(WithPath(filepath.Join(dir, "writecache")), @@ -219,7 +219,7 @@ func TestFlushErrorRetry(t *testing.T) { WithFlushWorkersCount(workerCount), WithLogger(logger)) require.NoError(t, wc.Open(false)) - require.NoError(t, wc.Init()) + require.NoError(t, wc.Init(common.ID{})) defer wc.Close() @@ -251,8 +251,10 @@ func TestFlushErrorRetry(t *testing.T) { Level: zap.WarnLevel, Message: "flush scheduler paused due to error", Fields: map[string]any{ - "component": "WriteCache", - "delay": json.Number("10"), + "component": "WriteCache", + "delay": json.Number("10"), + "shard_id": "", + "substorage": wcStorageType, }, }) logBuf.AssertContainsMsg(zap.ErrorLevel, "can't flush objects") @@ -282,7 +284,7 @@ func TestFlushScheduler(t *testing.T) { require.NoError(t, storageSetMode(s, mode.ReadWrite)) require.NoError(t, wc.Open(false)) - require.NoError(t, wc.Init()) + require.NoError(t, wc.Init(common.ID{})) waitForFlush(t, wc, objects) @@ -356,7 +358,7 @@ func storageSetMode(s common.Storage, m mode.Mode) error { err := s.Close() if err == nil { if err = s.Open(m.ReadOnly()); err == nil { - err = s.Init() + err = s.Init(common.ID{}) } } return err @@ -381,7 +383,7 @@ func (m *ModeAwareStorage) SetMode(newMode mode.Mode) error { err := m.Close() if err == nil { if err = m.Open(newMode.ReadOnly()); err == nil { - err = m.Init() + err = m.Init(common.ID{}) } } diff --git a/pkg/local_object_storage/writecache/generic_test.go b/pkg/local_object_storage/writecache/generic_test.go index 7d30d64d9c..bc72907cdc 100644 --- a/pkg/local_object_storage/writecache/generic_test.go +++ b/pkg/local_object_storage/writecache/generic_test.go @@ -44,7 +44,7 @@ func newCache(tb testing.TB, opts ...Option) (Cache, common.Storage) { fsTree.SetCompressor(comp) require.NoError(tb, fsTree.Open(false)) - require.NoError(tb, fsTree.Init()) + require.NoError(tb, fsTree.Init(common.ID{})) modeAwareStorage := NewModeAwareStorage(fsTree) @@ -54,7 +54,7 @@ func newCache(tb testing.TB, opts ...Option) (Cache, common.Storage) { WithStorage(modeAwareStorage), }, opts...)...) require.NoError(tb, wc.Open(false)) - require.NoError(tb, wc.Init()) + require.NoError(tb, wc.Init(common.ID{})) return wc, modeAwareStorage } diff --git a/pkg/local_object_storage/writecache/storage.go b/pkg/local_object_storage/writecache/storage.go index f156164e70..8f7e1225ab 100644 --- a/pkg/local_object_storage/writecache/storage.go +++ b/pkg/local_object_storage/writecache/storage.go @@ -4,7 +4,6 @@ import ( "fmt" "os" - "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/util" ) @@ -16,20 +15,13 @@ func (c *cache) openStore(readOnly bool) error { } c.fsTree = fstree.New( + fstree.WithLogger(c.log), fstree.WithPath(c.path), fstree.WithPerm(os.ModePerm), fstree.WithDepth(1), + fstree.WithSubtype(fstree.SubtypeWriteCache), fstree.WithNoSync(c.noSync), fstree.WithCombinedCountLimit(1)) - c.fsTree.SetLogger(c.log) - var id common.ID - if c.metrics.id != "" { - id, err = common.DecodeIDString(c.metrics.id) - if err != nil { - return fmt.Errorf("decode shard ID: %w", err) - } - } - c.fsTree.SetShardID(id) if err := c.fsTree.Open(readOnly); err != nil { return fmt.Errorf("could not open FSTree: %w", err) } diff --git a/pkg/local_object_storage/writecache/writecache.go b/pkg/local_object_storage/writecache/writecache.go index 4d779f5e28..2b6c5fa24c 100644 --- a/pkg/local_object_storage/writecache/writecache.go +++ b/pkg/local_object_storage/writecache/writecache.go @@ -5,6 +5,7 @@ import ( "io" "sync" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/fstree" "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/shard/mode" "github.com/nspcc-dev/neofs-sdk-go/object" @@ -41,12 +42,10 @@ type Cache interface { Iterate(func(oid.Address, []byte) error, bool) error Put(oid.Address, *object.Object, []byte) error SetMode(mode.Mode) error - SetLogger(*zap.Logger) - SetShardIDMetrics(string) DumpInfo() Info Flush(bool) error - Init() error + Init(common.ID) error Open(readOnly bool) error Close() error ObjectStatus(address oid.Address) (ObjectStatus, error) @@ -108,16 +107,6 @@ func New(opts ...Option) Cache { return c } -// SetLogger sets logger. It is used after the shard ID was generated to use it in logs. -func (c *cache) SetLogger(l *zap.Logger) { - c.log = l.With(zap.String("substorage", wcStorageType)) -} - -// SetShardIDMetrics sets shard id for metrics. It is used after the shard ID was generated. -func (c *cache) SetShardIDMetrics(id string) { - c.metrics.id = id -} - func (c *cache) DumpInfo() Info { return Info{ Path: c.path, @@ -147,8 +136,14 @@ func (c *cache) Open(readOnly bool) error { } // Init runs necessary services. No-op in read-only mode. -func (c *cache) Init() error { - err := c.fsTree.Init() +func (c *cache) Init(id common.ID) error { + c.metrics.id = id.String() + c.log = c.log.With( + zap.String("substorage", wcStorageType), + zap.Stringer("shard_id", id), + ) + + err := c.fsTree.Init(id) if err != nil { return fmt.Errorf("init FSTree: %w", err) } diff --git a/pkg/local_object_storage/writecache/writecache_test.go b/pkg/local_object_storage/writecache/writecache_test.go index f38e846833..7608d4d9f9 100644 --- a/pkg/local_object_storage/writecache/writecache_test.go +++ b/pkg/local_object_storage/writecache/writecache_test.go @@ -3,6 +3,7 @@ package writecache import ( "testing" + "github.com/nspcc-dev/neofs-node/pkg/local_object_storage/blobstor/common" objecttest "github.com/nspcc-dev/neofs-sdk-go/object/test" "github.com/stretchr/testify/require" ) @@ -24,6 +25,6 @@ func TestCache_InitReadOnly(t *testing.T) { t.Cleanup(func() { wc.Close() }) - err = wc.Init() + err = wc.Init(common.ID{}) require.NoError(t, err) } diff --git a/pkg/services/object/server_test.go b/pkg/services/object/server_test.go index 13c1b5b8d0..845da645f3 100644 --- a/pkg/services/object/server_test.go +++ b/pkg/services/object/server_test.go @@ -747,7 +747,6 @@ func newSimpleStorage(t *testing.T, fsChain FSChain) *engine.StorageEngine { ) require.NoError(t, err) - require.NoError(t, storage.Open()) require.NoError(t, storage.Init()) t.Cleanup(func() { storage.Close() })