diff --git a/Justfile b/Justfile index d66ffe8..7e33d38 100644 --- a/Justfile +++ b/Justfile @@ -18,7 +18,7 @@ _help: # Run tests test: - @gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 30s + @gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s # Lint code lint: diff --git a/internal/snapshot/snapshot.go b/internal/snapshot/snapshot.go index 6ebfedd..8486a3e 100644 --- a/internal/snapshot/snapshot.go +++ b/internal/snapshot/snapshot.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "fmt" + "io" "net/http" "os" "os/exec" @@ -91,6 +92,64 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st return errors.Join(errs...) } +// StreamTo archives a directory using tar with zstd compression and streams the +// output directly to w. Unlike Create, it does not upload to any cache backend. +// This is used on cache miss to serve the client immediately while a background +// job populates the cache. +func StreamTo(ctx context.Context, w io.Writer, directory string, excludePatterns []string, threads int) error { + if threads <= 0 { + threads = runtime.NumCPU() + } + + if info, err := os.Stat(directory); err != nil { + return errors.Wrap(err, "failed to stat directory") + } else if !info.IsDir() { + return errors.Errorf("not a directory: %s", directory) + } + + tarArgs := []string{"-cpf", "-", "-C", directory} + for _, pattern := range excludePatterns { + tarArgs = append(tarArgs, "--exclude", pattern) + } + tarArgs = append(tarArgs, ".") + + tarCmd := exec.CommandContext(ctx, "tar", tarArgs...) + zstdCmd := exec.CommandContext(ctx, "zstd", "-c", fmt.Sprintf("-T%d", threads)) //nolint:gosec // threads is a validated integer, not user input + + tarStdout, err := tarCmd.StdoutPipe() + if err != nil { + return errors.Wrap(err, "failed to create tar stdout pipe") + } + + var tarStderr, zstdStderr bytes.Buffer + tarCmd.Stderr = &tarStderr + + zstdCmd.Stdin = tarStdout + zstdCmd.Stdout = w + zstdCmd.Stderr = &zstdStderr + + if err := tarCmd.Start(); err != nil { + return errors.Wrap(err, "failed to start tar") + } + + if err := zstdCmd.Start(); err != nil { + return errors.Join(errors.Wrap(err, "failed to start zstd"), tarCmd.Wait()) + } + + tarErr := tarCmd.Wait() + zstdErr := zstdCmd.Wait() + + var errs []error + if tarErr != nil { + errs = append(errs, errors.Errorf("tar failed: %w: %s", tarErr, tarStderr.String())) + } + if zstdErr != nil { + errs = append(errs, errors.Errorf("zstd failed: %w: %s", zstdErr, zstdStderr.String())) + } + + return errors.Join(errs...) +} + // Restore downloads an archive from the cache and extracts it to a directory. // // The archive is decompressed with zstd and extracted with tar, preserving diff --git a/internal/strategy/git/git.go b/internal/strategy/git/git.go index 0a69513..575b930 100644 --- a/internal/strategy/git/git.go +++ b/internal/strategy/git/git.go @@ -43,17 +43,18 @@ type Config struct { } type Strategy struct { - config Config - cache cache.Cache - cloneManager *gitclone.Manager - httpClient *http.Client - proxy *httputil.ReverseProxy - ctx context.Context - scheduler jobscheduler.Scheduler - spoolsMu sync.Mutex - spools map[string]*RepoSpools - tokenManager *githubapp.TokenManager - snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex + config Config + cache cache.Cache + cloneManager *gitclone.Manager + httpClient *http.Client + proxy *httputil.ReverseProxy + ctx context.Context + scheduler jobscheduler.Scheduler + spoolsMu sync.Mutex + spools map[string]*RepoSpools + tokenManager *githubapp.TokenManager + snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex + snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry } func New( @@ -93,7 +94,7 @@ func New( if err != nil { return nil, errors.Wrap(err, "failed to create clone manager") } - for _, dir := range []string{".spools", ".snapshots"} { + for _, dir := range []string{".spools", ".snapshots", ".snapshot-spools"} { if err := os.RemoveAll(filepath.Join(cloneManager.Config().MirrorRoot, dir)); err != nil { return nil, errors.Wrapf(err, "clean up stale %s", dir) } diff --git a/internal/strategy/git/snapshot.go b/internal/strategy/git/snapshot.go index ab1ac24..eb687f7 100644 --- a/internal/strategy/git/snapshot.go +++ b/internal/strategy/git/snapshot.go @@ -2,6 +2,7 @@ package git import ( "context" + "fmt" "io" "log/slog" "net/http" @@ -46,6 +47,29 @@ func (s *Strategy) remoteURLForSnapshot(upstream string) string { return s.config.ServerURL + "/git/" + repoPath } +// cloneForSnapshot clones the mirror into destDir under repo's read lock, +// then fixes the remote URL to point through cachew (or upstream). +func (s *Strategy) cloneForSnapshot(ctx context.Context, repo *gitclone.Repository, destDir string) error { + if err := repo.WithReadLock(func() error { + // #nosec G204 - repo.Path() and destDir are controlled by us + cmd := exec.CommandContext(ctx, "git", "clone", repo.Path(), destDir) + if output, err := cmd.CombinedOutput(); err != nil { + return errors.Wrapf(err, "git clone for snapshot: %s", string(output)) + } + + // git clone from a local path sets remote.origin.url to that path; restore it. + // #nosec G204 - remoteURL is derived from controlled inputs + cmd = exec.CommandContext(ctx, "git", "-C", destDir, "remote", "set-url", "origin", s.remoteURLForSnapshot(repo.UpstreamURL())) + if output, err := cmd.CombinedOutput(); err != nil { + return errors.Wrapf(err, "fix snapshot remote URL: %s", string(output)) + } + return nil + }); err != nil { + return errors.WithStack(err) + } + return nil +} + func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone.Repository) error { logger := logging.FromContext(ctx) upstream := repo.UpstreamURL() @@ -71,24 +95,9 @@ func (s *Strategy) generateAndUploadSnapshot(ctx context.Context, repo *gitclone return errors.Wrap(err, "create snapshot parent dir") } - // Hold a read lock to exclude concurrent fetches while cloning. - if err := repo.WithReadLock(func() error { - // #nosec G204 - repo.Path() and snapshotDir are controlled by us - cmd := exec.CommandContext(ctx, "git", "clone", repo.Path(), snapshotDir) - if output, err := cmd.CombinedOutput(); err != nil { - return errors.Wrapf(err, "git clone for snapshot: %s", string(output)) - } - - // git clone from a local path sets remote.origin.url to that path; restore it. - // #nosec G204 - remoteURL is derived from controlled inputs - cmd = exec.CommandContext(ctx, "git", "-C", snapshotDir, "remote", "set-url", "origin", s.remoteURLForSnapshot(upstream)) - if output, err := cmd.CombinedOutput(); err != nil { - return errors.Wrapf(err, "fix snapshot remote URL: %s", string(output)) - } - return nil - }); err != nil { + if err := s.cloneForSnapshot(ctx, repo, snapshotDir); err != nil { _ = os.RemoveAll(snapshotDir) //nolint:gosec // snapshotDir is derived from controlled mirrorRoot + upstream URL - return errors.WithStack(err) + return err } cacheKey := snapshotCacheKey(upstream) @@ -150,25 +159,10 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, http.Error(w, "Repository unavailable", http.StatusServiceUnavailable) return } - if genErr := s.generateAndUploadSnapshot(ctx, repo); genErr != nil { - logger.ErrorContext(ctx, "On-demand snapshot generation failed", - slog.String("upstream", upstreamURL), - slog.String("error", genErr.Error())) - http.Error(w, "Internal server error", http.StatusInternalServerError) - return - } - if s.config.SnapshotInterval > 0 { - s.scheduleSnapshotJobs(repo) - } - reader, headers, err = s.cache.Open(ctx, cacheKey) + s.serveSnapshotWithSpool(w, r, repo, upstreamURL) + return } if err != nil { - if errors.Is(err, os.ErrNotExist) { - logger.DebugContext(ctx, "Snapshot not found in cache", - slog.String("upstream", upstreamURL)) - http.NotFound(w, r) - return - } logger.ErrorContext(ctx, "Failed to open snapshot from cache", slog.String("upstream", upstreamURL), slog.String("error", err.Error())) @@ -188,3 +182,196 @@ func (s *Strategy) handleSnapshotRequest(w http.ResponseWriter, r *http.Request, slog.String("error", err.Error())) } } + +// serveSnapshotWithSpool handles snapshot cache misses using the spool pattern. +// The first request for a given upstream URL becomes the writer: it clones the +// mirror, streams tar+zstd to both the HTTP client and a spool file, then +// triggers a background cache backfill. Concurrent requests for the same URL +// become readers that follow the spool, avoiding redundant clone+tar work. +func (s *Strategy) serveSnapshotWithSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) { + ctx := r.Context() + logger := logging.FromContext(ctx) + + // Use LoadOrStore with a sentinel to atomically elect a single writer. + // The first goroutine stores an empty snapshotSpoolEntry and becomes the + // writer. Concurrent goroutines see the existing entry and wait for the + // spool to be published via the ready channel. + entry := &snapshotSpoolEntry{ready: make(chan struct{})} + if existing, loaded := s.snapshotSpools.LoadOrStore(upstreamURL, entry); loaded { + winner := existing.(*snapshotSpoolEntry) + <-winner.ready + if spool := winner.spool; spool != nil && !spool.Failed() { + logger.DebugContext(ctx, "Serving snapshot from spool", + slog.String("upstream", upstreamURL)) + if err := spool.ServeTo(w); err != nil { + if errors.Is(err, ErrSpoolFailed) { + logger.DebugContext(ctx, "Snapshot spool failed before headers, falling back to direct stream", + slog.String("upstream", upstreamURL)) + s.streamSnapshotDirect(w, r, repo, upstreamURL) + return + } + logger.WarnContext(ctx, "Snapshot spool read error", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + } + return + } + // Writer failed; fall through to generate independently. + s.streamSnapshotDirect(w, r, repo, upstreamURL) + return + } + + s.writeSnapshotSpool(w, r, repo, upstreamURL, entry) +} + +// streamSnapshotDirect streams a snapshot directly to the client without +// spooling. Used as a fallback when the spool writer failed. +func (s *Strategy) streamSnapshotDirect(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string) { + ctx := r.Context() + logger := logging.FromContext(ctx) + mirrorRoot := s.cloneManager.Config().MirrorRoot + + snapshotDir, err := os.MkdirTemp(mirrorRoot, ".snapshot-stream-*") + if err != nil { + logger.ErrorContext(ctx, "Failed to create temp snapshot dir", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + defer func() { _ = os.RemoveAll(snapshotDir) }() + + repoDir := filepath.Join(snapshotDir, "repo") + if err := s.cloneForSnapshot(ctx, repo, repoDir); err != nil { + logger.ErrorContext(ctx, "Failed to clone for snapshot streaming", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + + w.Header().Set("Content-Type", "application/zstd") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) + + excludePatterns := []string{"*.lock"} + if err := snapshot.StreamTo(ctx, w, repoDir, excludePatterns, s.config.ZstdThreads); err != nil { + logger.ErrorContext(ctx, "Failed to stream snapshot to client", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + } +} + +// prepareSnapshotSpool creates the spool and clones the mirror into a temp directory, +// publishing the spool to waiting readers via the entry's ready channel. On failure +// it signals readers and returns an error. +func (s *Strategy) prepareSnapshotSpool(ctx context.Context, repo *gitclone.Repository, upstreamURL string, entry *snapshotSpoolEntry) (spool *ResponseSpool, spoolDir, repoDir string, err error) { + mirrorRoot := s.cloneManager.Config().MirrorRoot + + spoolDir, err = snapshotSpoolDirForURL(mirrorRoot, upstreamURL) + if err != nil { + close(entry.ready) + s.snapshotSpools.Delete(upstreamURL) + return nil, "", "", err + } + + spool, err = NewResponseSpool(filepath.Join(spoolDir, "snapshot.spool")) + if err != nil { + close(entry.ready) + s.snapshotSpools.Delete(upstreamURL) + return nil, "", "", err + } + entry.spool = spool + close(entry.ready) + + snapshotDir, err := os.MkdirTemp(mirrorRoot, ".snapshot-stream-*") + if err != nil { + err = errors.Wrap(err, "create temp snapshot dir") + spool.MarkError(err) + s.snapshotSpools.Delete(upstreamURL) + return nil, "", "", err + } + + repoDir = filepath.Join(snapshotDir, "repo") + if err := s.cloneForSnapshot(ctx, repo, repoDir); err != nil { + spool.MarkError(err) + s.snapshotSpools.Delete(upstreamURL) + _ = os.RemoveAll(snapshotDir) + return nil, "", "", err + } + + return spool, spoolDir, repoDir, nil +} + +// writeSnapshotSpool is the writer path for snapshot spooling. It creates a +// spool, clones the mirror, streams the tar+zstd output through a SpoolTeeWriter, +// and triggers a background cache backfill. +func (s *Strategy) writeSnapshotSpool(w http.ResponseWriter, r *http.Request, repo *gitclone.Repository, upstreamURL string, entry *snapshotSpoolEntry) { + ctx := r.Context() + logger := logging.FromContext(ctx) + + spool, spoolDir, repoDir, err := s.prepareSnapshotSpool(ctx, repo, upstreamURL, entry) + if err != nil { + logger.ErrorContext(ctx, "Failed to prepare snapshot spool", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + http.Error(w, "Internal server error", http.StatusInternalServerError) + return + } + snapshotDir := filepath.Dir(repoDir) + + w.Header().Set("Content-Type", "application/zstd") + w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%q", filepath.Base(repoDir)+".tar.zst")) + + tw := NewSpoolTeeWriter(w, spool) + excludePatterns := []string{"*.lock"} + if err := snapshot.StreamTo(ctx, tw, repoDir, excludePatterns, s.config.ZstdThreads); err != nil { + logger.ErrorContext(ctx, "Failed to stream snapshot to client", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + spool.MarkError(err) + } else { + spool.MarkComplete() + } + + go func() { + spool.WaitForReaders() + s.snapshotSpools.Delete(upstreamURL) + _ = os.RemoveAll(spoolDir) + _ = os.RemoveAll(snapshotDir) + }() + + go func() { + mu := s.snapshotMutexFor(upstreamURL) + if !mu.TryLock() { + return + } + mu.Unlock() + bgCtx := context.WithoutCancel(ctx) + if err := s.generateAndUploadSnapshot(bgCtx, repo); err != nil { + logger.ErrorContext(bgCtx, "Background cache upload failed", + slog.String("upstream", upstreamURL), + slog.String("error", err.Error())) + } + }() + + if s.config.SnapshotInterval > 0 { + s.scheduleSnapshotJobs(repo) + } +} + +// snapshotSpoolEntry holds a spool and a ready channel used to coordinate +// writer election. The first goroutine stores the entry via LoadOrStore and +// becomes the writer. It closes ready once the spool is created (or on +// failure with spool == nil) so waiting readers can proceed. +type snapshotSpoolEntry struct { + spool *ResponseSpool + ready chan struct{} +} + +func snapshotSpoolDirForURL(mirrorRoot, upstreamURL string) (string, error) { + repoPath, err := gitclone.RepoPathFromURL(upstreamURL) + if err != nil { + return "", errors.Wrap(err, "resolve snapshot spool directory") + } + return filepath.Join(mirrorRoot, ".snapshot-spools", repoPath), nil +} diff --git a/internal/strategy/git/snapshot_test.go b/internal/strategy/git/snapshot_test.go index 5feb693..2ef67db 100644 --- a/internal/strategy/git/snapshot_test.go +++ b/internal/strategy/git/snapshot_test.go @@ -114,6 +114,10 @@ func TestSnapshotOnDemandGenerationViaHTTP(t *testing.T) { assert.Equal(t, 200, w.Code) assert.Equal(t, "application/zstd", w.Header().Get("Content-Type")) assert.NotZero(t, w.Body.Len()) + + // Allow background goroutines (spool cleanup, cache backfill) to finish + // before TempDir cleanup runs. + time.Sleep(2 * time.Second) } // createTestMirrorRepo creates a bare mirror-style repo at mirrorPath with one commit.