From 5e703ec5f111e4e40ddac5831156b744e261da40 Mon Sep 17 00:00:00 2001 From: Jordan Rash <15827604+jordan-rash@users.noreply.github.com> Date: Fri, 13 Dec 2024 09:20:54 -0700 Subject: [PATCH] adds trigger data env and runtype to cmd output Signed-off-by: Jordan Rash <15827604+jordan-rash@users.noreply.github.com> --- api/nodecontrol/gen/node_info_response.go | 6 +++ api/nodecontrol/gen/workload_ping_response.go | 6 +++ api/nodecontrol/node-info-response.json | 5 ++ api/nodecontrol/workload-ping-response.json | 5 ++ cmd/nex/workload.go | 16 +++++- .../actors/control_api_conversions.go | 26 +++++----- node/internal/actors/pb/agents.pb.go | 51 +++++++++++-------- node/internal/actors/pb/agents.proto | 5 +- node/internal/actors/process_actor.go | 24 ++++++--- test/testdata/function/main.go | 1 + test/workload_test.go | 20 +++++++- 11 files changed, 121 insertions(+), 44 deletions(-) diff --git a/api/nodecontrol/gen/node_info_response.go b/api/nodecontrol/gen/node_info_response.go index 315b1bda..1d0f3e28 100644 --- a/api/nodecontrol/gen/node_info_response.go +++ b/api/nodecontrol/gen/node_info_response.go @@ -84,6 +84,9 @@ type WorkloadSummary struct { // The start time of the workload StartTime string `json:"start_time" yaml:"start_time" mapstructure:"start_time"` + // The runtype/lifecycle of the workload + WorkloadRuntype string `json:"workload_runtype" yaml:"workload_runtype" mapstructure:"workload_runtype"` + // The state of the workload WorkloadState string `json:"workload_state" yaml:"workload_state" mapstructure:"workload_state"` @@ -109,6 +112,9 @@ func (j *WorkloadSummary) UnmarshalJSON(b []byte) error { if _, ok := raw["start_time"]; raw != nil && !ok { return fmt.Errorf("field start_time in WorkloadSummary: required") } + if _, ok := raw["workload_runtype"]; raw != nil && !ok { + return fmt.Errorf("field workload_runtype in WorkloadSummary: required") + } if _, ok := raw["workload_state"]; raw != nil && !ok { return fmt.Errorf("field workload_state in WorkloadSummary: required") } diff --git a/api/nodecontrol/gen/workload_ping_response.go b/api/nodecontrol/gen/workload_ping_response.go index f6d19864..9cd88666 100644 --- a/api/nodecontrol/gen/workload_ping_response.go +++ b/api/nodecontrol/gen/workload_ping_response.go @@ -18,6 +18,9 @@ type Workload struct { // The start time of the workload StartTime string `json:"start_time" yaml:"start_time" mapstructure:"start_time"` + // The runtype/lifecycle of the workload + WorkloadRuntype string `json:"workload_runtype" yaml:"workload_runtype" mapstructure:"workload_runtype"` + // The state of the workload WorkloadState string `json:"workload_state" yaml:"workload_state" mapstructure:"workload_state"` @@ -48,6 +51,9 @@ func (j *Workload) UnmarshalJSON(b []byte) error { if _, ok := raw["start_time"]; raw != nil && !ok { return fmt.Errorf("field start_time in Workload: required") } + if _, ok := raw["workload_runtype"]; raw != nil && !ok { + return fmt.Errorf("field workload_runtype in Workload: required") + } if _, ok := raw["workload_state"]; raw != nil && !ok { return fmt.Errorf("field workload_state in Workload: required") } diff --git a/api/nodecontrol/node-info-response.json b/api/nodecontrol/node-info-response.json index 05eeccda..e3feafe4 100644 --- a/api/nodecontrol/node-info-response.json +++ b/api/nodecontrol/node-info-response.json @@ -75,6 +75,10 @@ "type": "string", "description": "The type of the workload" }, + "workload_runtype": { + "type": "string", + "description": "The runtype/lifecycle of the workload" + }, "workload_state": { "type": "string", "description": "The state of the workload" @@ -86,6 +90,7 @@ "start_time", "runtime", "workload_type", + "workload_runtype", "workload_state" ] } diff --git a/api/nodecontrol/workload-ping-response.json b/api/nodecontrol/workload-ping-response.json index 7f267927..3984829f 100644 --- a/api/nodecontrol/workload-ping-response.json +++ b/api/nodecontrol/workload-ping-response.json @@ -39,6 +39,10 @@ "type": "string", "description": "The type of the workload" }, + "workload_runtype": { + "type": "string", + "description": "The runtype/lifecycle of the workload" + }, "workload_state": { "type": "string", "description": "The state of the workload" @@ -50,6 +54,7 @@ "start_time", "runtime", "workload_type", + "workload_runtype", "workload_state" ] } diff --git a/cmd/nex/workload.go b/cmd/nex/workload.go index 812a681c..7aeb7195 100644 --- a/cmd/nex/workload.go +++ b/cmd/nex/workload.go @@ -17,6 +17,7 @@ import ( "net/http" "os" "path/filepath" + "time" "strings" @@ -332,8 +333,21 @@ func (i InfoWorkload) Run(ctx context.Context, globals *Globals) error { w := columns.New("Information about Workload %s", i.WorkloadId) w.AddRow("Name", resp.WorkloadSummary.Name) w.AddRow("Start Time", resp.WorkloadSummary.StartTime) - w.AddRowIf("Run Time", resp.WorkloadSummary.Runtime, resp.WorkloadSummary.Runtime != "") + w.AddRow("Run Time", func() string { + if resp.WorkloadSummary.Runtime == "" { + return "--" + } + if resp.WorkloadSummary.Runtime == "0s" { + t, err := time.Parse(time.DateTime, resp.WorkloadSummary.StartTime) + if err != nil { + return "--" + } + return time.Since(t).String() + } + return resp.WorkloadSummary.Runtime + }()) w.AddRow("Workload Type", resp.WorkloadSummary.WorkloadType) + w.AddRow("Workload Runtype", resp.WorkloadSummary.WorkloadRuntype) w.AddRow("Workload State", resp.WorkloadSummary.WorkloadState) s, err := w.Render() if err != nil { diff --git a/node/internal/actors/control_api_conversions.go b/node/internal/actors/control_api_conversions.go index 1ba64cc6..1ff00882 100644 --- a/node/internal/actors/control_api_conversions.go +++ b/node/internal/actors/control_api_conversions.go @@ -96,12 +96,13 @@ func infoResponseFromProto(response *actorproto.NodeInfo) *api.NodeInfoResponseJ for _, workload := range response.Workloads { ret.WorkloadSummaries = append(ret.WorkloadSummaries, api.WorkloadSummary{ - Id: workload.Id, - Name: workload.Name, - Runtime: workload.Runtime, - StartTime: workload.StartedAt.AsTime().Format(time.DateTime), - WorkloadType: workload.WorkloadType, - WorkloadState: workload.State, + Id: workload.Id, + Name: workload.Name, + Runtime: workload.Runtime, + StartTime: workload.StartedAt.AsTime().Format(time.DateTime), + WorkloadType: workload.WorkloadType, + WorkloadRuntype: workload.WorkloadRuntype, + WorkloadState: workload.State, }) } return ret @@ -147,12 +148,13 @@ func pingResponseFromProto(response *actorproto.PingNodeResponse) *api.NodePingR func workloadPingResponseFromProto(response *actorproto.PingWorkloadResponse) *api.WorkloadPingResponseJson { return &api.WorkloadPingResponseJson{ WorkloadSummary: &api.Workload{ - Id: response.Workload.Id, - Name: response.Workload.Name, - Runtime: response.Workload.Runtime, - StartTime: response.Workload.StartedAt.AsTime().Format(time.DateTime), - WorkloadType: response.Workload.WorkloadType, - WorkloadState: response.Workload.State, + Id: response.Workload.Id, + Name: response.Workload.Name, + Runtime: response.Workload.Runtime, + StartTime: response.Workload.StartedAt.AsTime().Format(time.DateTime), + WorkloadType: response.Workload.WorkloadType, + WorkloadRuntype: response.Workload.WorkloadRuntype, + WorkloadState: response.Workload.State, }, } } diff --git a/node/internal/actors/pb/agents.pb.go b/node/internal/actors/pb/agents.pb.go index fccec648..9af99fc7 100644 --- a/node/internal/actors/pb/agents.pb.go +++ b/node/internal/actors/pb/agents.pb.go @@ -1525,13 +1525,14 @@ type WorkloadSummary struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` - Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` - Runtime string `protobuf:"bytes,3,opt,name=runtime,proto3" json:"runtime,omitempty"` - StartedAt *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` - WorkloadType string `protobuf:"bytes,5,opt,name=workload_type,json=workloadType,proto3" json:"workload_type,omitempty"` - Namespace string `protobuf:"bytes,6,opt,name=namespace,proto3" json:"namespace,omitempty"` - State string `protobuf:"bytes,7,opt,name=state,proto3" json:"state,omitempty"` + Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"` + Name string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"` + Runtime string `protobuf:"bytes,3,opt,name=runtime,proto3" json:"runtime,omitempty"` + StartedAt *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"` + WorkloadType string `protobuf:"bytes,5,opt,name=workload_type,json=workloadType,proto3" json:"workload_type,omitempty"` + WorkloadRuntype string `protobuf:"bytes,6,opt,name=workload_runtype,json=workloadRuntype,proto3" json:"workload_runtype,omitempty"` + Namespace string `protobuf:"bytes,7,opt,name=namespace,proto3" json:"namespace,omitempty"` + State string `protobuf:"bytes,8,opt,name=state,proto3" json:"state,omitempty"` } func (x *WorkloadSummary) Reset() { @@ -1599,6 +1600,13 @@ func (x *WorkloadSummary) GetWorkloadType() string { return "" } +func (x *WorkloadSummary) GetWorkloadRuntype() string { + if x != nil { + return x.WorkloadRuntype + } + return "" +} + func (x *WorkloadSummary) GetNamespace() string { if x != nil { return x.Namespace @@ -1961,7 +1969,7 @@ var file_agents_proto_rawDesc = []byte{ 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, - 0x3a, 0x02, 0x38, 0x01, 0x22, 0xe3, 0x01, 0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, + 0x3a, 0x02, 0x38, 0x01, 0x22, 0x8e, 0x02, 0x0a, 0x0f, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x53, 0x75, 0x6d, 0x6d, 0x61, 0x72, 0x79, 0x12, 0x0e, 0x0a, 0x02, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, @@ -1972,18 +1980,21 @@ var file_agents_proto_rawDesc = []byte{ 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x09, 0x73, 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x41, 0x74, 0x12, 0x23, 0x0a, 0x0d, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0c, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, - 0x61, 0x64, 0x54, 0x79, 0x70, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, - 0x61, 0x63, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, - 0x70, 0x61, 0x63, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x07, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x65, - 0x74, 0x4c, 0x61, 0x6d, 0x65, 0x44, 0x75, 0x63, 0x6b, 0x22, 0x2c, 0x0a, 0x10, 0x4c, 0x61, 0x6d, - 0x65, 0x44, 0x75, 0x63, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, - 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, - 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x22, 0x06, 0x0a, 0x04, 0x48, 0x61, 0x6c, 0x74, 0x42, - 0x35, 0x5a, 0x33, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x79, - 0x6e, 0x61, 0x64, 0x69, 0x61, 0x2d, 0x69, 0x6f, 0x2f, 0x6e, 0x65, 0x78, 0x2f, 0x6e, 0x6f, 0x64, - 0x65, 0x2f, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x73, 0x2f, 0x70, 0x62, 0x3b, 0x61, 0x63, 0x74, 0x6f, - 0x72, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x61, 0x64, 0x54, 0x79, 0x70, 0x65, 0x12, 0x29, 0x0a, 0x10, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, + 0x61, 0x64, 0x5f, 0x72, 0x75, 0x6e, 0x74, 0x79, 0x70, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x0f, 0x77, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x75, 0x6e, 0x74, 0x79, 0x70, + 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x18, 0x07, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x6e, 0x61, 0x6d, 0x65, 0x73, 0x70, 0x61, 0x63, 0x65, 0x12, + 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x65, 0x18, 0x08, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x65, 0x22, 0x0d, 0x0a, 0x0b, 0x53, 0x65, 0x74, 0x4c, 0x61, 0x6d, 0x65, + 0x44, 0x75, 0x63, 0x6b, 0x22, 0x2c, 0x0a, 0x10, 0x4c, 0x61, 0x6d, 0x65, 0x44, 0x75, 0x63, 0x6b, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, + 0x65, 0x73, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, + 0x73, 0x73, 0x22, 0x06, 0x0a, 0x04, 0x48, 0x61, 0x6c, 0x74, 0x42, 0x35, 0x5a, 0x33, 0x67, 0x69, + 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x79, 0x6e, 0x61, 0x64, 0x69, 0x61, + 0x2d, 0x69, 0x6f, 0x2f, 0x6e, 0x65, 0x78, 0x2f, 0x6e, 0x6f, 0x64, 0x65, 0x2f, 0x61, 0x63, 0x74, + 0x6f, 0x72, 0x73, 0x2f, 0x70, 0x62, 0x3b, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x70, 0x72, 0x6f, 0x74, + 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/node/internal/actors/pb/agents.proto b/node/internal/actors/pb/agents.proto index 90d1e342..c300fac8 100644 --- a/node/internal/actors/pb/agents.proto +++ b/node/internal/actors/pb/agents.proto @@ -159,8 +159,9 @@ message WorkloadSummary { string runtime = 3; google.protobuf.Timestamp started_at = 4; string workload_type = 5; - string namespace = 6; - string state = 7; + string workload_runtype = 6; + string namespace = 7; + string state = 8; } message SetLameDuck {} diff --git a/node/internal/actors/process_actor.go b/node/internal/actors/process_actor.go index 41bcb9bd..b22a5018 100644 --- a/node/internal/actors/process_actor.go +++ b/node/internal/actors/process_actor.go @@ -2,6 +2,7 @@ package actors import ( "context" + "fmt" "log/slog" "time" @@ -107,13 +108,14 @@ func (a *processActor) Receive(ctx *goakt.ReceiveContext) { ctx.Shutdown() case *actorproto.QueryWorkload: ctx.Response(&actorproto.WorkloadSummary{ - Id: a.id, - Name: a.processName, - Namespace: a.namespace, - Runtime: a.runTime.String(), - StartedAt: timestamppb.New(a.startedAt), - WorkloadType: a.workloadType, - State: a.state, + Id: a.id, + Name: a.processName, + Namespace: a.namespace, + Runtime: a.runTime.String(), + StartedAt: timestamppb.New(a.startedAt), + WorkloadType: a.workloadType, + WorkloadRuntype: a.runType, + State: a.state, }) default: a.logger.Warn("unknown message", slog.Any("msg", ctx.Message())) @@ -158,7 +160,8 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { a.cancel = cancel // TODO: subscribe to all triggers or change to only allow one - s, err := a.nc.Subscribe(a.triggerSubs[0], func(msg *nats.Msg) { + // TODO: need to have a better quere group. possibly an ID at start time + s, err := a.nc.QueueSubscribe(a.triggerSubs[0], a.processName, func(msg *nats.Msg) { a.state = models.WorkloadStateRunning ticker := time.NewTicker(DefaultJobRunTime) @@ -175,6 +178,10 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false} stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true} + if a.env == nil { + a.env = make(map[string]string) + } + a.env["NEX_TRIGGER_DATA"] = string(msg.Data) a.process, err = NewOsProcess(a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr) if err != nil { a.logger.Error("failed to create process", slog.Any("err", err)) @@ -192,6 +199,7 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { a.runTime = a.runTime + exeEnd.Sub(exeStart) a.state = models.WorkloadStateWarm + ticker.Stop() }) if err != nil { a.logger.Error("failed to subscribe to trigger", slog.Any("err", err)) diff --git a/test/testdata/function/main.go b/test/testdata/function/main.go index 4ee9a964..5bcbffff 100644 --- a/test/testdata/function/main.go +++ b/test/testdata/function/main.go @@ -8,6 +8,7 @@ import ( func main() { fmt.Fprint(os.Stdout, "starting function") + fmt.Fprint(os.Stdout, os.Getenv("NEX_TRIGGER_DATA")) time.Sleep(750 * time.Millisecond) fmt.Fprint(os.Stdout, "ending function") } diff --git a/test/workload_test.go b/test/workload_test.go index 98e76064..bf3008eb 100644 --- a/test/workload_test.go +++ b/test/workload_test.go @@ -271,6 +271,7 @@ func TestDirectStartFunction(t *testing.T) { }() passed := false + triggerLogs := new(bytes.Buffer) go func() { pub, err := kp.PublicKey() if err != nil { @@ -341,8 +342,21 @@ func TestDirectStartFunction(t *testing.T) { t.Error(err) return } + defer nc.Close() - err = nc.Publish("test", []byte("test")) + workloadId := match[1] + sub, err := nc.Subscribe("$NEX.logs.system."+workloadId+".stdout", func(msg *nats.Msg) { + triggerLogs.Write(msg.Data) + }) + if err != nil { + t.Error(err) + return + } + defer func() { + _ = sub.Drain() + }() + + err = nc.Publish("test", []byte("test data 123")) if err != nil { t.Error(err) return @@ -392,4 +406,8 @@ func TestDirectStartFunction(t *testing.T) { if !passed { t.Fatal("expected workload to run for 500ms - 1s") } + + if !bytes.Contains(triggerLogs.Bytes(), []byte("test data 123")) { + t.Error("expected 'test data 123' to be consumed by workload") + } }