
Commit

Merge pull request #118 from mailgun/maxim/develop
Allow infinite retries
horkhe authored Sep 8, 2017
2 parents 4e75001 + 069d83d commit 3a20634
Showing 12 changed files with 284 additions and 113 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -3,6 +3,8 @@
#### Version 0.14.0 (TBD)

Implemented:
* Added HTTP API endpoints to list topics, optionally with partitions and
configuration.
* Ack timeout can now be greater than subscription timeout. So in the absence of
incoming consume requests for a topic, rebalancing may be delayed until
ack timeout expires for all offered messages. If a new request comes while
31 changes: 30 additions & 1 deletion README.md
@@ -293,7 +293,7 @@ group inactivity on all Kafka-Pixy working with the Kafka cluster.

```
GET /topics/<topic>/consumers
GET /clusters/<topic>/topics/<topic>/consumers
GET /clusters/<cluster>/topics/<topic>/consumers
```

Returns a list of consumers that are subscribed to a topic.
@@ -344,6 +344,35 @@ yields:
}
```

### List Topics

```
GET /topics
GET /clusters/<cluster>/topics
```

Returns a list of topics, optionally with detailed configuration and partitions.

Parameter | Opt | Description
----------------|-----|------------------------------------------------
cluster | yes | The name of a cluster to operate on. By default the cluster mentioned first in the `proxies` section of the config file is used.
withPartitions | yes | Whether a list of partitions should be returned for every topic.
withConfig | yes | Whether configuration should be returned for every topic.
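
For illustration only, a response to `GET /topics?withPartitions=true` (assuming the parameters are passed as query string values) might be shaped roughly as follows. The field names are borrowed from the `TopicMetadata` and `PartitionMetadata` structs added in `admin/admin.go`; the exact wire format produced by the HTTP layer is not shown in this diff and may differ.

```
[
  {
    "Topic": "foo",
    "Partitions": [
      {"ID": 0, "Leader": 1001, "Replicas": [1001, 1002], "ISR": [1001, 1002]},
      {"ID": 1, "Leader": 1002, "Replicas": [1002, 1001], "ISR": [1002, 1001]}
    ]
  }
]
```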

### Get Topic Config

```
GET /topics/<topic>
GET /clusters/<cluster>/topics/<topic>
```

Returns the topic configuration, optionally with a list of partitions.

Parameter | Opt | Description
----------------|-----|------------------------------------------------
cluster | yes | The name of a cluster to operate on. By default the cluster mentioned first in the `proxies` section of the config file is used.
withPartitions | yes | Whether a list of partitions should be returned.
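
As a rough, hypothetical sketch of the response shape: the `Config` portion mirrors the `TopicConfig` struct added in `admin/admin.go`, which unmarshals the topic config node stored in ZooKeeper. The exact wire format of the HTTP layer is not shown in this diff, and the `retention.ms` entry below is just an example value.

```
{
  "Topic": "foo",
  "Config": {
    "version": 1,
    "config": {
      "retention.ms": "604800000"
    }
  }
}
```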

## Configuration

Kafka-Pixy is designed to be very simple to run. It consists of a single
86 changes: 50 additions & 36 deletions admin/admin.go
@@ -1,6 +1,7 @@
package admin

import (
"encoding/json"
"fmt"
"sort"
"strconv"
@@ -65,15 +66,20 @@ type PartitionMetadata struct {
ID int32
Leader int32
Replicas []int32
Isr []int32
ISR []int32
}

type TopicMetadata struct {
Topic string
Config []byte
Config *TopicConfig
Partitions []PartitionMetadata
}

type TopicConfig struct {
Version int32 `json:"version"`
Config map[string]string `json:"config"`
}

type indexedPartition struct {
index int
partition int32
@@ -173,15 +179,15 @@ func (a *T) getGroupOffsets(group, topic string) ([]PartitionOffset, error) {
// Fetch the last committed offsets for all partitions of the group/topic.
coordinator, err := kafkaClt.Coordinator(group)
if err != nil {
return nil, errors.Wrapf(err, "failed to get coordinator")
return nil, errors.Wrap(err, "failed to get coordinator")
}
req := sarama.OffsetFetchRequest{ConsumerGroup: group, Version: ProtocolVer1}
for _, p := range partitions {
req.AddPartition(topic, p)
}
res, err := coordinator.FetchOffset(&req)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch offsets")
return nil, errors.Wrap(err, "failed to fetch offsets")
}
for i, p := range partitions {
block := res.GetBlock(topic, p)
@@ -212,7 +218,7 @@ func (a *T) setGroupOffsets(group, topic string, offsets []PartitionOffset) erro
}
coordinator, err := kafkaClt.Coordinator(group)
if err != nil {
return errors.Wrapf(err, "failed to get coordinator")
return errors.Wrap(err, "failed to get coordinator")
}

req := sarama.OffsetCommitRequest{
@@ -225,7 +231,7 @@ func (a *T) setGroupOffsets(group, topic string, offsets []PartitionOffset) erro
}
res, err := coordinator.CommitOffset(&req)
if err != nil {
return errors.Wrapf(err, "failed to commit offsets")
return errors.Wrap(err, "failed to commit offsets")
}
for p, err := range res.Errors[topic] {
if err != sarama.ErrNoError {
@@ -249,7 +255,7 @@ func (a *T) GetTopicConsumers(group, topic string) (map[string][]int32, error) {
if err == zk.ErrNoNode {
return nil, ErrInvalidParam(errors.New("either group or topic is incorrect"))
}
return nil, errors.Wrapf(err, "failed to fetch partition owners data")
return nil, errors.Wrap(err, "failed to fetch partition owners data")
}

consumers := make(map[string][]int32)
@@ -261,7 +267,7 @@ func (a *T) GetTopicConsumers(group, topic string) (map[string][]int32, error) {
partitionPath := fmt.Sprintf("%s/%s", consumedPartitionsPath, partitionNode)
partitionNodeData, _, err := zkConn.Get(partitionPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch partition owner")
return nil, errors.Wrap(err, "failed to fetch partition owner")
}
clientID := string(partitionNodeData)
consumers[clientID] = append(consumers[clientID], int32(partition))
@@ -285,7 +291,7 @@ func (a *T) GetAllTopicConsumers(topic string) (map[string]map[string][]int32, e
groupsPath := fmt.Sprintf("%s/consumers", a.cfg.ZooKeeper.Chroot)
groups, _, err := kzConn.Children(groupsPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch consumer groups")
return nil, errors.Wrap(err, "failed to fetch consumer groups")
}

consumers := make(map[string]map[string][]int32)
@@ -342,7 +348,8 @@ func getOffsetResult(res *sarama.OffsetResponse, topic string, partition int32)
return block.Offsets[0], nil
}

func (a *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]TopicMetadata, error) {
// ListTopics returns a list of all topics existing in the Kafka cluster.
func (a *T) ListTopics(withPartitions, withConfig bool) ([]TopicMetadata, error) {
kafkaClt, err := a.lazyKafkaClt()
if err != nil {
return nil, err
@@ -355,15 +362,28 @@ func (a *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]TopicMetadata,

topicsMetadata := make([]TopicMetadata, len(topics))
for i, topic := range topics {
tm := &topicsMetadata[i]
tm.Topic = topic
if !withPartitions {
continue
tm, err := a.GetTopicMetadata(topic, withPartitions, withConfig)
if err != nil {
return nil, errors.Wrapf(err, "failed to get %s topic metadata", topic)
}
topicsMetadata[i] = tm
}
return topicsMetadata, nil
}

// GetTopicMetadata returns metadata for a topic. Partition metadata and/or
// detailed topic configuration can optionally be requested as well.
func (a *T) GetTopicMetadata(topic string, withPartitions, withConfig bool) (TopicMetadata, error) {
kafkaClt, err := a.lazyKafkaClt()
if err != nil {
return TopicMetadata{}, errors.Wrap(err, "failed to connect to Kafka")
}

tm := TopicMetadata{Topic: topic}
if withPartitions {
partitions, err := kafkaClt.Partitions(topic)
if err != nil {
return nil, errors.Wrap(err, "failed to get partitions")
return TopicMetadata{}, errors.Wrap(err, "failed to get partitions")
}

tm.Partitions = make([]PartitionMetadata, len(partitions))
@@ -375,44 +395,38 @@ func (a *T) GetTopicsMetadata(withPartitions, withConfig bool) ([]TopicMetadata,
if err == sarama.ErrLeaderNotAvailable {
pm.Leader = -1
} else if err != nil {
return nil, errors.Wrap(err, "failed to get leader")
return TopicMetadata{}, errors.Wrap(err, "failed to get leader")
}
pm.Leader = leader.ID()

isr, err := kafkaClt.InSyncReplicas(topic, partition)
if err != nil {
return nil, errors.Wrap(err, "failed to get ISR")
return TopicMetadata{}, errors.Wrap(err, "failed to get ISR")
}
pm.Isr = isr
pm.ISR = isr

replicas, err := kafkaClt.Replicas(topic, partition)
if err != nil {
return nil, errors.Wrap(err, "failed to get replicas")
return TopicMetadata{}, errors.Wrap(err, "failed to get replicas")
}
pm.Replicas = replicas
}
}

if withConfig {
kzConn, err := a.lazyZKConn()
if err != nil {
return nil, errors.Wrap(err, "failed to connect to zookeeper")
return TopicMetadata{}, errors.Wrap(err, "failed to connect to zookeeper")
}
for i := range topicsMetadata {
t := &topicsMetadata[i]
cfgPath := fmt.Sprintf("%s/config/topics/%s", a.cfg.ZooKeeper.Chroot, t.Topic)
cfg, _, err := kzConn.Get(cfgPath)
if err != nil {
return nil, errors.Wrapf(err, "failed to fetch topic configuration")
}
t.Config = cfg
cfgPath := fmt.Sprintf("%s/config/topics/%s", a.cfg.ZooKeeper.Chroot, topic)
cfg, _, err := kzConn.Get(cfgPath)
if err != nil {
return TopicMetadata{}, errors.Wrap(err, "failed to fetch topic configuration")
}
topicConfig := TopicConfig{}
if err = json.Unmarshal(cfg, &topicConfig); err != nil {
return TopicMetadata{}, errors.Wrap(err, "bad config")
}
tm.Config = &topicConfig
}
return topicsMetadata, nil
return tm, nil
}

type int32Slice []int32

func (p int32Slice) Len() int { return len(p) }
func (p int32Slice) Less(i, j int) bool { return p[i] < p[j] }
func (p int32Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
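
As a hypothetical usage sketch (not part of this commit), the new admin methods might be wired together as below. The import path and the existence of an already-constructed `*admin.T` are assumptions; how the admin facade is created is not shown in this diff.

```go
package adminexample

import (
	"fmt"

	"github.com/mailgun/kafka-pixy/admin"
	"github.com/pkg/errors"
)

// printTopics lists every topic along with partition and configuration
// details using the methods introduced in this change.
func printTopics(a *admin.T) error {
	topics, err := a.ListTopics(true, true) // withPartitions, withConfig
	if err != nil {
		return errors.Wrap(err, "failed to list topics")
	}
	for _, tm := range topics {
		fmt.Printf("topic=%s partitions=%d\n", tm.Topic, len(tm.Partitions))
		if tm.Config != nil {
			fmt.Printf("  config version=%d with %d entries\n",
				tm.Config.Version, len(tm.Config.Config))
		}
	}
	return nil
}
```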
17 changes: 10 additions & 7 deletions config/config.go
@@ -126,10 +126,13 @@ type Proxy struct {
// errors, until some of the pending messages are acknowledged.
MaxPendingMessages int `yaml:"max_pending_messages"`

// The maximum number of times a message can be offered to a consumer.
// If a message was offered that many times and no acknowledgment has
// been received, then it is considered to be acknowledged and will
// never be offered again.
// The maximum number of retries Kafka-Pixy will make to offer an
// unacknowledged message. Messages that exceed the retry limit are
// discarded by Kafka-Pixy and acknowledged in Kafka. Zero retries
// means that messages will be offered just once.
//
// If you want Kafka-Pixy to retry indefinitely, then set this
// parameter to -1.
MaxRetries int `yaml:"max_retries"`

// How frequently to commit offsets to Kafka.
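
For reference, a minimal sketch of how the `max_retries` knob above might be set in YAML. The exact nesting is an assumption based on the `proxies` section described in README.md and the `yaml:"max_retries"` tag, not something this diff shows.

```yaml
proxies:
  default:
    consumer:
      # -1 (the new default) retries an unacknowledged message indefinitely;
      # 0 offers each message exactly once; N > 0 allows up to N retries.
      max_retries: -1
```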
@@ -367,8 +370,8 @@ func (p *Proxy) validate() error {
return errors.New("consumer.long_polling_timeout must be > 0")
case p.Consumer.MaxPendingMessages <= 0:
return errors.New("consumer.max_pending_messages must be > 0")
case p.Consumer.MaxRetries <= 0:
return errors.New("consumer.max_retries must be > 0")
case p.Consumer.MaxRetries < -1:
return errors.New("consumer.max_retries must be >= -1")
case p.Consumer.OffsetsCommitInterval <= 0:
return errors.New("consumer.offsets_commit_interval must be > 0")
case p.Consumer.OffsetsCommitTimeout <= 0:
@@ -422,7 +425,7 @@ func defaultProxyWithClientID(clientID string) *Proxy {
c.Consumer.FetchMaxWait = 250 * time.Millisecond
c.Consumer.LongPollingTimeout = 3 * time.Second
c.Consumer.MaxPendingMessages = 300
c.Consumer.MaxRetries = 3
c.Consumer.MaxRetries = -1
c.Consumer.OffsetsCommitInterval = 500 * time.Millisecond
c.Consumer.OffsetsCommitTimeout = 1500 * time.Millisecond
c.Consumer.RebalanceDelay = 250 * time.Millisecond
4 changes: 2 additions & 2 deletions consumer/offsettrk/offsettrk.go
@@ -127,8 +127,8 @@ func (ot *T) OnAcked(offset int64) (offsetmgr.Offset, int) {
offerRemoved := ot.removeOffer(offset)
ackedRangesUpdated := ot.updateAckedRanges(offset)
if !offerRemoved || !ackedRangesUpdated {
ot.actDesc.Log().Errorf("Bad ack: offerMissing=%t, duplicateAck=%t",
!offerRemoved, !ackedRangesUpdated)
ot.actDesc.Log().Errorf("Bad ack: offset=%d, offerMissing=%t, duplicateAck=%t",
offset, !offerRemoved, !ackedRangesUpdated)
}
if ackedRangesUpdated {
ot.offset.Meta = encodeAckedRanges(ot.offset.Val, ot.ackedRanges)
2 changes: 1 addition & 1 deletion consumer/partitioncsm/partitioncsm.go
@@ -247,7 +247,7 @@ func (pc *T) runFetchLoop() bool {
// returned or there are no more messages to be retried.
func (pc *T) nextRetry() (consumer.Message, bool) {
msg, retryNo, ok := pc.offsetTrk.NextRetry()
for ok && retryNo > pc.cfg.Consumer.MaxRetries {
for ok && pc.cfg.Consumer.MaxRetries >= 0 && retryNo > pc.cfg.Consumer.MaxRetries {
pc.actDesc.Log().Errorf("Too many retries: retryNo=%d, offset=%d, key=%s, msg=%s",
retryNo, msg.Offset, string(msg.Key), base64.StdEncoding.EncodeToString(msg.Value))
pc.submittedOffset, _ = pc.offsetTrk.OnAcked(msg.Offset)
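
The updated loop condition above can be read as a small predicate. A hypothetical helper (not in the codebase) expressing the same retry gate: a message offered retryNo times is discarded only when a non-negative consumer.max_retries limit has been exceeded, while -1 disables the limit entirely.

```go
// shouldDiscard reports whether a message that has already been offered
// retryNo times should be dropped, given the consumer.max_retries setting.
// A maxRetries of -1 means retries continue indefinitely.
func shouldDiscard(retryNo, maxRetries int) bool {
	return maxRetries >= 0 && retryNo > maxRetries
}
```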
66 changes: 66 additions & 0 deletions consumer/partitioncsm/partitioncsm_test.go
@@ -325,6 +325,72 @@ func (s *PartitionCsmSuite) TestSparseAckedAfterStop(c *C) {
c.Assert(offsettrk.SparseAcks2Str(offsetsAfter[partition]), Equals, "1-3,4-7")
}

// If the max retries limit is set to 0, then messages are offered only once.
func (s *PartitionCsmSuite) TestZeroRetries(c *C) {
offsetsBefore := s.kh.GetOldestOffsets(topic)
s.cfg.Consumer.AckTimeout = 100 * time.Millisecond
s.cfg.Consumer.MaxRetries = 0
s.kh.SetOffsets(group, topic, []offsetmgr.Offset{{Val: sarama.OffsetOldest}})

pc := Spawn(s.ns, group, topic, partition, s.cfg, s.groupMember, s.msgFetcherF, s.offsetMgrF)

msg0 := <-pc.Messages()
log.Infof("*** First: offset=%v", msg0.Offset)
sendEvOffered(msg0)

// Wait for the retry timeout to expire.
time.Sleep(200 * time.Millisecond)

// When/Then: maxRetries has been reached, only new messages are returned.
for i := 0; i < 5; i++ {
msg := <-pc.Messages()
log.Infof("*** Next: offset=%v", msg.Offset)
c.Assert(msg.Offset, Equals, msg0.Offset+int64(1+i), Commentf("i=%d", i))
sendEvOffered(msg)
sendEvAcked(msg)
}

pc.Stop()
offsetsAfter := s.kh.GetCommittedOffsets(group, topic)
c.Assert(offsetsAfter[partition].Val, Equals, offsetsBefore[partition]+6)
c.Assert(offsettrk.SparseAcks2Str(offsetsAfter[partition]), Equals, "")
}

// If the max retries limit is set to -1 then retries never stop.
func (s *PartitionCsmSuite) TestIndefiniteRetries(c *C) {
offsetsBefore := s.kh.GetOldestOffsets(topic)
s.cfg.Consumer.AckTimeout = 100 * time.Millisecond
s.cfg.Consumer.MaxRetries = -1
s.kh.SetOffsets(group, topic, []offsetmgr.Offset{{Val: sarama.OffsetOldest}})

pc := Spawn(s.ns, group, topic, partition, s.cfg, s.groupMember, s.msgFetcherF, s.offsetMgrF)

msg0 := <-pc.Messages()
sendEvOffered(msg0)

// The logic is such that even when an offer has expired, a freshly
// fetched message is offered first, and then retries follow.
base := offsetsBefore[partition] + int64(1)
for i := 0; i < 5; i++ {
time.Sleep(100 * time.Millisecond)
// Newly fetched message is acknowledged...
msgI := <-pc.Messages()
c.Assert(msgI.Offset, Equals, base+int64(i))
sendEvOffered(msgI)
sendEvAcked(msgI)
// ...but retried messages are not.
msg0_i := <-pc.Messages()
c.Assert(msg0_i, DeepEquals, msg0, Commentf(
"got: %d, want: %d", msg0_i.Offset, msg0.Offset))
sendEvOffered(msg0)
}

pc.Stop()
offsetsAfter := s.kh.GetCommittedOffsets(group, topic)
c.Assert(offsetsAfter[partition].Val, Equals, offsetsBefore[partition])
c.Assert(offsettrk.SparseAcks2Str(offsetsAfter[partition]), Equals, "1-6")
}

// If the max retries limit is reached for a message, that results in
// termination of the partition consumer. Note that the offset is properly
// committed to reflect sparsely acknowledged regions.