Skip to content

Commit

Permalink
adds options start/stop message
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Rash <jordan@synadia.com>
  • Loading branch information
jordan-rash committed Jan 3, 2025
1 parent 82fac7b commit 9d1f5f5
Show file tree
Hide file tree
Showing 13 changed files with 92 additions and 33 deletions.
4 changes: 1 addition & 3 deletions api/nodecontrol/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@ import (
"github.com/synadia-io/nex/models"
)

var (
DefaultRequestTimeout = 5 * time.Second
)
var DefaultRequestTimeout = 5 * time.Second

type ControlAPIClient struct {
nc *nats.Conn
Expand Down
6 changes: 6 additions & 0 deletions api/nodecontrol/gen/start_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/nodecontrol/gen/stop_workload_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion api/nodecontrol/start-workload-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
},
"started": {
"type": "boolean"
},
"message": {
"type": "string"
}
},
"required": [
"id",
"issuer",
"name",
"started"
"started",
"message"
],
"definitions": {},
"additionalProperties": false
Expand Down
6 changes: 5 additions & 1 deletion api/nodecontrol/stop-workload-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
},
"stopped": {
"type": "boolean"
},
"message": {
"type": "string"
}
},
"required": [
"id",
"issuer",
"stopped"
"stopped",
"message"
],
"definitions": {},
"additionalProperties": false
Expand Down
4 changes: 4 additions & 0 deletions cmd/nex/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,8 @@ type Up struct {
HideWorkloadLogs bool `name:"hide-workload-logs" help:"Hide logs from workloads" default:"false"`
ShowSystemLogs bool `name:"system-logs" help:"Show verbose level logs from inside actor framework" default:"false" hidden:""`
OCICache string `name:"oci-cache" help:"Path to OCI cache registry" placeholder:"localhost:5000"`
StartMessage string `name:"start-message" help:"Message to display on the successful start of a workload" placeholder:"Great job!"`
StopMessage string `name:"stop-message" help:"Message to display on the successful stop of a workload" placeholder:"Goodbye!"`

HostServicesConfig HostServicesConfig `embed:"" prefix:"hostservices." group:"Host Services Configuration"`
OtelConfig OtelConfig `embed:"" prefix:"otel." group:"OpenTelemetry Configuration"`
Expand Down Expand Up @@ -435,6 +437,8 @@ func (u Up) Run(ctx context.Context, globals *Globals, n *Node) error {
options.WithValidIssuers(u.ValidIssuers),
options.WithOCICacheRegistry(u.OCICache),
options.WithDevMode(globals.DevMode),
options.WithStartWorkloadMessage(u.StartMessage),
options.WithStopWorkloadMessage(u.StopMessage),
options.WithOTelOptions(options.OTelOptions{
MetricsEnabled: u.OtelConfig.OtelMetrics,
MetricsPort: u.OtelConfig.OtelMetricsPort,
Expand Down
13 changes: 10 additions & 3 deletions cmd/nex/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"strings"
"time"

"github.com/jedib0t/go-pretty/v6/table"
"github.com/synadia-io/nex/api/nodecontrol"
Expand Down Expand Up @@ -239,8 +238,14 @@ func (r RunWorkload) Run(ctx context.Context, globals *Globals, w *Workload) err
if resp.Started {
if r.NodeId != "" {
fmt.Printf("Workload %s [%s] started on node %s\n", r.WorkloadName, resp.Id, r.NodeId)
if resp.Message != "" {
fmt.Println(resp.Message)
}
} else {
fmt.Printf("Workload %s [%s] started\n", r.WorkloadName, resp.Id)
if resp.Message != "" {
fmt.Println(resp.Message)
}
}
} else {
fmt.Printf("Workload %s failed to start\n", r.WorkloadName)
Expand Down Expand Up @@ -289,6 +294,9 @@ func (s StopWorkload) Run(ctx context.Context, globals *Globals, w *Workload) er

if resp.Stopped {
fmt.Printf("Workload %s stopped\n", s.WorkloadId)
if resp.Message != "" {
fmt.Println(resp.Message)
}
} else {
fmt.Printf("Workload %s failed to stop\n", s.WorkloadId)
}
Expand Down Expand Up @@ -779,7 +787,6 @@ func (b BundleWorkload) Run(ctx context.Context, globals *Globals) error {
_, err = io.Copy(tw, file)
return err
})

if err != nil {
return err
}
Expand Down
15 changes: 15 additions & 0 deletions models/node_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type NodeOptions struct {
OCICacheRegistry string
DevMode bool

StartWorkloadMessage string
StopWorkloadMessage string

Errs error
}

Expand Down Expand Up @@ -136,6 +139,18 @@ func WithDevMode(b bool) NodeOption {
}
}

func WithStartWorkloadMessage(s string) NodeOption {
return func(n *NodeOptions) {
n.StartWorkloadMessage = s
}
}

func WithStopWorkloadMessage(s string) NodeOption {
return func(n *NodeOptions) {
n.StopWorkloadMessage = s
}
}

type OTelOptions struct {
MetricsEnabled bool
MetricsExporter string
Expand Down
22 changes: 14 additions & 8 deletions node/internal/actors/control_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
actorproto "github.com/synadia-io/nex/node/internal/actors/pb"
)

const ControlAPIActorName = "control_api"
const DefaultAskDuration = 10 * time.Second
const (
ControlAPIActorName = "control_api"
DefaultAskDuration = 10 * time.Second
)

const (
AuctionResponseType = "io.nats.nex.v2.auction_response"
Expand Down Expand Up @@ -59,6 +61,8 @@ type NodeCallback interface {
EncryptPayload([]byte, string) ([]byte, string, error)
DecryptPayload([]byte) ([]byte, error)
EmitEvent(string, json.RawMessage) error
StartWorkloadMetadata() string
StopWorkloadMetadata() string
}

type StateCallback interface {
Expand Down Expand Up @@ -95,7 +99,6 @@ func (a *ControlAPI) PreStart(ctx context.Context) error {
}

func (a *ControlAPI) PostStop(ctx context.Context) error {

return nil
}

Expand Down Expand Up @@ -362,8 +365,9 @@ func (api *ControlAPI) handleDeploy(m *nats.Msg) {
models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}

models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "")
resp := startResponseFromProto(&workloadStarted)
resp.Message = api.nodeCallback.StartWorkloadMetadata()
models.RespondEnvelope(m, RunResponseType, 200, resp, "")
}

func (api *ControlAPI) handleUndeploy(m *nats.Msg) {
Expand All @@ -386,7 +390,7 @@ findWorkload:
for _, grandchild := range child.Children() { // iterate over all workloads
if grandchild.Name() == workloadId {
askResp, err = api.self.Ask(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId}, DefaultAskDuration)
//err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId})
// err = api.self.Tell(context.Background(), child, &actorproto.StopWorkload{Namespace: namespace, WorkloadId: workloadId})
if err != nil {
api.logger.Error("Failed to stop workload", slog.Any("error", err))
models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("Failed to stop workload: %s", err))
Expand Down Expand Up @@ -418,7 +422,9 @@ findWorkload:
models.RespondEnvelope(m, StopResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err))
return
}
models.RespondEnvelope(m, StopResponseType, 200, stopResponseFromProto(&workloadStopped), "")
resp := stopResponseFromProto(&workloadStopped)
resp.Message = api.nodeCallback.StopWorkloadMetadata()
models.RespondEnvelope(m, StopResponseType, 200, resp, "")
}

func (api *ControlAPI) handleInfo(m *nats.Msg) {
Expand Down Expand Up @@ -477,7 +483,7 @@ func (api *ControlAPI) handleLameDuck(m *nats.Msg) {
}

ticker := time.NewTicker(100 * time.Millisecond)
for _ = range ticker.C {
for range ticker.C {
if agentSuper.ChildrenCount() == 0 {
ticker.Stop()
cancel()
Expand Down
29 changes: 18 additions & 11 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,10 @@ func NewNexNode(serverKey nkeys.KeyPair, nc *nats.Conn, opts ...models.NodeOptio
HostServiceOptions: models.HostServiceOptions{
Services: make(map[string]models.ServiceConfig),
},
OCICacheRegistry: "",
DevMode: false,
OCICacheRegistry: "",
DevMode: false,
StartWorkloadMessage: "",
StopWorkloadMessage: "",
},
}

Expand Down Expand Up @@ -209,9 +211,9 @@ func (nn *nexNode) initializeSupervisionTree() error {
goakt.WithPassivationDisabled(),
// In the non-v2 version of goakt, these functions were supported.
// TODO: figure out why they're gone or how we can plug in our own impls
//goakt.WithTelemetry(telemetry),
//goakt.WithTracing(),
//goakt.WithSupervisorDirective(restartDirective),
// goakt.WithTelemetry(telemetry),
// goakt.WithTracing(),
// goakt.WithSupervisorDirective(restartDirective),
goakt.WithActorInitMaxRetries(3))
if err != nil {
return err
Expand All @@ -230,7 +232,6 @@ func (nn *nexNode) initializeSupervisionTree() error {
agentSuper, err := nn.actorSystem.Spawn(nn.ctx, actors.AgentSupervisorActorName, actors.CreateAgentSupervisor(nn.actorSystem, *nn.options),
goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)),
)

if err != nil {
return err
}
Expand All @@ -248,7 +249,6 @@ func (nn *nexNode) initializeSupervisionTree() error {
_, err = nn.actorSystem.Spawn(nn.ctx, actors.InternalNatsServerActorName, inats,
goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)),
)

if err != nil {
return err
}
Expand All @@ -274,7 +274,6 @@ func (nn *nexNode) initializeSupervisionTree() error {
_, err = agentSuper.SpawnChild(nn.ctx, models.DirectStartActorName, actors.CreateDirectStartAgent(nn.ctx, nn.nc, pk, *nn.options, nn.options.Logger.WithGroup(models.DirectStartActorName), nn),
goakt.WithSupervisorStrategies(goakt.NewSupervisorStrategy(nil, restartDirective)),
)

if err != nil {
return err
}
Expand Down Expand Up @@ -320,7 +319,7 @@ func (nn *nexNode) initializeSupervisionTree() error {
}

if len(wl) > 0 {
//direct-start_wdhGT117n7TOHpsasG2lRP
// direct-start_wdhGT117n7TOHpsasG2lRP
nn.options.Logger.Info("Existing state detected, Restoring now")
for _, c := range agentSuper.Children() {
wl := getWorkloads(c.Name())
Expand Down Expand Up @@ -477,8 +476,8 @@ func (nn *nexNode) GetInfo(namespace string) (*actorproto.NodeInfo, error) {
}
resp := &actorproto.NodeInfo{
Id: pk,
//FINDME
//TargetXkey: nn.options.
// FINDME
// TargetXkey: nn.options.
Tags: nn.options.Tags,
Uptime: time.Since(nn.startedAt).String(),
Version: VERSION,
Expand Down Expand Up @@ -729,3 +728,11 @@ func (nn *nexNode) getState() (map[string]*actorproto.StartWorkload, error) {

return reqs, nil
}

func (nn *nexNode) StartWorkloadMetadata() string {
return nn.options.StartWorkloadMessage
}

func (nn *nexNode) StopWorkloadMetadata() string {
return nn.options.StopWorkloadMessage
}
4 changes: 2 additions & 2 deletions test/copy_workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestCopyWorkload(t *testing.T) {

be.Equal(t, "ok", string(msg.Data))

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]
Expand Down Expand Up @@ -134,7 +134,7 @@ func TestMultipleCopyWorkload(t *testing.T) {
origDeploy.Stdout = origStdOut
be.NilErr(t, origDeploy.Run())

re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started$`)
re := regexp.MustCompile(`^Workload tester \[(?P<workload>[A-Za-z0-9]+)\] started\n.*$`)
match := re.FindStringSubmatch(strings.TrimSpace(origStdOut.String()))
be.Equal(t, 2, len(match))
origWorkloadId := match[1]
Expand Down
2 changes: 1 addition & 1 deletion test/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,6 @@ func startNexNodeCmd(t testing.TB, workingDir, nodeSeed, xkeySeed, natsServer, n
xkeySeed = string(xSeed)
}

cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed)
cmd := exec.Command(cli, "node", "up", "--logger.level", "debug", "--logger.short", "-s", natsServer, "--resource-directory", workingDir, "--node-name", name, "--nexus", nexus, "--node-seed", nodeSeed, "--node-xkey-seed", xkeySeed, "--start-message", "test workload started", "--stop-message", "test workload stopped")
return cmd
}
Loading

0 comments on commit 9d1f5f5

Please sign in to comment.