Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions cmd/arduino-app-cli/app/destroy.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,15 @@ func newDestroyCmd(cfg config.Configuration) *cobra.Command {
func destroyHandler(ctx context.Context, app app.ArduinoApp) error {
out, _, getResult := feedback.OutputStreams()

for message := range orchestrator.StopAndDestroyApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app) {
if err := orchestrator.StopAndDestroyApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app, func(message orchestrator.StreamMessage) {
switch message.GetType() {
case orchestrator.ProgressType:
fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress)
case orchestrator.InfoType:
fmt.Fprintln(out, "[INFO]", message.GetData())
case orchestrator.ErrorType:
feedback.Fatal(message.GetError().Error(), feedback.ErrGeneric)
return nil
}
}); err != nil {
feedback.Fatal(err.Error(), feedback.ErrGeneric)
}
outputResult := getResult()

Expand Down
12 changes: 5 additions & 7 deletions cmd/arduino-app-cli/app/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,19 +74,17 @@ func logsHandler(ctx context.Context, app app.ArduinoApp, tail *uint64, follow,
if all {
cfg.ShowServicesLogs = true
}
logsIter, err := orchestrator.AppLogs(
if err := orchestrator.AppLogs(
ctx,
app,
cfg,
servicelocator.GetDockerClient(),
servicelocator.GetBricksIndex(),
)
if err != nil {
func(msg orchestrator.LogMessage) {
fmt.Fprintf(stdout, "[%s] %s\n", msg.Name, msg.Content)
},
); err != nil {
feedback.Fatal(err.Error(), feedback.ErrGeneric)
return nil
}
for msg := range logsIter {
fmt.Fprintf(stdout, "[%s] %s\n", msg.Name, msg.Content)
}
return nil
}
24 changes: 12 additions & 12 deletions cmd/arduino-app-cli/app/restart.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func newRestartCmd(cfg config.Configuration) *cobra.Command {
func restartHandler(ctx context.Context, cfg config.Configuration, app app.ArduinoApp) error {
out, _, getResult := feedback.OutputStreams()

stream := orchestrator.RestartApp(
err := orchestrator.RestartApp(
ctx,
servicelocator.GetDockerClient(),
servicelocator.GetProvisioner(),
Expand All @@ -66,18 +66,18 @@ func restartHandler(ctx context.Context, cfg config.Configuration, app app.Ardui
cfg,
servicelocator.GetStaticStore(),
servicelocator.GetPlatform(),
func(message orchestrator.StreamMessage) {
switch message.GetType() {
case orchestrator.ProgressType:
fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress)
case orchestrator.InfoType:
fmt.Fprintln(out, "[INFO]", message.GetData())
}
},
)
for message := range stream {
switch message.GetType() {
case orchestrator.ProgressType:
fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress)
case orchestrator.InfoType:
fmt.Fprintln(out, "[INFO]", message.GetData())
case orchestrator.ErrorType:
errMesg := cases.Title(language.AmericanEnglish).String(message.GetError().Error())
feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric)
return nil
}
if err != nil {
errMesg := cases.Title(language.AmericanEnglish).String(err.Error())
feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric)
}

outputResult := getResult()
Expand Down
25 changes: 13 additions & 12 deletions cmd/arduino-app-cli/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func newStartCmd(cfg config.Configuration) *cobra.Command {
func startHandler(ctx context.Context, cfg config.Configuration, app app.ArduinoApp) error {
out, _, getResult := feedback.OutputStreams()

stream := orchestrator.StartApp(
err := orchestrator.StartApp(
ctx,
servicelocator.GetDockerClient(),
servicelocator.GetProvisioner(),
Expand All @@ -68,19 +68,20 @@ func startHandler(ctx context.Context, cfg config.Configuration, app app.Arduino
cfg,
servicelocator.GetStaticStore(),
servicelocator.GetPlatform(),
func(message orchestrator.StreamMessage) {
switch message.GetType() {
case orchestrator.ProgressType:
fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress)
case orchestrator.InfoType:
fmt.Fprintln(out, "[INFO]", message.GetData())
}
},
)
for message := range stream {
switch message.GetType() {
case orchestrator.ProgressType:
fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress)
case orchestrator.InfoType:
fmt.Fprintln(out, "[INFO]", message.GetData())
case orchestrator.ErrorType:
errMesg := cases.Title(language.AmericanEnglish).String(message.GetError().Error())
feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric)
return nil
}
if err != nil {
errMesg := cases.Title(language.AmericanEnglish).String(err.Error())
feedback.Fatal(fmt.Sprintf("[ERROR] %s", errMesg), feedback.ErrGeneric)
}

outputResult := getResult()
feedback.PrintResult(startAppResult{
AppName: app.Name,
Expand Down
7 changes: 3 additions & 4 deletions cmd/arduino-app-cli/app/stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,16 +56,15 @@ func newStopCmd(cfg config.Configuration) *cobra.Command {
func stopHandler(ctx context.Context, app app.ArduinoApp) error {
out, _, getResult := feedback.OutputStreams()

for message := range orchestrator.StopApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app) {
if err := orchestrator.StopApp(ctx, servicelocator.GetDockerClient(), servicelocator.GetPlatform(), app, func(message orchestrator.StreamMessage) {
switch message.GetType() {
case orchestrator.ProgressType:
fmt.Fprintf(out, "Progress[%s]: %.0f%%\n", message.GetProgress().Name, message.GetProgress().Progress)
case orchestrator.InfoType:
fmt.Fprintln(out, "[INFO]", message.GetData())
case orchestrator.ErrorType:
feedback.Fatal(message.GetError().Error(), feedback.ErrGeneric)
return nil
}
}); err != nil {
feedback.Fatal(err.Error(), feedback.ErrGeneric)
}
outputResult := getResult()

Expand Down
15 changes: 6 additions & 9 deletions internal/api/handlers/app_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,20 +95,17 @@ func HandleAppLogs(
BrickID string `json:"brick_id,omitempty"`
Message string `json:"message"`
}
messagesIter, err := orchestrator.AppLogs(r.Context(), app, appLogsRequest, dockerClient, bricksIndex)
if err != nil {
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: "failed to start the app",
})
return
}
for item := range messagesIter {
if err := orchestrator.AppLogs(r.Context(), app, appLogsRequest, dockerClient, bricksIndex, func(item orchestrator.LogMessage) {
sseStream.Send(render.SSEEvent{Type: "message", Data: log{
ID: item.Name,
Message: item.Content,
BrickID: item.BrickName,
}})
}); err != nil {
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: "failed to start the app",
})
}
}
}
13 changes: 7 additions & 6 deletions internal/api/handlers/app_start.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,19 @@ func HandleAppStart(
type log struct {
Message string `json:"message"`
}
for item := range orchestrator.StartApp(r.Context(), dockerCli, provisioner, modelsIndex, bricksIndex, app, cfg, staticStore, platform) {
err = orchestrator.StartApp(r.Context(), dockerCli, provisioner, modelsIndex, bricksIndex, app, cfg, staticStore, platform, func(item orchestrator.StreamMessage) {
switch item.GetType() {
case orchestrator.ProgressType:
sseStream.Send(render.SSEEvent{Type: "progress", Data: progress(*item.GetProgress())})
case orchestrator.InfoType:
sseStream.Send(render.SSEEvent{Type: "message", Data: log{Message: item.GetData()}})
case orchestrator.ErrorType:
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: item.GetError().Error(),
})
}
})
if err != nil {
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: err.Error(),
})
}
}
}
8 changes: 3 additions & 5 deletions internal/api/handlers/app_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,10 @@ func HandlerAppStatus(
}
}

for appStatus, err := range orchestrator.AppStatusEvents(r.Context(), cfg, dockerCli, idProvider) {
if err != nil {
sseStream.SendError(render.SSEErrorData{Code: render.InternalServiceErr, Message: err.Error()})
continue
}
if err := orchestrator.AppStatusEvents(r.Context(), cfg, dockerCli, idProvider, func(appStatus orchestrator.AppInfo) {
sseStream.Send(render.SSEEvent{Type: "app", Data: appStatus})
}); err != nil {
sseStream.SendError(render.SSEErrorData{Code: render.InternalServiceErr, Message: err.Error()})
Comment on lines -57 to +60
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happen now if appstatusEvents returns an error? With iterators, there was a continue, and the SSE streaming stayed alive. Now any error terminate the session?

}
}
}
13 changes: 7 additions & 6 deletions internal/api/handlers/app_stop.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,18 +64,19 @@ func HandleAppStop(
type log struct {
Message string `json:"message"`
}
for item := range orchestrator.StopApp(r.Context(), dockerClient, platform, app) {
err = orchestrator.StopApp(r.Context(), dockerClient, platform, app, func(item orchestrator.StreamMessage) {
switch item.GetType() {
case orchestrator.ProgressType:
sseStream.Send(render.SSEEvent{Type: "progress", Data: progress(*item.GetProgress())})
case orchestrator.InfoType:
sseStream.Send(render.SSEEvent{Type: "message", Data: log{Message: item.GetData()}})
case orchestrator.ErrorType:
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: item.GetError().Error(),
})
}
})
if err != nil {
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: err.Error(),
})
}
}
}
15 changes: 6 additions & 9 deletions internal/api/handlers/system_resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,7 @@ func HandleSystemResources(cfg config.Configuration) http.HandlerFunc {
}
defer sseStream.Close()

resources, err := orchestrator.SystemResources(ctx, cfg, nil)
if err != nil {
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: "failed to obtain the resources",
})
return
}
for resource := range resources {
if err := orchestrator.SystemResources(ctx, cfg, nil, func(resource orchestrator.SystemResource) {
switch res := resource.(type) {
case *orchestrator.SystemDiskResource:
sseStream.Send(render.SSEEvent{Type: "disk", Data: res})
Expand All @@ -55,6 +47,11 @@ func HandleSystemResources(cfg config.Configuration) http.HandlerFunc {
case *orchestrator.SystemMemoryResource:
sseStream.Send(render.SSEEvent{Type: "mem", Data: res})
}
}); err != nil {
sseStream.SendError(render.SSEErrorData{
Code: render.InternalServiceErr,
Message: "failed to obtain the resources",
})
}
}
}
89 changes: 45 additions & 44 deletions internal/e2e/updatetest/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (
"context"
"encoding/json"
"fmt"
"iter"
"log"
"net"
"net/http"
Expand Down Expand Up @@ -263,51 +262,51 @@ func putUpdateRequest(t *testing.T, host string) {

}

func NewSSEClient(ctx context.Context, method, url string) iter.Seq2[Event, error] {
return func(yield func(Event, error) bool) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
_ = yield(Event{}, err)
return
}
func newSSEClient(ctx context.Context, method, url string, cb func(Event)) error {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
if err != nil {
return err
}

resp, err := http.DefaultClient.Do(req)
if err != nil {
_ = yield(Event{}, err)
return
resp, err := http.DefaultClient.Do(req)
if err != nil {
if ctx.Err() != nil {
return nil
}
defer resp.Body.Close()
return err
}
defer resp.Body.Close()

if resp.StatusCode != 200 {
_ = yield(Event{}, fmt.Errorf("got response status code %d", resp.StatusCode))
return
}
if resp.StatusCode != 200 {
return fmt.Errorf("got response status code %d", resp.StatusCode)
}

reader := bufio.NewReader(resp.Body)
reader := bufio.NewReader(resp.Body)

evt := Event{}
for {
line, err := reader.ReadString('\n')
if err != nil {
_ = yield(Event{}, err)
return
evt := Event{}
for {
line, err := reader.ReadString('\n')
if err != nil {
if ctx.Err() != nil {
return nil
}
switch {
case strings.HasPrefix(line, "data:"):
evt.Data = []byte(strings.TrimSpace(strings.TrimPrefix(line, "data:")))
case strings.HasPrefix(line, "event:"):
evt.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
case strings.HasPrefix(line, "id:"):
evt.ID = strings.TrimSpace(strings.TrimPrefix(line, "id:"))
case strings.HasPrefix(line, "\n"):
if !yield(evt, nil) {
return
}
evt = Event{}
default:
_ = yield(Event{}, fmt.Errorf("unknown line: '%s'", line))
return
return err
}
switch {
case strings.HasPrefix(line, "data:"):
evt.Data = []byte(strings.TrimSpace(strings.TrimPrefix(line, "data:")))
case strings.HasPrefix(line, "event:"):
evt.Event = strings.TrimSpace(strings.TrimPrefix(line, "event:"))
case strings.HasPrefix(line, "id:"):
evt.ID = strings.TrimSpace(strings.TrimPrefix(line, "id:"))
case strings.HasPrefix(line, "\n"):
cb(evt)
if ctx.Err() != nil {
return nil
}
evt = Event{}
default:
return fmt.Errorf("unknown line: '%s'", line)
}
}
}
Expand Down Expand Up @@ -338,13 +337,15 @@ func waitForUpgrade(t *testing.T, host string) {

url := fmt.Sprintf("http://%s/v1/system/update/events", host)

itr := NewSSEClient(t.Context(), "GET", url)
for event, err := range itr {
require.NoError(t, err)
ctx, cancel := context.WithCancel(t.Context())
defer cancel()

if err := newSSEClient(ctx, "GET", url, func(event Event) {
t.Logf("Received event: ID=%s, Event=%s, Data=%s\n", event.ID, event.Event, string(event.Data))
if event.Event == "restarting" {
break
cancel()
}
}); err != nil {
require.NoError(t, err)
}

}
Loading
Loading