Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Fixed

- Avoid evicting yet to be processed heights [#3204](https://github.com/evstack/ev-node/pull/3204)
- Refetch latest da height instead of da height +1 when P2P is offline [#3201](https://github.com/evstack/ev-node/pull/3201)
- Fix race on startup sync. [#3162](https://github.com/evstack/ev-node/pull/3162)
- Strict raft state. [#3167](https://github.com/evstack/ev-node/pull/3167)
Expand Down
220 changes: 79 additions & 141 deletions block/internal/cache/generic_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,11 @@ import (
"sync"
"sync/atomic"

lru "github.com/hashicorp/golang-lru/v2"
ds "github.com/ipfs/go-datastore"

"github.com/evstack/ev-node/pkg/store"
)

const (
// DefaultItemsCacheSize is the default size for items cache.
DefaultItemsCacheSize = 200_000

// DefaultHashesCacheSize is the default size for hash tracking.
DefaultHashesCacheSize = 200_000

// DefaultDAIncludedCacheSize is the default size for DA inclusion tracking.
DefaultDAIncludedCacheSize = 200_000
)

// snapshotEntry is one record in the persisted snapshot.
// Encoded as 16 bytes: [blockHeight uint64 LE][daHeight uint64 LE].
type snapshotEntry struct {
Expand All @@ -34,142 +22,98 @@ type snapshotEntry struct {

const snapshotEntrySize = 16 // bytes per snapshotEntry

// Cache tracks seen blocks and DA inclusion status using bounded LRU caches.
type Cache[T any] struct {
// itemsByHeight stores items keyed by uint64 height.
// Mutex needed for atomic get-and-remove in getNextItem.
itemsByHeight *lru.Cache[uint64, *T]
itemsByHeightMu sync.Mutex
// Cache tracks seen blocks and DA inclusion status.
type Cache struct {
mu sync.Mutex

// hashes tracks whether a given hash has been seen
hashes *lru.Cache[string, bool]
hashes map[string]bool
daIncluded map[string]uint64
hashByHeight map[uint64]string
Comment on lines +29 to +31
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Split the height index by responsibility.

hashByHeight is doing two incompatible jobs: cleanup for every seen hash and lookup for the DA-included hash at a height. Once two hashes share a block height, later writes overwrite earlier ones, so getDAIncludedByHeight can follow the wrong hash and deleteAllForHeight can only purge one of them. With the LRU gone, those superseded hashes stop aging out. Keep a DA-specific height index, and track seen hashes per height separately.

Also applies to: 61-65, 81-89, 95-103, 128-137

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@block/internal/cache/generic_cache.go` around lines 29 - 31, hashByHeight is
mixing two responsibilities (tracking every seen hash per height for cleanup and
storing the single DA-included hash), causing overwrites and stale entries;
split it into a DA-specific index (e.g., daHashByHeight map[uint64]string) and a
seen-index (e.g., seenByHeight map[uint64]map[string]bool) and update code paths
that touch hashByHeight: when adding a hash record, insert into
seenByHeight[height][hash]=true; when marking DA inclusion update daIncluded and
daHashByHeight[height]=hash; change getDAIncludedByHeight to consult
daHashByHeight/daIncluded instead of hashByHeight; change deleteAllForHeight to
iterate seenByHeight[height] to remove all hashes and their entries from
hashes/daIncluded/daHashByHeight, and remove empty maps to avoid leaks; update
any uses of hashByHeight in functions add/delete/get (including the code around
getDAIncludedByHeight and deleteAllForHeight) to use the new structures
accordingly.

maxDAHeight *atomic.Uint64

// daIncluded maps hash → daHeight. Hash may be a real content hash or a
// height placeholder (see HeightPlaceholderKey) immediately after restore.
daIncluded *lru.Cache[string, uint64]

// hashByHeight maps blockHeight → hash, used for pruning and height-based
// lookups. Protected by hashByHeightMu only in deleteAllForHeight where a
// read-then-remove must be atomic.
hashByHeight *lru.Cache[uint64, string]
hashByHeightMu sync.Mutex

// maxDAHeight tracks the maximum DA height seen
maxDAHeight *atomic.Uint64

store store.Store // nil = ephemeral, no persistence
// storeKeyPrefix is the prefix used for store keys
store store.Store
storeKeyPrefix string
}

func (c *Cache[T]) snapshotKey() string {
func (c *Cache) snapshotKey() string {
return c.storeKeyPrefix + "__snap"
}

// NewCache creates a Cache. When store and keyPrefix are set, mutations
// persist a snapshot so RestoreFromStore can recover in-flight state.
func NewCache[T any](s store.Store, keyPrefix string) *Cache[T] {
// LRU cache creation only fails if size <= 0, which won't happen with our defaults
itemsCache, _ := lru.New[uint64, *T](DefaultItemsCacheSize)
hashesCache, _ := lru.New[string, bool](DefaultHashesCacheSize)
daIncludedCache, _ := lru.New[string, uint64](DefaultDAIncludedCacheSize)
hashByHeightCache, _ := lru.New[uint64, string](DefaultHashesCacheSize)

return &Cache[T]{
itemsByHeight: itemsCache,
hashes: hashesCache,
daIncluded: daIncludedCache,
hashByHeight: hashByHeightCache,
func NewCache(s store.Store, keyPrefix string) *Cache {
return &Cache{
hashes: make(map[string]bool),
daIncluded: make(map[string]uint64),
hashByHeight: make(map[uint64]string),
maxDAHeight: &atomic.Uint64{},
store: s,
storeKeyPrefix: keyPrefix,
}
}

// getItem returns an item from the cache by height.
func (c *Cache[T]) getItem(height uint64) *T {
item, ok := c.itemsByHeight.Get(height)
if !ok {
return nil
}
return item
}

// setItem sets an item in the cache by height.
func (c *Cache[T]) setItem(height uint64, item *T) {
c.itemsByHeight.Add(height, item)
}

// getNextItem returns and removes the item at height, or nil if absent.
func (c *Cache[T]) getNextItem(height uint64) *T {
c.itemsByHeightMu.Lock()
defer c.itemsByHeightMu.Unlock()

item, ok := c.itemsByHeight.Get(height)
if !ok {
return nil
}
c.itemsByHeight.Remove(height)
return item
}

// itemCount returns the number of items currently stored by height.
func (c *Cache[T]) itemCount() int {
return c.itemsByHeight.Len()
func (c *Cache) isSeen(hash string) bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.hashes[hash]
}

// isSeen returns true if the hash has been seen.
func (c *Cache[T]) isSeen(hash string) bool {
seen, ok := c.hashes.Get(hash)
return ok && seen
func (c *Cache) setSeen(hash string, height uint64) {
c.mu.Lock()
defer c.mu.Unlock()
c.hashes[hash] = true
c.hashByHeight[height] = hash
}

// setSeen sets the hash as seen and tracks its height for pruning.
func (c *Cache[T]) setSeen(hash string, height uint64) {
c.hashes.Add(hash, true)
c.hashByHeight.Add(height, hash)
func (c *Cache) removeSeen(hash string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.hashes, hash)
}

// getDAIncluded returns the DA height if the hash has been DA-included.
func (c *Cache[T]) getDAIncluded(daCommitmentHash string) (uint64, bool) {
return c.daIncluded.Get(daCommitmentHash)
func (c *Cache) getDAIncluded(hash string) (uint64, bool) {
c.mu.Lock()
defer c.mu.Unlock()
v, ok := c.daIncluded[hash]
return v, ok
}

// getDAIncludedByHeight resolves DA height via the height→hash index.
// Works for both real hashes (steady state) and snapshot placeholders
// (post-restart, before the DA retriever re-fires the real hash).
func (c *Cache[T]) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) {
hash, ok := c.hashByHeight.Get(blockHeight)
func (c *Cache) getDAIncludedByHeight(blockHeight uint64) (uint64, bool) {
c.mu.Lock()
defer c.mu.Unlock()
hash, ok := c.hashByHeight[blockHeight]
if !ok {
return 0, false
}
return c.getDAIncluded(hash)
v, exists := c.daIncluded[hash]
return v, exists
}

// setDAIncluded records DA inclusion in memory.
// If a previous entry already exists at blockHeight (e.g. a placeholder from
// RestoreFromStore), it is evicted from daIncluded to avoid orphan leaks.
func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) {
if prev, ok := c.hashByHeight.Get(blockHeight); ok && prev != hash {
c.daIncluded.Remove(prev)
func (c *Cache) setDAIncluded(hash string, daHeight uint64, blockHeight uint64) {
c.mu.Lock()
defer c.mu.Unlock()
if prev, ok := c.hashByHeight[blockHeight]; ok && prev != hash {
delete(c.daIncluded, prev)
}
c.daIncluded.Add(hash, daHeight)
c.hashByHeight.Add(blockHeight, hash)
c.daIncluded[hash] = daHeight
c.hashByHeight[blockHeight] = hash
c.setMaxDAHeight(daHeight)
}

// removeDAIncluded removes the DA-included status of the hash from the cache.
func (c *Cache[T]) removeDAIncluded(hash string) {
c.daIncluded.Remove(hash)
func (c *Cache) removeDAIncluded(hash string) {
c.mu.Lock()
defer c.mu.Unlock()
delete(c.daIncluded, hash)
}

// daHeight returns the maximum DA height from all DA-included items.
func (c *Cache[T]) daHeight() uint64 {
func (c *Cache) daHeight() uint64 {
return c.maxDAHeight.Load()
}

// setMaxDAHeight sets the maximum DA height if the provided value is greater.
func (c *Cache[T]) setMaxDAHeight(daHeight uint64) {
func (c *Cache) setMaxDAHeight(daHeight uint64) {
for range 1_000 {
current := c.maxDAHeight.Load()
if daHeight <= current {
Expand All @@ -181,49 +125,41 @@ func (c *Cache[T]) setMaxDAHeight(daHeight uint64) {
}
}

// removeSeen removes a hash from the seen cache.
func (c *Cache[T]) removeSeen(hash string) {
c.hashes.Remove(hash)
}

// deleteAllForHeight removes all items and their associated data from the
// cache at the given height.
func (c *Cache[T]) deleteAllForHeight(height uint64) {
c.itemsByHeight.Remove(height)

c.hashByHeightMu.Lock()
hash, ok := c.hashByHeight.Get(height)
if ok {
c.hashByHeight.Remove(height)
func (c *Cache) deleteAllForHeight(height uint64) {
c.mu.Lock()
defer c.mu.Unlock()
hash, ok := c.hashByHeight[height]
if !ok {
return
}
c.hashByHeightMu.Unlock()
delete(c.hashByHeight, height)
delete(c.hashes, hash)
delete(c.daIncluded, hash)
}

if ok {
c.hashes.Remove(hash)
c.daIncluded.Remove(hash)
}
func (c *Cache) daIncludedLen() int {
c.mu.Lock()
defer c.mu.Unlock()
return len(c.daIncluded)
}

// persistSnapshot writes all current in-flight [blockHeight, daHeight] pairs to the store under a single key.
// Only called explicitly via SaveToStore. NEVER CALL IT ON HOT-PATH TO AVOID BAGER WRITE AMPLIFICATION.
func (c *Cache[T]) persistSnapshot(ctx context.Context) error {
func (c *Cache) persistSnapshot(ctx context.Context) error {
if c.store == nil || c.storeKeyPrefix == "" {
return nil
}

heights := c.hashByHeight.Keys()
entries := make([]snapshotEntry, 0, len(heights))
for _, h := range heights {
hash, ok := c.hashByHeight.Peek(h)
if !ok {
continue
}
daH, ok := c.daIncluded.Peek(hash)
c.mu.Lock()
entries := make([]snapshotEntry, 0, len(c.hashByHeight))
for h, hash := range c.hashByHeight {
daH, ok := c.daIncluded[hash]
if !ok {
continue
}
entries = append(entries, snapshotEntry{blockHeight: h, daHeight: daH})
}
c.mu.Unlock()

return c.store.SetMetadata(ctx, c.snapshotKey(), encodeSnapshot(entries))
}
Expand Down Expand Up @@ -257,8 +193,7 @@ func decodeSnapshot(buf []byte) []snapshotEntry {
// RestoreFromStore loads the in-flight snapshot with a single store read.
// Each entry is installed as a height placeholder; real hashes replace them
// once the DA retriever re-fires SetHeaderDAIncluded after startup.
// Missing snapshot key is treated as a no-op (fresh node or pre-snapshot version).
func (c *Cache[T]) RestoreFromStore(ctx context.Context) error {
func (c *Cache) RestoreFromStore(ctx context.Context) error {
if c.store == nil || c.storeKeyPrefix == "" {
return nil
}
Expand All @@ -271,10 +206,13 @@ func (c *Cache[T]) RestoreFromStore(ctx context.Context) error {
return fmt.Errorf("reading cache snapshot from store: %w", err)
}

c.mu.Lock()
defer c.mu.Unlock()

for _, e := range decodeSnapshot(buf) {
placeholder := HeightPlaceholderKey(c.storeKeyPrefix, e.blockHeight)
c.daIncluded.Add(placeholder, e.daHeight)
c.hashByHeight.Add(e.blockHeight, placeholder)
c.daIncluded[placeholder] = e.daHeight
c.hashByHeight[e.blockHeight] = placeholder
c.setMaxDAHeight(e.daHeight)
}

Expand All @@ -297,7 +235,7 @@ func HeightPlaceholderKey(prefix string, height uint64) string {
}

// SaveToStore flushes the current snapshot to the store.
func (c *Cache[T]) SaveToStore(ctx context.Context) error {
func (c *Cache) SaveToStore(ctx context.Context) error {
if c.store == nil {
return nil
}
Expand All @@ -308,7 +246,7 @@ func (c *Cache[T]) SaveToStore(ctx context.Context) error {
}

// ClearFromStore deletes the snapshot key from the store.
func (c *Cache[T]) ClearFromStore(ctx context.Context) error {
func (c *Cache) ClearFromStore(ctx context.Context) error {
if c.store == nil {
return nil
}
Expand Down
Loading
Loading