Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions internal/cache/tiered.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (t Tiered) Stat(ctx context.Context, key Key) (http.Header, error) {
}

// Open returns a reader from the first cache that succeeds.
// When a higher tier hits but lower tiers missed, the returned reader
// transparently backfills the lowest tier as the caller reads, so that
// subsequent Opens are served locally.
//
// If all caches fail, all errors are returned.
func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) {
Expand All @@ -120,11 +123,73 @@ func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header,
} else if err != nil {
return nil, nil, errors.WithStack(err)
}
if i > 0 {
r = t.backfillReader(ctx, key, r, headers, t.caches[0])
}
return r, headers, nil
}
return nil, nil, errors.Join(errs...)
}

// backfillReader wraps src so that every byte the caller reads is also
// copied into dst, populating the lowest tier as a side effect of reading.
// If the destination writer cannot be created, backfilling is skipped and
// src is returned unchanged.
//
// The write happens under a cancellable child context: the Cache contract
// guarantees that writes made under a cancelled context are discarded on
// Close, which is how failed or partial backfills are abandoned.
func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser, headers http.Header, dst Cache) io.ReadCloser {
	writeCtx, cancel := context.WithCancel(ctx)
	w, err := dst.Create(writeCtx, key, headers, 0) // 0 → use the cache's max TTL
	if err != nil {
		// No writer, no backfill — serve the read path untouched.
		cancel()
		logging.FromContext(ctx).WarnContext(ctx, "Tier backfill: failed to create writer, skipping",
			"error", err.Error())
		return src
	}
	return &backfillReadCloser{src: src, dst: w, ctx: ctx, cancel: cancel}
}

// backfillReadCloser tees reads from src into dst. The destination entry is
// committed (dst closed under a live context) only when the caller consumed
// the full stream — Read observed io.EOF — and src closed cleanly. In every
// other case (write failure, read error, or the caller closing early) the
// write context is cancelled before dst.Close, so the Cache discards the
// partial entry per its contract; reads are never disturbed by the backfill.
type backfillReadCloser struct {
	src    io.ReadCloser
	dst    io.WriteCloser
	ctx    context.Context
	cancel context.CancelFunc
	failed bool // a dst.Write failed; stop teeing
	eof    bool // src was read to completion; safe to commit on Close
}

// Read reads from src and, while the backfill is healthy, mirrors the bytes
// into dst. A write failure abandons the backfill (cancelling the write
// context) but never affects the caller's read.
func (b *backfillReadCloser) Read(p []byte) (int, error) {
	n, err := b.src.Read(p)
	if n > 0 && !b.failed {
		if _, wErr := b.dst.Write(p[:n]); wErr != nil {
			logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: write failed, abandoning",
				"error", wErr.Error())
			b.failed = true
			b.cancel()
		}
	}
	if err == io.EOF {
		// Full stream mirrored into dst; Close may now commit the entry.
		// (io.EOF is returned unwrapped per the io.Reader contract, so a
		// direct comparison is correct here.)
		b.eof = true
	}
	return n, err //nolint:wrapcheck // must return unwrapped io.EOF per io.Reader contract
}

// Close closes src and either commits or discards the backfill entry.
// Committing requires a clean, complete read: a tee-write failure, a source
// close error, or an early close (EOF never observed — previously this case
// wrongly committed a truncated entry) cancels the write context first so
// dst.Close discards the partial entry.
func (b *backfillReadCloser) Close() error {
	// Release the write context on every path; in the commit path the
	// deferred cancel fires only after dst.Close has committed the entry.
	defer b.cancel()
	srcErr := b.src.Close()
	if b.failed || !b.eof || srcErr != nil {
		// Discard: the entry would be incomplete or corrupt. Cancel before
		// closing dst so the Cache drops it per the cancelled-context rule.
		b.cancel()
		_ = b.dst.Close()
		return errors.WithStack(srcErr)
	}
	if err := b.dst.Close(); err != nil {
		logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: close failed",
			"error", err.Error())
	}
	return nil
}

func (t Tiered) String() string {
names := make([]string, len(t.caches))
for i, c := range t.caches {
Expand Down
41 changes: 41 additions & 0 deletions internal/cache/tiered_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package cache_test

import (
"io"
"log/slog"
"os"
"testing"
Expand All @@ -24,6 +25,46 @@ func TestTieredCache(t *testing.T) {
})
}

// TestTieredBackfill verifies that a read served by a lower tier populates
// the top tier as a side effect, so a later Open is served from the top tier.
func TestTieredBackfill(t *testing.T) {
	_, ctx := logging.Configure(t.Context(), logging.Config{})

	memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
	assert.NoError(t, err)
	disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour})
	assert.NoError(t, err)
	tiered := cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk})

	key := cache.NewKey("backfill-test")
	want := []byte("hello backfill")

	// Seed only the lower tier, simulating a remote tier holding data the
	// local tier has not seen yet.
	w, err := disk.Create(ctx, key, nil, time.Hour)
	assert.NoError(t, err)
	_, err = w.Write(want)
	assert.NoError(t, err)
	assert.NoError(t, w.Close())

	// The top tier must start out empty for this key.
	_, _, err = memory.Open(ctx, key)
	assert.IsError(t, err, os.ErrNotExist)

	// Reading through the tiered cache hits disk and backfills memory.
	r, _, err := tiered.Open(ctx, key)
	assert.NoError(t, err)
	got, err := io.ReadAll(r)
	assert.NoError(t, err)
	assert.NoError(t, r.Close())
	assert.Equal(t, want, got)

	// After a full read and close, the entry is present in the top tier.
	fromMem, _, err := memory.Open(ctx, key)
	assert.NoError(t, err)
	gotMem, err := io.ReadAll(fromMem)
	assert.NoError(t, err)
	assert.NoError(t, fromMem.Close())
	assert.Equal(t, want, gotMem)
}

func TestTieredCacheSoak(t *testing.T) {
if os.Getenv("SOAK_TEST") == "" {
t.Skip("Skipping soak test; set SOAK_TEST=1 to run")
Expand Down