diff --git a/internal/cache/tiered.go b/internal/cache/tiered.go
index 8f48a43..cac028e 100644
--- a/internal/cache/tiered.go
+++ b/internal/cache/tiered.go
@@ -108,6 +108,9 @@ func (t Tiered) Stat(ctx context.Context, key Key) (http.Header, error) {
 }
 
 // Open returns a reader from the first cache that succeeds.
+// When a higher tier hits but lower tiers missed, the returned reader
+// transparently backfills the lowest tier once the caller has read the
+// stream to EOF and closed it, so that subsequent Opens are served locally.
 //
 // If all caches fail, all errors are returned.
 func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header, error) {
@@ -120,11 +123,100 @@ func (t Tiered) Open(ctx context.Context, key Key) (io.ReadCloser, http.Header,
 		} else if err != nil {
 			return nil, nil, errors.WithStack(err)
 		}
+		if i > 0 {
+			r = t.backfillReader(ctx, key, r, headers, t.caches[0])
+		}
 		return r, headers, nil
 	}
 	return nil, nil, errors.Join(errs...)
 }
 
+// backfillReader wraps src so that every byte read is also written to dst.
+// The dst entry is committed only when the caller reads src to io.EOF and
+// Close succeeds. On a write error, a read error or a partial read the dst
+// entry is discarded per the Cache contract (the write context is cancelled,
+// causing the writer to discard on Close).
+//
+// If the dst writer cannot be created the failure is logged and src is
+// returned unwrapped, so the read itself never fails because of backfill.
+func (t Tiered) backfillReader(ctx context.Context, key Key, src io.ReadCloser, headers http.Header, dst Cache) io.ReadCloser {
+	logger := logging.FromContext(ctx)
+	// Use a cancellable context so we can abort the write on failure.
+	// The Cache contract guarantees that cancelled-context writes are discarded.
+	writeCtx, cancel := context.WithCancel(ctx)
+	w, err := dst.Create(writeCtx, key, headers, 0) // 0 → use the cache's max TTL
+	if err != nil {
+		cancel()
+		logger.WarnContext(ctx, "Tier backfill: failed to create writer, skipping",
+			"error", err.Error())
+		return src
+	}
+	return &backfillReadCloser{src: src, dst: w, ctx: ctx, cancel: cancel}
+}
+
+// backfillReadCloser tees reads from src into dst. dst is closed normally
+// (committing the cached entry) only if src was read through to io.EOF,
+// every write to dst succeeded and src closed without error. In every other
+// case - write failure, read failure, or the caller closing early - the
+// write context is cancelled so that dst discards the partial entry. Reads
+// from src are never affected by backfill failures.
+//
+// A caller that consumes exactly the payload without observing io.EOF is
+// indistinguishable from a partial read, so its backfill is skipped too.
+type backfillReadCloser struct {
+	src    io.ReadCloser
+	dst    io.WriteCloser
+	ctx    context.Context // used for logging only
+	cancel context.CancelFunc
+	failed bool // a dst write or src read failed; the backfill is abandoned
+	eof    bool // src returned io.EOF, i.e. the whole stream was teed into dst
+}
+
+// Read reads from src and tees the bytes into dst. The result of src.Read is
+// returned unchanged; backfill problems never surface to the caller.
+func (b *backfillReadCloser) Read(p []byte) (int, error) {
+	n, err := b.src.Read(p)
+	if n > 0 && !b.failed {
+		if _, wErr := b.dst.Write(p[:n]); wErr != nil {
+			logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: write failed, abandoning",
+				"error", wErr.Error())
+			b.failed = true
+			b.cancel()
+		}
+	}
+	switch {
+	case err == io.EOF: // io.Reader contract: EOF is returned unwrapped
+		b.eof = true
+	case err != nil:
+		// A read error means dst holds a truncated stream; never commit it.
+		b.failed = true
+		b.cancel()
+	}
+	return n, err //nolint:wrapcheck // must return unwrapped io.EOF per io.Reader contract
+}
+
+// Close closes src and then finalises the backfill: dst is committed only
+// after a complete, error-free read, otherwise the write context is
+// cancelled first so that closing dst discards the entry. Only the error
+// from closing src is returned; a failed commit is logged.
+func (b *backfillReadCloser) Close() error {
+	srcErr := b.src.Close()
+	if b.failed || !b.eof || srcErr != nil {
+		b.cancel()
+		_ = b.dst.Close() // best effort: the cancelled context makes this a discard
+		if srcErr != nil {
+			return errors.WithStack(srcErr)
+		}
+		return nil
+	}
+	if err := b.dst.Close(); err != nil {
+		logging.FromContext(b.ctx).WarnContext(b.ctx, "Tier backfill: close failed",
+			"error", err.Error())
+	}
+	b.cancel()
+	return nil
+}
+
 func (t Tiered) String() string {
 	names := make([]string, len(t.caches))
 	for i, c := range t.caches {
diff --git a/internal/cache/tiered_test.go b/internal/cache/tiered_test.go
index d6ae52f..f6a37cc 100644
--- a/internal/cache/tiered_test.go
+++ b/internal/cache/tiered_test.go
@@ -1,6 +1,7 @@
 package cache_test
 
 import (
+	"io"
 	"log/slog"
 	"os"
 	"testing"
@@ -24,6 +25,78 @@ func TestTieredCache(t *testing.T) {
 	})
 }
 
+func TestTieredBackfill(t *testing.T) {
+	_, ctx := logging.Configure(t.Context(), logging.Config{})
+
+	memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
+	assert.NoError(t, err)
+	disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour})
+	assert.NoError(t, err)
+	tiered := cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk})
+
+	key := cache.NewKey("backfill-test")
+	content := []byte("hello backfill")
+
+	// Write only to disk (tier 1), simulating S3 having data but memory/disk-L1 not.
+	w, err := disk.Create(ctx, key, nil, time.Hour)
+	assert.NoError(t, err)
+	_, err = w.Write(content)
+	assert.NoError(t, err)
+	assert.NoError(t, w.Close())
+
+	// Verify memory (tier 0) does not have it yet.
+	_, _, err = memory.Open(ctx, key)
+	assert.IsError(t, err, os.ErrNotExist)
+
+	// Open through tiered — should hit disk and backfill memory.
+	r, _, err := tiered.Open(ctx, key)
+	assert.NoError(t, err)
+	data, err := io.ReadAll(r)
+	assert.NoError(t, err)
+	assert.NoError(t, r.Close())
+	assert.Equal(t, content, data)
+
+	// Now memory (tier 0) should have the entry.
+	r2, _, err := memory.Open(ctx, key)
+	assert.NoError(t, err)
+	data2, err := io.ReadAll(r2)
+	assert.NoError(t, err)
+	assert.NoError(t, r2.Close())
+	assert.Equal(t, content, data2)
+}
+
+func TestTieredBackfillPartialRead(t *testing.T) {
+	_, ctx := logging.Configure(t.Context(), logging.Config{})
+
+	memory, err := cache.NewMemory(ctx, cache.MemoryConfig{LimitMB: 1024, MaxTTL: time.Hour})
+	assert.NoError(t, err)
+	disk, err := cache.NewDisk(ctx, cache.DiskConfig{Root: t.TempDir(), LimitMB: 1024, MaxTTL: time.Hour})
+	assert.NoError(t, err)
+	tiered := cache.MaybeNewTiered(ctx, []cache.Cache{memory, disk})
+
+	key := cache.NewKey("backfill-partial-test")
+	content := []byte("hello backfill")
+
+	w, err := disk.Create(ctx, key, nil, time.Hour)
+	assert.NoError(t, err)
+	_, err = w.Write(content)
+	assert.NoError(t, err)
+	assert.NoError(t, w.Close())
+
+	// Read only a prefix of the entry, then close early.
+	r, _, err := tiered.Open(ctx, key)
+	assert.NoError(t, err)
+	prefix := make([]byte, 5)
+	_, err = io.ReadFull(r, prefix)
+	assert.NoError(t, err)
+	assert.NoError(t, r.Close())
+	assert.Equal(t, content[:5], prefix)
+
+	// A truncated entry must not have been committed to memory (tier 0).
+	_, _, err = memory.Open(ctx, key)
+	assert.IsError(t, err, os.ErrNotExist)
+}
+
 func TestTieredCacheSoak(t *testing.T) {
 	if os.Getenv("SOAK_TEST") == "" {
 		t.Skip("Skipping soak test; set SOAK_TEST=1 to run")