Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optional Message to Start/Stop Workloads #479

Merged
merged 3 commits into from
Jan 3, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions api/nodecontrol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"github.com/synadia-io/nex/models"
)

var (
DefaultRequestTimeout = 5 * time.Second
)
var DefaultRequestTimeout = 5 * time.Second

type ControlAPIClient struct {
nc *nats.Conn
Expand Down
6 changes: 6 additions & 0 deletions api/nodecontrol/gen/start_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/nodecontrol/gen/stop_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/nodecontrol/start-workload-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
},
"started": {
"type": "boolean"
},
"message": {
"type": "string"
}
},
"required": [
"id",
"issuer",
"name",
"started"
"started",
"message"
],
"definitions": {},
"additionalProperties": false
Expand Down
6 changes: 5 additions & 1 deletion api/nodecontrol/stop-workload-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
},
"stopped": {
"type": "boolean"
},
"message": {
"type": "string"
}
},
"required": [
"id",
"issuer",
"stopped"
"stopped",
"message"
],
"definitions": {},
"additionalProperties": false
Expand Down
4 changes: 4 additions & 0 deletions cmd/nex/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ type Up struct {
HideWorkloadLogs bool `name:"hide-workload-logs" help:"Hide logs from workloads" default:"false"`
ShowSystemLogs bool `name:"system-logs" help:"Show verbose level logs from inside actor framework" default:"false" hidden:""`
OCICache string `name:"oci-cache" help:"Path to OCI cache registry" placeholder:"localhost:5000"`
StartMessage string `name:"start-message" help:"Message to display on the successful start of a workload" placeholder:"Great job!"`
StopMessage string `name:"stop-message" help:"Message to display on the successful stop of a workload" placeholder:"Goodbye!"`

HostServicesConfig HostServicesConfig `embed:"" prefix:"hostservices." group:"Host Services Configuration"`
OtelConfig OtelConfig `embed:"" prefix:"otel." group:"OpenTelemetry Configuration"`
Expand Down Expand Up @@ -435,6 +437,8 @@ func (u Up) Run(ctx context.Context, globals *Globals, n *Node) error {
options.WithValidIssuers(u.ValidIssuers),
options.WithOCICacheRegistry(u.OCICache),
options.WithDevMode(globals.DevMode),
options.WithStartWorkloadMessage(u.StartMessage),
options.WithStopWorkloadMessage(u.StopMessage),
options.WithOTelOptions(options.OTelOptions{
MetricsEnabled: u.OtelConfig.OtelMetrics,
MetricsPort: u.OtelConfig.OtelMetricsPort,
Expand Down
13 changes: 10 additions & 3 deletions cmd/nex/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"strings"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/synadia-io/nex/api/nodecontrol"
Expand Down Expand Up @@ -239,8 +238,14 @@ func (r RunWorkload) Run(ctx context.Context, globals *Globals, w *Workload) err
if resp.Started {
if r.NodeId != "" {
fmt.Printf("Workload %s [%s] started on node %s\n", r.WorkloadName, resp.Id, r.NodeId)
if resp.Message != "" {
fmt.Println(resp.Message)
}
} else {
fmt.Printf("Workload %s [%s] started\n", r.WorkloadName, resp.Id)
if resp.Message != "" {
fmt.Println(resp.Message)
}
}
} else {
fmt.Printf("Workload %s failed to start\n", r.WorkloadName)
Expand Down Expand Up @@ -289,6 +294,9 @@ func (s StopWorkload) Run(ctx context.Context, globals *Globals, w *Workload) er

if resp.Stopped {
fmt.Printf("Workload %s stopped\n", s.WorkloadId)
if resp.Message != "" {
fmt.Println(resp.Message)
}
} else {
fmt.Printf("Workload %s failed to stop\n", s.WorkloadId)
}
Expand Down Expand Up @@ -779,7 +787,6 @@ func (b BundleWorkload) Run(ctx context.Context, globals *Globals) error {
_, err = io.Copy(tw, file)
return err
})

if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions models/node_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type NodeOptions struct {
OCICacheRegistry string
DevMode bool

StartWorkloadMessage string
StopWorkloadMessage string

Errs error
}

Expand Down Expand Up @@ -136,6 +139,18 @@ func WithDevMode(b bool) NodeOption {
}
}

func WithStartWorkloadMessage(s string) NodeOption {
return func(n *NodeOptions) {
n.StartWorkloadMessage = s
}
}

func WithStopWorkloadMessage(s string) NodeOption {
return func(n *NodeOptions) {
n.StopWorkloadMessage = s
}
}

type OTelOptions struct {
MetricsEnabled bool
MetricsExporter string
Expand Down
22 changes: 14 additions & 8 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
actorproto "github.com/synadia-io/nex/node/internal/actors/pb"
)

const ControlAPIActorName = "control_api"
const DefaultAskDuration = 10 * time.Second
const (
ControlAPIActorName = "control_api"
DefaultAskDuration = 10 * time.Second
)

const (
AuctionResponseType = "io.nats.nex.v2.auction_response"
Expand Down Expand Up @@ -59,6 +61,8 @@ type NodeCallback interface {
EncryptPayload([]byte, string) ([]byte, string, error)
DecryptPayload([]byte) ([]byte, error)
EmitEvent(string, json.RawMessage) error
StartWorkloadMessage() string
StopWorkloadMessage() string
}

type StateCallback interface {
Expand Down Expand Up @@ -95,7 +99,6 @@ func (a *ControlAPI) PreStart(ctx context.Context) error {
}

func (a *ControlAPI) PostStop(ctx context.Context) error {

return nil
}

Expand Down Expand Up @@ -362,8 +365,9 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) {
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
resp := startResponseFromProto(&workloadStarted)
resp.Message = api.nodeCallback.StartWorkloadMessage()
models.RespondEnvelope(m, RunResponseType, 200, resp, "")
}

func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
Expand All @@ -386,7 +390,7 @@ findWorkload:
for _, grandchild := range child.Children() { // iterate over all workloads
if grandchild.Name() == workloadId {
askResp, err = api.self.Ask(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId}, DefaultAskDuration)
//err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId})
// err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId})
if err != nil {
api.logger.Error("Failed to stop workload", slog.Any("error", err))
models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("Failed to stop workload: %s", err))
Expand Down Expand Up @@ -418,7 +422,9 @@ findWorkload:
models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}
models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(&workloadStopped), "")
resp := stopResponseFromProto(&workloadStopped)
resp.Message = api.nodeCallback.StopWorkloadMessage()
models.RespondEnvelope(m, StopResponseType, 200, resp, "")
}

func (api *ControlAPI) handleInfo(m *nats.Msg) {
Expand Down Expand Up @@ -477,7 +483,7 @@ func (api *ControlAPI) handleLameDuck(m *nats.Msg) {
}

ticker := time.NewTicker(100 * time.Millisecond)
for _ = range ticker.C {
for range ticker.C {
if agentSuper.ChildrenCount() == 0 {
ticker.Stop()
cancel()
Expand Down
29 changes: 18 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func NewNexNode(serverKey nkeys.KeyPair, nc *nats.Conn, opts ...models.NodeOptio
HostServiceOptions: models.HostServiceOptions{
Services: make(map[string]models.ServiceConfig),
},
OCICacheRegistry: "",
DevMode: false,
OCICacheRegistry: "",
DevMode: false,
StartWorkloadMessage: "",
StopWorkloadMessage: "",
},
}

Expand Down Expand Up @@ -209,9 +211,9 @@ func (nn *nexNode) initializeSupervisionTree() error {
goakt.WithPassivationDisabled(),
// In the non-v2 version of goakt, these functions were supported.
// TODO: figure out why they're gone or how we can plug in our own impls
//goakt.WithTelemetry(telemetry),
//goakt.WithTracing(),
//goakt.WithSupervisorDirective(restartDirective),
// goakt.WithTelemetry(telemetry),
// goakt.WithTracing(),
// goakt.WithSupervisorDirective(restartDirective),
goakt.WithActorInitMaxRetries(3))
if err != nil {
return err
Expand All @@ -230,7 +232,6 @@ func (nn *nexNode) initializeSupervisionTree() error {
agentSuper, err := nn.actorSystem.Spawn(nn.ctx, actors.AgentSupervisorActorName, actors.CreateAgentSupervisor(nn.actorSystem, *nn.options),
goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)),
)

if err != nil {
return err
}
Expand All @@ -248,7 +249,6 @@ func (nn *nexNode) initializeSupervisionTree() error {
_, err = nn.actorSystem.Spawn(nn.ctx, actors.InternalNatsServerActorName, inats,
goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)),
)

if err != nil {
return err
}
Expand All @@ -274,7 +274,6 @@ func (nn *nexNode) initializeSupervisionTree() error {
_, err = agentSuper.SpawnChild(nn.ctx, models.DirectStartActorName, actors.CreateDirectStartAgent(nn.ctx, nn.nc, pk, *nn.options, nn.options.Logger.WithGroup(models.DirectStartActorName), nn),
goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)),
)

if err != nil {
return err
}
Expand Down Expand Up @@ -320,7 +319,7 @@ func (nn *nexNode) initializeSupervisionTree() error {
}

if len(wl) > 0 {
//direct-start_wdhGT117n7TOHpsasG2lRP
// direct-start_wdhGT117n7TOHpsasG2lRP
nn.options.Logger.Info("Existing state detected, Restoring now")
for _, c := range agentSuper.Children() {
wl := getWorkloads(c.Name())
Expand Down Expand Up @@ -477,8 +476,8 @@ func (nn *nexNode) GetInfo(namespace string) (*actorproto.NodeInfo, error) {
}
resp := &actorproto.NodeInfo{
Id: pk,
//FINDME
//TargetXkey: nn.options.
// FINDME
// TargetXkey: nn.options.
Tags: nn.options.Tags,
Uptime: time.Since(nn.startedAt).String(),
Version: VERSION,
Expand Down Expand Up @@ -729,3 +728,11 @@ func (nn *nexNode) getState() (map[string]*actorproto.StartWorkload, error) {

return reqs, nil
}

func (nn *nexNode) StartWorkloadMessage() string {
return nn.options.StartWorkloadMessage
}

func (nn *nexNode) StopWorkloadMessage() string {
return nn.options.StopWorkloadMessage
}
4 changes: 2 additions & 2 deletions test/copy_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCopyWorkload(t *testing.T) {

be.Equal(t, "ok", string(msg.Data))

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestMultipleCopyWorkload(t *testing.T) {
origDeploy.Stdout = origStdOut
be.NilErr(t, origDeploy.Run())

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]
Expand Down
2 changes: 1 addition & 1 deletion test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,6 @@ func startNexNodeCmd(t testing.TB, workingDir, nodeSeed, xkeySeed, natsServer, n
xkeySeed = string(xSeed)
}

cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed)
cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed, "--start-message", "test workload started", "--stop-message", "test workload stopped")
return cmd
}
Loading
Loading