diff --git a/CHANGELOG.md b/CHANGELOG.md index b3516ee5..476b58f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ * Show uniq request id in API HTTP response. (#425, @miry) * Add method to parse `stream.Direction` from string. Allow to convert `stream.Direction` to string. (#430, @miry) +* Add posibility to write to Output with deadline. + On interrupting badnwidth toxic use non blocking write. (#436, @miry) # [2.4.0] - 2022-03-07 diff --git a/Makefile b/Makefile index 9d228dbd..970cdbbd 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ OS := $(shell uname -s) +ARCH := $(shell uname -m) GO_VERSION := $(shell go version | cut -f3 -d" ") GO_MINOR_VERSION := $(shell echo $(GO_VERSION) | cut -f2 -d.) GO_PATCH_VERSION := $(shell echo $(GO_VERSION) | cut -f3 -d. | sed "s/^\s*$$/0/") @@ -13,8 +14,9 @@ test: $(MALLOC_ENV) go test -v -race -timeout 1m ./... .PHONY: test-e2e -test-e2e: build +test-e2e: build container.build scripts/test-e2e + timeout -v --foreground 20m scripts/test-e2e-hazelcast toxiproxy .PHONY: test-release test-release: test bench test-e2e release-dry @@ -45,6 +47,13 @@ build: dist clean go build -ldflags="-s -w" -o ./dist/toxiproxy-server ./cmd/server go build -ldflags="-s -w" -o ./dist/toxiproxy-cli ./cmd/cli +.PHONY: container.build +container.build: + env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-server-linux-$(ARCH) ./cmd/server + env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-cli-linux-$(ARCH) ./cmd/cli + docker build -f Dockerfile -t toxiproxy dist + docker run --rm toxiproxy --version + .PHONY: release release: goreleaser release --rm-dist diff --git a/api.go b/api.go index abe48ba8..c9c4cbed 100644 --- a/api.go +++ b/api.go @@ -27,7 +27,7 @@ func stopBrowsersMiddleware(next http.Handler) http.Handler { } func timeoutMiddleware(next http.Handler) http.Handler { - return http.TimeoutHandler(next, 30*time.Second, "") + return http.TimeoutHandler(next, 25*time.Second, "") } type ApiServer struct { @@ -121,7 +121,7 @@ func (server *ApiServer) Listen(host string, port string) { srv := &http.Server{ Handler: r, Addr: net.JoinHostPort(host, port), - WriteTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, ReadTimeout: 10 * time.Second, } diff --git a/link.go b/link.go index 24c417de..03ae9d56 100644 --- a/link.go +++ b/link.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "time" "github.com/rs/zerolog" @@ -73,7 +74,10 @@ func (link *ToxicLink) Start( dest io.WriteCloser, ) { logger := link.Logger - logger.Debug().Msg("Setup connection") + logger. + Debug(). + Str("direction", link.Direction()). + Msg("Setup connection") labels := []string{ link.Direction(), @@ -133,23 +137,33 @@ func (link *ToxicLink) read( func (link *ToxicLink) write( metricLabels []string, name string, - server *ApiServer, + server *ApiServer, // TODO: Replace with AppConfig for Metrics and Logger dest io.WriteCloser, ) { - logger := link.Logger + logger := link.Logger. + With(). + Str("component", "ToxicLink"). + Str("method", "write"). + Str("link", name). + Str("proxy", link.proxy.Name). + Str("link_addr", fmt.Sprintf("%p", link)). + Logger() + bytes, err := io.Copy(dest, link.output) if err != nil { logger.Warn(). Int64("bytes", bytes). Err(err). - Msg("Destination terminated") - } - if server.Metrics.proxyMetricsEnabled() { + Msg("Could not write to destination") + } else if server.Metrics.proxyMetricsEnabled() { server.Metrics.ProxyMetrics.SentBytesTotal. WithLabelValues(metricLabels...).Add(float64(bytes)) } + dest.Close() + logger.Trace().Msgf("Remove link %s from ToxicCollection", name) link.toxics.RemoveLink(name) + logger.Trace().Msgf("RemoveConnection %s from Proxy %s", name, link.proxy.Name) link.proxy.RemoveConnection(name) } @@ -211,11 +225,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp } } - log.Trace().Msg("Interrupt the previous toxic to update its output") + log.Trace().Msg("Interrupting the previous toxic to update its output") stop := make(chan bool) - go func() { - stop <- link.stubs[toxic_index-1].InterruptToxic() - }() + go func(stub *toxics.ToxicStub, stop chan bool) { + stop <- stub.InterruptToxic() + }(link.stubs[toxic_index-1], stop) // Unblock the previous toxic if it is trying to flush // If the previous toxic is closed, continue flusing until we reach the end. @@ -231,9 +245,14 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp if !stopped { <-stop } - return + return // TODO: There are some steps after this to clean buffer + } + + err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second) + if err != nil { + log.Err(err). + Msg("Could not write last packets after interrupt to Output") } - link.stubs[toxic_index].Output <- tmp } } @@ -244,7 +263,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp link.stubs[toxic_index].Close() return } - link.stubs[toxic_index].Output <- tmp + err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second) + if err != nil { + log.Err(err). + Msg("Could not write last packets after interrupt to Output") + } } link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output diff --git a/link_test.go b/link_test.go index beade186..55379899 100644 --- a/link_test.go +++ b/link_test.go @@ -247,6 +247,7 @@ func TestStateCreated(t *testing.T) { if flag.Lookup("test.v").DefValue == "true" { log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger() } + link := NewToxicLink(nil, collection, stream.Downstream, log) go link.stubs[0].Run(collection.chain[stream.Downstream][0]) collection.links["test"] = link @@ -261,3 +262,64 @@ func TestStateCreated(t *testing.T) { t.Fatalf("New toxic did not have state object created.") } } + +func TestRemoveToxicWithBrokenConnection(t *testing.T) { + ctx := context.Background() + + log := zerolog.Nop() + if flag.Lookup("test.v").DefValue == "true" { + log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger() + } + ctx = log.WithContext(ctx) + + collection := NewToxicCollection(nil) + link := NewToxicLink(nil, collection, stream.Downstream, log) + go link.stubs[0].Run(collection.chain[stream.Downstream][0]) + collection.links["test"] = link + + toxics := [2]*toxics.ToxicWrapper{ + { + Toxic: &toxics.BandwidthToxic{ + Rate: 0, + }, + Type: "bandwidth", + Direction: stream.Downstream, + Toxicity: 1, + }, + { + Toxic: &toxics.BandwidthToxic{ + Rate: 0, + }, + Type: "bandwidth", + Direction: stream.Upstream, + Toxicity: 1, + }, + } + + collection.chainAddToxic(toxics[0]) + collection.chainAddToxic(toxics[1]) + + done := make(chan struct{}) + defer close(done) + + var data uint16 = 42 + go func(log zerolog.Logger) { + for { + select { + case <-done: + link.input.Close() + return + case <-time.After(10 * time.Second): + log.Print("Finish load") + return + default: + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, data) + link.input.Write(buf) + } + } + }(log) + + collection.chainRemoveToxic(ctx, toxics[0]) + collection.chainRemoveToxic(ctx, toxics[1]) +} diff --git a/scripts/hazelcast.xml b/scripts/hazelcast.xml new file mode 100644 index 00000000..40b60f47 --- /dev/null +++ b/scripts/hazelcast.xml @@ -0,0 +1,31 @@ + + + + + + 15 + 20 + false + deadline + 3 + 10 + + + + member-proxy:${proxyPort} + 5701 + + + + + member-proxy:${proxyPort0} + member-proxy:${proxyPort1} + member-proxy:${proxyPort2} + + + + + diff --git a/scripts/test-e2e b/scripts/test-e2e index c710a0e9..42dafba5 100755 --- a/scripts/test-e2e +++ b/scripts/test-e2e @@ -28,6 +28,10 @@ function cleanup() { } trap "cleanup" EXIT SIGINT SIGTERM +echo "= Toxiproxy E2E tests" +echo +echo "== Setup" +echo echo "=== Starting Web service" pkill -15 "toxiproxy-server" || true @@ -56,7 +60,6 @@ cli toggle shopify_http echo -e "-----------------\n" echo "== Benchmarking" - echo echo "=== Without toxics" diff --git a/scripts/test-e2e-hazelcast b/scripts/test-e2e-hazelcast new file mode 100755 index 00000000..27f17726 --- /dev/null +++ b/scripts/test-e2e-hazelcast @@ -0,0 +1,142 @@ +#!/bin/bash + +# Usage: +# test-e2e-hazelcast [docker image name for toxiproxy] + +set -ueo pipefail + +cd "$(dirname "$0")" + +toxiproxy="../dist/toxiproxy-cli" +state="started" + +cli() { + ../dist/toxiproxy-cli "$@" 2>&1 | sed -e 's/^/[client] /' +} + +wait_for_url() { + curl -s --retry-connrefused --retry 5 --retry-delay 2 --retry-max-time 30 \ + --max-time 1 -L -I -X GET "${1}" +} + +# Stop all background jobs on exit +function cleanup() { + echo -e "\n\n== Teardown: state=${state}" + if [[ $state != "success" ]]; then + docker kill -s SIGQUIT member-proxy + docker logs -t member-proxy + fi + docker stop member-proxy member0 member1 member2 &> /dev/null || true + docker network rm toxiproxy-e2e &> /dev/null || true +} +trap "cleanup" EXIT SIGINT SIGTERM + +LATEST_TAG=$(git describe --tags --abbrev=0) +IMAGE_HAZELCAST="hazelcast/hazelcast:5.1.2-slim" +IMAGE_TOXIPROXY="${1:-ghcr.io/shopify/toxiproxy:${LATEST_TAG:1}}" +TOXIPROXY_BASE_URL="http://localhost:8474" + +echo "= Toxiproxy E2E tests with Hazelcast cluster" +echo +echo "== Setup" +echo +echo "=== Starting Toxiproxy" + +docker rm -f member-proxy member0 member1 member2 &>/dev/null +docker network rm toxiproxy-e2e &>/dev/null || true + +docker network create --subnet 172.18.5.0/24 toxiproxy-e2e + +docker run --rm -t "${IMAGE_TOXIPROXY}" --version +docker run -d \ + --name member-proxy \ + --network toxiproxy-e2e \ + --ip 172.18.5.2 \ + -p 8474:8474 \ + -e LOG_LEVEL=trace \ + "$IMAGE_TOXIPROXY" + +echo "=== Wait Toxiproxy API is available" +wait_for_url "${TOXIPROXY_BASE_URL}/version" + +echo "=== Prepare proxies for Hazelcast cluster" +for i in {0..2}; do + echo "> Create proxy for member${i} on port 600${i}" + # curl --data "{\"name\": \"member${i}\", \"upstream\": \"member${i}:5701\", \"listen\": \"0.0.0.0:600${i}\"}" "${TOXIPROXY_BASE_URL}/proxies" + cli create -l "0.0.0.0:600${i}" -u "member${i}:5701" "member${i}" + echo +done + +echo +echo "=== Strating Hazelcast containers" +for i in {0..2}; do + echo "> Start Hazelcast on host member${i}" + docker run -d --rm \ + --name "member${i}" \ + --network toxiproxy-e2e \ + --ip "172.18.5.1${i}" \ + --volume "${PWD}/hazelcast.xml:/opt/hazelcast/config/hazelcast-docker.xml" \ + --env HZ_PHONE_HOME_ENABLED=false \ + --env JAVA_OPTS="-DproxyPort=600${i} -DproxyPort0=6000 -DproxyPort1=6001 -DproxyPort2=6002" \ + "$IMAGE_HAZELCAST" +done + +echo "> Wait for cluster join (30s)..." +sleep 30 + +echo "> Output of member0" +docker logs -t -n 10 member0 + +echo +echo "=== Initialize toxics for cluster" +for i in {0..2}; do + echo "> Adding toxics to member${i} proxy" + # curl --data "{\"name\": \"member${i}_downstream\", \"stream\": \"downstream\", \"toxicity\": 1.0, \"type\": \"bandwidth\", \"attributes\": { \"rate\": 0 }}" "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics" + cli toxic add --type=bandwidth \ + --downstream \ + --toxicName="member${i}_downstream" \ + --attribute="rate=0" \ + --toxicity=1 \ + "member${i}" + # curl --data "{\"name\": \"member${i}_upstream\", \"stream\": \"upstream\", \"toxicity\": 1.0, \"type\": \"bandwidth\", \"attributes\": { \"rate\": 0 }}" "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics" + cli toxic add --type=bandwidth \ + --upstream \ + --toxicName="member${i}_upstream" \ + --attribute="rate=0" \ + --toxicity=1 \ + "member${i}" + echo + cli inspect "member${i}" + echo +done + +echo "=== Wait for a the Hazelcast cluster split-brain (60s)..." +sleep 60 + +echo "=== Validate output of Toxiproxy and single member" +docker logs -t -n 10 member0 +docker logs -t -n 10 member-proxy + +echo "=== Removing toxics from proxies" +for i in {0..2}; do + echo "[$(date)] > Remove downstream bandwith Toxic for member${i} proxy" + # curl -v -X DELETE "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics/member${i}_downstream" + cli toxic delete --toxicName="member${i}_downstream" "member${i}" + echo "[$(date)] > Remove ustream bandwith Toxic for member${i} proxy" + # curl -v -X DELETE "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics/member${i}_upstream" + cli toxic delete --toxicName="member${i}_upstream" "member${i}" +done + +echo "=== Validate output of Toxiproxy and single member after removing toxics" +docker logs -t -n 10 member0 +docker logs -t -n 10 member-proxy + +cli list +cli inspect member0 +cli inspect member1 +cli inspect member2 + +echo -e "=================\n" + +echo "Succcess!" +state="success" diff --git a/toxics/bandwidth.go b/toxics/bandwidth.go index ede9fc31..2dd1e881 100644 --- a/toxics/bandwidth.go +++ b/toxics/bandwidth.go @@ -50,7 +50,11 @@ func (t *BandwidthToxic) Pipe(stub *ToxicStub) { sleep -= 100 * time.Millisecond case <-stub.Interrupt: logger.Trace().Msg("BandwidthToxic was interrupted during writing data") - stub.Output <- p // Don't drop any data on the floor + err := stub.WriteOutput(p, 5*time.Second) // Don't drop any data on the floor + if err != nil { + logger.Warn().Err(err). + Msg("Could not write last packets after interrupt to Output") + } return } } @@ -62,7 +66,11 @@ func (t *BandwidthToxic) Pipe(stub *ToxicStub) { stub.Output <- p case <-stub.Interrupt: logger.Trace().Msg("BandwidthToxic was interrupted during writing data") - stub.Output <- p // Don't drop any data on the floor + err := stub.WriteOutput(p, 5*time.Second) // Don't drop any data on the floor + if err != nil { + logger.Warn().Err(err). + Msg("Could not write last packets after interrupt to Output") + } return } } diff --git a/toxics/toxic.go b/toxics/toxic.go index 67338c96..058c60d9 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -1,9 +1,11 @@ package toxics import ( + "fmt" "math/rand" "reflect" "sync" + "time" "github.com/Shopify/toxiproxy/v2/stream" ) @@ -87,6 +89,22 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) { } } +// WriteOutput allows to write to Output with timeout to avoid deadlocks. +// If duration is 0, then wait until other goroutines finish reading from Output. +func (s *ToxicStub) WriteOutput(p *stream.StreamChunk, d time.Duration) error { + if d == 0 { + s.Output <- p + return nil + } + + select { + case s.Output <- p: + return nil + case <-time.After(d): + return fmt.Errorf("timeout: could not write to output in %d seconds", int(d.Seconds())) + } +} + // Interrupt the flow of data so that the toxic controlling the stub can be replaced. // Returns true if the stream was successfully interrupted, or false if the stream is closed. func (s *ToxicStub) InterruptToxic() bool { diff --git a/toxics/toxic_test.go b/toxics/toxic_test.go index ef3172b7..9e6de978 100644 --- a/toxics/toxic_test.go +++ b/toxics/toxic_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "crypto/rand" "encoding/json" "io" "net" @@ -17,6 +18,7 @@ import ( "github.com/Shopify/toxiproxy/v2" "github.com/Shopify/toxiproxy/v2/collectors" + "github.com/Shopify/toxiproxy/v2/stream" "github.com/Shopify/toxiproxy/v2/toxics" ) @@ -351,3 +353,42 @@ func BenchmarkProxyBandwidth(b *testing.B) { b.Error("Failed to close TCP connection", err) } } + +func TestToxicStub_WriteOutput(t *testing.T) { + input := make(chan *stream.StreamChunk) + output := make(chan *stream.StreamChunk) + stub := toxics.NewToxicStub(input, output) + + buf := make([]byte, 42) + rand.Read(buf) + + t.Run("when no read in 1 second", func(t *testing.T) { + err := stub.WriteOutput(&stream.StreamChunk{Data: buf}, time.Second) + if err == nil { + t.Error("Expected to have error") + } + + expected := "timeout: could not write to output in 1 seconds" + if err.Error() != expected { + t.Errorf("Expected error: %s, got %s", expected, err) + } + }) + + t.Run("when read is available", func(t *testing.T) { + go func(t *testing.T, stub *toxics.ToxicStub, expected []byte) { + select { + case <-time.After(5 * time.Second): + t.Error("Timeout of running test to read from output.") + case chunk := <-output: + if !bytes.Equal(chunk.Data, buf) { + t.Error("Data in Output different from Write") + } + } + }(t, stub, buf) + + err := stub.WriteOutput(&stream.StreamChunk{Data: buf}, 5*time.Second) + if err != nil { + t.Errorf("Unexpected error: %+v", err) + } + }) +}