diff --git a/node/internal/actors/direct_start_agent.go b/node/internal/actors/direct_start_agent.go index 778c07ae..88e255db 100644 --- a/node/internal/actors/direct_start_agent.go +++ b/node/internal/actors/direct_start_agent.go @@ -263,21 +263,7 @@ func (a *DirectStartAgent) startWorkload(m *actorproto.StartWorkload) (*actorpro if m.WorkloadId == "" { m.WorkloadId = nuid.New().Next() } - pa, err := createNewProcessActor( - a.ctx, - a.logger.WithGroup("workload"), - a.nc, - m.WorkloadId, - m.Argv, - m.Namespace, - m.WorkloadType, - m.WorkloadName, - ref, - env, - m.WorkloadRuntype, - m.TriggerSubject, - int(m.RetryCount), - ) + pa, err := newProcessActor(a.ctx, a.logger.WithGroup("workload"), a.nc, m, ref, env) if err != nil { a.logger.Error("Failed to create process actor", slog.String("name", a.self.Name()), slog.Any("err", err)) return nil, err diff --git a/node/internal/actors/os_procs.go b/node/internal/actors/os_procs.go index 9a84930c..0a8c0109 100644 --- a/node/internal/actors/os_procs.go +++ b/node/internal/actors/os_procs.go @@ -5,6 +5,7 @@ import ( "fmt" "log/slog" "os/exec" + "strings" "github.com/nats-io/nats.go" ) @@ -111,17 +112,17 @@ type logCapture struct { nc *nats.Conn namespace string stderr bool - name string + id string } // Log capture implementation of io.Writer for stdout and stderr func (cap logCapture) Write(p []byte) (n int, err error) { if !cap.stderr { - _ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stdout", cap.namespace, cap.name), p) - cap.logger.Debug(string(p), slog.String("process_name", cap.name)) + _ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stdout", cap.namespace, cap.id), p) + cap.logger.Debug(strings.ReplaceAll(string(p), "\n", "\\n")) } else { - _ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stderr", cap.namespace, cap.name), p) - cap.logger.Error(string(p), slog.String("process_name", cap.name)) + _ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stderr", cap.namespace, cap.id), p) + cap.logger.Error(strings.ReplaceAll(string(p), "\n", "\\n")) } return len(p), nil } diff --git a/node/internal/actors/process_actor.go b/node/internal/actors/process_actor.go index c3be2369..31030878 100644 --- a/node/internal/actors/process_actor.go +++ b/node/internal/actors/process_actor.go @@ -18,11 +18,14 @@ const ( ) type processActor struct { - ctx context.Context + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger + nc *nats.Conn + startedAt time.Time runTime time.Duration id string - nc *nats.Conn argv []string env map[string]string @@ -37,49 +40,38 @@ type processActor struct { retryCounter int - cancel context.CancelFunc - logger *slog.Logger - self *goakt.PID + self *goakt.PID + process *OsProcess + stdout logCapture + stderr logCapture } -func createNewProcessActor( - ctx context.Context, - logger *slog.Logger, - nc *nats.Conn, - processId string, - argv []string, - namespace string, - workloadType string, - processName string, - ref *ArtifactReference, - env map[string]string, - runType string, - triggerSub string, - retryCount int, -) (*processActor, error) { - - ret := processActor{ +func newProcessActor(ctx context.Context, logger *slog.Logger, nc *nats.Conn, m *actorproto.StartWorkload, ref *ArtifactReference, clearEnv map[string]string) (*processActor, error) { + ctx, cancel := context.WithCancel(ctx) + ret := &processActor{ ctx: ctx, + cancel: cancel, + logger: logger, + nc: nc, startedAt: time.Now(), runTime: 0, - id: processId, - nc: nc, - workloadType: workloadType, - namespace: namespace, - processName: processName, - runType: runType, + id: m.WorkloadId, + workloadType: m.WorkloadType, + namespace: m.Namespace, + processName: m.WorkloadName, + runType: m.WorkloadRuntype, state: models.WorkloadStateStarting, ref: ref, - logger: logger, - argv: argv, - env: env, - triggerSub: triggerSub, - retryCount: retryCount, + argv: m.Argv, + env: clearEnv, + triggerSub: m.TriggerSubject, + retryCount: int(m.RetryCount), retryCounter: 0, + stdout: logCapture{logger: logger.With(slog.String("id", m.WorkloadId)), nc: nc, namespace: m.Namespace, id: m.WorkloadId, stderr: false}, + stderr: logCapture{logger: logger.With(slog.String("id", m.WorkloadId)), nc: nc, namespace: m.Namespace, id: m.WorkloadId, stderr: true}, } - - return &ret, nil + return ret, nil } func (a *processActor) PreStart(ctx context.Context) error { @@ -106,10 +98,7 @@ func (a *processActor) Receive(ctx *goakt.ReceiveContext) { return } - if a.cancel != nil { - a.cancel() - } - + a.cancel() ctx.Shutdown() case *actorproto.QueryWorkload: ctx.Response(&actorproto.WorkloadSummary{ @@ -136,10 +125,7 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { for a.state != models.WorkloadStateStopped && a.retryCounter < a.retryCount { a.state = models.WorkloadStateRunning - stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false} - stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true} - - a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr) + a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr) if err != nil { a.logger.Error("failed to create process", slog.Any("err", err)) return @@ -159,14 +145,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { if a.retryCounter == a.retryCount { a.logger.Error("failed to start process after retries", slog.Int("retryCount", a.retryCount)) } + ctx.Shutdown() case models.WorkloadRunTypeFunction: a.state = models.WorkloadStateWarm - cctx, cancel := context.WithCancel(context.Background()) - a.cancel = cancel - - // TODO: subscribe to all triggers or change to only allow one // TODO: need to have a better quere group. possibly an ID at start time s, err := a.nc.QueueSubscribe(a.triggerSub, a.processName, func(msg *nats.Msg) { a.state = models.WorkloadStateRunning @@ -182,14 +165,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { a.state = models.WorkloadStateError }() - stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false} - stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true} - if a.env == nil { a.env = make(map[string]string) } a.env["NEX_TRIGGER_DATA"] = string(msg.Data) - a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr) + a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr) if err != nil { a.logger.Error("failed to create process", slog.Any("err", err)) return @@ -217,14 +197,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) { _ = s.Unsubscribe() }() - <-cctx.Done() + <-a.ctx.Done() case models.WorkloadRunTypeJob: a.state = models.WorkloadStateRunning - stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false} - stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true} - - a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr) + a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr) if err != nil { a.logger.Error("failed to create process", slog.Any("err", err)) return