diff --git a/api/nodecontrol/client.go b/api/nodecontrol/client.go index 34e489b9..522b2ede 100644 --- a/api/nodecontrol/client.go +++ b/api/nodecontrol/client.go @@ -107,13 +107,14 @@ func (c *ControlAPIClient) DirectPing(nodeId string) (*nodegen.NodePingResponseJ if err != nil { return nil, err } - resp := new(nodegen.NodePingResponseJson) - err = json.Unmarshal(msg.Data, resp) + + envelope := new(models.Envelope[nodegen.NodePingResponseJson]) + err = json.Unmarshal(msg.Data, envelope) if err != nil { return nil, err } - return resp, nil + return &envelope.Data, nil } func (c *ControlAPIClient) FindWorkload(namespace, workloadId string) (*nodegen.WorkloadPingResponseJson, error) { diff --git a/api/nodecontrol/client_test.go b/api/nodecontrol/client_test.go new file mode 100644 index 00000000..6273b4dc --- /dev/null +++ b/api/nodecontrol/client_test.go @@ -0,0 +1,470 @@ +package nodecontrol + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/json" + "errors" + "fmt" + "log/slog" + "os" + "path/filepath" + "testing" + "time" + + "disorder.dev/shandler" + "github.com/nats-io/nats-server/v2/server" + "github.com/nats-io/nats.go" + "github.com/nats-io/nkeys" + "github.com/synadia-io/nex/api/nodecontrol/gen" + "github.com/synadia-io/nex/models" + "github.com/synadia-io/nex/node" +) + +const ( + // NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT - pub key + Node1ServerSeed string = "SNAB2T3VG2363NDA2JK7NT5O3FN5VCXI2MYJHOPFO2NIDXQU6DIWQTBQC4" + // XAL54S5FE6SRPONXRNVE4ZDAOHOT44GFIY2ZW33DHLR2U3H2HJSXXRKY - pub xkey + Node1XKeySeed string = "SXAOUP7RZFW5QPE2GDWTPABUDM5UIAK6BPULJPWZQAFFL2RZ5K3UYWHYY4" +) + +func startNatsServer(t testing.TB, workingDir string) (*server.Server, error) { + t.Helper() + + s := server.New(&server.Options{ + Port: -1, + JetStream: true, + StoreDir: workingDir, + }) + + s.Start() + + go s.WaitForShutdown() + + return s, nil +} + +func startNexus(t testing.TB, ctx context.Context, logger *slog.Logger, workingDir, natsUrl string, numNodes int) error { + t.Helper() + + nc, err := nats.Connect(natsUrl) + if err != nil { + return err + } + + for i := 0; i < numNodes; i++ { + var kp, xkp nkeys.KeyPair + if i == 0 { + kp, err = nkeys.FromSeed([]byte(Node1ServerSeed)) + if err != nil { + return err + } + xkp, err = nkeys.FromSeed([]byte(Node1XKeySeed)) + if err != nil { + return err + } + } else { + kp, err = nkeys.CreateServer() + if err != nil { + return err + } + xkp, err = nkeys.CreateCurveKeys() + if err != nil { + return err + } + } + nn, err := node.NewNexNode(kp, nc, + models.WithContext(ctx), + models.WithLogger(logger), + models.WithXKeyKeyPair(xkp), + models.WithNodeName(fmt.Sprintf("node-%d", i+1)), + models.WithNexus("testnexus"), + models.WithResourceDirectory(workingDir), + ) + if err != nil { + return err + } + + err = nn.Validate() + if err != nil { + return err + } + + go func() { + err = nn.Start() + if err != nil { + t.Error(err) + t.FailNow() + } + }() + } + + return nil +} + +func TestAuction(t *testing.T) { + workingDir := t.TempDir() + natsServer, err := startNatsServer(t, workingDir) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), "inex-NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT")) + cancel() + natsServer.Shutdown() + }) + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + logger := slog.New(shandler.NewHandler( + shandler.WithLogLevel(slog.LevelDebug), + shandler.WithGroupFilter([]string{"actor_system"}), + shandler.WithStdOut(stdout), + shandler.WithStdErr(stderr), + )) + + err = startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1000 * time.Millisecond) + nc, err := nats.Connect(natsServer.ClientURL()) + if err != nil { + t.Fatal(err) + } + + control, err := NewControlApiClient(nc, logger) + if err != nil { + t.Fatal(err) + } + + resp, err := control.Auction("system", map[string]string{}) + if err != nil { + t.Fatal(err) + } + + if len(resp) != 1 { + t.Fatalf("expected 1 response, got %d", len(resp)) + } + + resp, err = control.Auction("system", map[string]string{"nex.node": "notreal"}) + if err != nil { + t.Fatal(err) + } + + if len(resp) != 0 { + t.Fatalf("expected 0 response, got %d", len(resp)) + } +} + +func TestPing(t *testing.T) { + workingDir := t.TempDir() + natsServer, err := startNatsServer(t, workingDir) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), "inex-NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT")) + cancel() + natsServer.Shutdown() + }) + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + logger := slog.New(shandler.NewHandler( + shandler.WithLogLevel(slog.LevelDebug), + shandler.WithGroupFilter([]string{"actor_system"}), + shandler.WithStdOut(stdout), + shandler.WithStdErr(stderr), + )) + + err = startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 5) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1000 * time.Millisecond) + nc, err := nats.Connect(natsServer.ClientURL()) + if err != nil { + t.Fatal(err) + } + + control, err := NewControlApiClient(nc, logger) + if err != nil { + t.Fatal(err) + } + + resp, err := control.Ping() + if err != nil { + t.Fatal(err) + } + + if len(resp) != 5 { + t.Fatalf("expected 5 responses, got %d", len(resp)) + } +} + +func TestDirectPing(t *testing.T) { + workingDir := t.TempDir() + natsServer, err := startNatsServer(t, workingDir) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), "inex-NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT")) + cancel() + natsServer.Shutdown() + }) + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + logger := slog.New(shandler.NewHandler( + shandler.WithLogLevel(slog.LevelDebug), + shandler.WithGroupFilter([]string{"actor_system"}), + shandler.WithStdOut(stdout), + shandler.WithStdErr(stderr), + )) + + err = startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1000 * time.Millisecond) + nc, err := nats.Connect(natsServer.ClientURL()) + if err != nil { + t.Fatal(err) + } + + control, err := NewControlApiClient(nc, logger) + if err != nil { + t.Fatal(err) + } + + resp, err := control.DirectPing("NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT") + if err != nil { + t.Fatal(err) + } + + if resp.NodeId != "NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT" { + t.Fatalf("expected node id NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT, got %s", resp.NodeId) + } +} + +func TestAuctionDeployAndFindWorkload(t *testing.T) { + workingDir := t.TempDir() + natsServer, err := startNatsServer(t, workingDir) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), "inex-NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT")) + cancel() + natsServer.Shutdown() + }) + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + logger := slog.New(shandler.NewHandler( + shandler.WithLogLevel(slog.LevelDebug), + shandler.WithGroupFilter([]string{"actor_system"}), + shandler.WithStdOut(stdout), + shandler.WithStdErr(stderr), + )) + + err = startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1000 * time.Millisecond) + nc, err := nats.Connect(natsServer.ClientURL()) + if err != nil { + t.Fatal(err) + } + + control, err := NewControlApiClient(nc, logger) + if err != nil { + t.Fatal(err) + } + + auctionResp, err := control.Auction("system", map[string]string{}) + if err != nil { + t.Fatal(err) + } + + env := make(map[string]string) + envB, err := json.Marshal(env) + if err != nil { + t.Fatal(err) + } + + tAKey, err := nkeys.CreateCurveKeys() + if err != nil { + t.Fatal(err) + } + + tAPub, err := tAKey.PublicKey() + if err != nil { + t.Fatal(err) + } + + encEnv, err := tAKey.Seal(envB, auctionResp[0].TargetXkey) + if err != nil { + t.Fatal(err) + } + + resp, err := control.AuctionDeployWorkload("system", auctionResp[0].BidderId, gen.StartWorkloadRequestJson{ + Description: "Test Workload", + Namespace: "system", + RetryCount: 3, + Uri: "file://../../test/testdata/forever/forever", + WorkloadName: "testworkload", + WorkloadRuntype: "service", + WorkloadType: "direct-start", + EncEnvironment: gen.SharedEncEnvJson{ + Base64EncryptedEnv: base64.StdEncoding.EncodeToString(encEnv), + EncryptedBy: tAPub, + }, + }) + if err != nil { + t.Fatal(err) + } + + if !resp.Started { + t.Fatalf("expected workload to be started") + } + + pingResp, err := control.FindWorkload("system", resp.Id) + if err != nil { + t.Fatal(err) + } + + if pingResp.WorkloadSummary.Id != resp.Id { + t.Fatalf("expected workload id %s, got %s", resp.Id, pingResp.WorkloadSummary.Id) + } + + if pingResp.WorkloadSummary.WorkloadState != models.WorkloadStateRunning { + t.Fatalf("expected workload status running, got %s", pingResp.WorkloadSummary.WorkloadState) + } + + _, err = control.FindWorkload("badnamespace", resp.Id) + if !errors.Is(err, nats.ErrTimeout) { + t.Fatalf("expected timeout error, got %v", err) + } +} + +func TestDirectDeployAndListWorkloads(t *testing.T) { + workingDir := t.TempDir() + natsServer, err := startNatsServer(t, workingDir) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) + + t.Cleanup(func() { + os.RemoveAll(filepath.Join(os.TempDir(), "inex-NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT")) + cancel() + natsServer.Shutdown() + }) + + stdout := new(bytes.Buffer) + stderr := new(bytes.Buffer) + logger := slog.New(shandler.NewHandler( + shandler.WithLogLevel(slog.LevelDebug), + shandler.WithGroupFilter([]string{"actor_system"}), + shandler.WithStdOut(stdout), + shandler.WithStdErr(stderr), + )) + + err = startNexus(t, ctx, logger, workingDir, natsServer.ClientURL(), 1) + if err != nil { + t.Fatal(err) + } + + time.Sleep(1000 * time.Millisecond) + nc, err := nats.Connect(natsServer.ClientURL()) + if err != nil { + t.Fatal(err) + } + + control, err := NewControlApiClient(nc, logger) + if err != nil { + t.Fatal(err) + } + + env := make(map[string]string) + envB, err := json.Marshal(env) + if err != nil { + t.Fatal(err) + } + + tAKey, err := nkeys.CreateCurveKeys() + if err != nil { + t.Fatal(err) + } + + tAPub, err := tAKey.PublicKey() + if err != nil { + t.Fatal(err) + } + + encEnv, err := tAKey.Seal(envB, "XAL54S5FE6SRPONXRNVE4ZDAOHOT44GFIY2ZW33DHLR2U3H2HJSXXRKY") + if err != nil { + t.Fatal(err) + } + + resp, err := control.DeployWorkload("system", "NCUU2YIYXEPGTCDXDKQR7LL5PXDHIDG7SDFLWKE3WY63ZGCZL2HKIAJT", gen.StartWorkloadRequestJson{ + Description: "Test Workload", + Namespace: "system", + RetryCount: 3, + Uri: "file://../../test/testdata/forever/forever", + WorkloadName: "testworkload", + WorkloadRuntype: "service", + WorkloadType: "direct-start", + EncEnvironment: gen.SharedEncEnvJson{ + Base64EncryptedEnv: base64.StdEncoding.EncodeToString(encEnv), + EncryptedBy: tAPub, + }, + }) + if err != nil { + t.Fatal(err) + } + + if !resp.Started { + t.Fatalf("expected workload to be started") + } + + wl, err := control.ListWorkloads("system") + if err != nil { + t.Fatal(err) + } + + if len(wl) != 1 { + t.Fatalf("expected 1 workload, got %d", len(wl)) + } + + wl, err = control.ListWorkloads("badnamespace") + if err != nil { + t.Fatal(err) + } + + if len(wl) != 0 { + t.Fatalf("expected 0 workloads, got %d", len(wl)) + } +} diff --git a/models/node_options.go b/models/node_options.go index 1db90c47..89116778 100644 --- a/models/node_options.go +++ b/models/node_options.go @@ -1,6 +1,7 @@ package models import ( + "context" "encoding/json" "errors" "log/slog" @@ -22,6 +23,7 @@ const ( var ReservedTagPrefixes = []string{"nex."} type NodeOptions struct { + Context context.Context Logger *slog.Logger Xkey nkeys.KeyPair AgentHandshakeTimeout int @@ -40,6 +42,14 @@ type NodeOptions struct { type NodeOption func(*NodeOptions) +func WithContext(ctx context.Context) NodeOption { + return func(n *NodeOptions) { + if ctx != nil { + n.Context = ctx + } + } +} + func WithLogger(s *slog.Logger) NodeOption { return func(n *NodeOptions) { if s != nil { diff --git a/node/internal/actors/control_api.go b/node/internal/actors/control_api.go index 6ee65d24..df37d6c6 100644 --- a/node/internal/actors/control_api.go +++ b/node/internal/actors/control_api.go @@ -133,7 +133,7 @@ func (api *ControlAPI) subscribe() error { }{ {AuctionSubscribeSubject(), api.handleAuction}, {UndeploySubscribeSubject(), api.handleUndeploy}, - {AuctionDeploySubscribeSubject(), api.handleADeploy}, + {AuctionDeploySubscribeSubject(), api.handleDeploy}, {CloneWorkloadSubscribeSubject(), api.handleCloneWorkload}, {NamespacePingSubscribeSubject(), api.handleNamespacePing}, {WorkloadPingSubscribeSubject(), api.handleWorkloadPing}, @@ -272,10 +272,11 @@ func (api *ControlAPI) handleAuction(m *nats.Msg) { models.RespondEnvelope(m, AuctionResponseType, 200, auctionResponseFromProto(auctResp), "") } -func (api *ControlAPI) handleADeploy(m *nats.Msg) { - // $NEX.control.default.ADEPLOY.bidderId +func (api *ControlAPI) handleDeploy(m *nats.Msg) { + // $NEX.control.default.ADEPLOY.bidderId <- Auction Deploy + // $NEX.control.system.DDEPLOY.NNODEID... <- Direct Deploy splitSub := strings.SplitN(m.Subject, ".", 5) - // splitSub[4] is the bidderId + // splitSub[4] is the bidderId or the Node ID target, xkp, err := api.nodeCallback.IsTargetNode(splitSub[4]) if err != nil { api.logger.Error("Failed to check if target node", slog.Any("error", err)) @@ -365,58 +366,6 @@ func (api *ControlAPI) handleADeploy(m *nats.Msg) { models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "") } -func (api *ControlAPI) handleDeploy(m *nats.Msg) { - req := new(nodegen.StartWorkloadRequestJson) - err := json.Unmarshal(m.Data, req) - if err != nil { - api.logger.Error("Failed to unmarshal deploy request", slog.Any("error", err)) - models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal deploy request: %s", err)) - return - } - - if req.WorkloadName == "" { - req.WorkloadName = nuid.New().Next() - } - - ctx := context.Background() - _, agent, err := api.self.ActorSystem().ActorOf(ctx, req.WorkloadType) - if err != nil { - api.logger.Error("Failed to locate agent actor", slog.String("type", req.WorkloadType), slog.Any("error", err)) - models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to locate [%s] agent actor: %s", req.WorkloadType, err)) - return - } - - askResp, err := api.self.Ask(ctx, agent, startRequestToProto(req), DefaultAskDuration) - if err != nil { - api.logger.Error("Failed to start workload", slog.Any("error", err)) - models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("Failed to start workload: %s", err)) - return - } - - protoResp, ok := askResp.(*actorproto.Envelope) - if !ok { - api.logger.Error("Start workload response from agent was not the correct type") - models.RespondEnvelope(m, RunResponseType, 500, "", "Agent returned the wrong data type") - return - } - - if protoResp.Error != nil { - api.logger.Error("Agent returned an error", slog.Any("error", protoResp.Error)) - models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("agent returned an error: %s", protoResp.Error)) - return - } - - var workloadStarted actorproto.WorkloadStarted - err = protoResp.Payload.UnmarshalTo(&workloadStarted) - if err != nil { - api.logger.Error("Failed to unmarshal workload started response", slog.Any("error", err)) - models.RespondEnvelope(m, RunResponseType, 500, "", fmt.Sprintf("failed to unmarshal workload started response: %s", err)) - return - } - - models.RespondEnvelope(m, RunResponseType, 200, startResponseFromProto(&workloadStarted), "") -} - func (api *ControlAPI) handleUndeploy(m *nats.Msg) { // $NEX.control.namespace.UNDEPLOY.workloadid splitSub := strings.SplitN(m.Subject, ".", 5) diff --git a/node/node.go b/node/node.go index 0909ceb7..07f86f46 100644 --- a/node/node.go +++ b/node/node.go @@ -62,12 +62,12 @@ func NewNexNode(serverKey nkeys.KeyPair, nc *nats.Conn, opts ...models.NodeOptio } nn := &nexNode{ - ctx: context.Background(), nc: nc, publicKey: serverKey, auctionMap: NewTTLMap(time.Second * 10), options: &models.NodeOptions{ + Context: context.Background(), Logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{})), AgentHandshakeTimeout: 5000, ResourceDirectory: "./resources", @@ -181,7 +181,7 @@ func (nn *nexNode) Validate() error { // Can be stopped by canceling the provided context func (nn *nexNode) Start() error { var cancel context.CancelFunc - nn.ctx, cancel = context.WithCancel(nn.ctx) + nn.ctx, cancel = context.WithCancel(nn.options.Context) defer cancel() nn.interrupt = make(chan os.Signal, 1)