Skip to content

Commit

Permalink
Introduce Reset peer toxic.
Browse files Browse the repository at this point in the history
  • Loading branch information
chaosbox authored and miry committed Oct 17, 2021
1 parent 2f31679 commit d457170
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 1 deletion.
23 changes: 22 additions & 1 deletion link.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package toxiproxy

import (
"io"
"net"

"github.com/sirupsen/logrus"

"github.com/Shopify/toxiproxy/v2/stream"
"github.com/Shopify/toxiproxy/v2/toxics"
"github.com/sirupsen/logrus"
)

// ToxicLinks are single direction pipelines that connects an input and output via
Expand Down Expand Up @@ -62,6 +64,7 @@ func NewToxicLink(

// Start the link with the specified toxics.
func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser) {

go func() {
bytes, err := io.Copy(link.input, source)
if err != nil {
Expand All @@ -77,6 +80,24 @@ func (link *ToxicLink) Start(name string, source io.Reader, dest io.WriteCloser)
if stateful, ok := toxic.Toxic.(toxics.StatefulToxic); ok {
link.stubs[i].State = stateful.NewState()
}
if _, ok := toxic.Toxic.(*toxics.ResetToxic); ok {
if err := source.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("source: Unable to setLinger(ms)")

}
if err := dest.(*net.TCPConn).SetLinger(0); err != nil {
logrus.WithFields(logrus.Fields{
"name": link.proxy.Name,
"toxic": toxic.Type,
"err": err,
}).Error("dest: Unable to setLinger(ms)")

}
}

go link.stubs[i].Run(toxic)
}
Expand Down
37 changes: 37 additions & 0 deletions toxics/reset_peer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package toxics

import (
"time"
)

/*
The ResetToxic sends closes the connection abruptly after a timeout (in ms). The behaviour of Close is set to discard any unsent/unacknowledged data by setting SetLinger to 0,
~= sets TCP RST flag and resets the connection.
If the timeout is set to 0, then the connection will be reset immediately.
Drop data since it will initiate a graceful close by sending the FIN/ACK. (io.EOF)
*/

type ResetToxic struct {
// Timeout in milliseconds
Timeout int64 `json:"timeout"`
}

func (t *ResetToxic) Pipe(stub *ToxicStub) {
timeout := time.Duration(t.Timeout) * time.Millisecond

for {
select {
case <-stub.Interrupt:
return
case <-stub.Input:
<-time.After(timeout)
stub.Close()
return
}
}
}

func init() {
Register("reset_peer", new(ResetToxic))
}
83 changes: 83 additions & 0 deletions toxics/reset_peer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package toxics_test

import (
"bufio"
"github.com/Shopify/toxiproxy/toxics"
"io"
"net"
"os"
"syscall"
"testing"
"time"
)

const msg = "reset toxic payload\n"

func TestResetToxicNoTimeout(t *testing.T) {
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "upstream", &toxics.ResetToxic{}))
}

func TestResetToxicWithTimeout(t *testing.T) {
start := time.Now()
resetToxic := toxics.ResetToxic{Timeout: 100}
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "upstream", &resetToxic))
AssertDeltaTime(t, "Reset after timeout", time.Since(start), time.Duration(resetToxic.Timeout)*time.Millisecond, time.Duration(resetToxic.Timeout+10)*time.Millisecond)
}

func TestResetToxicWithTimeoutDownstream(t *testing.T) {
start := time.Now()
resetToxic := toxics.ResetToxic{Timeout: 100}
resetTCPHelper(t, ToxicToJson(t, "resettcp", "reset_peer", "downstream", &resetToxic))
AssertDeltaTime(t, "Reset after timeout", time.Since(start), time.Duration(resetToxic.Timeout)*time.Millisecond, time.Duration(resetToxic.Timeout+10)*time.Millisecond)

}

func checkConnectionState(t *testing.T, listenAddress string) {
conn, err := net.Dial("tcp", listenAddress)
if err != nil {
t.Error("Unable to dial TCP server", err)
}
if _, err := conn.Write([]byte(msg)); err != nil {
t.Error("Failed writing TCP payload", err)
}
tmp := make([]byte, 1000)
_, err = conn.Read(tmp)
defer conn.Close()
if opErr, ok := err.(*net.OpError); ok {
syscallErr, _ := opErr.Err.(*os.SyscallError)
if !(syscallErr.Err == syscall.ECONNRESET) {
t.Error("Expected: connection reset by peer. Got:", err)
}
} else {
t.Error("Expected: connection reset by peer. Got:", err, "conn:", conn.RemoteAddr(), conn.LocalAddr())
}
_, err = conn.Read(tmp)
if err != io.EOF {
t.Error("expected EOF from closed connection")
}
}

func resetTCPHelper(t *testing.T, toxicJSON io.Reader) {
ln, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatal("Failed to create TCP server", err)
}
defer ln.Close()
proxy := NewTestProxy("test", ln.Addr().String())
proxy.Start()
proxy.Toxics.AddToxicJson(toxicJSON)
defer proxy.Stop()

go func() {
conn, err := ln.Accept()
if err != nil {
t.Error("Unable to accept TCP connection", err)
}
defer ln.Close()
scan := bufio.NewScanner(conn)
if scan.Scan() {
conn.Write([]byte(msg))
}
}()
checkConnectionState(t, proxy.Listen)
}

0 comments on commit d457170

Please sign in to comment.