Skip to content

Commit

Permalink
Merge branch 'master' into remove-redundant-code
Browse files Browse the repository at this point in the history
Signed-off-by: Ripul Handoo <107461226+RipulHandoo@users.noreply.github.com>
  • Loading branch information
RipulHandoo authored Nov 3, 2023
2 parents 5b9f4c9 + 997d1ab commit e3f2deb
Show file tree
Hide file tree
Showing 152 changed files with 706 additions and 369 deletions.
2 changes: 1 addition & 1 deletion chaoscenter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,5 +45,5 @@ helm install my-release bitnami/mongodb --values mongo-values.yml -n <NAMESPACE>
Applying the manifest file will install all the required service account configuration and ChaosCenter in cluster scope.

```shell
kubectl apply -f https://github.com/litmuschaos/litmus/blob/master/mkdocs/docs/3.0.0/litmus-cluster-scope-3.0.0.yaml
kubectl apply -f https://raw.githubusercontent.com/litmuschaos/litmus/master/mkdocs/docs/3.0.0/litmus-cluster-scope-3.0.0.yaml
```
Original file line number Diff line number Diff line change
Expand Up @@ -1208,7 +1208,7 @@ func (c *ChaosExperimentHandler) GetProbesInExperimentRun(ctx context.Context, p
probeDetails []*model.GetProbesInExperimentRunResponse
probeStatusMap = make(map[string]model.ProbeVerdict)
probeDescriptionMap = make(map[string]*string)
executionData types.ExecutionData
probeModeMap = make(map[string]model.Mode)
)

wfRun, err := c.chaosExperimentRunOperator.GetExperimentRun(bson.D{
Expand All @@ -1220,16 +1220,12 @@ func (c *ChaosExperimentHandler) GetProbesInExperimentRun(ctx context.Context, p
return nil, err
}

if err = json.Unmarshal([]byte(wfRun.ExecutionData), &executionData); err != nil {
return nil, errors.New("failed to unmarshal workflow manifest")
}

for _, _probe := range wfRun.Probes {
if _probe.FaultName == faultName {

mode := "SOT"
for _, probeName := range _probe.ProbeNames {
var executionData types.ExecutionData
probeStatusMap[probeName] = model.ProbeVerdictNa
probeModeMap[probeName] = model.ModeSot
description := "Either probe is not executed or not evaluated"
probeDescriptionMap[probeName] = &description

Expand All @@ -1250,7 +1246,7 @@ func (c *ChaosExperimentHandler) GetProbesInExperimentRun(ctx context.Context, p

for _, probeStatus := range probeStatuses {
if probeStatus.Name == probeName {
mode = probeStatus.Mode
probeModeMap[probeName] = model.Mode(probeStatus.Mode)

description := probeStatus.Status.Description
probeDescriptionMap[probeStatus.Name] = &description
Expand Down Expand Up @@ -1286,7 +1282,7 @@ func (c *ChaosExperimentHandler) GetProbesInExperimentRun(ctx context.Context, p

probeDetails = append(probeDetails, &model.GetProbesInExperimentRunResponse{
Probe: singleProbe.GetOutputProbe(),
Mode: model.Mode(mode),
Mode: probeModeMap[probeName],
Status: &model.Status{
Verdict: probeStatusMap[probeName],
Description: probeDescriptionMap[probeName],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ func (c *ChaosExperimentRunHandler) GetExperimentRun(ctx context.Context, projec
{"$project", bson.D{
{"name", 1},
{"is_custom_experiment", 1},
{"experiment_type", 1},
{"revision", bson.D{{
"$filter", bson.D{
{"input", "$revision"},
Expand Down Expand Up @@ -229,10 +230,13 @@ func (c *ChaosExperimentRunHandler) GetExperimentRun(ctx context.Context, projec
}
}

expType := string(wfRun.ExperimentDetails[0].ExperimentType)

expRunResponse = &model.ExperimentRun{
ExperimentName: wfRun.ExperimentDetails[0].ExperimentName,
ExperimentID: wfRun.ExperimentID,
ExperimentRunID: wfRun.ExperimentRunID,
ExperimentType: &expType,
NotifyID: wfRun.NotifyID,
Weightages: weightages,
ExperimentManifest: workflowRunManifest,
Expand Down
33 changes: 16 additions & 17 deletions chaoscenter/subscriber/pkg/events/chaosengine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"errors"
"fmt"
"strconv"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

wfclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
Expand All @@ -23,13 +22,13 @@ import (
)

// ChaosEventWatcher initializes the Litmus ChaosEngine event watcher
func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
func (ev *subscriberEvents) ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string) {
startTime, err := strconv.Atoi(infraData["START_TIME"])
if err != nil {
logrus.WithError(err).Fatal("failed to parse startTime")
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
Expand All @@ -55,17 +54,17 @@ func ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, in
informer = f.Litmuschaos().V1alpha1().ChaosEngines().Informer()
}

go startWatchEngine(stopCh, informer, stream, int64(startTime))
go ev.startWatchEngine(stopCh, informer, stream, int64(startTime))
}

// handles the different events events - add, update and delete
func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
// handles the different*subscriberEvents - add, update and delete
func (ev *subscriberEvents) startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, stream chan types.WorkflowEvent, startTime int64) {
handlers := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
chaosEventHandler(obj, "ADD", stream, startTime)
ev.chaosEventHandler(obj, "ADD", stream, startTime)
},
UpdateFunc: func(oldObj, obj interface{}) {
chaosEventHandler(obj, "UPDATE", stream, startTime)
ev.chaosEventHandler(obj, "UPDATE", stream, startTime)
},
}

Expand All @@ -74,7 +73,7 @@ func startWatchEngine(stopCh <-chan struct{}, s cache.SharedIndexInformer, strea
}

// responsible for extracting the required data from the event and streaming
func chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
func (ev *subscriberEvents) chaosEventHandler(obj interface{}, eventType string, stream chan types.WorkflowEvent, startTime int64) {
workflowObj := obj.(*chaosTypes.ChaosEngine)
if workflowObj.Labels["workflow_id"] == "" {
logrus.WithFields(map[string]interface{}{
Expand All @@ -89,7 +88,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
return
}

cfg, err := k8s.GetKubeConfig()
cfg, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
logrus.WithError(err).Fatal("could not get kube config")
}
Expand All @@ -104,12 +103,12 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
var cd *types.ChaosData = nil

//extracts chaos data
cd, err = getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
cd, err = ev.getChaosData(v1alpha1.NodeStatus{StartedAt: workflowObj.ObjectMeta.CreationTimestamp}, workflowObj.Name, workflowObj.Namespace, chaosClient)
if err != nil {
logrus.WithError(err).Print("FAILED PARSING CHAOS ENGINE CRD")
}

// considering chaos events has only 1 artifact with manifest as raw data
// considering chaos*subscriberEvents has only 1 artifact with manifest as raw data
finTime := int64(-1)
if workflowObj.Status.EngineStatus == chaosTypes.EngineStatusCompleted || workflowObj.Status.EngineStatus == chaosTypes.EngineStatusStopped {
if len(workflowObj.Status.Experiments) > 0 {
Expand Down Expand Up @@ -157,7 +156,7 @@ func chaosEventHandler(obj interface{}, eventType string, stream chan types.Work
}

// StopChaosEngineState is used to patch all the chaosEngines with engineState=stop
func StopChaosEngineState(namespace string, workflowRunID *string) error {
func (ev *subscriberEvents) StopChaosEngineState(namespace string, workflowRunID *string) error {
ctx := context.TODO()

//Define the GVR
Expand All @@ -168,7 +167,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

//Generate the dynamic client
_, dynamicClient, err := k8s.GetDynamicAndDiscoveryClient()
_, dynamicClient, err := ev.subscriberK8s.GetDynamicAndDiscoveryClient()
if err != nil {
return errors.New("failed to get dynamic client, error: " + err.Error())
}
Expand All @@ -185,7 +184,7 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
return errors.New("failed to list chaosengines: " + err.Error())
}

//Foe every chaosEngine patch the engineState to Stop
//Foe every subscriber patch the engineState to Stop
for _, val := range chaosEngines.Items {
patch := []byte(`{"spec":{"engineState":"stop"}}`)
patched, err := dynamicClient.Resource(resourceType).Namespace(namespace).Patch(ctx, val.GetName(), mergeType.MergePatchType, patch, v1.PatchOptions{})
Expand All @@ -200,9 +199,9 @@ func StopChaosEngineState(namespace string, workflowRunID *string) error {
}

// StopWorkflow will patch the workflow based on workflow name using the shutdown strategy
func StopWorkflow(wfName string, namespace string) error {
func (ev *subscriberEvents) StopWorkflow(wfName string, namespace string) error {

conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
wfClient := wfclientset.NewForConfigOrDie(conf).ArgoprojV1alpha1().Workflows(namespace)
patch := []byte(`{"spec":{"shutdown":"Stop"}}`)
wf, err := wfClient.Patch(context.TODO(), wfName, mergeType.MergePatchType, patch, v1.PatchOptions{})
Expand Down
38 changes: 38 additions & 0 deletions chaoscenter/subscriber/pkg/events/definations.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package events

import (
"subscriber/pkg/graphql"
"subscriber/pkg/types"

"subscriber/pkg/k8s"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha13 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
v1alpha12 "github.com/litmuschaos/chaos-operator/pkg/client/clientset/versioned/typed/litmuschaos/v1alpha1"
)

type SubscriberEvents interface {
ChaosEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
StopChaosEngineState(namespace string, workflowRunID *string) error
CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error)
GetWorkflowObj(uid string) (*v1alpha1.Workflow, error)
ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error)
GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error)
WorkflowEventWatcher(stopCh chan struct{}, stream chan types.WorkflowEvent, infraData map[string]string)
WorkflowEventHandler(workflowObj *v1alpha1.Workflow, eventType string, startTime int64) (types.WorkflowEvent, error)
SendWorkflowUpdates(infraData map[string]string, event types.WorkflowEvent) (string, error)
WorkflowUpdates(infraData map[string]string, event chan types.WorkflowEvent)
StopWorkflow(wfName string, namespace string) error
}

type subscriberEvents struct {
gqlSubscriberServer graphql.SubscriberGql
subscriberK8s k8s.SubscriberK8s
}

func NewSubscriberEventsOperator(gqlSubscriberServer graphql.SubscriberGql, subscriberK8s k8s.SubscriberK8s) SubscriberEvents {
return &subscriberEvents{
gqlSubscriberServer: gqlSubscriberServer,
subscriberK8s: subscriberK8s,
}
}
19 changes: 9 additions & 10 deletions chaoscenter/subscriber/pkg/events/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"regexp"
"strconv"
"strings"
"subscriber/pkg/k8s"
"subscriber/pkg/types"

"github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -24,7 +23,7 @@ import (
)

// util function, extracts the chaos data using the litmus go-client
func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
func (ev *subscriberEvents) getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (*types.ChaosData, error) {
cd := &types.ChaosData{}
cd.EngineName = engineName
cd.Namespace = engineNS
Expand Down Expand Up @@ -79,7 +78,7 @@ func getChaosData(nodeStatus v1alpha13.NodeStatus, engineName, engineNS string,
}

// CheckChaosData util function, checks if event is a chaos-exp event, if so - extract the chaos data
func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
func (ev *subscriberEvents) CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosClient *v1alpha12.LitmuschaosV1alpha1Client) (string, *types.ChaosData, error) {
nodeType := string(nodeStatus.Type)
var cd *types.ChaosData = nil
// considering chaos events has only 1 artifact with manifest as raw data
Expand All @@ -92,7 +91,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
if nodeStatus.Phase != "Pending" {
name := obj.GetName()
if obj.GetGenerateName() != "" {
log, err := k8s.GetLogs(nodeStatus.ID, workflowNS, "main")
log, err := ev.subscriberK8s.GetLogs(nodeStatus.ID, workflowNS, "main")
if err != nil {
return nodeType, nil, err
}
Expand All @@ -101,7 +100,7 @@ func CheckChaosData(nodeStatus v1alpha13.NodeStatus, workflowNS string, chaosCli
return nodeType, nil, errors.New("Chaos-Engine Generated Name couldn't be retrieved")
}
}
cd, err = getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
cd, err = ev.getChaosData(nodeStatus, name, obj.GetNamespace(), chaosClient)
return nodeType, cd, err
}
}
Expand Down Expand Up @@ -130,9 +129,9 @@ func StrConvTime(time int64) string {
}
}

func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
func (ev *subscriberEvents) GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
Expand All @@ -153,9 +152,9 @@ func GetWorkflowObj(uid string) (*v1alpha1.Workflow, error) {
return nil, nil
}

func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
func (ev *subscriberEvents) ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
ctx := context.TODO()
conf, err := k8s.GetKubeConfig()
conf, err := ev.subscriberK8s.GetKubeConfig()
if err != nil {
return nil, err
}
Expand All @@ -171,7 +170,7 @@ func ListWorkflowObject(wfid string) (*v1alpha1.WorkflowList, error) {
}

// GenerateWorkflowPayload generate graphql mutation payload for events event
func GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
func (ev *subscriberEvents) GenerateWorkflowPayload(cid, accessKey, version, completed string, wfEvent types.WorkflowEvent) ([]byte, error) {
infraID := `{infraID: \"` + cid + `\", version: \"` + version + `\", accessKey: \"` + accessKey + `\"}`
for id, event := range wfEvent.Nodes {
event.Message = strings.Replace(event.Message, `"`, ``, -1)
Expand Down
Loading

0 comments on commit e3f2deb

Please sign in to comment.