Skip to content

Commit

Permalink
adds trigger data env and runtype to cmd output
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Rash <15827604+jordan-rash@users.noreply.github.com>
  • Loading branch information
jordan-rash committed Dec 13, 2024
1 parent c3369eb commit 5e703ec
Show file tree
Hide file tree
Showing 11 changed files with 121 additions and 44 deletions.
6 changes: 6 additions & 0 deletions api/nodecontrol/gen/node_info_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions api/nodecontrol/gen/workload_ping_response.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/nodecontrol/node-info-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@
"type": "string",
"description": "The type of the workload"
},
"workload_runtype": {
"type": "string",
"description": "The runtype/lifecycle of the workload"
},
"workload_state": {
"type": "string",
"description": "The state of the workload"
Expand All @@ -86,6 +90,7 @@
"start_time",
"runtime",
"workload_type",
"workload_runtype",
"workload_state"
]
}
Expand Down
5 changes: 5 additions & 0 deletions api/nodecontrol/workload-ping-response.json
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@
"type": "string",
"description": "The type of the workload"
},
"workload_runtype": {
"type": "string",
"description": "The runtype/lifecycle of the workload"
},
"workload_state": {
"type": "string",
"description": "The state of the workload"
Expand All @@ -50,6 +54,7 @@
"start_time",
"runtime",
"workload_type",
"workload_runtype",
"workload_state"
]
}
Expand Down
16 changes: 15 additions & 1 deletion cmd/nex/workload.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"net/http"
"os"
"path/filepath"
"time"

"strings"

Expand Down Expand Up @@ -332,8 +333,21 @@ func (i InfoWorkload) Run(ctx context.Context, globals *Globals) error {
w := columns.New("Information about Workload %s", i.WorkloadId)
w.AddRow("Name", resp.WorkloadSummary.Name)
w.AddRow("Start Time", resp.WorkloadSummary.StartTime)
w.AddRowIf("Run Time", resp.WorkloadSummary.Runtime, resp.WorkloadSummary.Runtime != "")
w.AddRow("Run Time", func() string {
if resp.WorkloadSummary.Runtime == "" {
return "--"
}
if resp.WorkloadSummary.Runtime == "0s" {
t, err := time.Parse(time.DateTime, resp.WorkloadSummary.StartTime)
if err != nil {
return "--"
}
return time.Since(t).String()
}
return resp.WorkloadSummary.Runtime
}())
w.AddRow("Workload Type", resp.WorkloadSummary.WorkloadType)
w.AddRow("Workload Runtype", resp.WorkloadSummary.WorkloadRuntype)
w.AddRow("Workload State", resp.WorkloadSummary.WorkloadState)
s, err := w.Render()
if err != nil {
Expand Down
26 changes: 14 additions & 12 deletions node/internal/actors/control_api_conversions.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,12 +96,13 @@ func infoResponseFromProto(response *actorproto.NodeInfo) *api.NodeInfoResponseJ

for _, workload := range response.Workloads {
ret.WorkloadSummaries = append(ret.WorkloadSummaries, api.WorkloadSummary{
Id: workload.Id,
Name: workload.Name,
Runtime: workload.Runtime,
StartTime: workload.StartedAt.AsTime().Format(time.DateTime),
WorkloadType: workload.WorkloadType,
WorkloadState: workload.State,
Id: workload.Id,
Name: workload.Name,
Runtime: workload.Runtime,
StartTime: workload.StartedAt.AsTime().Format(time.DateTime),
WorkloadType: workload.WorkloadType,
WorkloadRuntype: workload.WorkloadRuntype,
WorkloadState: workload.State,
})
}
return ret
Expand Down Expand Up @@ -147,12 +148,13 @@ func pingResponseFromProto(response *actorproto.PingNodeResponse) *api.NodePingR
func workloadPingResponseFromProto(response *actorproto.PingWorkloadResponse) *api.WorkloadPingResponseJson {
return &api.WorkloadPingResponseJson{
WorkloadSummary: &api.Workload{
Id: response.Workload.Id,
Name: response.Workload.Name,
Runtime: response.Workload.Runtime,
StartTime: response.Workload.StartedAt.AsTime().Format(time.DateTime),
WorkloadType: response.Workload.WorkloadType,
WorkloadState: response.Workload.State,
Id: response.Workload.Id,
Name: response.Workload.Name,
Runtime: response.Workload.Runtime,
StartTime: response.Workload.StartedAt.AsTime().Format(time.DateTime),
WorkloadType: response.Workload.WorkloadType,
WorkloadRuntype: response.Workload.WorkloadRuntype,
WorkloadState: response.Workload.State,
},
}
}
51 changes: 31 additions & 20 deletions node/internal/actors/pb/agents.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions node/internal/actors/pb/agents.proto
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,9 @@ message WorkloadSummary {
string runtime = 3;
google.protobuf.Timestamp started_at = 4;
string workload_type = 5;
string namespace = 6;
string state = 7;
string workload_runtype = 6;
string namespace = 7;
string state = 8;
}

message SetLameDuck {}
Expand Down
24 changes: 16 additions & 8 deletions node/internal/actors/process_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package actors

import (
"context"
"fmt"

Check failure on line 5 in node/internal/actors/process_actor.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used) (typecheck)

Check failure on line 5 in node/internal/actors/process_actor.go

View workflow job for this annotation

GitHub Actions / lint

"fmt" imported and not used (typecheck)

Check failure on line 5 in node/internal/actors/process_actor.go

View workflow job for this annotation

GitHub Actions / test (ubuntu-latest)

"fmt" imported and not used

Check failure on line 5 in node/internal/actors/process_actor.go

View workflow job for this annotation

GitHub Actions / test (windows-latest)

"fmt" imported and not used

Check failure on line 5 in node/internal/actors/process_actor.go

View workflow job for this annotation

GitHub Actions / test (macos-latest)

"fmt" imported and not used
"log/slog"
"time"

Expand Down Expand Up @@ -107,13 +108,14 @@ func (a *processActor) Receive(ctx *goakt.ReceiveContext) {
ctx.Shutdown()
case *actorproto.QueryWorkload:
ctx.Response(&actorproto.WorkloadSummary{
Id: a.id,
Name: a.processName,
Namespace: a.namespace,
Runtime: a.runTime.String(),
StartedAt: timestamppb.New(a.startedAt),
WorkloadType: a.workloadType,
State: a.state,
Id: a.id,
Name: a.processName,
Namespace: a.namespace,
Runtime: a.runTime.String(),
StartedAt: timestamppb.New(a.startedAt),
WorkloadType: a.workloadType,
WorkloadRuntype: a.runType,
State: a.state,
})
default:
a.logger.Warn("unknown message", slog.Any("msg", ctx.Message()))
Expand Down Expand Up @@ -158,7 +160,8 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
a.cancel = cancel

// TODO: subscribe to all triggers or change to only allow one
s, err := a.nc.Subscribe(a.triggerSubs[0], func(msg *nats.Msg) {
// TODO: need to have a better quere group. possibly an ID at start time
s, err := a.nc.QueueSubscribe(a.triggerSubs[0], a.processName, func(msg *nats.Msg) {
a.state = models.WorkloadStateRunning

ticker := time.NewTicker(DefaultJobRunTime)
Expand All @@ -175,6 +178,10 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

if a.env == nil {
a.env = make(map[string]string)
}
a.env["NEX_TRIGGER_DATA"] = string(msg.Data)
a.process, err = NewOsProcess(a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
Expand All @@ -192,6 +199,7 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {

a.runTime = a.runTime + exeEnd.Sub(exeStart)
a.state = models.WorkloadStateWarm
ticker.Stop()
})
if err != nil {
a.logger.Error("failed to subscribe to trigger", slog.Any("err", err))
Expand Down
1 change: 1 addition & 0 deletions test/testdata/function/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

func main() {
fmt.Fprint(os.Stdout, "starting function")
fmt.Fprint(os.Stdout, os.Getenv("NEX_TRIGGER_DATA"))
time.Sleep(750 * time.Millisecond)
fmt.Fprint(os.Stdout, "ending function")
}
20 changes: 19 additions & 1 deletion test/workload_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ func TestDirectStartFunction(t *testing.T) {
}()

passed := false
triggerLogs := new(bytes.Buffer)
go func() {
pub, err := kp.PublicKey()
if err != nil {
Expand Down Expand Up @@ -341,8 +342,21 @@ func TestDirectStartFunction(t *testing.T) {
t.Error(err)
return
}
defer nc.Close()

err = nc.Publish("test", []byte("test"))
workloadId := match[1]
sub, err := nc.Subscribe("$NEX.logs.system."+workloadId+".stdout", func(msg *nats.Msg) {
triggerLogs.Write(msg.Data)
})
if err != nil {
t.Error(err)
return
}
defer func() {
_ = sub.Drain()
}()

err = nc.Publish("test", []byte("test data 123"))
if err != nil {
t.Error(err)
return
Expand Down Expand Up @@ -392,4 +406,8 @@ func TestDirectStartFunction(t *testing.T) {
if !passed {
t.Fatal("expected workload to run for 500ms - 1s")
}

if !bytes.Contains(triggerLogs.Bytes(), []byte("test data 123")) {
t.Error("expected 'test data 123' to be consumed by workload")
}
}

0 comments on commit 5e703ec

Please sign in to comment.