Skip to content

Commit

Permalink
Client ID hack
Browse files Browse the repository at this point in the history
  • Loading branch information
brunodebus committed Sep 30, 2024
1 parent 71d0761 commit 41e319a
Show file tree
Hide file tree
Showing 14 changed files with 49,366 additions and 11 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ require (
github.com/spf13/cobra v1.6.1
github.com/spf13/viper v1.0.2
github.com/stretchr/testify v1.8.1
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kmsg v1.8.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
golang.org/x/net v0.23.0
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,10 @@ github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1F
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twmb/franz-go v1.17.1 h1:0LwPsbbJeJ9R91DPUHSEd4su82WJWcTY1Zzbgbg4CeQ=
github.com/twmb/franz-go v1.17.1/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0=
Expand Down
13 changes: 12 additions & 1 deletion proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,18 @@ func (c *Client) handleConn(conn Conn) {
}
c.conns.Add(conn.BrokerAddress, conn.LocalConnection)
localDesc := "local connection on " + conn.LocalConnection.LocalAddr().String() + " from " + conn.LocalConnection.RemoteAddr().String() + " (" + conn.BrokerAddress + ")"
copyThenClose(c.processorConfig, server, conn.LocalConnection, conn.BrokerAddress, conn.BrokerAddress, localDesc)

var id *string

tlsConn, ok := conn.LocalConnection.(*tls.Conn)
if ok {
if len(tlsConn.ConnectionState().PeerCertificates) > 0 {
commonName := tlsConn.ConnectionState().PeerCertificates[0].Subject.CommonName
id = &commonName
}
}

copyThenClose(c.processorConfig, server, conn.LocalConnection, conn.BrokerAddress, id, conn.BrokerAddress, localDesc)
if err := c.conns.Remove(conn.BrokerAddress, conn.LocalConnection); err != nil {
logrus.Info(err)
}
Expand Down
7 changes: 4 additions & 3 deletions proxy/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import (
"bytes"
"errors"
"fmt"
"github.com/sirupsen/logrus"
"io"
"net"
"sync"
"time"

"github.com/sirupsen/logrus"
)

type DeadlineReadWriteCloser interface {
Expand Down Expand Up @@ -111,9 +112,9 @@ func copyError(readDesc, writeDesc string, readErr bool, err error) {
logrus.Infof("%v had error: %s", desc, err.Error())
}

func copyThenClose(cfg ProcessorConfig, remote, local DeadlineReadWriteCloser, brokerAddress string, remoteDesc, localDesc string) {
func copyThenClose(cfg ProcessorConfig, remote, local DeadlineReadWriteCloser, brokerAddress string, id *string, remoteDesc, localDesc string) {

processor := newProcessor(cfg, brokerAddress)
processor := newProcessor(cfg, brokerAddress, id)

firstErr := make(chan error, 1)

Expand Down
12 changes: 10 additions & 2 deletions proxy/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package proxy

import (
"errors"
"time"

"github.com/grepplabs/kafka-proxy/config"
"github.com/grepplabs/kafka-proxy/proxy/protocol"
"time"
)

const (
Expand All @@ -17,6 +18,7 @@ const (
minOpenRequests = 16

apiKeyProduce = int16(0)
apiKeyFetch = int16(1)
apiKeySaslHandshake = int16(17)
apiKeyApiApiVersions = int16(18)

Expand Down Expand Up @@ -61,11 +63,14 @@ type processor struct {
forbiddenApiKeys map[int16]struct{}
// metrics
brokerAddress string

clientID *string

// producer will never send request with acks=0
producerAcks0Disabled bool
}

func newProcessor(cfg ProcessorConfig, brokerAddress string) *processor {
func newProcessor(cfg ProcessorConfig, brokerAddress string, id *string) *processor {
maxOpenRequests := cfg.MaxOpenRequests
if maxOpenRequests < minOpenRequests {
maxOpenRequests = minOpenRequests
Expand Down Expand Up @@ -103,6 +108,7 @@ func newProcessor(cfg ProcessorConfig, brokerAddress string) *processor {
readTimeout: readTimeout,
writeTimeout: writeTimeout,
brokerAddress: brokerAddress,
clientID: id,
localSasl: cfg.LocalSasl,
authServer: cfg.AuthServer,
forbiddenApiKeys: cfg.ForbiddenApiKeys,
Expand All @@ -125,6 +131,7 @@ func (p *processor) RequestsLoop(dst DeadlineWriter, src DeadlineReaderWriter) (
nextResponseHandlerChannel: p.nextResponseHandlerChannel,
timeout: p.writeTimeout,
brokerAddress: p.brokerAddress,
clientID: p.clientID,
forbiddenApiKeys: p.forbiddenApiKeys,
buf: make([]byte, p.requestBufferSize),
localSasl: p.localSasl,
Expand All @@ -142,6 +149,7 @@ type RequestsLoopContext struct {

timeout time.Duration
brokerAddress string
clientID *string
forbiddenApiKeys map[int16]struct{}
buf []byte // bufSize

Expand Down
86 changes: 81 additions & 5 deletions proxy/processor_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,14 @@ import (
"bytes"
"errors"
"fmt"
"github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/sirupsen/logrus"
"io"
"strconv"
"time"

"github.com/grepplabs/kafka-proxy/proxy/protocol"
"github.com/sirupsen/logrus"
"github.com/twmb/franz-go/pkg/kbin"
"github.com/twmb/franz-go/pkg/kmsg"
)

type DefaultRequestHandler struct {
Expand Down Expand Up @@ -85,9 +88,82 @@ func (handler *DefaultRequestHandler) handleRequest(dst DeadlineWriter, src Dead
}
}

mustReply, readBytes, err := handler.mustReply(requestKeyVersion, src, ctx)
if err != nil {
return true, err
mustReply := true
var readBytes []byte

switch requestKeyVersion.ApiKey {
case apiKeyProduce:
request := kmsg.NewPtrProduceRequest()
request.SetVersion(requestKeyVersion.ApiVersion)

readBytes = make([]byte, requestKeyVersion.Length-4)

_, err := io.ReadFull(src, readBytes)
if err != nil {
return false,
fmt.Errorf("could not read response: %w", err)
}

reader := kbin.Reader{Src: readBytes}
reader.Uint32() // Correlation ID
reader.String() // client_id

if request.IsFlexible() {
kmsg.ReadTags(&reader)
}

err = request.ReadFrom(reader.Src)
if err != nil {
return false,
fmt.Errorf("could not read response: %w", err)
}

mustReply = request.Acks != 0

if ctx.clientID != nil {
for _, topic := range request.Topics {
if *ctx.clientID == "test1" {
if topic.Topic != "allowed" {
return true, errors.New(fmt.Sprintf("Client %s is not allowed to produce to %s", *ctx.clientID, topic.Topic))
}
}
}
}
case apiKeyFetch:
request := kmsg.NewPtrFetchRequest()
request.SetVersion(requestKeyVersion.ApiVersion)

readBytes = make([]byte, requestKeyVersion.Length-4)

_, err := io.ReadFull(src, readBytes)
if err != nil {
return false,
fmt.Errorf("could not read response: %w", err)
}

reader := kbin.Reader{Src: readBytes}
reader.Uint32() // Correlation ID
reader.String() // client_id

if request.IsFlexible() {
kmsg.ReadTags(&reader)
}

err = request.ReadFrom(reader.Src)
if err != nil {
return false,
fmt.Errorf("could not read response: %w", err)
}

if ctx.clientID != nil {
for _, topic := range request.Topics {
if *ctx.clientID == "test2" {
if topic.Topic != "allowed" {
return true, errors.New(fmt.Sprintf("Client %s is not allowed to produce to %s", *ctx.clientID, topic.Topic))
}
}
}
}
}

// send inFlightRequest to channel before myCopyN to prevent race condition in proxyResponses
Expand Down
24 changes: 24 additions & 0 deletions vendor/github.com/twmb/franz-go/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 41e319a

Please sign in to comment.