Skip to content

Commit

Permalink
simplify process actor constructor
Browse files Browse the repository at this point in the history
Signed-off-by: Jordan Rash <jordan@synadia.com>
  • Loading branch information
jordan-rash committed Dec 26, 2024
1 parent caa0440 commit 31b89d5
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 71 deletions.
16 changes: 1 addition & 15 deletions node/internal/actors/direct_start_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,7 @@ func (a *DirectStartAgent) startWorkload(m *actorproto.StartWorkload) (*actorpro
if m.WorkloadId == "" {
m.WorkloadId = nuid.New().Next()
}
pa, err := createNewProcessActor(
a.ctx,
a.logger.WithGroup("workload"),
a.nc,
m.WorkloadId,
m.Argv,
m.Namespace,
m.WorkloadType,
m.WorkloadName,
ref,
env,
m.WorkloadRuntype,
m.TriggerSubject,
int(m.RetryCount),
)
pa, err := NewProcessActor(a.ctx, a.logger.WithGroup("workload"), a.nc, m, ref, env)
if err != nil {
a.logger.Error("Failed to create process actor", slog.String("name", a.self.Name()), slog.Any("err", err))
return nil, err
Expand Down
89 changes: 33 additions & 56 deletions node/internal/actors/process_actor.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@ const (
)

type processActor struct {
ctx context.Context
ctx context.Context
cancel context.CancelFunc
logger *slog.Logger
nc *nats.Conn

startedAt time.Time
runTime time.Duration
id string
nc *nats.Conn

argv []string
env map[string]string
Expand All @@ -37,49 +40,38 @@ type processActor struct {

retryCounter int

cancel context.CancelFunc
logger *slog.Logger
self *goakt.PID
self *goakt.PID

process *OsProcess
stdout logCapture
stderr logCapture
}

func createNewProcessActor(
ctx context.Context,
logger *slog.Logger,
nc *nats.Conn,
processId string,
argv []string,
namespace string,
workloadType string,
processName string,
ref *ArtifactReference,
env map[string]string,
runType string,
triggerSub string,
retryCount int,
) (*processActor, error) {

ret := processActor{
func NewProcessActor(ctx context.Context, logger *slog.Logger, nc *nats.Conn, m *actorproto.StartWorkload, ref *ArtifactReference, clearEnv map[string]string) (*processActor, error) {
ctx, cancel := context.WithCancel(ctx)
ret := &processActor{
ctx: ctx,
cancel: cancel,
logger: logger,
nc: nc,
startedAt: time.Now(),
runTime: 0,
id: processId,
nc: nc,
workloadType: workloadType,
namespace: namespace,
processName: processName,
runType: runType,
id: m.WorkloadId,
workloadType: m.WorkloadType,
namespace: m.Namespace,
processName: m.WorkloadName,
runType: m.WorkloadRuntype,
state: models.WorkloadStateStarting,
ref: ref,
logger: logger,
argv: argv,
env: env,
triggerSub: triggerSub,
retryCount: retryCount,
argv: m.Argv,
env: clearEnv,
triggerSub: m.TriggerSubject,
retryCount: int(m.RetryCount),
retryCounter: 0,
stdout: logCapture{logger: logger, nc: nc, namespace: m.Namespace, name: m.WorkloadId, stderr: false},
stderr: logCapture{logger: logger, nc: nc, namespace: m.Namespace, name: m.WorkloadId, stderr: true},
}

return &ret, nil
return ret, nil
}

func (a *processActor) PreStart(ctx context.Context) error {
Expand All @@ -106,10 +98,7 @@ func (a *processActor) Receive(ctx *goakt.ReceiveContext) {
return
}

if a.cancel != nil {
a.cancel()
}

a.cancel()
ctx.Shutdown()
case *actorproto.QueryWorkload:
ctx.Response(&actorproto.WorkloadSummary{
Expand All @@ -136,10 +125,7 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
for a.state != models.WorkloadStateStopped && a.retryCounter < a.retryCount {
a.state = models.WorkloadStateRunning

stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
return
Expand All @@ -159,14 +145,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
if a.retryCounter == a.retryCount {
a.logger.Error("failed to start process after retries", slog.Int("retryCount", a.retryCount))
}

ctx.Shutdown()
case models.WorkloadRunTypeFunction:
a.state = models.WorkloadStateWarm

cctx, cancel := context.WithCancel(context.Background())
a.cancel = cancel

// TODO: subscribe to all triggers or change to only allow one
// TODO: need to have a better quere group. possibly an ID at start time
s, err := a.nc.QueueSubscribe(a.triggerSub, a.processName, func(msg *nats.Msg) {
a.state = models.WorkloadStateRunning
Expand All @@ -182,14 +165,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
a.state = models.WorkloadStateError
}()

stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

if a.env == nil {
a.env = make(map[string]string)
}
a.env["NEX_TRIGGER_DATA"] = string(msg.Data)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
return
Expand Down Expand Up @@ -217,14 +197,11 @@ func (a *processActor) SpawnOsProcess(ctx *goakt.ReceiveContext) {
_ = s.Unsubscribe()
}()

<-cctx.Done()
<-a.ctx.Done()
case models.WorkloadRunTypeJob:
a.state = models.WorkloadStateRunning

stdout := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: false}
stderr := logCapture{logger: a.logger, nc: a.nc, namespace: a.namespace, name: a.id, stderr: true}

a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, stdout, stderr)
a.process, err = NewOsProcess(a.ctx, a.id, a.ref.LocalCachePath, a.env, a.argv, a.logger, a.stdout, a.stderr)
if err != nil {
a.logger.Error("failed to create process", slog.Any("err", err))
return
Expand Down

0 comments on commit 31b89d5

Please sign in to comment.