diff --git a/cmd/arduino-app-cli/app/destroy.go b/cmd/arduino-app-cli/app/destroy.go index bb359e31b..e334861b0 100644 --- a/cmd/arduino-app-cli/app/destroy.go +++ b/cmd/arduino-app-cli/app/destroy.go @@ -55,16 +55,15 @@ func newDestroyCmd(cfg config.Configuration) *cobra.Command { func destroyHandler(ctx context.Context, app app.ArduinoApp) error { out, _, getResult := feedback.OutputStreams() - for message := range orchestrator.StopAndDestroyApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app) { + if err := orchestrator.StopAndDestroyApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app, func(message orchestrator.StreamMessage) { switch message.GetType() { case orchestrator.ProgressType: fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress) case orchestrator.InfoType: fmt.Fprintln(out, "[INFO]", message.GetData()) - case orchestrator.ErrorType: - feedback.Fatal(message.GetError().Error(), feedback.ErrGeneric) - return nil } + }); err != nil { + feedback.Fatal(err.Error(), feedback.ErrGeneric) } outputResult := getResult() diff --git a/cmd/arduino-app-cli/app/logs.go b/cmd/arduino-app-cli/app/logs.go index da4dce177..f979e86c4 100644 --- a/cmd/arduino-app-cli/app/logs.go +++ b/cmd/arduino-app-cli/app/logs.go @@ -74,19 +74,17 @@ func logsHandler(ctx context.Context, app app.ArduinoApp, tail *uint64, follow, if all { cfg.ShowServicesLogs = true } - logsIter, err := orchestrator.AppLogs( + if err := orchestrator.AppLogs( ctx, app, cfg, servicelocator.GetDockerClient(), servicelocator.GetBricksIndex(), - ) - if err != nil { + func(msg orchestrator.LogMessage) { + fmt.Fprintf(stdout, "[%s] %s\n", msg.Name, msg.Content) + }, + ); err != nil { feedback.Fatal(err.Error(), feedback.ErrGeneric) - return nil - } - for msg := range logsIter { - fmt.Fprintf(stdout, "[%s] %s\n", msg.Name, msg.Content) } return nil } diff --git a/cmd/arduino-app-cli/app/restart.go b/cmd/arduino-app-cli/app/restart.go index aaea80bd5..bb31f5a8a 100644 --- a/cmd/arduino-app-cli/app/restart.go +++ b/cmd/arduino-app-cli/app/restart.go @@ -56,7 +56,7 @@ func newRestartCmd(cfg config.Configuration) *cobra.Command { func restartHandler(ctx context.Context, cfg config.Configuration, app app.ArduinoApp) error { out, _, getResult := feedback.OutputStreams() - stream := orchestrator.RestartApp( + err := orchestrator.RestartApp( ctx, servicelocator.GetDockerClient(), servicelocator.GetProvisioner(), @@ -66,18 +66,18 @@ func restartHandler(ctx context.Context, cfg config.Configuration, app app.Ardui cfg, servicelocator.GetStaticStore(), servicelocator.GetPlatform(), + func(message orchestrator.StreamMessage) { + switch message.GetType() { + case orchestrator.ProgressType: + fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress) + case orchestrator.InfoType: + fmt.Fprintln(out, "[INFO]", message.GetData()) + } + }, ) - for message := range stream { - switch message.GetType() { - case orchestrator.ProgressType: - fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress) - case orchestrator.InfoType: - fmt.Fprintln(out, "[INFO]", message.GetData()) - case orchestrator.ErrorType: - errMesg := cases.Title(language.AmericanEnglish).String(message.GetError().Error()) - feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric) - return nil - } + if err != nil { + errMesg := cases.Title(language.AmericanEnglish).String(err.Error()) + feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric) } outputResult := getResult() diff --git a/cmd/arduino-app-cli/app/start.go b/cmd/arduino-app-cli/app/start.go index a6b10e33c..f8eecae16 100644 --- a/cmd/arduino-app-cli/app/start.go +++ b/cmd/arduino-app-cli/app/start.go @@ -58,7 +58,7 @@ func newStartCmd(cfg config.Configuration) *cobra.Command { func startHandler(ctx context.Context, cfg config.Configuration, app app.ArduinoApp) error { out, _, getResult := feedback.OutputStreams() - stream := orchestrator.StartApp( + err := orchestrator.StartApp( ctx, servicelocator.GetDockerClient(), servicelocator.GetProvisioner(), @@ -68,19 +68,20 @@ func startHandler(ctx context.Context, cfg config.Configuration, app app.Arduino cfg, servicelocator.GetStaticStore(), servicelocator.GetPlatform(), + func(message orchestrator.StreamMessage) { + switch message.GetType() { + case orchestrator.ProgressType: + fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress) + case orchestrator.InfoType: + fmt.Fprintln(out, "[INFO]", message.GetData()) + } + }, ) - for message := range stream { - switch message.GetType() { - case orchestrator.ProgressType: - fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress) - case orchestrator.InfoType: - fmt.Fprintln(out, "[INFO]", message.GetData()) - case orchestrator.ErrorType: - errMesg := cases.Title(language.AmericanEnglish).String(message.GetError().Error()) - feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric) - return nil - } + if err != nil { + errMesg := cases.Title(language.AmericanEnglish).String(err.Error()) + feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric) } + outputResult := getResult() feedback.PrintResult(startAppResult{ AppName: app.Name, diff --git a/cmd/arduino-app-cli/app/stop.go b/cmd/arduino-app-cli/app/stop.go index 27343f3a2..168a188d9 100644 --- a/cmd/arduino-app-cli/app/stop.go +++ b/cmd/arduino-app-cli/app/stop.go @@ -56,16 +56,15 @@ func newStopCmd(cfg config.Configuration) *cobra.Command { func stopHandler(ctx context.Context, app app.ArduinoApp) error { out, _, getResult := feedback.OutputStreams() - for message := range orchestrator.StopApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app) { + if err := orchestrator.StopApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app, func(message orchestrator.StreamMessage) { switch message.GetType() { case orchestrator.ProgressType: fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress) case orchestrator.InfoType: fmt.Fprintln(out, "[INFO]", message.GetData()) - case orchestrator.ErrorType: - feedback.Fatal(message.GetError().Error(), feedback.ErrGeneric) - return nil } + }); err != nil { + feedback.Fatal(err.Error(), feedback.ErrGeneric) } outputResult := getResult() diff --git a/internal/api/handlers/app_logs.go b/internal/api/handlers/app_logs.go index a350fb68c..f4af9427e 100644 --- a/internal/api/handlers/app_logs.go +++ b/internal/api/handlers/app_logs.go @@ -95,20 +95,17 @@ func HandleAppLogs( BrickID string `json:"brick_id,omitempty"` Message string `json:"message"` } - messagesIter, err := orchestrator.AppLogs(r.Context(), app, appLogsRequest, dockerClient, bricksIndex) - if err != nil { - sseStream.SendError(render.SSEErrorData{ - Code: render.InternalServiceErr, - Message: "failed to start the app", - }) - return - } - for item := range messagesIter { + if err := orchestrator.AppLogs(r.Context(), app, appLogsRequest, dockerClient, bricksIndex, func(item orchestrator.LogMessage) { sseStream.Send(render.SSEEvent{Type: "message", Data: log{ ID: item.Name, Message: item.Content, BrickID: item.BrickName, }}) + }); err != nil { + sseStream.SendError(render.SSEErrorData{ + Code: render.InternalServiceErr, + Message: "failed to start the app", + }) } } } diff --git a/internal/api/handlers/app_start.go b/internal/api/handlers/app_start.go index 637d770f6..e15e1ddc1 100644 --- a/internal/api/handlers/app_start.go +++ b/internal/api/handlers/app_start.go @@ -73,18 +73,19 @@ func HandleAppStart( type log struct { Message string `json:"message"` } - for item := range orchestrator.StartApp(r.Context(), dockerCli, provisioner, modelsIndex, bricksIndex, app, cfg, staticStore, platform) { + err = orchestrator.StartApp(r.Context(), dockerCli, provisioner, modelsIndex, bricksIndex, app, cfg, staticStore, platform, func(item orchestrator.StreamMessage) { switch item.GetType() { case orchestrator.ProgressType: sseStream.Send(render.SSEEvent{Type: "progress", Data: progress(*item.GetProgress())}) case orchestrator.InfoType: sseStream.Send(render.SSEEvent{Type: "message", Data: log{Message: item.GetData()}}) - case orchestrator.ErrorType: - sseStream.SendError(render.SSEErrorData{ - Code: render.InternalServiceErr, - Message: item.GetError().Error(), - }) } + }) + if err != nil { + sseStream.SendError(render.SSEErrorData{ + Code: render.InternalServiceErr, + Message: err.Error(), + }) } } } diff --git a/internal/api/handlers/app_status.go b/internal/api/handlers/app_status.go index f8a904762..240d16dc9 100644 --- a/internal/api/handlers/app_status.go +++ b/internal/api/handlers/app_status.go @@ -54,12 +54,10 @@ func HandlerAppStatus( } } - for appStatus, err := range orchestrator.AppStatusEvents(r.Context(), cfg, dockerCli, idProvider) { - if err != nil { - sseStream.SendError(render.SSEErrorData{Code: render.InternalServiceErr, Message: err.Error()}) - continue - } + if err := orchestrator.AppStatusEvents(r.Context(), cfg, dockerCli, idProvider, func(appStatus orchestrator.AppInfo) { sseStream.Send(render.SSEEvent{Type: "app", Data: appStatus}) + }); err != nil { + sseStream.SendError(render.SSEErrorData{Code: render.InternalServiceErr, Message: err.Error()}) } } } diff --git a/internal/api/handlers/app_stop.go b/internal/api/handlers/app_stop.go index b5efe701e..1cd5fd3d5 100644 --- a/internal/api/handlers/app_stop.go +++ b/internal/api/handlers/app_stop.go @@ -64,18 +64,19 @@ func HandleAppStop( type log struct { Message string `json:"message"` } - for item := range orchestrator.StopApp(r.Context(), dockerClient, platform, app) { + err = orchestrator.StopApp(r.Context(), dockerClient, platform, app, func(item orchestrator.StreamMessage) { switch item.GetType() { case orchestrator.ProgressType: sseStream.Send(render.SSEEvent{Type: "progress", Data: progress(*item.GetProgress())}) case orchestrator.InfoType: sseStream.Send(render.SSEEvent{Type: "message", Data: log{Message: item.GetData()}}) - case orchestrator.ErrorType: - sseStream.SendError(render.SSEErrorData{ - Code: render.InternalServiceErr, - Message: item.GetError().Error(), - }) } + }) + if err != nil { + sseStream.SendError(render.SSEErrorData{ + Code: render.InternalServiceErr, + Message: err.Error(), + }) } } } diff --git a/internal/api/handlers/system_resources.go b/internal/api/handlers/system_resources.go index aaf271a95..86a75303d 100644 --- a/internal/api/handlers/system_resources.go +++ b/internal/api/handlers/system_resources.go @@ -38,15 +38,7 @@ func HandleSystemResources(cfg config.Configuration) http.HandlerFunc { } defer sseStream.Close() - resources, err := orchestrator.SystemResources(ctx, cfg, nil) - if err != nil { - sseStream.SendError(render.SSEErrorData{ - Code: render.InternalServiceErr, - Message: "failed to obtain the resources", - }) - return - } - for resource := range resources { + if err := orchestrator.SystemResources(ctx, cfg, nil, func(resource orchestrator.SystemResource) { switch res := resource.(type) { case *orchestrator.SystemDiskResource: sseStream.Send(render.SSEEvent{Type: "disk", Data: res}) @@ -55,6 +47,11 @@ func HandleSystemResources(cfg config.Configuration) http.HandlerFunc { case *orchestrator.SystemMemoryResource: sseStream.Send(render.SSEEvent{Type: "mem", Data: res}) } + }); err != nil { + sseStream.SendError(render.SSEErrorData{ + Code: render.InternalServiceErr, + Message: "failed to obtain the resources", + }) } } } diff --git a/internal/e2e/updatetest/helpers.go b/internal/e2e/updatetest/helpers.go index 78ed40cb1..8b8fb3e50 100644 --- a/internal/e2e/updatetest/helpers.go +++ b/internal/e2e/updatetest/helpers.go @@ -23,7 +23,6 @@ import ( "context" "encoding/json" "fmt" - "iter" "log" "net" "net/http" @@ -263,51 +262,51 @@ func putUpdateRequest(t *testing.T, host string) { } -func NewSSEClient(ctx context.Context, method, url string) iter.Seq2[Event, error] { - return func(yield func(Event, error) bool) { - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) - if err != nil { - _ = yield(Event{}, err) - return - } +func newSSEClient(ctx context.Context, method, url string, cb func(Event)) error { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } - resp, err := http.DefaultClient.Do(req) - if err != nil { - _ = yield(Event{}, err) - return + resp, err := http.DefaultClient.Do(req) + if err != nil { + if ctx.Err() != nil { + return nil } - defer resp.Body.Close() + return err + } + defer resp.Body.Close() - if resp.StatusCode != 200 { - _ = yield(Event{}, fmt.Errorf("got response status code %d", resp.StatusCode)) - return - } + if resp.StatusCode != 200 { + return fmt.Errorf("got response status code %d", resp.StatusCode) + } - reader := bufio.NewReader(resp.Body) + reader := bufio.NewReader(resp.Body) - evt := Event{} - for { - line, err := reader.ReadString('\n') - if err != nil { - _ = yield(Event{}, err) - return + evt := Event{} + for { + line, err := reader.ReadString('\n') + if err != nil { + if ctx.Err() != nil { + return nil } - switch { - case strings.HasPrefix(line, "data:"): - evt.Data = []byte(strings.TrimSpace(strings.TrimPrefix(line, "data:"))) - case strings.HasPrefix(line, "event:"): - evt.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:")) - case strings.HasPrefix(line, "id:"): - evt.ID = strings.TrimSpace(strings.TrimPrefix(line, "id:")) - case strings.HasPrefix(line, "\n"): - if !yield(evt, nil) { - return - } - evt = Event{} - default: - _ = yield(Event{}, fmt.Errorf("unknown line: '%s'", line)) - return + return err + } + switch { + case strings.HasPrefix(line, "data:"): + evt.Data = []byte(strings.TrimSpace(strings.TrimPrefix(line, "data:"))) + case strings.HasPrefix(line, "event:"): + evt.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:")) + case strings.HasPrefix(line, "id:"): + evt.ID = strings.TrimSpace(strings.TrimPrefix(line, "id:")) + case strings.HasPrefix(line, "\n"): + cb(evt) + if ctx.Err() != nil { + return nil } + evt = Event{} + default: + return fmt.Errorf("unknown line: '%s'", line) } } } @@ -338,13 +337,15 @@ func waitForUpgrade(t *testing.T, host string) { url := fmt.Sprintf("http://%s/v1/system/update/events", host) - itr := NewSSEClient(t.Context(), "GET", url) - for event, err := range itr { - require.NoError(t, err) + ctx, cancel := context.WithCancel(t.Context()) + defer cancel() + + if err := newSSEClient(ctx, "GET", url, func(event Event) { t.Logf("Received event: ID=%s, Event=%s, Data=%s\n", event.ID, event.Event, string(event.Data)) if event.Event == "restarting" { - break + cancel() } + }); err != nil { + require.NoError(t, err) } - } diff --git a/internal/helpers/iter.go b/internal/helpers/iter.go deleted file mode 100644 index 678de3f7c..000000000 --- a/internal/helpers/iter.go +++ /dev/null @@ -1,26 +0,0 @@ -// This file is part of arduino-app-cli. -// -// Copyright (C) Arduino s.r.l. and/or its affiliated companies -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -package helpers - -import ( - "iter" -) - -func EmptyIter[V any]() iter.Seq[V] { - return func(yield func(V) bool) {} -} diff --git a/internal/orchestrator/app_status.go b/internal/orchestrator/app_status.go index cb0b4fd70..2fe845314 100644 --- a/internal/orchestrator/app_status.go +++ b/internal/orchestrator/app_status.go @@ -20,7 +20,6 @@ package orchestrator import ( "context" "fmt" - "iter" "log/slog" "github.com/arduino/go-paths-helper" @@ -32,7 +31,7 @@ import ( "github.com/arduino/arduino-app-cli/internal/orchestrator/config" ) -func AppStatusEvents(ctx context.Context, cfg config.Configuration, docker command.Cli, idProvider *app.IDProvider) iter.Seq2[AppInfo, error] { +func AppStatusEvents(ctx context.Context, cfg config.Configuration, docker command.Cli, idProvider *app.IDProvider, cb func(AppInfo)) error { chanMsg, chanError := docker.Client().Events(ctx, events.ListOptions{ Filters: filters.NewArgs( filters.Arg("label", DockerAppLabel+"=true"), @@ -47,37 +46,30 @@ func AppStatusEvents(ctx context.Context, cfg config.Configuration, docker comma ), }) - return func(yield func(AppInfo, error) bool) { - for { - select { - case <-ctx.Done(): - slog.Debug("Stopping to listen to docker events") - return - default: - } + for { + select { + case <-ctx.Done(): + slog.Debug("Stopping to listen to docker events") + return nil + default: + } - select { - - case err := <-chanError: - if err != nil { - slog.Error("Error listening to docker events", slog.String("error", err.Error())) - _ = yield(AppInfo{}, fmt.Errorf("error listening to docker events: %w", err)) - return - } - case event := <-chanMsg: - appStatus, err := parseDockerStatusEvent(ctx, cfg, docker, idProvider, event) - if err != nil { - slog.Error("Unable to get apps status", slog.String("error", err.Error())) - if !yield(AppInfo{}, err) { - return - } - } - if !yield(appStatus, nil) { - return - } - } + select { + case err := <-chanError: + if err != nil { + slog.Error("Error listening to docker events", slog.String("error", err.Error())) + return fmt.Errorf("error listening to docker events: %w", err) + } + case event := <-chanMsg: + appStatus, err := parseDockerStatusEvent(ctx, cfg, docker, idProvider, event) + if err != nil { + slog.Error("Unable to get apps status", slog.String("error", err.Error())) + return err + } + cb(appStatus) } + } } diff --git a/internal/orchestrator/cache.go b/internal/orchestrator/cache.go index 6207900dc..4c3fa8058 100644 --- a/internal/orchestrator/cache.go +++ b/internal/orchestrator/cache.go @@ -51,9 +51,7 @@ func CleanAppCache( return ErrCleanCacheRunningApp } // We try to remove docker related resources at best effort - for range StopAndDestroyApp(ctx, docker, platform, app) { - // just consume the iterator - } + _ = StopAndDestroyApp(ctx, docker, platform, app, func(StreamMessage) {}) } return app.ProvisioningStateDir().RemoveAll() diff --git a/internal/orchestrator/logs.go b/internal/orchestrator/logs.go index 7d180350e..a63ab7706 100644 --- a/internal/orchestrator/logs.go +++ b/internal/orchestrator/logs.go @@ -20,12 +20,10 @@ package orchestrator import ( "context" "fmt" - "iter" "log/slog" "os" "strings" "sync" - "sync/atomic" "github.com/compose-spec/compose-go/v2/loader" "github.com/compose-spec/compose-go/v2/types" @@ -35,7 +33,6 @@ import ( "github.com/docker/compose/v2/pkg/compose" "go.bug.st/f" - "github.com/arduino/arduino-app-cli/internal/helpers" "github.com/arduino/arduino-app-cli/internal/orchestrator/app" "github.com/arduino/arduino-app-cli/internal/orchestrator/bricksindex" ) @@ -59,14 +56,15 @@ func AppLogs( req AppLogsRequest, dockerCli command.Cli, bricksIndex *bricksindex.BricksIndex, -) (iter.Seq[LogMessage], error) { + cb func(LogMessage), +) error { if app.MainPythonFile == nil { - return helpers.EmptyIter[LogMessage](), nil + return nil } mainCompose := app.AppComposeFilePath() if mainCompose.NotExist() { - return helpers.EmptyIter[LogMessage](), nil + return nil } bricksIndex = bricksIndex.WithAppBricks(app.LocalBricks) @@ -91,7 +89,7 @@ func AppLogs( services, err := extractServicesFromComposeFile(composeFilePath) if err != nil { - return helpers.EmptyIter[LogMessage](), err + return err } for _, s := range services { serviceToBrickMapping[s.name] = brick.ID @@ -108,7 +106,7 @@ func AppLogs( loader.WithSkipValidation, //TODO: check if there is a bug on docker compose upstream ) if err != nil { - return nil, err + return err } filteredServices := prj.ServiceNames() @@ -119,42 +117,40 @@ func AppLogs( } backend := compose.NewComposeService(dockerCli).(commands.Backend) - return func(yield func(LogMessage) bool) { - opts := api.LogOptions{ - Project: prj, - Follow: req.Follow, - Services: filteredServices, - Timestamps: false, - } - if req.Tail != nil { - opts.Tail = fmt.Sprintf("%d", *req.Tail) - } - err = backend.Logs( - ctx, - prj.Name, - NewDockerLogConsumer(ctx, yield, serviceToBrickMapping), - opts, - ) - if err != nil { - slog.Error("docker logs error", slog.String("error", err.Error())) - return - } - }, nil + opts := api.LogOptions{ + Project: prj, + Follow: req.Follow, + Services: filteredServices, + Timestamps: false, + } + if req.Tail != nil { + opts.Tail = fmt.Sprintf("%d", *req.Tail) + } + err = backend.Logs( + ctx, + prj.Name, + NewDockerLogConsumer(ctx, cb, serviceToBrickMapping), + opts, + ) + if err != nil { + slog.Error("docker logs error", slog.String("error", err.Error())) + return err + } + return nil } var _ api.LogConsumer = (*DockerLogConsumer)(nil) type DockerLogConsumer struct { - ctx context.Context - cb func(LogMessage) bool - mapping map[string]string - shuttingDown atomic.Bool - mu sync.Mutex + ctx context.Context + cb func(LogMessage) + mapping map[string]string + mu sync.Mutex } func NewDockerLogConsumer( ctx context.Context, - cb func(LogMessage) bool, + cb func(LogMessage), mapping map[string]string, ) *DockerLogConsumer { return &DockerLogConsumer{ @@ -180,14 +176,11 @@ func (d *DockerLogConsumer) Status(container string, msg string) { } func (d *DockerLogConsumer) write(container, message string) { - if d.ctx.Err() != nil || d.shuttingDown.Load() { + if d.ctx.Err() != nil { return } d.mu.Lock() defer d.mu.Unlock() - if d.shuttingDown.Load() { - return - } serviceName := strings.TrimSpace(container) idx := strings.LastIndex(serviceName, "-") @@ -196,13 +189,10 @@ func (d *DockerLogConsumer) write(container, message string) { serviceName = serviceName[:idx] } for line := range strings.SplitSeq(message, "\n") { - if !d.cb(LogMessage{ + d.cb(LogMessage{ Name: serviceName, BrickName: d.mapping[serviceName], Content: line, - }) { - d.shuttingDown.CompareAndSwap(false, true) - return - } + }) } } diff --git a/internal/orchestrator/models.go b/internal/orchestrator/models.go index f6de90820..625d8ccaa 100644 --- a/internal/orchestrator/models.go +++ b/internal/orchestrator/models.go @@ -181,7 +181,8 @@ func AIModelDelete(ctx context.Context, dockerClient command.Cli, cfg config.Con } if runningAppReference != nil { - StopApp(ctx, dockerClient, platform, *runningAppReference) + // TODO: Should we report error in some way? + _ = StopApp(ctx, dockerClient, platform, *runningAppReference, func(StreamMessage) {}) } if res.ModelFolderPath == nil { diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 63d5cc31c..b1d3dd64d 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "io" - "iter" "log/slog" "maps" "os" @@ -74,12 +73,10 @@ const ( UnknownType MessageType = "" ProgressType MessageType = "progress" InfoType MessageType = "info" - ErrorType MessageType = "error" ) type StreamMessage struct { data string - error error progress *Progress } @@ -89,18 +86,13 @@ type Progress struct { } func (p *StreamMessage) IsData() bool { return p.data != "" } -func (p *StreamMessage) IsError() bool { return p.error != nil } func (p *StreamMessage) IsProgress() bool { return p.progress != nil } func (p *StreamMessage) GetData() string { return p.data } -func (p *StreamMessage) GetError() error { return p.error } func (p *StreamMessage) GetProgress() *Progress { return p.progress } func (p *StreamMessage) GetType() MessageType { if p.IsData() { return InfoType } - if p.IsError() { - return ErrorType - } if p.IsProgress() { return ProgressType } @@ -117,152 +109,114 @@ func StartApp( cfg config.Configuration, staticStore *store.StaticStore, platform platform.Platform, -) iter.Seq[StreamMessage] { - return func(yield func(StreamMessage) bool) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() + cb func(StreamMessage), +) error { + bricksIndex = bricksIndex.WithAppBricks(appToStart.LocalBricks) - bricksIndex = bricksIndex.WithAppBricks(appToStart.LocalBricks) + if err := checkBricks(appToStart.Descriptor, bricksIndex, modelsIndex); err != nil { + return err + } - err := checkBricks(appToStart.Descriptor, bricksIndex, modelsIndex) - if err != nil { - yield(StreamMessage{error: err}) - return - } + devices, err := peripherals.Detect() + if err != nil { + return err + } - devices, err := peripherals.Detect() - if err != nil { - yield(StreamMessage{error: err}) - return - } + err = checkRequiredDevices(bricksIndex, appToStart.Descriptor.Bricks, devices) + if err != nil { + return err - err = checkRequiredDevices(bricksIndex, appToStart.Descriptor.Bricks, devices) - if err != nil { - yield(StreamMessage{error: err}) - return - } + } - running, err := getRunningApp(ctx, docker.Client()) - if err != nil { - yield(StreamMessage{error: err}) - return - } - if running != nil { - yield(StreamMessage{error: fmt.Errorf("app %q is running", running.Name)}) - return - } - if !yield(StreamMessage{data: fmt.Sprintf("Starting app %q", appToStart.Name)}) { - return - } + running, err := getRunningApp(ctx, docker.Client()) + if err != nil { + return err + } + if running != nil { + return fmt.Errorf("app %q is running", running.Name) + } + cb(StreamMessage{data: fmt.Sprintf("Starting app %q", appToStart.Name)}) - if err := setStatusLeds(platform, LedTriggerNone); err != nil { - slog.Debug("unable to set status leds", slog.String("error", err.Error())) - } + if err := setStatusLeds(platform, LedTriggerNone); err != nil { + slog.Debug("unable to set status leds", slog.String("error", err.Error())) + } - sketchCallbackWriter := NewCallbackWriter(func(line string) { - if !yield(StreamMessage{data: line}) { - cancel() - return - } - }) - if !yield(StreamMessage{progress: &Progress{Name: "preparing", Progress: 0.0}}) { - return + sketchCallbackWriter := NewCallbackWriter(func(line string) { + cb(StreamMessage{data: line}) + }) + cb(StreamMessage{progress: &Progress{Name: "preparing", Progress: 0.0}}) + + if _, ok := appToStart.GetSketchPath(); ok { + cb(StreamMessage{progress: &Progress{Name: "sketch compiling and uploading", Progress: 0.0}}) + if err := compileUploadSketch(ctx, platform, &appToStart, sketchCallbackWriter); err != nil { + return err } + cb(StreamMessage{progress: &Progress{Name: "sketch updated", Progress: 10.0}}) + } + if appToStart.MainPythonFile != nil { + envs := getAppEnvironmentVariables(appToStart, bricksIndex, modelsIndex) + + cb(StreamMessage{data: "python provisioning"}) + provisionStartProgress := float32(0.0) if _, ok := appToStart.GetSketchPath(); ok { - if !yield(StreamMessage{progress: &Progress{Name: "sketch compiling and uploading", Progress: 0.0}}) { - return - } - if err := compileUploadSketch(ctx, platform, &appToStart, sketchCallbackWriter); err != nil { - yield(StreamMessage{error: err}) - return - } - if !yield(StreamMessage{progress: &Progress{Name: "sketch updated", Progress: 10.0}}) { - return - } + provisionStartProgress = 10.0 } - if appToStart.MainPythonFile != nil { - envs := getAppEnvironmentVariables(appToStart, bricksIndex, modelsIndex) + cb(StreamMessage{progress: &Progress{Name: "python provisioning", Progress: provisionStartProgress}}) - if !yield(StreamMessage{data: "python provisioning"}) { - cancel() - return - } - provisionStartProgress := float32(0.0) - if _, ok := appToStart.GetSketchPath(); ok { - provisionStartProgress = 10.0 - } + if err := provisioner.App(ctx, bricksIndex, &appToStart, cfg, envs, platform, devices); err != nil { + return err + } - if !yield(StreamMessage{progress: &Progress{Name: "python provisioning", Progress: provisionStartProgress}}) { - return - } + cb(StreamMessage{data: "python downloading"}) - if err := provisioner.App(ctx, bricksIndex, &appToStart, cfg, envs, platform, devices); err != nil { - yield(StreamMessage{error: err}) - return - } + // Launch the docker compose command to start the app + overrideComposeFile := appToStart.AppComposeOverrideFilePath() - if !yield(StreamMessage{data: "python downloading"}) { - cancel() - return - } + commands := []string{} + commands = append(commands, "docker", "compose", "-f", appToStart.AppComposeFilePath().String()) + if ok, _ := overrideComposeFile.ExistCheck(); ok { + commands = append(commands, "-f", overrideComposeFile.String()) + } + commands = append(commands, "up", "-d", "--remove-orphans", "--pull", "missing") - // Launch the docker compose command to start the app - overrideComposeFile := appToStart.AppComposeOverrideFilePath() + dockerParser := NewDockerProgressParser(200) - commands := []string{} - commands = append(commands, "docker", "compose", "-f", appToStart.AppComposeFilePath().String()) - if ok, _ := overrideComposeFile.ExistCheck(); ok { - commands = append(commands, "-f", overrideComposeFile.String()) + var customError error + callbackDockerWriter := NewCallbackWriter(func(line string) { + // docker compose sometimes returns errors as info lines, we try to parse them here and return a proper error + if e := GetCustomErrorFomDockerEvent(line); e != nil { + customError = e } - commands = append(commands, "up", "-d", "--remove-orphans", "--pull", "missing") - - dockerParser := NewDockerProgressParser(200) - - var customError error - callbackDockerWriter := NewCallbackWriter(func(line string) { - // docker compose sometimes returns errors as info lines, we try to parse them here and return a proper error - - if e := GetCustomErrorFomDockerEvent(line); e != nil { - customError = e - } - if percentage, ok := dockerParser.Parse(line); ok { - - // assumption: docker pull progress goes from 0 to 80% of the total app start progress - totalProgress := 20.0 + (percentage/100.0)*80.0 - - if !yield(StreamMessage{progress: &Progress{Name: "python starting", Progress: float32(totalProgress)}}) { - cancel() - return - } - return - } else if !yield(StreamMessage{data: line}) { - cancel() - return - } - }) - slog.Debug("starting app", slog.String("command", strings.Join(commands, " ")), slog.Any("envs", envs)) - process, err := paths.NewProcess(envs.AsList(), commands...) - if err != nil { - yield(StreamMessage{error: err}) - return + if percentage, ok := dockerParser.Parse(line); ok { + // assumption: docker pull progress goes from 0 to 80% of the total app start progress + totalProgress := 20.0 + (percentage/100.0)*80.0 + cb(StreamMessage{progress: &Progress{Name: "python starting", Progress: float32(totalProgress)}}) + } else { + cb(StreamMessage{data: line}) } - process.RedirectStderrTo(callbackDockerWriter) - process.RedirectStdoutTo(callbackDockerWriter) - if err := process.RunWithinContext(ctx); err != nil { - // custom error could have been set while reading the output. Not detected by the process exit code - if customError != nil { - err = customError - } + }) - yield(StreamMessage{error: err}) - return + slog.Debug("starting app", slog.String("command", strings.Join(commands, " ")), slog.Any("envs", envs)) + process, err := paths.NewProcess(envs.AsList(), commands...) + if err != nil { + return err + } + process.RedirectStderrTo(callbackDockerWriter) + process.RedirectStdoutTo(callbackDockerWriter) + if err := process.RunWithinContext(ctx); err != nil { + // custom error could have been set while reading the output. Not detected by the process exit code + if customError != nil { + err = customError } + + return err } - _ = yield(StreamMessage{progress: &Progress{Name: "", Progress: 100.0}}) } + cb(StreamMessage{progress: &Progress{Name: "", Progress: 100.0}}) + return nil } // getAppEnvironmentVariables returns the environment variables for the app by merging variables and config in the following order: @@ -309,121 +263,99 @@ func getAppEnvironmentVariables(app app.ArduinoApp, brickIndex *bricksindex.Bric return envs } -func stopAppWithCmd(ctx context.Context, docker command.Cli, platform platform.Platform, app app.ArduinoApp, cmd string) iter.Seq[StreamMessage] { - return func(yield func(StreamMessage) bool) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - - var message string - switch cmd { - case "stop": - message = fmt.Sprintf("Stopping app %q", app.Name) - case "down": - message = fmt.Sprintf("Destroying app %q", app.Name) - } +func stopAppWithCmd(ctx context.Context, docker command.Cli, platform platform.Platform, app app.ArduinoApp, cmd string, cb func(StreamMessage)) error { + var message string + switch cmd { + case "stop": + message = fmt.Sprintf("Stopping app %q", app.Name) + case "down": + message = fmt.Sprintf("Destroying app %q", app.Name) + } - if !yield(StreamMessage{data: message}) { - return + cb(StreamMessage{data: message}) + if err := setStatusLeds(platform, LedTriggerDefault); err != nil { + slog.Debug("unable to set status leds", slog.String("error", err.Error())) + } + + callbackWriter := NewCallbackWriter(func(line string) { + cb(StreamMessage{data: line}) + }) + + if _, ok := app.GetSketchPath(); ok { + // Before stopping the microcontroller we want to make sure that the app was running. + running, err := getRunningApp(ctx, docker.Client()) + if err != nil { + return err } - if err := setStatusLeds(platform, LedTriggerDefault); err != nil { - slog.Debug("unable to set status leds", slog.String("error", err.Error())) + if running != nil && running.FullPath.String() == app.FullPath.String() { + cb(StreamMessage{data: "Stopping microcontroller..."}) + if err := platform.GetMicro().Disable(); err != nil { + return err // TODO: should we continue to stop the app even if we fail to stop the microcontroller? + } } + } - callbackWriter := NewCallbackWriter(func(line string) { - if !yield(StreamMessage{data: line}) { - cancel() - return + if app.MainPythonFile != nil { + mainCompose := app.AppComposeFilePath() + // In case the app was never started + if mainCompose.Exist() { + args := []string{ + "docker", + "compose", + "-f", mainCompose.String(), + cmd, + fmt.Sprintf("--timeout=%d", DefaultDockerStopTimeoutSeconds), + } + if cmd == "down" { + args = append(args, "--volumes", "--remove-orphans") } - }) - if _, ok := app.GetSketchPath(); ok { - // Before stopping the microcontroller we want to make sure that the app was running. - running, err := getRunningApp(ctx, docker.Client()) + process, err := paths.NewProcess(nil, args...) if err != nil { - yield(StreamMessage{error: err}) - return - } - if running != nil && running.FullPath.String() == app.FullPath.String() { - if !yield(StreamMessage{data: "Stopping microcontroller..."}) { - return - } - if err := platform.GetMicro().Disable(); err != nil { - _ = yield(StreamMessage{error: err}) - } + return err } - } - if app.MainPythonFile != nil { - mainCompose := app.AppComposeFilePath() - // In case the app was never started - if mainCompose.Exist() { - args := []string{ - "docker", - "compose", - "-f", mainCompose.String(), - cmd, - fmt.Sprintf("--timeout=%d", DefaultDockerStopTimeoutSeconds), - } - if cmd == "down" { - args = append(args, "--volumes", "--remove-orphans") - } - - process, err := paths.NewProcess(nil, args...) - if err != nil { - yield(StreamMessage{error: err}) - return - } - - process.RedirectStderrTo(callbackWriter) - process.RedirectStdoutTo(callbackWriter) - if err := process.RunWithinContext(ctx); err != nil { - yield(StreamMessage{error: err}) - return - } + process.RedirectStderrTo(callbackWriter) + process.RedirectStdoutTo(callbackWriter) + if err := process.RunWithinContext(ctx); err != nil { + return err } } - _ = yield(StreamMessage{progress: &Progress{Name: "", Progress: 100.0}}) } + cb(StreamMessage{progress: &Progress{Name: "", Progress: 100.0}}) + return nil } -func StopApp(ctx context.Context, dockerClient command.Cli, platform platform.Platform, app app.ArduinoApp) iter.Seq[StreamMessage] { - return stopAppWithCmd(ctx, dockerClient, platform, app, "stop") +func StopApp(ctx context.Context, dockerClient command.Cli, platform platform.Platform, app app.ArduinoApp, cb func(StreamMessage)) error { + return stopAppWithCmd(ctx, dockerClient, platform, app, "stop", cb) } -func StopAndDestroyApp(ctx context.Context, dockerClient command.Cli, platform platform.Platform, app app.ArduinoApp) iter.Seq[StreamMessage] { - return func(yield func(StreamMessage) bool) { - for msg := range stopAppWithCmd(ctx, dockerClient, platform, app, "down") { - if !yield(msg) { - return - } - } +func StopAndDestroyApp(ctx context.Context, dockerClient command.Cli, platform platform.Platform, app app.ArduinoApp, cb func(StreamMessage)) error { + if err := stopAppWithCmd(ctx, dockerClient, platform, app, "down", cb); err != nil { + return err + } - for msg := range cleanAppCacheFiles(app) { - if !yield(msg) { - return - } - } + if err := cleanAppCacheFiles(app, cb); err != nil { + return err } + + return nil } -func cleanAppCacheFiles(app app.ArduinoApp) iter.Seq[StreamMessage] { - return func(yield func(StreamMessage) bool) { - cachePath := app.FullPath.Join(".cache") +func cleanAppCacheFiles(app app.ArduinoApp, cb func(StreamMessage)) error { + cachePath := app.FullPath.Join(".cache") - if exists, _ := cachePath.ExistCheck(); !exists { - yield(StreamMessage{data: "No cache to clean."}) - return - } - if !yield(StreamMessage{data: "Removing app cache files..."}) { - return - } - slog.Debug("removing app cache", slog.String("path", cachePath.String())) - if err := cachePath.RemoveAll(); err != nil { - yield(StreamMessage{error: fmt.Errorf("unable to remove app cache: %w", err)}) - return - } - yield(StreamMessage{data: "Cache removed successfully."}) + if exists, _ := cachePath.ExistCheck(); !exists { + cb(StreamMessage{data: "No cache to clean."}) + return nil + } + cb(StreamMessage{data: "Removing app cache files..."}) + slog.Debug("removing app cache", slog.String("path", cachePath.String())) + if err := cachePath.RemoveAll(); err != nil { + return fmt.Errorf("unable to remove app cache: %w", err) } + cb(StreamMessage{data: "Cache removed successfully."}) + return nil } func RestartApp( @@ -436,35 +368,28 @@ func RestartApp( cfg config.Configuration, staticStore *store.StaticStore, platform platform.Platform, -) iter.Seq[StreamMessage] { - return func(yield func(StreamMessage) bool) { - ctx, cancel := context.WithCancel(ctx) - defer cancel() - runningApp, err := getRunningApp(ctx, docker.Client()) - if err != nil { - yield(StreamMessage{error: err}) - return - } + cb func(StreamMessage), +) error { + runningApp, err := getRunningApp(ctx, docker.Client()) + if err != nil { + return err + } - if runningApp != nil { - if runningApp.FullPath.String() != appToStart.FullPath.String() { - yield(StreamMessage{error: fmt.Errorf("another app %q is running", runningApp.Name)}) - return - } + if runningApp != nil { + if runningApp.FullPath.String() != appToStart.FullPath.String() { + return fmt.Errorf("another app %q is running", runningApp.Name) + } - stopStream := StopApp(ctx, docker, platform, *runningApp) - for msg := range stopStream { - if !yield(msg) { - return - } - if msg.error != nil { - return - } - } + if err := StopApp(ctx, docker, platform, *runningApp, cb); err != nil { + return err } - startStream := StartApp(ctx, docker, provisioner, modelsIndex, bricksIndex, appToStart, cfg, staticStore, platform) - startStream(yield) } + + if err := StartApp(ctx, docker, provisioner, modelsIndex, bricksIndex, appToStart, cfg, staticStore, platform, cb); err != nil { + return err + } + + return nil } func StartDefaultApp( @@ -496,10 +421,9 @@ func StartDefaultApp( } // TODO: we need to stop all other running app before starting the default app. - for msg := range StartApp(ctx, docker, provisioner, modelsIndex, bricksIndex, *app, cfg, staticStore, platform) { - if msg.IsError() { - return fmt.Errorf("failed to start app: %w", msg.GetError()) - } + + if err := StartApp(ctx, docker, provisioner, modelsIndex, bricksIndex, *app, cfg, staticStore, platform, func(msg StreamMessage) {}); err != nil { + return fmt.Errorf("failed to start app: %w", err) } return nil @@ -870,11 +794,9 @@ func CloneApp( } func DeleteApp(ctx context.Context, dockerClient command.Cli, platform platform.Platform, app app.ArduinoApp) error { - // We try to remove docker related resources at best effort - for range StopAndDestroyApp(ctx, dockerClient, platform, app) { - // just consume the iterator - } + _ = StopAndDestroyApp(ctx, dockerClient, platform, app, func(StreamMessage) {}) + // TODO: Shall we report stop error? return app.FullPath.RemoveAll() } diff --git a/internal/orchestrator/resources.go b/internal/orchestrator/resources.go index 9c11e6d05..fcb49d0b6 100644 --- a/internal/orchestrator/resources.go +++ b/internal/orchestrator/resources.go @@ -20,7 +20,6 @@ package orchestrator import ( "context" "errors" - "iter" "log/slog" "syscall" "time" @@ -29,7 +28,6 @@ import ( "github.com/shirou/gopsutil/v4/disk" "github.com/shirou/gopsutil/v4/mem" - "github.com/arduino/arduino-app-cli/internal/helpers" "github.com/arduino/arduino-app-cli/internal/orchestrator/config" ) @@ -64,7 +62,7 @@ type SystemResourceConfig struct { DiskScrapeInterval time.Duration } -func SystemResources(ctx context.Context, cfg config.Configuration, resourceCfg *SystemResourceConfig) (iter.Seq[SystemResource], error) { +func SystemResources(ctx context.Context, cfg config.Configuration, resourceCfg *SystemResourceConfig, cb func(SystemResource)) error { if resourceCfg == nil { resourceCfg = &SystemResourceConfig{ CPUScrapeInterval: time.Second * 5, @@ -76,13 +74,13 @@ func SystemResources(ctx context.Context, cfg config.Configuration, resourceCfg firstMessagesToSend := []SystemResource{} memory, err := mem.VirtualMemory() if err != nil { - return helpers.EmptyIter[SystemResource](), err + return err } firstMessagesToSend = append(firstMessagesToSend, &SystemMemoryResource{Used: memory.Used, Total: memory.Total}) cpuStats, err := cpu.Percent(0, false) if err != nil { - return helpers.EmptyIter[SystemResource](), err + return err } firstMessagesToSend = append(firstMessagesToSend, &SystemCPUResource{UsedPercent: cpuStats[0]}) @@ -90,63 +88,53 @@ func SystemResources(ctx context.Context, cfg config.Configuration, resourceCfg for _, path := range diskPaths { diskStats, err := disk.Usage(path) if err != nil && !errors.Is(err, syscall.ENOENT) { - return helpers.EmptyIter[SystemResource](), err + return err } if diskStats != nil { firstMessagesToSend = append(firstMessagesToSend, &SystemDiskResource{Path: path, Used: diskStats.Used, Total: diskStats.Total}) } } - return func(yield func(SystemResource) bool) { - for _, msg := range firstMessagesToSend { - if !yield(msg) { - return - } - } + for _, msg := range firstMessagesToSend { + cb(msg) + } - cpuTicker := time.NewTicker(resourceCfg.CPUScrapeInterval) - defer cpuTicker.Stop() + cpuTicker := time.NewTicker(resourceCfg.CPUScrapeInterval) + defer cpuTicker.Stop() - memoryTicker := time.NewTicker(resourceCfg.MemoryScrapeInterval) - defer memoryTicker.Stop() + memoryTicker := time.NewTicker(resourceCfg.MemoryScrapeInterval) + defer memoryTicker.Stop() - diskTicker := time.NewTicker(resourceCfg.DiskScrapeInterval) - defer diskTicker.Stop() + diskTicker := time.NewTicker(resourceCfg.DiskScrapeInterval) + defer diskTicker.Stop() - for { - select { - case <-cpuTicker.C: - cpuStats, err := cpu.Percent(0, false) - if err != nil { - slog.Warn("Failed to get CPU usage", "error", err) - continue - } - if !yield(&SystemCPUResource{UsedPercent: cpuStats[0]}) { - return - } - case <-memoryTicker.C: - memory, err := mem.VirtualMemory() + for { + select { + case <-cpuTicker.C: + cpuStats, err := cpu.Percent(0, false) + if err != nil { + slog.Warn("Failed to get CPU usage", "error", err) + continue + } + cb(&SystemCPUResource{UsedPercent: cpuStats[0]}) + case <-memoryTicker.C: + memory, err := mem.VirtualMemory() + if err != nil { + slog.Warn("Failed to get memory usage", "error", err) + continue + } + cb(&SystemMemoryResource{Used: memory.Used, Total: memory.Total}) + case <-diskTicker.C: + for _, path := range diskPaths { + diskStats, err := disk.Usage(path) if err != nil { - slog.Warn("Failed to get memory usage", "error", err) + slog.Warn("Failed to get disk usage", "path", path, "error", err) continue } - if !yield(&SystemMemoryResource{Used: memory.Used, Total: memory.Total}) { - return - } - case <-diskTicker.C: - for _, path := range diskPaths { - diskStats, err := disk.Usage(path) - if err != nil { - slog.Warn("Failed to get disk usage", "path", path, "error", err) - continue - } - if !yield(&SystemDiskResource{Path: path, Used: diskStats.Used, Total: diskStats.Total}) { - return - } - } - case <-ctx.Done(): - return + cb(&SystemDiskResource{Path: path, Used: diskStats.Used, Total: diskStats.Total}) } + case <-ctx.Done(): + return nil } - }, nil + } } diff --git a/internal/orchestrator/system.go b/internal/orchestrator/system.go index 3f9b3db84..7cee6416f 100644 --- a/internal/orchestrator/system.go +++ b/internal/orchestrator/system.go @@ -327,13 +327,12 @@ func SystemCleanup(ctx context.Context, cfg config.Configuration, staticStore *s feedback.Warnf("failed to get running app - %v", err) } if runningApp != nil { - for item := range StopAndDestroyApp(ctx, docker, platform, *runningApp) { - if item.GetType() == ErrorType { - feedback.Warnf("failed to stop and destroy running app - %v", item.GetError()) - break - } + err := StopAndDestroyApp(ctx, docker, platform, *runningApp, func(item StreamMessage) {}) + if err != nil { + feedback.Warnf("failed to stop and destroy running app - %v", err) + } else { + result.RunningAppRemoved = true } - result.RunningAppRemoved = true } // Remove dangling stuff diff --git a/internal/update/apt/service.go b/internal/update/apt/service.go index 71e074787..8839ced6d 100644 --- a/internal/update/apt/service.go +++ b/internal/update/apt/service.go @@ -22,7 +22,6 @@ import ( "context" "fmt" "io" - "iter" "log/slog" "regexp" "strings" @@ -98,59 +97,46 @@ func (s *Service) UpgradePackages(ctx context.Context, packages []update.Package return pkg.Name }) eventCB(update.NewDataEvent(update.StartEvent, "Upgrade is starting")) - stream := runUpgradeCommand(ctx, names) - for line, err := range stream { - if err != nil { - return fmt.Errorf("error running upgrade command: %w", err) - } + if err := runUpgradeCommand(ctx, names, func(line string) { eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) + }); err != nil { + return fmt.Errorf("error running upgrade command: %w", err) } eventCB(update.NewDataEvent(update.StartEvent, "apt cleaning cache is starting")) - for line, err := range runAptCleanCommand(ctx) { - if err != nil { - return fmt.Errorf("error running apt clean command: %w", err) - } + if err := runAptCleanCommand(ctx, func(line string) { eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) + }); err != nil { + return fmt.Errorf("error running apt clean command: %w", err) } eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images ...")) - for line, err := range pullDockerImages(ctx) { - if err != nil { - // In case of errors, including "out of disk space" erros, do a cleanup and then retry once. - - eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images, to free up space ...")) - streamCleanup := cleanupDockerContainers(ctx) - for line, err := range streamCleanup { - if err != nil { - slog.Warn("Error during cleanup of container and images", "error", err) - } else { - eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) - } - } - - // Try again to pull the docker containers. - eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images (again) ...")) - for line, err := range pullDockerImages(ctx) { - if err != nil { - return fmt.Errorf("error pulling docker images: %w", err) - } - eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) - } - } else { + if err := pullDockerImages(ctx, func(line string) { + eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) + }); err != nil { + // In case of errors, including "out of disk space" errors, do a cleanup and then retry once. + eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Stop and destroy docker containers and images, to free up space ...")) + if err := cleanupDockerContainers(ctx, func(line string) { + eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) + }); err != nil { + slog.Warn("Error during cleanup of container and images", "error", err) + } + + // Try again to pull the docker containers. + eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Pulling the latest docker images (again) ...")) + if err := pullDockerImages(ctx, func(line string) { eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) + }); err != nil { + return fmt.Errorf("error pulling docker images: %w", err) } } // After pulling new images is completed, remove old images to free up space. eventCB(update.NewDataEvent(update.UpgradeLineEvent, "Cleanup docker containers and images, to remove old unused images")) - streamCleanup := cleanupDockerContainers(ctx) - for line, err := range streamCleanup { - if err != nil { - slog.Warn("Error during cleanup of container and images", "error", err) - } else { - eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) - } + if err := cleanupDockerContainers(ctx, func(line string) { + eventCB(update.NewDataEvent(update.UpgradeLineEvent, line)) + }); err != nil { + slog.Warn("Error during cleanup of container and images", "error", err) } return nil @@ -180,7 +166,7 @@ func runUpdateCommand(ctx context.Context) error { return nil } -func runUpgradeCommand(ctx context.Context, names []string) iter.Seq2[string, error] { +func runUpgradeCommand(ctx context.Context, names []string, cb func(string)) error { env := []string{"NEEDRESTART_MODE=l"} aptOptions := []string{ @@ -193,105 +179,76 @@ func runUpgradeCommand(ctx context.Context, names []string) iter.Seq2[string, er args = append(args, aptOptions...) args = append(args, names...) - return func(yield func(string, error) bool) { - cmd, err := paths.NewProcess(env, args...) - if err != nil { - _ = yield("", err) - return - } - - stdout := orchestrator.NewCallbackWriter(func(line string) { - if !yield(line, nil) { - if err := cmd.Kill(); err != nil { - slog.Error("Failed to kill upgrade command", slog.String("error", err.Error())) - } - } - }) - cmd.RedirectStderrTo(stdout) - cmd.RedirectStdoutTo(stdout) - - if err := cmd.RunWithinContext(ctx); err != nil { - _ = yield("", err) - return - } + cmd, err := paths.NewProcess(env, args...) + if err != nil { + return err } + stdout := orchestrator.NewCallbackWriter(func(line string) { + cb(line) + }) + cmd.RedirectStderrTo(stdout) + cmd.RedirectStdoutTo(stdout) + + if err := cmd.RunWithinContext(ctx); err != nil { + return err + } + return nil } -func runAptCleanCommand(ctx context.Context) iter.Seq2[string, error] { - return func(yield func(string, error) bool) { - cmd, err := paths.NewProcess(nil, "sudo", "apt-get", "clean", "-y") - if err != nil { - _ = yield("", err) - return - } +func runAptCleanCommand(ctx context.Context, cb func(string)) error { + cmd, err := paths.NewProcess(nil, "sudo", "apt-get", "clean", "-y") + if err != nil { + return err + } - stdout := orchestrator.NewCallbackWriter(func(line string) { - if !yield(line, nil) { - if err := cmd.Kill(); err != nil { - slog.Error("Failed to kill apt clean command", slog.String("error", err.Error())) - } - } - }) - cmd.RedirectStderrTo(stdout) - cmd.RedirectStdoutTo(stdout) - - if err := cmd.RunWithinContext(ctx); err != nil { - _ = yield("", err) - return - } + stdout := orchestrator.NewCallbackWriter(func(line string) { + cb(line) + }) + cmd.RedirectStderrTo(stdout) + cmd.RedirectStdoutTo(stdout) + + if err := cmd.RunWithinContext(ctx); err != nil { + return err } + return nil } -func pullDockerImages(ctx context.Context) iter.Seq2[string, error] { - return func(yield func(string, error) bool) { - cmd, err := paths.NewProcess(nil, "arduino-app-cli", "system", "init") - if err != nil { - _ = yield("", err) - return - } +func pullDockerImages(ctx context.Context, cb func(string)) error { + cmd, err := paths.NewProcess(nil, "arduino-app-cli", "system", "init") + if err != nil { + return err + } - stdout := orchestrator.NewCallbackWriter(func(line string) { - if !yield(line, nil) { - if err := cmd.Kill(); err != nil { - slog.Error("Failed to kill 'arduino-app-cli system init' command", slog.String("error", err.Error())) - } - } - }) - cmd.RedirectStderrTo(stdout) - cmd.RedirectStdoutTo(stdout) - - if err = cmd.RunWithinContext(ctx); err != nil { - _ = yield("", err) - return - } + stdout := orchestrator.NewCallbackWriter(func(line string) { + cb(line) + }) + cmd.RedirectStderrTo(stdout) + cmd.RedirectStdoutTo(stdout) + + if err = cmd.RunWithinContext(ctx); err != nil { + return err } + return nil } // Remove all stopped containers -func cleanupDockerContainers(ctx context.Context) iter.Seq2[string, error] { - return func(yield func(string, error) bool) { - cmd, err := paths.NewProcess(nil, "arduino-app-cli", "system", "cleanup") - if err != nil { - _ = yield("", err) - return - } +func cleanupDockerContainers(ctx context.Context, cb func(string)) error { + cmd, err := paths.NewProcess(nil, "arduino-app-cli", "system", "cleanup") + if err != nil { + return err + } - stdout := orchestrator.NewCallbackWriter(func(line string) { - if !yield(line, nil) { - if err := cmd.Kill(); err != nil { - slog.Error("Failed to kill 'arduino-app-cli system cleanup' command", slog.String("error", err.Error())) - } - } - }) - cmd.RedirectStderrTo(stdout) - cmd.RedirectStdoutTo(stdout) - - if err = cmd.RunWithinContext(ctx); err != nil { - _ = yield("", err) - return - } + stdout := orchestrator.NewCallbackWriter(func(line string) { + cb(line) + }) + cmd.RedirectStderrTo(stdout) + cmd.RedirectStdoutTo(stdout) + + if err = cmd.RunWithinContext(ctx); err != nil { + return err } + return nil } // RestartServices restarts services that need to be restarted after an upgrade.