From 21e443d0aa57e5b3d23a7341facaca8ad67bf9c5 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Tue, 16 Feb 2016 15:11:23 +0900 Subject: [PATCH] Implemented #103 ("local://" endpoint). Also implemented proc inspector for `earthquake-container` (#104). Close #103 #104 --- README.md | 51 +++-- earthquake-container/cli/cli.go | 5 +- earthquake-container/cli/run.go | 203 ------------------ earthquake-container/cli/run/run.go | 96 +++++++++ earthquake-container/cli/run/runflag.go | 70 ++++++ earthquake-container/cli/run/runprereq.go | 47 ++++ .../cli/{prereq.go => run/runutil.go} | 21 +- earthquake-container/container/boot.go | 53 +++++ earthquake-container/container/ns.go | 51 +++++ earthquake-container/container/remove.go | 34 +++ earthquake-container/container/util.go | 89 -------- earthquake-container/core/earthquake.go | 62 ++++++ earthquake-container/core/nfqueue.go | 53 +++++ earthquake-container/core/start.go | 68 ++++++ earthquake/cli/inspectors/ethernet.go | 15 +- earthquake/cli/inspectors/fs.go | 14 +- earthquake/cli/inspectors/proc.go | 12 +- .../explorepolicy/random/randompolicy.go | 1 + earthquake/explorepolicy/random/randomproc.go | 32 +-- .../inspector/ethernet/ethernet_hookswitch.go | 6 +- earthquake/inspector/ethernet/ethernet_nfq.go | 6 +- earthquake/inspector/fs/fs.go | 6 +- earthquake/inspector/proc/proc.go | 30 +-- .../inspector/transceiver/localtransceiver.go | 90 ++++++++ .../transceiver/resttransceiver.go} | 29 ++- .../inspector/transceiver/transceiver.go | 37 ++++ .../inspectorhandler/inspectorhandler.go | 30 ++- .../localinspectorhandler.go | 77 +++++++ .../pbinspectorhandler/pbinspectorhandler.go | 13 +- .../restinspectorhandler.go | 14 +- earthquake/orchestrator/orchestrator.go | 2 +- earthquake/signal/event_filesystem.go | 18 +- earthquake/signal/event_packet.go | 18 +- earthquake/signal/event_procset.go | 18 +- earthquake/util/config/config.go | 41 +++- .../orchestrator/orchestratorutil.go} | 12 +- earthquake/util/rest/apiroot.go | 1 - example/etcd/3517-degrade-check/config.toml | 1 + example/etcd/3517-reproduce/config.toml | 1 + example/template/config.toml | 2 + example/template/config_mypolicy.toml | 1 + example/template/mypolicy.go | 2 + example/zk-found-2212.nfqhook/config.toml | 1 + example/zk-found-2212.ryu/config.toml | 1 + example/zk-found-2212.ryu/config_dumb.toml | 1 + 45 files changed, 993 insertions(+), 442 deletions(-) delete mode 100644 earthquake-container/cli/run.go create mode 100644 earthquake-container/cli/run/run.go create mode 100644 earthquake-container/cli/run/runflag.go create mode 100644 earthquake-container/cli/run/runprereq.go rename earthquake-container/cli/{prereq.go => run/runutil.go} (63%) create mode 100644 earthquake-container/container/boot.go create mode 100644 earthquake-container/container/ns.go create mode 100644 earthquake-container/container/remove.go create mode 100644 earthquake-container/core/earthquake.go create mode 100644 earthquake-container/core/nfqueue.go create mode 100644 earthquake-container/core/start.go create mode 100644 earthquake/inspector/transceiver/localtransceiver.go rename earthquake/{util/rest/clientutil.go => inspector/transceiver/resttransceiver.go} (85%) create mode 100644 earthquake/inspector/transceiver/transceiver.go create mode 100644 earthquake/inspectorhandler/localinspectorhandler/localinspectorhandler.go rename earthquake/{cli/inspectors/util.go => util/orchestrator/orchestratorutil.go} (74%) diff --git a/README.md b/README.md index 434d94c..3d5ebfb 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,8 @@ Earthquake is a programmable fuzzy scheduler for testing real implementations of Blog: [http://osrg.github.io/earthquake/](http://osrg.github.io/earthquake/) Earthquakes permutes C/Java function calls, Ethernet packets, Filesystem events, and injected faults in various orders so as to find implementation-level bugs of the distributed system. -When Earthquake finds a bug, Earthquake automatically records [the event history](http://osrg.github.io/earthquake/post/zookeeper-2212/) and helps you to analyze which permutation of events triggers the bug. -Earthquake also collects [branch patterns](http://osrg.github.io/earthquake/post/zookeeper-2080/) for deeper analysis. +Earthquake can also control non-determinism of the thread interleaving (by calling `sched_setattr(2)` with randomized parameters). +So Earthquake can be also used for testing standalone multi-threaded software. Basically, Earthquake permutes events in a random order, but you can write your [own state exploration policy](doc/arch.md) (in Golang) for finding deep bugs efficiently. @@ -28,29 +28,52 @@ Basically, Earthquake permutes events in a random order, but you can write your ## Quick Start The following instruction shows how you can start *Earthquake Container*, the simplified CLI for Earthquake. -(For full-stack Earthquake environment, please refer to [doc/how-to-setup-env-full.md](doc/how-to-setup-env-full.md).) $ sudo apt-get install libzmq3-dev libnetfilter-queue-dev $ go get github.com/osrg/earthquake/earthquake-container - $ sudo earthquake-container run -it --rm --eq-config config.toml ubuntu bash + $ sudo earthquake-container run -it --rm ubuntu bash + +In *Earthquake Container*, you can run arbitrary command that might be *flaky*. +JUnit tests are interesting to try. + + earthquake-container$ git clone something + earthquake-container$ cd something + earthquake-container$ for f in $(seq 1 1000);do mvn test; done + + +You can also specify a config file (`-eq-config` option for `earthquake-container`.) A typical configuration file (`config.toml`) is as follows: ```toml +# Policy for observing events and yielding actions +# You can also implement your own policy. +# Default: "random" explorePolicy = "random" + [explorePolicyParam] + # for Ethernet/Filesystem/Java inspectors, event are non-deterministically delayed. + # minInterval and maxInterval are bounds for the non-deterministic delays + # Default: 0 and 0 minInterval = "80ms" maxInterval = "3000ms" + +[containerParam] + # Default: false + enableEthernetInspector = true + # Default: true + enableProcInspector = true + # Default: "1s" + procWatchInterval = "1s" ``` -In *Earthquake Container*, you can run arbitrary command that might be *flaky*. -JUnit tests are interesting to try. +If you don't want to use containers, you can also use Earthquake with an arbitrary process tree. - earthquake-container$ git clone something - earthquake-container$ cd something - earthquake-container$ for f in $(seq 1 1000);do mvn test; done + $ go get github.com/osrg/earthquake/earthquake + $ sudo earthquake inspectors proc -root-pid $TARGET_PID -watch-interval 1s -autopilot config.toml +For full-stack (fully-distributed) Earthquake environment, please refer to [doc/how-to-setup-env-full.md](doc/how-to-setup-env-full.md).) [The slides for the presentation at FOSDEM](http://www.slideshare.net/AkihiroSuda/tackling-nondeterminism-in-hadoop-testing-and-debugging-distributed-systems-with-earthquake-57866497/42) might be also helpful. @@ -86,11 +109,13 @@ func (p *MyPolicy) QueueNextEvent(event Event) { // - JavaFunctionEvent (byteman) // - PacketEvent (Netfilter, Openflow) // - FilesystemEvent (FUSE) + // - ProcSetEvent (Linux procfs) // - LogEvent (syslog) fmt.Printf("Event: %s\n", event) // You can also inject fault actions // - PacketFaultAction // - FilesystemFaultAction + // - ProcSetSchedAction // - ShellAction action, err := event.DefaultAction() if err != nil { @@ -117,8 +142,6 @@ func main(){ ``` Please refer to [example/template](example/template) for further information. ---------------------------------------- - -# On-going Work - -We are also working on a method to control the non-deternimism in the kernel task scheduler. Preview is available at [here](https://github.com/AkihiroSuda/MicroEarthquake). +## Known Limitation +After running Earthquake (process inspector) many times, `sched_setattr(2)` can fail with `EBUSY`. +This seems to be a bug of kernel; We're looking into this. diff --git a/earthquake-container/cli/cli.go b/earthquake-container/cli/cli.go index b84c5f6..4b0ba33 100644 --- a/earthquake-container/cli/cli.go +++ b/earthquake-container/cli/cli.go @@ -18,6 +18,7 @@ package cli import ( "fmt" log "github.com/cihub/seelog" + "github.com/osrg/earthquake/earthquake-container/cli/run" eqcli "github.com/osrg/earthquake/earthquake/cli" "os" ) @@ -50,8 +51,8 @@ func CLIMain(args []string) int { } switch args[1] { case "run": - return run(args[1:]) + return run.Run(args[1:]) } - fmt.Fprintf(os.Stderr, "'%s' is not a earthquake-container command.", args[1]) + fmt.Fprintf(os.Stderr, "'%s' is not a earthquake-container command.\n", args[1]) return 1 } diff --git a/earthquake-container/cli/run.go b/earthquake-container/cli/run.go deleted file mode 100644 index a561a66..0000000 --- a/earthquake-container/cli/run.go +++ /dev/null @@ -1,203 +0,0 @@ -// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or -// implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package cli - -import ( - "fmt" - log "github.com/cihub/seelog" - flag "github.com/docker/docker/pkg/mflag" - dockerpty "github.com/fgrehm/go-dockerpty" - docker "github.com/fsouza/go-dockerclient" - . "github.com/osrg/earthquake/earthquake-container/container" - "os" - "time" -) - -func parseRun(cmd *flag.FlagSet, args []string) (*docker.CreateContainerOptions, error) { - var ( - flStdin = cmd.Bool([]string{"i", "-interactive"}, false, "Keep STDIN open even if not attached") - flTty = cmd.Bool([]string{"t", "-tty"}, false, "Allocate a pseudo-TTY") - flDetach = cmd.Bool([]string{"d", "-detach"}, false, "[NOT SUPPORTED] Run container in background and print container ID") - flName = cmd.String([]string{"-name"}, "", "Assign a name to the container") - // the caller should handle "-rm" with cmd.IsSet() - _ = cmd.Bool([]string{"-rm"}, false, "Automatically remove the container when it exits") - // the caller should handle "-eq-config" - flEqConfig = cmd.String([]string{"-eq-config"}, "", "Earthquake configuration file") - ) - if err := cmd.Parse(args); err != nil { - return nil, err - } - if !*flStdin { - return nil, fmt.Errorf("-interactive is expected.") - } - if !*flTty { - return nil, fmt.Errorf("-tty is expected.") - } - if *flDetach { - return nil, fmt.Errorf("Currently, -detach is not supported.") - } - if *flEqConfig == "" { - return nil, fmt.Errorf("-eq-config is expected.") - } - - parsedArgs := cmd.Args() - if len(parsedArgs) < 2 { - return nil, fmt.Errorf("requires a minimum of 2 arguments") - } - image := parsedArgs[0] - execCmd := parsedArgs[1:] - - dockerOpt := docker.CreateContainerOptions{ - Name: *flName, - Config: &docker.Config{ - Image: image, - Cmd: execCmd, - OpenStdin: *flStdin, - StdinOnce: true, - AttachStdin: true, - AttachStdout: true, - AttachStderr: true, - Tty: *flTty, - }, - HostConfig: &docker.HostConfig{}, - } - return &dockerOpt, nil -} - -func bootContainer(client *docker.Client, opt *docker.CreateContainerOptions, - exitCh chan error) (*docker.Container, error) { - log.Debugf("Creating container for image %s", opt.Config.Image) - container, err := client.CreateContainer(*opt) - if err != nil { - return container, err - } - - log.Debugf("Starting container %s", container.ID) - go func() { - exitCh <- dockerpty.Start(client, container, opt.HostConfig) - }() - - trial := 0 - for { - container, err = client.InspectContainer(container.ID) - if container.State.StartedAt.Unix() > 0 { - break - } - if trial > 30 { - return container, fmt.Errorf("container %s seems not started. state=%#v", container.ID, container.State) - } - trial += 1 - time.Sleep(time.Duration(trial*100) * time.Millisecond) - } - log.Debugf("container state=%#v", container.State) - return container, nil -} - -func removeContainer(client *docker.Client, container *docker.Container) error { - log.Debugf("Removing container %s", container.ID) - err := client.RemoveContainer(docker.RemoveContainerOptions{ - ID: container.ID, - Force: true, - }) - log.Debugf("Removed container %s", container.ID) - if err != nil { - log.Error(err) - } - return err -} - -func startRoutines(container *docker.Container, nfqueueNum int, configPath string) error { - log.Debugf("Configuring NFQUEUE %d for container %s", nfqueueNum, container.ID) - err := SetupNFQUEUE(container, nfqueueNum, false, false) - if err != nil { - return err - } - - // TODO: refactor - log.Debugf("Starting Orchestrator") - go func() { - oerr := StartOrchestrator(configPath) - if oerr != nil { - panic(log.Critical(oerr)) - } - }() - - log.Debugf("Starting Inspector") - go func() { - ierr := StartEthernetInspector(container, nfqueueNum) - if ierr != nil { - panic(log.Critical(ierr)) - } - }() - - return nil -} - -// FIXME: too long function scope -func run(args []string) int { - if len(args) < 3 { - // FIXME - fmt.Fprintf(os.Stderr, "bad argument: %s\n", args) - return 1 - } - flagSet := flag.NewFlagSet("run", flag.ExitOnError) - dockerOpt, err := parseRun(flagSet, args[1:]) - if err != nil { - fmt.Fprintf(os.Stderr, "%s\n", err) - return 1 - } - removeOnExit := flagSet.IsSet("-rm") - - nfqueueNum := 42 // FIXME - configPath := flagSet.Lookup("-eq-config").Value.String() - - if err = checkPrerequisite(); err != nil { - fmt.Fprintf(os.Stderr, "prerequisite error: %s\n", err) - return 1 - } - - client, err := NewDockerClient() - if err != nil { - panic(err) - } - - exited := make(chan error) - container, err := bootContainer(client, dockerOpt, exited) - if err == docker.ErrNoSuchImage { - log.Critical(err) - // TODO: pull the image automatically - log.Infof("You need to run `docker pull %s`", dockerOpt.Config.Image) - return 1 - } else if err != nil { - panic(err) - } - if removeOnExit { - defer removeContainer(client, container) - } - - err = startRoutines(container, nfqueueNum, configPath) - if err != nil { - panic(err) - } - - err = <-exited - if err != nil { - log.Error(err) - } - log.Debugf("Exiting..") - - return 0 -} diff --git a/earthquake-container/cli/run/run.go b/earthquake-container/cli/run/run.go new file mode 100644 index 0000000..77c5528 --- /dev/null +++ b/earthquake-container/cli/run/run.go @@ -0,0 +1,96 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package run + +import ( + "fmt" + log "github.com/cihub/seelog" + flag "github.com/docker/docker/pkg/mflag" + docker "github.com/fsouza/go-dockerclient" + "github.com/osrg/earthquake/earthquake-container/container" + "github.com/osrg/earthquake/earthquake-container/core" + "github.com/osrg/earthquake/earthquake/util/config" + "os" +) + +func prepare(args []string) (dockerOpt *docker.CreateContainerOptions, removeOnExit bool, eqCfg config.Config, err error) { + if len(args) < 3 { + // FIXME + err = fmt.Errorf("bad argument: %s", args) + return + } + flagSet := flag.NewFlagSet("run", flag.ExitOnError) + dockerOpt, err = parseRun(flagSet, args[1:]) + if err != nil { + return + } + removeOnExit = flagSet.IsSet("-rm") + + eqCfgPath := flagSet.Lookup("-eq-config").Value.String() + eqCfg, err = newConfig(eqCfgPath) + if err != nil { + err = fmt.Errorf("bad config: %s", err) + return + } + log.Debugf("Earthquake Config=%s", eqCfg) + + if err = checkPrerequisite(eqCfg); err != nil { + err = fmt.Errorf("prerequisite error: %s", err) + } + return +} + +func Run(args []string) int { + dockerOpt, removeOnExit, eqCfg, err := prepare(args) + if err != nil { + // do not panic here + fmt.Fprintf(os.Stderr, "%s\n", err) + return 1 + } + + client, err := container.NewDockerClient() + if err != nil { + panic(log.Critical(err)) + } + + containerExitStatusChan := make(chan error) + c, err := container.Boot(client, dockerOpt, containerExitStatusChan) + if err == docker.ErrNoSuchImage { + log.Critical(err) + // TODO: pull the image automatically + log.Infof("You need to run `docker pull %s`", dockerOpt.Config.Image) + return 1 + } else if err != nil { + panic(log.Critical(err)) + } + if removeOnExit { + defer container.Remove(client, c) + } + + err = core.StartEarthquakeRoutines(c, eqCfg) + if err != nil { + panic(log.Critical(err)) + } + + err = <-containerExitStatusChan + if err != nil { + // do not panic here + log.Error(err) + } + log.Debugf("Exiting..") + // TODO: propagate err + return 0 +} diff --git a/earthquake-container/cli/run/runflag.go b/earthquake-container/cli/run/runflag.go new file mode 100644 index 0000000..04b8c9d --- /dev/null +++ b/earthquake-container/cli/run/runflag.go @@ -0,0 +1,70 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package run + +import ( + "fmt" + flag "github.com/docker/docker/pkg/mflag" + docker "github.com/fsouza/go-dockerclient" +) + +func parseRun(cmd *flag.FlagSet, args []string) (*docker.CreateContainerOptions, error) { + var ( + flStdin = cmd.Bool([]string{"i", "-interactive"}, false, "Keep STDIN open even if not attached") + flTty = cmd.Bool([]string{"t", "-tty"}, false, "Allocate a pseudo-TTY") + flDetach = cmd.Bool([]string{"d", "-detach"}, false, "[NOT SUPPORTED] Run container in background and print container ID") + flName = cmd.String([]string{"-name"}, "", "Assign a name to the container") + // the caller should handle "-rm" with cmd.IsSet() + _ = cmd.Bool([]string{"-rm"}, false, "Automatically remove the container when it exits") + // the caller should handle "-eq-config" + _ = cmd.String([]string{"-eq-config"}, "", "Earthquake configuration file") + ) + if err := cmd.Parse(args); err != nil { + return nil, err + } + if !*flStdin { + return nil, fmt.Errorf("-interactive is expected.") + } + if !*flTty { + return nil, fmt.Errorf("-tty is expected.") + } + if *flDetach { + return nil, fmt.Errorf("Currently, -detach is not supported.") + } + + parsedArgs := cmd.Args() + if len(parsedArgs) < 2 { + return nil, fmt.Errorf("requires a minimum of 2 arguments") + } + image := parsedArgs[0] + execCmd := parsedArgs[1:] + + dockerOpt := docker.CreateContainerOptions{ + Name: *flName, + Config: &docker.Config{ + Image: image, + Cmd: execCmd, + OpenStdin: *flStdin, + StdinOnce: true, + AttachStdin: true, + AttachStdout: true, + AttachStderr: true, + Tty: *flTty, + }, + HostConfig: &docker.HostConfig{}, + } + return &dockerOpt, nil +} diff --git a/earthquake-container/cli/run/runprereq.go b/earthquake-container/cli/run/runprereq.go new file mode 100644 index 0000000..413c358 --- /dev/null +++ b/earthquake-container/cli/run/runprereq.go @@ -0,0 +1,47 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package run + +import ( + "fmt" + "github.com/osrg/earthquake/earthquake/util/config" + cap "github.com/syndtr/gocapability/capability" +) + +func checkPrerequisite(cfg config.Config) error { + dummyPID := 0 + capInst, err := cap.NewPid(dummyPID) + if err != nil { + return err + } + + if cfg.GetBool("containerParam.enableEthernetInspector") { + if !capInst.Get(cap.EFFECTIVE, cap.CAP_NET_ADMIN) { + return fmt.Errorf("CAP_NET_ADMIN is needed.") + } + if !capInst.Get(cap.EFFECTIVE, cap.CAP_SYS_ADMIN) { + return fmt.Errorf("CAP_SYS_ADMIN is needed.") + } + } + + if cfg.GetBool("containerParam.enableProcInspector") { + if !capInst.Get(cap.EFFECTIVE, cap.CAP_SYS_NICE) { + return fmt.Errorf("CAP_SYS_NICE is needed.") + } + } + + return nil +} diff --git a/earthquake-container/cli/prereq.go b/earthquake-container/cli/run/runutil.go similarity index 63% rename from earthquake-container/cli/prereq.go rename to earthquake-container/cli/run/runutil.go index 4a70d82..7de839c 100644 --- a/earthquake-container/cli/prereq.go +++ b/earthquake-container/cli/run/runutil.go @@ -13,23 +13,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -package cli +package run import ( - "fmt" - cap "github.com/syndtr/gocapability/capability" + "github.com/osrg/earthquake/earthquake/util/config" ) -func checkPrerequisite() error { - c, err := cap.NewPid(0) - if err != nil { - return err +func newConfig(pathMaybeEmpty string) (config.Config, error) { + if pathMaybeEmpty == "" { + return config.New(), nil + } else { + return config.NewFromFile(pathMaybeEmpty) } - if !c.Get(cap.EFFECTIVE, cap.CAP_NET_ADMIN) { - return fmt.Errorf("CAP_NET_ADMIN is needed.") - } - if !c.Get(cap.EFFECTIVE, cap.CAP_SYS_ADMIN) { - return fmt.Errorf("CAP_SYS_ADMIN is needed.") - } - return nil } diff --git a/earthquake-container/container/boot.go b/earthquake-container/container/boot.go new file mode 100644 index 0000000..901e74f --- /dev/null +++ b/earthquake-container/container/boot.go @@ -0,0 +1,53 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package container + +import ( + "fmt" + log "github.com/cihub/seelog" + dockerpty "github.com/fgrehm/go-dockerpty" + docker "github.com/fsouza/go-dockerclient" + "time" +) + +func Boot(client *docker.Client, opt *docker.CreateContainerOptions, + exitCh chan error) (*docker.Container, error) { + log.Debugf("Creating container for image %s", opt.Config.Image) + container, err := client.CreateContainer(*opt) + if err != nil { + return container, err + } + + log.Debugf("Starting container %s", container.ID) + go func() { + exitCh <- dockerpty.Start(client, container, opt.HostConfig) + }() + + trial := 0 + for { + container, err = client.InspectContainer(container.ID) + if container.State.StartedAt.Unix() > 0 { + break + } + if trial > 30 { + return container, fmt.Errorf("container %s seems not started. state=%#v", container.ID, container.State) + } + trial += 1 + time.Sleep(time.Duration(trial*100) * time.Millisecond) + } + log.Debugf("container state=%#v", container.State) + return container, nil +} diff --git a/earthquake-container/container/ns.go b/earthquake-container/container/ns.go new file mode 100644 index 0000000..7e5618b --- /dev/null +++ b/earthquake-container/container/ns.go @@ -0,0 +1,51 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package container + +import ( + log "github.com/cihub/seelog" + docker "github.com/fsouza/go-dockerclient" + "github.com/vishvananda/netns" + "runtime" +) + +var origNs netns.NsHandle +var newNs netns.NsHandle + +func EnterDockerNetNs(container *docker.Container) error { + var err error + origNs, err = netns.Get() + if err != nil { + return log.Criticalf("could not obtain current netns. netns not supported?: %s", err) + } + // netns.GetFromDocker() does not work for recent dockers. + // So we use netns.GetFromPid() directly. + // https://github.com/vishvananda/netns/pull/10 + newNs, err = netns.GetFromPid(container.State.Pid) + if err != nil { + return log.Criticalf("Could not get netns for container %s (pid=%d). The container has exited??: %s", container.ID, container.State.Pid, err) + } + runtime.LockOSThread() + netns.Set(newNs) + return nil +} + +func LeaveNetNs() { + netns.Set(origNs) + runtime.UnlockOSThread() + origNs.Close() + newNs.Close() +} diff --git a/earthquake-container/container/remove.go b/earthquake-container/container/remove.go new file mode 100644 index 0000000..11f4484 --- /dev/null +++ b/earthquake-container/container/remove.go @@ -0,0 +1,34 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package container + +import ( + log "github.com/cihub/seelog" + docker "github.com/fsouza/go-dockerclient" +) + +func Remove(client *docker.Client, container *docker.Container) error { + log.Debugf("Removing container %s", container.ID) + err := client.RemoveContainer(docker.RemoveContainerOptions{ + ID: container.ID, + Force: true, + }) + log.Debugf("Removed container %s", container.ID) + if err != nil { + log.Error(err) + } + return err +} diff --git a/earthquake-container/container/util.go b/earthquake-container/container/util.go index c29a4f8..f69caf7 100644 --- a/earthquake-container/container/util.go +++ b/earthquake-container/container/util.go @@ -16,15 +16,9 @@ package container import ( - "fmt" log "github.com/cihub/seelog" docker "github.com/fsouza/go-dockerclient" - cliinsputil "github.com/osrg/earthquake/earthquake/cli/inspectors" - "github.com/osrg/earthquake/earthquake/inspector/ethernet" - "github.com/vishvananda/netns" "os" - "os/exec" - "runtime" "strings" ) @@ -37,86 +31,3 @@ func NewDockerClient() (*docker.Client, error) { } return docker.NewClientFromEnv() } - -var origNs netns.NsHandle -var newNs netns.NsHandle - -func enterDockerNetNs(container *docker.Container) error { - var err error - origNs, err = netns.Get() - if err != nil { - return log.Criticalf("could not obtain current netns. netns not supported?: %s", err) - } - // netns.GetFromDocker() does not work for recent dockers. - // So we use netns.GetFromPid() directly. - // https://github.com/vishvananda/netns/pull/10 - newNs, err = netns.GetFromPid(container.State.Pid) - if err != nil { - return log.Criticalf("Could not get netns for container %s (pid=%d). The container has exited??: %s", container.ID, container.State.Pid, err) - } - runtime.LockOSThread() - netns.Set(newNs) - return nil -} - -func leaveNetNs() { - netns.Set(origNs) - runtime.UnlockOSThread() - origNs.Close() - newNs.Close() -} - -func SetupNFQUEUE(container *docker.Container, queueNum int, hookInput bool, disableBypass bool) error { - err := enterDockerNetNs(container) - if err != nil { - return err - } - defer leaveNetNs() - - chain := "OUTPUT" - if hookInput { - chain = "INPUT" - } - iptArg := []string{"-A", chain, "-j", "NFQUEUE", "--queue-num", fmt.Sprintf("%d", queueNum)} - if !disableBypass { - iptArg = append(iptArg, "--queue-bypass") - } - - log.Debugf("Running `iptables` with %s", iptArg) - cmd := exec.Command("iptables", iptArg...) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - cmd.Env = os.Environ() - err = cmd.Run() - if err != nil { - return err - } - return nil -} - -func StartOrchestrator(path string) error { - // TODO: refactor. should not use cliinsputil. - autopilotOrchestrator, err := cliinsputil.NewAutopilotOrchestrator(path) - if err != nil { - return err - } - autopilotOrchestrator.Start() - return nil -} - -func StartEthernetInspector(container *docker.Container, queueNum int) error { - err := enterDockerNetNs(container) - if err != nil { - return err - } - // FIXME: should not use REST RPC for in-process communication - insp := ðernet.NFQInspector{ - OrchestratorURL: "http://localhost:10080/api/v3", - EntityID: "_earthquake_ethernet_inspector", - NFQNumber: uint16(queueNum), - EnableTCPWatcher: true, - } - defer leaveNetNs() - insp.Start() - return nil -} diff --git a/earthquake-container/core/earthquake.go b/earthquake-container/core/earthquake.go new file mode 100644 index 0000000..8997d35 --- /dev/null +++ b/earthquake-container/core/earthquake.go @@ -0,0 +1,62 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + docker "github.com/fsouza/go-dockerclient" + "github.com/osrg/earthquake/earthquake-container/container" + "github.com/osrg/earthquake/earthquake/inspector/ethernet" + "github.com/osrg/earthquake/earthquake/inspector/proc" + "github.com/osrg/earthquake/earthquake/util/config" + ocutil "github.com/osrg/earthquake/earthquake/util/orchestrator" + "time" +) + +func StartOrchestrator(cfg config.Config) error { + autopilotOrchestrator, err := ocutil.NewAutopilotOrchestrator(cfg) + if err != nil { + return err + } + autopilotOrchestrator.Start() + return nil +} + +func StartEthernetInspector(c *docker.Container, queueNum int) error { + err := container.EnterDockerNetNs(c) + if err != nil { + return err + } + insp := ðernet.NFQInspector{ + OrchestratorURL: ocutil.LocalOrchestratorURL, + EntityID: "_earthquake_container_ethernet_inspector", + NFQNumber: uint16(queueNum), + EnableTCPWatcher: true, + } + defer container.LeaveNetNs() + insp.Start() + return nil +} + +func StartProcInspector(c *docker.Container, watchInterval time.Duration) error { + insp := &proc.ProcInspector{ + OrchestratorURL: ocutil.LocalOrchestratorURL, + EntityID: "_earthquake_container_proc_inspector", + RootPID: c.State.Pid, + WatchInterval: watchInterval, + } + insp.Start() + return nil +} diff --git a/earthquake-container/core/nfqueue.go b/earthquake-container/core/nfqueue.go new file mode 100644 index 0000000..c0d6c37 --- /dev/null +++ b/earthquake-container/core/nfqueue.go @@ -0,0 +1,53 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + log "github.com/cihub/seelog" + docker "github.com/fsouza/go-dockerclient" + "github.com/osrg/earthquake/earthquake-container/container" + "os" + "os/exec" +) + +func SetupNFQUEUE(c *docker.Container, queueNum int, hookInput bool, disableBypass bool) error { + err := container.EnterDockerNetNs(c) + if err != nil { + return err + } + defer container.LeaveNetNs() + + chain := "OUTPUT" + if hookInput { + chain = "INPUT" + } + iptArg := []string{"-A", chain, "-j", "NFQUEUE", "--queue-num", fmt.Sprintf("%d", queueNum)} + if !disableBypass { + iptArg = append(iptArg, "--queue-bypass") + } + + log.Debugf("Running `iptables` with %s", iptArg) + cmd := exec.Command("iptables", iptArg...) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = os.Environ() + err = cmd.Run() + if err != nil { + return err + } + return nil +} diff --git a/earthquake-container/core/start.go b/earthquake-container/core/start.go new file mode 100644 index 0000000..6b22f60 --- /dev/null +++ b/earthquake-container/core/start.go @@ -0,0 +1,68 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package core + +import ( + "fmt" + log "github.com/cihub/seelog" + docker "github.com/fsouza/go-dockerclient" + "github.com/osrg/earthquake/earthquake/util/config" +) + +func StartEarthquakeRoutines(c *docker.Container, cfg config.Config) error { + log.Debugf("Starting Orchestrator") + go func() { + oerr := StartOrchestrator(cfg) + if oerr != nil { + panic(log.Critical(oerr)) + } + }() + + if cfg.GetBool("containerParam.enableEthernetInspector") { + nfqNum := cfg.GetInt("containerParam.ethernetNFQNumber") + if nfqNum <= 0 { + return fmt.Errorf("strange containerParam.ethernetNFQNumber: %d", nfqNum) + } + log.Debugf("Configuring NFQUEUE %d for container %s", nfqNum, c.ID) + err := SetupNFQUEUE(c, nfqNum, false, false) + if err != nil { + return err + } + log.Debugf("Starting Ethernet Inspector") + go func() { + ierr := StartEthernetInspector(c, nfqNum) + if ierr != nil { + panic(log.Critical(ierr)) + } + }() + } + + if cfg.GetBool("containerParam.enableProcInspector") { + watchInterval := cfg.GetDuration("containerParam.procWatchInterval") + if watchInterval <= 0 { + return fmt.Errorf("strange containerParam.procWatchInterval: %s", watchInterval) + } + log.Debugf("Starting Process Inspector") + go func() { + ierr := StartProcInspector(c, watchInterval) + if ierr != nil { + panic(log.Critical(ierr)) + } + }() + } + + return nil +} diff --git a/earthquake/cli/inspectors/ethernet.go b/earthquake/cli/inspectors/ethernet.go index c2f15b5..a9b1f25 100644 --- a/earthquake/cli/inspectors/ethernet.go +++ b/earthquake/cli/inspectors/ethernet.go @@ -17,12 +17,11 @@ package inspectors import ( "flag" - - // "fmt" log "github.com/cihub/seelog" "github.com/mitchellh/cli" inspector "github.com/osrg/earthquake/earthquake/inspector/ethernet" - // restutil "github.com/osrg/earthquake/earthquake/util/rest" + "github.com/osrg/earthquake/earthquake/util/config" + ocutil "github.com/osrg/earthquake/earthquake/util/orchestrator" ) type etherFlags struct { @@ -40,7 +39,7 @@ var ( func init() { etherFlagset.StringVar(&_etherFlags.OrchestratorURL, "orchestrator-url", - defaultOrchestratorURL, "orchestrator rest url") + ocutil.LocalOrchestratorURL, "orchestrator rest url") etherFlagset.StringVar(&_etherFlags.EntityID, "entity-id", "_earthquake_ethernet_inspector", "Entity ID") etherFlagset.StringVar(&_etherFlags.HookSwitchZMQAddr, "hookswitch", @@ -87,13 +86,17 @@ func runEtherInspector(args []string) int { return 1 } - if _etherFlags.AutopilotConfig != "" && _etherFlags.OrchestratorURL != defaultOrchestratorURL { + if _etherFlags.AutopilotConfig != "" && _etherFlags.OrchestratorURL != ocutil.LocalOrchestratorURL { log.Critical("non-default orchestrator url set for autopilot orchestration mode") return 1 } if _etherFlags.AutopilotConfig != "" { - autopilotOrchestrator, err := NewAutopilotOrchestrator(_etherFlags.AutopilotConfig) + cfg, err := config.NewFromFile(_etherFlags.AutopilotConfig) + if err != nil { + panic(log.Critical(err)) + } + autopilotOrchestrator, err := ocutil.NewAutopilotOrchestrator(cfg) if err != nil { panic(log.Critical(err)) } diff --git a/earthquake/cli/inspectors/fs.go b/earthquake/cli/inspectors/fs.go index cf62d64..980f27b 100644 --- a/earthquake/cli/inspectors/fs.go +++ b/earthquake/cli/inspectors/fs.go @@ -17,12 +17,12 @@ package inspectors import ( "flag" - - //"fmt" log "github.com/cihub/seelog" "github.com/mitchellh/cli" inspector "github.com/osrg/earthquake/earthquake/inspector/fs" + "github.com/osrg/earthquake/earthquake/util/config" logutil "github.com/osrg/earthquake/earthquake/util/log" + ocutil "github.com/osrg/earthquake/earthquake/util/orchestrator" "github.com/osrg/hookfs/hookfs" ) @@ -40,7 +40,7 @@ var ( ) func init() { - fsFlagset.StringVar(&_fsFlags.OrchestratorURL, "orchestrator-url", defaultOrchestratorURL, "orchestrator rest url") + fsFlagset.StringVar(&_fsFlags.OrchestratorURL, "orchestrator-url", ocutil.LocalOrchestratorURL, "orchestrator rest url") fsFlagset.StringVar(&_fsFlags.EntityID, "entity-id", "_earthquake_fs_inspector", "Entity ID") fsFlagset.StringVar(&_fsFlags.OriginalDir, "original-dir", "", "FUSE Original Directory") fsFlagset.StringVar(&_fsFlags.Mountpoint, "mount-point", "", "FUSE Mount Point") @@ -83,7 +83,7 @@ func runFsInspector(args []string) int { return 1 } - if _fsFlags.AutopilotConfig != "" && _fsFlags.OrchestratorURL != defaultOrchestratorURL { + if _fsFlags.AutopilotConfig != "" && _fsFlags.OrchestratorURL != ocutil.LocalOrchestratorURL { log.Critical("non-default orchestrator url set for autopilot orchestration mode") return 1 } @@ -96,7 +96,11 @@ func runFsInspector(args []string) int { } if _fsFlags.AutopilotConfig != "" { - autopilotOrchestrator, err := NewAutopilotOrchestrator(_fsFlags.AutopilotConfig) + cfg, err := config.NewFromFile(_fsFlags.AutopilotConfig) + if err != nil { + panic(log.Critical(err)) + } + autopilotOrchestrator, err := ocutil.NewAutopilotOrchestrator(cfg) if err != nil { panic(log.Critical(err)) } diff --git a/earthquake/cli/inspectors/proc.go b/earthquake/cli/inspectors/proc.go index 9a72edb..f84af18 100644 --- a/earthquake/cli/inspectors/proc.go +++ b/earthquake/cli/inspectors/proc.go @@ -20,6 +20,8 @@ import ( log "github.com/cihub/seelog" "github.com/mitchellh/cli" inspector "github.com/osrg/earthquake/earthquake/inspector/proc" + "github.com/osrg/earthquake/earthquake/util/config" + ocutil "github.com/osrg/earthquake/earthquake/util/orchestrator" "time" ) @@ -37,7 +39,7 @@ var ( ) func init() { - procFlagset.StringVar(&_procFlags.OrchestratorURL, "orchestrator-url", defaultOrchestratorURL, "orchestrator rest url") + procFlagset.StringVar(&_procFlags.OrchestratorURL, "orchestrator-url", ocutil.LocalOrchestratorURL, "orchestrator rest url") procFlagset.StringVar(&_procFlags.EntityID, "entity-id", "_earthquake_proc_inspector", "Entity ID") procFlagset.IntVar(&_procFlags.RootPID, "root-pid", -1, "PID for the target process tree") procFlagset.DurationVar(&_procFlags.WatchInterval, "watch-interval", 1*time.Second, "Watching interval") @@ -75,13 +77,17 @@ func runProcInspector(args []string) int { return 1 } - if _procFlags.AutopilotConfig != "" && _procFlags.OrchestratorURL != defaultOrchestratorURL { + if _procFlags.AutopilotConfig != "" && _procFlags.OrchestratorURL != ocutil.LocalOrchestratorURL { log.Critical("non-default orchestrator url set for autopilot orchestration mode") return 1 } if _procFlags.AutopilotConfig != "" { - autopilotOrchestrator, err := NewAutopilotOrchestrator(_procFlags.AutopilotConfig) + cfg, err := config.NewFromFile(_procFlags.AutopilotConfig) + if err != nil { + panic(log.Critical(err)) + } + autopilotOrchestrator, err := ocutil.NewAutopilotOrchestrator(cfg) if err != nil { panic(log.Critical(err)) } diff --git a/earthquake/explorepolicy/random/randompolicy.go b/earthquake/explorepolicy/random/randompolicy.go index 3b24c0a..118b56f 100644 --- a/earthquake/explorepolicy/random/randompolicy.go +++ b/earthquake/explorepolicy/random/randompolicy.go @@ -218,6 +218,7 @@ func (r *Random) dequeueEventRoutine() { qItem := <-r.queueDeqCh event := qItem.Value().(signal.Event) action, err := r.makeActionForEvent(event) + log.Debugf("RANDOM: Determined action %#v for event %#v", action, event) if err != nil { panic(log.Critical(err)) } diff --git a/earthquake/explorepolicy/random/randomproc.go b/earthquake/explorepolicy/random/randomproc.go index 48f1518..ee2a752 100644 --- a/earthquake/explorepolicy/random/randomproc.go +++ b/earthquake/explorepolicy/random/randomproc.go @@ -30,33 +30,35 @@ var ( ) func (r *Random) makeActionForProcSetEvent(event *signal.ProcSetEvent) (signal.Action, error) { - xprocs, ok := event.Option()["procs"].([]interface{}) - if !ok { - return nil, fmt.Errorf("no procs? this should be an implementation error. event=%#v", event) - } - - // due to JSON nature, we need to convert []interface{} to []string here - procs := []string{} - for _, xproc := range xprocs { - proc, ok := xproc.(string) - if !ok { - return nil, fmt.Errorf("non-string %#v", xproc) - } - procs = append(procs, proc) + procs, err := r.parseProcSetEvent(event) + if err != nil { + return nil, err } - attrs := r.dirichletSchedDeadline(procs, time.Millisecond, 1.0) for pidStr, attr := range attrs { - log.Infof("For PID=%s, setting Attr=%v", pidStr, attr) + log.Debugf("For PID=%s, setting Attr=%v", pidStr, attr) } return signal.NewProcSetSchedAction(event, attrs) } +// due to JSON nature, we use string for PID representation +func (r *Random) parseProcSetEvent(event *signal.ProcSetEvent) ([]string, error) { + option := event.Option() + procs, ok := option["procs"].([]string) + if !ok { + // FIXME: this may not work with REST endpoint. + // we need to convert []interface{} to []string here + return nil, fmt.Errorf("no procs? this should be an implementation error. event=%#v", event) + } + return procs, nil +} + // due to JSON nature, we use string for PID representation func (r *Random) dirichletSchedDeadline(procs []string, base time.Duration, eff float64) map[string]linuxsched.SchedAttr { attrs := make(map[string]linuxsched.SchedAttr, len(procs)) ratios := drng.FlatDirichlet(len(procs)) for i, pidStr := range procs { + // FIXME: we should obtain actual available NumCPU for the PID rather than runtime.NumCPU() numCPU := runtime.NumCPU() runtime := time.Duration(int(float64(base) * ratios[i] * eff * float64(numCPU))) deadline := base diff --git a/earthquake/inspector/ethernet/ethernet_hookswitch.go b/earthquake/inspector/ethernet/ethernet_hookswitch.go index ebf4393..d060b3f 100644 --- a/earthquake/inspector/ethernet/ethernet_hookswitch.go +++ b/earthquake/inspector/ethernet/ethernet_hookswitch.go @@ -23,8 +23,8 @@ import ( "github.com/google/gopacket/layers" "github.com/osrg/earthquake/earthquake/inspector/ethernet/hookswitch" "github.com/osrg/earthquake/earthquake/inspector/ethernet/tcpwatcher" + "github.com/osrg/earthquake/earthquake/inspector/transceiver" "github.com/osrg/earthquake/earthquake/signal" - restutil "github.com/osrg/earthquake/earthquake/util/rest" zmq "github.com/vaughan0/go-zmq" ) @@ -34,7 +34,7 @@ type HookSwitchInspector struct { EntityID string HookSwitchZMQAddr string EnableTCPWatcher bool - trans *restutil.Transceiver + trans transceiver.Transceiver zmqChannels *zmq.Channels tcpWatcher *tcpwatcher.TCPWatcher } @@ -47,7 +47,7 @@ func (this *HookSwitchInspector) Start() error { this.tcpWatcher = tcpwatcher.New() } - this.trans, err = restutil.NewTransceiver(this.OrchestratorURL, this.EntityID) + this.trans, err = transceiver.NewTransceiver(this.OrchestratorURL, this.EntityID) if err != nil { return err } diff --git a/earthquake/inspector/ethernet/ethernet_nfq.go b/earthquake/inspector/ethernet/ethernet_nfq.go index 74d0105..6f06870 100644 --- a/earthquake/inspector/ethernet/ethernet_nfq.go +++ b/earthquake/inspector/ethernet/ethernet_nfq.go @@ -21,8 +21,8 @@ import ( log "github.com/cihub/seelog" "github.com/google/gopacket/layers" "github.com/osrg/earthquake/earthquake/inspector/ethernet/tcpwatcher" + "github.com/osrg/earthquake/earthquake/inspector/transceiver" "github.com/osrg/earthquake/earthquake/signal" - restutil "github.com/osrg/earthquake/earthquake/util/rest" ) // TODO: support user-written MapPacketToEventFunc @@ -31,7 +31,7 @@ type NFQInspector struct { EntityID string NFQNumber uint16 EnableTCPWatcher bool - trans *restutil.Transceiver + trans transceiver.Transceiver tcpWatcher *tcpwatcher.TCPWatcher } @@ -43,7 +43,7 @@ func (this *NFQInspector) Start() error { this.tcpWatcher = tcpwatcher.New() } - this.trans, err = restutil.NewTransceiver(this.OrchestratorURL, this.EntityID) + this.trans, err = transceiver.NewTransceiver(this.OrchestratorURL, this.EntityID) if err != nil { return err } diff --git a/earthquake/inspector/fs/fs.go b/earthquake/inspector/fs/fs.go index 11bc091..aced671 100644 --- a/earthquake/inspector/fs/fs.go +++ b/earthquake/inspector/fs/fs.go @@ -18,8 +18,8 @@ package fs import ( "fmt" log "github.com/cihub/seelog" + "github.com/osrg/earthquake/earthquake/inspector/transceiver" . "github.com/osrg/earthquake/earthquake/signal" - . "github.com/osrg/earthquake/earthquake/util/rest" "github.com/osrg/hookfs/hookfs" "syscall" ) @@ -33,7 +33,7 @@ type EQFSHookContext struct { type FilesystemInspector struct { OrchestratorURL string EntityID string - trans *Transceiver + trans transceiver.Transceiver } func (this *FilesystemInspector) String() string { @@ -44,7 +44,7 @@ func (this *FilesystemInspector) String() string { func (this *FilesystemInspector) Init() error { log.Debugf("Initializing FS Inspector %#v", this) var err error - this.trans, err = NewTransceiver(this.OrchestratorURL, this.EntityID) + this.trans, err = transceiver.NewTransceiver(this.OrchestratorURL, this.EntityID) if err != nil { return err } diff --git a/earthquake/inspector/proc/proc.go b/earthquake/inspector/proc/proc.go index dc3145e..908afe8 100644 --- a/earthquake/inspector/proc/proc.go +++ b/earthquake/inspector/proc/proc.go @@ -19,10 +19,9 @@ import ( "fmt" "github.com/AkihiroSuda/go-linuxsched" log "github.com/cihub/seelog" - "github.com/mitchellh/mapstructure" + "github.com/osrg/earthquake/earthquake/inspector/transceiver" "github.com/osrg/earthquake/earthquake/signal" procutil "github.com/osrg/earthquake/earthquake/util/proc" - restutil "github.com/osrg/earthquake/earthquake/util/rest" "strconv" "time" ) @@ -32,14 +31,14 @@ type ProcInspector struct { EntityID string RootPID int WatchInterval time.Duration - trans *restutil.Transceiver + trans transceiver.Transceiver } func (this *ProcInspector) Start() error { log.Debugf("Initializing Process Inspector %#v", this) var err error - this.trans, err = restutil.NewTransceiver(this.OrchestratorURL, this.EntityID) + this.trans, err = transceiver.NewTransceiver(this.OrchestratorURL, this.EntityID) if err != nil { return err } @@ -49,7 +48,9 @@ func (this *ProcInspector) Start() error { <-time.After(this.WatchInterval) procs, err := procutil.DescendantLWPs(this.RootPID) if err != nil { - log.Error(err) + // this happens frequently, but does not matter. + // e.g. "open /proc/11193/task/11193/children: no such file or directory" + log.Warn(err) continue } if err = this.onWatch(procs); err != nil { @@ -84,26 +85,25 @@ func (this *ProcInspector) onWatch(procs []int) error { } func (this *ProcInspector) onAction(action *signal.ProcSetSchedAction) error { - xattrs, ok := action.Option()["attrs"].(map[string]interface{}) + // FIXME: this may not work with REST endpoint. + // we need to convert interface{} to linuxsched.SchedAttr here + + attrs, ok := action.Option()["attrs"].(map[string]linuxsched.SchedAttr) if !ok { return fmt.Errorf("no attrs? this should be an implementation error. action=%#v", action) } - for pidStr, xattr := range xattrs { - // due to JSON nature, we need to convert interface{} to linuxsched.SchedAttr here - var attr linuxsched.SchedAttr - err := mapstructure.Decode(xattr, &attr) - if err != nil { - return err - } + for pidStr, attr := range attrs { + // due to JSON nature, we use string for PID representation pid, err := strconv.Atoi(pidStr) if err != nil { log.Warnf("Non PID string: %s", pidStr) continue } if warn := linuxsched.SetAttr(pid, attr); warn != nil { - // this happens frequently, but does not matter - log.Warnf("could not apply %#v to %d: %s", attr, pid, warn) + // this happens frequently, but does not matter. + // so use log.Debugf rather than log.Warnf + log.Debugf("could not apply %#v to %d: %s", attr, pid, warn) } } return nil diff --git a/earthquake/inspector/transceiver/localtransceiver.go b/earthquake/inspector/transceiver/localtransceiver.go new file mode 100644 index 0000000..886a58e --- /dev/null +++ b/earthquake/inspector/transceiver/localtransceiver.go @@ -0,0 +1,90 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transceiver + +import ( + "fmt" + log "github.com/cihub/seelog" + "github.com/osrg/earthquake/earthquake/inspectorhandler" + . "github.com/osrg/earthquake/earthquake/signal" + "sync" +) + +type LocalTransceiver struct { + EntityID string + m map[string]chan Action // key: event id + mMutex sync.Mutex +} + +func NewLocalTransceiver(entityID string) (Transceiver, error) { + t := LocalTransceiver{ + EntityID: entityID, + m: make(map[string]chan Action), + mMutex: sync.Mutex{}, + } + return &t, nil +} + +func (this *LocalTransceiver) SendEvent(event Event) (chan Action, error) { + if event.EntityID() != this.EntityID { + return nil, fmt.Errorf("bad entity id for event %s (want %s)", event, this.EntityID) + } + ch := make(chan Action) + this.mMutex.Lock() + // put ch to m BEFORE calling SendEvent(), otherwise race may occur + this.m[event.ID()] = ch + this.mMutex.Unlock() + go func() { + inspectorhandler.GlobalLocalInspectorHandler.EventChan <- event + }() + return ch, nil +} + +func (this *LocalTransceiver) onAction(action Action) error { + event := action.Event() + if event == nil { + return fmt.Errorf("No event found for action %s", action) + } + this.mMutex.Lock() + defer this.mMutex.Unlock() + actionChan, ok := this.m[event.ID()] + if !ok { + return fmt.Errorf("No channel found for action %s (event id=%s)", action, event.ID()) + } + delete(this.m, event.ID()) + go func() { + actionChan <- action + }() + return nil +} + +func (this *LocalTransceiver) routine() { + onActionError := func(err error) { + log.Error(err) + } + for { + action := <-inspectorhandler.GlobalLocalInspectorHandler.ActionChan + err := this.onAction(action) + if err != nil { + onActionError(err) + continue + } + } +} + +func (this *LocalTransceiver) Start() { + go this.routine() +} diff --git a/earthquake/util/rest/clientutil.go b/earthquake/inspector/transceiver/resttransceiver.go similarity index 85% rename from earthquake/util/rest/clientutil.go rename to earthquake/inspector/transceiver/resttransceiver.go index 9219908..d0bc320 100644 --- a/earthquake/util/rest/clientutil.go +++ b/earthquake/inspector/transceiver/resttransceiver.go @@ -13,7 +13,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -package rest +package transceiver import ( "bytes" @@ -31,7 +31,7 @@ import ( // idempotent (RFC 7231) // // client can be http.DefaultClient in most cases -func DeleteAction(client *http.Client, ocURL string, action Action) error { +func deleteAction(client *http.Client, ocURL string, action Action) error { url := ocURL + "/actions/" + action.EntityID() + "/" + action.ID() log.Debugf("REST deleting action of %s", action, url) req, err := http.NewRequest("DELETE", url, nil) @@ -53,7 +53,7 @@ func DeleteAction(client *http.Client, ocURL string, action Action) error { } // client can be http.DefaultClient in most cases -func SendEvent(client *http.Client, ocURL string, event Event) error { +func sendEvent(client *http.Client, ocURL string, event Event) error { jsonStr, err := json.Marshal(event.JSONMap()) if err != nil { return err @@ -75,7 +75,7 @@ func SendEvent(client *http.Client, ocURL string, event Event) error { } // client can be http.DefaultClient in most cases -func GetAction(client *http.Client, ocURL string, entityID string) (Action, error) { +func getAction(client *http.Client, ocURL string, entityID string) (Action, error) { url := ocURL + "/actions/" + entityID log.Debugf("REST getting action %s", url) resp, err := client.Get(url) @@ -96,8 +96,7 @@ func GetAction(client *http.Client, ocURL string, entityID string) (Action, erro return action, nil } -// util for SendEvent/GetAction/DeleteAction -type Transceiver struct { +type RESTTransceiver struct { OrchestratorURL string EntityID string Client *http.Client @@ -105,8 +104,8 @@ type Transceiver struct { mMutex sync.Mutex } -func NewTransceiver(orchestratorURL string, entityID string) (*Transceiver, error) { - t := Transceiver{ +func NewRESTTransceiver(orchestratorURL string, entityID string) (Transceiver, error) { + t := RESTTransceiver{ OrchestratorURL: orchestratorURL, EntityID: entityID, Client: http.DefaultClient, @@ -116,7 +115,7 @@ func NewTransceiver(orchestratorURL string, entityID string) (*Transceiver, erro return &t, nil } -func (this *Transceiver) SendEvent(event Event) (chan Action, error) { +func (this *RESTTransceiver) SendEvent(event Event) (chan Action, error) { if event.EntityID() != this.EntityID { return nil, fmt.Errorf("bad entity id for event %s (want %s)", event, this.EntityID) } @@ -125,7 +124,7 @@ func (this *Transceiver) SendEvent(event Event) (chan Action, error) { // put ch to m BEFORE calling SendEvent(), otherwise race may occur this.m[event.ID()] = ch this.mMutex.Unlock() - err := SendEvent(this.Client, this.OrchestratorURL, event) + err := sendEvent(this.Client, this.OrchestratorURL, event) if err != nil { this.mMutex.Lock() delete(this.m, event.ID()) @@ -135,7 +134,7 @@ func (this *Transceiver) SendEvent(event Event) (chan Action, error) { return ch, nil } -func (this *Transceiver) onAction(action Action) error { +func (this *RESTTransceiver) onAction(action Action) error { event := action.Event() if event == nil { return fmt.Errorf("No event found for action %s", action) @@ -153,7 +152,7 @@ func (this *Transceiver) onAction(action Action) error { return nil } -func (this *Transceiver) routine() { +func (this *RESTTransceiver) routine() { errors := 0 onHTTPError := func(err error) { log.Error(err) @@ -164,12 +163,12 @@ func (this *Transceiver) routine() { log.Error(err) } for { - action, err := GetAction(this.Client, this.OrchestratorURL, this.EntityID) + action, err := getAction(this.Client, this.OrchestratorURL, this.EntityID) if err != nil { onHTTPError(err) continue } - err = DeleteAction(this.Client, this.OrchestratorURL, action) + err = deleteAction(this.Client, this.OrchestratorURL, action) if err != nil { onHTTPError(err) continue @@ -183,6 +182,6 @@ func (this *Transceiver) routine() { } } -func (this *Transceiver) Start() { +func (this *RESTTransceiver) Start() { go this.routine() } diff --git a/earthquake/inspector/transceiver/transceiver.go b/earthquake/inspector/transceiver/transceiver.go new file mode 100644 index 0000000..717f0d2 --- /dev/null +++ b/earthquake/inspector/transceiver/transceiver.go @@ -0,0 +1,37 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package transceiver + +import ( + "fmt" + "github.com/osrg/earthquake/earthquake/signal" + "strings" +) + +type Transceiver interface { + SendEvent(event signal.Event) (chan signal.Action, error) + Start() +} + +func NewTransceiver(orchestratorURL string, entityID string) (Transceiver, error) { + if strings.HasPrefix(orchestratorURL, "local://") { + return NewLocalTransceiver(entityID) + } else if strings.HasPrefix(orchestratorURL, "http://") { + return NewRESTTransceiver(orchestratorURL, entityID) + } else { + return nil, fmt.Errorf("strange orchestrator url: %s", orchestratorURL) + } +} diff --git a/earthquake/inspectorhandler/inspectorhandler.go b/earthquake/inspectorhandler/inspectorhandler.go index 3a19763..ede51ad 100644 --- a/earthquake/inspectorhandler/inspectorhandler.go +++ b/earthquake/inspectorhandler/inspectorhandler.go @@ -16,7 +16,9 @@ package inspectorhandler import ( + log "github.com/cihub/seelog" . "github.com/osrg/earthquake/earthquake/entity" + . "github.com/osrg/earthquake/earthquake/inspectorhandler/localinspectorhandler" . "github.com/osrg/earthquake/earthquake/inspectorhandler/pbinspectorhandler" . "github.com/osrg/earthquake/earthquake/inspectorhandler/restinspectorhandler" . "github.com/osrg/earthquake/earthquake/util/config" @@ -26,7 +28,29 @@ type InspectorHandler interface { StartAccept(readyEntityCh chan *TransitionEntity) } -func StartAllInspectorHandler(readyEntityCh chan *TransitionEntity, cfg Config) { - go NewPBInspectorHanlder(cfg).StartAccept(readyEntityCh) - go NewRESTInspectorHanlder(cfg).StartAccept(readyEntityCh) +var ( + GlobalLocalInspectorHandler = NewLocalInspectorHandler() +) + +func StartInspectorHandlers(readyEntityCh chan *TransitionEntity, cfg Config) { + GlobalLocalInspectorHandler.StartAccept(readyEntityCh) + + if cfg.IsSet("pbPort") { + pbPort := cfg.GetInt("pbPort") + if pbPort > 0 { + go NewPBInspectorHanlder(pbPort).StartAccept(readyEntityCh) + } else if pbPort < 0 { + log.Warnf("ignoring negative pbPort: %d", pbPort) + } + } + + if cfg.IsSet("restPort") { + restPort := cfg.GetInt("restPort") + if restPort > 0 { + go NewRESTInspectorHanlder(restPort).StartAccept(readyEntityCh) + } else if restPort < 0 { + log.Warnf("ignoring restPort: %d", restPort) + } + } + } diff --git a/earthquake/inspectorhandler/localinspectorhandler/localinspectorhandler.go b/earthquake/inspectorhandler/localinspectorhandler/localinspectorhandler.go new file mode 100644 index 0000000..4cb1db1 --- /dev/null +++ b/earthquake/inspectorhandler/localinspectorhandler/localinspectorhandler.go @@ -0,0 +1,77 @@ +// Copyright (C) 2015 Nippon Telegraph and Telephone Corporation. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package localinspectorhandler + +import ( + log "github.com/cihub/seelog" + . "github.com/osrg/earthquake/earthquake/entity" + . "github.com/osrg/earthquake/earthquake/signal" +) + +type LocalInspectorHandler struct { + EventChan chan Event + ActionChan chan Action +} + +func (handler *LocalInspectorHandler) handleConn(readyEntityCh chan *TransitionEntity) { + for { + log.Debugf("Handler<-Inspector: receiving an event") + event := <-handler.EventChan + entityID := event.EntityID() + log.Debugf("Handler[%s]<-Inspector: received an event %s", entityID, event) + entity := GetTransitionEntity(entityID) + if entity == nil { + // FIXME: orchestrator requires this registration - can we remove this? + entity = &TransitionEntity{ + ID: entityID, + ActionFromMain: make(chan Action), + EventToMain: make(chan Event), + } + err := RegisterTransitionEntity(entity) + if err != nil { + panic(log.Critical(err)) + } + log.Debugf("Handler: initialized the entity structure", entityID) + } + + log.Debugf("Handler[%s]->Main: sending an event %s", entityID, event) + // FIXME: can we make this buffered? + go func() { + entity.EventToMain <- event + }() + readyEntityCh <- entity + log.Debugf("Handler[%s]->Main: sent an event %s", entityID, event) + + log.Debugf("Handler[%s]<-Main: receiving an action", entityID) + action := <-entity.ActionFromMain + log.Debugf("Handler[%s]<-Main: received an action %s", entityID, action) + + log.Debugf("Handler[%s]->Inspector: sending an action %s", entityID, action) + handler.ActionChan <- action + log.Debugf("Handler[%s]->Inspector: sent an action %s", entityID, action) + } // for +} // func + +func (handler *LocalInspectorHandler) StartAccept(readyEntityCh chan *TransitionEntity) { + go handler.handleConn(readyEntityCh) +} + +func NewLocalInspectorHandler() *LocalInspectorHandler { + return &LocalInspectorHandler{ + EventChan: make(chan Event), + ActionChan: make(chan Action), + } +} diff --git a/earthquake/inspectorhandler/pbinspectorhandler/pbinspectorhandler.go b/earthquake/inspectorhandler/pbinspectorhandler/pbinspectorhandler.go index b9d2d28..f51ab74 100644 --- a/earthquake/inspectorhandler/pbinspectorhandler/pbinspectorhandler.go +++ b/earthquake/inspectorhandler/pbinspectorhandler/pbinspectorhandler.go @@ -24,11 +24,12 @@ import ( log "github.com/cihub/seelog" . "github.com/osrg/earthquake/earthquake/entity" . "github.com/osrg/earthquake/earthquake/signal" - . "github.com/osrg/earthquake/earthquake/util/config" . "github.com/osrg/earthquake/earthquake/util/pb" ) -type PBInspectorHandler struct{} +type PBInspectorHandler struct { + Port int +} func recvPBMsgViaChan(conn net.Conn, eventReqRecv chan *InspectorMsgReq) { for { @@ -130,7 +131,7 @@ func (handler *PBInspectorHandler) handleConn(conn net.Conn, readyEntityCh chan } // func func (handler *PBInspectorHandler) StartAccept(readyEntityCh chan *TransitionEntity) { - sport := fmt.Sprintf(":%d", 10000) // FIXME (config.GetInt("inspectorHandler.pb.port")) + sport := fmt.Sprintf(":%d", handler.Port) ln, err := net.Listen("tcp", sport) if err != nil { panic(log.Criticalf("failed to listen on port %s: %s", sport, err)) @@ -147,6 +148,8 @@ func (handler *PBInspectorHandler) StartAccept(readyEntityCh chan *TransitionEnt } } -func NewPBInspectorHanlder(cfg Config) *PBInspectorHandler { - return &PBInspectorHandler{} +func NewPBInspectorHanlder(port int) *PBInspectorHandler { + return &PBInspectorHandler{ + Port: port, + } } diff --git a/earthquake/inspectorhandler/restinspectorhandler/restinspectorhandler.go b/earthquake/inspectorhandler/restinspectorhandler/restinspectorhandler.go index f8de387..4e5b293 100644 --- a/earthquake/inspectorhandler/restinspectorhandler/restinspectorhandler.go +++ b/earthquake/inspectorhandler/restinspectorhandler/restinspectorhandler.go @@ -28,7 +28,6 @@ import ( . "github.com/osrg/earthquake/earthquake/entity" . "github.com/osrg/earthquake/earthquake/inspectorhandler/restinspectorhandler/queue" . "github.com/osrg/earthquake/earthquake/signal" - "github.com/osrg/earthquake/earthquake/util/config" restutil "github.com/osrg/earthquake/earthquake/util/rest" "runtime" ) @@ -150,14 +149,15 @@ func actionsOnDelete(w http.ResponseWriter, r *http.Request) { } } -type RESTInspectorHandler struct{} +type RESTInspectorHandler struct { + Port int +} func (handler *RESTInspectorHandler) StartAccept(readyEntityCh chan *TransitionEntity) { mainReadyEntityCh = readyEntityCh - sport := fmt.Sprintf(":%d", restutil.DefaultPort) // FIXME + sport := fmt.Sprintf(":%d", handler.Port) apiRoot := restutil.APIRoot log.Debugf("REST API root=%s%s", sport, apiRoot) - // TODO: mux should be split from this class router := mux.NewRouter().StrictSlash(true) router.HandleFunc("/", rootOnGet).Methods("GET") router.HandleFunc(path.Join(apiRoot, "/events/{entity_id}/{event_uuid}"), eventsOnPost).Methods("POST") @@ -170,6 +170,8 @@ func (handler *RESTInspectorHandler) StartAccept(readyEntityCh chan *TransitionE } } -func NewRESTInspectorHanlder(cfg config.Config) *RESTInspectorHandler { - return &RESTInspectorHandler{} +func NewRESTInspectorHanlder(port int) *RESTInspectorHandler { + return &RESTInspectorHandler{ + Port: port, + } } diff --git a/earthquake/orchestrator/orchestrator.go b/earthquake/orchestrator/orchestrator.go index c8a53af..b4ba6bb 100644 --- a/earthquake/orchestrator/orchestrator.go +++ b/earthquake/orchestrator/orchestrator.go @@ -101,7 +101,7 @@ func (this *Orchestrator) doDefaultAction(event Event) { func (this *Orchestrator) Start() { readyEntityCh := make(chan *TransitionEntity) - StartAllInspectorHandler(readyEntityCh, this.cfg) + StartInspectorHandlers(readyEntityCh, this.cfg) policyNextActionChan := this.policy.GetNextActionChan() running := true log.Debugf("Main[running=%t]<-ExplorePolicy: receiving an action", running) diff --git a/earthquake/signal/event_filesystem.go b/earthquake/signal/event_filesystem.go index 3f5925d..e0e16d9 100644 --- a/earthquake/signal/event_filesystem.go +++ b/earthquake/signal/event_filesystem.go @@ -35,13 +35,13 @@ const ( ) func NewFilesystemEvent(entityID string, op FilesystemOp, path string, m map[string]interface{}) (Event, error) { - action := &FilesystemEvent{} - action.InitSignal() - action.SetID(uuid.NewV4().String()) - action.SetEntityID(entityID) - action.SetType("event") - action.SetClass("FilesystemEvent") - action.SetDeferred(true) + event := &FilesystemEvent{} + event.InitSignal() + event.SetID(uuid.NewV4().String()) + event.SetEntityID(entityID) + event.SetType("event") + event.SetClass("FilesystemEvent") + event.SetDeferred(true) opt := map[string]interface{}{ "op": op, "path": path, @@ -49,8 +49,8 @@ func NewFilesystemEvent(entityID string, op FilesystemOp, path string, m map[str for k, v := range m { opt[k] = v } - action.SetOption(opt) - return action, nil + event.SetOption(opt) + return event, nil } // implements Event diff --git a/earthquake/signal/event_packet.go b/earthquake/signal/event_packet.go index c64e824..ba4f42b 100644 --- a/earthquake/signal/event_packet.go +++ b/earthquake/signal/event_packet.go @@ -23,13 +23,13 @@ type PacketEvent struct { } func NewPacketEvent(entityID, srcEntityID, dstEntityID string, m map[string]interface{}) (Event, error) { - action := &PacketEvent{} - action.InitSignal() - action.SetID(uuid.NewV4().String()) - action.SetEntityID(entityID) - action.SetType("event") - action.SetClass("PacketEvent") - action.SetDeferred(true) + event := &PacketEvent{} + event.InitSignal() + event.SetID(uuid.NewV4().String()) + event.SetEntityID(entityID) + event.SetType("event") + event.SetClass("PacketEvent") + event.SetDeferred(true) opt := map[string]interface{}{ "src_entity": srcEntityID, "dst_entity": dstEntityID, @@ -37,8 +37,8 @@ func NewPacketEvent(entityID, srcEntityID, dstEntityID string, m map[string]inte for k, v := range m { opt[k] = v } - action.SetOption(opt) - return action, nil + event.SetOption(opt) + return event, nil } // implements Event diff --git a/earthquake/signal/event_procset.go b/earthquake/signal/event_procset.go index 17eaf5b..4b6a286 100644 --- a/earthquake/signal/event_procset.go +++ b/earthquake/signal/event_procset.go @@ -23,21 +23,21 @@ type ProcSetEvent struct { } func NewProcSetEvent(entityID string, procs []string, m map[string]interface{}) (Event, error) { - action := &FilesystemEvent{} - action.InitSignal() - action.SetID(uuid.NewV4().String()) - action.SetEntityID(entityID) - action.SetType("event") - action.SetClass("ProcSetEvent") - action.SetDeferred(false) + event := &ProcSetEvent{} + event.InitSignal() + event.SetID(uuid.NewV4().String()) + event.SetEntityID(entityID) + event.SetType("event") + event.SetClass("ProcSetEvent") + event.SetDeferred(false) opt := map[string]interface{}{ "procs": procs, } for k, v := range m { opt[k] = v } - action.SetOption(opt) - return action, nil + event.SetOption(opt) + return event, nil } // implements Event diff --git a/earthquake/util/config/config.go b/earthquake/util/config/config.go index aa8f9c6..14cba34 100644 --- a/earthquake/util/config/config.go +++ b/earthquake/util/config/config.go @@ -16,8 +16,10 @@ package config import ( + "github.com/kr/pretty" "github.com/spf13/viper" "strings" + "time" ) type Config struct { @@ -29,39 +31,67 @@ func New() Config { ///// INIT and RUN // Used for "init" command. + // "earthquake-container" ignores this. // e.g. "init.sh" cfg.SetDefault("init", "") // Used for "run" command. + // "earthquake-container" ignores this. // e.g. "run.sh" cfg.SetDefault("run", "") // Used for "run" command. + // "earthquake-container" ignores this. // e.g. "clean.sh" cfg.SetDefault("clean", "") // Used for "run" command. + // "earthquake-container" ignores this. // e.g. "validate.sh" cfg.SetDefault("validate", "") + // Used for something deprecated? // if true, skip clean.sh when validate.sh failed. + // "earthquake-container" ignores this. cfg.SetDefault("notCleanIfValidationFail", false) ///// STORAGE + // "earthquake-container" ignores this. // Used for "run" command cfg.SetDefault("storageType", "naive") + ///// INSPECTOR HANDLER ENDPOINT + // "earthquake-container" ignores these values. + // Used for PB inspector handler (used by Java and C inspector) + // if non-positive, PB inspector handler is disabled + // e.g. 10000 + cfg.SetDefault("pbPort", 0) + + // Used for REST inspector handler + // if non-positive, REST inspector handler is disabled + // e.g. 10080 + cfg.SetDefault("restPort", 0) + ///// EXPLORATION POLICY // "earthquake-container" also uses these params cfg.SetDefault("explorePolicy", "random") cfg.SetDefault("explorePolicyParam", map[string]interface{}{}) + + ///// CONTAINER + // used only in "earthquake-container" + cfg.SetDefault("containerParam", map[string]interface{}{ + "enableEthernetInspector": false, + "enableProcInspector": true, + "ethernetNFQNumber": 42, + "procWatchInterval": time.Second, + }) return cfg } func NewFromString(s string, typ string) (Config, error) { cfg := New() cfg.SetConfigType(typ) - err := viper.ReadConfig(strings.NewReader(s)) + err := cfg.ReadConfig(strings.NewReader(s)) return cfg, err } @@ -72,3 +102,12 @@ func NewFromFile(filePath string) (Config, error) { err := cfg.ReadInConfig() return cfg, err } + +func (cfg Config) String() string { + m := make(map[string]interface{}) + err := cfg.Unmarshal(&m) + if err != nil { + panic(err) + } + return pretty.Sprintf("Config{%# v}", m) +} diff --git a/earthquake/cli/inspectors/util.go b/earthquake/util/orchestrator/orchestratorutil.go similarity index 74% rename from earthquake/cli/inspectors/util.go rename to earthquake/util/orchestrator/orchestratorutil.go index edda519..167bc4e 100644 --- a/earthquake/cli/inspectors/util.go +++ b/earthquake/util/orchestrator/orchestratorutil.go @@ -16,25 +16,21 @@ package inspectors import ( - "fmt" . "github.com/osrg/earthquake/earthquake/explorepolicy" . "github.com/osrg/earthquake/earthquake/orchestrator" "github.com/osrg/earthquake/earthquake/util/config" - restutil "github.com/osrg/earthquake/earthquake/util/rest" ) -var defaultOrchestratorURL = fmt.Sprintf("http://localhost:%d%s", restutil.DefaultPort, restutil.APIRoot) +const LocalOrchestratorURL = "local://" // instantiate new autopilot-mode orchestrator. // -// autopilot-mode is useful when you are not interested in non-determinism -func NewAutopilotOrchestrator(configFilePath string) (*Orchestrator, error) { - cfg, err := config.NewFromFile(configFilePath) +// autopilot-mode is useful when you do not need PB/REST RPC +func NewAutopilotOrchestrator(cfg config.Config) (*Orchestrator, error) { + policy, err := CreatePolicy(cfg.GetString("explorePolicy")) if err != nil { return nil, err } - - policy, err := CreatePolicy(cfg.GetString("explorePolicy")) policy.LoadConfig(cfg) orchestrator := NewOrchestrator(cfg, policy, false) return orchestrator, nil diff --git a/earthquake/util/rest/apiroot.go b/earthquake/util/rest/apiroot.go index 8df3917..2bf3b7b 100644 --- a/earthquake/util/rest/apiroot.go +++ b/earthquake/util/rest/apiroot.go @@ -16,4 +16,3 @@ package rest const APIRoot = "/api/v3" -const DefaultPort = 10080 diff --git a/example/etcd/3517-degrade-check/config.toml b/example/etcd/3517-degrade-check/config.toml index a306765..7004a06 100644 --- a/example/etcd/3517-degrade-check/config.toml +++ b/example/etcd/3517-degrade-check/config.toml @@ -2,6 +2,7 @@ init = "init.sh" run = "run.sh" validate = "validate.sh" clean = "clean.sh" +restPort = 10080 explorePolicy = "random" notCleanIfValidationFail= "true" diff --git a/example/etcd/3517-reproduce/config.toml b/example/etcd/3517-reproduce/config.toml index 23efb92..380b90c 100644 --- a/example/etcd/3517-reproduce/config.toml +++ b/example/etcd/3517-reproduce/config.toml @@ -2,6 +2,7 @@ init = "init.sh" run = "run.sh" validate = "validate.sh" clean = "clean.sh" +restPort = 10080 explorePolicy = "random" notCleanIfValidationFail= "true" diff --git a/example/template/config.toml b/example/template/config.toml index 4dc1928..ce035f6 100644 --- a/example/template/config.toml +++ b/example/template/config.toml @@ -3,6 +3,8 @@ run = "run.sh" # validate = "validate.sh" # clean = "clean.sh" +restPort = 10080 + explorePolicy = "random" [explorePolicyParam] diff --git a/example/template/config_mypolicy.toml b/example/template/config_mypolicy.toml index d98aa2c..f774791 100644 --- a/example/template/config_mypolicy.toml +++ b/example/template/config_mypolicy.toml @@ -1,2 +1,3 @@ run = "run.sh" +restPort = 10080 explorePolicy = "mypolicy" diff --git a/example/template/mypolicy.go b/example/template/mypolicy.go index 6fe0170..09048d3 100644 --- a/example/template/mypolicy.go +++ b/example/template/mypolicy.go @@ -47,11 +47,13 @@ func (p *MyPolicy) QueueNextEvent(event signal.Event) { // - JavaFunctionEvent // - PacketEvent // - FilesystemEvent + // - ProcSetEvent (Linux procfs) // - LogEvent fmt.Printf("Event: %s\n", event) // You can also inject fault actions // - PacketFaultAction // - FilesystemFaultAction + // - ProcSetSchedAction // - ShellAction action, err := event.DefaultAction() if err != nil { diff --git a/example/zk-found-2212.nfqhook/config.toml b/example/zk-found-2212.nfqhook/config.toml index f6557dc..266961c 100644 --- a/example/zk-found-2212.nfqhook/config.toml +++ b/example/zk-found-2212.nfqhook/config.toml @@ -2,6 +2,7 @@ init = "init.sh" run = "run.sh" validate = "validate.sh" clean = "clean.sh" +restPort = 10080 explorePolicy = "random" [explorePolicyParam] diff --git a/example/zk-found-2212.ryu/config.toml b/example/zk-found-2212.ryu/config.toml index 1b4cea6..c3e032f 100644 --- a/example/zk-found-2212.ryu/config.toml +++ b/example/zk-found-2212.ryu/config.toml @@ -2,6 +2,7 @@ init = "init.sh" run = "run.sh" validate = "validate.sh" clean = "clean.sh" +restPort = 10080 explorePolicy = "random" [explorePolicyParam] diff --git a/example/zk-found-2212.ryu/config_dumb.toml b/example/zk-found-2212.ryu/config_dumb.toml index 5cb1a30..6a20d3f 100644 --- a/example/zk-found-2212.ryu/config_dumb.toml +++ b/example/zk-found-2212.ryu/config_dumb.toml @@ -2,6 +2,7 @@ init = "init.sh" run = "run.sh" validate = "validate.sh" clean = "clean.sh" +restPort = 10080 explorePolicy = "dumb" # storageType = "mongodb"