From 524143c8051a7540b598d30f29870680982ad38d Mon Sep 17 00:00:00 2001 From: Michael Nikitochkin Date: Mon, 29 Aug 2022 00:10:21 +0200 Subject: [PATCH] ToxicStub: allow non-blocking write to Output It could happen when Link has no receiver and there are some packets in the buffer. This produces a deadlock. Dynamic test with latest version of toxiproxy --- CHANGELOG.md | 2 + Makefile | 11 ++- api.go | 4 +- link.go | 49 +++++++++---- link_test.go | 62 ++++++++++++++++ scripts/hazelcast.xml | 31 ++++++++ scripts/test-e2e | 5 +- scripts/test-e2e-hazelcast | 142 +++++++++++++++++++++++++++++++++++++ toxics/bandwidth.go | 12 +++- toxics/toxic.go | 18 +++++ toxics/toxic_test.go | 41 +++++++++++ 11 files changed, 358 insertions(+), 19 deletions(-) create mode 100644 scripts/hazelcast.xml create mode 100755 scripts/test-e2e-hazelcast diff --git a/CHANGELOG.md b/CHANGELOG.md index b3516ee5..476b58f3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ * Show uniq request id in API HTTP response. (#425, @miry) * Add method to parse `stream.Direction` from string. Allow to convert `stream.Direction` to string. (#430, @miry) +* Add possibility to write to Output with deadline. + On interrupting bandwidth toxic use non blocking write. (#436, @miry) # [2.4.0] - 2022-03-07 diff --git a/Makefile b/Makefile index 9d228dbd..970cdbbd 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ OS := $(shell uname -s) +ARCH := $(shell uname -m) GO_VERSION := $(shell go version | cut -f3 -d" ") GO_MINOR_VERSION := $(shell echo $(GO_VERSION) | cut -f2 -d.) GO_PATCH_VERSION := $(shell echo $(GO_VERSION) | cut -f3 -d. | sed "s/^\s*$$/0/") @@ -13,8 +14,9 @@ test: $(MALLOC_ENV) go test -v -race -timeout 1m ./... 
.PHONY: test-e2e -test-e2e: build +test-e2e: build container.build scripts/test-e2e + timeout -v --foreground 20m scripts/test-e2e-hazelcast toxiproxy .PHONY: test-release test-release: test bench test-e2e release-dry @@ -45,6 +47,13 @@ build: dist clean go build -ldflags="-s -w" -o ./dist/toxiproxy-server ./cmd/server go build -ldflags="-s -w" -o ./dist/toxiproxy-cli ./cmd/cli +.PHONY: container.build +container.build: + env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-server-linux-$(ARCH) ./cmd/server + env GOOS=linux CGO_ENABLED=0 go build -ldflags="-s -w" -o ./dist/toxiproxy-cli-linux-$(ARCH) ./cmd/cli + docker build -f Dockerfile -t toxiproxy dist + docker run --rm toxiproxy --version + .PHONY: release release: goreleaser release --rm-dist diff --git a/api.go b/api.go index abe48ba8..c9c4cbed 100644 --- a/api.go +++ b/api.go @@ -27,7 +27,7 @@ func stopBrowsersMiddleware(next http.Handler) http.Handler { } func timeoutMiddleware(next http.Handler) http.Handler { - return http.TimeoutHandler(next, 30*time.Second, "") + return http.TimeoutHandler(next, 25*time.Second, "") } type ApiServer struct { @@ -121,7 +121,7 @@ func (server *ApiServer) Listen(host string, port string) { srv := &http.Server{ Handler: r, Addr: net.JoinHostPort(host, port), - WriteTimeout: 10 * time.Second, + WriteTimeout: 30 * time.Second, ReadTimeout: 10 * time.Second, } diff --git a/link.go b/link.go index 24c417de..03ae9d56 100644 --- a/link.go +++ b/link.go @@ -5,6 +5,7 @@ import ( "fmt" "io" "net" + "time" "github.com/rs/zerolog" @@ -73,7 +74,10 @@ func (link *ToxicLink) Start( dest io.WriteCloser, ) { logger := link.Logger - logger.Debug().Msg("Setup connection") + logger. + Debug(). + Str("direction", link.Direction()). 
+ Msg("Setup connection") labels := []string{ link.Direction(), @@ -133,23 +137,33 @@ func (link *ToxicLink) read( func (link *ToxicLink) write( metricLabels []string, name string, - server *ApiServer, + server *ApiServer, // TODO: Replace with AppConfig for Metrics and Logger dest io.WriteCloser, ) { - logger := link.Logger + logger := link.Logger. + With(). + Str("component", "ToxicLink"). + Str("method", "write"). + Str("link", name). + Str("proxy", link.proxy.Name). + Str("link_addr", fmt.Sprintf("%p", link)). + Logger() + bytes, err := io.Copy(dest, link.output) if err != nil { logger.Warn(). Int64("bytes", bytes). Err(err). - Msg("Destination terminated") - } - if server.Metrics.proxyMetricsEnabled() { + Msg("Could not write to destination") + } else if server.Metrics.proxyMetricsEnabled() { server.Metrics.ProxyMetrics.SentBytesTotal. WithLabelValues(metricLabels...).Add(float64(bytes)) } + dest.Close() + logger.Trace().Msgf("Remove link %s from ToxicCollection", name) link.toxics.RemoveLink(name) + logger.Trace().Msgf("RemoveConnection %s from Proxy %s", name, link.proxy.Name) link.proxy.RemoveConnection(name) } @@ -211,11 +225,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp } } - log.Trace().Msg("Interrupt the previous toxic to update its output") + log.Trace().Msg("Interrupting the previous toxic to update its output") stop := make(chan bool) - go func() { - stop <- link.stubs[toxic_index-1].InterruptToxic() - }() + go func(stub *toxics.ToxicStub, stop chan bool) { + stop <- stub.InterruptToxic() + }(link.stubs[toxic_index-1], stop) // Unblock the previous toxic if it is trying to flush // If the previous toxic is closed, continue flusing until we reach the end. 
@@ -231,9 +245,14 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp if !stopped { <-stop } - return + return // TODO: There are some steps after this to clean buffer + } + + err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second) + if err != nil { + log.Err(err). + Msg("Could not write last packets after interrupt to Output") } - link.stubs[toxic_index].Output <- tmp } } @@ -244,7 +263,11 @@ func (link *ToxicLink) RemoveToxic(ctx context.Context, toxic *toxics.ToxicWrapp link.stubs[toxic_index].Close() return } - link.stubs[toxic_index].Output <- tmp + err := link.stubs[toxic_index].WriteOutput(tmp, 5*time.Second) + if err != nil { + log.Err(err). + Msg("Could not write last packets after interrupt to Output") + } } link.stubs[toxic_index-1].Output = link.stubs[toxic_index].Output diff --git a/link_test.go b/link_test.go index beade186..55379899 100644 --- a/link_test.go +++ b/link_test.go @@ -247,6 +247,7 @@ func TestStateCreated(t *testing.T) { if flag.Lookup("test.v").DefValue == "true" { log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger() } + link := NewToxicLink(nil, collection, stream.Downstream, log) go link.stubs[0].Run(collection.chain[stream.Downstream][0]) collection.links["test"] = link @@ -261,3 +262,64 @@ func TestStateCreated(t *testing.T) { t.Fatalf("New toxic did not have state object created.") } } + +func TestRemoveToxicWithBrokenConnection(t *testing.T) { + ctx := context.Background() + + log := zerolog.Nop() + if flag.Lookup("test.v").DefValue == "true" { + log = zerolog.New(os.Stdout).With().Caller().Timestamp().Logger() + } + ctx = log.WithContext(ctx) + + collection := NewToxicCollection(nil) + link := NewToxicLink(nil, collection, stream.Downstream, log) + go link.stubs[0].Run(collection.chain[stream.Downstream][0]) + collection.links["test"] = link + + toxics := [2]*toxics.ToxicWrapper{ + { + Toxic: &toxics.BandwidthToxic{ + Rate: 0, + }, + Type: "bandwidth", + Direction: 
stream.Downstream, + Toxicity: 1, + }, + { + Toxic: &toxics.BandwidthToxic{ + Rate: 0, + }, + Type: "bandwidth", + Direction: stream.Upstream, + Toxicity: 1, + }, + } + + collection.chainAddToxic(toxics[0]) + collection.chainAddToxic(toxics[1]) + + done := make(chan struct{}) + defer close(done) + + var data uint16 = 42 + go func(log zerolog.Logger) { + for { + select { + case <-done: + link.input.Close() + return + case <-time.After(10 * time.Second): + log.Print("Finish load") + return + default: + buf := make([]byte, 2) + binary.BigEndian.PutUint16(buf, data) + link.input.Write(buf) + } + } + }(log) + + collection.chainRemoveToxic(ctx, toxics[0]) + collection.chainRemoveToxic(ctx, toxics[1]) +} diff --git a/scripts/hazelcast.xml b/scripts/hazelcast.xml new file mode 100644 index 00000000..40b60f47 --- /dev/null +++ b/scripts/hazelcast.xml @@ -0,0 +1,31 @@ + + + + + + 15 + 20 + false + deadline + 3 + 10 + + + + member-proxy:${proxyPort} + 5701 + + + + + member-proxy:${proxyPort0} + member-proxy:${proxyPort1} + member-proxy:${proxyPort2} + + + + + diff --git a/scripts/test-e2e b/scripts/test-e2e index c710a0e9..42dafba5 100755 --- a/scripts/test-e2e +++ b/scripts/test-e2e @@ -28,6 +28,10 @@ function cleanup() { } trap "cleanup" EXIT SIGINT SIGTERM +echo "= Toxiproxy E2E tests" +echo +echo "== Setup" +echo echo "=== Starting Web service" pkill -15 "toxiproxy-server" || true @@ -56,7 +60,6 @@ cli toggle shopify_http echo -e "-----------------\n" echo "== Benchmarking" - echo echo "=== Without toxics" diff --git a/scripts/test-e2e-hazelcast b/scripts/test-e2e-hazelcast new file mode 100755 index 00000000..27f17726 --- /dev/null +++ b/scripts/test-e2e-hazelcast @@ -0,0 +1,142 @@ +#!/bin/bash + +# Usage: +# test-e2e-hazelcast [docker image name for toxiproxy] + +set -ueo pipefail + +cd "$(dirname "$0")" + +toxiproxy="../dist/toxiproxy-cli" +state="started" + +cli() { + ../dist/toxiproxy-cli "$@" 2>&1 | sed -e 's/^/[client] /' +} + +wait_for_url() { + curl -s 
--retry-connrefused --retry 5 --retry-delay 2 --retry-max-time 30 \ + --max-time 1 -L -I -X GET "${1}" +} + +# Stop all background jobs on exit +function cleanup() { + echo -e "\n\n== Teardown: state=${state}" + if [[ $state != "success" ]]; then + docker kill -s SIGQUIT member-proxy + docker logs -t member-proxy + fi + docker stop member-proxy member0 member1 member2 &> /dev/null || true + docker network rm toxiproxy-e2e &> /dev/null || true +} +trap "cleanup" EXIT SIGINT SIGTERM + +LATEST_TAG=$(git describe --tags --abbrev=0) +IMAGE_HAZELCAST="hazelcast/hazelcast:5.1.2-slim" +IMAGE_TOXIPROXY="${1:-ghcr.io/shopify/toxiproxy:${LATEST_TAG:1}}" +TOXIPROXY_BASE_URL="http://localhost:8474" + +echo "= Toxiproxy E2E tests with Hazelcast cluster" +echo +echo "== Setup" +echo +echo "=== Starting Toxiproxy" + +docker rm -f member-proxy member0 member1 member2 &>/dev/null +docker network rm toxiproxy-e2e &>/dev/null || true + +docker network create --subnet 172.18.5.0/24 toxiproxy-e2e + +docker run --rm -t "${IMAGE_TOXIPROXY}" --version +docker run -d \ + --name member-proxy \ + --network toxiproxy-e2e \ + --ip 172.18.5.2 \ + -p 8474:8474 \ + -e LOG_LEVEL=trace \ + "$IMAGE_TOXIPROXY" + +echo "=== Wait Toxiproxy API is available" +wait_for_url "${TOXIPROXY_BASE_URL}/version" + +echo "=== Prepare proxies for Hazelcast cluster" +for i in {0..2}; do + echo "> Create proxy for member${i} on port 600${i}" + # curl --data "{\"name\": \"member${i}\", \"upstream\": \"member${i}:5701\", \"listen\": \"0.0.0.0:600${i}\"}" "${TOXIPROXY_BASE_URL}/proxies" + cli create -l "0.0.0.0:600${i}" -u "member${i}:5701" "member${i}" + echo +done + +echo +echo "=== Strating Hazelcast containers" +for i in {0..2}; do + echo "> Start Hazelcast on host member${i}" + docker run -d --rm \ + --name "member${i}" \ + --network toxiproxy-e2e \ + --ip "172.18.5.1${i}" \ + --volume "${PWD}/hazelcast.xml:/opt/hazelcast/config/hazelcast-docker.xml" \ + --env HZ_PHONE_HOME_ENABLED=false \ + --env 
JAVA_OPTS="-DproxyPort=600${i} -DproxyPort0=6000 -DproxyPort1=6001 -DproxyPort2=6002" \ + "$IMAGE_HAZELCAST" +done + +echo "> Wait for cluster join (30s)..." +sleep 30 + +echo "> Output of member0" +docker logs -t -n 10 member0 + +echo +echo "=== Initialize toxics for cluster" +for i in {0..2}; do + echo "> Adding toxics to member${i} proxy" + # curl --data "{\"name\": \"member${i}_downstream\", \"stream\": \"downstream\", \"toxicity\": 1.0, \"type\": \"bandwidth\", \"attributes\": { \"rate\": 0 }}" "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics" + cli toxic add --type=bandwidth \ + --downstream \ + --toxicName="member${i}_downstream" \ + --attribute="rate=0" \ + --toxicity=1 \ + "member${i}" + # curl --data "{\"name\": \"member${i}_upstream\", \"stream\": \"upstream\", \"toxicity\": 1.0, \"type\": \"bandwidth\", \"attributes\": { \"rate\": 0 }}" "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics" + cli toxic add --type=bandwidth \ + --upstream \ + --toxicName="member${i}_upstream" \ + --attribute="rate=0" \ + --toxicity=1 \ + "member${i}" + echo + cli inspect "member${i}" + echo +done + +echo "=== Wait for a the Hazelcast cluster split-brain (60s)..." 
+sleep 60 + +echo "=== Validate output of Toxiproxy and single member" +docker logs -t -n 10 member0 +docker logs -t -n 10 member-proxy + +echo "=== Removing toxics from proxies" +for i in {0..2}; do + echo "[$(date)] > Remove downstream bandwith Toxic for member${i} proxy" + # curl -v -X DELETE "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics/member${i}_downstream" + cli toxic delete --toxicName="member${i}_downstream" "member${i}" + echo "[$(date)] > Remove ustream bandwith Toxic for member${i} proxy" + # curl -v -X DELETE "${TOXIPROXY_BASE_URL}/proxies/member${i}/toxics/member${i}_upstream" + cli toxic delete --toxicName="member${i}_upstream" "member${i}" +done + +echo "=== Validate output of Toxiproxy and single member after removing toxics" +docker logs -t -n 10 member0 +docker logs -t -n 10 member-proxy + +cli list +cli inspect member0 +cli inspect member1 +cli inspect member2 + +echo -e "=================\n" + +echo "Succcess!" +state="success" diff --git a/toxics/bandwidth.go b/toxics/bandwidth.go index ede9fc31..2dd1e881 100644 --- a/toxics/bandwidth.go +++ b/toxics/bandwidth.go @@ -50,7 +50,11 @@ func (t *BandwidthToxic) Pipe(stub *ToxicStub) { sleep -= 100 * time.Millisecond case <-stub.Interrupt: logger.Trace().Msg("BandwidthToxic was interrupted during writing data") - stub.Output <- p // Don't drop any data on the floor + err := stub.WriteOutput(p, 5*time.Second) // Don't drop any data on the floor + if err != nil { + logger.Warn().Err(err). + Msg("Could not write last packets after interrupt to Output") + } return } } @@ -62,7 +66,11 @@ func (t *BandwidthToxic) Pipe(stub *ToxicStub) { stub.Output <- p case <-stub.Interrupt: logger.Trace().Msg("BandwidthToxic was interrupted during writing data") - stub.Output <- p // Don't drop any data on the floor + err := stub.WriteOutput(p, 5*time.Second) // Don't drop any data on the floor + if err != nil { + logger.Warn().Err(err). 
+ Msg("Could not write last packets after interrupt to Output") + } return } } diff --git a/toxics/toxic.go b/toxics/toxic.go index 67338c96..058c60d9 100644 --- a/toxics/toxic.go +++ b/toxics/toxic.go @@ -1,9 +1,11 @@ package toxics import ( + "fmt" "math/rand" "reflect" "sync" + "time" "github.com/Shopify/toxiproxy/v2/stream" ) @@ -87,6 +89,29 @@ func (s *ToxicStub) Run(toxic *ToxicWrapper) { } } +// WriteOutput sends p to Output, giving up after d so that a missing reader +// cannot block the caller forever. +// If d is 0, it blocks until another goroutine receives from Output. +// It returns a timeout error when p was not delivered within d. +func (s *ToxicStub) WriteOutput(p *stream.StreamChunk, d time.Duration) error { + if d == 0 { + s.Output <- p + return nil + } + + // Use an explicit timer instead of time.After so it is released as soon as + // the write succeeds rather than lingering until d elapses. + timer := time.NewTimer(d) + defer timer.Stop() + + select { + case s.Output <- p: + return nil + case <-timer.C: + return fmt.Errorf("timeout: could not write to output in %d seconds", int(d.Seconds())) + } +} + // Interrupt the flow of data so that the toxic controlling the stub can be replaced. // Returns true if the stream was successfully interrupted, or false if the stream is closed. 
func (s *ToxicStub) InterruptToxic() bool { diff --git a/toxics/toxic_test.go b/toxics/toxic_test.go index ef3172b7..9e6de978 100644 --- a/toxics/toxic_test.go +++ b/toxics/toxic_test.go @@ -4,6 +4,7 @@ import ( "bufio" "bytes" "context" + "crypto/rand" "encoding/json" "io" "net" @@ -17,6 +18,7 @@ import ( "github.com/Shopify/toxiproxy/v2" "github.com/Shopify/toxiproxy/v2/collectors" + "github.com/Shopify/toxiproxy/v2/stream" "github.com/Shopify/toxiproxy/v2/toxics" ) @@ -351,3 +353,45 @@ func BenchmarkProxyBandwidth(b *testing.B) { b.Error("Failed to close TCP connection", err) } } + +// TestToxicStub_WriteOutput checks that WriteOutput times out when nothing reads +// from Output and succeeds when a reader is available. +func TestToxicStub_WriteOutput(t *testing.T) { + input := make(chan *stream.StreamChunk) + output := make(chan *stream.StreamChunk) + stub := toxics.NewToxicStub(input, output) + + buf := make([]byte, 42) + rand.Read(buf) + + t.Run("when no read in 1 second", func(t *testing.T) { + err := stub.WriteOutput(&stream.StreamChunk{Data: buf}, time.Second) + if err == nil { + // Stop here: calling err.Error() below on a nil error would panic. + t.Fatal("Expected to have error") + } + + expected := "timeout: could not write to output in 1 seconds" + if err.Error() != expected { + t.Errorf("Expected error: %s, got %s", expected, err) + } + }) + + t.Run("when read is available", func(t *testing.T) { + go func(t *testing.T, stub *toxics.ToxicStub, expected []byte) { + select { + case <-time.After(5 * time.Second): + t.Error("Timeout of running test to read from output.") + case chunk := <-output: + if !bytes.Equal(chunk.Data, expected) { + t.Error("Data in Output different from Write") + } + } + }(t, stub, buf) + + err := stub.WriteOutput(&stream.StreamChunk{Data: buf}, 5*time.Second) + if err != nil { + t.Errorf("Unexpected error: %+v", err) + } + }) +}