Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions cmd/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,19 @@ func (c Cmd) Execute() (cmdErr error) {
return NewErrandsCmd(deps.UI, c.deployment()).Run()

case *RunErrandOpts:
director, deployment := c.directorAndDeployment()
sess, ok := c.session().(*SessionImpl)
if !ok {
return fmt.Errorf("internal error: expected *SessionImpl")
}
director, err := sess.Director()
c.panicIfErr(err)
deployment, err := sess.Deployment()
c.panicIfErr(err)

if opts.WithHeartbeat != nil && sess.taskReporter != nil {
sess.taskReporter.EnableWithHeartbeat(time.Duration(*opts.WithHeartbeat) * time.Second)
}

downloader := NewUIDownloader(director, deps.Time, deps.FS, deps.UI)
return NewRunErrandCmd(deployment, downloader, deps.UI).Run(*opts)

Expand Down Expand Up @@ -546,7 +558,10 @@ func (c Cmd) config() cmdconf.Config {
}

func (c Cmd) session() Session {
return NewSessionFromOpts(c.BoshOpts, c.config(), c.deps.UI, true, true, c.deps.FS, c.deps.Logger)
return NewSessionImpl(
NewSessionContextImpl(c.BoshOpts, c.config(), c.deps.FS),
c.deps.UI, true, true, c.deps.Logger,
)
}

func (c Cmd) director() boshdir.Director {
Expand Down
2 changes: 2 additions & 0 deletions cmd/opts/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -713,6 +713,8 @@ type RunErrandOpts struct {
KeepAlive bool `long:"keep-alive" description:"Use existing VM to run an errand and keep it after completion"`
WhenChanged bool `long:"when-changed" description:"Run errand only if errand configuration has changed or if the previous run was unsuccessful"`

WithHeartbeat *int `long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`

DownloadLogs bool `long:"download-logs" description:"Download logs"`
LogsDirectory DirOrCWDArg `long:"logs-dir" description:"Destination directory for logs" default:"."`

Expand Down
8 changes: 8 additions & 0 deletions cmd/opts/opts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,14 @@ var _ = Describe("Opts", func() {
})
})

Describe("WithHeartbeat", func() {
It("contains desired values", func() {
Expect(getStructTagForName("WithHeartbeat", opts)).To(Equal(
`long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`,
))
})
})

Describe("DownloadLogs", func() {
It("contains desired values", func() {
Expect(getStructTagForName("DownloadLogs", opts)).To(Equal(
Expand Down
5 changes: 3 additions & 2 deletions cmd/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type SessionImpl struct {

// Memoized
director boshdir.Director
taskReporter *boshuit.ReporterImpl
directorInfo boshdir.Info
directorInfoSet bool
}
Expand Down Expand Up @@ -118,10 +119,10 @@ func (c *SessionImpl) Director() (boshdir.Director, error) {
c.ui.PrintLinef("Using environment '%s' as %s", c.Environment(), creds.Description())
}

taskReporter := boshuit.NewReporter(c.ui, true)
c.taskReporter = boshuit.NewReporter(c.ui, true)
Copy link
Copy Markdown
Contributor

@Alphasite Alphasite Mar 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: this is never used except directly below, it probably doesnt need to be a property

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reporter is created inside Director() and wired into the director's HTTP client chain. We access the same instance afterward to enable heartbeats.

		if opts.WithHeartbeat != nil && sess.taskReporter != nil {
			sess.taskReporter.EnableWithHeartbeat(time.Duration(*opts.WithHeartbeat) * time.Second)
		}

Storing it as a field avoids changing Director()'s signature (which is called from ~10 places). Open to changing if there's a better way.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Eh. It’s fine. Sorry if I misunderstood something. I figured this was just leftover from some ai thing.

fileReporter := boshui.NewFileReporter(c.ui)

director, err := boshdir.NewFactory(c.logger).New(dirConfig, taskReporter, fileReporter)
director, err := boshdir.NewFactory(c.logger).New(dirConfig, c.taskReporter, fileReporter)
if err != nil {
return nil, err
}
Expand Down
41 changes: 41 additions & 0 deletions director/directorfakes/fake_task_reporter.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions director/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,7 @@ type TaskReporter interface {
TaskStarted(int)
TaskFinished(int, string)
TaskOutputChunk(int, []byte)
TaskHeartbeat(id int, state string, startedAt int64)
}

//counterfeiter:generate . OrphanDisk
Expand Down
7 changes: 4 additions & 3 deletions director/noop_reporters.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ func NewNoopTaskReporter() NoopTaskReporter {
return NoopTaskReporter{}
}

func (r NoopTaskReporter) TaskStarted(id int) {}
func (r NoopTaskReporter) TaskFinished(id int, state string) {}
func (r NoopTaskReporter) TaskOutputChunk(id int, chunk []byte) {}
func (r NoopTaskReporter) TaskStarted(id int) {}
func (r NoopTaskReporter) TaskFinished(id int, state string) {}
func (r NoopTaskReporter) TaskOutputChunk(id int, chunk []byte) {}
func (r NoopTaskReporter) TaskHeartbeat(id int, state string, startedAt int64) {}
6 changes: 4 additions & 2 deletions director/task_client_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ func NewTaskClientRequest(
}

type taskShortResp struct {
ID int // 165
State string // e.g. "queued", "processing", "done", "error", "cancelled"
ID int `json:"id"`
State string `json:"state"` // e.g. "queued", "processing", "done", "error", "cancelled"
StartedAt int64 `json:"started_at"` // 1440318199
}

func (r taskShortResp) IsRunning() bool {
Expand Down Expand Up @@ -111,6 +112,7 @@ func (r TaskClientRequest) WaitForCompletion(id int, type_ string, taskReporter
}

if taskResp.IsRunning() {
taskReporter.TaskHeartbeat(taskResp.ID, taskResp.State, taskResp.StartedAt)
time.Sleep(r.taskCheckStepDuration)
continue
}
Expand Down
55 changes: 55 additions & 0 deletions director/task_client_request_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,61 @@ var _ = Describe("TaskClientRequest", func() {
})
})

Describe("WaitForCompletion heartbeat", func() {
It("emits a heartbeat for a processing task", func() {
hbReporter := &fakedir.FakeTaskReporter{}
hbReq := buildReq(hbReporter)

server.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/tasks/42"),
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"processing","description":"run errand 'smoke'","started_at":1700000000}`),
),
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
ghttp.RespondWith(http.StatusOK, ""),
),
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/tasks/42"),
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
),
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
ghttp.RespondWith(http.StatusOK, ""),
),
)

err := hbReq.WaitForCompletion(42, "event", hbReporter)
Expect(err).ToNot(HaveOccurred())

Expect(hbReporter.TaskHeartbeatCallCount()).To(BeNumerically(">=", 1))
id, state, startedAt := hbReporter.TaskHeartbeatArgsForCall(0)
Expect(id).To(Equal(42))
Expect(state).To(Equal("processing"))
Expect(startedAt).To(Equal(int64(1700000000)))
})

It("does not emit heartbeats for tasks that immediately finish", func() {
hbReporter := &fakedir.FakeTaskReporter{}
hbReq := buildReq(hbReporter)

server.AppendHandlers(
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/tasks/42"),
ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`),
),
ghttp.CombineHandlers(
ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"),
ghttp.RespondWith(http.StatusOK, ""),
),
)

err := hbReq.WaitForCompletion(42, "event", hbReporter)
Expect(err).ToNot(HaveOccurred())
Expect(hbReporter.TaskHeartbeatCallCount()).To(Equal(0))
})
})

Describe("WaitForCompletion", func() {
var (
taskReporter *fakedir.FakeTaskReporter
Expand Down
1 change: 1 addition & 0 deletions ui/task/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type Reporter interface {
TaskStarted(int)
TaskFinished(int, string)
TaskOutputChunk(int, []byte)
TaskHeartbeat(id int, state string, startedAt int64)
}

type Task interface {
Expand Down
35 changes: 35 additions & 0 deletions ui/task/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,22 @@ import (
"fmt"
"strings"
"sync"
"time"

"golang.org/x/text/cases"
"golang.org/x/text/language"

boshui "github.com/cloudfoundry/bosh-cli/v7/ui"
boshuifmt "github.com/cloudfoundry/bosh-cli/v7/ui/fmt"
)

type ReporterImpl struct {
ui boshui.UI
isForEvents bool

withHeartbeatInterval time.Duration
lastHeartbeat time.Time

events map[int][]*Event
eventMarkers []eventMarker
lastGlobalEvent *Event
Expand Down Expand Up @@ -45,6 +50,12 @@ func NewReporter(ui boshui.UI, isForEvents bool) *ReporterImpl {
}
}

func (r *ReporterImpl) EnableWithHeartbeat(interval time.Duration) {
r.Lock()
defer r.Unlock()
r.withHeartbeatInterval = interval
}

func (r *ReporterImpl) TaskStarted(id int) {
r.Lock()
defer r.Unlock()
Expand Down Expand Up @@ -111,6 +122,30 @@ func (r *ReporterImpl) TaskOutputChunk(id int, chunk []byte) {
r.eventMarkers = append(r.eventMarkers, eventMarker{TaskID: id, Type: taskOutput})
}

func (r *ReporterImpl) TaskHeartbeat(id int, state string, startedAt int64) {
r.Lock()
defer r.Unlock()

if r.withHeartbeatInterval <= 0 {
return
}

now := time.Now()
if !r.lastHeartbeat.IsZero() && now.Sub(r.lastHeartbeat) < r.withHeartbeatInterval {
return
}
r.lastHeartbeat = now

msg := "Task state: " + state
if state != "queued" && startedAt > 0 {
elapsed := time.Since(time.Unix(startedAt, 0)).Truncate(time.Second)
msg += fmt.Sprintf(" (%s elapsed)", elapsed)
}

r.printBlock(fmt.Sprintf("\nTask %d | %s | ", id, now.UTC().Format(boshuifmt.TimeHoursFmt)))
r.printBlock(msg)
}

func (r *ReporterImpl) showEvent(id int, str string) {
event := Event{TaskID: id}

Expand Down
52 changes: 52 additions & 0 deletions ui/task/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package task_test

import (
"bytes"
"fmt"
"strings"
"time"

boshlog "github.com/cloudfoundry/bosh-utils/logger"
. "github.com/onsi/ginkgo/v2"
Expand Down Expand Up @@ -335,4 +338,53 @@ Task 101 done
`))
})
})

Describe("TaskHeartbeat", func() {
It("does not print when heartbeat is not enabled", func() {
impl := boshuit.NewReporter(fakeUI, true)
impl.TaskHeartbeat(42, "processing", int64(1700000000))
Expect(fakeUI.Blocks).To(BeNil())
})

It("prints state for a queued task when heartbeat is enabled", func() {
impl := boshuit.NewReporter(fakeUI, true)
impl.EnableWithHeartbeat(10 * time.Second)
impl.TaskHeartbeat(42, "queued", int64(0))
Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1))
combined := fmt.Sprintf("%v", fakeUI.Blocks)
Expect(combined).To(ContainSubstring("Task state: queued"))
Expect(combined).NotTo(ContainSubstring("elapsed"))
})

It("prints state with elapsed time for a processing task with startedAt", func() {
impl := boshuit.NewReporter(fakeUI, true)
impl.EnableWithHeartbeat(10 * time.Second)
impl.TaskHeartbeat(42, "processing", int64(1700000000))
Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1))
combined := fmt.Sprintf("%v", fakeUI.Blocks)
Expect(combined).To(ContainSubstring("Task state: processing"))
Expect(combined).To(ContainSubstring("elapsed"))
})

It("prints state without elapsed time when processing but startedAt is 0", func() {
impl := boshuit.NewReporter(fakeUI, true)
impl.EnableWithHeartbeat(10 * time.Second)
impl.TaskHeartbeat(42, "processing", int64(0))
Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1))
combined := fmt.Sprintf("%v", fakeUI.Blocks)
Expect(combined).To(ContainSubstring("Task state: processing"))
Expect(combined).NotTo(ContainSubstring("elapsed"))
Expect(combined).NotTo(ContainSubstring("queued"))
})

It("throttles heartbeat output based on the configured interval", func() {
impl := boshuit.NewReporter(fakeUI, true)
impl.EnableWithHeartbeat(1 * time.Hour)
impl.TaskHeartbeat(42, "processing", int64(1700000000))
impl.TaskHeartbeat(42, "processing", int64(1700000000))
impl.TaskHeartbeat(42, "processing", int64(1700000000))
combined := fmt.Sprintf("%v", fakeUI.Blocks)
Expect(strings.Count(combined, "Task state")).To(Equal(1))
})
})
})
Loading
Loading