From 6f192ee2b32164dd09467fb1f2df7574e2c13772 Mon Sep 17 00:00:00 2001 From: ay901246 Date: Thu, 12 Mar 2026 17:16:23 -0400 Subject: [PATCH] Add --with-heartbeat flag for heartbeat logging during long-running errands MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Errands produce no CLI output while the Agent executes the script. This silence causes CI/CD systems to kill the process due to inactivity timeouts and leaves operators unsure whether the task is still running. Add an opt-in --with-heartbeat flag to `bosh run-errand` that prints periodic heartbeat status lines while a task is processing or queued. bosh run-errand smoke_tests --with-heartbeat bosh run-errand smoke_tests --with-heartbeat=10 Output: Task 185528 | 16:16:23 | Task state: processing (5s elapsed) No Director changes required — uses existing task API fields (state, started_at). TaskHeartbeat is part of the TaskReporter interface with a no-op default on NoopTaskReporter; withHeartbeatInterval gates whether output is actually emitted. Throttling is handled in the reporter via timestamp comparison. Made-with: Cursor --- cmd/cmd.go | 19 ++++++- cmd/opts/opts.go | 2 + cmd/opts/opts_test.go | 8 +++ cmd/session.go | 5 +- director/directorfakes/fake_task_reporter.go | 41 +++++++++++++++ director/interfaces.go | 1 + director/noop_reporters.go | 7 +-- director/task_client_request.go | 6 ++- director/task_client_request_test.go | 55 ++++++++++++++++++++ ui/task/interfaces.go | 1 + ui/task/reporter.go | 35 +++++++++++++ ui/task/reporter_test.go | 52 ++++++++++++++++++ ui/task/taskfakes/fake_reporter.go | 47 ++++++++++++++--- 13 files changed, 264 insertions(+), 15 deletions(-) diff --git a/cmd/cmd.go b/cmd/cmd.go index c3d858e4a4..4514a08011 100644 --- a/cmd/cmd.go +++ b/cmd/cmd.go @@ -224,7 +224,19 @@ func (c Cmd) Execute() (cmdErr error) { return NewErrandsCmd(deps.UI, c.deployment()).Run() case *RunErrandOpts: - director, deployment := c.directorAndDeployment() + sess, ok := c.session().(*SessionImpl) + if !ok { + return fmt.Errorf("internal error: expected *SessionImpl") + } + director, err := sess.Director() + c.panicIfErr(err) + deployment, err := sess.Deployment() + c.panicIfErr(err) + + if opts.WithHeartbeat != nil && sess.taskReporter != nil { + sess.taskReporter.EnableWithHeartbeat(time.Duration(*opts.WithHeartbeat) * time.Second) + } + downloader := NewUIDownloader(director, deps.Time, deps.FS, deps.UI) return NewRunErrandCmd(deployment, downloader, deps.UI).Run(*opts) @@ -546,7 +558,10 @@ func (c Cmd) config() cmdconf.Config { } func (c Cmd) session() Session { - return NewSessionFromOpts(c.BoshOpts, c.config(), c.deps.UI, true, true, c.deps.FS, c.deps.Logger) + return NewSessionImpl( + NewSessionContextImpl(c.BoshOpts, c.config(), c.deps.FS), + c.deps.UI, true, true, c.deps.Logger, + ) } func (c Cmd) director() boshdir.Director { diff --git a/cmd/opts/opts.go b/cmd/opts/opts.go index ca3673fcf2..4be6b19f94 100644 --- a/cmd/opts/opts.go +++ b/cmd/opts/opts.go @@ -713,6 +713,8 @@ type RunErrandOpts struct { KeepAlive bool `long:"keep-alive" description:"Use existing VM to run an errand and keep it after completion"` WhenChanged bool `long:"when-changed" description:"Run errand only if errand configuration has changed or if the previous run was unsuccessful"` + WithHeartbeat *int `long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"` + DownloadLogs bool `long:"download-logs" description:"Download logs"` LogsDirectory DirOrCWDArg `long:"logs-dir" description:"Destination directory for logs" default:"."` diff --git a/cmd/opts/opts_test.go b/cmd/opts/opts_test.go index 1ad6ef9710..bccf004b86 100644 --- a/cmd/opts/opts_test.go +++ b/cmd/opts/opts_test.go @@ -2150,6 +2150,14 @@ var _ = Describe("Opts", func() { }) }) + Describe("WithHeartbeat", func() { + It("contains desired values", func() { + Expect(getStructTagForName("WithHeartbeat", opts)).To(Equal( + `long:"with-heartbeat" description:"Print task state every N seconds while waiting. Use '=' to specify interval" optional:"true" optional-value:"30"`, + )) + }) + }) + Describe("DownloadLogs", func() { It("contains desired values", func() { Expect(getStructTagForName("DownloadLogs", opts)).To(Equal( diff --git a/cmd/session.go b/cmd/session.go index e1eb09ac75..7d85fbfc95 100644 --- a/cmd/session.go +++ b/cmd/session.go @@ -22,6 +22,7 @@ type SessionImpl struct { // Memoized director boshdir.Director + taskReporter *boshuit.ReporterImpl directorInfo boshdir.Info directorInfoSet bool } @@ -118,10 +119,10 @@ func (c *SessionImpl) Director() (boshdir.Director, error) { c.ui.PrintLinef("Using environment '%s' as %s", c.Environment(), creds.Description()) } - taskReporter := boshuit.NewReporter(c.ui, true) + c.taskReporter = boshuit.NewReporter(c.ui, true) fileReporter := boshui.NewFileReporter(c.ui) - director, err := boshdir.NewFactory(c.logger).New(dirConfig, taskReporter, fileReporter) + director, err := boshdir.NewFactory(c.logger).New(dirConfig, c.taskReporter, fileReporter) if err != nil { return nil, err } diff --git a/director/directorfakes/fake_task_reporter.go b/director/directorfakes/fake_task_reporter.go index 2a5972c3d5..be3edc8404 100644 --- a/director/directorfakes/fake_task_reporter.go +++ b/director/directorfakes/fake_task_reporter.go @@ -14,6 +14,13 @@ type FakeTaskReporter struct { arg1 int arg2 string } + TaskHeartbeatStub func(int, string, int64) + taskHeartbeatMutex sync.RWMutex + taskHeartbeatArgsForCall []struct { + arg1 int + arg2 string + arg3 int64 + } TaskOutputChunkStub func(int, []byte) taskOutputChunkMutex sync.RWMutex taskOutputChunkArgsForCall []struct { @@ -62,6 +69,40 @@ func (fake *FakeTaskReporter) TaskFinishedArgsForCall(i int) (int, string) { return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeTaskReporter) TaskHeartbeat(arg1 int, arg2 string, arg3 int64) { + fake.taskHeartbeatMutex.Lock() + fake.taskHeartbeatArgsForCall = append(fake.taskHeartbeatArgsForCall, struct { + arg1 int + arg2 string + arg3 int64 + }{arg1, arg2, arg3}) + stub := fake.TaskHeartbeatStub + fake.recordInvocation("TaskHeartbeat", []interface{}{arg1, arg2, arg3}) + fake.taskHeartbeatMutex.Unlock() + if stub != nil { + fake.TaskHeartbeatStub(arg1, arg2, arg3) + } +} + +func (fake *FakeTaskReporter) TaskHeartbeatCallCount() int { + fake.taskHeartbeatMutex.RLock() + defer fake.taskHeartbeatMutex.RUnlock() + return len(fake.taskHeartbeatArgsForCall) +} + +func (fake *FakeTaskReporter) TaskHeartbeatCalls(stub func(int, string, int64)) { + fake.taskHeartbeatMutex.Lock() + defer fake.taskHeartbeatMutex.Unlock() + fake.TaskHeartbeatStub = stub +} + +func (fake *FakeTaskReporter) TaskHeartbeatArgsForCall(i int) (int, string, int64) { + fake.taskHeartbeatMutex.RLock() + defer fake.taskHeartbeatMutex.RUnlock() + argsForCall := fake.taskHeartbeatArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + func (fake *FakeTaskReporter) TaskOutputChunk(arg1 int, arg2 []byte) { var arg2Copy []byte if arg2 != nil { diff --git a/director/interfaces.go b/director/interfaces.go index cdeaa0d295..f68c0867be 100644 --- a/director/interfaces.go +++ b/director/interfaces.go @@ -304,6 +304,7 @@ type TaskReporter interface { TaskStarted(int) TaskFinished(int, string) TaskOutputChunk(int, []byte) + TaskHeartbeat(id int, state string, startedAt int64) } //counterfeiter:generate . OrphanDisk diff --git a/director/noop_reporters.go b/director/noop_reporters.go index 1a7d2c45eb..8b9cd074b1 100644 --- a/director/noop_reporters.go +++ b/director/noop_reporters.go @@ -43,6 +43,7 @@ func NewNoopTaskReporter() NoopTaskReporter { return NoopTaskReporter{} } -func (r NoopTaskReporter) TaskStarted(id int) {} -func (r NoopTaskReporter) TaskFinished(id int, state string) {} -func (r NoopTaskReporter) TaskOutputChunk(id int, chunk []byte) {} +func (r NoopTaskReporter) TaskStarted(id int) {} +func (r NoopTaskReporter) TaskFinished(id int, state string) {} +func (r NoopTaskReporter) TaskOutputChunk(id int, chunk []byte) {} +func (r NoopTaskReporter) TaskHeartbeat(id int, state string, startedAt int64) {} diff --git a/director/task_client_request.go b/director/task_client_request.go index 183e23697d..48491267f4 100644 --- a/director/task_client_request.go +++ b/director/task_client_request.go @@ -27,8 +27,9 @@ func NewTaskClientRequest( } type taskShortResp struct { - ID int // 165 - State string // e.g. "queued", "processing", "done", "error", "cancelled" + ID int `json:"id"` + State string `json:"state"` // e.g. "queued", "processing", "done", "error", "cancelled" + StartedAt int64 `json:"started_at"` // 1440318199 } func (r taskShortResp) IsRunning() bool { @@ -111,6 +112,7 @@ func (r TaskClientRequest) WaitForCompletion(id int, type_ string, taskReporter } if taskResp.IsRunning() { + taskReporter.TaskHeartbeat(taskResp.ID, taskResp.State, taskResp.StartedAt) time.Sleep(r.taskCheckStepDuration) continue } diff --git a/director/task_client_request_test.go b/director/task_client_request_test.go index 9b9a0d5295..04c24598f4 100644 --- a/director/task_client_request_test.go +++ b/director/task_client_request_test.go @@ -246,6 +246,61 @@ var _ = Describe("TaskClientRequest", func() { }) }) + Describe("WaitForCompletion heartbeat", func() { + It("emits a heartbeat for a processing task", func() { + hbReporter := &fakedir.FakeTaskReporter{} + hbReq := buildReq(hbReporter) + + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/tasks/42"), + ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"processing","description":"run errand 'smoke'","started_at":1700000000}`), + ), + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"), + ghttp.RespondWith(http.StatusOK, ""), + ), + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/tasks/42"), + ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`), + ), + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"), + ghttp.RespondWith(http.StatusOK, ""), + ), + ) + + err := hbReq.WaitForCompletion(42, "event", hbReporter) + Expect(err).ToNot(HaveOccurred()) + + Expect(hbReporter.TaskHeartbeatCallCount()).To(BeNumerically(">=", 1)) + id, state, startedAt := hbReporter.TaskHeartbeatArgsForCall(0) + Expect(id).To(Equal(42)) + Expect(state).To(Equal("processing")) + Expect(startedAt).To(Equal(int64(1700000000))) + }) + + It("does not emit heartbeats for tasks that immediately finish", func() { + hbReporter := &fakedir.FakeTaskReporter{} + hbReq := buildReq(hbReporter) + + server.AppendHandlers( + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/tasks/42"), + ghttp.RespondWith(http.StatusOK, `{"id":42,"state":"done"}`), + ), + ghttp.CombineHandlers( + ghttp.VerifyRequest("GET", "/tasks/42/output", "type=event"), + ghttp.RespondWith(http.StatusOK, ""), + ), + ) + + err := hbReq.WaitForCompletion(42, "event", hbReporter) + Expect(err).ToNot(HaveOccurred()) + Expect(hbReporter.TaskHeartbeatCallCount()).To(Equal(0)) + }) + }) + Describe("WaitForCompletion", func() { var ( taskReporter *fakedir.FakeTaskReporter diff --git a/ui/task/interfaces.go b/ui/task/interfaces.go index 8268c32bae..72433ddaf2 100644 --- a/ui/task/interfaces.go +++ b/ui/task/interfaces.go @@ -4,6 +4,7 @@ type Reporter interface { TaskStarted(int) TaskFinished(int, string) TaskOutputChunk(int, []byte) + TaskHeartbeat(id int, state string, startedAt int64) } type Task interface { diff --git a/ui/task/reporter.go b/ui/task/reporter.go index 4664c74893..8d86363e9e 100644 --- a/ui/task/reporter.go +++ b/ui/task/reporter.go @@ -5,17 +5,22 @@ import ( "fmt" "strings" "sync" + "time" "golang.org/x/text/cases" "golang.org/x/text/language" boshui "github.com/cloudfoundry/bosh-cli/v7/ui" + boshuifmt "github.com/cloudfoundry/bosh-cli/v7/ui/fmt" ) type ReporterImpl struct { ui boshui.UI isForEvents bool + withHeartbeatInterval time.Duration + lastHeartbeat time.Time + events map[int][]*Event eventMarkers []eventMarker lastGlobalEvent *Event @@ -45,6 +50,12 @@ func NewReporter(ui boshui.UI, isForEvents bool) *ReporterImpl { } } +func (r *ReporterImpl) EnableWithHeartbeat(interval time.Duration) { + r.Lock() + defer r.Unlock() + r.withHeartbeatInterval = interval +} + func (r *ReporterImpl) TaskStarted(id int) { r.Lock() defer r.Unlock() @@ -111,6 +122,30 @@ func (r *ReporterImpl) TaskOutputChunk(id int, chunk []byte) { r.eventMarkers = append(r.eventMarkers, eventMarker{TaskID: id, Type: taskOutput}) } +func (r *ReporterImpl) TaskHeartbeat(id int, state string, startedAt int64) { + r.Lock() + defer r.Unlock() + + if r.withHeartbeatInterval <= 0 { + return + } + + now := time.Now() + if !r.lastHeartbeat.IsZero() && now.Sub(r.lastHeartbeat) < r.withHeartbeatInterval { + return + } + r.lastHeartbeat = now + + msg := "Task state: " + state + if state != "queued" && startedAt > 0 { + elapsed := time.Since(time.Unix(startedAt, 0)).Truncate(time.Second) + msg += fmt.Sprintf(" (%s elapsed)", elapsed) + } + + r.printBlock(fmt.Sprintf("\nTask %d | %s | ", id, now.UTC().Format(boshuifmt.TimeHoursFmt))) + r.printBlock(msg) +} + func (r *ReporterImpl) showEvent(id int, str string) { event := Event{TaskID: id} diff --git a/ui/task/reporter_test.go b/ui/task/reporter_test.go index f49e6f2c75..faf532521c 100644 --- a/ui/task/reporter_test.go +++ b/ui/task/reporter_test.go @@ -2,6 +2,9 @@ package task_test import ( "bytes" + "fmt" + "strings" + "time" boshlog "github.com/cloudfoundry/bosh-utils/logger" . "github.com/onsi/ginkgo/v2" @@ -335,4 +338,53 @@ Task 101 done `)) }) }) + + Describe("TaskHeartbeat", func() { + It("does not print when heartbeat is not enabled", func() { + impl := boshuit.NewReporter(fakeUI, true) + impl.TaskHeartbeat(42, "processing", int64(1700000000)) + Expect(fakeUI.Blocks).To(BeNil()) + }) + + It("prints state for a queued task when heartbeat is enabled", func() { + impl := boshuit.NewReporter(fakeUI, true) + impl.EnableWithHeartbeat(10 * time.Second) + impl.TaskHeartbeat(42, "queued", int64(0)) + Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1)) + combined := fmt.Sprintf("%v", fakeUI.Blocks) + Expect(combined).To(ContainSubstring("Task state: queued")) + Expect(combined).NotTo(ContainSubstring("elapsed")) + }) + + It("prints state with elapsed time for a processing task with startedAt", func() { + impl := boshuit.NewReporter(fakeUI, true) + impl.EnableWithHeartbeat(10 * time.Second) + impl.TaskHeartbeat(42, "processing", int64(1700000000)) + Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1)) + combined := fmt.Sprintf("%v", fakeUI.Blocks) + Expect(combined).To(ContainSubstring("Task state: processing")) + Expect(combined).To(ContainSubstring("elapsed")) + }) + + It("prints state without elapsed time when processing but startedAt is 0", func() { + impl := boshuit.NewReporter(fakeUI, true) + impl.EnableWithHeartbeat(10 * time.Second) + impl.TaskHeartbeat(42, "processing", int64(0)) + Expect(len(fakeUI.Blocks)).To(BeNumerically(">=", 1)) + combined := fmt.Sprintf("%v", fakeUI.Blocks) + Expect(combined).To(ContainSubstring("Task state: processing")) + Expect(combined).NotTo(ContainSubstring("elapsed")) + Expect(combined).NotTo(ContainSubstring("queued")) + }) + + It("throttles heartbeat output based on the configured interval", func() { + impl := boshuit.NewReporter(fakeUI, true) + impl.EnableWithHeartbeat(1 * time.Hour) + impl.TaskHeartbeat(42, "processing", int64(1700000000)) + impl.TaskHeartbeat(42, "processing", int64(1700000000)) + impl.TaskHeartbeat(42, "processing", int64(1700000000)) + combined := fmt.Sprintf("%v", fakeUI.Blocks) + Expect(strings.Count(combined, "Task state")).To(Equal(1)) + }) + }) }) diff --git a/ui/task/taskfakes/fake_reporter.go b/ui/task/taskfakes/fake_reporter.go index 115068cecf..ab182cb2b6 100644 --- a/ui/task/taskfakes/fake_reporter.go +++ b/ui/task/taskfakes/fake_reporter.go @@ -14,6 +14,13 @@ type FakeReporter struct { arg1 int arg2 string } + TaskHeartbeatStub func(int, string, int64) + taskHeartbeatMutex sync.RWMutex + taskHeartbeatArgsForCall []struct { + arg1 int + arg2 string + arg3 int64 + } TaskOutputChunkStub func(int, []byte) taskOutputChunkMutex sync.RWMutex taskOutputChunkArgsForCall []struct { @@ -62,6 +69,40 @@ func (fake *FakeReporter) TaskFinishedArgsForCall(i int) (int, string) { return argsForCall.arg1, argsForCall.arg2 } +func (fake *FakeReporter) TaskHeartbeat(arg1 int, arg2 string, arg3 int64) { + fake.taskHeartbeatMutex.Lock() + fake.taskHeartbeatArgsForCall = append(fake.taskHeartbeatArgsForCall, struct { + arg1 int + arg2 string + arg3 int64 + }{arg1, arg2, arg3}) + stub := fake.TaskHeartbeatStub + fake.recordInvocation("TaskHeartbeat", []interface{}{arg1, arg2, arg3}) + fake.taskHeartbeatMutex.Unlock() + if stub != nil { + fake.TaskHeartbeatStub(arg1, arg2, arg3) + } +} + +func (fake *FakeReporter) TaskHeartbeatCallCount() int { + fake.taskHeartbeatMutex.RLock() + defer fake.taskHeartbeatMutex.RUnlock() + return len(fake.taskHeartbeatArgsForCall) +} + +func (fake *FakeReporter) TaskHeartbeatCalls(stub func(int, string, int64)) { + fake.taskHeartbeatMutex.Lock() + defer fake.taskHeartbeatMutex.Unlock() + fake.TaskHeartbeatStub = stub +} + +func (fake *FakeReporter) TaskHeartbeatArgsForCall(i int) (int, string, int64) { + fake.taskHeartbeatMutex.RLock() + defer fake.taskHeartbeatMutex.RUnlock() + argsForCall := fake.taskHeartbeatArgsForCall[i] + return argsForCall.arg1, argsForCall.arg2, argsForCall.arg3 +} + func (fake *FakeReporter) TaskOutputChunk(arg1 int, arg2 []byte) { var arg2Copy []byte if arg2 != nil { @@ -135,12 +176,6 @@ func (fake *FakeReporter) TaskStartedArgsForCall(i int) int { func (fake *FakeReporter) Invocations() map[string][][]interface{} { fake.invocationsMutex.RLock() defer fake.invocationsMutex.RUnlock() - fake.taskFinishedMutex.RLock() - defer fake.taskFinishedMutex.RUnlock() - fake.taskOutputChunkMutex.RLock() - defer fake.taskOutputChunkMutex.RUnlock() - fake.taskStartedMutex.RLock() - defer fake.taskStartedMutex.RUnlock() copiedInvocations := map[string][][]interface{}{} for key, value := range fake.invocations { copiedInvocations[key] = value