Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ _help:

# Run tests
test:
@gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 30s
@gotestsum --hide-summary output,skipped --format-hide-empty-pkg ${CI:+--format=github-actions} ./... ${CI:+--tags=integration} -race -timeout 60s

# Lint code
lint:
Expand Down
59 changes: 59 additions & 0 deletions internal/snapshot/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -91,6 +92,64 @@ func Create(ctx context.Context, remote cache.Cache, key cache.Key, directory st
return errors.Join(errs...)
}

// StreamTo archives a directory using tar with zstd compression and streams the
// output directly to w. Unlike Create, it does not upload to any cache backend.
// This is used on cache miss to serve the client immediately while a background
// job populates the cache.
func StreamTo(ctx context.Context, w io.Writer, directory string, excludePatterns []string, threads int) error {
if threads <= 0 {
threads = runtime.NumCPU()
}

if info, err := os.Stat(directory); err != nil {
return errors.Wrap(err, "failed to stat directory")
} else if !info.IsDir() {
return errors.Errorf("not a directory: %s", directory)
}

tarArgs := []string{"-cpf", "-", "-C", directory}
for _, pattern := range excludePatterns {
tarArgs = append(tarArgs, "--exclude", pattern)
}
tarArgs = append(tarArgs, ".")

tarCmd := exec.CommandContext(ctx, "tar", tarArgs...)
zstdCmd := exec.CommandContext(ctx, "zstd", "-c", fmt.Sprintf("-T%d", threads)) //nolint:gosec // threads is a validated integer, not user input

tarStdout, err := tarCmd.StdoutPipe()
if err != nil {
return errors.Wrap(err, "failed to create tar stdout pipe")
}

var tarStderr, zstdStderr bytes.Buffer
tarCmd.Stderr = &tarStderr

zstdCmd.Stdin = tarStdout
zstdCmd.Stdout = w
zstdCmd.Stderr = &zstdStderr

if err := tarCmd.Start(); err != nil {
return errors.Wrap(err, "failed to start tar")
}

if err := zstdCmd.Start(); err != nil {
return errors.Join(errors.Wrap(err, "failed to start zstd"), tarCmd.Wait())
}

tarErr := tarCmd.Wait()
zstdErr := zstdCmd.Wait()

var errs []error
if tarErr != nil {
errs = append(errs, errors.Errorf("tar failed: %w: %s", tarErr, tarStderr.String()))
}
if zstdErr != nil {
errs = append(errs, errors.Errorf("zstd failed: %w: %s", zstdErr, zstdStderr.String()))
}

return errors.Join(errs...)
}

// Restore downloads an archive from the cache and extracts it to a directory.
//
// The archive is decompressed with zstd and extracted with tar, preserving
Expand Down
25 changes: 13 additions & 12 deletions internal/strategy/git/git.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,18 @@ type Config struct {
}

type Strategy struct {
config Config
cache cache.Cache
cloneManager *gitclone.Manager
httpClient *http.Client
proxy *httputil.ReverseProxy
ctx context.Context
scheduler jobscheduler.Scheduler
spoolsMu sync.Mutex
spools map[string]*RepoSpools
tokenManager *githubapp.TokenManager
snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex
config Config
cache cache.Cache
cloneManager *gitclone.Manager
httpClient *http.Client
proxy *httputil.ReverseProxy
ctx context.Context
scheduler jobscheduler.Scheduler
spoolsMu sync.Mutex
spools map[string]*RepoSpools
tokenManager *githubapp.TokenManager
snapshotMu sync.Map // keyed by upstream URL, values are *sync.Mutex
snapshotSpools sync.Map // keyed by upstream URL, values are *snapshotSpoolEntry
}

func New(
Expand Down Expand Up @@ -93,7 +94,7 @@ func New(
if err != nil {
return nil, errors.Wrap(err, "failed to create clone manager")
}
for _, dir := range []string{".spools", ".snapshots"} {
for _, dir := range []string{".spools", ".snapshots", ".snapshot-spools"} {
if err := os.RemoveAll(filepath.Join(cloneManager.Config().MirrorRoot, dir)); err != nil {
return nil, errors.Wrapf(err, "clean up stale %s", dir)
}
Expand Down
Loading