diff --git a/CHANGELOG.md b/CHANGELOG.md index a8724961f..8f2764f39 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Fixed +- Avoid evicting yet to be processed heights [#3204](https://github.com/evstack/ev-node/pull/3204) - Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201) - Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162) - Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167) diff --git a/block/internal/cache/generic_cache.go b/block/internal/cache/generic_cache.go index c04098704..d014062aa 100644 --- a/block/internal/cache/generic_cache.go +++ b/block/internal/cache/generic_cache.go @@ -8,23 +8,11 @@ import ( "sync" "sync/atomic" - lru "github.com/hashicorp/golang-lru/v2" ds "github.com/ipfs/go-datastore" "github.com/evstack/ev-node/pkg/store" ) -const ( - // DefaultItemsCacheSize is the default size for items cache. - DefaultItemsCacheSize = 200_000 - - // DefaultHashesCacheSize is the default size for hash tracking. - DefaultHashesCacheSize = 200_000 - - // DefaultDAIncludedCacheSize is the default size for DA inclusion tracking. - DefaultDAIncludedCacheSize = 200_000 -) - // snapshotEntry is one record in the persisted snapshot. // Encoded as 16 bytes: [blockHeight uint64 LE][daHeight uint64 LE]. type snapshotEntry struct { @@ -34,142 +22,98 @@ type snapshotEntry struct { const snapshotEntrySize = 16 // bytes per snapshotEntry -// Cache tracks seen blocks and DA inclusion status using bounded LRU caches. -type Cache[T any] struct { - // itemsByHeight stores items keyed by uint64 height. - // Mutex needed for atomic get-and-remove in getNextItem. - itemsByHeight *lru.Cache[uint64, *T] - itemsByHeightMu sync.Mutex +// Cache tracks seen blocks and DA inclusion status. 
+type Cache struct { + mu sync.Mutex - // hashes tracks whether a given hash has been seen - hashes *lru.Cache[string, bool] + hashes map[string]bool + daIncluded map[string]uint64 + hashByHeight map[uint64]string + maxDAHeight *atomic.Uint64 - // daIncluded maps hash → daHeight. Hash may be a real content hash or a - // height placeholder (see HeightPlaceholderKey) immediately after restore. - daIncluded *lru.Cache[string, uint64] - - // hashByHeight maps blockHeight → hash, used for pruning and height-based - // lookups. Protected by hashByHeightMu only in deleteAllForHeight where a - // read-then-remove must be atomic. - hashByHeight *lru.Cache[uint64, string] - hashByHeightMu sync.Mutex - - // maxDAHeight tracks the maximum DA height seen - maxDAHeight *atomic.Uint64 - - store store.Store // nil = ephemeral, no persistence - // storeKeyPrefix is the prefix used for store keys + store store.Store storeKeyPrefix string } -func (c *Cache[T]) snapshotKey() string { +func (c *Cache) snapshotKey() string { return c.storeKeyPrefix + "__snap" } // NewCache creates a Cache. When store and keyPrefix are set, mutations // persist a snapshot so RestoreFromStore can recover in-flight state. 
-func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] { - // LRU cache creation only fails if size <= 0, which won't happen with our defaults - itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize) - hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize) - daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize) - hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize) - - return &Cache[T]{ - itemsByHeight: itemsCache, - hashes: hashesCache, - daIncluded: daIncludedCache, - hashByHeight: hashByHeightCache, +func NewCache(s store.Store, keyPrefix string) *Cache { + return &Cache{ + hashes: make(map[string]bool), + daIncluded: make(map[string]uint64), + hashByHeight: make(map[uint64]string), maxDAHeight: &atomic.Uint64{}, store: s, storeKeyPrefix: keyPrefix, } } -// getItem returns an item from the cache by height. -func (c *Cache[T]) getItem(height uint64) *T { - item, ok := c.itemsByHeight.Get(height) - if !ok { - return nil - } - return item -} - -// setItem sets an item in the cache by height. -func (c *Cache[T]) setItem(height uint64, item *T) { - c.itemsByHeight.Add(height, item) -} - -// getNextItem returns and removes the item at height, or nil if absent. -func (c *Cache[T]) getNextItem(height uint64) *T { - c.itemsByHeightMu.Lock() - defer c.itemsByHeightMu.Unlock() - - item, ok := c.itemsByHeight.Get(height) - if !ok { - return nil - } - c.itemsByHeight.Remove(height) - return item -} - -// itemCount returns the number of items currently stored by height. -func (c *Cache[T]) itemCount() int { - return c.itemsByHeight.Len() +func (c *Cache) isSeen(hash string) bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.hashes[hash] } -// isSeen returns true if the hash has been seen. 
-func (c *Cache[T]) isSeen(hash string) bool { - seen, ok := c.hashes.Get(hash) - return ok && seen +func (c *Cache) setSeen(hash string, height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + c.hashes[hash] = true + c.hashByHeight[height] = hash } -// setSeen sets the hash as seen and tracks its height for pruning. -func (c *Cache[T]) setSeen(hash string, height uint64) { - c.hashes.Add(hash, true) - c.hashByHeight.Add(height, hash) +func (c *Cache) removeSeen(hash string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.hashes, hash) } -// getDAIncluded returns the DA height if the hash has been DA-included. -func (c *Cache[T]) getDAIncluded(daCommitmentHash string) (uint64, bool) { - return c.daIncluded.Get(daCommitmentHash) +func (c *Cache) getDAIncluded(hash string) (uint64, bool) { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.daIncluded[hash] + return v, ok } -// getDAIncludedByHeight resolves DA height via the height→hash index. -// Works for both real hashes (steady state) and snapshot placeholders -// (post-restart, before the DA retriever re-fires the real hash). -func (c *Cache[T]) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { - hash, ok := c.hashByHeight.Get(blockHeight) +func (c *Cache) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) { + c.mu.Lock() + defer c.mu.Unlock() + hash, ok := c.hashByHeight[blockHeight] if !ok { return 0, false } - return c.getDAIncluded(hash) + v, exists := c.daIncluded[hash] + return v, exists } // setDAIncluded records DA inclusion in memory. // If a previous entry already exists at blockHeight (e.g. a placeholder from // RestoreFromStore), it is evicted from daIncluded to avoid orphan leaks. 
-func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { - if prev, ok := c.hashByHeight.Get(blockHeight); ok && prev != hash { - c.daIncluded.Remove(prev) +func (c *Cache) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) { + c.mu.Lock() + defer c.mu.Unlock() + if prev, ok := c.hashByHeight[blockHeight]; ok && prev != hash { + delete(c.daIncluded, prev) } - c.daIncluded.Add(hash, daHeight) - c.hashByHeight.Add(blockHeight, hash) + c.daIncluded[hash] = daHeight + c.hashByHeight[blockHeight] = hash c.setMaxDAHeight(daHeight) } -// removeDAIncluded removes the DA-included status of the hash from the cache. -func (c *Cache[T]) removeDAIncluded(hash string) { - c.daIncluded.Remove(hash) +func (c *Cache) removeDAIncluded(hash string) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.daIncluded, hash) } -// daHeight returns the maximum DA height from all DA-included items. -func (c *Cache[T]) daHeight() uint64 { +func (c *Cache) daHeight() uint64 { return c.maxDAHeight.Load() } -// setMaxDAHeight sets the maximum DA height if the provided value is greater. -func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { +func (c *Cache) setMaxDAHeight(daHeight uint64) { for range 1_000 { current := c.maxDAHeight.Load() if daHeight <= current { @@ -181,49 +125,41 @@ func (c *Cache[T]) setMaxDAHeight(daHeight uint64) { } } -// removeSeen removes a hash from the seen cache. -func (c *Cache[T]) removeSeen(hash string) { - c.hashes.Remove(hash) -} - -// deleteAllForHeight removes all items and their associated data from the -// cache at the given height. 
-func (c *Cache[T]) deleteAllForHeight(height uint64) { - c.itemsByHeight.Remove(height) - - c.hashByHeightMu.Lock() - hash, ok := c.hashByHeight.Get(height) - if ok { - c.hashByHeight.Remove(height) +func (c *Cache) deleteAllForHeight(height uint64) { + c.mu.Lock() + defer c.mu.Unlock() + hash, ok := c.hashByHeight[height] + if !ok { + return } - c.hashByHeightMu.Unlock() + delete(c.hashByHeight, height) + delete(c.hashes, hash) + delete(c.daIncluded, hash) +} - if ok { - c.hashes.Remove(hash) - c.daIncluded.Remove(hash) - } +func (c *Cache) daIncludedLen() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.daIncluded) } // persistSnapshot writes all current in-flight [blockHeight, daHeight] pairs to the store under a single key. // Only called explicitly via SaveToStore. NEVER CALL IT ON HOT-PATH TO AVOID BAGER WRITE AMPLIFICATION. -func (c *Cache[T]) persistSnapshot(ctx context.Context) error { +func (c *Cache) persistSnapshot(ctx context.Context) error { if c.store == nil || c.storeKeyPrefix == "" { return nil } - heights := c.hashByHeight.Keys() - entries := make([]snapshotEntry, 0, len(heights)) - for _, h := range heights { - hash, ok := c.hashByHeight.Peek(h) - if !ok { - continue - } - daH, ok := c.daIncluded.Peek(hash) + c.mu.Lock() + entries := make([]snapshotEntry, 0, len(c.hashByHeight)) + for h, hash := range c.hashByHeight { + daH, ok := c.daIncluded[hash] if !ok { continue } entries = append(entries, snapshotEntry{blockHeight: h, daHeight: daH}) } + c.mu.Unlock() return c.store.SetMetadata(ctx, c.snapshotKey(), encodeSnapshot(entries)) } @@ -257,8 +193,7 @@ func decodeSnapshot(buf []byte) []snapshotEntry { // RestoreFromStore loads the in-flight snapshot with a single store read. // Each entry is installed as a height placeholder; real hashes replace them // once the DA retriever re-fires SetHeaderDAIncluded after startup. -// Missing snapshot key is treated as a no-op (fresh node or pre-snapshot version). 
-func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { +func (c *Cache) RestoreFromStore(ctx context.Context) error { if c.store == nil || c.storeKeyPrefix == "" { return nil } @@ -271,10 +206,13 @@ func (c *Cache[T]) RestoreFromStore(ctx context.Context) error { return fmt.Errorf("reading cache snapshot from store: %w", err) } + c.mu.Lock() + defer c.mu.Unlock() + for _, e := range decodeSnapshot(buf) { placeholder := HeightPlaceholderKey(c.storeKeyPrefix, e.blockHeight) - c.daIncluded.Add(placeholder, e.daHeight) - c.hashByHeight.Add(e.blockHeight, placeholder) + c.daIncluded[placeholder] = e.daHeight + c.hashByHeight[e.blockHeight] = placeholder c.setMaxDAHeight(e.daHeight) } @@ -297,7 +235,7 @@ func HeightPlaceholderKey(prefix string, height uint64) string { } // SaveToStore flushes the current snapshot to the store. -func (c *Cache[T]) SaveToStore(ctx context.Context) error { +func (c *Cache) SaveToStore(ctx context.Context) error { if c.store == nil { return nil } @@ -308,7 +246,7 @@ func (c *Cache[T]) SaveToStore(ctx context.Context) error { } // ClearFromStore deletes the snapshot key from the store. -func (c *Cache[T]) ClearFromStore(ctx context.Context) error { +func (c *Cache) ClearFromStore(ctx context.Context) error { if c.store == nil { return nil } diff --git a/block/internal/cache/generic_cache_test.go b/block/internal/cache/generic_cache_test.go index a1d36b487..d3766fc43 100644 --- a/block/internal/cache/generic_cache_test.go +++ b/block/internal/cache/generic_cache_test.go @@ -10,8 +10,6 @@ import ( pkgstore "github.com/evstack/ev-node/pkg/store" ) -type testItem struct{ V int } - // testMemStore creates an in-memory store for testing. func testMemStore(t *testing.T) pkgstore.Store { t.Helper() @@ -21,9 +19,7 @@ func testMemStore(t *testing.T) pkgstore.Store { } // writeSnapshot directly encodes and writes a snapshot into the store under -// the cache's snapshot key (storeKeyPrefix + "__snap"). 
This simulates the -// state that persistSnapshot would have written during a previous run, so that -// RestoreFromStore can recover from it. +// the cache's snapshot key (storeKeyPrefix + "__snap"). func writeSnapshot(t *testing.T, st pkgstore.Store, storeKeyPrefix string, entries []snapshotEntry) { t.Helper() buf := encodeSnapshot(entries) @@ -37,14 +33,14 @@ func writeSnapshot(t *testing.T, st pkgstore.Store, storeKeyPrefix string, entri // TestCache_MaxDAHeight verifies that daHeight tracks the maximum DA height // across successive setDAIncluded calls. func TestCache_MaxDAHeight(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") assert.Equal(t, uint64(0), c.daHeight(), "initial daHeight should be 0") c.setDAIncluded("hash1", 100, 1) assert.Equal(t, uint64(100), c.daHeight(), "after setDAIncluded(100)") - c.setDAIncluded("hash2", 50, 2) // lower, should not change max + c.setDAIncluded("hash2", 50, 2) assert.Equal(t, uint64(100), c.daHeight(), "after setDAIncluded(50)") c.setDAIncluded("hash3", 200, 3) @@ -52,7 +48,7 @@ func TestCache_MaxDAHeight(t *testing.T) { } // --------------------------------------------------------------------------- -// RestoreFromStore — O(1) snapshot-based recovery +// RestoreFromStore // --------------------------------------------------------------------------- // TestCache_RestoreFromStore_EmptyChain verifies that RestoreFromStore is a @@ -60,10 +56,10 @@ func TestCache_MaxDAHeight(t *testing.T) { func TestCache_RestoreFromStore_EmptyChain(t *testing.T) { st := testMemStore(t) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(context.Background())) - assert.Equal(t, 0, c.daIncluded.Len(), "no entries expected on empty chain") + assert.Equal(t, 0, c.daIncludedLen(), "no entries expected on empty chain") assert.Equal(t, uint64(0), c.daHeight()) } @@ -78,10 +74,10 @@ func TestCache_RestoreFromStore_FullyFinalized(t *testing.T) { // empty (persistSnapshot 
writes an empty buf when daIncluded is empty). writeSnapshot(t, st, "hdr/", nil) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 0, c.daIncluded.Len(), "no in-flight entries expected") + assert.Equal(t, 0, c.daIncludedLen(), "no in-flight entries expected") assert.Equal(t, uint64(0), c.daHeight(), "no in-flight entries means daHeight is 0") } @@ -91,30 +87,24 @@ func TestCache_RestoreFromStore_InFlightWindow(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // Simulate two in-flight entries written by a previous run: heights 4 and 5. writeSnapshot(t, st, "hdr/", []snapshotEntry{ {blockHeight: 4, daHeight: 13}, {blockHeight: 5, daHeight: 14}, }) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 2, c.daIncluded.Len(), "exactly the in-flight snapshot entries should be loaded") + assert.Equal(t, 2, c.daIncludedLen(), "exactly the in-flight snapshot entries should be loaded") assert.Equal(t, uint64(14), c.daHeight(), "maxDAHeight should reflect the highest in-flight DA height") - // Verify the placeholder keys are addressable by height via hashByHeight. 
- hash4, ok := c.hashByHeight.Get(4) - require.True(t, ok, "hashByHeight[4] should exist") - daH4, ok := c.daIncluded.Get(hash4) - require.True(t, ok) - assert.Equal(t, uint64(13), daH4) + hash4, ok := c.getDAIncludedByHeight(4) + require.True(t, ok, "height 4 should exist in daIncluded") + assert.Equal(t, uint64(13), hash4) - hash5, ok := c.hashByHeight.Get(5) - require.True(t, ok, "hashByHeight[5] should exist") - daH5, ok := c.daIncluded.Get(hash5) - require.True(t, ok) - assert.Equal(t, uint64(14), daH5) + hash5, ok := c.getDAIncludedByHeight(5) + require.True(t, ok, "height 5 should exist in daIncluded") + assert.Equal(t, uint64(14), hash5) } // TestCache_RestoreFromStore_SingleEntry verifies a snapshot with one in-flight @@ -127,24 +117,22 @@ func TestCache_RestoreFromStore_SingleEntry(t *testing.T) { {blockHeight: 3, daHeight: 20}, }) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 1, c.daIncluded.Len(), "one entry should be in-flight") + assert.Equal(t, 1, c.daIncludedLen(), "one entry should be in-flight") assert.Equal(t, uint64(20), c.daHeight()) - _, ok := c.hashByHeight.Get(4) + _, ok := c.getDAIncludedByHeight(4) assert.False(t, ok, "height 4 was not in snapshot") - _, ok = c.hashByHeight.Get(5) + _, ok = c.getDAIncludedByHeight(5) assert.False(t, ok, "height 5 was not in snapshot") } -// TestCache_RestoreFromStore_NilStore verifies that RestoreFromStore is a -// no-op when the cache has no backing store. 
func TestCache_RestoreFromStore_NilStore(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") require.NoError(t, c.RestoreFromStore(context.Background())) - assert.Equal(t, 0, c.daIncluded.Len()) + assert.Equal(t, 0, c.daIncludedLen()) } // TestCache_RestoreFromStore_PlaceholderOverwrittenByRealHash @@ -154,28 +142,24 @@ func TestCache_RestoreFromStore_PlaceholderOverwrittenByRealHash(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // Snapshot contains one in-flight entry for height 3. writeSnapshot(t, st, "hdr/", []snapshotEntry{ {blockHeight: 3, daHeight: 99}, }) - c := NewCache[testItem](st, "hdr/") + c := NewCache(st, "hdr/") require.NoError(t, c.RestoreFromStore(ctx)) - assert.Equal(t, 1, c.daIncluded.Len(), "one placeholder for height 3") + assert.Equal(t, 1, c.daIncludedLen(), "one placeholder for height 3") - // Simulate the DA submitter writing the real hash entry. c.setDAIncluded("realHash_height3", 99, 3) - // hashByHeight[3] now points to the new real hash. - newHash, ok := c.hashByHeight.Get(3) - require.True(t, ok) - assert.Equal(t, "realHash_height3", newHash) - - // The real entry must be queryable by its content hash. daH, ok := c.getDAIncluded("realHash_height3") require.True(t, ok) assert.Equal(t, uint64(99), daH) + + daH2, ok := c.getDAIncludedByHeight(3) + require.True(t, ok) + assert.Equal(t, uint64(99), daH2) } // TestCache_RestoreFromStore_RoundTrip verifies that SaveToStore persists a @@ -184,53 +168,40 @@ func TestCache_RestoreFromStore_RoundTrip(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // First cache instance: write some in-flight entries, then flush (shutdown). - c1 := NewCache[testItem](st, "rt/") + c1 := NewCache(st, "rt/") c1.setDAIncluded("hashA", 10, 1) c1.setDAIncluded("hashB", 20, 2) c1.setDAIncluded("hashC", 30, 3) - // Remove one entry to confirm deletions are also snapshotted. 
c1.removeDAIncluded("hashB") require.NoError(t, c1.SaveToStore(ctx)) - // Second cache instance on same store: should recover {hashA→10, hashC→30}. - c2 := NewCache[testItem](st, "rt/") + c2 := NewCache(st, "rt/") require.NoError(t, c2.RestoreFromStore(ctx)) - assert.Equal(t, 2, c2.daIncluded.Len(), "only non-deleted entries should be restored") + assert.Equal(t, 2, c2.daIncludedLen(), "only non-deleted entries should be restored") assert.Equal(t, uint64(30), c2.daHeight()) - // Placeholder keys are created for heights 1 and 3 (height 2 was removed). - _, ok := c2.hashByHeight.Get(1) + _, ok := c2.getDAIncludedByHeight(1) assert.True(t, ok, "height 1 placeholder should exist") - _, ok = c2.hashByHeight.Get(2) + _, ok = c2.getDAIncludedByHeight(2) assert.False(t, ok, "height 2 was removed, should not exist") - _, ok = c2.hashByHeight.Get(3) + _, ok = c2.getDAIncludedByHeight(3) assert.True(t, ok, "height 3 placeholder should exist") } // --------------------------------------------------------------------------- -// Basic operations (no store required) +// Basic operations // --------------------------------------------------------------------------- func TestCache_BasicOperations(t *testing.T) { - c := NewCache[testItem](nil, "") - - // setItem / getItem - c.setItem(1, &testItem{V: 42}) - got := c.getItem(1) - require.NotNil(t, got) - assert.Equal(t, 42, got.V) - assert.Nil(t, c.getItem(999)) + c := NewCache(nil, "") - // setSeen / isSeen / removeSeen assert.False(t, c.isSeen("hash1")) c.setSeen("hash1", 1) assert.True(t, c.isSeen("hash1")) c.removeSeen("hash1") assert.False(t, c.isSeen("hash1")) - // setDAIncluded / getDAIncluded / removeDAIncluded _, ok := c.getDAIncluded("hash2") assert.False(t, ok) c.setDAIncluded("hash2", 100, 2) @@ -242,51 +213,22 @@ func TestCache_BasicOperations(t *testing.T) { assert.False(t, ok) } -func TestCache_GetNextItem(t *testing.T) { - c := NewCache[testItem](nil, "") - - c.setItem(1, &testItem{V: 1}) - c.setItem(2, &testItem{V: 
2}) - c.setItem(3, &testItem{V: 3}) - - got := c.getNextItem(2) - require.NotNil(t, got) - assert.Equal(t, 2, got.V) - - // removed - assert.Nil(t, c.getNextItem(2)) - - // others intact - assert.NotNil(t, c.getItem(1)) - assert.NotNil(t, c.getItem(3)) -} - func TestCache_DeleteAllForHeight(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") - c.setItem(1, &testItem{V: 1}) - c.setItem(2, &testItem{V: 2}) c.setSeen("hash1", 1) c.setSeen("hash2", 2) c.deleteAllForHeight(1) - assert.Nil(t, c.getItem(1)) assert.False(t, c.isSeen("hash1")) - - assert.NotNil(t, c.getItem(2)) assert.True(t, c.isSeen("hash2")) } func TestCache_WithNilStore(t *testing.T) { - c := NewCache[testItem](nil, "") + c := NewCache(nil, "") require.NotNil(t, c) - c.setItem(1, &testItem{V: 1}) - got := c.getItem(1) - require.NotNil(t, got) - assert.Equal(t, 1, got.V) - c.setDAIncluded("hash1", 100, 1) daHeight, ok := c.getDAIncluded("hash1") assert.True(t, ok) @@ -301,19 +243,16 @@ func TestCache_SaveToStore(t *testing.T) { st := testMemStore(t) ctx := context.Background() - c := NewCache[testItem](st, "save-test/") + c := NewCache(st, "save-test/") c.setDAIncluded("hash1", 100, 1) c.setDAIncluded("hash2", 200, 2) require.NoError(t, c.SaveToStore(ctx)) - // SaveToStore rewrites the single snapshot key (storeKeyPrefix + "__snap"). - // Two entries × 16 bytes each = 32 bytes total. raw, err := st.GetMetadata(ctx, "save-test/__snap") require.NoError(t, err) assert.Len(t, raw, 2*snapshotEntrySize, "snapshot should contain 2 entries of 16 bytes each") - // The individual per-hash keys are NOT written by the snapshot design. 
_, err = st.GetMetadata(ctx, "save-test/hash1") assert.Error(t, err, "per-hash keys should not exist in the snapshot design") } @@ -322,12 +261,11 @@ func TestCache_ClearFromStore(t *testing.T) { st := testMemStore(t) ctx := context.Background() - c := NewCache[testItem](st, "clear-test/") + c := NewCache(st, "clear-test/") c.setDAIncluded("hash1", 100, 1) c.setDAIncluded("hash2", 200, 2) require.NoError(t, c.SaveToStore(ctx)) - // Verify the snapshot key was written before clearing. _, err := st.GetMetadata(ctx, "clear-test/__snap") require.NoError(t, err, "snapshot key should exist before ClearFromStore") @@ -337,26 +275,10 @@ func TestCache_ClearFromStore(t *testing.T) { assert.Error(t, err, "snapshot key should have been removed from store") } -// --------------------------------------------------------------------------- -// Large-dataset smoke test -// --------------------------------------------------------------------------- - -func TestCache_LargeDataset(t *testing.T) { - c := NewCache[testItem](nil, "") - const N = 20_000 - for i := N - 1; i >= 0; i-- { - c.setItem(uint64(i), &testItem{V: i}) - } - for i := 5000; i < 10000; i += 2 { - c.getNextItem(uint64(i)) - } -} - // --------------------------------------------------------------------------- // heightPlaceholderKey // --------------------------------------------------------------------------- -// TestHeightPlaceholderKey verifies the placeholder key format and uniqueness. func TestHeightPlaceholderKey(t *testing.T) { k0 := HeightPlaceholderKey("pfx/", 0) k1 := HeightPlaceholderKey("pfx/", 1) @@ -365,12 +287,10 @@ func TestHeightPlaceholderKey(t *testing.T) { assert.NotEqual(t, k0, k1) assert.NotEqual(t, k1, kMax) - // Must start with the provided prefix. assert.Contains(t, k0, "pfx/") assert.Contains(t, k1, "pfx/") assert.Contains(t, kMax, "pfx/") - // Different prefixes must not collide. 
assert.NotEqual(t, HeightPlaceholderKey("a/", 1), HeightPlaceholderKey("b/", 1)) } @@ -383,35 +303,28 @@ func TestCache_NoPlaceholderLeakAfterRefire(t *testing.T) { st := testMemStore(t) ctx := context.Background() - // Step 1: initial run — write a real hash for height 3, then flush (shutdown). - c1 := NewCache[testItem](st, "pfx/") + c1 := NewCache(st, "pfx/") c1.setDAIncluded("realHash3", 99, 3) require.NoError(t, c1.SaveToStore(ctx)) - // snapshot now contains [{blockHeight:3, daHeight:99}] - // Step 2: restart — placeholder installed for height 3. - c2 := NewCache[testItem](st, "pfx/") + c2 := NewCache(st, "pfx/") require.NoError(t, c2.RestoreFromStore(ctx)) placeholder := HeightPlaceholderKey("pfx/", 3) - _, placeholderPresent := c2.daIncluded.Get(placeholder) + _, placeholderPresent := c2.getDAIncluded(placeholder) require.True(t, placeholderPresent, "placeholder must be present immediately after restore") - assert.Equal(t, 1, c2.daIncluded.Len(), "only one entry expected before re-fire") + assert.Equal(t, 1, c2.daIncludedLen(), "only one entry expected before re-fire") - // Step 3: DA retriever re-fires with the real hash. c2.setDAIncluded("realHash3", 99, 3) - // The real hash must be present. daH, ok := c2.getDAIncluded("realHash3") require.True(t, ok, "real hash must be present after re-fire") assert.Equal(t, uint64(99), daH) - // The placeholder must be gone — no orphan leak. - _, placeholderPresent = c2.daIncluded.Get(placeholder) + _, placeholderPresent = c2.getDAIncluded(placeholder) assert.False(t, placeholderPresent, "placeholder must be evicted after real hash is written") - // Total entries must still be exactly one. 
- assert.Equal(t, 1, c2.daIncluded.Len(), "exactly one daIncluded entry after re-fire — no orphan") + assert.Equal(t, 1, c2.daIncludedLen(), "exactly one daIncluded entry after re-fire — no orphan") } // TestCache_RestartIdempotent verifies that multiple successive restarts all @@ -427,35 +340,54 @@ func TestCache_RestartIdempotent(t *testing.T) { const blockH = uint64(5) const daH = uint64(42) - // ── Run 1: normal operation, height 5 in-flight; flush at shutdown ─────── - c1 := NewCache[testItem](st, "pfx/") + c1 := NewCache(st, "pfx/") c1.setDAIncluded(realHash, daH, blockH) require.NoError(t, c1.SaveToStore(ctx)) - // snapshot: [{5, 42}] for restart := 1; restart <= 3; restart++ { - // ── Restart N: restore from snapshot - cR := NewCache[testItem](st, "pfx/") + cR := NewCache(st, "pfx/") require.NoError(t, cR.RestoreFromStore(ctx), "restart %d: RestoreFromStore", restart) - assert.Equal(t, 1, cR.daIncluded.Len(), "restart %d: one placeholder entry", restart) + assert.Equal(t, 1, cR.daIncludedLen(), "restart %d: one placeholder entry", restart) assert.Equal(t, daH, cR.daHeight(), "restart %d: daHeight correct", restart) - // Fallback lookup by height must work. gotDAH, ok := cR.getDAIncludedByHeight(blockH) require.True(t, ok, "restart %d: height-based lookup must succeed", restart) assert.Equal(t, daH, gotDAH, "restart %d: height-based DA height correct", restart) - // ── DA retriever re-fires with the real hash, then flushes (shutdown). cR.setDAIncluded(realHash, daH, blockH) require.NoError(t, cR.SaveToStore(ctx), "restart %d: SaveToStore", restart) - // After re-fire: real hash present, no orphan, snapshot updated. 
- _, realPresent := cR.daIncluded.Get(realHash) + _, realPresent := cR.getDAIncluded(realHash) assert.True(t, realPresent, "restart %d: real hash present after re-fire", restart) - assert.Equal(t, 1, cR.daIncluded.Len(), "restart %d: no orphan after re-fire", restart) - - // The snapshot written by SaveToStore must still encode the right data - // so the next restart can load it correctly. + assert.Equal(t, 1, cR.daIncludedLen(), "restart %d: no orphan after re-fire", restart) } } + +// --------------------------------------------------------------------------- +// Cleanup via deleteAllForHeight +// --------------------------------------------------------------------------- + +func TestCache_DeleteAllForHeight_CleansHashAndDA(t *testing.T) { + c := NewCache(nil, "") + + c.setDAIncluded("hash1", 100, 1) + c.setSeen("hash1", 1) + c.setDAIncluded("hash2", 200, 2) + c.setSeen("hash2", 2) + + assert.Equal(t, 2, c.daIncludedLen()) + assert.True(t, c.isSeen("hash1")) + assert.True(t, c.isSeen("hash2")) + + c.deleteAllForHeight(1) + + assert.Equal(t, 1, c.daIncludedLen()) + assert.False(t, c.isSeen("hash1")) + assert.True(t, c.isSeen("hash2")) + + _, ok := c.getDAIncludedByHeight(1) + assert.False(t, ok) + _, ok = c.getDAIncludedByHeight(2) + assert.True(t, ok) +} diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index 6e22816c3..6f1b9d9cf 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -89,24 +89,24 @@ type Manager interface { var _ Manager = (*implementation)(nil) type implementation struct { - headerCache *Cache[types.SignedHeader] - dataCache *Cache[types.Data] - txCache *Cache[struct{}] - txTimestamps *sync.Map // map[string]time.Time - pendingEventsCache *Cache[common.DAHeightEvent] - pendingHeaders *PendingHeaders - pendingData *PendingData - store store.Store - config config.Config - logger zerolog.Logger + headerCache *Cache + dataCache *Cache + txCache *Cache + txTimestamps *sync.Map // 
map[string]time.Time + pendingEvents map[uint64]*common.DAHeightEvent + pendingMu sync.Mutex + pendingHeaders *PendingHeaders + pendingData *PendingData + store store.Store + config config.Config + logger zerolog.Logger } // NewManager creates a new Manager, restoring or clearing persisted state as configured. func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manager, error) { - headerCache := NewCache[types.SignedHeader](st, HeaderDAIncludedPrefix) - dataCache := NewCache[types.Data](st, DataDAIncludedPrefix) - txCache := NewCache[struct{}](nil, "") - pendingEventsCache := NewCache[common.DAHeightEvent](nil, "") + headerCache := NewCache(st, HeaderDAIncludedPrefix) + dataCache := NewCache(st, DataDAIncludedPrefix) + txCache := NewCache(nil, "") pendingHeaders, err := NewPendingHeaders(st, logger) if err != nil { @@ -119,16 +119,16 @@ func NewManager(cfg config.Config, st store.Store, logger zerolog.Logger) (Manag } impl := &implementation{ - headerCache: headerCache, - dataCache: dataCache, - txCache: txCache, - txTimestamps: new(sync.Map), - pendingEventsCache: pendingEventsCache, - pendingHeaders: pendingHeaders, - pendingData: pendingData, - store: st, - config: cfg, - logger: logger, + headerCache: headerCache, + dataCache: dataCache, + txCache: txCache, + txTimestamps: new(sync.Map), + pendingEvents: make(map[uint64]*common.DAHeightEvent), + pendingHeaders: pendingHeaders, + pendingData: pendingData, + store: st, + config: cfg, + logger: logger, } if cfg.ClearCache { @@ -252,7 +252,9 @@ func (m *implementation) CleanupOldTxs(olderThan time.Duration) int { func (m *implementation) DeleteHeight(blockHeight uint64) { m.headerCache.deleteAllForHeight(blockHeight) m.dataCache.deleteAllForHeight(blockHeight) - m.pendingEventsCache.deleteAllForHeight(blockHeight) + m.pendingMu.Lock() + delete(m.pendingEvents, blockHeight) + m.pendingMu.Unlock() // Note: txCache is intentionally NOT deleted here because: // 1. 
Transactions are tracked by hash, not by block height (they use height 0) @@ -319,17 +321,27 @@ func (m *implementation) NumPendingData() uint64 { // SetPendingEvent sets the event at the specified height. func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEvent) { - m.pendingEventsCache.setItem(height, event) + m.pendingMu.Lock() + m.pendingEvents[height] = event + m.pendingMu.Unlock() } func (m *implementation) PendingEventsCount() int { - return m.pendingEventsCache.itemCount() + m.pendingMu.Lock() + defer m.pendingMu.Unlock() + return len(m.pendingEvents) } // GetNextPendingEvent efficiently retrieves and removes the event at the specified height. // Returns nil if no event exists at that height. func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEvent { - return m.pendingEventsCache.getNextItem(height) + m.pendingMu.Lock() + item, ok := m.pendingEvents[height] + if ok { + delete(m.pendingEvents, height) + } + m.pendingMu.Unlock() + return item } // SaveToStore flushes the DA inclusion snapshot to the store. @@ -364,8 +376,8 @@ func (m *implementation) RestoreFromStore() error { m.initDAHeightFromStore(ctx) m.logger.Info(). - Int("header_entries", m.headerCache.daIncluded.Len()). - Int("data_entries", m.dataCache.daIncluded.Len()). + Int("header_entries", m.headerCache.daIncludedLen()). + Int("data_entries", m.dataCache.daIncludedLen()). Uint64("da_height", m.DaHeight()). 
Msg("restored DA inclusion cache from store") @@ -384,10 +396,10 @@ func (m *implementation) ClearFromStore() error { return fmt.Errorf("failed to clear data cache from store: %w", err) } - m.headerCache = NewCache[types.SignedHeader](m.store, HeaderDAIncludedPrefix) - m.dataCache = NewCache[types.Data](m.store, DataDAIncludedPrefix) - m.txCache = NewCache[struct{}](nil, "") - m.pendingEventsCache = NewCache[common.DAHeightEvent](nil, "") + m.headerCache = NewCache(m.store, HeaderDAIncludedPrefix) + m.dataCache = NewCache(m.store, DataDAIncludedPrefix) + m.txCache = NewCache(nil, "") + m.pendingEvents = make(map[uint64]*common.DAHeightEvent) // Initialize DA height from store metadata to ensure DaHeight() is never 0. m.initDAHeightFromStore(ctx) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 76105e71d..42841f89a 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -354,17 +354,25 @@ func (s *Syncer) initializeState() error { } // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. - // Use the DA height from the last executed block instead of the maximum from all blocks, - // because P2P-fetched heights may be lost on restart. - daHeight := max(s.genesis.DAStartHeight, min(state.DAHeight-1, 0)) - if state.LastBlockHeight > 0 { - if lastHeaderDA, ok := s.cache.GetHeaderDAIncludedByHeight(state.LastBlockHeight); ok { - daHeight = max(daHeight, lastHeaderDA) - } - if lastDataDA, ok := s.cache.GetDataDAIncludedByHeight(state.LastBlockHeight); ok { - daHeight = max(daHeight, lastDataDA) - } + // The cache's DaHeight() is initialized from store metadata, so it's always correct even after cache clear. + // Only use cache.DaHeight() when P2P is actively syncing (headerStore has higher height than current state). 
+ daHeight := s.genesis.DAStartHeight + if state.DAHeight > s.genesis.DAStartHeight { + daHeight = max(daHeight, state.DAHeight-1) + } + if s.headerStore != nil && s.headerStore.Height() > state.LastBlockHeight { + daHeight = max(daHeight, s.cache.DaHeight()) } + + // dev mode for da start height + if startHeight := s.config.DA.StartHeight; startHeight > 0 { + s.logger.Info(). + Uint64("previous_da_start_height", daHeight). + Uint64("override_da_start_height", s.config.DA.StartHeight). + Msg("DA start height overridden by flag") + daHeight = startHeight + } + s.daRetrieverHeight.Store(daHeight) s.logger.Info(). diff --git a/pkg/config/config.go b/pkg/config/config.go index 1fc3bcb4b..e278aebba 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -89,6 +89,8 @@ const ( FlagDABatchMaxDelay = FlagPrefixEvnode + "da.batch_max_delay" // FlagDABatchMinItems is a flag for specifying the minimum batch items FlagDABatchMinItems = FlagPrefixEvnode + "da.batch_min_items" + // FlagDAStartHeight is a flag for forcing the DA retrieval height to start from a specific height + FlagDAStartHeight = FlagPrefixEvnode + "da.start_height" // P2P configuration flags @@ -237,6 +239,8 @@ type Config struct { // DAConfig contains all Data Availability configuration parameters type DAConfig struct { + StartHeight uint64 `mapstructure:"start_height" yaml:"-" comment:"Force DA retrieval to start from a specific height (0 for default)"` + Address string `mapstructure:"address" yaml:"address" comment:"Address of the data availability layer service (host:port). This is the endpoint where Rollkit will connect to submit and retrieve data."` AuthToken string `mapstructure:"auth_token" yaml:"auth_token" comment:"Authentication token for the data availability layer service. Required if the DA service needs authentication."` //nolint:gosec // this is ok. 
SubmitOptions string `mapstructure:"submit_options" yaml:"submit_options" comment:"Additional options passed to the DA layer when submitting data. Format depends on the specific DA implementation being used."` @@ -563,7 +567,7 @@ func AddFlags(cmd *cobra.Command) { }) // Add base flags - cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database") + cmd.Flags().String(FlagDBPath, def.DBPath, "path for the node database") cmd.Flags().Bool(FlagClearCache, def.ClearCache, "clear the cache") // Node configuration flags @@ -595,6 +599,8 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().Float64(FlagDABatchSizeThreshold, def.DA.BatchSizeThreshold, "batch size threshold as fraction of max blob size (0.0-1.0)") cmd.Flags().Duration(FlagDABatchMaxDelay, def.DA.BatchMaxDelay.Duration, "maximum time to wait before submitting a batch") cmd.Flags().Uint64(FlagDABatchMinItems, def.DA.BatchMinItems, "minimum number of items to accumulate before submission") + cmd.Flags().Uint64(FlagDAStartHeight, def.DA.StartHeight, "force DA retrieval to start from a specific height (0 for disabled)") + cmd.Flags().MarkHidden(FlagDAStartHeight) // P2P configuration flags cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 2ae2b0ef6..594114c77 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -128,7 +128,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagPruningInterval, DefaultConfig().Pruning.Interval.Duration) // Count the number of flags we're explicitly checking - expectedFlagCount := 77 // Update this number if you add more flag checks above + expectedFlagCount := 78 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0