Skip to content

Commit

Permalink
Merge pull request #61 from mailgun/maxim/develop
Browse files Browse the repository at this point in the history
Massive refactoring continues
  • Loading branch information
horkhe committed May 3, 2016
2 parents f4e42bf + 94cb810 commit 5cda4e9
Show file tree
Hide file tree
Showing 26 changed files with 1,464 additions and 1,323 deletions.
3 changes: 2 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
# Changelog

#### Version TBD (TBD)
#### Version 0.11.0 (TBD)

* [#56](https://github.com/mailgun/kafka-pixy/issues/56) Invalid stored offset makes consumer panic.
* [#59](https://github.com/mailgun/kafka-pixy/issues/59) Messages are skipped by consumer during rebalancing.
* [#62](https://github.com/mailgun/kafka-pixy/issues/62) Messages consumed twice during rebalancing.

#### Version 0.10.1 (2015-12-21)

Expand Down
21 changes: 10 additions & 11 deletions apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/mailgun/kafka-pixy/actor"
"github.com/mailgun/kafka-pixy/admin"
"github.com/mailgun/kafka-pixy/consumer"
"github.com/mailgun/kafka-pixy/consumer/consumermsg"
"github.com/mailgun/kafka-pixy/prettyfmt"
"github.com/mailgun/kafka-pixy/producer"
"github.com/mailgun/log"
Expand Down Expand Up @@ -45,16 +44,16 @@ type T struct {
addr string
listener net.Listener
httpServer *manners.GracefulServer
producer *producer.T
consumer *consumer.T
prod *producer.T
cons consumer.T
admin *admin.T
errorCh chan error
}

// New creates an HTTP server instance that will accept API requests at the
// specified `network`/`address` and execute them with the specified `producer`,
// `consumer`, or `admin`, depending on the request type.
func New(network, addr string, producer *producer.T, consumer *consumer.T, admin *admin.T) (*T, error) {
func New(network, addr string, prod *producer.T, cons consumer.T, admin *admin.T) (*T, error) {
// Start listening on the specified network/address.
listener, err := net.Listen(network, addr)
if err != nil {
Expand All @@ -74,8 +73,8 @@ func New(network, addr string, producer *producer.T, consumer *consumer.T, admin
addr: addr,
listener: manners.NewListener(listener),
httpServer: httpServer,
producer: producer,
consumer: consumer,
prod: prod,
cons: cons,
admin: admin,
errorCh: make(chan error, 1),
}
Expand Down Expand Up @@ -154,12 +153,12 @@ func (as *T) handleProduce(w http.ResponseWriter, r *http.Request) {

// Asynchronously submit the message to the Kafka cluster.
if !isSync {
as.producer.AsyncProduce(topic, toEncoderPreservingNil(key), sarama.StringEncoder(message))
as.prod.AsyncProduce(topic, toEncoderPreservingNil(key), sarama.StringEncoder(message))
respondWithJSON(w, http.StatusOK, EmptyResponse)
return
}

prodMsg, err := as.producer.Produce(topic, toEncoderPreservingNil(key), sarama.StringEncoder(message))
prodMsg, err := as.prod.Produce(topic, toEncoderPreservingNil(key), sarama.StringEncoder(message))
if err != nil {
var status int
switch err {
Expand Down Expand Up @@ -189,13 +188,13 @@ func (as *T) handleConsume(w http.ResponseWriter, r *http.Request) {
return
}

consMsg, err := as.consumer.Consume(group, topic)
consMsg, err := as.cons.Consume(group, topic)
if err != nil {
var status int
switch err.(type) {
case consumermsg.ErrRequestTimeout:
case consumer.ErrRequestTimeout:
status = http.StatusRequestTimeout
case consumermsg.ErrBufferOverflow:
case consumer.ErrBufferOverflow:
status = 429 // StatusTooManyRequests
default:
status = http.StatusInternalServerError
Expand Down
7 changes: 7 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ type T struct {
// A consumer should wait this long after it gets notification that a
// consumer joined/left its consumer group before it should rebalance.
RebalanceDelay time.Duration
// How frequently to commit updated offsets. Defaults to 0.5s.
OffsetsCommitInterval time.Duration
// If enabled, any errors that occurred while consuming are returned on
// the Errors channel (default disabled).
ReturnErrors bool
}
}

Expand All @@ -76,6 +81,8 @@ func Default() *T {
config.Consumer.RegistrationTimeout = 20 * time.Second
config.Consumer.BackOffTimeout = 500 * time.Millisecond
config.Consumer.RebalanceDelay = 250 * time.Millisecond
config.Consumer.OffsetsCommitInterval = 500 * time.Millisecond
config.Consumer.ReturnErrors = false

return config
}
Expand Down
36 changes: 36 additions & 0 deletions consumer/consumer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package consumer

type T interface {
// Consume consumes a message from the specified topic on behalf of the
// specified consumer group. If there are no more new messages in the topic
// at the time of the request then it will block for
// `Config.Consumer.LongPollingTimeout`. If no new message is produced during
// that time, then `ErrRequestTimeout` is returned.
//
// Note that during state transitions topic subscribe<->unsubscribe and
// consumer group register<->deregister the method may return either
// `ErrBufferOverflow` or `ErrRequestTimeout` even when there are messages
// available for consumption. In that case the user should back off a bit
// and then repeat the request.
Consume(group, topic string) (*Message, error)

// Stop sends a shutdown signal to all internal goroutines and blocks until
// they are stopped. It is guaranteed that all last consumed offsets of all
// consumer groups/topics are committed to Kafka before Consumer stops.
Stop()
}

// Message encapsulates a Kafka message returned by the consumer.
type Message struct {
Key, Value []byte
Topic string
Partition int32
Offset int64
HighWaterMark int64
}

type (
ErrSetup error
ErrBufferOverflow error
ErrRequestTimeout error
)
108 changes: 108 additions & 0 deletions consumer/consumerimpl/consumerimpl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package consumerimpl

import (
"fmt"
"time"

"github.com/Shopify/sarama"
"github.com/mailgun/kafka-pixy/actor"
"github.com/mailgun/kafka-pixy/config"
"github.com/mailgun/kafka-pixy/consumer"
"github.com/mailgun/kafka-pixy/consumer/dispatcher"
"github.com/mailgun/kafka-pixy/consumer/groupcsm"
"github.com/mailgun/kafka-pixy/consumer/offsetmgr"
"github.com/wvanbergen/kazoo-go"
)

// T is a Kafka consumer implementation that automatically maintains consumer
// groups registrations and topic subscriptions. Whenever a message from a
// particular topic is consumed by a particular consumer group T checks if it
// has registered with the consumer group, and registers otherwise. Then it
// checks if it has subscribed for the topic, and subscribes otherwise. Later
// if a particular topic has not been consumed for
// `Config.Consumer.RegistrationTimeout` period of time, the consumer
// unsubscribes from the topic, likewise if a consumer group has not seen any
// requests for that period then the consumer deregisters from the group.
//
// implements `consumer.T`.
// implements `dispatcher.Factory`.
type t struct {
namespace *actor.ID
cfg *config.T
dispatcher *dispatcher.T
kafkaClient sarama.Client
kazooConn *kazoo.Kazoo
offsetMgrFactory offsetmgr.Factory
}

// Spawn creates a consumer instance with the specified configuration and
// starts all its goroutines.
func Spawn(namespace *actor.ID, cfg *config.T) (*t, error) {
saramaCfg := sarama.NewConfig()
saramaCfg.ClientID = cfg.ClientID
saramaCfg.ChannelBufferSize = cfg.Consumer.ChannelBufferSize
saramaCfg.Consumer.Offsets.CommitInterval = 50 * time.Millisecond
saramaCfg.Consumer.Retry.Backoff = cfg.Consumer.BackOffTimeout
saramaCfg.Consumer.Fetch.Default = 1024 * 1024

namespace = namespace.NewChild("cons")

kafkaClient, err := sarama.NewClient(cfg.Kafka.SeedPeers, saramaCfg)
if err != nil {
return nil, consumer.ErrSetup(fmt.Errorf("failed to create sarama.Client: err=(%v)", err))
}
offsetMgrFactory := offsetmgr.SpawnFactory(namespace, cfg, kafkaClient)

kazooCfg := kazoo.NewConfig()
kazooCfg.Chroot = cfg.ZooKeeper.Chroot
// ZooKeeper documentation says following about the session timeout: "The
// current (ZooKeeper) implementation requires that the timeout be a
// minimum of 2 times the tickTime (as set in the server configuration) and
// a maximum of 20 times the tickTime". The default tickTime is 2 seconds.
// See http://zookeeper.apache.org/doc/trunk/zookeeperProgrammers.html#ch_zkSessions
kazooCfg.Timeout = 15 * time.Second

kazooConn, err := kazoo.NewKazoo(cfg.ZooKeeper.SeedPeers, kazooCfg)
if err != nil {
return nil, consumer.ErrSetup(fmt.Errorf("failed to create kazoo.Kazoo: err=(%v)", err))
}

c := &t{
namespace: namespace,
cfg: cfg,
kafkaClient: kafkaClient,
offsetMgrFactory: offsetMgrFactory,
kazooConn: kazooConn,
}
c.dispatcher = dispatcher.New(c.namespace, c, c.cfg)
c.dispatcher.Start()
return c, nil
}

// implements `consumer.T`
func (sc *t) Consume(group, topic string) (*consumer.Message, error) {
replyCh := make(chan dispatcher.Response, 1)
sc.dispatcher.Requests() <- dispatcher.Request{time.Now().UTC(), group, topic, replyCh}
result := <-replyCh
return result.Msg, result.Err
}

// implements `consumer.T`
func (sc *t) Stop() {
sc.dispatcher.Stop()
}

// implements `dispatcher.Factory`.
func (sc *t) KeyOf(req dispatcher.Request) string {
return req.Group
}

// implements `dispatcher.Factory`.
func (sc *t) NewTier(key string) dispatcher.Tier {
return groupcsm.New(sc.namespace, key, sc.cfg, sc.kafkaClient, sc.kazooConn, sc.offsetMgrFactory)
}

// String returns a string ID of this instance to be used in logs.
func (sc *t) String() string {
return sc.namespace.String()
}
Loading

0 comments on commit 5cda4e9

Please sign in to comment.