Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Process Actor Refactor #471

Merged
merged 5 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 1 addition & 15 deletions node/internal/actors/direct_start_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,7 @@ func (a *DirectStartAgent) startWorkload(m *actorproto.StartWorkload) (*actorpro
if m.WorkloadId == "" {
m.WorkloadId = nuid.New().Next()
}
pa, err := createNewProcessActor(
a.ctx,
a.logger.WithGroup("workload"),
a.nc,
m.WorkloadId,
m.Argv,
m.Namespace,
m.WorkloadType,
m.WorkloadName,
ref,
env,
m.WorkloadRuntype,
m.TriggerSubject,
int(m.RetryCount),
)
pa, err := NewProcessActor(a.ctx, a.logger.WithGroup("workload"), a.nc, m, ref, env)
if err != nil {
a.logger.Error("Failed to create process actor", slog.String("name", a.self.Name()), slog.Any("err", err))
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions node/internal/actors/os_procs.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"log/slog"
"os/exec"
"strings"

"github.com/nats-io/nats.go"
)
Expand Down Expand Up @@ -111,17 +112,17 @@ type logCapture struct {
nc *nats.Conn
namespace string
stderr bool
name string
id string
}

// Log capture implementation of io.Writer for stdout and stderr
func (cap logCapture) Write(p []byte) (n int, err error) {
if !cap.stderr {
_ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stdout", cap.namespace, cap.name), p)
cap.logger.Debug(string(p), slog.String("process_name", cap.name))
_ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stdout", cap.namespace, cap.id), p)
cap.logger.Debug(strings.ReplaceAll(string(p), "\n", "\\n"))
} else {
_ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stderr", cap.namespace, cap.name), p)
cap.logger.Error(string(p), slog.String("process_name", cap.name))
_ = cap.nc.Publish(fmt.Sprintf("$NEX.logs.%s.%s.stderr", cap.namespace, cap.id), p)
cap.logger.Error(strings.ReplaceAll(string(p), "\n", "\\n"))
}
return len(p), nil
}
89 changes: 33 additions & 56 deletions node/internal/actors/process_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ const (
)

type processActor struct {
ctx context.Context
ctx context.Context
cancel context.CancelFunc
logger *slog.Logger
nc *nats.Conn

startedAt time.Time
runTime time.Duration
id string
nc *nats.Conn

argv []string
env map[string]string
Expand All @@ -37,49 +40,38 @@ type processActor struct {

retryCounter int

cancel context.CancelFunc
logger *slog.Logger
self *goakt.PID
self *goakt.PID

process *OsProcess
stdout logCapture
stderr logCapture
}

func createNewProcessActor(
ctx context.Context,
logger *slog.Logger,
nc *nats.Conn,
processId string,
argv []string,
namespace string,
workloadType string,
processName string,
ref *ArtifactReference,
env map[string]string,
runType string,
triggerSub string,
retryCount int,
) (*processActor, error) {

ret := processActor{
func NewProcessActor(ctx context.Context, logger *slog.Logger, nc *nats.Conn, m *actorproto.StartWorkload, ref *ArtifactReference, clearEnv map[string]string) (*processActor, error) {
jordan-rash marked this conversation as resolved.
Show resolved Hide resolved
ctx, cancel := context.WithCancel(ctx)
ret := &processActor{
ctx: ctx,
cancel: cancel,
logger: logger,
nc: nc,
startedAt: time.Now(),
runTime: 0,
id: processId,
nc: nc,
workloadType: workloadType,
namespace: namespace,
processName: processName,
runType: runType,
id: m.WorkloadId,
workloadType: m.WorkloadType,
namespace: m.Namespace,
processName: m.WorkloadName,
runType: m.WorkloadRuntype,
state: models.WorkloadStateStarting,
ref: ref,
logger: logger,
argv: argv,
env: env,
triggerSub: triggerSub,
retryCount: retryCount,
argv: m.Argv,
env: clearEnv,
triggerSub: m.TriggerSubject,
retryCount: int(m.RetryCount),
retryCounter: 0,
stdout: logCapture{logger: logger.With(slog.String("id", m.WorkloadId)), nc: nc, namespace: m.Namespace, id: m.WorkloadId, stderr: false},
stderr: logCapture{logger: logger.With(slog.String("id", m.WorkloadId)), nc: nc, namespace: m.Namespace, id: m.WorkloadId, stderr: true},
}

return &ret, nil
return ret, nil
}

func (a *processActor) PreStart(ctx context.Context) error {
Expand All @@ -106,10 +98,7 @@ func (a *processActor) Receive(ctx *goakt.ReceiveContext) {
return
}

if a.cancel != nil {
a.cancel()
}

a.cancel()
ctx.Shutdown()
case *actorproto.QueryWorkload:
ctx.Response(&actorproto.WorkloadSummary{
Expand All @@ -136,10 +125,7 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
for a.state != models.WorkloadStateStopped && a.retryCounter < a.retryCount {
a.state = models.WorkloadStateRunning

stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
return
Expand All @@ -159,14 +145,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
if a.retryCounter == a.retryCount {
a.logger.Error("failed to start process after retries", slog.Int("retryCount", a.retryCount))
}

ctx.Shutdown()
case models.WorkloadRunTypeFunction:
a.state = models.WorkloadStateWarm

cctx, cancel := context.WithCancel(context.Background())
a.cancel = cancel

// TODO: subscribe to all triggers or change to only allow one
// TODO: need to have a better quere group. possibly an ID at start time
s, err := a.nc.QueueSubscribe(a.triggerSub, a.processName, func(msg *nats.Msg) {
a.state = models.WorkloadStateRunning
Expand All @@ -182,14 +165,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
a.state = models.WorkloadStateError
}()

stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

if a.env == nil {
a.env = make(map[string]string)
}
a.env["NEX_TRIGGER_DATA"] = string(msg.Data)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
return
Expand Down Expand Up @@ -217,14 +197,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
_ = s.Unsubscribe()
}()

<-cctx.Done()
<-a.ctx.Done()
case models.WorkloadRunTypeJob:
a.state = models.WorkloadStateRunning

stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
return
Expand Down
Loading